Repository: falcon Updated Branches: refs/heads/master bd0028458 -> 717b472eb
FALCON-298 Feed update with replication delay creates holes. Contributed by Sandeep Samudrala. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/717b472e Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/717b472e Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/717b472e Branch: refs/heads/master Commit: 717b472eb15d45abfa94590940e5b131456fc6b1 Parents: bd00284 Author: Ajay Yadava <[email protected]> Authored: Mon Sep 7 14:10:34 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Mon Sep 7 14:10:34 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../workflow/engine/OozieWorkflowEngine.java | 29 ++++++++++++++++++-- 2 files changed, 28 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/717b472e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 88d0f64..4b47d5b 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -12,6 +12,8 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + FALCON-298 Feed update with replication delay creates holes(Sandeep Samudrala via Ajay Yadava) + FALCON-1410 Entity submit fails when multiple threads try submitting same definition(Sandeep Samudrala via Ajay Yadava) FALCON-1429 Fix Falcon monitoring, alert, audit and monitoring plugins by fixing aspectj handling(Venkat Ranganathan via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/717b472e/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 7e6cd6c..f8b7764 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -31,6 +31,7 @@ import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.Frequency.TimeUnit; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.oozie.OozieBundleBuilder; import org.apache.falcon.oozie.OozieEntityBuilder; @@ -1078,7 +1079,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { // only concurrency and endtime are changed. So, change coords LOG.info("Change operation is adequate! : {}, bundle: {}", cluster, bundle.getId()); updateCoords(cluster, bundle, EntityUtil.getParallel(newEntity), - EntityUtil.getEndTime(newEntity, cluster)); + EntityUtil.getEndTime(newEntity, cluster), newEntity); return getUpdateString(newEntity, new Date(), bundle, bundle); } @@ -1195,7 +1196,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } private void updateCoords(String cluster, BundleJob bundle, - int concurrency, Date endTime) throws FalconException { + int concurrency, Date endTime, Entity entity) throws FalconException { if (endTime.compareTo(now()) <= 0) { throw new FalconException("End time " + SchemaHelper.formatDateUTC(endTime) + " can't be in the past"); } @@ -1206,8 +1207,19 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { // change coords for (CoordinatorJob coord : bundle.getCoordinators()) { + + Frequency delay = null; + //get Delay to calculate coordinator end time in case of feed replication with delay. + if (entity.getEntityType().equals(EntityType.FEED)) { + delay = getDelay((Feed) entity, coord); + } + + //calculate next start time based on delay. + endTime = (delay == null) ? endTime + : EntityUtil.getNextStartTime(coord.getStartTime(), delay, EntityUtil.getTimeZone(entity), endTime); LOG.debug("Updating endtime of coord {} to {} on cluster {}", coord.getId(), SchemaHelper.formatDateUTC(endTime), cluster); + Date lastActionTime = getCoordLastActionTime(coord); if (lastActionTime == null) { // nothing is materialized LOG.info("Nothing is materialized for this coord: {}", coord.getId()); @@ -1233,6 +1245,17 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } } + private Frequency getDelay(Feed entity, CoordinatorJob coord) { + Feed feed = entity; + for (org.apache.falcon.entity.v0.feed.Cluster entityCluster : feed.getClusters().getClusters()){ + if (coord.getAppName().contains(entityCluster.getName()) && coord.getAppName().contains("REPLICATION") + && entityCluster.getDelay() != null){ + return entityCluster.getDelay(); + } + } + return null; + } + private String updateInternal(Entity oldEntity, Entity newEntity, Cluster cluster, BundleJob oldBundle, String user, Boolean skipDryRun) throws FalconException { String clusterName = cluster.getName(); @@ -1246,7 +1269,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { boolean suspended = BUNDLE_SUSPENDED_STATUS.contains(oldBundle.getStatus()); //Set end times for old coords - updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime); + updateCoords(clusterName, oldBundle, EntityUtil.getParallel(oldEntity), effectiveTime, newEntity); //schedule new entity String newJobId = scheduleForUpdate(newEntity, cluster, effectiveTime, user);
