Repository: incubator-batchee
Updated Branches:
  refs/heads/master ab0fb6f57 -> aef149a0a
BATCHEE-131 keep track of running jobs and actively stop them on shutdown. Project: http://git-wip-us.apache.org/repos/asf/incubator-batchee/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-batchee/commit/aef149a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-batchee/tree/aef149a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-batchee/diff/aef149a0 Branch: refs/heads/master Commit: aef149a0afdedd6b3d83079c7494a80c0f8adf5c Parents: ab0fb6f Author: Mark Struberg <[email protected]> Authored: Tue Mar 20 16:12:38 2018 +0100 Committer: Mark Struberg <[email protected]> Committed: Tue Mar 20 16:12:38 2018 +0100 ---------------------------------------------------------------------- .../batchee/container/impl/JobOperatorImpl.java | 6 ++++- .../thread/AsyncEjbBatchThreadPoolService.java | 28 ++++++++++++++++---- .../services/thread/ThreadExecutorEjb.java | 18 +++++++++++++ 3 files changed, 46 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/aef149a0/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java ---------------------------------------------------------------------- diff --git a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java index 27082f7..af0f54d 100755 --- a/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java +++ b/jbatch/src/main/java/org/apache/batchee/container/impl/JobOperatorImpl.java @@ -61,7 +61,7 @@ import java.util.logging.Logger; import static org.apache.batchee.container.util.ClassLoaderAwareHandler.makeLoaderAware; -public class JobOperatorImpl implements JobOperator { +public class JobOperatorImpl implements JobOperator, AutoCloseable { private static final Logger LOGGER = Logger.getLogger(JobOperatorImpl.class.getName()); static { @@ -115,6 +115,10 @@ public 
class JobOperatorImpl implements JobOperator { this(ServicesManager.find()); } + public void close() throws Exception { + + } + @Override public long start(final String jobXMLName, final Properties jobParameters) throws JobStartException, JobSecurityException { /* http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/aef149a0/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java ---------------------------------------------------------------------- diff --git a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java index f4db719..df8beea 100644 --- a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java +++ b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/AsyncEjbBatchThreadPoolService.java @@ -18,12 +18,17 @@ package org.apache.batchee.tools.services.thread; import java.util.Properties; import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.batch.operations.JobOperator; +import javax.batch.runtime.BatchRuntime; import javax.enterprise.context.spi.CreationalContext; import javax.enterprise.inject.spi.Bean; import javax.enterprise.inject.spi.BeanManager; import org.apache.batchee.container.cdi.BatchCDIInjectionExtension; +import org.apache.batchee.container.util.BatchWorkUnit; import org.apache.batchee.spi.BatchThreadPoolService; /** @@ -43,10 +48,12 @@ import org.apache.batchee.spi.BatchThreadPoolService; * */ public class AsyncEjbBatchThreadPoolService implements BatchThreadPoolService { - + + private static final Logger logger = Logger.getLogger(AsyncEjbBatchThreadPoolService.class.getName()); + private BeanManager beanManager; private ThreadExecutorEjb threadExecutorEjb; - + @Override public void init(Properties batchConfig) { beanManager = 
BatchCDIInjectionExtension.getInstance().getBeanManager(); @@ -65,8 +72,19 @@ public class AsyncEjbBatchThreadPoolService implements BatchThreadPoolService { @Override public void shutdown() { - // We cannot force an async EJB to shutdown. - // This usually works out of the box if the container EJB - // undeploys or stops the application. + Set<BatchWorkUnit> runningBatchWorkUnits = threadExecutorEjb.getRunningBatchWorkUnits(); + if (!runningBatchWorkUnits.isEmpty()) { + JobOperator jobOperator = BatchRuntime.getJobOperator(); + for (BatchWorkUnit batchWorkUnit : runningBatchWorkUnits) { + try { + long executionId = batchWorkUnit.getJobExecutionImpl().getExecutionId(); + if (executionId >= 0) { + jobOperator.stop(executionId); + } + } catch(Exception e) { + logger.log(Level.SEVERE, "Failure while shutting down execution", e); + } + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-batchee/blob/aef149a0/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java ---------------------------------------------------------------------- diff --git a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java index d4923d9..edb732e 100644 --- a/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java +++ b/tools/ee6/src/main/java/org/apache/batchee/tools/services/thread/ThreadExecutorEjb.java @@ -16,6 +16,8 @@ */ package org.apache.batchee.tools.services.thread; +import org.apache.batchee.container.util.BatchWorkUnit; + import javax.annotation.Resource; import javax.ejb.Asynchronous; import javax.ejb.Lock; @@ -24,6 +26,9 @@ import javax.ejb.Singleton; import javax.ejb.TransactionManagement; import javax.ejb.TransactionManagementType; import javax.transaction.UserTransaction; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; /** * Small helper class to allow 
new threads being created via the @@ -42,18 +47,31 @@ public class ThreadExecutorEjb { @Resource private UserTransaction ut; + private Set<BatchWorkUnit> runningBatchWorkUnits = Collections.synchronizedSet(new HashSet<BatchWorkUnit>()); + + private static ThreadLocal<UserTransaction> userTransactions = new ThreadLocal<UserTransaction>(); @Asynchronous public void executeTask(Runnable work, Object config) { try { userTransactions.set(ut); + if (work instanceof BatchWorkUnit) { + runningBatchWorkUnits.add((BatchWorkUnit) work); + } + work.run(); } finally { + if (work instanceof BatchWorkUnit) { + runningBatchWorkUnits.remove(work); + } userTransactions.remove(); } } + public Set<BatchWorkUnit> getRunningBatchWorkUnits() { + return runningBatchWorkUnits; + } public static UserTransaction getUserTransaction() { return userTransactions.get();
