This is an automated email from the ASF dual-hosted git repository.

joerghoh pushed a commit to branch SLING-12394
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-event.git

commit dae2016ba8d53498f5201c4f7ca936d9056762ca
Author: Joerg Hoh <[email protected]>
AuthorDate: Sat Jul 27 09:12:55 2024 +0200

    SLING-12394 improve ResourceResolver handling
---
 .../sling/event/impl/jobs/JobManagerImpl.java      |  21 +--
 .../impl/jobs/config/JobManagerConfiguration.java  |  16 +-
 .../sling/event/impl/jobs/queues/JobQueueImpl.java |   7 +-
 .../event/impl/jobs/queues/QueueJobCache.java      |   5 +-
 .../sling/event/impl/jobs/queues/QueueManager.java |   5 +-
 .../impl/jobs/scheduling/JobSchedulerImpl.java     |   5 +-
 .../impl/jobs/scheduling/ScheduledJobHandler.java  |  99 +++++-------
 .../event/impl/jobs/tasks/CheckTopologyTask.java   | 169 ++++++++++-----------
 .../sling/event/impl/jobs/tasks/CleanUpTask.java   |  26 +---
 .../impl/jobs/tasks/FindUnfinishedJobsTask.java    |   8 +-
 .../event/impl/jobs/tasks/HistoryCleanUpTask.java  |   5 +-
 .../sling/event/impl/jobs/tasks/UpgradeTask.java   |  39 ++---
 .../sling/event/impl/jobs/JobHandlerTest.java      |   4 +-
 .../jobs/config/JobManagerConfigurationTest.java   |   9 ++
 14 files changed, 165 insertions(+), 253 deletions(-)

diff --git a/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
index 4f4ad08..062dee4 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
@@ -266,8 +266,7 @@ public class JobManagerImpl
                 final boolean isHistoryJob = 
