Author: cziegeler
Date: Mon May 6 11:53:22 2013
New Revision: 1479531
URL: http://svn.apache.org/r1479531
Log:
SLING-2856 : NPE in JobManagerImpl to read priority for Job
SLING-2829 : Distribute jobs only across the local cluster if possible
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java?rev=1479531&r1=1479530&r2=1479531&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerConfiguration.java
Mon May 6 11:53:22 2013
@@ -43,6 +43,12 @@ public class JobManagerConfiguration {
/** The number of seconds the background loader waits after startup before
loading events from the repository. */
public static final String CONFIG_PROPERTY_BACKGROUND_LOAD_DELAY =
"load.delay";
+ /** Default for disabling the distribution. */
+ public static final boolean DEFAULT_DISABLE_DISTRIBUTION = false;
+
+ /** Configuration switch for disabling the distribution of jobs. */
+ public static final String PROPERTY_DISABLE_DISTRIBUTION =
"job.consumermanager.disableDistribution";
+
/** The jobs base path with a slash. */
private String jobsBasePathWithSlash;
@@ -70,7 +76,10 @@ public class JobManagerConfiguration {
private long backgroundLoadDelay;
+ private boolean disabledDistribution;
+
public JobManagerConfiguration(final Map<String, Object> props) {
+ this.disabledDistribution =
PropertiesUtil.toBoolean(props.get(PROPERTY_DISABLE_DISTRIBUTION),
DEFAULT_DISABLE_DISTRIBUTION);
this.jobsBasePathWithSlash =
PropertiesUtil.toString(props.get(CONFIG_PROPERTY_REPOSITORY_PATH),
DEFAULT_REPOSITORY_PATH) + '/';
@@ -199,4 +208,8 @@ public class JobManagerConfiguration {
public String getPreviousVersionIdentifiedPath() {
return this.previousVersionIdentifiedPath;
}
+
+ public boolean disableDistribution() {
+ return this.disabledDistribution;
+ }
}
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1479531&r1=1479530&r2=1479531&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
Mon May 6 11:53:22 2013
@@ -495,14 +495,13 @@ public class JobManagerImpl
// convert to integers (JCR supports only long...)
jobProperties.put(Job.PROPERTY_JOB_RETRIES,
vm.get(Job.PROPERTY_JOB_RETRIES, Integer.class));
jobProperties.put(Job.PROPERTY_JOB_RETRY_COUNT,
vm.get(Job.PROPERTY_JOB_RETRY_COUNT, Integer.class));
- jobProperties.put(Job.PROPERTY_JOB_PRIORITY,
JobPriority.valueOf(vm.get(Job.PROPERTY_JOB_PRIORITY, String.class)));
+ jobProperties.put(Job.PROPERTY_JOB_PRIORITY,
JobPriority.valueOf(vm.get(Job.PROPERTY_JOB_PRIORITY,
JobPriority.NORM.name())));
job = new JobImpl(topic,
(String)jobProperties.get(JobUtil.PROPERTY_JOB_NAME),
(String)jobProperties.get(JobUtil.JOB_ID),
jobProperties);
} else {
- logger.warn(errorMessage + " : {}", vm);
// remove the job as the topic is invalid anyway
try {
resource.getResourceResolver().delete(resource);
@@ -544,7 +543,7 @@ public class JobManagerImpl
private void startProcessing(final long changeCount, final TopologyView
view) {
// create new capabilities and update view
- this.topologyCapabilities = new TopologyCapabilities(view,
changeCount);
+ this.topologyCapabilities = new TopologyCapabilities(view,
this.configuration.disableDistribution(), changeCount);
this.backgroundLoader.start();
}
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java?rev=1479531&r1=1479530&r2=1479531&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
(original)
+++
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TopologyCapabilities.java
Mon May 6 11:53:22 2013
@@ -29,6 +29,7 @@ import java.util.TreeMap;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.TopologyView;
import
org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.support.Environment;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.slf4j.Logger;
@@ -48,6 +49,9 @@ public class TopologyCapabilities {
/** Round robin map. */
private final Map<String, Integer> roundRobinMap = new HashMap<String,
Integer>();
+ /** Instance map. */
+ private final Map<String, InstanceDescription> instanceMap = new
HashMap<String, InstanceDescription>();
+
/** Is this the leader of the cluster? */
private final boolean isLeader;
@@ -63,10 +67,14 @@ public class TopologyCapabilities {
/** Instance comparator. */
private final InstanceDescriptionComparator instanceComparator;
+ /** Disable distribution flag. */
+ private final boolean disableDistribution;
+
public static final class InstanceDescriptionComparator implements
Comparator<InstanceDescription> {
private final String localClusterId;
+
public InstanceDescriptionComparator(final String clusterId) {
this.localClusterId = clusterId;
}
@@ -109,7 +117,8 @@ public class TopologyCapabilities {
return allInstances;
}
- public TopologyCapabilities(final TopologyView view, final long
changeCount) {
+ public TopologyCapabilities(final TopologyView view, final boolean
disableDistribution, final long changeCount) {
+ this.disableDistribution = disableDistribution;
this.instanceComparator = new
InstanceDescriptionComparator(view.getLocalInstance().getClusterView().getId());
this.changeCount = changeCount;
this.isLeader = view.getLocalInstance().isLeader();
@@ -129,6 +138,7 @@ public class TopologyCapabilities {
Collections.sort(list, this.instanceComparator);
}
}
+ this.instanceMap.put(desc.getSlingId(), desc);
}
this.instanceCapabilities = newCaps;
}
@@ -196,12 +206,40 @@ public class TopologyCapabilities {
public String detectTarget(final String jobTopic, final Map<String,
Object> jobProperties,
final QueueInfo queueInfo) {
final List<InstanceDescription> potentialTargets =
this.getPotentialTargets(jobTopic, jobProperties);
+ logger.debug("Potential targets for {} : {}", jobTopic,
potentialTargets);
+ String createdOn = null;
+ if ( jobProperties != null ) {
+ createdOn = (String)
jobProperties.get(org.apache.sling.event.jobs.Job.PROPERTY_JOB_CREATED_INSTANCE);
+ }
+ if ( createdOn == null ) {
+ createdOn = Environment.APPLICATION_ID;
+ }
+ final InstanceDescription createdOnInstance =
this.instanceMap.get(createdOn);
if ( potentialTargets != null && potentialTargets.size() > 0 ) {
+ if ( createdOnInstance != null ) {
+ // create a list with local targets first.
+ final List<InstanceDescription> localTargets = new
ArrayList<InstanceDescription>();
+ for(final InstanceDescription desc : potentialTargets) {
+ if (
desc.getClusterView().getId().equals(createdOnInstance.getClusterView().getId())
) {
+ if ( !this.disableDistribution || desc.isLeader() ) {
+ localTargets.add(desc);
+ }
+ }
+ }
+ if ( localTargets != null ) {
+ potentialTargets.clear();
+ potentialTargets.addAll(localTargets);
+ logger.debug("Potential targets filtered for {} : {}",
jobTopic, potentialTargets);
+ }
+ }
if ( queueInfo.queueConfiguration.getType() ==
QueueConfiguration.Type.ORDERED ) {
// for ordered queues we always pick the first as we have to
pick the same target on each cluster view
// on all instances (TODO - we could try to do some round
robin of the whole queue)
- return potentialTargets.get(0).getSlingId();
+ final String result = potentialTargets.get(0).getSlingId();
+ logger.debug("Target for {} : {}", jobTopic, result);
+
+ return result;
}
// TODO - this is a simple round robin which is not based on the
actual load
// of the instances
@@ -213,7 +251,9 @@ public class TopologyCapabilities {
index = 0;
}
this.roundRobinMap.put(jobTopic, index + 1);
- return potentialTargets.get(index).getSlingId();
+ final String result = potentialTargets.get(index).getSlingId();
+ logger.debug("Target for {} : {}", jobTopic, result);
+ return result;
}
return null;
Modified:
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL:
http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1479531&r1=1479530&r2=1479531&view=diff
==============================================================================
---
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
(original)
+++
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
Mon May 6 11:53:22 2013
@@ -71,6 +71,16 @@ queue.waitforasync.description = If a jo
#
# Job Event Handler
+job.events.name = Apache Sling Job Managers
+job.events.description = Manages job scheduling on a single system as well \
+ as on a cluster. A Job runs only on a single cluster node. \
+ The respective scheduling is persisted in the resource tree and distributed \
+ amongst the cluster instances. The jobs are started \
+ locally on a single cluster node. Most of the configuration is for \
+ configuring the default job queue.
+
+#
+# Default Job Queue
job.events.name = Apache Sling Job Default Queue
job.events.description = The configuration of the default job queue.
@@ -108,4 +118,3 @@ job.consumermanager.blacklist.desscripti
processed by this instance. Leaving it empty, all job consumers are enabled.
Putting a '*' as \
one entry, disables all job consumers. Adding separate topics disables job
consumers for exactly \
this topic.
-