Repository: falcon Updated Branches: refs/heads/master c85a3c092 -> d9b824dc2
FALCON-1312 Falcon post processing action should use Oozie prepared configuration. Contributed by Venkat Ranganathan. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/d9b824dc Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/d9b824dc Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/d9b824dc Branch: refs/heads/master Commit: d9b824dc2f57c855f48185a3ea117c7da1bbc3a3 Parents: c85a3c0 Author: Ajay Yadava <[email protected]> Authored: Sat Aug 29 01:15:32 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Sat Aug 29 01:15:32 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../falcon/workflow/WorkflowExecutionContext.java | 12 +++++++++++- .../java/org/apache/falcon/logging/JobLogMover.java | 11 +++++++++-- .../apache/falcon/workflow/FalconPostProcessing.java | 6 ++++-- 4 files changed, 26 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/d9b824dc/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c3757fa..7bc10a8 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -95,6 +95,8 @@ Trunk (Unreleased) (Suhas Vasu) BUG FIXES + FALCON-1312 Falcon post processing action should use Oozie prepared configuration(Venkat Ranganathan via Ajay Yadava) + FALCON-1038 Log mover fails for map-reduce action(Peeyush Bishnoi via Ajay Yadava) FALCON-1412 Process waits indefinitely and finally timedout even though missing dependencies are met(Pallavi Rao via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/d9b824dc/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java index 53ef5de..4454239 100644 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java +++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java @@ -28,6 +28,7 @@ import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.json.simple.JSONValue; @@ -95,6 +96,7 @@ public class WorkflowExecutionContext { private final Map<WorkflowExecutionArgs, String> context; private final long creationTime; + private Configuration actionJobConf; public WorkflowExecutionContext(Map<WorkflowExecutionArgs, String> context) { this.context = context; @@ -301,7 +303,9 @@ public class WorkflowExecutionContext { OutputStream out = null; Path file = new Path(contextFile); try { - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(file.toUri()); + FileSystem fs = + actionJobConf == null ? HadoopClientFactory.get().createProxiedFileSystem(file.toUri()) + : HadoopClientFactory.get().createProxiedFileSystem(file.toUri(), actionJobConf); out = fs.create(file); out.write(JSONValue.toJSONString(context).getBytes()); } catch (IOException e) { @@ -346,7 +350,12 @@ public class WorkflowExecutionContext { return new Path(logDir + parentSuffix, entityName + "-wf-post-exec-context.json").toString(); } + public static WorkflowExecutionContext create(String[] args, Type type) throws FalconException { + return create(args, type, null); + } + + public static WorkflowExecutionContext create(String[] args, Type type, Configuration conf) throws FalconException { Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs, String>(); try { @@ -362,6 +371,7 @@ public class WorkflowExecutionContext { } WorkflowExecutionContext executionContext = new WorkflowExecutionContext(wfProperties); + executionContext.actionJobConf = conf; executionContext.context.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name()); executionContext.context.put(WorkflowExecutionArgs.CONTEXT_FILE, getFilePath(executionContext.getLogDir(), executionContext.getEntityName(), http://git-wip-us.apache.org/repos/asf/falcon/blob/d9b824dc/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java index 478d68c..830641e 100644 --- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java +++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java @@ -22,6 +22,7 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.process.EngineType; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.workflow.WorkflowExecutionContext; +import org.apache.falcon.workflow.util.OozieActionConfigurationHelper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -58,7 +59,13 @@ public class JobLogMover { new HashSet<String>(Arrays.asList(new String[]{"eviction", "replication", })); private Configuration getConf() { - return new Configuration(); + Configuration conf = null; + try { + conf = OozieActionConfigurationHelper.createActionConf(); + } catch (IOException ioe) { + LOG.warn("Cannot get Oozie configuration. Returning default"); + } + return conf == null ? new Configuration(): conf; } public int run(WorkflowExecutionContext context) { @@ -76,7 +83,7 @@ public class JobLogMover { //the corresponding job logs are stored within the respective dir Path path = new Path(context.getLogDir() + "/" + String.format("%03d", context.getWorkflowRunId())); - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(path.toUri()); + FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(path.toUri(), getConf()); if (EntityType.FEED.name().equalsIgnoreCase(context.getEntityType()) || notUserWorkflowEngineIsOozie(context.getUserWorkflowEngine())) { http://git-wip-us.apache.org/repos/asf/falcon/blob/d9b824dc/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java index 7557153..cff1187 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java @@ -20,6 +20,7 @@ package org.apache.falcon.workflow; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.logging.JobLogMover; import org.apache.falcon.messaging.JMSMessageProducer; +import org.apache.falcon.workflow.util.OozieActionConfigurationHelper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.security.UserGroupInformation; @@ -35,13 +36,14 @@ public class FalconPostProcessing extends Configured implements Tool { private static final Logger LOG = LoggerFactory.getLogger(FalconPostProcessing.class); public static void main(String[] args) throws Exception { - ToolRunner.run(new Configuration(), new FalconPostProcessing(), args); + Configuration conf = OozieActionConfigurationHelper.createActionConf(); + ToolRunner.run(conf, new FalconPostProcessing(), args); } @Override public int run(String[] args) throws Exception { WorkflowExecutionContext context = WorkflowExecutionContext.create(args, - WorkflowExecutionContext.Type.POST_PROCESSING); + WorkflowExecutionContext.Type.POST_PROCESSING, getConf()); LOG.info("Post workflow execution context created {}", context); // serialize the context to HDFS under logs dir before sending the message context.serialize();
