This is an automated email from the ASF dual-hosted git repository.

stefanegli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-event.git


The following commit(s) were added to refs/heads/master by this push:
     new f4af6cc  SLING-12743 - stop job processing if readiness condition not present (#43)
f4af6cc is described below

commit f4af6ccee5ba161bb41df4bee472333e6c18b349
Author: ionutzpi <[email protected]>
AuthorDate: Mon May 19 11:28:17 2025 +0300

    SLING-12743 - stop job processing if readiness condition not present (#43)
---
 pom.xml                                            |   6 +
 .../impl/jobs/config/JobManagerConfiguration.java  |  53 ++++
 .../sling/event/impl/jobs/queues/JobQueueImpl.java |  35 ++-
 .../sling/event/impl/jobs/queues/QueueManager.java |  24 +-
 .../jobs/config/JobManagerConfigurationTest.java   |   6 +
 .../event/impl/jobs/queues/JobQueueImplTest.java   | 303 +++++++++++++++++++++
 .../event/impl/jobs/queues/TestTopicHalting.java   |   5 +
 .../sling/event/it/AbstractJobHandlingIT.java      |   8 +
 .../sling/event/it/JobHandlingConditionIT.java     | 235 ++++++++++++++++
 9 files changed, 670 insertions(+), 5 deletions(-)

diff --git a/pom.xml b/pom.xml
index 0def461..bc3c32f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -376,6 +376,12 @@
             <version>2.6.7</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.service.condition</artifactId>
+            <version>1.0.0</version>
+            <scope>provided</scope>
+        </dependency>
 
     </dependencies>
 </project>
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
index a66d626..4064cc2 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
@@ -41,7 +41,10 @@ import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
 import org.osgi.service.component.annotations.Modified;
 import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
 import org.osgi.service.component.annotations.ReferencePolicyOption;
+import org.osgi.service.condition.Condition;
 import org.osgi.service.metatype.annotations.AttributeDefinition;
 import org.osgi.service.metatype.annotations.Designate;
 import org.osgi.service.metatype.annotations.ObjectClassDefinition;
@@ -199,6 +202,56 @@ public class JobManagerConfiguration {
     /** The topology capabilities. */
     private volatile TopologyCapabilities topologyCapabilities;
 
+    /** The condition that determines if job processing is enabled. */
+    private volatile Condition jobProcessingEnabledCondition;
+
+    /**
+     * Handle binding of the job processing condition.
+     * @param condition The condition being bound
+     */
+    @Reference(
+        target = "(osgi.condition.id=true)",
+        cardinality = ReferenceCardinality.OPTIONAL,
+        policy = ReferencePolicy.DYNAMIC,
+        policyOption = ReferencePolicyOption.GREEDY
+    )
+    protected void bindJobProcessingEnabledCondition(final Condition 
condition) {
+        if (this.jobProcessingEnabledCondition != null) {
+            logger.warn("Job processing readiness condition already set - 
ignoring new condition");
+            return;
+        }
+        this.jobProcessingEnabledCondition = condition;
+        // If condition becomes true, trigger maintenance to start processing jobs
+        if (condition != null) {
+            logger.info("Job processing readiness condition has been set - 
jobs will be processed");
+            notifyListeners();
+        }
+    }
+
+    /**
+     * Handle unbinding of the job processing condition.
+     * @param condition The condition being unbound
+     */
+    protected void unbindJobProcessingEnabledCondition(final Condition 
condition) {
+        if (this.jobProcessingEnabledCondition == condition) {
+            this.jobProcessingEnabledCondition = null;
+            logger.info("Job processing readiness condition has been removed - 
jobs will not be processed");
+            // Signal jobs to stop before notifying listeners
+            stopProcessing();
+            notifyListeners();
+        }
+    }
+
+    /**
+     * Check if job processing is enabled.
+     * This only affects whether jobs are processed/executed - jobs can still be
+     * assigned, stored, and managed through the API even when processing is disabled.
+     * @return true if job processing is enabled, false otherwise
+     */
+    public boolean isJobProcessingEnabled() {
+        return jobProcessingEnabledCondition != null;
+    }
+
     /**
      * Activate this component.
      * @param props Configuration properties
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
index 3a272ea..15192fd 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
@@ -169,7 +169,7 @@ public class JobQueueImpl
      * @param cache The job cache
      * @param outdatedQueue
      */
-    private JobQueueImpl(final String name,
+    protected JobQueueImpl(final String name,
                         final InternalQueueConfiguration config,
                         final QueueServices services,
                         final QueueJobCache cache,
@@ -252,6 +252,10 @@ public class JobQueueImpl
      * This method might be called concurrently, therefore we use a guard
      */
     public void startJobs() {
+        if (!services.configuration.isJobProcessingEnabled()) {
+            logger.debug("Job processing is disabled, skipping job starts for 
queue {}", queueName);
+            return;
+        }
         if ( this.startJobsGuard.compareAndSet(false, true) ) {
             // we start as many jobs in parallel as possible
             while ( this.running && !this.isOutdated.get() && 
!this.isSuspended() && this.available.tryAcquire() ) {
@@ -310,6 +314,10 @@ public class JobQueueImpl
     }
 
     private void startJob(final JobHandler handler) {
+        if (!services.configuration.isJobProcessingEnabled()) {
+            logger.debug("Job processing is disabled, stopping job {} in queue 
{}", handler.getJob().getId(), queueName);
+            return;
+        }
         try {
             this.closeMarker.set(false);
             try {
@@ -468,6 +476,10 @@ public class JobQueueImpl
      * Periodic maintenance
      */
     public void maintain() {
+        if (!services.configuration.isJobProcessingEnabled()) {
+            logger.debug("Job processing is disabled, skipping maintenance for 
queue {}", queueName);
+            return;
+        }
         // check suspended
         final long since = this.suspendedSince.get();
         if ( since != -1 && since + MAX_SUSPEND_TIME < 
System.currentTimeMillis() ) {
@@ -736,6 +748,19 @@ public class JobQueueImpl
         return handler != null;
     }
 
+    /**
+     * Stop all currently running jobs in this queue
+     */
+    public void stopAllJobs() {
+        logger.debug("Stopping all running jobs in queue {}", queueName);
+        synchronized ( this.processingJobsLists ) {
+            for (final JobHandler handler : this.processingJobsLists.values()) 
{
+                handler.stop();
+            }
+        }
+        logger.debug("All running jobs stopped in queue {}", queueName);
+    }
+
     private void reschedule(final JobHandler handler) {
         // we delay putting back the job until the retry delay is over
         final long delay = this.getRetryDelay(handler);
@@ -789,5 +814,13 @@ public class JobQueueImpl
     OutdatedJobQueueInfo getOutdatedJobQueueInfo() {
         return new OutdatedJobQueueInfo(available, maxParallel, drainage);
     }
+
+    Map<String, JobHandler> getProcessingJobsLists() {
+        return processingJobsLists;
+    }
+
+    boolean isRunning() {
+        return running;
+    }
 }
 
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
index 5d919e0..d1fbd7f 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
@@ -82,7 +82,7 @@ import org.slf4j.LoggerFactory;
 public class QueueManager
     implements Runnable, EventHandler, ConfigurationChangeListener {
 
-    static QueueManager newForTest(EventAdmin eventAdmin, JobConsumerManager 
jobConsumerManager,
+    public static QueueManager newForTest(EventAdmin eventAdmin, 
JobConsumerManager jobConsumerManager,
             QueuesMBean queuesMBean, ThreadPoolManager threadPoolManager, 
ThreadPool threadPool,
             JobManagerConfiguration configuration, StatisticsManager 
statisticsManager) {
         final QueueManager qm = new QueueManager();
@@ -385,17 +385,33 @@ public class QueueManager
     public void configurationChanged(final boolean active) {
         // are we still active?
         if ( this.configuration != null ) {
-            logger.debug("Topology changed {}", active);
-            this.isActive.set(active);
+            boolean isActive = active && 
configuration.isJobProcessingEnabled();
+            logger.debug("Topology changed {}, job processing enabled: {}", 
active, configuration.isJobProcessingEnabled());
+            this.isActive.set(isActive);
             clearHaltedTopics("configurationChanged : unhalted topics due to 
configuration change");
-            if ( active ) {
+            if ( isActive ) {
                 fullTopicScan();
             } else {
+                // Only stop jobs if readiness condition is not met
+                if (!configuration.isJobProcessingEnabled()) {
+                    stopAllJobs();
+                }
                 this.restart();
             }
         }
     }
 
+    /**
+     * Stop all running jobs in all queues
+     */
+    private void stopAllJobs() {
+        logger.debug("Stopping all running jobs...");
+        for (final JobQueueImpl queue : this.queues.values()) {
+            queue.stopAllJobs();
+        }
+        logger.debug("All running jobs stopped");
+    }
+
     private void clearHaltedTopics(String logPrefix) {
         final String haltedTopicsToString;
         // Note: the synchronized below is just to avoid wrong logging about unhalting,
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java b/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java
index a2e68cd..828b6ae 100644
--- a/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java
+++ b/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java
@@ -21,6 +21,7 @@ package org.apache.sling.event.impl.jobs.config;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -38,6 +39,7 @@ import org.apache.sling.event.impl.TestUtil;
 import org.apache.sling.event.impl.discovery.InitDelayingTopologyEventListener;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.osgi.service.condition.Condition;
 
 public class JobManagerConfigurationTest {
 
@@ -99,6 +101,10 @@ public class JobManagerConfigurationTest {
         });
         TestUtil.setFieldValue(config, "startupDelayListener", 
startupDelayListener);
 
+        // Create and bind the condition
+        Condition condition = mock(Condition.class);
+        config.bindJobProcessingEnabledCondition(condition);
+
         config.addListener(ccl);
         ccl.await();
 
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java b/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java
new file mode 100644
index 0000000..6be86cb
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.event.impl.jobs.JobConsumerManager;
+import org.apache.sling.event.impl.jobs.JobHandler;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.slf4j.Logger;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+public class JobQueueImplTest {
+
+    private JobQueueImpl jobQueue;
+    private QueueServices services;
+    private JobManagerConfiguration configuration;
+    private QueueJobCache cache;
+    @InjectMocks
+    private ThreadPool threadPool;
+    private String testQueue = "testQueue";
+
+    @Before
+    public void setUp() {
+        configuration = mock(JobManagerConfiguration.class);
+        services = spy(QueueServices.class);
+        InternalQueueConfiguration internalConfig = 
mock(InternalQueueConfiguration.class);
+        services.configuration = configuration;
+        when(configuration.isJobProcessingEnabled()).thenReturn(false);
+        when(internalConfig.getMaxParallel()).thenReturn(5);
+        when(internalConfig.getRetryDelayInMs()).thenReturn(1000L);
+        cache = mock(QueueJobCache.class);
+        threadPool = mock(ThreadPool.class);
+        jobQueue = new JobQueueImpl(testQueue, internalConfig, services, 
cache, null);
+    }
+
+    @Test
+    public void testStartJobsWhenDisabled() {
+        // Add a job handler to the queue
+        JobHandler jobHandler = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+        String jobId = "testJob";
+        when(jobHandler.getJob().getId()).thenReturn(jobId);
+        jobQueue.getProcessingJobsLists().put(jobId, jobHandler);
+
+        when(configuration.isJobProcessingEnabled()).thenReturn(false);
+        jobQueue.startJobs();
+
+        // The job should not be started/removed
+        assertTrue("Job should remain in processingJobsLists when processing 
is disabled", jobQueue.getProcessingJobsLists().containsKey(jobId));
+    }
+
+    @Test
+    public void testStartJob() throws Exception {
+        // Arrange
+        JobHandler jobHandler = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+        String jobId = "testJob";
+        when(jobHandler.getJob().getId()).thenReturn(jobId);
+        when(jobHandler.getConsumer().process(any(), 
any())).thenReturn(JobExecutionResultImpl.SUCCEEDED);
+
+        Logger auditLogger = mock(Logger.class);
+        when(configuration.getAuditLogger()).thenReturn(auditLogger);
+
+        // Processing disabled: should not process any job
+        when(configuration.isJobProcessingEnabled()).thenReturn(false);
+
+        // Act
+        jobQueue.startJobs();
+
+        // Assert: No job should be in processingJobsLists
+        assertTrue("No jobs should be processed when processing is disabled",
+                jobQueue.getProcessingJobsLists().isEmpty());
+
+        // Enable processing
+        when(configuration.isJobProcessingEnabled()).thenReturn(true);
+
+        // Mock the cache to return the job handler once, then null
+        when(cache.getNextJob(
+                any(JobConsumerManager.class),
+                any(StatisticsManager.class),
+                eq(jobQueue),
+                anyBoolean()
+        )).thenReturn(jobHandler).thenReturn(null);
+
+        Field threadPoolField = 
JobQueueImpl.class.getDeclaredField("threadPool");
+        threadPoolField.setAccessible(true);
+        threadPoolField.set(jobQueue, threadPool);
+        doNothing().when(threadPool).execute(any());
+
+        // Act: Try to start jobs again
+        jobQueue.startJobs();
+
+        // Wait for the job to be processed/removed (asynchronously)
+        long timeout = System.currentTimeMillis() + 2000;
+        while (System.currentTimeMillis() < timeout) {
+            if (!jobQueue.getProcessingJobsLists().containsKey(jobId)) {
+                break;
+            }
+            Thread.sleep(50);
+        }
+
+        // Assert: Job should now be removed
+        assertFalse("Job should be removed from processingJobsLists when 
processing is enabled",
+                jobQueue.getProcessingJobsLists().containsKey(jobId));
+    }
+
+    @Test
+    public void testQueueShutdown() {
+        // Enable the configuration
+        when(configuration.isJobProcessingEnabled()).thenReturn(true);
+
+        jobQueue.close();
+
+        // Verify that the queue is no longer running
+        assertFalse("Queue should not be running after shutdown", 
jobQueue.isRunning());
+
+        jobQueue.startJobs();
+
+        // Verify that no jobs were started by checking the internal state
+        assertTrue("No jobs should be started after queue shutdown", 
jobQueue.getProcessingJobsLists().isEmpty());
+    }
+
+    @Test
+    public void testQueueStartupAndShutdown() {
+        when(configuration.isJobProcessingEnabled()).thenReturn(true);
+
+        jobQueue.startJobs();
+
+        // Add a job and verify it is present
+        JobHandler jobHandler = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+        String jobId = "testJob";
+        when(jobHandler.getJob().getId()).thenReturn(jobId);
+        jobQueue.getProcessingJobsLists().put(jobId, jobHandler);
+
+        assertTrue("Processing jobs list should contain the job after adding", 
jobQueue.getProcessingJobsLists().containsKey(jobId));
+
+        jobQueue.close();
+
+        // Verify that the processingJobsLists is cleared after shutdown
+        assertTrue("Processing jobs list should be empty after shutdown", 
jobQueue.getProcessingJobsLists().isEmpty());
+    }
+
+    @Test
+    public void testJobAssignmentWhenProcessingDisabled() {
+        JobHandler jobHandler = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+        String jobId = "testJob";
+        when(jobHandler.getJob().getId()).thenReturn(jobId);
+
+        when(configuration.isJobProcessingEnabled()).thenReturn(false);
+
+        jobQueue.getProcessingJobsLists().put(jobId, jobHandler);
+
+        try {
+            Method startJobMethod = 
JobQueueImpl.class.getDeclaredMethod("startJob", JobHandler.class);
+            startJobMethod.setAccessible(true);
+            startJobMethod.invoke(jobQueue, jobHandler);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to invoke startJob method", e);
+        }
+
+        assertTrue("Job should remain in processingJobsLists even when 
processing is disabled", jobQueue.getProcessingJobsLists().containsKey(jobId));
+    }
+
+    @Test
+    public void testStartJobsWhenQueueSuspended() {
+        // Add a job before suspending
+        JobHandler jobHandler = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+        String jobId = "testJob";
+        when(jobHandler.getJob().getId()).thenReturn(jobId);
+        jobQueue.getProcessingJobsLists().put(jobId, jobHandler);
+
+        jobQueue.suspend();
+        jobQueue.startJobs();
+
+        // The job should still be present since the queue is suspended
+        assertTrue("No jobs should be started when the queue is suspended", 
jobQueue.getProcessingJobsLists().containsKey(jobId));
+
+        // Activate the queue and enable processing
+        when(configuration.isJobProcessingEnabled()).thenReturn(true);
+        jobQueue.resume(); // Use resume() to wake up the queue
+
+        // Wait for the job to be processed/removed (asynchronously)
+        long timeout = System.currentTimeMillis() + 2000; // 2 seconds timeout
+        while (System.currentTimeMillis() < timeout) {
+            if (!jobQueue.getProcessingJobsLists().containsKey(jobId)) {
+                break;
+            }
+            try {
+                Thread.sleep(50);
+            } catch (InterruptedException ignored) {}
+        }
+
+        assertTrue("Job should be removed from processingJobsLists after 
resuming queue when processing is enabled", 
jobQueue.getProcessingJobsLists().containsKey(jobId));
+    }
+
+    @Test
+    public void testStopAllJobs() {
+        // Enable job processing
+        when(configuration.isJobProcessingEnabled()).thenReturn(true);
+
+        // Create and add multiple jobs to the processing list
+        JobHandler jobHandler1 = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+        JobHandler jobHandler2 = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+        String jobId1 = "testJob1";
+        String jobId2 = "testJob2";
+        when(jobHandler1.getJob().getId()).thenReturn(jobId1);
+        when(jobHandler2.getJob().getId()).thenReturn(jobId2);
+
+        // Add jobs to processing list
+        jobQueue.getProcessingJobsLists().put(jobId1, jobHandler1);
+        jobQueue.getProcessingJobsLists().put(jobId2, jobHandler2);
+
+        // Stop all jobs
+        jobQueue.stopAllJobs();
+
+        // Verify both jobs were stopped
+        verify(jobHandler1).stop();
+        verify(jobHandler2).stop();
+
+        // Verify jobs remain in processing list
+        assertTrue("Job1 should remain in processing list after stopAllJobs",
+                jobQueue.getProcessingJobsLists().containsKey(jobId1));
+        assertTrue("Job2 should remain in processing list after stopAllJobs",
+                jobQueue.getProcessingJobsLists().containsKey(jobId2));
+        assertEquals("Processing list should still contain 2 jobs", 2,
+                jobQueue.getProcessingJobsLists().size());
+    }
+
+    @Test
+    public void testActualJobProcessing() throws Exception {
+        // Arrange
+        JobHandler jobHandler = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+        String jobId = "testJob";
+        when(jobHandler.getJob().getId()).thenReturn(jobId);
+        when(jobHandler.getConsumer().process(any(), 
any())).thenReturn(JobExecutionResultImpl.SUCCEEDED);
+
+        // Enable job processing
+        when(configuration.isJobProcessingEnabled()).thenReturn(true);
+
+            // Mock the cache to return the job handler once, then null
+            when(cache.getNextJob(
+                    any(),
+                    any(),
+                    any(),
+                    anyBoolean()
+            )).thenReturn(jobHandler).thenReturn(null);
+
+            Field threadPoolField = 
JobQueueImpl.class.getDeclaredField("threadPool");
+            threadPoolField.setAccessible(true);
+            threadPoolField.set(jobQueue, threadPool);
+            doNothing().when(threadPool).execute(any());
+        // Act: Start jobs
+        jobQueue.startJobs();
+
+        // Wait for the job to be processed/removed (asynchronously)
+        long timeout = System.currentTimeMillis() + 2000; // 2 seconds timeout
+        while (System.currentTimeMillis() < timeout) {
+            if (!jobQueue.getProcessingJobsLists().containsKey(jobId)) {
+                break;
+            }
+            Thread.sleep(50);
+        }
+
+        // Assert: Job should be removed after processing
+        assertFalse("Job should be removed from processingJobsLists after 
successful processing",
+                jobQueue.getProcessingJobsLists().containsKey(jobId));
+    }
+}
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/queues/TestTopicHalting.java b/src/test/java/org/apache/sling/event/impl/jobs/queues/TestTopicHalting.java
index a75c92d..755cc1f 100644
--- a/src/test/java/org/apache/sling/event/impl/jobs/queues/TestTopicHalting.java
+++ b/src/test/java/org/apache/sling/event/impl/jobs/queues/TestTopicHalting.java
@@ -21,6 +21,8 @@ package org.apache.sling.event.impl.jobs.queues;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import java.io.Externalizable;
 import java.io.IOException;
@@ -149,6 +151,9 @@ public class TestTopicHalting {
 
         configuration = 
JobManagerConfigurationTestFactory.create(JobManagerConfiguration.DEFAULT_REPOSITORY_PATH,
 
                 factory, queueConfigurationManager);
+
+        configuration = spy(configuration);
+        when(configuration.isJobProcessingEnabled()).thenReturn(true);
         
         queueManager = QueueManager.newForTest(eventAdmin, jobConsumerManager, 
                 queuesMBean, threadPoolManager, threadPool, configuration, 
statisticsManager);
diff --git a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingIT.java b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingIT.java
index 76d514f..beb4b28 100644
--- a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingIT.java
+++ b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingIT.java
@@ -57,6 +57,14 @@ public abstract class AbstractJobHandlingIT extends JobsTestSupport {
     public void setup() throws IOException {
         log.info("starting setup");
         registerTopologyListener();
+        
+        // Register the job processing condition
+        Hashtable<String, Object> propsConfig = new Hashtable<>();
+        propsConfig.put("osgi.condition.id", 
"org.apache.sling.event.jobs.processing.enabled");
+        ServiceRegistration<org.osgi.service.condition.Condition> conditionReg 
= 
+            
this.bundleContext.registerService(org.osgi.service.condition.Condition.class, 
+                new org.osgi.service.condition.Condition() {}, propsConfig);
+        this.registrations.add(conditionReg);
     }
 
     protected AtomicReference<TopologyEvent> lastTopologyEvent = new 
AtomicReference<>();
diff --git a/src/test/java/org/apache/sling/event/it/JobHandlingConditionIT.java b/src/test/java/org/apache/sling/event/it/JobHandlingConditionIT.java
new file mode 100644
index 0000000..25110d3
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/it/JobHandlingConditionIT.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerMethod;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.condition.Condition;
+
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.ops4j.pax.exam.CoreOptions.options;
+import static 
org.ops4j.pax.exam.cm.ConfigurationAdminOptions.factoryConfiguration;
+
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerMethod.class)
+public class JobHandlingConditionIT extends AbstractJobHandlingIT {
+
+    public static final String TOPIC = "sling/test/condition";
+
+    private ServiceRegistration<Condition> jobProcessingConditionReg;
+
+    /**
+     * Builds the Pax Exam options for this test: the shared base configuration
+     * plus one unordered queue named "test" bound to {@link #TOPIC} and
+     * {@code TOPIC + "2"}, with 2 retries and a retry delay of 2000.
+     */
+    @Configuration
+    public Option[] configuration() {
+        final Option base = baseConfiguration();
+        final String[] queueTopics = new String[]{TOPIC, TOPIC + "2"};
+
+        // create test queue
+        final Option testQueue = factoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration")
+                .put(ConfigurationConstants.PROP_NAME, "test")
+                .put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.UNORDERED.name())
+                .put(ConfigurationConstants.PROP_TOPICS, queueTopics)
+                .put(ConfigurationConstants.PROP_RETRIES, 2)
+                .put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L)
+                .asOption();
+
+        return options(base, testQueue);
+    }
+
+    /**
+     * Sleeps for two seconds before each test method.
+     * NOTE(review): presumably this gives the framework and the job components
+     * time to settle after startup - confirm whether the fixed delay is still needed.
+     *
+     * @throws InterruptedException if the sleeping thread is interrupted
+     */
+    @Before
+    public void additionalStartupDelay() throws InterruptedException {
+        Thread.sleep(2000);
+    }
+
+    /**
+     * Verifies that jobs added while the job processing condition is unregistered
+     * are not processed, and that all of them are processed once the condition is
+     * registered again.
+     *
+     * @throws Exception if a sleep is interrupted or toggling the condition fails
+     */
+    @Test(timeout = DEFAULT_TEST_TIMEOUT)
+    public void testJobProcessingConditionToggle() throws Exception {
+        final AtomicInteger processed = new AtomicInteger(0);
+        final int jobCount = 10;
+        final String uniqueTopic = TOPIC + "/race/" + System.currentTimeMillis();
+
+        this.registerJobConsumer(uniqueTopic, job -> {
+            processed.incrementAndGet();
+            try {
+                // Simulate some work per job.
+                Thread.sleep(200);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so the owner of the job thread can still observe it.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+            return JobConsumer.JobResult.OK;
+        });
+
+        // Disable job processing before adding jobs
+        setJobProcessingEnabled(false);
+
+        // Add jobs while processing is disabled
+        for (int i = 0; i < jobCount; i++) {
+            jobManager.addJob(uniqueTopic, Collections.singletonMap("id", i));
+        }
+
+        // Wait to ensure jobs are not processed
+        Thread.sleep(1000);
+        assertEquals("No jobs should be processed while processing is disabled", 0, processed.get());
+
+        // Re-enable job processing
+        setJobProcessingEnabled(true);
+
+        // Wait for all jobs to finish; this loop is bounded only by the @Test timeout.
+        while (processed.get() < jobCount) {
+            Thread.sleep(100);
+        }
+        assertEquals("All jobs should eventually be processed", jobCount, processed.get());
+    }
+
+    /**
+     * Verifies that jobs held back while the job processing condition is
+     * unregistered are all processed after it is registered again, even though an
+     * unrelated service is registered and unregistered while they run.
+     *
+     * @throws Exception if a sleep is interrupted or toggling the condition fails
+     */
+    @Test(timeout = DEFAULT_TEST_TIMEOUT)
+    public void testTopologyChangeDuringJobProcessing() throws Exception {
+        final AtomicInteger processed = new AtomicInteger(0);
+        final int jobCount = 5;
+
+        this.registerJobConsumer(TOPIC, job -> {
+            processed.incrementAndGet();
+            try {
+                // Simulate some work per job.
+                Thread.sleep(200);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so the owner of the job thread can still observe it.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+            return JobConsumer.JobResult.OK;
+        });
+
+        // Disable job processing before adding jobs
+        setJobProcessingEnabled(false);
+
+        // Add jobs while processing is disabled
+        for (int i = 0; i < jobCount; i++) {
+            jobManager.addJob(TOPIC, Collections.singletonMap("id", i));
+        }
+
+        // Wait to ensure jobs are not processed
+        Thread.sleep(1000);
+        assertEquals("No jobs should be processed while processing is disabled", 0, processed.get());
+
+        // Re-enable job processing
+        setJobProcessingEnabled(true);
+
+        // Register and unregister an unrelated dummy service while the jobs run.
+        // NOTE(review): this is meant to simulate a topology change, but a plain
+        // service registration presumably only produces service events - confirm
+        // that it exercises the intended code path.
+        ServiceRegistration<?> reg = bundleContext.registerService(Object.class, new Object(), null);
+        Thread.sleep(500);
+        reg.unregister();
+
+        // Wait for all jobs to finish; this loop is bounded only by the @Test timeout.
+        while (processed.get() < jobCount) {
+            Thread.sleep(50);
+        }
+        assertEquals("All jobs should be processed despite topology changes", jobCount, processed.get());
+    }
+
+    /**
+     * Repeatedly registers and unregisters the job processing condition, together
+     * with an unrelated dummy service, while jobs are queued, then verifies that
+     * all jobs are processed once the condition is finally registered again.
+     *
+     * @throws Exception if a sleep is interrupted or toggling the condition fails
+     */
+    @Test(timeout = DEFAULT_TEST_TIMEOUT)
+    public void testRaceConditionWithConditionAndTopology() throws Exception {
+        final AtomicInteger processed = new AtomicInteger(0);
+        final int jobCount = 8;
+        final String uniqueTopic = TOPIC + "/race/" + System.currentTimeMillis();
+
+        this.registerJobConsumer(uniqueTopic, job -> {
+            processed.incrementAndGet();
+            try {
+                // Simulate some work per job.
+                Thread.sleep(200);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so the owner of the job thread can still observe it.
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+            return JobConsumer.JobResult.OK;
+        });
+
+        // Disable job processing before adding jobs
+        setJobProcessingEnabled(false);
+
+        // Add jobs while processing is disabled
+        for (int i = 0; i < jobCount; i++) {
+            jobManager.addJob(uniqueTopic, Collections.singletonMap("id", i));
+        }
+
+        // Wait to ensure jobs are not processed
+        Thread.sleep(1000);
+        assertEquals("No jobs should be processed while processing is disabled", 0, processed.get());
+
+        // Rapidly toggle the condition and a dummy service registration
+        for (int i = 0; i < 3; i++) {
+            setJobProcessingEnabled(true);
+            ServiceRegistration<?> reg = bundleContext.registerService(Object.class, new Object(), null);
+            Thread.sleep(300);
+            setJobProcessingEnabled(false);
+            reg.unregister();
+            Thread.sleep(300);
+        }
+
+        // Re-enable job processing to allow jobs to finish
+        setJobProcessingEnabled(true);
+
+        // Wait for all jobs to finish, giving up after maxWait so the assertion
+        // below reports the shortfall instead of the test timing out.
+        long start = System.currentTimeMillis();
+        long maxWait = 10000; // 10 seconds
+        while (processed.get() < jobCount && (System.currentTimeMillis() - start) < maxWait) {
+            Thread.sleep(100);
+        }
+        assertEquals("All jobs should be processed after race conditions", jobCount, processed.get());
+    }
+
+    // Helper to toggle the job processing condition
+    private void setJobProcessingEnabled(boolean enabled) throws Exception {
+        String conditionId = "org.apache.sling.event.jobs.processing.enabled";
+        if (enabled) {
+            if (jobProcessingConditionReg == null) {
+                Dictionary<String, Object> props = new Hashtable<>();
+                props.put("osgi.condition.id", conditionId);
+                jobProcessingConditionReg = bundleContext.registerService(
+                        Condition.class, UNCONDITIONAL_CONDITION, props
+                );
+            }
+        } else {
+            if (jobProcessingConditionReg != null) {
+                jobProcessingConditionReg.unregister();
+                jobProcessingConditionReg = null;
+            }
+        }
+        Thread.sleep(300); // Wait for the system to react
+    }
+
+    private static final Condition UNCONDITIONAL_CONDITION = new Condition() 
{};
+}


Reply via email to