FALCON-892 HCatReplication fails in secure setup. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/e8b1d11e Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/e8b1d11e Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/e8b1d11e Branch: refs/heads/master Commit: e8b1d11eb074e947959cb8d8aaa92a98acecd827 Parents: 60d33b6 Author: Venkatesh Seetharam <venkat...@apache.org> Authored: Thu Nov 13 18:28:54 2014 -0800 Committer: Venkatesh Seetharam <venkat...@apache.org> Committed: Thu Nov 13 18:55:19 2014 -0800 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../org/apache/falcon/entity/FeedHelper.java | 17 +++++++---- .../apache/falcon/oozie/OozieBundleBuilder.java | 18 +++++++++--- .../feed/FeedReplicationCoordinatorBuilder.java | 11 +++++-- .../feed/HCatReplicationWorkflowBuilder.java | 11 +++++++ .../main/resources/action/feed/table-export.xml | 7 +++-- .../feed/OozieFeedWorkflowBuilderTest.java | 30 ++++++++++++++++++-- 7 files changed, 78 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e8b1d11e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f20ef0d..fda0338 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -144,6 +144,8 @@ Trunk (Unreleased) OPTIMIZATIONS BUG FIXES + FALCON-892 HCatReplication fails in secure setup (Venkatesh Seetharam) + FALCON-889 Windows azure replication fails with "wasb" as the scheme to an HDFS file system (Chris Nauroth via Venkatesh Seetharam) http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e8b1d11e/common/src/main/java/org/apache/falcon/entity/FeedHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java 
b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java index b5dd5c3..ca31f95 100644 --- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java +++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java @@ -291,9 +291,10 @@ public final class FeedHelper { return expHelp.evaluateFullExpression(exp, String.class); } - public static String getStagingPath(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, - Feed feed, CatalogStorage storage, Tag tag, String suffix) { - String stagingDirPath = getStagingDir(clusterEntity, feed, storage, tag); + public static String getStagingPath(boolean isSource, + org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, + Feed feed, CatalogStorage storage, Tag tag, String suffix) { + String stagingDirPath = getStagingDir(isSource, clusterEntity, feed, storage, tag); String datedPartitionKey = storage.getDatedPartitionKeys().get(0); String datedPartitionKeySuffix = datedPartitionKey + "=${coord:dataOutPartitionValue('output'," @@ -304,13 +305,17 @@ public final class FeedHelper { + "data"; } - public static String getStagingDir(org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, + public static String getStagingDir(boolean isSource, + org.apache.falcon.entity.v0.cluster.Cluster clusterEntity, Feed feed, CatalogStorage storage, Tag tag) { String workflowName = EntityUtil.getWorkflowName( tag, Arrays.asList(clusterEntity.getName()), feed).toString(); - // log path is created at scheduling wf and has 777 perms - return ClusterHelper.getStorageUrl(clusterEntity) + // log path is created at scheduling wf + final String storageUri = isSource + ? 
ClusterHelper.getReadOnlyStorageUrl(clusterEntity) // read interface + : ClusterHelper.getStorageUrl(clusterEntity); // write interface + return storageUri + EntityUtil.getLogPath(clusterEntity, feed) + "/" + workflowName + "/" + storage.getDatabase() + "/" http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e8b1d11e/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java index 957300a..c73401a 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java @@ -19,6 +19,7 @@ package org.apache.falcon.oozie; import org.apache.falcon.FalconException; +import org.apache.falcon.Tag; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.v0.Entity; @@ -77,9 +78,10 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu // add the coordinator to the bundle COORDINATOR coord = new COORDINATOR(); String coordPath = coordProps.getProperty(OozieEntityBuilder.ENTITY_PATH); - coord.setName(coordProps.getProperty(OozieEntityBuilder.ENTITY_NAME)); + final String coordName = coordProps.getProperty(OozieEntityBuilder.ENTITY_NAME); + coord.setName(coordName); coord.setAppPath(getStoragePath(coordPath)); - Properties appProps = createAppProperties(cluster, buildPath); + Properties appProps = createAppProperties(cluster, buildPath, coordName); appProps.putAll(coordProps); coord.setConfiguration(getConfig(appProps)); bundle.getCoordinator().add(coord); @@ -104,7 +106,8 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu return conf; } - protected Properties createAppProperties(Cluster cluster, Path buildPath) throws FalconException 
{ + protected Properties createAppProperties(Cluster cluster, Path buildPath, + String coordName) throws FalconException { Properties properties = getEntityProperties(cluster); properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster)); properties.setProperty(AbstractWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster)); @@ -115,7 +118,14 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu properties.setProperty("falcon.libpath", ClusterHelper.getLocation(cluster, "working") + "/lib"); if (EntityUtil.isTableStorageType(cluster, entity)) { - properties.putAll(getHiveCredentials(cluster)); + Tag tag = EntityUtil.getWorkflowNameTag(coordName, entity); + if (tag == Tag.REPLICATION) { + // todo: kludge send source hcat creds for coord dependency check to pass + String srcClusterName = EntityUtil.getWorkflowNameSuffix(coordName, entity); + properties.putAll(getHiveCredentials(ClusterHelper.getCluster(srcClusterName))); + } else { + properties.putAll(getHiveCredentials(cluster)); + } } //Add libpath http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e8b1d11e/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java index 8f7f01a..2963ac9 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java @@ -246,15 +246,20 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage, Cluster trgCluster, CatalogStorage targetStorage, Properties props) { - // 
create staging dirs for export at source & set it as distcpSourcePaths + // create staging dirs for copy from source & set it as distcpSourcePaths - Read interface String sourceStagingPath = - FeedHelper.getStagingPath(srcCluster, entity, sourceStorage, Tag.REPLICATION, + FeedHelper.getStagingPath(true, srcCluster, entity, sourceStorage, Tag.REPLICATION, NOMINAL_TIME_EL + "/" + trgCluster.getName()); props.put("distcpSourcePaths", sourceStagingPath); + // create staging dirs for export at source which needs to be writable - hence write interface + String falconSourceStagingPath = + FeedHelper.getStagingPath(false, srcCluster, entity, sourceStorage, Tag.REPLICATION, + NOMINAL_TIME_EL + "/" + trgCluster.getName()); + props.put("falconSourceStagingDir", falconSourceStagingPath); // create staging dirs for import at target & set it as distcpTargetPaths String targetStagingPath = - FeedHelper.getStagingPath(trgCluster, entity, targetStorage, Tag.REPLICATION, + FeedHelper.getStagingPath(false, trgCluster, entity, targetStorage, Tag.REPLICATION, NOMINAL_TIME_EL + "/" + trgCluster.getName()); props.put("distcpTargetPaths", targetStagingPath); http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e8b1d11e/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java index 61739a5..30ca0a8 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java @@ -20,10 +20,12 @@ package org.apache.falcon.oozie.feed; import org.apache.falcon.FalconException; import org.apache.falcon.Tag; +import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; 
import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.oozie.workflow.ACTION; +import org.apache.falcon.oozie.workflow.CONFIGURATION; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; import java.util.Arrays; @@ -127,6 +129,15 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild if (isSecurityEnabled) { // add a reference to credential in the action action.setCred(TARGET_HIVE_CREDENTIAL_NAME); } + } else if (REPLICATION_ACTION_NAME.equals(actionName)) { + if (isSecurityEnabled) { + // this is to ensure that the delegation tokens are checked out for both clusters + CONFIGURATION.Property property = new CONFIGURATION.Property(); + property.setName("mapreduce.job.hdfs-servers"); + property.setValue(ClusterHelper.getReadOnlyStorageUrl(sourceCluster) + + "," + ClusterHelper.getStorageUrl(targetCluster)); + action.getJava().getConfiguration().getProperty().add(property); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e8b1d11e/oozie/src/main/resources/action/feed/table-export.xml ---------------------------------------------------------------------- diff --git a/oozie/src/main/resources/action/feed/table-export.xml b/oozie/src/main/resources/action/feed/table-export.xml index f5f7e66..fcf1a1a 100644 --- a/oozie/src/main/resources/action/feed/table-export.xml +++ b/oozie/src/main/resources/action/feed/table-export.xml @@ -20,6 +20,9 @@ <hive xmlns="uri:oozie:hive-action:0.2"> <job-tracker>${falconSourceJobTracker}</job-tracker> <name-node>${falconSourceNameNode}</name-node> + <!-- + falconSourceStagingDir and distcpSourcePaths refer to the same path, but distcpSourcePaths uses the read-only interface while falconSourceStagingDir uses the write interface + --> <prepare> <delete path="${distcpSourcePaths}"/> </prepare> @@ -38,8 +41,8 @@ <param>falconSourceDatabase=${falconSourceDatabase}</param> <param>falconSourceTable=${falconSourceTable}</param> <param>falconSourcePartition=${falconSourcePartition}</param> - 
<param>falconSourceStagingDir=${distcpSourcePaths}</param> + <param>falconSourceStagingDir=${falconSourceStagingDir}</param> </hive> <ok to="replication"/> <error to="failed-post-processing"/> -</action> \ No newline at end of file +</action> http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e8b1d11e/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java ---------------------------------------------------------------------- diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java index 42c231f..e5588b4 100644 --- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java +++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java @@ -43,6 +43,7 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP; import org.apache.falcon.oozie.coordinator.SYNCDATASET; import org.apache.falcon.oozie.process.AbstractTestBase; import org.apache.falcon.oozie.workflow.ACTION; +import org.apache.falcon.oozie.workflow.CONFIGURATION; import org.apache.falcon.oozie.workflow.JAVA; import org.apache.falcon.oozie.workflow.WORKFLOWAPP; import org.apache.falcon.security.CurrentUser; @@ -437,13 +438,26 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { Assert.assertEquals(props.get("sourceRelativePaths"), "IGNORE"); Assert.assertTrue(props.containsKey("distcpSourcePaths")); - Assert.assertEquals(props.get("distcpSourcePaths"), - FeedHelper.getStagingPath(srcCluster, tableFeed, srcStorage, Tag.REPLICATION, + final String distcpSourcePaths = props.get("distcpSourcePaths"); + Assert.assertEquals(distcpSourcePaths, + FeedHelper.getStagingPath(true, srcCluster, tableFeed, srcStorage, Tag.REPLICATION, "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}" + "/" + trgCluster.getName())); + Assert.assertTrue(props.containsKey("falconSourceStagingDir")); 
+ + final String falconSourceStagingDir = props.get("falconSourceStagingDir"); + Assert.assertEquals(falconSourceStagingDir, + FeedHelper.getStagingPath(false, srcCluster, tableFeed, srcStorage, Tag.REPLICATION, + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}" + "/" + trgCluster.getName())); + + String exportPath = falconSourceStagingDir.substring( + ClusterHelper.getStorageUrl(srcCluster).length(), falconSourceStagingDir.length()); + String distCPPath = distcpSourcePaths.substring( + ClusterHelper.getReadOnlyStorageUrl(srcCluster).length(), distcpSourcePaths.length()); + Assert.assertEquals(exportPath, distCPPath); Assert.assertTrue(props.containsKey("distcpTargetPaths")); Assert.assertEquals(props.get("distcpTargetPaths"), - FeedHelper.getStagingPath(trgCluster, tableFeed, trgStorage, Tag.REPLICATION, + FeedHelper.getStagingPath(false, trgCluster, tableFeed, trgStorage, Tag.REPLICATION, "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}" + "/" + trgCluster.getName())); Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.TABLE.name()); @@ -510,6 +524,16 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase { } else if ("table-import".equals(actionName) && isSecurityEnabled) { Assert.assertNotNull(action.getCred()); Assert.assertEquals(action.getCred(), "falconTargetHiveAuth"); + } else if ("replication".equals(actionName)) { + List<CONFIGURATION.Property> properties = + action.getJava().getConfiguration().getProperty(); + for (CONFIGURATION.Property property : properties) { + if (property.getName().equals("mapreduce.job.hdfs-servers")) { + Assert.assertEquals(property.getValue(), + ClusterHelper.getReadOnlyStorageUrl(srcCluster) + + "," + ClusterHelper.getStorageUrl(trgCluster)); + } + } } } }