This is an automated email from the ASF dual-hosted git repository. joerghoh pushed a commit to branch SLING-12394 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-event.git
commit dae2016ba8d53498f5201c4f7ca936d9056762ca Author: Joerg Hoh <[email protected]> AuthorDate: Sat Jul 27 09:12:55 2024 +0200 SLING-12394 improve ResourceResolver handling --- .../sling/event/impl/jobs/JobManagerImpl.java | 21 +-- .../impl/jobs/config/JobManagerConfiguration.java | 16 +- .../sling/event/impl/jobs/queues/JobQueueImpl.java | 7 +- .../event/impl/jobs/queues/QueueJobCache.java | 5 +- .../sling/event/impl/jobs/queues/QueueManager.java | 5 +- .../impl/jobs/scheduling/JobSchedulerImpl.java | 5 +- .../impl/jobs/scheduling/ScheduledJobHandler.java | 99 +++++------- .../event/impl/jobs/tasks/CheckTopologyTask.java | 169 ++++++++++----------- .../sling/event/impl/jobs/tasks/CleanUpTask.java | 26 +--- .../impl/jobs/tasks/FindUnfinishedJobsTask.java | 8 +- .../event/impl/jobs/tasks/HistoryCleanUpTask.java | 5 +- .../sling/event/impl/jobs/tasks/UpgradeTask.java | 39 ++--- .../sling/event/impl/jobs/JobHandlerTest.java | 4 +- .../jobs/config/JobManagerConfigurationTest.java | 9 ++ 14 files changed, 165 insertions(+), 253 deletions(-) diff --git a/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java index 4f4ad08..062dee4 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java @@ -266,8 +266,7 @@ public class JobManagerImpl final boolean isHistoryJob = this.configuration.isStoragePath(job.getResourcePath()); // if history job, simply remove - otherwise move to history! if ( isHistoryJob ) { - final ResourceResolver resolver = this.configuration.createResourceResolver(); - try { + try (final ResourceResolver resolver = this.configuration.createResourceResolver();) { final Resource jobResource = resolver.getResource(job.getResourcePath()); if ( jobResource != null ) { resolver.delete(jobResource); @@ -280,8 +279,6 @@ public class JobManagerImpl } catch ( final PersistenceException pe) { logger.warn("Unable to remove job at " + job.getResourcePath(), pe); result = false; - } finally { - resolver.close(); } } else { final JobHandler jh = new JobHandler(job, null, this.configuration); @@ -309,9 +306,8 @@ public class JobManagerImpl @Override public Job getJobById(final String id) { logger.debug("Getting job by id: {}", id); - final ResourceResolver resolver = this.configuration.createResourceResolver(); final StringBuilder buf = new StringBuilder(64); - try { + try (final ResourceResolver resolver = this.configuration.createResourceResolver();) { buf.append("/jcr:root"); buf.append(this.configuration.getJobsBasePathWithSlash()); @@ -342,8 +338,6 @@ public class JobManagerImpl } } catch (final QuerySyntaxException qse) { logger.warn("Query syntax wrong " + buf.toString(), qse); - } finally { - resolver.close(); } logger.debug("Job not found with id: {}", id); return null; @@ -400,9 +394,8 @@ public class JobManagerImpl || type == QueryType.GIVEN_UP || type == QueryType.STOPPED; final List<Job> result = new ArrayList<>(); - final ResourceResolver resolver = this.configuration.createResourceResolver(); final StringBuilder buf = new StringBuilder(64); - try { + try (final ResourceResolver resolver = this.configuration.createResourceResolver();) { buf.append(buildBaseQuery(this.configuration.getJobsBasePathWithSlash(), topic, type, isHistoryQuery)); if ( templates != null && templates.length > 0 ) { @@ -502,8 +495,6 @@ public class JobManagerImpl } } catch (final QuerySyntaxException qse) { logger.warn("Query syntax wrong " + buf.toString(), qse); - } finally { - resolver.close(); } return result; } @@ -587,8 +578,8 @@ public class JobManagerImpl logger.debug("Persisting job {} into queue {}, target={}", Utility.toString(jobTopic, jobProperties), info.queueName, info.targetId); } - final ResourceResolver resolver = this.configuration.createResourceResolver(); - try { + + try (final ResourceResolver resolver = this.configuration.createResourceResolver();) { final JobImpl job = this.writeJob(resolver, jobTopic, jobProperties, @@ -604,8 +595,6 @@ public class JobManagerImpl } catch (final PersistenceException re ) { // something went wrong, so let's log it this.logger.error("Exception during persisting new job '" + Utility.toString(jobTopic, jobProperties) + "'", re); - } finally { - resolver.close(); } if ( errors != null ) { errors.add("Unable to persist new job."); diff --git a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java index a66d626..14720c0 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java @@ -35,6 +35,7 @@ import org.apache.sling.event.impl.support.Environment; import org.apache.sling.event.impl.support.ResourceHelper; import org.apache.sling.event.jobs.Job; import org.apache.sling.serviceusermapping.ServiceUserMapped; +import org.jetbrains.annotations.NotNull; import org.osgi.framework.Constants; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; @@ -231,15 +232,12 @@ public class JobManagerConfiguration { this.historyCleanUpRemovedJobs = config.cleanup_period(); // create initial resources - final ResourceResolver resolver = this.createResourceResolver(); - try { + try (final ResourceResolver resolver = this.createResourceResolver();) { ResourceHelper.getOrCreateBasePath(resolver, this.getLocalJobsPath()); ResourceHelper.getOrCreateBasePath(resolver, this.getUnassignedJobsPath()); } catch ( final PersistenceException pe ) { logger.error("Unable to create default paths: " + pe.getMessage(), pe); throw new RuntimeException(pe); - } finally { - resolver.close(); } this.active.set(true); @@ -308,21 +306,21 @@ public class JobManagerConfiguration { * This ResourceResolver provides read and write access to all resources relevant for the event * and job handling. * - * @return A resource resolver or {@code null} if the component is already deactivated. + * @return A resource resolver * @throws RuntimeException if the resolver can't be created. */ - public ResourceResolver createResourceResolver() { - ResourceResolver resolver = null; + public @NotNull ResourceResolver createResourceResolver() { final ResourceResolverFactory factory = this.resourceResolverFactory; if ( factory != null ) { try { - resolver = this.resourceResolverFactory.getServiceResourceResolver(null); + return this.resourceResolverFactory.getServiceResourceResolver(null); } catch ( final LoginException le) { logger.error("Unable to create new resource resolver: " + le.getMessage(), le); throw new RuntimeException(le); } + } else { + throw new RuntimeException ("ResourceResolverFactory is null, cannot create ResourceResolver"); } - return resolver; } /** diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java index 3a272ea..60478f4 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java @@ -641,8 +641,7 @@ public class JobQueueImpl if ( !topics.isEmpty() ) { - final ResourceResolver resolver = this.services.configuration.createResourceResolver(); - try { + try (final ResourceResolver resolver = this.services.configuration.createResourceResolver();) { final Resource baseResource = resolver.getResource(this.services.configuration.getLocalJobsPath()); // sanity check - should never be null @@ -674,12 +673,10 @@ public class JobQueueImpl } try { resolver.commit(); - } catch ( final PersistenceException ignore) { + } catch (final PersistenceException ignore) { logger.error("Unable to remove jobs", ignore); } } - } finally { - resolver.close(); } } } diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java index f4190e6..72b1753 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java @@ -221,8 +221,7 @@ public class QueueJobCache { final Map<String, List<JobImpl>> topicCache = new HashMap<String, List<JobImpl>>(); - final ResourceResolver resolver = this.configuration.createResourceResolver(); - try { + try (final ResourceResolver resolver = this.configuration.createResourceResolver();) { final Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath()); // sanity check - should never be null if ( baseResource != null ) { @@ -234,8 +233,6 @@ public class QueueJobCache { } } } - } finally { - resolver.close(); } orderTopics(topicCache); diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java index 5d919e0..a5b5464 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java @@ -426,8 +426,7 @@ public class QueueManager private Set<String> scanTopics() { final Set<String> topics = new HashSet<>(); - final ResourceResolver resolver = this.configuration.createResourceResolver(); - try { + try (final ResourceResolver resolver = this.configuration.createResourceResolver();) { final Resource baseResource = resolver.getResource(this.configuration.getLocalJobsPath()); // sanity check - should never be null @@ -440,8 +439,6 @@ public class QueueManager topics.add(topic); } } - } finally { - resolver.close(); } return topics; } diff --git a/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java index 7c4ce45..1469e41 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/scheduling/JobSchedulerImpl.java @@ -472,8 +472,7 @@ public class JobSchedulerImpl * @param flag The corresponding flag */ public void setSuspended(final ScheduledJobInfoImpl info, final boolean flag) { - final ResourceResolver resolver = configuration.createResourceResolver(); - try { + try (final ResourceResolver resolver = configuration.createResourceResolver();) { final StringBuilder sb = new StringBuilder(this.configuration.getScheduledJobsPath(true)); sb.append(ResourceHelper.filterName(info.getName())); final String path = sb.toString(); @@ -496,8 +495,6 @@ public class JobSchedulerImpl } catch (final PersistenceException pe) { // we ignore the exception if removing fails ignoreException(pe); - } finally { - resolver.close(); } } diff --git a/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java b/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java index f925e4a..127e282 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/scheduling/ScheduledJobHandler.java @@ -141,36 +141,31 @@ public class ScheduledJobHandler implements Runnable { } private void scan() { - final ResourceResolver resolver = configuration.createResourceResolver(); - if ( resolver != null ) { - try { - logger.debug("Scanning for scheduled jobs..."); - final String path = this.configuration.getScheduledJobsPath(false); - final Resource startResource = resolver.getResource(path); - if ( startResource != null ) { - final Map<String, Holder> newScheduledJobs = new HashMap<String, Holder>(); - synchronized ( this.scheduledJobs ) { - for(final Resource rsrc : startResource.getChildren()) { - if ( !isRunning.get() ) { - break; - } - handleAddOrUpdate(newScheduledJobs, rsrc); + try (final ResourceResolver resolver = configuration.createResourceResolver();) { + logger.debug("Scanning for scheduled jobs..."); + final String path = this.configuration.getScheduledJobsPath(false); + final Resource startResource = resolver.getResource(path); + if ( startResource != null ) { + final Map<String, Holder> newScheduledJobs = new HashMap<String, Holder>(); + synchronized ( this.scheduledJobs ) { + for(final Resource rsrc : startResource.getChildren()) { + if ( !isRunning.get() ) { + break; } - if ( isRunning.get() ) { - for(final Holder h : this.scheduledJobs.values()) { - if ( h.info != null ) { - this.jobScheduler.unscheduleJob(h.info); - } + handleAddOrUpdate(newScheduledJobs, rsrc); + } + if ( isRunning.get() ) { + for(final Holder h : this.scheduledJobs.values()) { + if ( h.info != null ) { + this.jobScheduler.unscheduleJob(h.info); } - this.scheduledJobs.clear(); - this.scheduledJobs.putAll(newScheduledJobs); } + this.scheduledJobs.clear(); + this.scheduledJobs.putAll(newScheduledJobs); } } - logger.debug("Finished scanning for scheduled jobs..."); - } finally { - resolver.close(); } + logger.debug("Finished scanning for scheduled jobs..."); } } @@ -235,8 +230,7 @@ public class ScheduledJobHandler implements Runnable { final boolean suspend, final List<ScheduleInfoImpl> scheduleInfos) throws PersistenceException { - final ResourceResolver resolver = this.configuration.createResourceResolver(); - try { + try (final ResourceResolver resolver = this.configuration.createResourceResolver();) { // create properties final Map<String, Object> properties = new HashMap<String, Object>(); @@ -286,8 +280,6 @@ public class ScheduledJobHandler implements Runnable { properties.put(ResourceHelper.PROPERTY_SCHEDULE_INFO, scheduleInfos); return properties; - } finally { - resolver.close(); } } @@ -331,23 +323,18 @@ public class ScheduledJobHandler implements Runnable { } } if ( !updateJobs.isEmpty() && isRunning.get() ) { - ResourceResolver resolver = configuration.createResourceResolver(); - if ( resolver != null ) { - try { - for(final Map.Entry<String, Holder> entry : updateJobs.entrySet()) { - final String path = configuration.getScheduledJobsPath(true) + entry.getKey(); - final Resource rsrc = resolver.getResource(path); - if ( !isRunning.get() ) { - break; - } - if ( rsrc != null ) { - synchronized ( scheduledJobs ) { - handleAddOrUpdate(scheduledJobs, rsrc); - } + try (ResourceResolver resolver = configuration.createResourceResolver();) { + for(final Map.Entry<String, Holder> entry : updateJobs.entrySet()) { + final String path = configuration.getScheduledJobsPath(true) + entry.getKey(); + final Resource rsrc = resolver.getResource(path); + if ( !isRunning.get() ) { + break; + } + if ( rsrc != null ) { + synchronized ( scheduledJobs ) { + handleAddOrUpdate(scheduledJobs, rsrc); } } - } finally { - resolver.close(); } } } @@ -387,17 +374,12 @@ public class ScheduledJobHandler implements Runnable { @Override public void run() { if ( isRunning.get() ) { - final ResourceResolver resolver = configuration.createResourceResolver(); - if ( resolver != null ) { - try { - final Resource rsrc = resolver.getResource(path); - if ( rsrc != null ) { - synchronized ( scheduledJobs ) { - handleAddOrUpdate(scheduledJobs, rsrc); - } + try (final ResourceResolver resolver = configuration.createResourceResolver();) { + final Resource rsrc = resolver.getResource(path); + if ( rsrc != null ) { + synchronized ( scheduledJobs ) { + handleAddOrUpdate(scheduledJobs, rsrc); } - } finally { - resolver.close(); } } } @@ -462,8 +444,8 @@ public class ScheduledJobHandler implements Runnable { public void remove(final ScheduledJobInfoImpl info) { final String scheduleKey = ResourceHelper.filterName(info.getName()); - final ResourceResolver resolver = configuration.createResourceResolver(); - try { + + try (final ResourceResolver resolver = configuration.createResourceResolver();) { final StringBuilder sb = new StringBuilder(configuration.getScheduledJobsPath(true)); sb.append(scheduleKey); final String path = sb.toString(); @@ -476,8 +458,6 @@ public class ScheduledJobHandler implements Runnable { } catch (final PersistenceException pe) { // we ignore the exception if removing fails ignoreException(pe); - } finally { - resolver.close(); } synchronized ( this.scheduledJobs ) { @@ -490,8 +470,7 @@ public class ScheduledJobHandler implements Runnable { public void updateSchedule(final String scheduleName, final Collection<ScheduleInfo> scheduleInfo) { - final ResourceResolver resolver = configuration.createResourceResolver(); - try { + try (final ResourceResolver resolver = configuration.createResourceResolver();) { final String scheduleKey = ResourceHelper.filterName(scheduleName); final StringBuilder sb = new StringBuilder(configuration.getScheduledJobsPath(true)); @@ -527,8 +506,6 @@ public class ScheduledJobHandler implements Runnable { logger.warn("Unable to update scheduled job " + scheduleName, pe); } } - } finally { - resolver.close(); } } diff --git a/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java b/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java index c5750be..f1166a1 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/tasks/CheckTopologyTask.java @@ -72,29 +72,24 @@ public class CheckTopologyTask { private void reassignJobsFromStoppedInstances() { if ( caps.isLeader() && caps.isActive() ) { this.logger.debug("Checking for stopped instances..."); - final ResourceResolver resolver = this.configuration.createResourceResolver(); - if ( resolver != null ) { - try { - final Resource jobsRoot = resolver.getResource(this.configuration.getAssginedJobsPath()); - this.logger.debug("Got jobs root {}", jobsRoot); + try (final ResourceResolver resolver = this.configuration.createResourceResolver();) { + final Resource jobsRoot = resolver.getResource(this.configuration.getAssginedJobsPath()); + this.logger.debug("Got jobs root {}", jobsRoot); - // this resource should exist, but we check anyway - if ( jobsRoot != null ) { - final Iterator<Resource> instanceIter = jobsRoot.listChildren(); - while ( caps.isActive() && instanceIter.hasNext() ) { - final Resource instanceResource = instanceIter.next(); + // this resource should exist, but we check anyway + if ( jobsRoot != null ) { + final Iterator<Resource> instanceIter = jobsRoot.listChildren(); + while ( caps.isActive() && instanceIter.hasNext() ) { + final Resource instanceResource = instanceIter.next(); - final String instanceId = instanceResource.getName(); - if ( !caps.isActive(instanceId) ) { - logger.debug("Found stopped instance {}", instanceId); - assignJobs(instanceResource, true); - } + final String instanceId = instanceResource.getName(); + if ( !caps.isActive(instanceId) ) { + logger.debug("Found stopped instance {}", instanceId); + assignJobs(instanceResource, true); } } - } finally { - resolver.close(); } - } + } } } @@ -104,85 +99,80 @@ public class CheckTopologyTask { private void reassignStaleJobs() { if ( caps.isActive() ) { this.logger.debug("Checking for stale jobs..."); - final ResourceResolver resolver = this.configuration.createResourceResolver(); - if ( resolver != null ) { - try { - final Resource jobsRoot = resolver.getResource(this.configuration.getLocalJobsPath()); + try (final ResourceResolver resolver = this.configuration.createResourceResolver();) { + final Resource jobsRoot = resolver.getResource(this.configuration.getLocalJobsPath()); - // this resource should exist, but we check anyway - if ( jobsRoot != null ) { - final Iterator<Resource> topicIter = jobsRoot.listChildren(); - while ( caps.isActive() && topicIter.hasNext() ) { - final Resource topicResource = topicIter.next(); + // this resource should exist, but we check anyway + if ( jobsRoot != null ) { + final Iterator<Resource> topicIter = jobsRoot.listChildren(); + while ( caps.isActive() && topicIter.hasNext() ) { + final Resource topicResource = topicIter.next(); - final String topicName = topicResource.getName().replace('.', '/'); - this.logger.debug("Checking topic {}..." , topicName); - final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName); - boolean reassign = true; - for(final InstanceDescription desc : potentialTargets) { - if ( desc.isLocal() ) { - reassign = false; - break; - } + final String topicName = topicResource.getName().replace('.', '/'); + this.logger.debug("Checking topic {}..." , topicName); + final List<InstanceDescription> potentialTargets = caps.getPotentialTargets(topicName); + boolean reassign = true; + for(final InstanceDescription desc : potentialTargets) { + if ( desc.isLocal() ) { + reassign = false; + break; } - if ( reassign ) { - final QueueConfigurationManager qcm = this.configuration.getQueueConfigurationManager(); - if ( qcm == null ) { - break; - } - final QueueInfo info = qcm.getQueueInfo(topicName); - logger.info ("Start reassigning stale jobs"); - JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() { + } + if ( reassign ) { + final QueueConfigurationManager qcm = this.configuration.getQueueConfigurationManager(); + if ( qcm == null ) { + break; + } + final QueueInfo info = qcm.getQueueInfo(topicName); + logger.info ("Start reassigning stale jobs"); + JobTopicTraverser.traverse(this.logger, topicResource, new JobTopicTraverser.ResourceCallback() { - @Override - public boolean handle(final Resource rsrc) { - try { - final ValueMap vm = ResourceHelper.getValueMap(rsrc); - final String targetId = caps.detectTarget(topicName, vm, info); + @Override + public boolean handle(final Resource rsrc) { + try { + final ValueMap vm = ResourceHelper.getValueMap(rsrc); + final String targetId = caps.detectTarget(topicName, vm, info); - final Map<String, Object> props = new HashMap<>(vm); - props.remove(Job.PROPERTY_JOB_STARTED_TIME); + final Map<String, Object> props = new HashMap<>(vm); + props.remove(Job.PROPERTY_JOB_STARTED_TIME); - final String newPath; + final String newPath; + if ( targetId != null ) { + newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length()); + props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName); + props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId); + } else { + newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length()); + props.remove(Job.PROPERTY_JOB_QUEUE_NAME); + props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE); + } + try { + ResourceHelper.getOrCreateResource(resolver, newPath, props); + resolver.delete(rsrc); + resolver.commit(); + final String jobId = vm.get(ResourceHelper.PROPERTY_JOB_ID, String.class); if ( targetId != null ) { - newPath = configuration.getAssginedJobsPath() + '/' + targetId + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length()); - props.put(Job.PROPERTY_JOB_QUEUE_NAME, info.queueName); - props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId); + configuration.getAuditLogger().debug("REASSIGN OK {} : {}", targetId, jobId); } else { - newPath = configuration.getUnassignedJobsPath() + '/' + topicResource.getName() + rsrc.getPath().substring(topicResource.getPath().length()); - props.remove(Job.PROPERTY_JOB_QUEUE_NAME); - props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE); - } - try { - ResourceHelper.getOrCreateResource(resolver, newPath, props); - resolver.delete(rsrc); - resolver.commit(); - final String jobId = vm.get(ResourceHelper.PROPERTY_JOB_ID, String.class); - if ( targetId != null ) { - configuration.getAuditLogger().debug("REASSIGN OK {} : {}", targetId, jobId); - } else { - configuration.getAuditLogger().debug("REUNASSIGN OK : {}", jobId); - } - } catch ( final PersistenceException pe ) { - logger.warn("Unable to move stale job from " + rsrc.getPath() + " to " + newPath, pe); - resolver.refresh(); - resolver.revert(); + configuration.getAuditLogger().debug("REUNASSIGN OK : {}", jobId); } - } catch (final InstantiationException ie) { - // something happened with the resource in the meantime - logger.warn("Unable to move stale job from " + rsrc.getPath(), ie); + } catch ( final PersistenceException pe ) { + logger.warn("Unable to move stale job from " + rsrc.getPath() + " to " + newPath, pe); resolver.refresh(); resolver.revert(); } - return caps.isActive(); + } catch (final InstantiationException ie) { + // something happened with the resource in the meantime + logger.warn("Unable to move stale job from " + rsrc.getPath(), ie); + resolver.refresh(); + resolver.revert(); } - }); + return caps.isActive(); + } + }); - } } } - } finally { - resolver.close(); } } } @@ -197,18 +187,13 @@ public class CheckTopologyTask { public void assignUnassignedJobs() { if ( caps != null && caps.isLeader() && caps.isActive() ) { logger.debug("Checking unassigned jobs..."); - final ResourceResolver resolver = this.configuration.createResourceResolver(); - if ( resolver != null ) { - try { - final Resource unassignedRoot = resolver.getResource(this.configuration.getUnassignedJobsPath()); - logger.debug("Got unassigned root {}", unassignedRoot); + try (final ResourceResolver resolver = this.configuration.createResourceResolver();) { + final Resource unassignedRoot = resolver.getResource(this.configuration.getUnassignedJobsPath()); + logger.debug("Got unassigned root {}", unassignedRoot); - // this resource should exist, but we check anyway - if ( unassignedRoot != null ) { - assignJobs(unassignedRoot, false); - } - } finally { - resolver.close(); + // this resource should exist, but we check anyway + if ( unassignedRoot != null ) { + assignJobs(unassignedRoot, false); } } } diff --git a/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java b/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java index f5f2c21..1ea196c 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/tasks/CleanUpTask.java @@ -145,8 +145,7 @@ public class CleanUpTask { } private void historyCleanUpRemovedJobs(Calendar since) { - ResourceResolver resolver = this.configuration.createResourceResolver(); - try { + try (ResourceResolver resolver = this.configuration.createResourceResolver();) { HistoryCleanUpTask.cleanup( since, resolver, @@ -203,8 +202,6 @@ public class CleanUpTask { )); } catch (PersistenceException e) { this.logger.warn("Exception during job resource tree cleanup.", e); - } finally { - resolver.close(); } } @@ -215,8 +212,7 @@ public class CleanUpTask { */ private void simpleEmptyFolderCleanup(final TopologyCapabilities caps, final String basePath) { this.logger.debug("Cleaning up job resource tree: looking for empty folders"); - final ResourceResolver resolver = this.configuration.createResourceResolver(); - try { + try (final ResourceResolver resolver = this.configuration.createResourceResolver();) { final Calendar cleanUpDate = getCalendarInstance(); // go back five minutes cleanUpDate.add(Calendar.MINUTE, -5); @@ -267,8 +263,6 @@ public class CleanUpTask { } catch (final PersistenceException pe) { // in the case of an error, we just log this as a warning this.logger.warn("Exception during job resource tree cleanup.", pe); - } finally { - resolver.close(); } } @@ -277,11 +271,7 @@ public class CleanUpTask { */ private void fullEmptyFolderCleanup(final TopologyCapabilities caps, final String basePath) { this.logger.debug("Cleaning up job resource tree: removing ALL empty folders"); - final ResourceResolver resolver = this.configuration.createResourceResolver(); - if ( resolver == null ) { - return; - } - try { + try (final ResourceResolver resolver = this.configuration.createResourceResolver();) { final Resource baseResource = resolver.getResource(basePath); // sanity check - should never be null if ( baseResource != null ) { @@ -382,8 +372,6 @@ public class CleanUpTask { } catch (final PersistenceException pe) { // in the case of an error, we just log this as a warning this.logger.warn("Exception during job resource tree cleanup.", pe); - } finally { - resolver.close(); } } @@ -393,11 +381,7 @@ public class CleanUpTask { * @param assginedJobsPath The root path for the assigned jobs */ private void cleanUpInstanceIdFolders(final TopologyCapabilities caps, final String assginedJobsPath) { - final ResourceResolver resolver = this.configuration.createResourceResolver(); - if ( resolver == null ) { - return; - } - try { + try (final ResourceResolver resolver = this.configuration.createResourceResolver();) { final Resource baseResource = resolver.getResource(assginedJobsPath); // sanity check - should never be null if ( baseResource != null ) { @@ -452,8 +436,6 @@ public class CleanUpTask { } catch (final PersistenceException e) { // in the case of an error, we just log this as a warning this.logger.warn("Exception during job resource tree cleanup.", e); - } finally { - resolver.close(); } } diff --git a/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java b/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java index 7bd889b..1058116 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/tasks/FindUnfinishedJobsTask.java @@ -61,11 +61,7 @@ public class FindUnfinishedJobsTask { */ private void initialScan() { logger.debug("Scanning repository for unfinished jobs..."); - final ResourceResolver resolver = configuration.createResourceResolver(); - if ( resolver == null ) { - return; - } - try { + try (final ResourceResolver resolver = configuration.createResourceResolver();) { final Resource baseResource = resolver.getResource(configuration.getLocalJobsPath()); // sanity check - should never be null @@ -79,8 +75,6 @@ public class FindUnfinishedJobsTask { initTopic(topicResource); } } - } finally { - resolver.close(); } } diff --git a/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java b/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java index 6cda613..0dfa664 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/tasks/HistoryCleanUpTask.java @@ -96,8 +96,7 @@ public class HistoryCleanUpTask implements JobExecutor { } else { stateList = null; } - final ResourceResolver resolver = this.configuration.createResourceResolver(); - try { + try (final ResourceResolver resolver = this.configuration.createResourceResolver();) { if ( stateList == null || stateList.contains(Job.JobState.SUCCEEDED.name()) ) { this.cleanup(removeDate, resolver, context, configuration.getStoredSuccessfulJobsPath(), topics, null); } @@ -111,8 +110,6 @@ public class HistoryCleanUpTask implements JobExecutor { } catch (final PersistenceException pe) { // in the case of an error, we just log this as a warning this.logger.warn("Exception during job resource tree cleanup.", pe); - } finally { - resolver.close(); } return context.result().succeeded(); } diff --git a/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java b/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java index 842addf..275982e 100644 --- a/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java +++ b/src/main/java/org/apache/sling/event/impl/jobs/tasks/UpgradeTask.java @@ -82,21 +82,17 @@ public class UpgradeTask { */ private void upgradeBridgedJobs() { final String path = configuration.getLocalJobsPath() + "/slingevent:eventadmin"; - final ResourceResolver resolver = configuration.createResourceResolver(); - if ( resolver != null ) { - try { - final Resource rootResource = resolver.getResource(path); - if ( rootResource != null ) { - upgradeBridgedJobs(rootResource); - } - if ( caps.isLeader() ) { - final Resource unassignedRoot = resolver.getResource(configuration.getUnassignedJobsPath() + "/slingevent:eventadmin"); - if ( unassignedRoot != null ) { - upgradeBridgedJobs(unassignedRoot); - } + + try ( final ResourceResolver resolver = configuration.createResourceResolver();) { + final Resource rootResource = resolver.getResource(path); + if ( rootResource != null ) { + upgradeBridgedJobs(rootResource); + } + if ( caps.isLeader() ) { + final Resource unassignedRoot = resolver.getResource(configuration.getUnassignedJobsPath() + "/slingevent:eventadmin"); + if ( unassignedRoot != null ) { + upgradeBridgedJobs(unassignedRoot); } - } finally { - resolver.close(); } } } @@ -155,16 +151,11 @@ public class UpgradeTask { * Handle jobs from previous versions (<= 3.1.4) by moving them to the unassigned area */ private void processJobsFromPreviousVersions() { - final ResourceResolver resolver = configuration.createResourceResolver(); - if ( resolver != null ) { - try { - this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionAnonPath())); - this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionIdentifiedPath())); - } catch ( final PersistenceException pe ) { - this.logger.warn("Problems moving jobs from previous version.", pe); - } finally { - resolver.close(); - } + try (final ResourceResolver resolver = configuration.createResourceResolver();) { + this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionAnonPath())); + this.processJobsFromPreviousVersions(resolver.getResource(configuration.getPreviousVersionIdentifiedPath())); + } catch ( final PersistenceException pe ) { + this.logger.warn("Problems moving jobs from previous version.", pe); } } diff --git a/src/test/java/org/apache/sling/event/impl/jobs/JobHandlerTest.java b/src/test/java/org/apache/sling/event/impl/jobs/JobHandlerTest.java index bebc711..0c11810 100644 --- a/src/test/java/org/apache/sling/event/impl/jobs/JobHandlerTest.java +++ b/src/test/java/org/apache/sling/event/impl/jobs/JobHandlerTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -181,7 +182,8 @@ public class JobHandlerTest { // we don't care what type of resolver we use for these tests resolver = spy (rrf.getAdministrativeResourceResolver(null)); - when (configuration.createResourceResolver()).thenReturn(resolver); +// when (configuration.createResourceResolver()).thenReturn(resolver); + doReturn(resolver).when(configuration).createResourceResolver(); // these are mocked because it's easier than invoking the activate method when (configuration.getJobsBasePathWithSlash()).thenReturn("/var/events/"); diff --git a/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java b/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java index a2e68cd..1d63a6d 100644 --- a/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java +++ b/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.discovery.ClusterView; import org.apache.sling.discovery.InstanceDescription; import org.apache.sling.discovery.TopologyEvent; @@ -36,10 +37,15 @@ import org.apache.sling.discovery.TopologyEventListener; import org.apache.sling.discovery.TopologyView; import org.apache.sling.event.impl.TestUtil; import org.apache.sling.event.impl.discovery.InitDelayingTopologyEventListener; +import org.apache.sling.testing.mock.sling.junit.SlingContext; +import org.junit.Rule; import org.junit.Test; import org.mockito.Mockito; public class JobManagerConfigurationTest { + + @Rule + public SlingContext context = new SlingContext(); private TopologyView createView() { final TopologyView view = Mockito.mock(TopologyView.class); @@ -85,10 +91,13 @@ public class JobManagerConfigurationTest { @Test public void testTopologyChange() throws Exception { // mock scheduler final ChangeListener ccl = new ChangeListener(); + + ResourceResolverFactory rrf = context.getService(ResourceResolverFactory.class); // add change listener and verify ccl.init(1); final JobManagerConfiguration config = new JobManagerConfiguration(); + TestUtil.setFieldValue(config, "resourceResolverFactory",rrf); ((AtomicBoolean)TestUtil.getFieldValue(config, "active")).set(true); InitDelayingTopologyEventListener startupDelayListener = new InitDelayingTopologyEventListener(1, new TopologyEventListener() {
