Repository: oozie
Updated Branches:
  refs/heads/master 373a52ff2 -> 853a4af9f


OOZIE-1844 HA - Lock mechanism for CoordMaterializeTriggerService (puru via 
rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/853a4af9
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/853a4af9
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/853a4af9

Branch: refs/heads/master
Commit: 853a4af9f06348e2bd95793e1d1b6c1e6a9949e1
Parents: 373a52f
Author: Rohini Palaniswamy <[email protected]>
Authored: Wed May 28 10:38:55 2014 -0700
Committer: Rohini Palaniswamy <[email protected]>
Committed: Wed May 28 10:38:55 2014 -0700

----------------------------------------------------------------------
 .../service/CoordMaterializeTriggerService.java | 81 +++++++++++++-------
 release-log.txt                                 |  1 +
 2 files changed, 55 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/853a4af9/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
 
b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
index ef3c3f4..d2b5a6c 100644
--- 
a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
+++ 
b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
@@ -27,6 +27,7 @@ import 
org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.util.XCallable;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.DateUtils;
@@ -73,6 +74,8 @@ public class CoordMaterializeTriggerService implements 
Service {
         private long delay = 0;
         private List<XCallable<Void>> callables;
         private List<XCallable<Void>> delayedCallables;
+        private XLog LOG = XLog.getLog(getClass());
+
 
         public CoordMaterializeTriggerRunnable(int materializationWindow, int 
lookupInterval) {
             this.materializationWindow = materializationWindow;
@@ -81,29 +84,54 @@ public class CoordMaterializeTriggerService implements 
Service {
 
         @Override
         public void run() {
-            runCoordJobMatLookup();
+            LockToken lock = null;
 
-            if (null != callables) {
-                boolean ret = 
Services.get().get(CallableQueueService.class).queueSerial(callables);
-                if (ret == false) {
-                    XLog.getLog(getClass()).warn(
-                            "Unable to queue the callables commands for 
CoordMaterializeTriggerRunnable. "
-                                    + "Most possibly command queue is full. 
Queue size is :"
-                                    + 
Services.get().get(CallableQueueService.class).queueSize());
+            // first check if there is some other running instance from the 
same service;
+            try {
+                lock = Services.get().get(MemoryLocksService.class)
+                        
.getWriteLock(CoordMaterializeTriggerService.class.getName(), lockTimeout);
+
+                if (lock != null) {
+                    runCoordJobMatLookup();
+                    if (null != callables) {
+                        boolean ret = 
Services.get().get(CallableQueueService.class).queueSerial(callables);
+                        if (ret == false) {
+                            XLog.getLog(getClass()).warn(
+                                    "Unable to queue the callables commands 
for CoordMaterializeTriggerRunnable. "
+                                            + "Most possibly command queue is 
full. Queue size is :"
+                                            + 
Services.get().get(CallableQueueService.class).queueSize());
+                        }
+                        callables = null;
+                    }
+                    if (null != delayedCallables) {
+                        boolean ret = 
Services.get().get(CallableQueueService.class)
+                                .queueSerial(delayedCallables, this.delay);
+                        if (ret == false) {
+                            XLog.getLog(getClass()).warn(
+                                    "Unable to queue the delayedCallables 
commands for CoordMaterializeTriggerRunnable. "
+                                            + "Most possibly Callable queue is 
full. Queue size is :"
+                                            + 
Services.get().get(CallableQueueService.class).queueSize());
+                        }
+                        delayedCallables = null;
+                        this.delay = 0;
+                    }
+                }
+
+                else {
+                    LOG.debug("Can't obtain lock, skipping");
                 }
-                callables = null;
             }
-            if (null != delayedCallables) {
-                boolean ret = 
Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, 
this.delay);
-                if (ret == false) {
-                    XLog.getLog(getClass()).warn(
-                            "Unable to queue the delayedCallables commands for 
CoordMaterializeTriggerRunnable. "
-                                    + "Most possibly Callable queue is full. 
Queue size is :"
-                                    + 
Services.get().get(CallableQueueService.class).queueSize());
+            catch (Exception e) {
+                LOG.error("Exception", e);
+            }
+            finally {
+                if (lock != null) {
+                    lock.release();
+                    LOG.info("Released lock for [{0}]", 
CoordMaterializeTriggerService.class.getName());
                 }
-                delayedCallables = null;
-                this.delay = 0;
+
             }
+
         }
 
         /**
@@ -133,15 +161,14 @@ public class CoordMaterializeTriggerService implements 
Service {
                 LOG.info("CoordMaterializeTriggerService - Curr Date= " + 
DateUtils.formatDateOozieTZ(currDate)
                         + ", Num jobs to materialize = " + 
materializeJobs.size());
                 for (CoordinatorJobBean coordJob : materializeJobs) {
-                    if 
(Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(coordJob.getId()))
 {
-                        Services.get().get(InstrumentationService.class).get()
-                                .incr(INSTRUMENTATION_GROUP, 
INSTR_MAT_JOBS_COUNTER, 1);
-                        queueCallable(new 
CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
-                        coordJob.setLastModifiedTime(new Date());
-                        // TODO In place of calling single query, we should 
call bulk update.
-                        CoordJobQueryExecutor.getInstance().executeUpdate(
-                                
CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, 
coordJob);
-                    }
+                    Services.get().get(InstrumentationService.class).get()
+                            .incr(INSTRUMENTATION_GROUP, 
INSTR_MAT_JOBS_COUNTER, 1);
+                    queueCallable(new 
CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow));
+                    coordJob.setLastModifiedTime(new Date());
+                    // TODO In place of calling single query, we should call 
bulk update.
+                    CoordJobQueryExecutor.getInstance().executeUpdate(
+                            
CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, 
coordJob);
+
                 }
             }
             catch (JPAExecutorException jex) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/853a4af9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 7d1d339..44e42d5 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1844 HA - Lock mechanism for CoordMaterializeTriggerService (puru via 
rohini)
 OOZIE-1834 sla should-start is supposed to be optional but it is not (rkanter)
 OOZIE-1838 jdbc.connections.active sampler does not show up (rkanter)
 OOZIE-1801 ZKLocksService instrumentation should say how many locks this 
server has (rkanter)

Reply via email to