Repository: falcon Updated Branches: refs/heads/master 27e872240 -> 336408f4c
FALCON-237 Falcon feed replication should honour availability flag. Contributed by Peeyush Bishnoi Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/336408f4 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/336408f4 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/336408f4 Branch: refs/heads/master Commit: 336408f4cc53919d4cf764513f6c570d13f4c050 Parents: 27e8722 Author: Srikanth Sundarrajan <[email protected]> Authored: Sun Jan 11 07:06:43 2015 +0530 Committer: Srikanth Sundarrajan <[email protected]> Committed: Sun Jan 11 07:06:43 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 3 +++ .../feed/FeedReplicationCoordinatorBuilder.java | 7 ++++++ .../feed/FeedRetentionWorkflowBuilder.java | 1 + .../ProcessExecutionWorkflowBuilder.java | 1 + .../action/feed/replication-action.xml | 2 ++ .../feed/OozieFeedWorkflowBuilderTest.java | 2 +- .../falcon/replication/FeedReplicator.java | 25 +++++++++++++++----- .../falcon/replication/FilteredCopyListing.java | 12 ++++++---- 8 files changed, 41 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 24967a2..8f6c571 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,9 @@ Trunk (Unreleased) NEW FEATURES IMPROVEMENTS + FALCON-237 falcon feed replication should honour availability flag (Peeyush + Bishnoi via Srikanth Sundarrajan) + FALCON-417 Upgrade Hive and HCatalog to latest stable version. 
(Peeyush Bishnoi via Srikanth Sundarrajan) http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java index 2963ac9..2451bbe 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java @@ -181,6 +181,12 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F instancePaths = pathsWithPartitions; propagateFileSystemCopyProperties(pathsWithPartitions, props); + + if (entity.getAvailabilityFlag() == null) { + props.put("availabilityFlag", "NA"); + } else { + props.put("availabilityFlag", entity.getAvailabilityFlag()); + } } else if (sourceStorage.getType() == Storage.TYPE.TABLE) { instancePaths = "${coord:dataIn('input')}"; final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage; @@ -189,6 +195,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget"); propagateTableCopyProperties(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, props); setupHiveConfiguration(srcCluster, trgCluster, buildPath); + props.put("availabilityFlag", "NA"); } propagateLateDataProperties(instancePaths, sourceStorage.getType().name(), props); http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java index cbe055a..51e081f 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java @@ -77,6 +77,7 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil private Properties getWorkflowProperties() { Properties props = new Properties(); props.setProperty("srcClusterName", "NA"); + props.setProperty("availabilityFlag", "NA"); return props; } http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java index 75faceb..61cc3c2 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java @@ -111,6 +111,7 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration private Properties getWorkflowProperties() { Properties props = new Properties(); props.setProperty("srcClusterName", "NA"); + props.setProperty("availabilityFlag", "NA"); return props; } http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/oozie/src/main/resources/action/feed/replication-action.xml ---------------------------------------------------------------------- diff --git a/oozie/src/main/resources/action/feed/replication-action.xml b/oozie/src/main/resources/action/feed/replication-action.xml index da40b74..beedd57 100644 --- a/oozie/src/main/resources/action/feed/replication-action.xml +++ b/oozie/src/main/resources/action/feed/replication-action.xml @@ -52,6 
+52,8 @@ <arg>${distcpTargetPaths}</arg> <arg>-falconFeedStorageType</arg> <arg>${falconFeedStorageType}</arg> + <arg>-availabilityFlag</arg> + <arg>${availabilityFlag == 'NA' ? "NA" : availabilityFlag}</arg> </java> <ok to="succeeded-post-processing"/> <error to="failed-post-processing"/> http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java index e5588b4..723f909 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java @@ -327,7 +327,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { ACTION replicationActionNode = getAction(workflow, "replication"); JAVA replication = replicationActionNode.getJava(); List<String> args = replication.getArg(); - Assert.assertEquals(args.size(), 13); + Assert.assertEquals(args.size(), 15); HashMap<String, String> props = getCoordProperties(coord); http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java ---------------------------------------------------------------------- diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java index 90ac753..9e55ffb 100644 --- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java +++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java @@ -21,6 +21,7 @@ import org.apache.commons.cli.*; import org.apache.commons.lang.StringUtils; import org.apache.falcon.FalconException; import 
org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.Storage; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; @@ -68,6 +69,13 @@ public class FeedReplicator extends Configured implements Tool { final boolean includePathSet = (includePathConf != null) && !IGNORE.equalsIgnoreCase(includePathConf); + String availabilityFlag = EntityUtil.SUCCEEDED_FILE_NAME; + if (cmd.getOptionValue("falconFeedStorageType").equals(Storage.TYPE.FILESYSTEM.name())) { + availabilityFlag = cmd.getOptionValue("availabilityFlag").equals("NA") + ? availabilityFlag : cmd.getOptionValue("availabilityFlag"); + } + + conf.set("falcon.feed.availability.flag", availabilityFlag); DistCp distCp = (includePathSet) ? new CustomReplicator(conf, options) : new DistCp(conf, options); @@ -75,7 +83,7 @@ public class FeedReplicator extends Configured implements Tool { distCp.execute(); if (includePathSet) { - executePostProcessing(options); // this only applies for FileSystem Storage. + executePostProcessing(conf, options); // this only applies for FileSystem Storage. 
} LOG.info("Completed DistCp"); @@ -107,6 +115,10 @@ public class FeedReplicator extends Configured implements Tool { opt.setRequired(true); options.addOption(opt); + opt = new Option("availabilityFlag", true, "availability flag"); + opt.setRequired(false); + options.addOption(opt); + return new GnuParser().parse(options, args); } @@ -131,7 +143,7 @@ public class FeedReplicator extends Configured implements Tool { return listPaths; } - private void executePostProcessing(DistCpOptions options) throws IOException, FalconException { + private void executePostProcessing(Configuration conf, DistCpOptions options) throws IOException, FalconException { Path targetPath = options.getTargetPath(); FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( targetPath.toUri(), getConf()); @@ -154,15 +166,16 @@ public class FeedReplicator extends Configured implements Tool { finalOutputPath = targetPath; } + final String availabilityFlag = conf.get("falcon.feed.availability.flag"); FileStatus[] files = fs.globStatus(finalOutputPath); if (files != null) { for (FileStatus file : files) { - fs.create(new Path(file.getPath(), EntityUtil.SUCCEEDED_FILE_NAME)).close(); - LOG.info("Created {}", new Path(file.getPath(), EntityUtil.SUCCEEDED_FILE_NAME)); + fs.create(new Path(file.getPath(), availabilityFlag)).close(); + LOG.info("Created {}", new Path(file.getPath(), availabilityFlag)); } } else { - // As distcp is not copying empty directories we are creating _SUCCESS file here - fs.create(new Path(finalOutputPath, EntityUtil.SUCCEEDED_FILE_NAME)).close(); + // As distcp is not copying empty directories we are creating availabilityFlag file here + fs.create(new Path(finalOutputPath, availabilityFlag)).close(); LOG.info("No files present in path: {}", finalOutputPath); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/336408f4/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java 
---------------------------------------------------------------------- diff --git a/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java b/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java index 58c09b4..295de92 100644 --- a/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java +++ b/replication/src/main/java/org/apache/falcon/replication/FilteredCopyListing.java @@ -18,7 +18,6 @@ package org.apache.falcon.replication; -import org.apache.falcon.entity.EntityUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.Credentials; @@ -31,9 +30,9 @@ import java.io.IOException; import java.util.regex.Pattern; /** - * An implementation of CopyListing that overrides the default behavior by suppressing file, - * EntityUtil.SUCCEEDED_FILE_NAME and copies that in the last so downstream apps - * depending on data availability will work correctly. + * An implementation of CopyListing that overrides the default behavior by suppressing the + * availabilityFlag file and copying it last, so that downstream apps depending on data + * availability will work correctly. 
*/ public class FilteredCopyListing extends SimpleCopyListing { private static final Logger LOG = LoggerFactory.getLogger(FilteredCopyListing.class); @@ -51,10 +50,13 @@ public class FilteredCopyListing extends SimpleCopyListing { */ private static final char PAT_SET_CLOSE = ']'; + private String availabilityFlag; + private Pattern regex; protected FilteredCopyListing(Configuration configuration, Credentials credentials) { super(configuration, credentials); + availabilityFlag = configuration.get("falcon.feed.availability.flag"); try { regex = getRegEx(configuration.get("falcon.include.path", "").trim()); LOG.info("Inclusion pattern = {}", configuration.get("falcon.include.path")); @@ -67,7 +69,7 @@ public class FilteredCopyListing extends SimpleCopyListing { @Override protected boolean shouldCopy(Path path, DistCpOptions options) { - if (path.getName().equals(EntityUtil.SUCCEEDED_FILE_NAME)) { + if (path.getName().equals(availabilityFlag)) { return false; } return regex == null || regex.matcher(path.toString()).find();
