Repository: oozie
Updated Branches:
  refs/heads/master 87a6d0536 -> 3deeaf77f


OOZIE-2055 PauseTransitService does not proceed forward if any job has issue


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/3deeaf77
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/3deeaf77
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/3deeaf77

Branch: refs/heads/master
Commit: 3deeaf77fd37f2cd441b3db0d27768a702e5dd63
Parents: 87a6d05
Author: Purshotam Shah <[email protected]>
Authored: Tue Jan 27 17:01:03 2015 -0800
Committer: Purshotam Shah <[email protected]>
Committed: Tue Jan 27 17:01:03 2015 -0800

----------------------------------------------------------------------
 .../oozie/service/PauseTransitService.java      | 88 +++++++++++++-------
 core/src/main/resources/oozie-default.xml       | 10 +++
 release-log.txt                                 |  3 +-
 3 files changed, 72 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/3deeaf77/core/src/main/java/org/apache/oozie/service/PauseTransitService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/PauseTransitService.java b/core/src/main/java/org/apache/oozie/service/PauseTransitService.java
index 54865e8..823cc5f 100644
--- a/core/src/main/java/org/apache/oozie/service/PauseTransitService.java
+++ b/core/src/main/java/org/apache/oozie/service/PauseTransitService.java
@@ -18,10 +18,10 @@
 
 package org.apache.oozie.service;
 
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.command.bundle.BundlePauseXCommand;
@@ -34,32 +34,37 @@ import 
org.apache.oozie.executor.jpa.BundleJobsGetPausedJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobsGetUnpausedJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobsGetPausedJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobsGetUnpausedJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.SchedulerService;
 import org.apache.oozie.service.Service;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.util.ConfigUtils;
