Repository: falcon
Updated Branches:
  refs/heads/0.7 fbb4d3142 -> 0375c9055


FALCON-1312 Falcon post processing action should use Oozie prepared 
configuration. Contributed by Venkat Ranganathan.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/0375c905
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/0375c905
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/0375c905

Branch: refs/heads/0.7
Commit: 0375c9055093867d8d86fc6f11893640882efb4a
Parents: 6d1405b
Author: Ajay Yadava <[email protected]>
Authored: Sat Aug 29 01:15:32 2015 +0530
Committer: Ajay Yadav <[email protected]>
Committed: Tue Sep 1 10:14:27 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                             |  2 ++
 .../falcon/workflow/WorkflowExecutionContext.java       | 12 +++++++++++-
 .../java/org/apache/falcon/logging/JobLogMover.java     | 11 +++++++++--
 .../apache/falcon/workflow/FalconPostProcessing.java    |  6 ++++--
 4 files changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/0375c905/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c3757fa..7bc10a8 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -95,6 +95,8 @@ Trunk (Unreleased)
     (Suhas Vasu)
 
   BUG FIXES
+    FALCON-1312 Falcon post processing action should use Oozie prepared 
configuration(Venkat Ranganathan via Ajay Yadava)
+
     FALCON-1038 Log mover fails for map-reduce action(Peeyush Bishnoi via Ajay 
Yadava)
     
     FALCON-1412 Process waits indefinitely and finally timedout even though 
missing dependencies are met(Pallavi Rao via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/0375c905/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java 
b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 53ef5de..4454239 100644
--- 
a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ 
b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -28,6 +28,7 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.json.simple.JSONValue;
@@ -95,6 +96,7 @@ public class WorkflowExecutionContext {
 
     private final Map<WorkflowExecutionArgs, String> context;
     private final long creationTime;
+    private Configuration actionJobConf;
 
     public WorkflowExecutionContext(Map<WorkflowExecutionArgs, String> 
context) {
         this.context = context;
@@ -301,7 +303,9 @@ public class WorkflowExecutionContext {
         OutputStream out = null;
         Path file = new Path(contextFile);
         try {
-            FileSystem fs = 
HadoopClientFactory.get().createProxiedFileSystem(file.toUri());
+            FileSystem fs =
+                    actionJobConf == null ? 
HadoopClientFactory.get().createProxiedFileSystem(file.toUri())
+                                 : 
HadoopClientFactory.get().createProxiedFileSystem(file.toUri(), actionJobConf);
             out = fs.create(file);
             out.write(JSONValue.toJSONString(context).getBytes());
         } catch (IOException e) {
@@ -346,7 +350,12 @@ public class WorkflowExecutionContext {
         return new Path(logDir + parentSuffix, entityName + 
"-wf-post-exec-context.json").toString();
     }
 
+
     public static WorkflowExecutionContext create(String[] args, Type type) 
throws FalconException {
+        return create(args, type, null);
+    }
+
+    public static WorkflowExecutionContext create(String[] args, Type type, 
Configuration conf) throws FalconException {
         Map<WorkflowExecutionArgs, String> wfProperties = new 
HashMap<WorkflowExecutionArgs, String>();
 
         try {
@@ -362,6 +371,7 @@ public class WorkflowExecutionContext {
         }
 
         WorkflowExecutionContext executionContext = new 
WorkflowExecutionContext(wfProperties);
+        executionContext.actionJobConf = conf;
         executionContext.context.put(WorkflowExecutionArgs.CONTEXT_TYPE, 
type.name());
         executionContext.context.put(WorkflowExecutionArgs.CONTEXT_FILE,
                 getFilePath(executionContext.getLogDir(), 
executionContext.getEntityName(),

http://git-wip-us.apache.org/repos/asf/falcon/blob/0375c905/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java 
b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
index 478d68c..830641e 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
@@ -22,6 +22,7 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.process.EngineType;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -58,7 +59,13 @@ public class JobLogMover {
         new HashSet<String>(Arrays.asList(new String[]{"eviction", 
"replication", }));
 
     private Configuration getConf() {
-        return new Configuration();
+        Configuration conf = null;
+        try {
+            conf = OozieActionConfigurationHelper.createActionConf();
+        } catch (IOException ioe) {
+            LOG.warn("Cannot get Oozie configuration.  Returning default");
+        }
+        return conf == null ? new Configuration(): conf;
     }
 
     public int run(WorkflowExecutionContext context) {
@@ -76,7 +83,7 @@ public class JobLogMover {
             //the corresponding job logs are stored within the respective dir
             Path path = new Path(context.getLogDir() + "/"
                     + String.format("%03d", context.getWorkflowRunId()));
-            FileSystem fs = 
HadoopClientFactory.get().createProxiedFileSystem(path.toUri());
+            FileSystem fs = 
HadoopClientFactory.get().createProxiedFileSystem(path.toUri(), getConf());
 
             if 
(EntityType.FEED.name().equalsIgnoreCase(context.getEntityType())
                     || 
notUserWorkflowEngineIsOozie(context.getUserWorkflowEngine())) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/0375c905/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java 
b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 7557153..cff1187 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -20,6 +20,7 @@ package org.apache.falcon.workflow;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.logging.JobLogMover;
 import org.apache.falcon.messaging.JMSMessageProducer;
+import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -35,13 +36,14 @@ public class FalconPostProcessing extends Configured 
implements Tool {
     private static final Logger LOG = 
LoggerFactory.getLogger(FalconPostProcessing.class);
 
     public static void main(String[] args) throws Exception {
-        ToolRunner.run(new Configuration(), new FalconPostProcessing(), args);
+        Configuration conf = OozieActionConfigurationHelper.createActionConf();
+        ToolRunner.run(conf, new FalconPostProcessing(), args);
     }
 
     @Override
     public int run(String[] args) throws Exception {
         WorkflowExecutionContext context = 
WorkflowExecutionContext.create(args,
-                WorkflowExecutionContext.Type.POST_PROCESSING);
+                WorkflowExecutionContext.Type.POST_PROCESSING, getConf());
         LOG.info("Post workflow execution context created {}", context);
         // serialize the context to HDFS under logs dir before sending the 
message
         context.serialize();

Reply via email to