FALCON-1719 Retry does not update the state of the instance in the database
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7cde36c4 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7cde36c4 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7cde36c4 Branch: refs/heads/0.9 Commit: 7cde36c4104bcb43b913cbd1c22288daa773f488 Parents: 5e80dcd Author: Pallavi Rao <[email protected]> Authored: Wed Jan 6 15:13:59 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Wed Jan 6 15:13:59 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../falcon/rerun/handler/AbstractRerunConsumer.java | 11 ++++++++--- .../falcon/rerun/handler/AbstractRerunHandler.java | 15 +++++++++++++-- .../falcon/rerun/handler/LateRerunConsumer.java | 9 +++++---- .../falcon/rerun/handler/LateRerunHandler.java | 5 ++--- .../apache/falcon/rerun/handler/RetryConsumer.java | 4 ++-- 6 files changed, 32 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 599efca..2ed1ab4 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -90,6 +90,8 @@ Proposed Release Version: 0.9 OPTIMIZATIONS BUG FIXES + FALCON-1719 Retry does not update the state of the instance in the database (Pavan Kolamuri via Pallavi Rao) + FALCON-1710 dependency API sets totalResults as 0 by default(Praveen Adlakha via Ajay Yadava) FALCON-1714 EntityNotRegisteredException when process with no input/output feed is scheduled(Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java index 582cb15..f60b927 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java @@ -26,6 +26,7 @@ import org.apache.falcon.rerun.policy.AbstractRerunPolicy; import org.apache.falcon.rerun.policy.ExpBackoffPolicy; import org.apache.falcon.rerun.queue.DelayedQueue; import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,9 +75,12 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst // Login the user to access WfEngine as this user CurrentUser.authenticate(message.getWorkflowUser()); - String jobStatus = handler.getWfEngine().getWorkflowStatus( + AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(), + message.getEntityName()); + String jobStatus = wfEngine.getWorkflowStatus( message.getClusterName(), message.getWfId()); - handleRerun(message.getClusterName(), jobStatus, message); + handleRerun(message.getClusterName(), jobStatus, message, + message.getEntityType(), message.getEntityName()); } catch (Throwable e) { LOG.error("Error in rerun consumer", e); @@ -84,5 +88,6 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst } } - protected abstract void handleRerun(String clusterName, String jobStatus, T message); + protected abstract void handleRerun(String clusterName, String jobStatus, T message, + String entityType, String entityName); } http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java index 64c566e..bc1f7f2 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunHandler.java @@ -17,9 +17,11 @@ */ package org.apache.falcon.rerun.handler; +import org.apache.commons.lang.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.Retry; import org.apache.falcon.rerun.event.RerunEvent; import org.apache.falcon.rerun.queue.DelayedQueue; @@ -58,8 +60,17 @@ public abstract class AbstractRerunHandler<T extends RerunEvent, M extends Delay String wfId, String workflowUser, long msgReceivedTime); //RESUME CHECKSTYLE CHECK ParameterNumberCheck - public AbstractWorkflowEngine getWfEngine() { - return wfEngine; + public AbstractWorkflowEngine getWfEngine(String entityType, String entityName) { + if (StringUtils.isBlank(entityType) || StringUtils.isBlank(entityName)) { + return wfEngine; + } + try { + Entity entity = EntityUtil.getEntity(EntityType.valueOf(entityType), entityName); + return WorkflowEngineFactory.getWorkflowEngine(entity); + } catch (FalconException e) { + // Just to make sure of backward compatibility in case of any exceptions. + return wfEngine; + } } public boolean offerToQueue(T event) throws FalconException { http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java index ee31952..4297788 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java @@ -53,7 +53,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv @Override protected void handleRerun(String clusterName, String jobStatus, - LaterunEvent message) { + LaterunEvent message, String entityType, String entityName) { try { if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP") || jobStatus.equals("SUSPENDED")) { @@ -77,7 +77,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv LOG.info("Late changes detected in the following feeds: {}", detectLate); - handler.getWfEngine().reRun(message.getClusterName(), message.getWfId(), null, true); + handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), message.getWfId(), null, true); LOG.info("Scheduled late rerun for wf-id: {} on cluster: {}", message.getWfId(), message.getClusterName()); } catch (Exception e) { @@ -91,8 +91,9 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv public String detectLate(LaterunEvent message) throws Exception { LateDataHandler late = new LateDataHandler(); - Properties properties = handler.getWfEngine().getWorkflowProperties( - message.getClusterName(), message.getWfId()); + AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(), + message.getEntityName()); + Properties properties = wfEngine.getWorkflowProperties(message.getClusterName(), message.getWfId()); String falconInputs = properties.getProperty(WorkflowExecutionArgs.INPUT_NAMES.getName()); String falconInPaths = properties.getProperty(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName()); String falconInputFeedStorageTypes = http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java index 64177a4..1d2ed37 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java @@ -65,9 +65,8 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends Long wait = getEventDelay(entity, nominalTime); if (wait == -1) { LOG.info("Late rerun expired for entity: {} ({})", entityType, entityName); - - java.util.Properties properties = - this.getWfEngine().getWorkflowProperties(cluster, wfId); + AbstractWorkflowEngine wfEngine = this.getWfEngine(entityType, entityName); + java.util.Properties properties = wfEngine.getWorkflowProperties(cluster, wfId); String logDir = properties.getProperty("logDir"); String srcClusterName = properties.getProperty("srcClusterName"); Path lateLogPath = this.getLateLogPath(logDir, http://git-wip-us.apache.org/repos/asf/falcon/blob/7cde36c4/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java index 61aa3e1..96300d9 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java @@ -39,7 +39,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>> @Override protected void handleRerun(String clusterName, String jobStatus, - RetryEvent message) { + RetryEvent message, String entityType, String entityName) { try { if (!jobStatus.equals("KILLED")) { LOG.debug("Re-enqueing message in RetryHandler for workflow with same delay as job status is running:" @@ -52,7 +52,7 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>> + " At time: {}", (message.getRunId() + 1), message.getAttempts(), message.getEntityName(), message.getInstance(), message.getWfId(), SchemaHelper.formatDateUTC(new Date(System.currentTimeMillis()))); - handler.getWfEngine().reRun(message.getClusterName(), message.getWfId(), null, false); + handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), message.getWfId(), null, false); } catch (Exception e) { int maxFailRetryCount = Integer.parseInt(StartupProperties.get() .getProperty("max.retry.failure.count", "1"));