this.configuration.isStoragePath(job.getResourcePath());
                 // if history job, simply remove - otherwise move to history!
                 if ( isHistoryJob ) {
-                    final ResourceResolver resolver = 
this.configuration.createResourceResolver();
-                    try {
+                    try (final ResourceResolver resolver = 
this.configuration.createResourceResolver();) {
                         final Resource jobResource = 
resolver.getResource(job.getResourcePath());
                         if ( jobResource != null ) {
                             resolver.delete(jobResource);
@@ -280,8 +279,6 @@ public class JobManagerImpl
                     } catch ( final PersistenceException pe) {
                         logger.warn("Unable to remove job at " + 
job.getResourcePath(), pe);
                         result = false;
-                    } finally {
-                        resolver.close();
                     }
                 } else {
                     final JobHandler jh = new JobHandler(job, null, 
this.configuration);
@@ -309,9 +306,8 @@ public class JobManagerImpl
     @Override
     public Job getJobById(final String id) {
         logger.debug("Getting job by id: {}", id);
-        final ResourceResolver resolver = 
this.configuration.createResourceResolver();
         final StringBuilder buf = new StringBuilder(64);
-        try {
+        try (final ResourceResolver resolver = 
this.configuration.createResourceResolver();) {
 
             buf.append("/jcr:root");
             buf.append(this.configuration.getJobsBasePathWithSlash());
@@ -342,8 +338,6 @@ public class JobManagerImpl
             }
         } catch (final QuerySyntaxException qse) {
             logger.warn("Query syntax wrong " + buf.toString(), qse);
-        } finally {
-            resolver.close();
         }
         logger.debug("Job not found with id: {}", id);
         return null;
@@ -400,9 +394,8 @@ public class JobManagerImpl
                                        || type == QueryType.GIVEN_UP
                                        || type == QueryType.STOPPED;
         final List<Job> result = new ArrayList<>();
-        final ResourceResolver resolver = 
this.configuration.createResourceResolver();
         final StringBuilder buf = new StringBuilder(64);
-        try {
+        try (final ResourceResolver resolver = 
this.configuration.createResourceResolver();) {
             
buf.append(buildBaseQuery(this.configuration.getJobsBasePathWithSlash(), topic, 
type, isHistoryQuery));
 
             if ( templates != null && templates.length > 0 ) {
@@ -502,8 +495,6 @@ public class JobManagerImpl
              }
         } catch (final QuerySyntaxException qse) {
             logger.warn("Query syntax wrong " + buf.toString(), qse);
-        } finally {
-            resolver.close();
         }
         return result;
     }
@@ -587,8 +578,8 @@ public class JobManagerImpl
             logger.debug("Persisting job {} into queue {}, target={}",
                     Utility.toString(jobTopic, jobProperties), info.queueName, 
info.targetId);
         }
-        final ResourceResolver resolver = 
this.configuration.createResourceResolver();
-        try {
+        
+        try (final ResourceResolver resolver = 
this.configuration.createResourceResolver();) {
             final JobImpl job = this.writeJob(resolver,
                     jobTopic,
                     jobProperties,
@@ -604,8 +595,6 @@ public class JobManagerImpl
         } catch (final PersistenceException re ) {
             // something went wrong, so let's log it
             this.logger.error("Exception during persisting new job '" + 
Utility.toString(jobTopic, jobProperties) + "'", re);
-        } finally {
-            resolver.close();
         }
         if ( errors != null ) {
             errors.add("Unable to persist new job.");
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
index a66d626..14720c0 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
@@ -35,6 +35,7 @@ import org.apache.sling.event.impl.support.Environment;
 import org.apache.sling.event.impl.support.ResourceHelper;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.serviceusermapping.ServiceUserMapped;
+import org.jetbrains.annotations.NotNull;
 import org.osgi.framework.Constants;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -231,15 +232,12 @@ public class JobManagerConfiguration {
         this.historyCleanUpRemovedJobs = config.cleanup_period();
 
         // create initial resources
-        final ResourceResolver resolver = this.createResourceResolver();
-        try {
+        try (final ResourceResolver resolver = this.createResourceResolver();) 
{
             ResourceHelper.getOrCreateBasePath(resolver, 
this.getLocalJobsPath());
             ResourceHelper.getOrCreateBasePath(resolver, 
this.getUnassignedJobsPath());
         } catch ( final PersistenceException pe ) {
             logger.error("Unable to create default paths: " + pe.getMessage(), 
pe);
             throw new RuntimeException(pe);
-        } finally {
-            resolver.close();
         }
         this.active.set(true);
 
@@ -308,21 +306,21 @@ public class JobManagerConfiguration {
      * This ResourceResolver provides read and write access to all resources 
relevant for the event
      * and job handling.
      * 
-     * @return A resource resolver or {@code null} if the component is already 
deactivated.
+     * @return A resource resolver 
      * @throws RuntimeException if the resolver can't be created.
      */
-    public ResourceResolver createResourceResolver() {
-        ResourceResolver resolver = null;
+    public @NotNull ResourceResolver createResourceResolver() {
         final ResourceResolverFactory factory = this.resourceResolverFactory;
         if ( factory != null ) {
             try {
-                resolver = 
this.resourceResolverFactory.getServiceResourceResolver(null);
+                return 
this.resourceResolverFactory.getServiceResourceResolver(null);
             } catch ( final LoginException le) {
                 logger.error("Unable to create new resource resolver: " + 
le.getMessage(), le);
                 throw new RuntimeException(le);
             }
+        } else {
+            throw new RuntimeException ("ResourceResolverFactory is null, 
cannot create ResourceResolver");
         }
-        return resolver;
     }
 
     /**
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
index 3a272ea..60478f4 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
@@ -641,8 +641,7 @@ public class JobQueueImpl
 
         if ( !topics.isEmpty() ) {
 
-            final ResourceResolver resolver = 
this.services.configuration.createResourceResolver();
-            try {
+            try (final ResourceResolver resolver = 
this.services.configuration.createResourceResolver();) {
                 final Resource baseResource = 
resolver.getResource(this.services.configuration.getLocalJobsPath());
 
                 // sanity check - should never be null
@@ -674,12 +673,10 @@ public class JobQueueImpl
                     }
                     try {
                         resolver.commit();
-                    } catch ( final PersistenceException ignore) {
+                    } catch (final PersistenceException ignore) {
                         logger.error("Unable to remove jobs", ignore);
                     }
                 }
-            } finally {
-                resolver.close();
             }
         }
     }
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
index f4190e6..72b1753 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
@@ -221,8 +221,7 @@ public class QueueJobCache {
 
         final Map<String, List<JobImpl>> topicCache = new HashMap<String, 
List<JobImpl>>();
 
-        final ResourceResolver resolver = 
this.configuration.createResourceResolver();
-        try {
+        try (final ResourceResolver resolver = 
this.configuration.createResourceResolver();) {
             final Resource baseResource = 
resolver.getResource(this.configuration.getLocalJobsPath());
             // sanity check - should never be null
             if ( baseResource != null ) {
@@ -234,8 +233,6 @@ public class QueueJobCache {
                     }
                 }
             }
-        } finally {
-            resolver.close();
         }
         orderTopics(topicCache);
 
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
index 5d919e0..a5b5464 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
@@ -426,8 +426,7 @@ public class QueueManager
     private Set<String> scanTopics() {
         final Set<String> topics = new HashSet<>();
 
-        final ResourceResolver resolver = 
this.configuration.createResourceResolver();
-        try {
+        try (final ResourceResolver resolver = 
this.configuration.createResourceResolver();) {
             final Resource baseResource = 
resolver.getResource(this.configuration.getLocalJobsPath());
 
             // sanity check - should never be null
@@ -440,8 +439,6 @@ public class QueueManager
                     topics.add(topic);
                 }
             }
-        } finally {
-            resolver.close();
         }
         return topics;
     }
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java
index 7c4ce45..1469e41 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java
@@ -472,8 +472,7 @@ public class JobSchedulerImpl
      * @param flag The corresponding flag
      */
     public void setSuspended(final ScheduledJobInfoImpl info, final boolean 
flag) {
-        final ResourceResolver resolver = 
configuration.createResourceResolver();
-        try {
+        try (final ResourceResolver resolver = 
configuration.createResourceResolver();) {
             final StringBuilder sb = new 
StringBuilder(this.configuration.getScheduledJobsPath(true));
             sb.append(ResourceHelper.filterName(info.getName()));
             final String path = sb.toString();
@@ -496,8 +495,6 @@ public class JobSchedulerImpl
         } catch (final PersistenceException pe) {
             // we ignore the exception if removing fails
             ignoreException(pe);
-        } finally {
-            resolver.close();
         }
     }
 
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java b/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
index f925e4a..127e282 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java
@@ -141,36 +141,31 @@ public class ScheduledJobHandler implements Runnable {
     }
 
     private void scan() {
-        final ResourceResolver resolver = 
configuration.createResourceResolver();
-        if ( resolver != null ) {
-            try {
-                logger.debug("Scanning for scheduled jobs...");
-                final String path = 
this.configuration.getScheduledJobsPath(false);
-                final Resource startResource = resolver.getResource(path);
-                if ( startResource != null ) {
-                    final Map<String, Holder> newScheduledJobs = new 
HashMap<String, Holder>();
-                    synchronized ( this.scheduledJobs ) {
-                        for(final Resource rsrc : startResource.getChildren()) 
{
-                            if ( !isRunning.get() ) {
-                                break;
-                            }
-                            handleAddOrUpdate(newScheduledJobs, rsrc);
+        try (final ResourceResolver resolver = 
configuration.createResourceResolver();) {
+            logger.debug("Scanning for scheduled jobs...");
+            final String path = this.configuration.getScheduledJobsPath(false);
+            final Resource startResource = resolver.getResource(path);
+            if ( startResource != null ) {
+                final Map<String, Holder> newScheduledJobs = new 
HashMap<String, Holder>();
+                synchronized ( this.scheduledJobs ) {
+                    for(final Resource rsrc : startResource.getChildren()) {
+                        if ( !isRunning.get() ) {
+                            break;
                         }
-                        if ( isRunning.get() ) {
-                            for(final Holder h : this.scheduledJobs.values()) {
-                                if ( h.info != null ) {
-                                    this.jobScheduler.unscheduleJob(h.info);
-                                }
+                        handleAddOrUpdate(newScheduledJobs, rsrc);
+                    }
+                    if ( isRunning.get() ) {
+                        for(final Holder h : this.scheduledJobs.values()) {
+                            if ( h.info != null ) {
+                                this.jobScheduler.unscheduleJob(h.info);
                             }
-                            this.scheduledJobs.clear();
-                            this.scheduledJobs.putAll(newScheduledJobs);
                         }
+                        this.scheduledJobs.clear();
+                        this.scheduledJobs.putAll(newScheduledJobs);
                     }
                 }
-                logger.debug("Finished scanning for scheduled jobs...");
-            } finally {
-                resolver.close();
             }
+            logger.debug("Finished scanning for scheduled jobs...");
         }
     }
 
@@ -235,8 +230,7 @@ public class ScheduledJobHandler implements Runnable {
             final boolean suspend,
             final List<ScheduleInfoImpl> scheduleInfos)
     throws PersistenceException {
-        final ResourceResolver resolver = 
this.configuration.createResourceResolver();
-        try {
+        try (final ResourceResolver resolver = 
this.configuration.createResourceResolver();) {
             // create properties
             final Map<String, Object> properties = new HashMap<String, 
Object>();
 
@@ -286,8 +280,6 @@ public class ScheduledJobHandler implements Runnable {
             properties.put(ResourceHelper.PROPERTY_SCHEDULE_INFO, 
scheduleInfos);
 
             return properties;
-        } finally {
-            resolver.close();
         }
     }
 
@@ -331,23 +323,18 @@ public class ScheduledJobHandler implements Runnable {
                     }
                 }
                 if ( !updateJobs.isEmpty() && isRunning.get() ) {
-                    ResourceResolver resolver = 
configuration.createResourceResolver();
-                    if ( resolver != null ) {
-                        try {
-                            for(final Map.Entry<String, Holder> entry : 
updateJobs.entrySet()) {
-                                final String path = 
configuration.getScheduledJobsPath(true) + entry.getKey();
-                                final Resource rsrc = 
resolver.getResource(path);
-                                if ( !isRunning.get() ) {
-                                    break;
-                                }
-                                if ( rsrc != null ) {
-                                    synchronized ( scheduledJobs ) {
-                                        handleAddOrUpdate(scheduledJobs, rsrc);
-                                    }
+                    try (ResourceResolver resolver = 
configuration.createResourceResolver();) {
+                        for(final Map.Entry<String, Holder> entry : 
updateJobs.entrySet()) {
+                            final String path = 
configuration.getScheduledJobsPath(true) + entry.getKey();
+                            final Resource rsrc = resolver.getResource(path);
+                            if ( !isRunning.get() ) {
+                                break;
+                            }
+                            if ( rsrc != null ) {
+                                synchronized ( scheduledJobs ) {
+                                    handleAddOrUpdate(scheduledJobs, rsrc);
                                 }
                             }
-                        } finally {
-                            resolver.close();
                         }
                     }
                 }
@@ -387,17 +374,12 @@ public class ScheduledJobHandler implements Runnable {
             @Override
             public void run() {
                 if ( isRunning.get() ) {
-                    final ResourceResolver resolver = 
configuration.createResourceResolver();
-                    if ( resolver != null ) {
-                        try {
-                            final Resource rsrc = resolver.getResource(path);
-                            if ( rsrc != null ) {
-                                synchronized ( scheduledJobs ) {
-                                    handleAddOrUpdate(scheduledJobs, rsrc);
-                                }
+                    try (final ResourceResolver resolver = 
configuration.createResourceResolver();) {
+                        final Resource rsrc = resolver.getResource(path);
+                        if ( rsrc != null ) {
+                            synchronized ( scheduledJobs ) {
+                                handleAddOrUpdate(scheduledJobs, rsrc);
                             }
-                        } finally {
-                            resolver.close();
                         }
                     }
                 }
@@ -462,8 +444,8 @@ public class ScheduledJobHandler implements Runnable {
     public void remove(final ScheduledJobInfoImpl info) {
         final String scheduleKey = ResourceHelper.filterName(info.getName());
 
-        final ResourceResolver resolver = 
configuration.createResourceResolver();
-        try {
+        
+        try (final ResourceResolver resolver = 
configuration.createResourceResolver();) {
             final StringBuilder sb = new 
StringBuilder(configuration.getScheduledJobsPath(true));
             sb.append(scheduleKey);
             final String path = sb.toString();
@@ -476,8 +458,6 @@ public class ScheduledJobHandler implements Runnable {
         } catch (final PersistenceException pe) {
             // we ignore the exception if removing fails
             ignoreException(pe);
-        } finally {
-            resolver.close();
         }
 
         synchronized ( this.scheduledJobs ) {
@@ -490,8 +470,7 @@ public class ScheduledJobHandler implements Runnable {
 
     public void updateSchedule(final String scheduleName, final 
Collection<ScheduleInfo> scheduleInfo) {
 
-        final ResourceResolver resolver = 
configuration.createResourceResolver();
-        try {
+        try (final ResourceResolver resolver = 
configuration.createResourceResolver();) {
             final String scheduleKey = ResourceHelper.filterName(scheduleName);
 
             final StringBuilder sb = new 
StringBuilder(configuration.getScheduledJobsPath(true));
@@ -527,8 +506,6 @@ public class ScheduledJobHandler implements Runnable {
                     logger.warn("Unable to update scheduled job " + 
scheduleName, pe);
                 }
             }
-        } finally {
-            resolver.close();
         }
     }
 
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java b/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
index c5750be..f1166a1 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java
@@ -72,29 +72,24 @@ public class CheckTopologyTask {
     private void reassignJobsFromStoppedInstances() {
         if ( caps.isLeader() && caps.isActive() ) {
             this.logger.debug("Checking for stopped instances...");
-            final ResourceResolver resolver = 
this.configuration.createResourceResolver();
-            if ( resolver != null ) {
-                try {
-                    final Resource jobsRoot = 
resolver.getResource(this.configuration.getAssginedJobsPath());
-                    this.logger.debug("Got jobs root {}", jobsRoot);
+            try (final ResourceResolver resolver = 
this.configuration.createResourceResolver();) {
+                final Resource jobsRoot = 
resolver.getResource(this.configuration.getAssginedJobsPath());
+                this.logger.debug("Got jobs root {}", jobsRoot);
 
-                    // this resource should exist, but we check anyway
-                    if ( jobsRoot != null ) {
-                        final Iterator<Resource> instanceIter = 
jobsRoot.listChildren();
-                        while ( caps.isActive() && instanceIter.hasNext() ) {
-                            final Resource instanceResource = 
instanceIter.next();
+                // this resource should exist, but we check anyway
+                if ( jobsRoot != null ) {
+                    final Iterator<Resource> instanceIter = 
jobsRoot.listChildren();
+                    while ( caps.isActive() && instanceIter.hasNext() ) {
+                        final Resource instanceResource = instanceIter.next();
 
-                            final String instanceId = 
instanceResource.getName();
-                            if ( !caps.isActive(instanceId) ) {
-                                logger.debug("Found stopped instance {}", 
instanceId);
-                                assignJobs(instanceResource, true);
-                            }
+                        final String instanceId = instanceResource.getName();
+                        if ( !caps.isActive(instanceId) ) {
+                            logger.debug("Found stopped instance {}", 
instanceId);
+                            assignJobs(instanceResource, true);
                         }
                     }
-                } finally {
-                    resolver.close();
                 }
-            }
+            } 
         }
     }
 
@@ -104,85 +99,80 @@ public class CheckTopologyTask {
     private void reassignStaleJobs() {
         if ( caps.isActive() ) {
             this.logger.debug("Checking for stale jobs...");
-            final ResourceResolver resolver = 
this.configuration.createResourceResolver();
-            if ( resolver != null ) {
-                try {
-                    final Resource jobsRoot = 
resolver.getResource(this.configuration.getLocalJobsPath());
+            try (final ResourceResolver resolver = 
this.configuration.createResourceResolver();) {
+                final Resource jobsRoot = 
resolver.getResource(this.configuration.getLocalJobsPath());
 
-                    // this resource should exist, but we check anyway
-                    if ( jobsRoot != null ) {
-                        final Iterator<Resource> topicIter = 
jobsRoot.listChildren();
-                        while ( caps.isActive() && topicIter.hasNext() ) {
-                            final Resource topicResource = topicIter.next();
+                // this resource should exist, but we check anyway
+                if ( jobsRoot != null ) {
+                    final Iterator<Resource> topicIter = 
jobsRoot.listChildren();
+                    while ( caps.isActive() && topicIter.hasNext() ) {
+                        final Resource topicResource = topicIter.next();
 
-                            final String topicName = 
topicResource.getName().replace('.', '/');
-                            this.logger.debug("Checking topic {}..." , 
topicName);
-                            final List<InstanceDescription> potentialTargets = 
caps.getPotentialTargets(topicName);
-                            boolean reassign = true;
-                            for(final InstanceDescription desc : 
potentialTargets) {
-                                if ( desc.isLocal() ) {
-                                    reassign = false;
-                                    break;
-                                }
+                        final String topicName = 
topicResource.getName().replace('.', '/');
+                        this.logger.debug("Checking topic {}..." , topicName);
+                        final List<InstanceDescription> potentialTargets = 
caps.getPotentialTargets(topicName);
+                        boolean reassign = true;
+                        for(final InstanceDescription desc : potentialTargets) 
{
+                            if ( desc.isLocal() ) {
+                                reassign = false;
+                                break;
                             }
-                            if ( reassign ) {
-                                final QueueConfigurationManager qcm = 
this.configuration.getQueueConfigurationManager();
-                                if ( qcm == null ) {
-                                    break;
-                                }
-                                final QueueInfo info = 
qcm.getQueueInfo(topicName);
-                               logger.info ("Start reassigning stale jobs");
-                                JobTopicTraverser.traverse(this.logger, 
topicResource, new JobTopicTraverser.ResourceCallback() {
+                        }
+                        if ( reassign ) {
+                            final QueueConfigurationManager qcm = 
this.configuration.getQueueConfigurationManager();
+                            if ( qcm == null ) {
+                                break;
+                            }
+                            final QueueInfo info = qcm.getQueueInfo(topicName);
+                            logger.info ("Start reassigning stale jobs");
+                            JobTopicTraverser.traverse(this.logger, 
topicResource, new JobTopicTraverser.ResourceCallback() {
 
-                                    @Override
-                                    public boolean handle(final Resource rsrc) 
{
-                                        try {
-                                            final ValueMap vm = 
ResourceHelper.getValueMap(rsrc);
-                                            final String targetId = 
caps.detectTarget(topicName, vm, info);
+                                @Override
+                                public boolean handle(final Resource rsrc) {
+                                    try {
+                                        final ValueMap vm = 
ResourceHelper.getValueMap(rsrc);
+                                        final String targetId = 
caps.detectTarget(topicName, vm, info);
 
-                                            final Map<String, Object> props = 
new HashMap<>(vm);
-                                            
props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+                                        final Map<String, Object> props = new 
HashMap<>(vm);
+                                        
props.remove(Job.PROPERTY_JOB_STARTED_TIME);
 
-                                            final String newPath;
+                                        final String newPath;
+                                        if ( targetId != null ) {
+                                            newPath = 
configuration.getAssginedJobsPath() + '/' + targetId + '/' + 
topicResource.getName() + 
rsrc.getPath().substring(topicResource.getPath().length());
+                                            
props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
+                                            
props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                                        } else {
+                                            newPath = 
configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + 
rsrc.getPath().substring(topicResource.getPath().length());
+                                            
props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+                                            
props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+                                        }
+                                        try {
+                                            
ResourceHelper.getOrCreateResource(resolver, newPath, props);
+                                            resolver.delete(rsrc);
+                                            resolver.commit();
+                                            final String jobId = 
vm.get(ResourceHelper.PROPERTY_JOB_ID, String.class);
                                             if ( targetId != null ) {
-                                                newPath = 
configuration.getAssginedJobsPath() + '/' + targetId + '/' + 
topicResource.getName() + 
rsrc.getPath().substring(topicResource.getPath().length());
-                                                
props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName);
-                                                
props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                                                
configuration.getAuditLogger().debug("REASSIGN OK {} : {}", targetId, jobId);
                                             } else {
-                                                newPath = 
configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + 
rsrc.getPath().substring(topicResource.getPath().length());
-                                                
props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
-                                                
props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
-                                            }
-                                            try {
-                                                
ResourceHelper.getOrCreateResource(resolver, newPath, props);
-                                                resolver.delete(rsrc);
-                                                resolver.commit();
-                                                final String jobId = 
vm.get(ResourceHelper.PROPERTY_JOB_ID, String.class);
-                                                if ( targetId != null ) {
-                                                    
configuration.getAuditLogger().debug("REASSIGN OK {} : {}", targetId, jobId);
-                                                } else {
-                                                    
configuration.getAuditLogger().debug("REUNASSIGN OK : {}", jobId);
-                                                }
-                                            } catch ( final 
PersistenceException pe ) {
-                                                logger.warn("Unable to move 
stale job from " + rsrc.getPath() + " to " + newPath, pe);
-                                                resolver.refresh();
-                                                resolver.revert();
+                                                
configuration.getAuditLogger().debug("REUNASSIGN OK : {}", jobId);
                                             }
-                                        } catch (final InstantiationException 
ie) {
-                                            // something happened with the 
resource in the meantime
-                                            logger.warn("Unable to move stale 
job from " + rsrc.getPath(), ie);
+                                        } catch ( final PersistenceException 
pe ) {
+                                            logger.warn("Unable to move stale 
job from " + rsrc.getPath() + " to " + newPath, pe);
                                             resolver.refresh();
                                             resolver.revert();
                                         }
-                                        return caps.isActive();
+                                    } catch (final InstantiationException ie) {
+                                        // something happened with the 
resource in the meantime
+                                        logger.warn("Unable to move stale job 
from " + rsrc.getPath(), ie);
+                                        resolver.refresh();
+                                        resolver.revert();
                                     }
-                                });
+                                    return caps.isActive();
+                                }
+                            });
 
-                            }
                         }
                     }
-                } finally {
-                    resolver.close();
                 }
             }
         }
@@ -197,18 +187,13 @@ public class CheckTopologyTask {
     public void assignUnassignedJobs() {
         if ( caps != null && caps.isLeader() && caps.isActive() ) {
             logger.debug("Checking unassigned jobs...");
-            final ResourceResolver resolver = 
this.configuration.createResourceResolver();
-            if ( resolver != null ) {
-                try {
-                    final Resource unassignedRoot = 
resolver.getResource(this.configuration.getUnassignedJobsPath());
-                    logger.debug("Got unassigned root {}", unassignedRoot);
+            try (final ResourceResolver resolver = 
this.configuration.createResourceResolver();) {
+                final Resource unassignedRoot = 
resolver.getResource(this.configuration.getUnassignedJobsPath());
+                logger.debug("Got unassigned root {}", unassignedRoot);
 
-                    // this resource should exist, but we check anyway
-                    if ( unassignedRoot != null ) {
-                        assignJobs(unassignedRoot, false);
-                    }
-                } finally {
-                    resolver.close();
+                // this resource should exist, but we check anyway
+                if ( unassignedRoot != null ) {
+                    assignJobs(unassignedRoot, false);
                 }
             }
         }
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java b/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
index f5f2c21..1ea196c 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java
@@ -145,8 +145,7 @@ public class CleanUpTask {
     }
 
     private void historyCleanUpRemovedJobs(Calendar since) {
-        ResourceResolver resolver = 
this.configuration.createResourceResolver();
-        try {
+        try (ResourceResolver resolver = 
this.configuration.createResourceResolver();) {
             HistoryCleanUpTask.cleanup(
                     since,
                     resolver,
@@ -203,8 +202,6 @@ public class CleanUpTask {
                     ));
         } catch (PersistenceException e) {
             this.logger.warn("Exception during job resource tree cleanup.", e);
-        } finally {
-            resolver.close();
         }
     }
 
@@ -215,8 +212,7 @@ public class CleanUpTask {
      */
     private void simpleEmptyFolderCleanup(final TopologyCapabilities caps, 
final String basePath) {
         this.logger.debug("Cleaning up job resource tree: looking for empty 
folders");
-        final ResourceResolver resolver = 
this.configuration.createResourceResolver();
-        try {
+        try (final ResourceResolver resolver = 
this.configuration.createResourceResolver();) {
             final Calendar cleanUpDate = getCalendarInstance();
             // go back five minutes
             cleanUpDate.add(Calendar.MINUTE, -5);
@@ -267,8 +263,6 @@ public class CleanUpTask {
         } catch (final PersistenceException pe) {
             // in the case of an error, we just log this as a warning
             this.logger.warn("Exception during job resource tree cleanup.", 
pe);
-        } finally {
-            resolver.close();
         }
     }
 
@@ -277,11 +271,7 @@ public class CleanUpTask {
      */
     private void fullEmptyFolderCleanup(final TopologyCapabilities caps, final 
String basePath) {
         this.logger.debug("Cleaning up job resource tree: removing ALL empty 
folders");
-        final ResourceResolver resolver = 
this.configuration.createResourceResolver();
-        if ( resolver == null ) {
-            return;
-        }
-        try {
+        try (final ResourceResolver resolver = 
this.configuration.createResourceResolver();) {
             final Resource baseResource = resolver.getResource(basePath);
             // sanity check - should never be null
             if ( baseResource != null ) {
@@ -382,8 +372,6 @@ public class CleanUpTask {
         } catch (final PersistenceException pe) {
             // in the case of an error, we just log this as a warning
             this.logger.warn("Exception during job resource tree cleanup.", 
pe);
-        } finally {
-            resolver.close();
         }
     }
 
@@ -393,11 +381,7 @@ public class CleanUpTask {
      * @param assginedJobsPath The root path for the assigned jobs
      */
     private void cleanUpInstanceIdFolders(final TopologyCapabilities caps, 
final String assginedJobsPath) {
-        final ResourceResolver resolver = 
this.configuration.createResourceResolver();
-        if ( resolver == null ) {
-            return;
-        }
-        try {
+        try (final ResourceResolver resolver = 
this.configuration.createResourceResolver();) {
             final Resource baseResource = 
resolver.getResource(assginedJobsPath);
             // sanity check - should never be null
             if ( baseResource != null ) {
@@ -452,8 +436,6 @@ public class CleanUpTask {
         } catch (final PersistenceException e) {
             // in the case of an error, we just log this as a warning
             this.logger.warn("Exception during job resource tree cleanup.", e);
-        } finally {
-            resolver.close();
         }
     }
 
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java b/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java
index 7bd889b..1058116 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java
@@ -61,11 +61,7 @@ public class FindUnfinishedJobsTask {
      */
     private void initialScan() {
         logger.debug("Scanning repository for unfinished jobs...");
-        final ResourceResolver resolver = 
configuration.createResourceResolver();
-        if ( resolver == null ) {
-            return;
-        }
-        try {
+        try (final ResourceResolver resolver = 
configuration.createResourceResolver();) {
             final Resource baseResource = 
resolver.getResource(configuration.getLocalJobsPath());
 
             // sanity check - should never be null
@@ -79,8 +75,6 @@ public class FindUnfinishedJobsTask {
                     initTopic(topicResource);
                 }
             }
-        } finally {
-            resolver.close();
         }
     }
 
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java b/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
index 6cda613..0dfa664 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java
@@ -96,8 +96,7 @@ public class HistoryCleanUpTask implements JobExecutor {
         } else {
             stateList = null;
         }
-        final ResourceResolver resolver = 
this.configuration.createResourceResolver();
-        try {
+        try (final ResourceResolver resolver = 
this.configuration.createResourceResolver();) {
             if ( stateList == null || 
stateList.contains(Job.JobState.SUCCEEDED.name()) ) {
                 this.cleanup(removeDate, resolver, context, 
configuration.getStoredSuccessfulJobsPath(), topics, null);
             }
@@ -111,8 +110,6 @@ public class HistoryCleanUpTask implements JobExecutor {
         } catch (final PersistenceException pe) {
             // in the case of an error, we just log this as a warning
             this.logger.warn("Exception during job resource tree cleanup.", 
pe);
-        } finally {
-            resolver.close();
         }
         return context.result().succeeded();
     }
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java b/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
index 842addf..275982e 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java
@@ -82,21 +82,17 @@ public class UpgradeTask {
      */
     private void upgradeBridgedJobs() {
         final String path = configuration.getLocalJobsPath() + 
"/slingevent:eventadmin";
-        final ResourceResolver resolver = 
configuration.createResourceResolver();
-        if ( resolver != null ) {
-            try {
-                final Resource rootResource = resolver.getResource(path);
-                if ( rootResource != null ) {
-                    upgradeBridgedJobs(rootResource);
-                }
-                if ( caps.isLeader() ) {
-                    final Resource unassignedRoot = 
resolver.getResource(configuration.getUnassignedJobsPath() + 
"/slingevent:eventadmin");
-                    if ( unassignedRoot != null ) {
-                        upgradeBridgedJobs(unassignedRoot);
-                    }
+
+        try ( final ResourceResolver resolver = 
configuration.createResourceResolver();) {
+            final Resource rootResource = resolver.getResource(path);
+            if ( rootResource != null ) {
+                upgradeBridgedJobs(rootResource);
+            }
+            if ( caps.isLeader() ) {
+                final Resource unassignedRoot = 
resolver.getResource(configuration.getUnassignedJobsPath() + 
"/slingevent:eventadmin");
+                if ( unassignedRoot != null ) {
+                    upgradeBridgedJobs(unassignedRoot);
                 }
-            } finally {
-                resolver.close();
             }
         }
     }
@@ -155,16 +151,11 @@ public class UpgradeTask {
      * Handle jobs from previous versions (<= 3.1.4) by moving them to the 
unassigned area
      */
     private void processJobsFromPreviousVersions() {
-        final ResourceResolver resolver = 
configuration.createResourceResolver();
-        if ( resolver != null ) {
-            try {
-                
this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionAnonPath()));
-                
this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionIdentifiedPath()));
-            } catch ( final PersistenceException pe ) {
-                this.logger.warn("Problems moving jobs from previous 
version.", pe);
-            } finally {
-                resolver.close();
-            }
+        try (final ResourceResolver resolver = 
configuration.createResourceResolver();) {
+            
this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionAnonPath()));
+            
this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionIdentifiedPath()));
+        } catch ( final PersistenceException pe ) {
+            this.logger.warn("Problems moving jobs from previous version.", 
pe);
         }
     }
 
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/JobHandlerTest.java b/src/test/java/org/apache/sling/event/impl/jobs/JobHandlerTest.java
index bebc711..0c11810 100644
--- a/src/test/java/org/apache/sling/event/impl/jobs/JobHandlerTest.java
+++ b/src/test/java/org/apache/sling/event/impl/jobs/JobHandlerTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -181,7 +182,8 @@ public class JobHandlerTest {
         // we don't care what type of resolver we use for these tests
         resolver = spy (rrf.getAdministrativeResourceResolver(null));
 
-        when (configuration.createResourceResolver()).thenReturn(resolver);
+//        when (configuration.createResourceResolver()).thenReturn(resolver);
+        doReturn(resolver).when(configuration).createResourceResolver();
 
         // these are mocked because it's easier than invoking the activate 
method
         when 
(configuration.getJobsBasePathWithSlash()).thenReturn("/var/events/");
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java b/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java
index a2e68cd..1d63a6d 100644
--- a/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java
+++ b/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.discovery.ClusterView;
 import org.apache.sling.discovery.InstanceDescription;
 import org.apache.sling.discovery.TopologyEvent;
@@ -36,10 +37,15 @@ import org.apache.sling.discovery.TopologyEventListener;
 import org.apache.sling.discovery.TopologyView;
 import org.apache.sling.event.impl.TestUtil;
 import org.apache.sling.event.impl.discovery.InitDelayingTopologyEventListener;
+import org.apache.sling.testing.mock.sling.junit.SlingContext;
+import org.junit.Rule;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 public class JobManagerConfigurationTest {
+    
+    @Rule
+    public SlingContext context = new SlingContext();
 
     private TopologyView createView() {
         final TopologyView view = Mockito.mock(TopologyView.class);
@@ -85,10 +91,13 @@ public class JobManagerConfigurationTest {
     @Test public void testTopologyChange() throws Exception {
         // mock scheduler
         final ChangeListener ccl = new ChangeListener();
+        
+        ResourceResolverFactory rrf = 
context.getService(ResourceResolverFactory.class);
 
         // add change listener and verify
         ccl.init(1);
         final JobManagerConfiguration config = new JobManagerConfiguration();
+        TestUtil.setFieldValue(config, "resourceResolverFactory",rrf);
         ((AtomicBoolean)TestUtil.getFieldValue(config, "active")).set(true);
         InitDelayingTopologyEventListener startupDelayListener = new 
InitDelayingTopologyEventListener(1, new TopologyEventListener() {
 

Reply via email to