Author: lektran
Date: Sun Apr 7 07:39:47 2019
New Revision: 1857071
URL: http://svn.apache.org/viewvc?rev=1857071&view=rev
Log:
Implemented: Allow Jobs to specify a priority and be queued accordingly by the
JobPoller so that important jobs can be prioritized over normal jobs, and low
priority jobs can be left until last. [OFBIZ-10865]
Changes are as follows:
* Add a "priority" field to JobSandbox entity (numeric/Long)
* Add JobPriority constants class containing fields LOW (0), NORMAL (50) and
HIGH (100)
* Add getPriority method to the Job interface and implement it in
AbstractJob (returns NORMAL), PersistedServiceJob (returns JobSandbox.priority,
falling back to NORMAL when the field is null) and PurgeJob (returns LOW)
* Change the JobPoller executor's queue to use PriorityBlockingQueue
(unbounded) instead of LinkedBlockingQueue (bounded)
* Implement custom Comparator for the priority queue to sort by priority
descending and then runTime ascending
* Change the poll size per poll to be (queueSize() - queue.size()) instead of
queue.remainingCapacity() because the new queue is unbounded (its
remainingCapacity() always returns Integer.MAX_VALUE)
* I've also opted to limit the database poll query to the poll size using
maxRows() because it seemed dangerous to me to use an unconstrained query on
this table
* Ensured recurring jobs receive the default (NORMAL) priority when being
rescheduled so that they're sorted correctly the next time they show up in
the database poll
* Ensured jobs generated at runtime are given a default priority of NORMAL
Added:
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java
Modified:
ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml
ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java
Modified:
ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml
URL:
http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
---
ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml
(original)
+++
ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml
Sun Apr 7 07:39:47 2019
@@ -889,6 +889,7 @@ under the License.
<create-value value-field="runtimeData"/>
<!-- Create Job For ProductGroupOrder -->
+ <!-- FIXME: Jobs should not be manually created -->
<make-value entity-name="JobSandbox" value-field="jobSandbox"/>
<sequenced-id sequence-name="JobSandbox" field="jobSandbox.jobId"/>
<set field="jobId" from-field="jobSandbox.jobId"/>
@@ -900,6 +901,7 @@ under the License.
<set field="jobSandbox.runAsUser" value="system"/>
<set field="jobSandbox.runtimeDataId" from-field="runtimeDataId"/>
<set field="jobSandbox.maxRecurrenceCount" value="1" type="Long"/>
+ <set field="jobSandbox.priority" value="50" type="Long"/>
<create-value value-field="jobSandbox"/>
<set field="productGroupOrder.jobId" from-field="jobId"/>
Modified:
ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml
URL:
http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml
(original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml Sun
Apr 7 07:39:47 2019
@@ -44,6 +44,7 @@ under the License.
<field name="jobId" type="id"></field>
<field name="jobName" type="name"></field>
<field name="runTime" type="date-time"></field>
+ <field name="priority" type="numeric"></field>
<field name="poolId" type="name"></field>
<field name="statusId" type="id"></field>
<field name="parentJobId" type="id"></field>
Modified:
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java
URL:
http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
---
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java
(original)
+++
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java
Sun Apr 7 07:39:47 2019
@@ -41,6 +41,7 @@ import org.apache.ofbiz.service.job.Gene
import org.apache.ofbiz.service.job.Job;
import org.apache.ofbiz.service.job.JobManager;
import org.apache.ofbiz.service.job.JobManagerException;
+import org.apache.ofbiz.service.job.JobPriority;
/**
* Generic Asynchronous Engine
@@ -112,6 +113,7 @@ public abstract class GenericAsyncEngine
jFields.put("loaderName", localName);
jFields.put("maxRetry", (long) modelService.maxRetry);
jFields.put("runtimeDataId", dataId);
+ jFields.put("priority", JobPriority.NORMAL);
if (UtilValidate.isNotEmpty(authUserLoginId)) {
jFields.put("authUserLoginId", authUserLoginId);
}
Modified:
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java
URL:
http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
---
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java
(original)
+++
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java
Sun Apr 7 07:39:47 2019
@@ -115,4 +115,12 @@ public abstract class AbstractJob implem
public Date getStartTime() {
return (Date) startTime.clone();
}
+
+ /*
+ * Returns JobPriority.NORMAL, the default setting
+ */
+ @Override
+ public long getPriority() {
+ return JobPriority.NORMAL;
+ }
}
Modified:
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java
URL:
http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
---
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java
(original)
+++
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java
Sun Apr 7 07:39:47 2019
@@ -72,5 +72,10 @@ public interface Job extends Runnable {
* Returns the time this job is scheduled to start.
*/
Date getStartTime();
+
+ /**
+ * Returns the priority of this job, higher the number the higher the
priority
+ */
+ long getPriority();
}
Modified:
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java
URL:
http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
---
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java
(original)
+++
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java
Sun Apr 7 07:39:47 2019
@@ -213,7 +213,10 @@ public final class JobManager {
Debug.logWarning("Unable to poll JobSandbox for jobs; unable
to begin transaction.", module);
return poll;
}
- try (EntityListIterator jobsIterator =
EntityQuery.use(delegator).from("JobSandbox").where(mainCondition).orderBy("runTime").queryIterator())
{
+ try (EntityListIterator jobsIterator = EntityQuery.use(delegator)
+ .from("JobSandbox").where(mainCondition)
+ .orderBy("priority DESC NULLS LAST", "runTime")
+ .maxRows(limit).queryIterator()) {
GenericValue jobValue = jobsIterator.next();
while (jobValue != null) {
// Claim ownership of this value. Using storeByCondition
to avoid a race condition.
@@ -546,7 +549,8 @@ public final class JobManager {
jobName = Long.toString((new Date().getTime()));
}
Map<String, Object> jFields = UtilMisc.<String, Object>
toMap("jobName", jobName, "runTime", new java.sql.Timestamp(startTime),
- "serviceName", serviceName, "statusId", "SERVICE_PENDING",
"recurrenceInfoId", infoId, "runtimeDataId", dataId);
+ "serviceName", serviceName, "statusId", "SERVICE_PENDING",
"recurrenceInfoId", infoId, "runtimeDataId", dataId,
+ "priority", JobPriority.NORMAL);
// set the pool ID
if (UtilValidate.isNotEmpty(poolName)) {
jFields.put("poolId", poolName);
Modified:
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java
URL:
http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
---
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java
(original)
+++
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java
Sun Apr 7 07:39:47 2019
@@ -20,13 +20,14 @@ package org.apache.ofbiz.service.job;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -63,15 +64,48 @@ public final class JobPoller implements
private static ThreadPoolExecutor createThreadPoolExecutor() {
try {
ThreadPool threadPool =
ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
- return new ThreadPoolExecutor(threadPool.getMinThreads(),
threadPool.getMaxThreads(), threadPool.getTtl(),
- TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<Runnable>(threadPool.getJobs()), new
JobInvokerThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
+ return new ThreadPoolExecutor(
+ threadPool.getMinThreads(),
+ threadPool.getMaxThreads(),
+ threadPool.getTtl(),
+ TimeUnit.MILLISECONDS,
+ new PriorityBlockingQueue<>(threadPool.getJobs(),
createPriorityComparator()),
+ new JobInvokerThreadFactory(),
+ new ThreadPoolExecutor.AbortPolicy());
} catch (GenericConfigException e) {
Debug.logError(e, "Exception thrown while getting <thread-pool>
model, using default <thread-pool> values: ", module);
- return new ThreadPoolExecutor(ThreadPool.MIN_THREADS,
ThreadPool.MAX_THREADS, ThreadPool.THREAD_TTL,
- TimeUnit.MILLISECONDS, new
LinkedBlockingQueue<Runnable>(ThreadPool.QUEUE_SIZE), new
JobInvokerThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
+ return new ThreadPoolExecutor(
+ ThreadPool.MIN_THREADS,
+ ThreadPool.MAX_THREADS,
+ ThreadPool.THREAD_TTL,
+ TimeUnit.MILLISECONDS,
+ new PriorityBlockingQueue<>(ThreadPool.QUEUE_SIZE,
createPriorityComparator()),
+ new JobInvokerThreadFactory(),
+ new ThreadPoolExecutor.AbortPolicy());
}
}
+ private static Comparator<Runnable> createPriorityComparator() {
+ return new Comparator<Runnable>() {
+
+ /**
+ * Sorts jobs by priority then by start time
+ */
+ @Override
+ public int compare(Runnable o1, Runnable o2) {
+ Job j1 = (Job) o1;
+ Job j2 = (Job) o2;
+ // Descending priority (higher number returns -1)
+ int priorityCompare = Long.compare(j2.getPriority(),
j1.getPriority());
+ if (priorityCompare != 0) {
+ return priorityCompare;
+ }
+ // Ascending start time (earlier time returns -1)
+ return Long.compare(j1.getStartTime().getTime(),
j2.getStartTime().getTime());
+ }
+ };
+ }
+
private static int pollWaitTime() {
try {
ThreadPool threadPool =
ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
@@ -82,6 +116,16 @@ public final class JobPoller implements
}
}
+ static int queueSize() {
+ try {
+ ThreadPool threadPool =
ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
+ return threadPool.getJobs();
+ } catch (GenericConfigException e) {
+ Debug.logError(e, "Exception thrown while getting <thread-pool>
model, using default <thread-pool> values: ", module);
+ return ThreadPool.QUEUE_SIZE;
+ }
+ }
+
/**
* Register a {@link JobManager} with the job poller.
*
@@ -170,6 +214,7 @@ public final class JobPoller implements
try {
executor.execute(job);
} catch (Exception e) {
+ Debug.logError(e, module);
job.deQueue();
}
}
@@ -197,6 +242,7 @@ public final class JobPoller implements
private static class JobInvokerThreadFactory implements ThreadFactory {
+ @Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "OFBiz-JobQueue-" +
created.getAndIncrement());
}
@@ -214,7 +260,7 @@ public final class JobPoller implements
Thread.sleep(1000);
}
while (!executor.isShutdown()) {
- int remainingCapacity =
executor.getQueue().remainingCapacity();
+ int remainingCapacity = queueSize() -
executor.getQueue().size();
if (remainingCapacity > 0) {
// Build "list of lists"
Collection<JobManager> jmCollection =
jobManagers.values();
Added:
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java
URL:
http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java?rev=1857071&view=auto
==============================================================================
---
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java
(added)
+++
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java
Sun Apr 7 07:39:47 2019
@@ -0,0 +1,7 @@
+package org.apache.ofbiz.service.job;
+
+public final class JobPriority {
+ public static final long LOW = 0;
+ public static final long NORMAL = 50;
+ public static final long HIGH = 100;
+}
Modified:
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java
URL:
http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
---
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java
(original)
+++
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java
Sun Apr 7 07:39:47 2019
@@ -211,6 +211,10 @@ public class PersistedServiceJob extends
newJob.set("currentRetryCount", 0L);
}
nextRecurrence = next;
+ // Set priority if missing
+ if (newJob.getLong("priority") == null) {
+ newJob.set("priority", JobPriority.NORMAL);
+ }
delegator.createSetNextSeqId(newJob);
if (Debug.verboseOn()) {
Debug.logVerbose("Created next job entry: " + newJob, module);
@@ -379,4 +383,17 @@ public class PersistedServiceJob extends
public Date getStartTime() {
return new Date(startTime);
}
+
+ /*
+ * Returns the priority stored in the JobSandbox.priority field, if no
value is present
+ * then it defaults to AbstractJob.getPriority()
+ */
+ @Override
+ public long getPriority() {
+ Long priority = jobValue.getLong("priority");
+ if (priority == null) {
+ return super.getPriority();
+ }
+ return priority;
+ }
}
Modified:
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java
URL:
http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
---
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java
(original)
+++
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java
Sun Apr 7 07:39:47 2019
@@ -82,4 +82,12 @@ public class PurgeJob extends AbstractJo
throw new InvalidJobException("Illegal state change");
}
}
+
+ /*
+ * Returns JobPriority.LOW
+ */
+ @Override
+ public long getPriority() {
+ return JobPriority.LOW;
+ }
}