Repository: falcon Updated Branches: refs/heads/master ccc343fa8 -> 9fdfe713f
FALCON-2228 Set output names into workflow builder and also maintain the order of the input and ouputs Author: sandeep <[email protected]> Reviewers: @pallavi-rao Closes #329 from sandeepSamudrala/FALCON-2228 and squashes the following commits: b8a3e3e [sandeep] FALCON-2228 Incorporated review comments. Removed OUTPUT_STORAGE_TYPES 4d00d7f [sandeep] FALCON-2228 Set output names into workflow builder and also maintain the order of the input and ouputs 4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon 0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon 194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon 48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon 250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon 1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon c065566 [sandeep] reverting last line changes made 1a4dcd2 [sandeep] rebased and resolved the conflicts from master 271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe [sandeep] rebasing from master 9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/9fdfe713 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/9fdfe713 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/9fdfe713 Branch: refs/heads/master Commit: 9fdfe713f8d72e63fb45aaed93071cbde39b9eec Parents: ccc343f Author: sandeep <[email protected]> Authored: Mon Jan 2 14:36:43 2017 +0530 Committer: Pallavi Rao <[email protected]> Committed: Mon Jan 2 14:36:43 2017 +0530 ---------------------------------------------------------------------- .gitignore | 1 + .../falcon/workflow/WorkflowExecutionArgs.java | 3 +- .../retention/AgeBasedWorkflowBuilder.java | 5 ++- .../falcon/oozie/ExportWorkflowBuilder.java | 1 + .../falcon/oozie/ImportWorkflowBuilder.java | 1 + .../feed/FeedReplicationCoordinatorBuilder.java | 1 + .../feed/FeedRetentionWorkflowBuilder.java | 5 ++- .../NativeOozieProcessWorkflowBuilder.java | 21 ++++++---- .../ProcessExecutionCoordinatorBuilder.java | 18 +++++---- .../feed/OozieFeedWorkflowBuilderTest.java | 41 ++++++++++++-------- .../OozieProcessWorkflowBuilderTest.java | 18 ++++++--- .../workflow/FalconPostProcessingTest.java | 4 +- 12 files changed, 75 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index b6733d7..8589d94 100644 --- a/.gitignore +++ b/.gitignore @@ -37,6 +37,7 @@ activemq-data #log files logs *.log +*.patch #Falcon UI NPM files falcon-ui/dist http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java index 682b14e..dcf7cb5 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java @@ -52,7 +52,7 @@ public enum WorkflowExecutionArgs { // workflow execution details WORKFLOW_ID("workflowId", "current workflow-id of the instance"), RUN_ID("runId", "current run-id of the instance"), - STATUS("status", "status of the user workflow isnstance"), + STATUS("status", "status of the user workflow instance"), WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie", false), USER_SUBFLOW_ID("subflowId", "external id of user workflow", false), PARENT_ID("parentId", "The parent of the current workflow, typically coord action", false), @@ -70,6 +70,7 @@ public enum WorkflowExecutionArgs { // what outputs OUTPUT_FEED_NAMES("feedNames", "name of the feeds which are generated/replicated/deleted"), OUTPUT_FEED_PATHS("feedInstancePaths", "comma separated feed instance paths"), + OUTPUT_NAMES("feedInstanceNames", "comma separated list of names of outputs", false), // broker related parameters TOPIC_NAME("topicName", "name of the topic to be used to send JMS message", false), http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java index dd0c6d2..f37f897 100644 --- a/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java +++ b/lifecycle/src/main/java/org/apache/falcon/lifecycle/engine/oozie/retention/AgeBasedWorkflowBuilder.java @@ -121,14 +121,15 @@ public final class AgeBasedWorkflowBuilder { props.put("frequency", feed.getFrequency().getTimeUnit().name()); props.put("falconFeedStorageType", storage.getType().name()); props.put("limit", new AgeBasedDelete().getRetentionLimit(feed, cluster.getName()).toString()); - props.put("falconInputFeeds", feed.getName()); - props.put("falconInPaths", OozieBuilderUtils.IGNORE); + props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), feed.getName()); + props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), OozieBuilderUtils.IGNORE); String feedDataPath = storage.getUriTemplate(); props.put("feedDataPath", feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX)); props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), feed.getName()); + props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), feed.getName()); props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OozieBuilderUtils.IGNORE); return props; http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java index af7431a..7f4ba69 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/ExportWorkflowBuilder.java @@ -62,6 +62,7 @@ public abstract class ExportWorkflowBuilder extends OozieOrchestrationWorkflowBu props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle())); } props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), NONE); + props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), NONE); props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), NONE); props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), entity.getName()); http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java index 2d93189..fd97fa6 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/ImportWorkflowBuilder.java @@ -63,6 +63,7 @@ public abstract class ImportWorkflowBuilder extends OozieOrchestrationWorkflowBu props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle())); } props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName()); + props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), entity.getName()); props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), String.format("${coord:dataOut('%s')}", FeedImportCoordinatorBuilder.IMPORT_DATAOUT_NAME)); props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), NONE); http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java index 07d293c..77f2c75 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java @@ -262,6 +262,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F // falcon post processing props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName()); + props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), entity.getName()); props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), "${coord:dataOut('output')}"); } http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java index fd51ed0..553bf05 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java @@ -95,10 +95,11 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil props.put("limit", feedCluster.getRetention().getLimit().toString()); props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName()); + props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), entity.getName()); props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), IGNORE); - props.put("falconInputFeeds", entity.getName()); - props.put("falconInPaths", IGNORE); + props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), entity.getName()); + props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), IGNORE); props.put(WorkflowExecutionArgs.DATASOURCE_NAME.getName(), "NA"); return props; } http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java index 78e049d..0a31b98 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java @@ -41,6 +41,7 @@ import org.joda.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Date; +import java.util.LinkedList; import java.util.List; import java.util.Properties; @@ -110,13 +111,16 @@ public class NativeOozieProcessWorkflowBuilder extends OozieProcessWorkflowBuild if (entity.getOutputs() == null) { props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), NONE); props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), NONE); + props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), NONE); return props; } - List<String> feedNames = new ArrayList<>(); - List<String> feedInstancePaths= new ArrayList<>(); + List<String> falconOutputFeeds = new LinkedList<>(); + List<String> feedInstancePaths= new LinkedList<>(); + List<String> falconOutputNames = new LinkedList<>(); for (Output output : entity.getOutputs().getOutputs()) { Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed()); - feedNames.add(feed.getName()); + falconOutputFeeds.add(feed.getName()); + falconOutputNames.add(output.getName()); String outputExp = output.getInstance(); Date outTime = EXPRESSION_HELPER.evaluate(outputExp, Date.class); for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) { @@ -139,7 +143,8 @@ public class NativeOozieProcessWorkflowBuilder extends OozieProcessWorkflowBuild } } } - props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), StringUtils.join(feedNames, ",")); + props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), StringUtils.join(falconOutputFeeds, ",")); + props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), StringUtils.join(falconOutputNames, ",")); props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), StringUtils.join(feedInstancePaths, ",")); return props; } @@ -154,10 +159,10 @@ public class NativeOozieProcessWorkflowBuilder extends OozieProcessWorkflowBuild props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), NONE); return props; } - List<String> falconInputFeeds = new ArrayList<>(); - List<String> falconInputNames = new ArrayList<>(); - List<String> falconInputPaths = new ArrayList<>(); - List<String> falconInputFeedStorageTypes = new ArrayList<>(); + List<String> falconInputFeeds = new LinkedList<>(); + List<String> falconInputNames = new LinkedList<>(); + List<String> falconInputPaths = new LinkedList<>(); + List<String> falconInputFeedStorageTypes = new LinkedList<>(); for (Input input : entity.getInputs().getInputs()) { Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed()); Storage storage = FeedHelper.createStorage(clusterObj, feed); http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java index 91f4757..a45c9a9 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java @@ -52,8 +52,8 @@ import org.apache.falcon.oozie.coordinator.WORKFLOW; import org.apache.falcon.workflow.WorkflowExecutionArgs; import org.apache.hadoop.fs.Path; -import java.util.ArrayList; import java.util.Arrays; +import java.util.LinkedList; import java.util.List; import java.util.Properties; @@ -150,10 +150,10 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder< return; } - List<String> inputFeeds = new ArrayList<String>(); - List<String> inputNames = new ArrayList<String>(); - List<String> inputPaths = new ArrayList<String>(); - List<String> inputFeedStorageTypes = new ArrayList<String>(); + List<String> inputFeeds = new LinkedList<>(); + List<String> inputNames = new LinkedList<>(); + List<String> inputPaths = new LinkedList<>(); + List<String> inputFeedStorageTypes = new LinkedList<>(); for (Input input : entity.getInputs().getInputs()) { Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed()); Storage storage = FeedHelper.createStorage(cluster, feed); @@ -254,6 +254,7 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder< if (entity.getOutputs() == null) { props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), NONE); props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), NONE); + props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), NONE); return; } @@ -265,8 +266,9 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder< coord.setOutputEvents(new OUTPUTEVENTS()); } - List<String> outputFeeds = new ArrayList<String>(); - List<String> outputPaths = new ArrayList<String>(); + List<String> outputFeeds = new LinkedList<>(); + List<String> outputPaths = new LinkedList<>(); + List<String> falconOutputNames = new LinkedList<>(); for (Output output : entity.getOutputs().getOutputs()) { Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed()); Storage storage = FeedHelper.createStorage(cluster, feed); @@ -282,6 +284,7 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder< String outputExpr = "${coord:dataOut('" + output.getName() + "')}"; outputFeeds.add(feed.getName()); + falconOutputNames.add(output.getName()); outputPaths.add(outputExpr); if (storage.getType() == Storage.TYPE.FILESYSTEM) { @@ -295,6 +298,7 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder< // Output feed name and path for parent workflow props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), StringUtils.join(outputFeeds, ',')); + props.put(WorkflowExecutionArgs.OUTPUT_NAMES.getName(), StringUtils.join(falconOutputNames, ',')); props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), StringUtils.join(outputPaths, ',')); } http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java index d753baf..5418562 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java @@ -195,7 +195,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(coord.getTimezone(), "UTC"); HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord); - Assert.assertEquals(wfProps.get("feedNames"), lifecycleRetentionFeed.getName()); + Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), + lifecycleRetentionFeed.getName()); Assert.assertTrue(StringUtils.equals(wfProps.get("entityType"), EntityType.FEED.name())); Assert.assertEquals(wfProps.get("userWorkflowEngine"), "falcon"); Assert.assertEquals(wfProps.get("queueName"), "ageBasedDeleteQueue"); @@ -219,7 +220,8 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(coord.getTimezone(), "UTC"); HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord); - Assert.assertEquals(wfProps.get("feedNames"), lifecycleLocalRetentionFeed.getName()); + Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), + lifecycleLocalRetentionFeed.getName()); Assert.assertTrue(StringUtils.equals(wfProps.get("entityType"), EntityType.FEED.name())); Assert.assertEquals(wfProps.get("userWorkflowEngine"), "falcon"); Assert.assertEquals(wfProps.get("queueName"), "local"); @@ -370,15 +372,17 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name()); // verify the late data params - Assert.assertEquals(props.get("falconInputFeeds"), feed.getName()); + Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), feed.getName()); Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), feed.getName()); - Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}"); - Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions); - Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name()); + Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName()), "${coord:dataIn('input')}"); + Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName()), pathsWithPartitions); + Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName()), + Storage.TYPE.FILESYSTEM.name()); // verify the post processing params - Assert.assertEquals(props.get("feedNames"), feed.getName()); - Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}"); + Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), feed.getName()); + Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()), "${coord:dataOut('output')}"); + Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_NAMES.getName()), feed.getName()); // verify workflow params Assert.assertEquals(wfProps.get("userWorkflowName"), "replication-policy"); @@ -643,14 +647,15 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { assertTableStorageProperties(trgCluster, trgStorage, props, "falconTarget"); // verify the late data params - Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName()); - Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}"); + Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), tableFeed.getName()); + Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName()), "${coord:dataIn('input')}"); Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), tableFeed.getName()); - Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name()); + Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName()), Storage.TYPE.TABLE.name()); // verify the post processing params - Assert.assertEquals(props.get("feedNames"), tableFeed.getName()); - Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}"); + Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), tableFeed.getName()); + Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_NAMES.getName()), tableFeed.getName()); + Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()), "${coord:dataOut('output')}"); Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster)); assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord), @@ -788,8 +793,9 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { } // verify the post processing params - Assert.assertEquals(wfProps.get("feedNames"), feed.getName()); - Assert.assertEquals(wfProps.get("feedInstancePaths"), "IGNORE"); + Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), feed.getName()); + Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_NAMES.getName()), feed.getName()); + Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()), "IGNORE"); assertWorkflowRetries(getWorkflowapp(srcMiniDFS.getFileSystem(), coord)); @@ -850,8 +856,9 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { } // verify the post processing params - Assert.assertEquals(wfProps.get("feedNames"), tableFeed.getName()); - Assert.assertEquals(wfProps.get("feedInstancePaths"), "IGNORE"); + Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), tableFeed.getName()); + Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_NAMES.getName()), tableFeed.getName()); + Assert.assertEquals(wfProps.get(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()), "IGNORE"); assertWorkflowRetries(coord); verifyBrokerProperties(srcCluster, wfProps); http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java index 05b513e..840b332 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java @@ -195,8 +195,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { HashMap<String, String> wfProps = getWorkflowProperties(fs, coord); assertEquals(wfProps.get("mapred.job.priority"), "LOW"); List<Input> inputs = process.getInputs().getInputs(); + List<Output> outputs = process.getOutputs().getOutputs(); assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), inputs.get(0).getName() + "#" + inputs .get(1).getName()); + assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_NAMES.getName()), outputs.get(0).getName()); verifyEntityProperties(process, cluster, WorkflowExecutionContext.EntityOperations.GENERATE, wfProps); @@ -685,15 +687,19 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { verifyBrokerProperties(cluster, wfProps); // verify the late data params - Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed()); - Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}"); - Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name()); + Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), + process.getInputs().getInputs().get(0).getFeed()); + Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName()), "${coord:dataIn('input')}"); + Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName()), Storage.TYPE.TABLE.name()); Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), process.getInputs().getInputs().get(0).getName()); // verify the post processing params - Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getFeed()); - Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}"); + Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), + process.getOutputs().getOutputs().get(0).getFeed()); + Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()), "${coord:dataOut('output')}"); + Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_NAMES.getName()), + process.getOutputs().getOutputs().get(0).getName()); String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""); WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml")); @@ -853,6 +859,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { String[] expected = { WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), + WorkflowExecutionArgs.OUTPUT_NAMES.getName(), WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), @@ -893,6 +900,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "clicks"); Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "NONE"); + Assert.assertEquals(props.get(WorkflowExecutionArgs.OUTPUT_NAMES.getName()), "NONE"); } @Test http://git-wip-us.apache.org/repos/asf/falcon/blob/9fdfe713/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java index 4132c3a..8e7804e 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java @@ -213,13 +213,13 @@ public class FalconPostProcessingTest { // Verify user message if (checkUserMessage) { - verifyMesssage(consumer); + verifyMessage(consumer); } connection.close(); } - private void verifyMesssage(MessageConsumer consumer) throws JMSException { + private void verifyMessage(MessageConsumer consumer) throws JMSException { String[] actualFeedNames = new String[outputFeedPaths.length]; String[] actualFeedPaths = new String[outputFeedPaths.length];
