Repository: oozie Updated Branches: refs/heads/master 373a52ff2 -> 853a4af9f
OOZIE-1844 HA - Lock mechanism for CoordMaterializeTriggerService (puru via rohini) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/853a4af9 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/853a4af9 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/853a4af9 Branch: refs/heads/master Commit: 853a4af9f06348e2bd95793e1d1b6c1e6a9949e1 Parents: 373a52f Author: Rohini Palaniswamy <[email protected]> Authored: Wed May 28 10:38:55 2014 -0700 Committer: Rohini Palaniswamy <[email protected]> Committed: Wed May 28 10:38:55 2014 -0700 ---------------------------------------------------------------------- .../service/CoordMaterializeTriggerService.java | 81 +++++++++++++------- release-log.txt | 1 + 2 files changed, 55 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/853a4af9/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java index ef3c3f4..d2b5a6c 100644 --- a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java +++ b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java @@ -27,6 +27,7 @@ import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; +import org.apache.oozie.lock.LockToken; import org.apache.oozie.util.XCallable; import org.apache.oozie.util.XLog; import org.apache.oozie.util.DateUtils; @@ -73,6 +74,8 @@ public class CoordMaterializeTriggerService implements Service { private long delay = 0; private List<XCallable<Void>> callables; private List<XCallable<Void>> delayedCallables; + private XLog LOG = XLog.getLog(getClass()); + public CoordMaterializeTriggerRunnable(int materializationWindow, int lookupInterval) { this.materializationWindow = materializationWindow; @@ -81,29 +84,54 @@ public class CoordMaterializeTriggerService implements Service { @Override public void run() { - runCoordJobMatLookup(); + LockToken lock = null; - if (null != callables) { - boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); - if (ret == false) { - XLog.getLog(getClass()).warn( - "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. " - + "Most possibly command queue is full. Queue size is :" - + Services.get().get(CallableQueueService.class).queueSize()); + // first check if there is some other running instance from the same service; + try { + lock = Services.get().get(MemoryLocksService.class) + .getWriteLock(CoordMaterializeTriggerService.class.getName(), lockTimeout); + + if (lock != null) { + runCoordJobMatLookup(); + if (null != callables) { + boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables); + if (ret == false) { + XLog.getLog(getClass()).warn( + "Unable to queue the callables commands for CoordMaterializeTriggerRunnable. " + + "Most possibly command queue is full. Queue size is :" + + Services.get().get(CallableQueueService.class).queueSize()); + } + callables = null; + } + if (null != delayedCallables) { + boolean ret = Services.get().get(CallableQueueService.class) + .queueSerial(delayedCallables, this.delay); + if (ret == false) { + XLog.getLog(getClass()).warn( + "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. " + + "Most possibly Callable queue is full. Queue size is :" + + Services.get().get(CallableQueueService.class).queueSize()); + } + delayedCallables = null; + this.delay = 0; + } + } + + else { + LOG.debug("Can't obtain lock, skipping"); } - callables = null; } - if (null != delayedCallables) { - boolean ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, this.delay); - if (ret == false) { - XLog.getLog(getClass()).warn( - "Unable to queue the delayedCallables commands for CoordMaterializeTriggerRunnable. " - + "Most possibly Callable queue is full. Queue size is :" - + Services.get().get(CallableQueueService.class).queueSize()); + catch (Exception e) { + LOG.error("Exception", e); + } + finally { + if (lock != null) { + lock.release(); + LOG.info("Released lock for [{0}]", CoordMaterializeTriggerService.class.getName()); } - delayedCallables = null; - this.delay = 0; + } + } /** @@ -133,15 +161,14 @@ public class CoordMaterializeTriggerService implements Service { LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate) + ", Num jobs to materialize = " + materializeJobs.size()); for (CoordinatorJobBean coordJob : materializeJobs) { - if (Services.get().get(JobsConcurrencyService.class).isJobIdForThisServer(coordJob.getId())) { - Services.get().get(InstrumentationService.class).get() - .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1); - queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow)); - coordJob.setLastModifiedTime(new Date()); - // TODO In place of calling single query, we should call bulk update. - CoordJobQueryExecutor.getInstance().executeUpdate( - CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, coordJob); - } + Services.get().get(InstrumentationService.class).get() + .incr(INSTRUMENTATION_GROUP, INSTR_MAT_JOBS_COUNTER, 1); + queueCallable(new CoordMaterializeTransitionXCommand(coordJob.getId(), materializationWindow)); + coordJob.setLastModifiedTime(new Date()); + // TODO In place of calling single query, we should call bulk update. + CoordJobQueryExecutor.getInstance().executeUpdate( + CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_LAST_MODIFIED_TIME, coordJob); + } } catch (JPAExecutorException jex) { http://git-wip-us.apache.org/repos/asf/oozie/blob/853a4af9/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 7d1d339..44e42d5 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1844 HA - Lock mechanism for CoordMaterializeTriggerService (puru via rohini) OOZIE-1834 sla should-start is supposed to be optional but it is not (rkanter) OOZIE-1838 jdbc.connections.active sampler does not show up (rkanter) OOZIE-1801 ZKLocksService instrumentation should say how many locks this server has (rkanter)
