Repository: falcon
Updated Branches: refs/heads/master 6bbfe2366 -> 5a474ffab
FALCON-1365 HCatReplication job fails with AccessControlException. Contributed by Sowmya Ramesh. Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/5a474ffa Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/5a474ffa Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/5a474ffa Branch: refs/heads/master Commit: 5a474ffab766a964e44096633016f6463537fd0a Parents: 6bbfe23 Author: Ajay Yadava <[email protected]> Authored: Wed Sep 9 14:50:53 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Wed Sep 9 14:50:53 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../feed/HCatReplicationWorkflowBuilder.java | 31 ++++++++++++++++++-- 2 files changed, 31 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/5a474ffa/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 196490d..e0249a0 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -20,6 +20,8 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + FALCON-1365 HCatReplication job fails with AccessControlException(Sowmya Ramesh via Ajay Yadava) + FALCON-298 Feed update with replication delay creates holes(Sandeep Samudrala via Ajay Yadava) FALCON-1410 Entity submit fails when multiple threads try submitting same definition(Sandeep Samudrala via Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/5a474ffa/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java index ed86b32..6e2a631 100644 --- 
a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java @@ -20,12 +20,15 @@ package org.apache.falcon.oozie.feed; import org.apache.falcon.FalconException; import org.apache.falcon.Tag; +import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.oozie.workflow.ACTION; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; +import org.apache.falcon.util.OozieUtils; +import javax.xml.bind.JAXBElement; import java.util.Arrays; import java.util.Properties; @@ -64,17 +67,28 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild //Add export action ACTION export = unmarshalAction(EXPORT_ACTION_TEMPLATE); + JAXBElement<org.apache.falcon.oozie.hive.ACTION> exportActionJaxbElement = + OozieUtils.unMarshalHiveAction(export); + org.apache.falcon.oozie.hive.ACTION hiveExportAction = exportActionJaxbElement.getValue(); + addHDFSServersConfig(hiveExportAction, src, target); + OozieUtils.marshalHiveAction(export, exportActionJaxbElement); addTransition(export, REPLICATION_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); workflow.getDecisionOrForkOrJoin().add(export); //Add replication ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE); + addHDFSServersConfig(replication, src, target); addAdditionalReplicationProperties(replication); addTransition(replication, IMPORT_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); workflow.getDecisionOrForkOrJoin().add(replication); //Add import action ACTION importAction = unmarshalAction(IMPORT_ACTION_TEMPLATE); + JAXBElement<org.apache.falcon.oozie.hive.ACTION> importActionJaxbElement = + OozieUtils.unMarshalHiveAction(importAction); + org.apache.falcon.oozie.hive.ACTION hiveImportAction = importActionJaxbElement.getValue(); + 
addHDFSServersConfig(hiveImportAction, src, target); + OozieUtils.marshalHiveAction(importAction, importActionJaxbElement); addTransition(importAction, CLEANUP_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME); workflow.getDecisionOrForkOrJoin().add(importAction); @@ -132,8 +146,6 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild if (isSecurityEnabled) { // add a reference to credential in the action action.setCred(TARGET_HIVE_CREDENTIAL_NAME); } - } else if (REPLICATION_ACTION_NAME.equals(actionName)) { - addHDFSServersConfig(action, sourceCluster, targetCluster); } } } @@ -144,4 +156,19 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild return props; } + + private org.apache.falcon.oozie.hive.ACTION addHDFSServersConfig(org.apache.falcon.oozie.hive.ACTION action, + Cluster sourceCluster, Cluster targetCluster) { + if (isSecurityEnabled) { + // this is to ensure that the delegation tokens are checked out for both clusters + org.apache.falcon.oozie.hive.CONFIGURATION.Property hiveProperty = new org.apache.falcon.oozie.hive + .CONFIGURATION.Property(); + hiveProperty.setName("oozie.launcher.mapreduce.job.hdfs-servers"); + hiveProperty.setValue(ClusterHelper.getReadOnlyStorageUrl(sourceCluster) + + "," + ClusterHelper.getStorageUrl(targetCluster)); + action.getConfiguration().getProperty().add(hiveProperty); + } + return action; + } + }
