Repository: oozie
Updated Branches:
  refs/heads/master 87a6d0536 -> 3deeaf77f
OOZIE-2055 PauseTransitService does not proceed forward if any job has issue Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/3deeaf77 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/3deeaf77 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/3deeaf77 Branch: refs/heads/master Commit: 3deeaf77fd37f2cd441b3db0d27768a702e5dd63 Parents: 87a6d05 Author: Purshotam Shah <[email protected]> Authored: Tue Jan 27 17:01:03 2015 -0800 Committer: Purshotam Shah <[email protected]> Committed: Tue Jan 27 17:01:03 2015 -0800 ---------------------------------------------------------------------- .../oozie/service/PauseTransitService.java | 88 +++++++++++++------- core/src/main/resources/oozie-default.xml | 10 +++ release-log.txt | 3 +- 3 files changed, 72 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/3deeaf77/core/src/main/java/org/apache/oozie/service/PauseTransitService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/PauseTransitService.java b/core/src/main/java/org/apache/oozie/service/PauseTransitService.java index 54865e8..823cc5f 100644 --- a/core/src/main/java/org/apache/oozie/service/PauseTransitService.java +++ b/core/src/main/java/org/apache/oozie/service/PauseTransitService.java @@ -18,10 +18,10 @@ package org.apache.oozie.service; +import java.util.ArrayList; import java.util.Date; import java.util.List; -import org.apache.hadoop.conf.Configuration; import org.apache.oozie.BundleJobBean; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.command.bundle.BundlePauseXCommand; @@ -34,32 +34,37 @@ import org.apache.oozie.executor.jpa.BundleJobsGetPausedJPAExecutor; import org.apache.oozie.executor.jpa.BundleJobsGetUnpausedJPAExecutor; import 
org.apache.oozie.executor.jpa.CoordJobsGetPausedJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobsGetUnpausedJPAExecutor; +import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.service.SchedulerService; import org.apache.oozie.service.Service; import org.apache.oozie.service.Services; import org.apache.oozie.lock.LockToken; import org.apache.oozie.util.ConfigUtils; +import org.apache.oozie.util.XCallable; import org.apache.oozie.util.XLog; import com.google.common.annotations.VisibleForTesting; /** - * PauseTransitService is the runnable which is scheduled to run at the configured interval, it checks all bundles - * to see if they should be paused, un-paused or started. + * PauseTransitService is the runnable which is scheduled to run at the configured interval, it checks all bundles to + * see if they should be paused, un-paused or started. */ public class PauseTransitService implements Service { public static final String CONF_PREFIX = Service.CONF_PREFIX + "PauseTransitService."; public static final String CONF_BUNDLE_PAUSE_START_INTERVAL = CONF_PREFIX + "PauseTransit.interval"; private final static XLog LOG = XLog.getLog(PauseTransitService.class); + public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size"; + /** - * PauseTransitRunnable is the runnable which is scheduled to run at the configured interval, it checks all - * bundles to see if they should be paused, un-paused or started. + * PauseTransitRunnable is the runnable which is scheduled to run at the configured interval, it checks all bundles + * to see if they should be paused, un-paused or started. 
*/ @VisibleForTesting public static class PauseTransitRunnable implements Runnable { private JPAService jpaService = null; private LockToken lock; + private List<XCallable<Void>> callables; public PauseTransitRunnable() { jpaService = Services.get().get(JPAService.class); @@ -71,8 +76,8 @@ public class PauseTransitService implements Service { public void run() { try { // first check if there is some other running instance from the same service; - lock = Services.get().get(MemoryLocksService.class).getWriteLock( - PauseTransitService.class.getName(), lockTimeout); + lock = Services.get().get(MemoryLocksService.class) + .getWriteLock(PauseTransitService.class.getName(), lockTimeout); if (lock == null) { LOG.info("This PauseTransitService instance will" + "not run since there is already an instance running"); @@ -82,6 +87,17 @@ public class PauseTransitService implements Service { updateBundle(); updateCoord(); + if (null != callables) { + boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); + if (ret == false) { + XLog.getLog(getClass()).warn( + "Unable to queue the callables commands for PauseTransitService. " + + "Most possibly command queue is full. 
Queue size is :" + + Services.get().get(CallableQueueService.class).queueSize()); + } + callables = null; + } + } } catch (Exception ex) { @@ -105,14 +121,14 @@ public class PauseTransitService implements Service { if (jobList != null) { for (BundleJobBean bundleJob : jobList) { if ((bundleJob.getPauseTime() != null) && !bundleJob.getPauseTime().after(d)) { - new BundlePauseXCommand(bundleJob).call(); - LOG.debug("Calling BundlePauseXCommand for bundle job = " + bundleJob.getId()); + queueCallable(new BundlePauseXCommand(bundleJob)); + LOG.debug("Queuing BundlePauseXCommand for bundle job = " + bundleJob.getId()); } } } } - catch (Exception ex) { - LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex); + catch (JPAExecutorException ex) { + LOG.warn("JPAExecutorException happened when pausing/unpausing/starting Bundle jobs", ex); } // unpause bundles as needed; try { @@ -120,28 +136,27 @@ public class PauseTransitService implements Service { if (jobList != null) { for (BundleJobBean bundleJob : jobList) { if ((bundleJob.getPauseTime() == null || bundleJob.getPauseTime().after(d))) { - new BundleUnpauseXCommand(bundleJob).call(); - LOG.debug("Calling BundleUnpauseXCommand for bundle job = " + bundleJob.getId()); + queueCallable(new BundleUnpauseXCommand(bundleJob)); + LOG.debug("Queuing BundleUnpauseXCommand for bundle job = " + bundleJob.getId()); } } } } - catch (Exception ex) { - LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex); + catch (JPAExecutorException ex) { + LOG.warn("JPAExecutorException happened when pausing/unpausing/starting Bundle jobs", ex); } // start bundles as needed; try { jobList = jpaService.execute(new BundleJobsGetNeedStartJPAExecutor(d)); if (jobList != null) { for (BundleJobBean bundleJob : jobList) { - bundleJob.setKickoffTime(d); - new BundleStartXCommand(bundleJob.getId()).call(); - LOG.debug("Calling BundleStartXCommand for bundle job = " + bundleJob.getId()); + queueCallable(new 
BundleStartXCommand(bundleJob.getId())); + LOG.debug("Queuing BundleStartXCommand for bundle job = " + bundleJob.getId()); } } } - catch (Exception ex) { - LOG.warn("Exception happened when pausing/unpausing/starting Bundle jobs", ex); + catch (JPAExecutorException ex) { + LOG.warn("JPAExecutorException happened when pausing/unpausing/starting Bundle jobs", ex); } } @@ -161,14 +176,14 @@ public class PauseTransitService implements Service { continue; } if ((coordJob.getPauseTime() != null) && !coordJob.getPauseTime().after(d)) { - new CoordPauseXCommand(coordJob).call(); - LOG.debug("Calling CoordPauseXCommand for coordinator job = " + coordJob.getId()); + queueCallable(new CoordPauseXCommand(coordJob)); + LOG.debug("Queuing CoordPauseXCommand for coordinator job = " + coordJob.getId()); } } } } - catch (Exception ex) { - LOG.warn("Exception happened when pausing/unpausing Coordinator jobs", ex); + catch (JPAExecutorException ex) { + LOG.warn("JPAExecutorException happened when pausing/unpausing Coordinator jobs", ex); } // unpause coordinators as needed; try { @@ -181,14 +196,31 @@ public class PauseTransitService implements Service { continue; } if ((coordJob.getPauseTime() == null || coordJob.getPauseTime().after(d))) { - new CoordUnpauseXCommand(coordJob).call(); - LOG.debug("Calling CoordUnpauseXCommand for coordinator job = " + coordJob.getId()); + queueCallable(new CoordUnpauseXCommand(coordJob)); + LOG.debug("Queuing CoordUnpauseXCommand for coordinator job = " + coordJob.getId()); } } } } - catch (Exception ex) { - LOG.warn("Exception happened when pausing/unpausing Coordinator jobs", ex); + catch (JPAExecutorException ex) { + LOG.warn("JPAExecutorException happened when pausing/unpausing Coordinator jobs", ex); + } + } + + private void queueCallable(XCallable<Void> callable) { + if (callables == null) { + callables = new ArrayList<XCallable<Void>>(); + } + callables.add(callable); + if (callables.size() == 
ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)) { + boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); + if (ret == false) { + XLog.getLog(getClass()).warn( + "Unable to queue the callables commands for PauseTransitService. " + + "Most possibly command queue is full. Queue size is :" + + Services.get().get(CallableQueueService.class).queueSize()); + } + callables = new ArrayList<XCallable<Void>>(); } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/3deeaf77/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 31c80ac..fcc73b8 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -2438,4 +2438,14 @@ </description> </property> + <property> + <name>oozie.service.PauseTransitService.callable.batch.size + </name> + <value>10</value> + <description> + This value determines the number of callable which will be batched together + to be executed by a single thread. + </description> + </property> + </configuration> http://git-wip-us.apache.org/repos/asf/oozie/blob/3deeaf77/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 8cfe13a..8b48956 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,6 +1,7 @@ -- Oozie 4.2.0 release (trunk - unreleased) -OZIE-2068 Configuration as part of sharelib (puru) +OOZIE-2055 PauseTransitService does not proceed forward if any job has issue (puru) +OOZIE-2068 Configuration as part of sharelib (puru) OOZIE-2121 CoordinatorFunctionalSpec 4.4.1.1 swap Value and First Occurrence line 4 in example table (apivovarov via ryota) OOZIE-1894 Better error reporting to user (puru) OOZIE-2120 Many JPAExecutor names are wrong (rkanter)
