Author: cziegeler
Date: Mon Sep 16 16:35:49 2013
New Revision: 1523723
URL: http://svn.apache.org/r1523723
Log:
SLING-3028 : Support for progress tracking of jobs
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java?rev=1523723&r1=1523722&r2=1523723&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobConsumerManager.java
Mon Sep 16 16:35:49 2013
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -35,12 +36,18 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.References;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.osgi.PropertiesUtil;
import org.apache.sling.discovery.PropertyProvider;
import org.apache.sling.event.impl.support.TopicMatcher;
import org.apache.sling.event.impl.support.TopicMatcherHelper;
+import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.apache.sling.event.jobs.consumer.JobConsumer.JobResult;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
+import org.apache.sling.event.jobs.consumer.JobStatus;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceReference;
@@ -54,9 +61,14 @@ import org.slf4j.LoggerFactory;
description="%job.consumermanager.description",
metatype=true)
@Service(value=JobConsumerManager.class)
-@Reference(referenceInterface=JobConsumer.class,
- cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
- policy=ReferencePolicy.DYNAMIC)
+@References({
+ @Reference(referenceInterface=JobConsumer.class,
+ cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
+ policy=ReferencePolicy.DYNAMIC),
+ @Reference(referenceInterface=JobExecutor.class,
+ cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
+ policy=ReferencePolicy.DYNAMIC)
+})
@Property(name="org.apache.sling.installer.configuration.persist",
boolValue=false, propertyPrivate=true)
public class JobConsumerManager {
@@ -154,22 +166,22 @@ public class JobConsumerManager {
}
/**
- * Get the consumer for the topic.
+ * Get the executor for the topic.
* @param topic The job topic
* @return A consumer or <code>null</code>
*/
- public JobConsumer getConsumer(final String topic) {
+ public JobExecutor getExecutor(final String topic) {
synchronized ( this.topicToConsumerMap ) {
final List<ConsumerInfo> consumers =
this.topicToConsumerMap.get(topic);
if ( consumers != null ) {
- return consumers.get(0).getConsumer(this.bundleContext);
+ return consumers.get(0).getExecutor(this.bundleContext);
}
final int pos = topic.lastIndexOf('/');
if ( pos > 0 ) {
final String category = topic.substring(0, pos +
1).concat("*");
final List<ConsumerInfo> categoryConsumers =
this.topicToConsumerMap.get(category);
if ( categoryConsumers != null ) {
- return
categoryConsumers.get(0).getConsumer(this.bundleContext);
+ return
categoryConsumers.get(0).getExecutor(this.bundleContext);
}
}
}
@@ -195,9 +207,42 @@ public class JobConsumerManager {
* @param serviceReference The service reference to the consumer.
*/
protected void bindJobConsumer(final ServiceReference serviceReference) {
+ this.bindService(serviceReference, true);
+ }
+
+ /**
+ * Unbind a consumer
+ * @param serviceReference The service reference to the consumer.
+ */
+ protected void unbindJobConsumer(final ServiceReference serviceReference) {
+ this.unbindService(serviceReference, true);
+ }
+
+ /**
+ * Bind a new executor
+ * @param serviceReference The service reference to the executor.
+ */
+ protected void bindJobExecutor(final ServiceReference serviceReference) {
+ this.bindService(serviceReference, false);
+ }
+
+ /**
+ * Unbind an executor
+ * @param serviceReference The service reference to the executor.
+ */
+ protected void unbindJobExecutor(final ServiceReference serviceReference) {
+ this.unbindService(serviceReference, false);
+ }
+
+ /**
+ * Bind a consumer or executor
+ * @param serviceReference The service reference to the consumer or
executor.
+ * @param isConsumer <code>true</code> for a JobConsumer, <code>false</code> for a
JobExecutor
+ */
+ private void bindService(final ServiceReference serviceReference, final
boolean isConsumer) {
final String[] topics =
PropertiesUtil.toStringArray(serviceReference.getProperty(JobConsumer.PROPERTY_TOPICS));
if ( topics != null && topics.length > 0 ) {
- final ConsumerInfo info = new ConsumerInfo(serviceReference);
+ final ConsumerInfo info = new ConsumerInfo(serviceReference,
isConsumer);
boolean changed = false;
synchronized ( this.topicToConsumerMap ) {
for(final String t : topics) {
@@ -228,13 +273,14 @@ public class JobConsumerManager {
}
/**
- * Unbind a consumer
- * @param serviceReference The service reference to the consumer.
+ * Unbind a consumer or executor
+ * @param serviceReference The service reference to the consumer or
executor.
+ * @param isConsumer <code>true</code> for a JobConsumer, <code>false</code> for a
JobExecutor
*/
- protected void unbindJobConsumer(final ServiceReference serviceReference) {
+ private void unbindService(final ServiceReference serviceReference, final
boolean isConsumer) {
final String[] topics =
PropertiesUtil.toStringArray(serviceReference.getProperty(JobConsumer.PROPERTY_TOPICS));
if ( topics != null && topics.length > 0 ) {
- final ConsumerInfo info = new ConsumerInfo(serviceReference);
+ final ConsumerInfo info = new ConsumerInfo(serviceReference,
isConsumer);
boolean changed = false;
synchronized ( this.topicToConsumerMap ) {
for(final String t : topics) {
@@ -311,12 +357,14 @@ public class JobConsumerManager {
private final static class ConsumerInfo implements
Comparable<ConsumerInfo> {
public final ServiceReference serviceReference;
- private JobConsumer consumer;
+ private final boolean isConsumer;
+ private JobExecutor executor;
public final int ranking;
public final long serviceId;
- public ConsumerInfo(final ServiceReference serviceReference) {
+ public ConsumerInfo(final ServiceReference serviceReference, final
boolean isConsumer) {
this.serviceReference = serviceReference;
+ this.isConsumer = isConsumer;
final Object sr =
serviceReference.getProperty(Constants.SERVICE_RANKING);
if ( sr == null || !(sr instanceof Integer)) {
this.ranking = 0;
@@ -350,11 +398,70 @@ public class JobConsumerManager {
return serviceReference.hashCode();
}
- public JobConsumer getConsumer(final BundleContext bundleContext) {
- if ( consumer == null ) {
- consumer = (JobConsumer)
bundleContext.getService(this.serviceReference);
+ public JobExecutor getExecutor(final BundleContext bundleContext) {
+ if ( executor == null ) {
+ if ( this.isConsumer ) {
+ executor = new JobConsumerWrapper((JobConsumer)
bundleContext.getService(this.serviceReference));
+ } else {
+ executor = (JobExecutor)
bundleContext.getService(this.serviceReference);
+ }
+ }
+ return executor;
+ }
+ }
+
+ private final static class JobConsumerWrapper implements JobExecutor {
+
+ private final JobConsumer consumer;
+
+ public JobConsumerWrapper(final JobConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public JobStatus process(final Job job, final JobExecutionContext
context) {
+ final JobConsumer.AsyncHandler asyncHandler =
+ new JobConsumer.AsyncHandler() {
+
+ final Object asyncLock = new Object();
+ final AtomicBoolean asyncDone = new
AtomicBoolean(false);
+
+ private void check(final JobStatus result) {
+ synchronized ( asyncLock ) {
+ if ( !asyncDone.get() ) {
+ asyncDone.set(true);
+ context.asyncProcessingFinished(result);
+ } else {
+ throw new IllegalStateException("Job is
already marked as processed");
+ }
+ }
+ }
+
+ @Override
+ public void ok() {
+ this.check(JobStatus.OK);
+ }
+
+ @Override
+ public void failed() {
+ this.check(JobStatus.FAILED);
+ }
+
+ @Override
+ public void cancel() {
+ this.check(JobStatus.CANCEL);
+ }
+ };
+ ((JobImpl)job).setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER,
asyncHandler);
+ final JobConsumer.JobResult result = this.consumer.process(job);
+ if ( result == JobResult.ASYNC ) {
+ return JobStatus.ASYNC;
+ } else if ( result == JobResult.FAILED) {
+ return JobStatus.FAILED;
+ } else if ( result == JobResult.OK) {
+ return JobStatus.OK;
}
- return consumer;
+ return JobStatus.CANCEL;
}
}
}
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1523723&r1=1523722&r2=1523723&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
Mon Sep 16 16:35:49 2013
@@ -77,7 +77,7 @@ import org.apache.sling.event.jobs.Queue
import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.Statistics;
import org.apache.sling.event.jobs.TopicStatistics;
-import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
import org.apache.sling.event.jobs.jmx.QueuesMBean;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
@@ -279,7 +279,7 @@ public class JobManagerImpl
*/
void process(final JobImpl job) {
// check if we still are able to process this job
- final JobConsumer consumer =
this.jobConsumerManager.getConsumer(job.getTopic());
+ final JobExecutor consumer =
this.jobConsumerManager.getExecutor(job.getTopic());
boolean reassign = false;
String reassignTargetId = null;
if ( consumer == null && (!job.isBridgedEvent() ||
!this.jobConsumerManager.supportsBridgedEvents())) {
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1523723&r1=1523722&r2=1523723&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
Mon Sep 16 16:35:49 2013
@@ -43,8 +43,9 @@ import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.Queue;
import org.apache.sling.event.jobs.Statistics;
-import org.apache.sling.event.jobs.consumer.JobConsumer;
-import org.apache.sling.event.jobs.consumer.JobConsumer.JobResult;
+import org.apache.sling.event.jobs.consumer.JobExecutionContext;
+import org.apache.sling.event.jobs.consumer.JobExecutor;
+import org.apache.sling.event.jobs.consumer.JobStatus;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
@@ -296,9 +297,9 @@ public abstract class AbstractJobQueue
return ack != null;
}
- private boolean handleReschedule(final JobHandler jobEvent, final
JobConsumer.JobResult result) {
+ private boolean handleReschedule(final JobHandler jobEvent, final
JobStatus result) {
boolean reschedule = false;
- switch ( result ) {
+ switch ( result.getState() ) {
case OK : // job is finished
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("Finished job {}",
Utility.toString(jobEvent.getJob()));
@@ -351,11 +352,11 @@ public abstract class AbstractJobQueue
@Override
public boolean finishedJob(final Event job, final boolean
shouldReschedule) {
final String location = (String)job.getProperty(JobUtil.JOB_ID);
- return this.finishedJob(location, shouldReschedule ? JobResult.FAILED
: JobResult.OK, false);
+ return this.finishedJob(location, shouldReschedule ? JobStatus.FAILED
: JobStatus.OK, false);
}
private boolean finishedJob(final String jobId,
- final JobConsumer.JobResult result,
+ final JobStatus result,
final boolean isAsync) {
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("Received finish for job {}, result={}", jobId,
result);
@@ -487,7 +488,7 @@ public abstract class AbstractJobQueue
*/
protected boolean executeJob(final JobHandler handler) {
final JobImpl job = handler.getJob();
- final JobConsumer consumer =
this.jobConsumerManager.getConsumer(job.getTopic());
+ final JobExecutor consumer =
this.jobConsumerManager.getExecutor(job.getTopic());
if ( (consumer != null || (job.isBridgedEvent() &&
this.jobConsumerManager.supportsBridgedEvents())) ) {
if ( handler.start() ) {
@@ -528,58 +529,72 @@ public abstract class AbstractJobQueue
break;
}
}
- JobConsumer.JobResult result =
JobConsumer.JobResult.CANCEL;
- final JobConsumer.AsyncHandler asyncHandler =
- new JobConsumer.AsyncHandler() {
-
- final Object asyncLock = new
Object();
- final AtomicBoolean asyncDone =
new AtomicBoolean(false);
-
- private void check(final
JobConsumer.JobResult result) {
- synchronized ( asyncLock ) {
- if ( !asyncDone.get() ) {
- asyncDone.set(true);
-
finishedJob(job.getId(), result, true);
-
asyncCounter.decrementAndGet();
- } else {
- throw new
IllegalStateException("Job is already marked as processed");
- }
- }
+ JobStatus result = JobStatus.CANCEL;
+ final AtomicBoolean isAsync = new
AtomicBoolean(false);
+
+ try {
+ synchronized ( job ) {
+ result = consumer.process(job, new
JobExecutionContext() {
+
+ @Override
+ public void update(long eta) {
+ // TODO Auto-generated method
stub
+
}
@Override
- public void ok() {
-
this.check(JobConsumer.JobResult.OK);
+ public void startProgress(long
eta) {
+ // TODO Auto-generated method
stub
+
}
@Override
- public void failed() {
-
this.check(JobConsumer.JobResult.FAILED);
+ public void startProgress(int
steps) {
+ // TODO Auto-generated method
stub
+
}
@Override
- public void cancel() {
-
this.check(JobConsumer.JobResult.CANCEL);
+ public void setProgress(int step) {
+ // TODO Auto-generated method
stub
+
}
- };
-
job.setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER, asyncHandler);
- try {
- result = consumer.process(job);
+
+ @Override
+ public void log(String message,
Object... args) {
+ // TODO Auto-generated method
stub
+
+ }
+
+ @Override
+ public void
asyncProcessingFinished(final JobStatus status) {
+ synchronized ( job ) {
+ if (
isAsync.compareAndSet(true, false) ) {
+
finishedJob(job.getId(), status, true);
+
asyncCounter.decrementAndGet();
+ } else {
+ throw new
IllegalStateException("Job is not processed async " + job.getId());
+ }
+ }
+ }
+ });
+ if ( result.getState() ==
JobStatus.JobState.ASYNC ) {
+ asyncCounter.incrementAndGet();
+ notifyFinished(null);
+ isAsync.set(true);
+ }
+ }
} catch (final Throwable t) { //NOSONAR
logger.error("Unhandled error occured in
job processor " + t.getMessage() + " while processing job " +
Utility.toString(job), t);
// we don't reschedule if an exception
occurs
- result = JobConsumer.JobResult.CANCEL;
+ result = JobStatus.CANCEL;
} finally {
currentThread.setPriority(oldPriority);
currentThread.setName(oldName);
- if ( result != JobConsumer.JobResult.ASYNC
) {
+ if ( result.getState() !=
JobStatus.JobState.ASYNC ) {
finishedJob(job.getId(), result,
false);
}
}
- if ( result == JobConsumer.JobResult.ASYNC ) {
- asyncCounter.incrementAndGet();
- notifyFinished(null);
- }
}
};
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java?rev=1523723&r1=1523722&r2=1523723&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobExecutionContext.java
Mon Sep 16 16:35:49 2013
@@ -30,6 +30,7 @@ public interface JobExecutionContext {
/**
* Report an async result.
* @throws IllegalStateException If the job is not processed asynchronously
+ * or if this method has already been called.
*/
void asyncProcessingFinished(final JobStatus status);
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java?rev=1523723&r1=1523722&r2=1523723&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/consumer/JobStatus.java
Mon Sep 16 16:35:49 2013
@@ -29,7 +29,15 @@ import aQute.bnd.annotation.ProviderType
@ProviderType
public final class JobStatus {
- enum JobState {
+ public static final JobStatus OK = new JobStatus(JobState.OK, null);
+
+ public static final JobStatus FAILED = new JobStatus(JobState.FAILED,
null);
+
+ public static final JobStatus CANCEL = new JobStatus(JobState.CANCEL,
null);
+
+ public static final JobStatus ASYNC = new JobStatus(JobState.ASYNC, null);
+
+ public enum JobState {
OK, // processing finished
FAILED, // processing failed, can be retried
CANCEL, // processing failed permanently
@@ -40,15 +48,16 @@ public final class JobStatus {
private final String message;
- private Long retryDelay;
+ private final Long retryDelay;
- public JobStatus(final JobState result) {
- this(result, null);
+ public JobStatus(final JobState result, final String message) {
+ this(result, message, null);
}
- public JobStatus(final JobState result, final String message) {
+ public JobStatus(final JobState result, final String message, final Long
retryDelayInMs) {
this.state = result;
this.message = message;
+ this.retryDelay = retryDelayInMs;
}
public JobState getState() {
@@ -66,7 +75,9 @@ public final class JobStatus {
return this.retryDelay;
}
- public void setRetryDelayInMs(final Long value) {
- this.retryDelay = value;
+ @Override
+ public String toString() {
+ return "JobStatus [state=" + state + ", message=" + message
+ + ", retryDelay=" + retryDelay + "]";
}
}