Repository: falcon
Updated Branches:
  refs/heads/master c57c8f448 -> cb473b4fb
FALCON-2028 HDFS extension: Validate and append/remove the scheme://authority for the paths Author: Sowmya Ramesh <[email protected]> Reviewers: "Venkat Ranganathan<[email protected]>, Balu Vellanki <[email protected]>" Closes #248 from sowmyaramesh/FALCON-2028_1 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/cb473b4f Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/cb473b4f Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/cb473b4f Branch: refs/heads/master Commit: cb473b4fb037f13c1e458b6a4215f53a05f42b15 Parents: c57c8f4 Author: Sowmya Ramesh <[email protected]> Authored: Fri Aug 12 15:00:51 2016 -0700 Committer: Sowmya Ramesh <[email protected]> Committed: Fri Aug 12 15:00:51 2016 -0700 ---------------------------------------------------------------------- .../mirroring/hdfs/HdfsMirroringExtension.java | 32 ++++++++++++++++++-- .../falcon/resource/ExtensionManagerIT.java | 4 +-- 2 files changed, 32 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/cb473b4f/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java index ef26d81..74d217c 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfs/HdfsMirroringExtension.java @@ -24,6 +24,8 @@ import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.extensions.AbstractExtension; +import java.net.URI; +import 
java.net.URISyntaxException; import java.util.Properties; /** @@ -82,17 +84,43 @@ public class HdfsMirroringExtension extends AbstractExtension { if (StringUtils.isNotBlank(srcPaths)) { String[] paths = srcPaths.split(COMMA_SEPARATOR); + URI pathUri; for (String path : paths) { - StringBuilder srcpath = new StringBuilder(srcClusterEndPoint); + try { + pathUri = new URI(path.trim()); + } catch (URISyntaxException e) { + throw new FalconException(e); + } + String authority = pathUri.getAuthority(); + StringBuilder srcpath = new StringBuilder(); + if (authority == null) { + srcpath.append(srcClusterEndPoint); + } + srcpath.append(path.trim()); srcpath.append(COMMA_SEPARATOR); absoluteSrcPaths.append(srcpath); } } - additionalProperties.put(HdfsMirroringExtensionProperties.SOURCE_DIR.getName(), StringUtils.removeEnd(absoluteSrcPaths.toString(), COMMA_SEPARATOR)); + // Target dir shouldn't have the namenode + String targetDir = extensionProperties.getProperty(HdfsMirroringExtensionProperties + .TARGET_DIR.getName()); + + URI targetPathUri; + try { + targetPathUri = new URI(targetDir.trim()); + } catch (URISyntaxException e) { + throw new FalconException(e); + } + + if (targetPathUri.getScheme() != null) { + additionalProperties.put(HdfsMirroringExtensionProperties.TARGET_DIR.getName(), + targetPathUri.getPath()); + } + // add sourceClusterFS and targetClusterFS additionalProperties.put(HdfsMirroringExtensionProperties.SOURCE_CLUSTER_FS_WRITE_ENDPOINT.getName(), ClusterHelper.getStorageUrl(srcCluster)); http://git-wip-us.apache.org/repos/asf/falcon/blob/cb473b4f/webapp/src/test/java/org/apache/falcon/resource/ExtensionManagerIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/resource/ExtensionManagerIT.java b/webapp/src/test/java/org/apache/falcon/resource/ExtensionManagerIT.java index 6efe884..db23d6d 100644 --- a/webapp/src/test/java/org/apache/falcon/resource/ExtensionManagerIT.java +++ 
b/webapp/src/test/java/org/apache/falcon/resource/ExtensionManagerIT.java @@ -42,8 +42,8 @@ public class ExtensionManagerIT extends AbstractTestExtensionStore { private static final String START_TIME_1 = "2016-05-03T00:00Z"; private static final String START_TIME_2 = "2016-05-01T00:00Z"; private static final String FREQUENCY = "days(1)"; - private static final String SOURCE_DIR = "/apps/falcon/demo/input-{year}-{month}-{day}"; - private static final String TARGET_DIR = "/apps/falcon/demo/output-{year}-{month}-{day}"; + private static final String SOURCE_DIR = "/apps/falcon/demo/input-2016-07-10"; + private static final String TARGET_DIR = "/apps/falcon/demo/output"; private final TestContext context = new TestContext();
