Repository: falcon
Updated Branches:
  refs/heads/0.6.1 64431a98b -> 7ec516ed4


FALCON-1129 In a secure cluster, feed replication fails because of 
Authentication issues. Contributed by Venkat Ranganathan


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7ec516ed
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7ec516ed
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7ec516ed

Branch: refs/heads/0.6.1
Commit: 7ec516ed4c9c2eb01706368bb47e708ea9dcb149
Parents: 64431a9
Author: Suhas Vasu <[email protected]>
Authored: Mon Apr 13 23:16:51 2015 +0530
Committer: Suhas Vasu <[email protected]>
Committed: Mon Apr 13 23:16:51 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                          |  3 +++
 .../oozie/feed/FSReplicationWorkflowBuilder.java     |  4 ++++
 .../oozie/feed/FeedReplicationWorkflowBuilder.java   | 15 ++++++++++++++-
 .../oozie/feed/HCatReplicationWorkflowBuilder.java   | 14 ++++----------
 4 files changed, 25 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/7ec516ed/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ecd6190..77a0fef 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -137,6 +137,9 @@ Branch: 0.6.1 (Proposed Release Version: 0.6.1)
    (Suhas vasu)
 
   BUG FIXES
+   FALCON-1129 In a secure cluster, feed replication fails because of
+   Authentication issues (Venkat Ranganathan via Suhas Vasu)
+
    FALCON-1141 Reverse Lookup for feed in prism fails with BadRequest
    (Ajay Yadava via Suhas Vasu)
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/7ec516ed/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
index 6feb32e..1d97204 100644
--- 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
+++ 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -45,6 +45,7 @@ public class FSReplicationWorkflowBuilder extends 
FeedReplicationWorkflowBuilder
         //Add pre-processing
         if (shouldPreProcess()) {
             ACTION action = getPreProcessingAction(false, Tag.REPLICATION);
+            addHDFSServersConfig(action, src, target);
             addTransition(action, REPLICATION_ACTION_NAME, 
FAIL_POSTPROCESS_ACTION_NAME);
             workflow.getDecisionOrForkOrJoin().add(action);
             start = PREPROCESS_ACTION_NAME;
@@ -52,15 +53,18 @@ public class FSReplicationWorkflowBuilder extends 
FeedReplicationWorkflowBuilder
 
         //Add replication
         ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE);
+        addHDFSServersConfig(replication, src, target);
         addTransition(replication, SUCCESS_POSTPROCESS_ACTION_NAME, 
FAIL_POSTPROCESS_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(replication);
 
         //Add post-processing actions
         ACTION success = getSuccessPostProcessAction();
+        addHDFSServersConfig(success, src, target);
         addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(success);
 
         ACTION fail = getFailPostProcessAction();
+        addHDFSServersConfig(fail, src, target);
         addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(fail);
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/7ec516ed/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
index 288e9de..aa936ad 100644
--- 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
+++ 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
@@ -21,12 +21,15 @@ package org.apache.falcon.oozie.feed;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.LifeCycle;
 import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 import org.apache.hadoop.fs.Path;
 
@@ -55,6 +58,16 @@ public abstract class FeedReplicationWorkflowBuilder extends 
OozieOrchestrationW
         marshal(cluster, workflow, buildPath);
         return getProperties(buildPath, wfName);
     }
-
+    protected ACTION addHDFSServersConfig(ACTION action, Cluster 
sourceCluster, Cluster targetCluster) {
+        if (isSecurityEnabled) {
+            // this is to ensure that the delegation tokens are checked out 
for both clusters
+            CONFIGURATION.Property property = new CONFIGURATION.Property();
+            property.setName("mapreduce.job.hdfs-servers");
+            
property.setValue(ClusterHelper.getReadOnlyStorageUrl(sourceCluster)
+                    + "," + ClusterHelper.getStorageUrl(targetCluster));
+            action.getJava().getConfiguration().getProperty().add(property);
+        }
+        return action;
+    }
     protected abstract WORKFLOWAPP getWorkflow(Cluster src, Cluster target) 
throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7ec516ed/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
index 30ca0a8..72bbca4 100644
--- 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
+++ 
b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -20,12 +20,10 @@ package org.apache.falcon.oozie.feed;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Tag;
-import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.CONFIGURATION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
 
 import java.util.Arrays;
@@ -57,6 +55,7 @@ public class HCatReplicationWorkflowBuilder extends 
FeedReplicationWorkflowBuild
         //Add pre-processing
         if (shouldPreProcess()) {
             ACTION action = getPreProcessingAction(false, Tag.REPLICATION);
+            addHDFSServersConfig(action, src, target);
             addTransition(action, EXPORT_ACTION_NAME, 
FAIL_POSTPROCESS_ACTION_NAME);
             workflow.getDecisionOrForkOrJoin().add(action);
             start = PREPROCESS_ACTION_NAME;
@@ -84,10 +83,12 @@ public class HCatReplicationWorkflowBuilder extends 
FeedReplicationWorkflowBuild
 
         //Add post-processing actions
         ACTION success = getSuccessPostProcessAction();
+        addHDFSServersConfig(success, src, target);
         addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(success);
 
         ACTION fail = getFailPostProcessAction();
+        addHDFSServersConfig(fail, src, target);
         addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(fail);
 
@@ -130,14 +131,7 @@ public class HCatReplicationWorkflowBuilder extends 
FeedReplicationWorkflowBuild
                     action.setCred(TARGET_HIVE_CREDENTIAL_NAME);
                 }
             } else if (REPLICATION_ACTION_NAME.equals(actionName)) {
-                if (isSecurityEnabled) {
-                    // this is to ensure that the delegation tokens are 
checked out for both clusters
-                    CONFIGURATION.Property property = new 
CONFIGURATION.Property();
-                    property.setName("mapreduce.job.hdfs-servers");
-                    
property.setValue(ClusterHelper.getReadOnlyStorageUrl(sourceCluster)
-                            + "," + 
ClusterHelper.getStorageUrl(targetCluster));
-                    
action.getJava().getConfiguration().getProperty().add(property);
-                }
+                addHDFSServersConfig(action, sourceCluster, targetCluster);
             }
         }
     }

Reply via email to