Author: cziegeler
Date: Mon Sep 30 13:40:46 2013
New Revision: 1527561
URL: http://svn.apache.org/r1527561
Log:
SLING-3028 : Support for progress tracking of jobs
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1527561&r1=1527560&r2=1527561&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
Mon Sep 30 13:40:46 2013
@@ -72,6 +72,12 @@ public class JobHandler {
this.jobManager.reassign(this.job);
}
+ public void persistJobProperties(final String... propNames) {
+ if ( propNames != null ) {
+ this.jobManager.persistJobProperties(this.job, propNames);
+ }
+ }
+
@Override
public int hashCode() {
return this.job.getId().hashCode();
@@ -89,9 +95,4 @@ public class JobHandler {
public String toString() {
return "JobHandler(" + this.job.getId() + ")";
}
-
- public void updateProperty(final String propName) {
- this.jobManager.updateProperty(this.job, propName);
-
- }
}
\ No newline at end of file
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java?rev=1527561&r1=1527560&r2=1527561&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
Mon Sep 30 13:40:46 2013
@@ -46,27 +46,6 @@ public class JobImpl implements Job {
/** Internal job property containing optional delay override. */
public static final String PROPERTY_DELAY_OVERRIDE =
":slingevent:delayOverride";
- /** Property for log statements. */
- public static final String PROPERTY_LOG = "slingevent:log";
-
- /** Property for ETA. */
- public static final String PROPERTY_ETA = "slingevent:eta";
-
- /** Property for Steps. */
- public static final String PROPERTY_STEPS = "slingevent:steps";
-
- /** Property for Step. */
- public static final String PROPERTY_STEP = "slingevent:step";
-
- /** Property for final message. */
- public static final String PROPERTY_MESSAGE = "slingevent:message";
-
- /** Property for finished jobs. */
- public static final String PROPERTY_FINISHED_STATE =
"slingevent:finishedState";
-
- /** Property for finished jobs. */
- public static final String PROPERTY_FINISHED_DATE =
"slingevent:finishedDate";
-
private final ValueMap properties;
private final String topic;
@@ -260,45 +239,56 @@ public class JobImpl implements Job {
*/
public void prepare() {
this.properties.remove(JobImpl.PROPERTY_DELAY_OVERRIDE);
- this.properties.remove(JobImpl.PROPERTY_LOG);
- this.properties.remove(JobImpl.PROPERTY_ETA);
- this.properties.remove(JobImpl.PROPERTY_STEPS);
- this.properties.remove(JobImpl.PROPERTY_STEP);
- this.properties.remove(JobImpl.PROPERTY_MESSAGE);
+ this.properties.remove(Job.PROPERTY_JOB_LOG);
+ this.properties.remove(Job.PROPERTY_JOB_PROGRESS_ETA);
+ this.properties.remove(Job.PROPERTY_JOB_PROGRESS_STEPS);
+ this.properties.remove(Job.PROPERTY_JOB_PROGRESS_STEP);
+ this.properties.remove(Job.PROPERTY_RESULT_MESSAGE);
}
- public String update(final long eta) {
- this.setProperty(JobImpl.PROPERTY_ETA, eta);
- return JobImpl.PROPERTY_ETA;
+ public String[] startProgress(final int steps, final long eta) {
+ if ( steps > 0 ) {
+ this.setProperty(Job.PROPERTY_JOB_PROGRESS_STEPS, steps);
+ }
+ if ( eta > 0 ) {
+ this.setProperty(Job.PROPERTY_JOB_PROGRESS_ETA, eta);
+ }
+ return new String[] {Job.PROPERTY_JOB_PROGRESS_ETA,
PROPERTY_JOB_PROGRESS_STEPS};
}
- public String startProgress(final long eta) {
- this.setProperty(JobImpl.PROPERTY_ETA, eta);
- return JobImpl.PROPERTY_ETA;
- }
+ public String setProgress(final int step) {
+ final int steps = this.getProperty(Job.PROPERTY_JOB_PROGRESS_STEPS,
-1);
+ if ( steps > 0 && step > 0 ) {
+ int current = this.getProperty(Job.PROPERTY_JOB_PROGRESS_STEP, 0);
+ current += step;
+ if ( current > steps ) {
+ current = steps;
+ }
+ this.setProperty(Job.PROPERTY_JOB_PROGRESS_STEP, current);
- public String startProgress(final int steps) {
- this.setProperty(JobImpl.PROPERTY_STEPS, steps);
- return JobImpl.PROPERTY_STEPS;
+ // TODO - recalculate ETA
+ return Job.PROPERTY_JOB_PROGRESS_STEP;
+ }
+ return null;
}
- public String setProgress(final int step) {
- this.setProperty(JobImpl.PROPERTY_STEP, step);
- return JobImpl.PROPERTY_STEP;
+ public String update(final long eta) {
+ this.setProperty(Job.PROPERTY_JOB_PROGRESS_ETA, eta);
+ return Job.PROPERTY_JOB_PROGRESS_ETA;
}
- public String log(final String message, Object... args) {
+ public String log(final String message, final Object... args) {
final String logEntry = MessageFormat.format(message, args);
- final String[] entries = this.getProperty(JobImpl.PROPERTY_LOG,
String[].class);
+ final String[] entries = this.getProperty(Job.PROPERTY_JOB_LOG,
String[].class);
if ( entries == null ) {
- this.setProperty(JobImpl.PROPERTY_LOG, new String[] {logEntry});
+ this.setProperty(Job.PROPERTY_JOB_LOG, new String[] {logEntry});
} else {
final String[] newEntries = new String[entries.length + 1];
System.arraycopy(entries, 0, newEntries, 0, entries.length);
newEntries[entries.length] = logEntry;
- this.setProperty(JobImpl.PROPERTY_LOG, newEntries);
+ this.setProperty(Job.PROPERTY_JOB_LOG, newEntries);
}
- return JobImpl.PROPERTY_LOG;
+ return Job.PROPERTY_JOB_LOG;
}
@Override
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1527561&r1=1527560&r2=1527561&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
Mon Sep 30 13:40:46 2013
@@ -506,7 +506,12 @@ public class JobManagerImpl
jobProperties.put(Job.PROPERTY_JOB_RETRIES,
vm.get(Job.PROPERTY_JOB_RETRIES, Integer.class));
jobProperties.put(Job.PROPERTY_JOB_RETRY_COUNT,
vm.get(Job.PROPERTY_JOB_RETRY_COUNT, Integer.class));
jobProperties.put(Job.PROPERTY_JOB_PRIORITY,
JobPriority.valueOf(vm.get(Job.PROPERTY_JOB_PRIORITY,
JobPriority.NORM.name())));
-
+ if ( vm.get(Job.PROPERTY_JOB_PROGRESS_STEPS) != null ) {
+ jobProperties.put(Job.PROPERTY_JOB_PROGRESS_STEPS,
vm.get(Job.PROPERTY_JOB_PROGRESS_STEPS, Integer.class));
+ }
+ if ( vm.get(Job.PROPERTY_JOB_PROGRESS_STEP) != null ) {
+ jobProperties.put(Job.PROPERTY_JOB_PROGRESS_STEP,
vm.get(Job.PROPERTY_JOB_PROGRESS_STEP, Integer.class));
+ }
@SuppressWarnings("unchecked")
final List<Exception> readErrorList = (List<Exception>)
jobProperties.get(ResourceHelper.PROPERTY_MARKER_READ_ERROR_LIST);
if ( readErrorList != null ) {
@@ -1068,7 +1073,9 @@ public class JobManagerImpl
final Map<String, Object> props = new HashMap<String,
Object>(vm);
props.put(JobImpl.PROPERTY_FINISHED_STATE, isSuccess ?
JobState.SUCCEEDED.name() : JobState.CANCELLED.name());
props.put(JobImpl.PROPERTY_FINISHED_DATE,
Calendar.getInstance());
-
+ if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) !=
null ) {
+ props.put(Job.PROPERTY_RESULT_MESSAGE,
job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
+ }
ResourceHelper.getOrCreateResource(resolver, newPath,
props);
}
resolver.delete(jobResource);
@@ -1112,6 +1119,9 @@ public class JobManagerImpl
if ( jobResource != null ) {
final ModifiableValueMap mvm =
jobResource.adaptTo(ModifiableValueMap.class);
mvm.put(Job.PROPERTY_JOB_RETRY_COUNT,
job.getProperty(Job.PROPERTY_JOB_RETRY_COUNT));
+ if ( job.getProperty(Job.PROPERTY_RESULT_MESSAGE) != null ) {
+ mvm.put(Job.PROPERTY_RESULT_MESSAGE,
job.getProperty(Job.PROPERTY_RESULT_MESSAGE));
+ }
mvm.remove(Job.PROPERTY_JOB_STARTED_TIME);
try {
resolver.commit();
@@ -1145,11 +1155,11 @@ public class JobManagerImpl
mvm.put(Job.PROPERTY_JOB_QUEUE_NAME,
info.getJob().getQueueName());
mvm.put(Job.PROPERTY_JOB_RETRIES,
info.getJob().getNumberOfRetries());
mvm.put(Job.PROPERTY_JOB_PRIORITY,
info.getJob().getJobPriority().name());
- mvm.remove(JobImpl.PROPERTY_ETA);
- mvm.remove(JobImpl.PROPERTY_STEPS);
- mvm.remove(JobImpl.PROPERTY_STEP);
- mvm.remove(JobImpl.PROPERTY_LOG);
- mvm.remove(JobImpl.PROPERTY_MESSAGE);
+ mvm.remove(Job.PROPERTY_JOB_PROGRESS_ETA);
+ mvm.remove(Job.PROPERTY_JOB_PROGRESS_STEPS);
+ mvm.remove(Job.PROPERTY_JOB_PROGRESS_STEP);
+ mvm.remove(Job.PROPERTY_JOB_LOG);
+ mvm.remove(Job.PROPERTY_RESULT_MESSAGE);
resolver.commit();
return true;
@@ -1387,14 +1397,19 @@ public class JobManagerImpl
/**
* Update the property of a job in the resource tree
*/
- public void updateProperty(final JobImpl job, final String propName) {
+ public void persistJobProperties(final JobImpl job, final String...
propNames) {
ResourceResolver resolver = null;
try {
resolver =
this.resourceResolverFactory.getAdministrativeResourceResolver(null);
final Resource jobResource =
resolver.getResource(job.getResourcePath());
if ( jobResource != null ) {
final ModifiableValueMap mvm =
jobResource.adaptTo(ModifiableValueMap.class);
- mvm.put(propName, job.getProperty(propName));
+ for(final String propName : propNames) {
+ final Object val = job.getProperty(propName);
+ if ( val != null ) {
+ mvm.put(propName, job.getProperty(propName));
+ }
+ }
resolver.commit();
}
} catch ( final PersistenceException ignore ) {
@@ -1413,7 +1428,7 @@ public class JobManagerImpl
*/
@Override
public void stopJobById(final String jobId) {
- // not implemented yet
-
+ // TODO not implemented yet
+ throw new IllegalStateException("Not implemented yet...");
}
}
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1527561&r1=1527560&r2=1527561&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
Mon Sep 30 13:40:46 2013
@@ -543,26 +543,34 @@ public abstract class AbstractJobQueue
synchronized ( lock ) {
result = consumer.process(job, new
JobExecutionContext() {
+ private boolean hasInit = false;
+
@Override
public void initProgress(final int
steps,
final long eta) {
-
handler.updateProperty(job.startProgress(steps));
-
handler.updateProperty(job.startProgress(eta));
+ if ( !hasInit ) {
+
handler.persistJobProperties(job.startProgress(steps, eta));
+ hasInit = true;
+ }
}
@Override
public void
incrementProgressCount(final int steps) {
-
handler.updateProperty(job.setProgress(steps));
+ if ( hasInit ) {
+
handler.persistJobProperties(job.setProgress(steps));
+ }
}
@Override
public void updateProgress(final
long eta) {
-
handler.updateProperty(job.update(eta));
+ if ( hasInit ) {
+
handler.persistJobProperties(job.update(eta));
+ }
}
@Override
public void log(final String
message, Object... args) {
-
handler.updateProperty(job.log(message, args));
+
handler.persistJobProperties(job.log(message, args));
}
@Override
@@ -599,6 +607,9 @@ public abstract class AbstractJobQueue
if ( result.getRetryDelayInMs() !=
null ) {
job.setProperty(JobImpl.PROPERTY_DELAY_OVERRIDE, result.getRetryDelayInMs());
}
+ if ( result.getMessage() != null ) {
+
job.setProperty(Job.PROPERTY_RESULT_MESSAGE, result.getMessage());
+ }
finishedJob(job.getId(), result,
false);
}
}
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java?rev=1527561&r1=1527560&r2=1527561&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/support/ResourceHelper.java
Mon Sep 30 13:40:46 2013
@@ -36,6 +36,7 @@ import org.apache.sling.api.resource.Val
import org.apache.sling.event.EventUtil;
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
+import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobUtil;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.osgi.service.event.EventConstants;
@@ -65,7 +66,14 @@ public abstract class ResourceHelper {
JobUtil.PROPERTY_NOTIFICATION_JOB,
JobStatusNotifier.CONTEXT_PROPERTY_NAME,
JobImpl.PROPERTY_DELAY_OVERRIDE,
- JobConsumer.PROPERTY_JOB_ASYNC_HANDLER
+ JobConsumer.PROPERTY_JOB_ASYNC_HANDLER,
+ Job.PROPERTY_JOB_LOG,
+ Job.PROPERTY_JOB_PROGRESS_ETA,
+ Job.PROPERTY_JOB_PROGRESS_STEP,
+ Job.PROPERTY_JOB_PROGRESS_STEPS,
+ Job.PROPERTY_FINISHED_DATE,
+ Job.PROPERTY_FINISHED_STATE,
+ Job.PROPERTY_RESULT_MESSAGE
};
/**
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java?rev=1527561&r1=1527560&r2=1527561&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/Job.java
Mon Sep 30 13:40:46 2013
@@ -1,5 +1,5 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
+ 1 * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
@@ -116,6 +116,58 @@ public interface Job {
String PROPERTY_JOB_RETRY_DELAY = "event.job.retrydelay";
/**
+ * This property contains the optional output log of a job consumer.
+ * The value of this property is a string array.
+ * This property is read-only and can't be specified when the job is
created.
+ */
+ String PROPERTY_JOB_LOG = "slingevent:log";
+
+ /**
+ * This property contains the optional ETA for a job.
+ * The value of this property is a {@link Calendar} object.
+ * This property is read-only and can't be specified when the job is
created.
+ */
+ String PROPERTY_JOB_PROGRESS_ETA = "slingevent:progressETA";
+
+ /**
+ * This property contains optional progress information about a job,
+ * the number of steps the job consumer will perform. Each step is
+ * assumed to consume roughly the same amount if time.
+ * The value of this property is an integer.
+ * This property is read-only and can't be specified when the job is
created.
+ */
+ String PROPERTY_JOB_PROGRESS_STEPS = "slingevent:progressSteps";
+
+ /**
+ * This property contains optional progress information about a job,
+ * the number of completed steps.
+ * The value of this property is an integer.
+ * This property is read-only and can't be specified when the job is
created.
+ */
+ String PROPERTY_JOB_PROGRESS_STEP = "slingevent:progressStep";
+
+ /**
+ * This property contains the optional result message of a job consumer.
+ * The value of this property is a string.
+ * This property is read-only and can't be specified when the job is
created.
+ */
+ String PROPERTY_RESULT_MESSAGE = "slingevent:resultMessage";
+
+ /**
+ * This property contains the finished state of a job once it's marked as
finished.
+ * TODO - DOCUMENT
+ * This property is read-only and can't be specified when the job is
created.
+ */
+ String PROPERTY_FINISHED_STATE = "slingevent:finishedState";
+
+ /**
+ * This property contains the finished date once a job is marked as
finished.
+ * The value of this property is a {@link Calendar} object.
+ * This property is read-only and can't be specified when the job is
created.
+ */
+ String PROPERTY_FINISHED_DATE = "slingevent:finishedDate";
+
+ /**
* The job topic.
* @return The job topic
*/
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java?rev=1527561&r1=1527560&r2=1527561&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/JobManager.java
Mon Sep 30 13:40:46 2013
@@ -85,6 +85,9 @@ public interface JobManager {
* A job topic is a hierarchical name separated by dashes, each part has
to start with a letter,
* allowed characters are letters, numbers and the underscore.
*
+ * The returned job object is a snapshot of the job state taken at the
time of creation. Updates
+ * to the job state are not reflected and the client needs to get a new
job object using the job id.
+ *
* @param topic The required job topic.
* @param properties Optional job properties. The properties must be
serializable.
* @return The new job - or <code>null</code> if the job could not be
created.
@@ -107,6 +110,9 @@ public interface JobManager {
* dealing with this situation and as jobs with name come with a heavy
processing overhead
* these should be avoided.
*
+ * The returned job object is a snapshot of the job state taken at the
time of creation. Updates
+ * to the job state are not reflected and the client needs to get a new
job object using the job id.
+ *
* @param topic The required job topic.
* @param name Optional unique job name
* @param properties Optional job properties. The properties must be
serializable.
@@ -116,12 +122,22 @@ public interface JobManager {
Job addJob(String topic, String name, Map<String, Object> properties);
/**
+ * Return a job based on the unique job name.
+ *
+ * The returned job object is a snapshot of the job state taken at the
time of the call. Updates
+ * to the job state are not reflected and the client needs to get a new
job object using the job id.
+ *
* @return A job or <code>null</code>
* @since 1.2
*/
Job getJobByName(String name);
/**
+ * Return a job based on the unique id.
+ *
+ * The returned job object is a snapshot of the job state taken at the
time of the call. Updates
+ * to the job state are not reflected and the client needs to get a new
job object using the job id.
+ *
* @param jobId The unique identifier from {@link Job#getId()}
* @return A job or <code>null</code>
* @since 1.2
@@ -130,9 +146,11 @@ public interface JobManager {
/**
* Removes the job even if it is currently in processing.
+ *
* If the job exists and is not in processing, it gets removed from the
processing queue.
* If the job exists and is in processing, it is removed from the
persistence layer,
* however processing is not stopped.
+ *
* @param jobId The unique identifier from {@link Job#getId()}
* @return <code>true</code> if the job could be removed or does not exist
anymore.
* <code>false</code> otherwise.
@@ -142,9 +160,13 @@ public interface JobManager {
/**
* Find a job - either scheduled or active.
+ *
* This method searches for an event with the given topic and filter
properties. If more than one
* job matches, the first one found is returned which could be any of the
matching jobs.
*
+ * The returned job object is a snapshot of the job state taken at the
time of the call. Updates
+ * to the job state are not reflected and the client needs to get a new
job object using the job id.
+ *
* @param topic Topic is required.
* @param template The map acts like a template. The searched job
* must match the template (AND query).
@@ -163,6 +185,9 @@ public interface JobManager {
* If the history is returned, the result set is sorted in descending
order, listening the newest entry
* first. For unfinished jobs, the result set is sorted in ascending order.
*
+ * The returned job objects are a snapshot of the jobs state taken at the
time of the call. Updates
+ * to the job states are not reflected and the client needs to get new job
objects.
+ *
* @param type Required parameter for the type. See above.
* @param topic Topic can be used as a filter, if it is non-null, only
jobs with this topic will be returned.
* @param limit A positive number indicating the maximum number of jobs
returned by the iterator. A value