Repository: falcon Updated Branches: refs/heads/0.9 ae002a01e -> 1fb294fbe
FALCON-1757 EntityNotRegisteredException when entity is deleted from falcon (By Pavan Kolamuri) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c5db759c Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c5db759c Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c5db759c Branch: refs/heads/0.9 Commit: c5db759c18ce13f2b7ed6db3a82cca7eb507ea0f Parents: ae002a0 Author: Pallavi Rao <[email protected]> Authored: Fri Jan 22 16:25:19 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Fri Jan 22 16:25:19 2016 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../falcon/rerun/handler/AbstractRerunConsumer.java | 9 ++++++++- .../apache/falcon/rerun/handler/LateRerunConsumer.java | 9 ++++++++- .../org/apache/falcon/rerun/handler/LateRerunHandler.java | 7 +++++++ .../org/apache/falcon/rerun/handler/RetryConsumer.java | 10 ++++++++-- 5 files changed, 33 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/c5db759c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 765569e..038c6f2 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -106,6 +106,8 @@ Release Version: 0.9 OPTIMIZATIONS BUG FIXES + FALCON-1757 EntityNotRegisteredException when entity is deleted from falcon (Pavan Kolamuri via Pallavi Rao) + FALCON-1748 Client throws FalconWebException irrespective of type of error(Praveen Adlakha via Ajay Yadava) FALCON-1727 Suspend fails with InvalidStateTransitionException if entity has 'KILLED' instances (Pallavi Rao) http://git-wip-us.apache.org/repos/asf/falcon/blob/c5db759c/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java index f60b927..61ca8c0 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/AbstractRerunConsumer.java @@ -20,6 +20,7 @@ package org.apache.falcon.rerun.handler; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.falcon.FalconException; import org.apache.falcon.aspect.GenericAlert; +import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.rerun.event.RerunEvent; import org.apache.falcon.rerun.policy.AbstractRerunPolicy; @@ -53,8 +54,8 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst AbstractRerunPolicy policy = new ExpBackoffPolicy(); Frequency frequency = new Frequency("minutes(1)"); while (!Thread.currentThread().isInterrupted()) { + T message = null; try { - T message; try { message = handler.takeFromQueue(); attempt = 1; @@ -83,6 +84,12 @@ public abstract class AbstractRerunConsumer<T extends RerunEvent, M extends Abst message.getEntityType(), message.getEntityName()); } catch (Throwable e) { + if (e instanceof EntityNotRegisteredException) { + LOG.warn("Entity {} of type {} doesn't exist in config store. Rerun " + + "cannot be done for workflow ", message.getEntityName(), + message.getEntityType(), message.getWfId()); + return; + } LOG.error("Error in rerun consumer", e); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/c5db759c/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java index 4297788..fa0d6ae 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java @@ -18,6 +18,7 @@ package org.apache.falcon.rerun.handler; import org.apache.falcon.aspect.GenericAlert; +import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.SchemaHelper; @@ -58,7 +59,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP") || jobStatus.equals("SUSPENDED")) { LOG.debug("Re-enqueing message in LateRerunHandler for workflow with same delay as " - + "job status is running: {}", message.getWfId()); + + "job status is running: {}", message.getWfId()); message.setMsgInsertTime(System.currentTimeMillis()); handler.offerToQueue(message); return; @@ -81,6 +82,12 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv LOG.info("Scheduled late rerun for wf-id: {} on cluster: {}", message.getWfId(), message.getClusterName()); } catch (Exception e) { + if (e instanceof EntityNotRegisteredException) { + LOG.warn("Entity {} of type {} doesn't exist in config store. Late rerun " + + "cannot be done for workflow ", message.getEntityName(), + message.getEntityType(), message.getWfId()); + return; + } LOG.warn("Late Re-run failed for instance {}:{} after {}", message.getEntityName(), message.getInstance(), message.getDelayInMilliSec(), e); GenericAlert.alertLateRerunFailed(message.getEntityType(), message.getEntityName(), http://git-wip-us.apache.org/repos/asf/falcon/blob/c5db759c/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java index 1d2ed37..0be6252 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java @@ -20,6 +20,7 @@ package org.apache.falcon.rerun.handler; import org.apache.falcon.FalconException; import org.apache.falcon.aspect.GenericAlert; +import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; @@ -91,6 +92,12 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends wait, entityType, entityName, nominalTime, intRunId, workflowUser); offerToQueue(event); } catch (Exception e) { + if (e instanceof EntityNotRegisteredException) { + LOG.warn("Entity {} of type {} doesn't exist in config store. So late rerun " + + "cannot be done for workflow ", entityName, + entityType, wfId); + return; + } LOG.error("Unable to schedule late rerun for entity instance: {} ({}): {} And WorkflowId: {}", entityType, entityName, nominalTime, wfId, e); GenericAlert.alertLateRerunFailed(entityType, entityName, http://git-wip-us.apache.org/repos/asf/falcon/blob/c5db759c/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java index 96300d9..9b46713 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/RetryConsumer.java @@ -18,6 +18,7 @@ package org.apache.falcon.rerun.handler; import org.apache.falcon.aspect.GenericAlert; +import org.apache.falcon.entity.EntityNotRegisteredException; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.rerun.event.RetryEvent; import org.apache.falcon.rerun.queue.DelayedQueue; @@ -43,17 +44,22 @@ public class RetryConsumer<T extends RetryHandler<DelayedQueue<RetryEvent>>> try { if (!jobStatus.equals("KILLED")) { LOG.debug("Re-enqueing message in RetryHandler for workflow with same delay as job status is running:" - + " {}", message.getWfId()); + + " {}", message.getWfId()); message.setMsgInsertTime(System.currentTimeMillis()); handler.offerToQueue(message); return; } LOG.info("Retrying attempt: {} out of configured: {} attempt for instance: {}:{} And WorkflowId: {}" - + " At time: {}", + + " At time: {}", (message.getRunId() + 1), message.getAttempts(), message.getEntityName(), message.getInstance(), message.getWfId(), SchemaHelper.formatDateUTC(new Date(System.currentTimeMillis()))); handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), message.getWfId(), null, false); } catch (Exception e) { + if (e instanceof EntityNotRegisteredException) { + LOG.warn("Entity {} of type {} doesn't exist in config store. So retry " + + "cannot be done for workflow ", entityName, entityType, message.getWfId()); + return; + } int maxFailRetryCount = Integer.parseInt(StartupProperties.get() .getProperty("max.retry.failure.count", "1")); if (message.getFailRetryCount() < maxFailRetryCount) {