+import org.apache.oozie.util.XCallable;
 import org.apache.oozie.util.XLog;
 
 import com.google.common.annotations.VisibleForTesting;
 
 /**
- * PauseTransitService is the runnable which is scheduled to run at the 
configured interval, it checks all bundles
- * to see if they should be paused, un-paused or started.
+ * PauseTransitService is the runnable which is scheduled to run at the 
configured interval, it checks all bundles to
+ * see if they should be paused, un-paused or started.
  */
 public class PauseTransitService implements Service {
     public static final String CONF_PREFIX = Service.CONF_PREFIX + 
"PauseTransitService.";
     public static final String CONF_BUNDLE_PAUSE_START_INTERVAL = CONF_PREFIX 
+ "PauseTransit.interval";
     private final static XLog LOG = XLog.getLog(PauseTransitService.class);
 
+    public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + 
"callable.batch.size";
+
     /**
-     * PauseTransitRunnable is the runnable which is scheduled to run at the 
configured interval, it checks all
-     * bundles to see if they should be paused, un-paused or started.
+     * PauseTransitRunnable is the runnable which is scheduled to run at the 
configured interval, it checks all bundles
+     * to see if they should be paused, un-paused or started.
      */
     @VisibleForTesting
     public static class PauseTransitRunnable implements Runnable {
         private JPAService jpaService = null;
         private LockToken lock;
+        private List<XCallable<Void>> callables;
 
         public PauseTransitRunnable() {
             jpaService = Services.get().get(JPAService.class);
@@ -71,8 +76,8 @@ public class PauseTransitService implements Service {
         public void run() {
             try {
                 // first check if there is some other running instance from 
the same service;
-                lock = 
Services.get().get(MemoryLocksService.class).getWriteLock(
-                        PauseTransitService.class.getName(), lockTimeout);
+                lock = Services.get().get(MemoryLocksService.class)
+                        .getWriteLock(PauseTransitService.class.getName(), 
lockTimeout);
                 if (lock == null) {
                     LOG.info("This PauseTransitService instance will"
                             + "not run since there is already an instance 
running");
@@ -82,6 +87,17 @@ public class PauseTransitService implements Service {
 
                     updateBundle();
                     updateCoord();
+                    if (null != callables) {
+                        boolean ret = 
Services.get().get(CallableQueueService.class).queueSerial(callables);
+                        if (ret == false) {
+                            XLog.getLog(getClass()).warn(
+                                    "Unable to queue the callables commands 
for PauseTransitService. "
+                                            + "Most possibly command queue is 
full. Queue size is :"
+                                            + 
Services.get().get(CallableQueueService.class).queueSize());
+                        }
+                        callables = null;
+                    }
+
                 }
             }
             catch (Exception ex) {
@@ -105,14 +121,14 @@ public class PauseTransitService implements Service {
                 if (jobList != null) {
                     for (BundleJobBean bundleJob : jobList) {
                         if ((bundleJob.getPauseTime() != null) && 
!bundleJob.getPauseTime().after(d)) {
-                            new BundlePauseXCommand(bundleJob).call();
-                            LOG.debug("Calling BundlePauseXCommand for bundle 
job = " + bundleJob.getId());
+                            queueCallable(new BundlePauseXCommand(bundleJob));
+                            LOG.debug("Queuing BundlePauseXCommand for bundle 
job = " + bundleJob.getId());
                         }
                     }
                 }
             }
-            catch (Exception ex) {
-                LOG.warn("Exception happened when pausing/unpausing/starting 
Bundle jobs", ex);
+            catch (JPAExecutorException ex) {
+                LOG.warn("JPAExecutorException happened when 
pausing/unpausing/starting Bundle jobs", ex);
             }
             // unpause bundles as needed;
             try {
@@ -120,28 +136,27 @@ public class PauseTransitService implements Service {
                 if (jobList != null) {
                     for (BundleJobBean bundleJob : jobList) {
                         if ((bundleJob.getPauseTime() == null || 
bundleJob.getPauseTime().after(d))) {
-                            new BundleUnpauseXCommand(bundleJob).call();
-                            LOG.debug("Calling BundleUnpauseXCommand for 
bundle job = " + bundleJob.getId());
+                            queueCallable(new 
BundleUnpauseXCommand(bundleJob));
+                            LOG.debug("Queuing BundleUnpauseXCommand for 
bundle job = " + bundleJob.getId());
                         }
                     }
                 }
             }
-            catch (Exception ex) {
-                LOG.warn("Exception happened when pausing/unpausing/starting 
Bundle jobs", ex);
+            catch (JPAExecutorException ex) {
+                LOG.warn("JPAExecutorException happened when 
pausing/unpausing/starting Bundle jobs", ex);
             }
             // start bundles as needed;
             try {
                 jobList = jpaService.execute(new 
BundleJobsGetNeedStartJPAExecutor(d));
                 if (jobList != null) {
                     for (BundleJobBean bundleJob : jobList) {
-                        bundleJob.setKickoffTime(d);
-                        new BundleStartXCommand(bundleJob.getId()).call();
-                        LOG.debug("Calling BundleStartXCommand for bundle job 
= " + bundleJob.getId());
+                        queueCallable(new 
BundleStartXCommand(bundleJob.getId()));
+                        LOG.debug("Queuing BundleStartXCommand for bundle job 
= " + bundleJob.getId());
                     }
                 }
             }
-            catch (Exception ex) {
-                LOG.warn("Exception happened when pausing/unpausing/starting 
Bundle jobs", ex);
+            catch (JPAExecutorException ex) {
+                LOG.warn("JPAExecutorException happened when 
pausing/unpausing/starting Bundle jobs", ex);
             }
         }
 
@@ -161,14 +176,14 @@ public class PauseTransitService implements Service {
                             continue;
                         }
                         if ((coordJob.getPauseTime() != null) && 
!coordJob.getPauseTime().after(d)) {
-                            new CoordPauseXCommand(coordJob).call();
-                            LOG.debug("Calling CoordPauseXCommand for 
coordinator job = " + coordJob.getId());
+                            queueCallable(new CoordPauseXCommand(coordJob));
+                            LOG.debug("Queuing CoordPauseXCommand for 
coordinator job = " + coordJob.getId());
                         }
                     }
                 }
             }
-            catch (Exception ex) {
-                LOG.warn("Exception happened when pausing/unpausing 
Coordinator jobs", ex);
+            catch (JPAExecutorException ex) {
+                LOG.warn("JPAExecutorException happened when pausing/unpausing 
Coordinator jobs", ex);
             }
             // unpause coordinators as needed;
             try {
@@ -181,14 +196,31 @@ public class PauseTransitService implements Service {
                             continue;
                         }
                         if ((coordJob.getPauseTime() == null || 
coordJob.getPauseTime().after(d))) {
-                            new CoordUnpauseXCommand(coordJob).call();
-                            LOG.debug("Calling CoordUnpauseXCommand for 
coordinator job = " + coordJob.getId());
+                            queueCallable(new CoordUnpauseXCommand(coordJob));
+                            LOG.debug("Queuing CoordUnpauseXCommand for 
coordinator job = " + coordJob.getId());
                         }
                     }
                 }
             }
-            catch (Exception ex) {
-                LOG.warn("Exception happened when pausing/unpausing 
Coordinator jobs", ex);
+            catch (JPAExecutorException ex) {
+                LOG.warn("JPAExecutorException happened when pausing/unpausing 
Coordinator jobs", ex);
+            }
+        }
+
+        private void queueCallable(XCallable<Void> callable) {
+            if (callables == null) {
+                callables = new ArrayList<XCallable<Void>>();
+            }
+            callables.add(callable);
+            if (callables.size() == 
ConfigurationService.getInt(CONF_CALLABLE_BATCH_SIZE)) {
+                boolean ret = 
Services.get().get(CallableQueueService.class).queueSerial(callables);
+                if (ret == false) {
+                    XLog.getLog(getClass()).warn(
+                            "Unable to queue the callables commands for 
PauseTransitService. "
+                                    + "Most possibly command queue is full. 
Queue size is :"
+                                    + 
Services.get().get(CallableQueueService.class).queueSize());
+                }
+                callables = new ArrayList<XCallable<Void>>();
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/3deeaf77/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 31c80ac..fcc73b8 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2438,4 +2438,14 @@
         </description>
     </property>
 
+    <property>
+        <name>oozie.service.PauseTransitService.callable.batch.size
+        </name>
+        <value>10</value>
+        <description>
+            This value determines the number of callables which will be batched together
+            to be executed by a single thread.
+        </description>
+    </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/oozie/blob/3deeaf77/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 8cfe13a..8b48956 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,6 +1,7 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
-OZIE-2068 Configuration as part of sharelib (puru)
+OOZIE-2055 PauseTransitService does not proceed forward if any job has issue (puru)
+OOZIE-2068 Configuration as part of sharelib (puru)
 OOZIE-2121 CoordinatorFunctionalSpec 4.4.1.1 swap Value and First Occurrence 
line 4 in example table (apivovarov via ryota)
 OOZIE-1894 Better error reporting to user (puru)
 OOZIE-2120 Many JPAExecutor names are wrong (rkanter)

Reply via email to