This is an automated email from the ASF dual-hosted git repository.
stefanegli pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-event.git
The following commit(s) were added to refs/heads/master by this push:
new f4af6cc SLING-12743 - stop job processing if readiness condition not
present (#43)
f4af6cc is described below
commit f4af6ccee5ba161bb41df4bee472333e6c18b349
Author: ionutzpi <[email protected]>
AuthorDate: Mon May 19 11:28:17 2025 +0300
SLING-12743 - stop job processing if readiness condition not present (#43)
---
pom.xml | 6 +
.../impl/jobs/config/JobManagerConfiguration.java | 53 ++++
.../sling/event/impl/jobs/queues/JobQueueImpl.java | 35 ++-
.../sling/event/impl/jobs/queues/QueueManager.java | 24 +-
.../jobs/config/JobManagerConfigurationTest.java | 6 +
.../event/impl/jobs/queues/JobQueueImplTest.java | 303 +++++++++++++++++++++
.../event/impl/jobs/queues/TestTopicHalting.java | 5 +
.../sling/event/it/AbstractJobHandlingIT.java | 8 +
.../sling/event/it/JobHandlingConditionIT.java | 235 ++++++++++++++++
9 files changed, 670 insertions(+), 5 deletions(-)
diff --git a/pom.xml b/pom.xml
index 0def461..bc3c32f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -376,6 +376,12 @@
<version>2.6.7</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.service.condition</artifactId>
+ <version>1.0.0</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
index a66d626..4064cc2 100644
---
a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
+++
b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
@@ -41,7 +41,10 @@ import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
import org.osgi.service.component.annotations.ReferencePolicyOption;
+import org.osgi.service.condition.Condition;
import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.Designate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
@@ -199,6 +202,56 @@ public class JobManagerConfiguration {
/** The topology capabilities. */
private volatile TopologyCapabilities topologyCapabilities;
+ /** The condition that determines if job processing is enabled. */
+ private volatile Condition jobProcessingEnabledCondition;
+
+ /**
+ * Handle binding of the job processing condition.
+ * @param condition The condition being bound
+ */
+ @Reference(
+ target = "(osgi.condition.id=true)",
+ cardinality = ReferenceCardinality.OPTIONAL,
+ policy = ReferencePolicy.DYNAMIC,
+ policyOption = ReferencePolicyOption.GREEDY
+ )
+ protected void bindJobProcessingEnabledCondition(final Condition
condition) {
+ if (this.jobProcessingEnabledCondition != null) {
+ logger.warn("Job processing readiness condition already set -
ignoring new condition");
+ return;
+ }
+ this.jobProcessingEnabledCondition = condition;
+ // If condition becomes true, trigger maintenance to start processing
jobs
+ if (condition != null) {
+ logger.info("Job processing readiness condition has been set -
jobs will be processed");
+ notifyListeners();
+ }
+ }
+
+ /**
+ * Handle unbinding of the job processing condition.
+ * @param condition The condition being unbound
+ */
+ protected void unbindJobProcessingEnabledCondition(final Condition
condition) {
+ if (this.jobProcessingEnabledCondition == condition) {
+ this.jobProcessingEnabledCondition = null;
+ logger.info("Job processing readiness condition has been removed -
jobs will not be processed");
+ // Signal jobs to stop before notifying listeners
+ stopProcessing();
+ notifyListeners();
+ }
+ }
+
+ /**
+ * Check if job processing is enabled.
+ * This only affects whether jobs are processed/executed - jobs can still
be
+ * assigned, stored, and managed through the API even when processing is
disabled.
+ * @return true if job processing is enabled, false otherwise
+ */
+ public boolean isJobProcessingEnabled() {
+ return jobProcessingEnabledCondition != null;
+ }
+
/**
* Activate this component.
* @param props Configuration properties
diff --git
a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
index 3a272ea..15192fd 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
@@ -169,7 +169,7 @@ public class JobQueueImpl
* @param cache The job cache
* @param outdatedQueue
*/
- private JobQueueImpl(final String name,
+ protected JobQueueImpl(final String name,
final InternalQueueConfiguration config,
final QueueServices services,
final QueueJobCache cache,
@@ -252,6 +252,10 @@ public class JobQueueImpl
* This method might be called concurrently, therefore we use a guard
*/
public void startJobs() {
+ if (!services.configuration.isJobProcessingEnabled()) {
+ logger.debug("Job processing is disabled, skipping job starts for
queue {}", queueName);
+ return;
+ }
if ( this.startJobsGuard.compareAndSet(false, true) ) {
// we start as many jobs in parallel as possible
while ( this.running && !this.isOutdated.get() &&
!this.isSuspended() && this.available.tryAcquire() ) {
@@ -310,6 +314,10 @@ public class JobQueueImpl
}
private void startJob(final JobHandler handler) {
+ if (!services.configuration.isJobProcessingEnabled()) {
+ logger.debug("Job processing is disabled, stopping job {} in queue
{}", handler.getJob().getId(), queueName);
+ return;
+ }
try {
this.closeMarker.set(false);
try {
@@ -468,6 +476,10 @@ public class JobQueueImpl
* Periodic maintenance
*/
public void maintain() {
+ if (!services.configuration.isJobProcessingEnabled()) {
+ logger.debug("Job processing is disabled, skipping maintenance for
queue {}", queueName);
+ return;
+ }
// check suspended
final long since = this.suspendedSince.get();
if ( since != -1 && since + MAX_SUSPEND_TIME <
System.currentTimeMillis() ) {
@@ -736,6 +748,19 @@ public class JobQueueImpl
return handler != null;
}
+ /**
+ * Stop all currently running jobs in this queue
+ */
+ public void stopAllJobs() {
+ logger.debug("Stopping all running jobs in queue {}", queueName);
+ synchronized ( this.processingJobsLists ) {
+ for (final JobHandler handler : this.processingJobsLists.values())
{
+ handler.stop();
+ }
+ }
+ logger.debug("All running jobs stopped in queue {}", queueName);
+ }
+
private void reschedule(final JobHandler handler) {
// we delay putting back the job until the retry delay is over
final long delay = this.getRetryDelay(handler);
@@ -789,5 +814,13 @@ public class JobQueueImpl
OutdatedJobQueueInfo getOutdatedJobQueueInfo() {
return new OutdatedJobQueueInfo(available, maxParallel, drainage);
}
+
+ Map<String, JobHandler> getProcessingJobsLists() {
+ return processingJobsLists;
+ }
+
+ boolean isRunning() {
+ return running;
+ }
}
diff --git
a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
index 5d919e0..d1fbd7f 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
@@ -82,7 +82,7 @@ import org.slf4j.LoggerFactory;
public class QueueManager
implements Runnable, EventHandler, ConfigurationChangeListener {
- static QueueManager newForTest(EventAdmin eventAdmin, JobConsumerManager
jobConsumerManager,
+ public static QueueManager newForTest(EventAdmin eventAdmin,
JobConsumerManager jobConsumerManager,
QueuesMBean queuesMBean, ThreadPoolManager threadPoolManager,
ThreadPool threadPool,
JobManagerConfiguration configuration, StatisticsManager
statisticsManager) {
final QueueManager qm = new QueueManager();
@@ -385,17 +385,33 @@ public class QueueManager
public void configurationChanged(final boolean active) {
// are we still active?
if ( this.configuration != null ) {
- logger.debug("Topology changed {}", active);
- this.isActive.set(active);
+ boolean isActive = active &&
configuration.isJobProcessingEnabled();
+ logger.debug("Topology changed {}, job processing enabled: {}",
active, configuration.isJobProcessingEnabled());
+ this.isActive.set(isActive);
clearHaltedTopics("configurationChanged : unhalted topics due to
configuration change");
- if ( active ) {
+ if ( isActive ) {
fullTopicScan();
} else {
+ // Only stop jobs if readiness condition is not met
+ if (!configuration.isJobProcessingEnabled()) {
+ stopAllJobs();
+ }
this.restart();
}
}
}
+ /**
+ * Stop all running jobs in all queues
+ */
+ private void stopAllJobs() {
+ logger.debug("Stopping all running jobs...");
+ for (final JobQueueImpl queue : this.queues.values()) {
+ queue.stopAllJobs();
+ }
+ logger.debug("All running jobs stopped");
+ }
+
private void clearHaltedTopics(String logPrefix) {
final String haltedTopicsToString;
// Note: the synchronized below is just to avoid wrong logging about
unhalting,
diff --git
a/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java
b/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java
index a2e68cd..828b6ae 100644
---
a/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java
+++
b/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java
@@ -21,6 +21,7 @@ package org.apache.sling.event.impl.jobs.config;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.Collections;
@@ -38,6 +39,7 @@ import org.apache.sling.event.impl.TestUtil;
import org.apache.sling.event.impl.discovery.InitDelayingTopologyEventListener;
import org.junit.Test;
import org.mockito.Mockito;
+import org.osgi.service.condition.Condition;
public class JobManagerConfigurationTest {
@@ -99,6 +101,10 @@ public class JobManagerConfigurationTest {
});
TestUtil.setFieldValue(config, "startupDelayListener",
startupDelayListener);
+ // Create and bind the condition
+ Condition condition = mock(Condition.class);
+ config.bindJobProcessingEnabledCondition(condition);
+
config.addListener(ccl);
ccl.await();
diff --git
a/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java
b/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java
new file mode 100644
index 0000000..6be86cb
--- /dev/null
+++
b/src/test/java/org/apache/sling/event/impl/jobs/queues/JobQueueImplTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.event.impl.jobs.JobConsumerManager;
+import org.apache.sling.event.impl.jobs.JobHandler;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.slf4j.Logger;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+public class JobQueueImplTest {
+
+ private JobQueueImpl jobQueue;
+ private QueueServices services;
+ private JobManagerConfiguration configuration;
+ private QueueJobCache cache;
+ @InjectMocks
+ private ThreadPool threadPool;
+ private String testQueue = "testQueue";
+
+ @Before
+ public void setUp() {
+ configuration = mock(JobManagerConfiguration.class);
+ services = spy(QueueServices.class);
+ InternalQueueConfiguration internalConfig =
mock(InternalQueueConfiguration.class);
+ services.configuration = configuration;
+ when(configuration.isJobProcessingEnabled()).thenReturn(false);
+ when(internalConfig.getMaxParallel()).thenReturn(5);
+ when(internalConfig.getRetryDelayInMs()).thenReturn(1000L);
+ cache = mock(QueueJobCache.class);
+ threadPool = mock(ThreadPool.class);
+ jobQueue = new JobQueueImpl(testQueue, internalConfig, services,
cache, null);
+ }
+
+ @Test
+ public void testStartJobsWhenDisabled() {
+ // Add a job handler to the queue
+ JobHandler jobHandler = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+ String jobId = "testJob";
+ when(jobHandler.getJob().getId()).thenReturn(jobId);
+ jobQueue.getProcessingJobsLists().put(jobId, jobHandler);
+
+ when(configuration.isJobProcessingEnabled()).thenReturn(false);
+ jobQueue.startJobs();
+
+ // The job should not be started/removed
+ assertTrue("Job should remain in processingJobsLists when processing
is disabled", jobQueue.getProcessingJobsLists().containsKey(jobId));
+ }
+
+ @Test
+ public void testStartJob() throws Exception {
+ // Arrange
+ JobHandler jobHandler = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+ String jobId = "testJob";
+ when(jobHandler.getJob().getId()).thenReturn(jobId);
+ when(jobHandler.getConsumer().process(any(),
any())).thenReturn(JobExecutionResultImpl.SUCCEEDED);
+
+ Logger auditLogger = mock(Logger.class);
+ when(configuration.getAuditLogger()).thenReturn(auditLogger);
+
+ // Processing disabled: should not process any job
+ when(configuration.isJobProcessingEnabled()).thenReturn(false);
+
+ // Act
+ jobQueue.startJobs();
+
+ // Assert: No job should be in processingJobsLists
+ assertTrue("No jobs should be processed when processing is disabled",
+ jobQueue.getProcessingJobsLists().isEmpty());
+
+ // Enable processing
+ when(configuration.isJobProcessingEnabled()).thenReturn(true);
+
+ // Mock the cache to return the job handler once, then null
+ when(cache.getNextJob(
+ any(JobConsumerManager.class),
+ any(StatisticsManager.class),
+ eq(jobQueue),
+ anyBoolean()
+ )).thenReturn(jobHandler).thenReturn(null);
+
+ Field threadPoolField =
JobQueueImpl.class.getDeclaredField("threadPool");
+ threadPoolField.setAccessible(true);
+ threadPoolField.set(jobQueue, threadPool);
+ doNothing().when(threadPool).execute(any());
+
+ // Act: Try to start jobs again
+ jobQueue.startJobs();
+
+ // Wait for the job to be processed/removed (asynchronously)
+ long timeout = System.currentTimeMillis() + 2000;
+ while (System.currentTimeMillis() < timeout) {
+ if (!jobQueue.getProcessingJobsLists().containsKey(jobId)) {
+ break;
+ }
+ Thread.sleep(50);
+ }
+
+ // Assert: Job should now be removed
+ assertFalse("Job should be removed from processingJobsLists when
processing is enabled",
+ jobQueue.getProcessingJobsLists().containsKey(jobId));
+ }
+
+ @Test
+ public void testQueueShutdown() {
+ // Enable the configuration
+ when(configuration.isJobProcessingEnabled()).thenReturn(true);
+
+ jobQueue.close();
+
+ // Verify that the queue is no longer running
+ assertFalse("Queue should not be running after shutdown",
jobQueue.isRunning());
+
+ jobQueue.startJobs();
+
+ // Verify that no jobs were started by checking the internal state
+ assertTrue("No jobs should be started after queue shutdown",
jobQueue.getProcessingJobsLists().isEmpty());
+ }
+
+ @Test
+ public void testQueueStartupAndShutdown() {
+ when(configuration.isJobProcessingEnabled()).thenReturn(true);
+
+ jobQueue.startJobs();
+
+ // Add a job and verify it is present
+ JobHandler jobHandler = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+ String jobId = "testJob";
+ when(jobHandler.getJob().getId()).thenReturn(jobId);
+ jobQueue.getProcessingJobsLists().put(jobId, jobHandler);
+
+ assertTrue("Processing jobs list should contain the job after adding",
jobQueue.getProcessingJobsLists().containsKey(jobId));
+
+ jobQueue.close();
+
+ // Verify that the processingJobsLists is cleared after shutdown
+ assertTrue("Processing jobs list should be empty after shutdown",
jobQueue.getProcessingJobsLists().isEmpty());
+ }
+
+ @Test
+ public void testJobAssignmentWhenProcessingDisabled() {
+ JobHandler jobHandler = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+ String jobId = "testJob";
+ when(jobHandler.getJob().getId()).thenReturn(jobId);
+
+ when(configuration.isJobProcessingEnabled()).thenReturn(false);
+
+ jobQueue.getProcessingJobsLists().put(jobId, jobHandler);
+
+ try {
+ Method startJobMethod =
JobQueueImpl.class.getDeclaredMethod("startJob", JobHandler.class);
+ startJobMethod.setAccessible(true);
+ startJobMethod.invoke(jobQueue, jobHandler);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to invoke startJob method", e);
+ }
+
+ assertTrue("Job should remain in processingJobsLists even when
processing is disabled", jobQueue.getProcessingJobsLists().containsKey(jobId));
+ }
+
+ @Test
+ public void testStartJobsWhenQueueSuspended() {
+ // Add a job before suspending
+ JobHandler jobHandler = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+ String jobId = "testJob";
+ when(jobHandler.getJob().getId()).thenReturn(jobId);
+ jobQueue.getProcessingJobsLists().put(jobId, jobHandler);
+
+ jobQueue.suspend();
+ jobQueue.startJobs();
+
+ // The job should still be present since the queue is suspended
+ assertTrue("No jobs should be started when the queue is suspended",
jobQueue.getProcessingJobsLists().containsKey(jobId));
+
+ // Activate the queue and enable processing
+ when(configuration.isJobProcessingEnabled()).thenReturn(true);
+ jobQueue.resume(); // Use resume() to wake up the queue
+
+ // Wait for the job to be processed/removed (asynchronously)
+ long timeout = System.currentTimeMillis() + 2000; // 2 seconds timeout
+ while (System.currentTimeMillis() < timeout) {
+ if (!jobQueue.getProcessingJobsLists().containsKey(jobId)) {
+ break;
+ }
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException ignored) {}
+ }
+
+ assertTrue("Job should be removed from processingJobsLists after
resuming queue when processing is enabled",
jobQueue.getProcessingJobsLists().containsKey(jobId));
+ }
+
+ @Test
+ public void testStopAllJobs() {
+ // Enable job processing
+ when(configuration.isJobProcessingEnabled()).thenReturn(true);
+
+ // Create and add multiple jobs to the processing list
+ JobHandler jobHandler1 = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+ JobHandler jobHandler2 = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+ String jobId1 = "testJob1";
+ String jobId2 = "testJob2";
+ when(jobHandler1.getJob().getId()).thenReturn(jobId1);
+ when(jobHandler2.getJob().getId()).thenReturn(jobId2);
+
+ // Add jobs to processing list
+ jobQueue.getProcessingJobsLists().put(jobId1, jobHandler1);
+ jobQueue.getProcessingJobsLists().put(jobId2, jobHandler2);
+
+ // Stop all jobs
+ jobQueue.stopAllJobs();
+
+ // Verify both jobs were stopped
+ verify(jobHandler1).stop();
+ verify(jobHandler2).stop();
+
+ // Verify jobs remain in processing list
+ assertTrue("Job1 should remain in processing list after stopAllJobs",
+ jobQueue.getProcessingJobsLists().containsKey(jobId1));
+ assertTrue("Job2 should remain in processing list after stopAllJobs",
+ jobQueue.getProcessingJobsLists().containsKey(jobId2));
+ assertEquals("Processing list should still contain 2 jobs", 2,
+ jobQueue.getProcessingJobsLists().size());
+ }
+
+ @Test
+ public void testActualJobProcessing() throws Exception {
+ // Arrange
+ JobHandler jobHandler = mock(JobHandler.class, RETURNS_DEEP_STUBS);
+ String jobId = "testJob";
+ when(jobHandler.getJob().getId()).thenReturn(jobId);
+ when(jobHandler.getConsumer().process(any(),
any())).thenReturn(JobExecutionResultImpl.SUCCEEDED);
+
+ // Enable job processing
+ when(configuration.isJobProcessingEnabled()).thenReturn(true);
+
+ // Mock the cache to return the job handler once, then null
+ when(cache.getNextJob(
+ any(),
+ any(),
+ any(),
+ anyBoolean()
+ )).thenReturn(jobHandler).thenReturn(null);
+
+ Field threadPoolField =
JobQueueImpl.class.getDeclaredField("threadPool");
+ threadPoolField.setAccessible(true);
+ threadPoolField.set(jobQueue, threadPool);
+ doNothing().when(threadPool).execute(any());
+ // Act: Start jobs
+ jobQueue.startJobs();
+
+ // Wait for the job to be processed/removed (asynchronously)
+ long timeout = System.currentTimeMillis() + 2000; // 2 seconds timeout
+ while (System.currentTimeMillis() < timeout) {
+ if (!jobQueue.getProcessingJobsLists().containsKey(jobId)) {
+ break;
+ }
+ Thread.sleep(50);
+ }
+
+ // Assert: Job should be removed after processing
+ assertFalse("Job should be removed from processingJobsLists after
successful processing",
+ jobQueue.getProcessingJobsLists().containsKey(jobId));
+ }
+}
diff --git
a/src/test/java/org/apache/sling/event/impl/jobs/queues/TestTopicHalting.java
b/src/test/java/org/apache/sling/event/impl/jobs/queues/TestTopicHalting.java
index a75c92d..755cc1f 100644
---
a/src/test/java/org/apache/sling/event/impl/jobs/queues/TestTopicHalting.java
+++
b/src/test/java/org/apache/sling/event/impl/jobs/queues/TestTopicHalting.java
@@ -21,6 +21,8 @@ package org.apache.sling.event.impl.jobs.queues;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
import java.io.Externalizable;
import java.io.IOException;
@@ -149,6 +151,9 @@ public class TestTopicHalting {
configuration =
JobManagerConfigurationTestFactory.create(JobManagerConfiguration.DEFAULT_REPOSITORY_PATH,
factory, queueConfigurationManager);
+
+ configuration = spy(configuration);
+ when(configuration.isJobProcessingEnabled()).thenReturn(true);
queueManager = QueueManager.newForTest(eventAdmin, jobConsumerManager,
queuesMBean, threadPoolManager, threadPool, configuration,
statisticsManager);
diff --git a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingIT.java
b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingIT.java
index 76d514f..beb4b28 100644
--- a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingIT.java
+++ b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingIT.java
@@ -57,6 +57,14 @@ public abstract class AbstractJobHandlingIT extends
JobsTestSupport {
public void setup() throws IOException {
log.info("starting setup");
registerTopologyListener();
+
+ // Register the job processing condition
+ Hashtable<String, Object> propsConfig = new Hashtable<>();
+ propsConfig.put("osgi.condition.id",
"org.apache.sling.event.jobs.processing.enabled");
+ ServiceRegistration<org.osgi.service.condition.Condition> conditionReg
=
+
this.bundleContext.registerService(org.osgi.service.condition.Condition.class,
+ new org.osgi.service.condition.Condition() {}, propsConfig);
+ this.registrations.add(conditionReg);
}
protected AtomicReference<TopologyEvent> lastTopologyEvent = new
AtomicReference<>();
diff --git
a/src/test/java/org/apache/sling/event/it/JobHandlingConditionIT.java
b/src/test/java/org/apache/sling/event/it/JobHandlingConditionIT.java
new file mode 100644
index 0000000..25110d3
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/it/JobHandlingConditionIT.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerMethod;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.condition.Condition;
+
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.ops4j.pax.exam.CoreOptions.options;
+import static
org.ops4j.pax.exam.cm.ConfigurationAdminOptions.factoryConfiguration;
+
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerMethod.class)
+public class JobHandlingConditionIT extends AbstractJobHandlingIT {
+
+ public static final String TOPIC = "sling/test/condition";
+
+ private ServiceRegistration<Condition> jobProcessingConditionReg;
+
+ @Configuration
+ public Option[] configuration() {
+ return options(
+ baseConfiguration(),
+ // create test queue
+
factoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration")
+ .put(ConfigurationConstants.PROP_NAME, "test")
+ .put(ConfigurationConstants.PROP_TYPE,
QueueConfiguration.Type.UNORDERED.name())
+ .put(ConfigurationConstants.PROP_TOPICS, new
String[]{TOPIC, TOPIC + "2"})
+ .put(ConfigurationConstants.PROP_RETRIES, 2)
+ .put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L)
+ .asOption()
+ );
+ }
+
+ @Before
+ public void additionalStartupDelay() throws InterruptedException {
+ Thread.sleep(2000);
+ }
+
+ /**
+ * Simulates toggling the job processing condition while jobs are being
processed.
+ */
+ @Test(timeout = DEFAULT_TEST_TIMEOUT)
+ public void testJobProcessingConditionToggle() throws Exception {
+ final AtomicInteger processed = new AtomicInteger(0);
+ final int jobCount = 10;
+ final String uniqueTopic = TOPIC + "/race/" +
System.currentTimeMillis();
+
+ this.registerJobConsumer(uniqueTopic, job -> {
+ processed.incrementAndGet();
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return JobConsumer.JobResult.OK;
+ });
+
+ // Disable job processing before adding jobs
+ setJobProcessingEnabled(false);
+
+ // Add jobs while processing is disabled
+ for (int i = 0; i < jobCount; i++) {
+ jobManager.addJob(uniqueTopic, Collections.singletonMap("id", i));
+ }
+
+ // Wait to ensure jobs are not processed
+ Thread.sleep(1000);
+ assertEquals("No jobs should be processed while processing is
disabled", 0, processed.get());
+
+ // Re-enable job processing
+ setJobProcessingEnabled(true);
+
+ // Wait for all jobs to finish
+ while (processed.get() < jobCount) {
+ Thread.sleep(100);
+ }
+ assertEquals("All jobs should eventually be processed", jobCount,
processed.get());
+ }
+
+ /**
+ * Simulates topology changes while jobs are being processed.
+ */
+ @Test(timeout = DEFAULT_TEST_TIMEOUT)
+ public void testTopologyChangeDuringJobProcessing() throws Exception {
+ final AtomicInteger processed = new AtomicInteger(0);
+ final int jobCount = 5;
+
+ this.registerJobConsumer(TOPIC, job -> {
+ processed.incrementAndGet();
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return JobConsumer.JobResult.OK;
+ });
+
+ // Disable job processing before adding jobs
+ setJobProcessingEnabled(false);
+
+ // Add jobs while processing is disabled
+ for (int i = 0; i < jobCount; i++) {
+ jobManager.addJob(TOPIC, Collections.singletonMap("id", i));
+ }
+
+ // Wait to ensure jobs are not processed
+ Thread.sleep(1000);
+ assertEquals("No jobs should be processed while processing is
disabled", 0, processed.get());
+
+ // Re-enable job processing
+ setJobProcessingEnabled(true);
+
+ // Simulate topology change by registering/unregistering a dummy
service
+ BundleContext ctx = bundleContext;
+ ServiceRegistration<?> reg = ctx.registerService(Object.class, new
Object(), null);
+ Thread.sleep(500);
+ reg.unregister();
+
+ // Wait for all jobs to finish
+ while (processed.get() < jobCount) {
+ Thread.sleep(50);
+ }
+ assertEquals("All jobs should be processed despite topology changes",
jobCount, processed.get());
+ }
+
+ /**
+ * Simulates rapid toggling of both condition and topology during job
processing.
+ */
+ @Test(timeout = DEFAULT_TEST_TIMEOUT)
+ public void testRaceConditionWithConditionAndTopology() throws Exception {
+ final AtomicInteger processed = new AtomicInteger(0);
+ final int jobCount = 8;
+ final String uniqueTopic = TOPIC + "/race/" +
System.currentTimeMillis();
+
+ this.registerJobConsumer(uniqueTopic, job -> {
+ processed.incrementAndGet();
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return JobConsumer.JobResult.OK;
+ });
+
+ // Disable job processing before adding jobs
+ setJobProcessingEnabled(false);
+
+ // Add jobs while processing is disabled
+ for (int i = 0; i < jobCount; i++) {
+ jobManager.addJob(uniqueTopic, Collections.singletonMap("id", i));
+ }
+
+ // Wait to ensure jobs are not processed
+ Thread.sleep(1000);
+ assertEquals("No jobs should be processed while processing is
disabled", 0, processed.get());
+
+ // Rapidly toggle condition and topology
+ BundleContext ctx = bundleContext;
+ for (int i = 0; i < 3; i++) {
+ setJobProcessingEnabled(true);
+ ServiceRegistration<?> reg = ctx.registerService(Object.class, new
Object(), null);
+ Thread.sleep(300);
+ setJobProcessingEnabled(false);
+ reg.unregister();
+ Thread.sleep(300);
+ }
+
+ // Re-enable job processing to allow jobs to finish
+ setJobProcessingEnabled(true);
+
+ // Wait for all jobs to finish
+ long start = System.currentTimeMillis();
+ long maxWait = 10000; // 10 seconds
+ while (processed.get() < jobCount && (System.currentTimeMillis() -
start) < maxWait) {
+ Thread.sleep(100);
+ }
+ assertEquals("All jobs should be processed after race conditions",
jobCount, processed.get());
+ }
+
+ // Helper to toggle the job processing condition
+ private void setJobProcessingEnabled(boolean enabled) throws Exception {
+ String conditionId = "org.apache.sling.event.jobs.processing.enabled";
+ if (enabled) {
+ if (jobProcessingConditionReg == null) {
+ Dictionary<String, Object> props = new Hashtable<>();
+ props.put("osgi.condition.id", conditionId);
+ jobProcessingConditionReg = bundleContext.registerService(
+ Condition.class, UNCONDITIONAL_CONDITION, props
+ );
+ }
+ } else {
+ if (jobProcessingConditionReg != null) {
+ jobProcessingConditionReg.unregister();
+ jobProcessingConditionReg = null;
+ }
+ }
+ Thread.sleep(300); // Wait for the system to react
+ }
+
+ private static final Condition UNCONDITIONAL_CONDITION = new Condition()
{};
+}