Repository: incubator-gobblin Updated Branches: refs/heads/master ce60d2c7c -> 42ea018e5
[GOBBLIN-189] add dataset path to 'dataset published' events Closes #2047 from arjun4084346/master Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/42ea018e Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/42ea018e Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/42ea018e Branch: refs/heads/master Commit: 42ea018e5690a55c4293b42842093e68c6fae304 Parents: ce60d2c Author: Arjun <[email protected]> Authored: Thu Aug 10 08:55:40 2017 -0700 Committer: Issac Buenrostro <[email protected]> Committed: Thu Aug 10 08:55:40 2017 -0700 ---------------------------------------------------------------------- .../apache/gobblin/data/management/copy/CopyableFile.java | 10 ++++++++-- .../data/management/copy/RecursiveCopyableDataset.java | 2 +- .../data/management/copy/hive/HivePartitionFileSet.java | 3 ++- .../management/copy/hive/UnpartitionedTableFileSet.java | 2 +- .../data/management/copy/publisher/CopyDataPublisher.java | 8 ++++++++ .../copy/ConcurrentBoundedWorkUnitListTest.java | 2 +- .../management/copy/CopySourcePrioritizationTest.java | 2 +- .../gobblin/data/management/copy/CopyableFileTest.java | 4 ++-- .../gobblin/data/management/copy/CopyableFileUtils.java | 4 ++-- .../apache/gobblin/metrics/event/sla/SlaEventKeys.java | 1 + 10 files changed, 27 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java index 9c729e3..cec06f2 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java @@ -58,6 +58,9 @@ public class CopyableFile extends CopyEntity implements File { /** Complete destination {@link Path} of the file. */ private Path destination; + /** Common path for dataset to which this CopyableFile belongs. */ + public String datasetOutputPath; + /** Desired {@link OwnerAndPermission} of the destination path. */ private OwnerAndPermission destinationOwnerAndPermission; @@ -89,7 +92,8 @@ public class CopyableFile extends CopyEntity implements File { @lombok.Builder(builderClassName = "Builder", builderMethodName = "_hiddenBuilder") public CopyableFile(FileStatus origin, Path destination, OwnerAndPermission destinationOwnerAndPermission, List<OwnerAndPermission> ancestorsOwnerAndPermission, byte[] checksum, PreserveAttributes preserve, - String fileSet, long originTimestamp, long upstreamTimestamp, Map<String, String> additionalMetadata) { + String fileSet, long originTimestamp, long upstreamTimestamp, Map<String, String> additionalMetadata, + String datasetOutputPath) { super(fileSet, additionalMetadata); this.origin = origin; this.destination = destination; @@ -99,6 +103,7 @@ public class CopyableFile extends CopyEntity implements File { this.preserve = preserve; this.originTimestamp = originTimestamp; this.upstreamTimestamp = upstreamTimestamp; + this.datasetOutputPath = datasetOutputPath; } /** @@ -145,6 +150,7 @@ public class CopyableFile extends CopyEntity implements File { private CopyConfiguration configuration; private FileSystem originFs; private Map<String, String> additionalMetadata; + private String datasetOutputPath; private Builder originFS(FileSystem originFs) { this.originFs = originFs; @@ -219,7 +225,7 @@ public class CopyableFile extends CopyEntity implements File { return new CopyableFile(this.origin, this.destination, this.destinationOwnerAndPermission, this.ancestorsOwnerAndPermission, this.checksum, this.preserve, this.fileSet, this.originTimestamp, - this.upstreamTimestamp, this.additionalMetadata); + this.upstreamTimestamp, this.additionalMetadata, this.datasetOutputPath); } private List<OwnerAndPermission> replicateAncestorsOwnerAndPermission(FileSystem originFs, Path originPath, http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java index 34428f1..0f18f68 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java @@ -136,7 +136,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData Path thisTargetPath = new Path(configuration.getPublishDir(), filePathRelativeToSearchPath); copyableFiles.add(CopyableFile.fromOriginAndDestination(this.fs, file, thisTargetPath, configuration) - .fileSet(datasetURN()) + .fileSet(datasetURN()).datasetOutputPath(thisTargetPath.toString()) .ancestorsOwnerAndPermission(CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(this.fs, file.getPath().getParent(), nonGlobSearchPath, configuration)) .build()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java index c57f55b..a9982bf 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java @@ -149,7 +149,8 @@ public class HivePartitionFileSet extends HiveFileSet { multiTimer.nextStage(HiveCopyEntityHelper.Stages.CREATE_COPY_UNITS); for (CopyableFile.Builder builder : hiveCopyEntityHelper.getCopyableFilesFromPaths(diffPathSet.filesToCopy, hiveCopyEntityHelper.getConfiguration(), Optional.of(this.partition))) { - copyEntities.add(builder.fileSet(fileSet).checksum(new byte[0]).build()); + copyEntities.add(builder.fileSet(fileSet).checksum(new byte[0]) + .datasetOutputPath(desiredTargetLocation.location.toString()).build()); } log.info("Created {} copy entities for partition {}", copyEntities.size(), this.partition.getCompleteName()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java index e1421e7..a796a2b 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java @@ -120,7 +120,7 @@ public class UnpartitionedTableFileSet extends HiveFileSet { for (CopyableFile.Builder builder : this.helper.getCopyableFilesFromPaths(diffPathSet.filesToCopy, this.helper.getConfiguration(), Optional.<Partition> absent())) { - copyEntities.add(builder.fileSet(fileSet).build()); + copyEntities.add(builder.fileSet(fileSet).datasetOutputPath(desiredTargetLocation.location.toString()).build()); } multiTimer.close(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java index dc57039..8092de6 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java @@ -195,6 +195,7 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl long datasetOriginTimestamp = Long.MAX_VALUE; long datasetUpstreamTimestamp = Long.MAX_VALUE; + Optional<String> fileSetRoot = Optional.<String>absent(); for (WorkUnitState wus : datasetWorkUnitStates) { if (wus.getWorkingState() == WorkingState.SUCCESSFUL) { @@ -205,6 +206,12 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl CopyableFile copyableFile = (CopyableFile) copyEntity; if (wus.getWorkingState() == WorkingState.COMMITTED) { CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter, copyableFile, wus); + // Dataset Output path is injected in each copyableFile. + // This can be optimized by having a dataset level equivalent class for copyable entities + // and storing dataset related information, e.g. dataset output path, there. + if (!fileSetRoot.isPresent()) { + fileSetRoot = Optional.of(copyableFile.getDatasetOutputPath()); + } } if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) { datasetOriginTimestamp = copyableFile.getOriginTimestamp(); @@ -226,6 +233,7 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl additionalMetadata.put(SlaEventKeys.SOURCE_URI, this.state.getProp(SlaEventKeys.SOURCE_URI)); additionalMetadata.put(SlaEventKeys.DESTINATION_URI, this.state.getProp(SlaEventKeys.DESTINATION_URI)); + additionalMetadata.put(SlaEventKeys.DATASET_OUTPUT_PATH, fileSetRoot.or("Unknown")); CopyEventSubmitterHelper.submitSuccessfulDatasetPublish(this.eventSubmitter, datasetAndPartition, Long.toString(datasetOriginTimestamp), Long.toString(datasetUpstreamTimestamp), additionalMetadata); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/ConcurrentBoundedWorkUnitListTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/ConcurrentBoundedWorkUnitListTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/ConcurrentBoundedWorkUnitListTest.java index 0dc9bc3..faa39bb 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/ConcurrentBoundedWorkUnitListTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/ConcurrentBoundedWorkUnitListTest.java @@ -145,7 +145,7 @@ public class ConcurrentBoundedWorkUnitListTest { return new CopyableFile(origin, targetPath, new OwnerAndPermission(null, null, null), Lists.<OwnerAndPermission>newArrayList(), null, PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps - .<String, String>newHashMap()); + .<String, String>newHashMap(), ""); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourcePrioritizationTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourcePrioritizationTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourcePrioritizationTest.java index 2b55bd4..5fdc532 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourcePrioritizationTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourcePrioritizationTest.java @@ -220,7 +220,7 @@ public class CopySourcePrioritizationTest { private static CopyableFile createCopyableFile(String path, String fileSet) { return new CopyableFile(new FileStatus(0, false, 0, 0, 0, new Path(path)), new Path(path), new OwnerAndPermission("owner", "group", FsPermission.getDefault()), null, null, - PreserveAttributes.fromMnemonicString(""), fileSet, 0, 0, Maps.<String, String>newHashMap()); + PreserveAttributes.fromMnemonicString(""), fileSet, 0, 0, Maps.<String, String>newHashMap(), ""); } public static class MyPrioritizer implements FileSetComparator { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java index c523c35..30ba0af 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java @@ -48,7 +48,7 @@ public class CopyableFileTest { new OwnerAndPermission("owner", "group", FsPermission.getDefault()), Lists.newArrayList(new OwnerAndPermission("owner2", "group2", FsPermission.getDefault())), "checksum".getBytes(), PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps - .<String, String>newHashMap()); + .<String, String>newHashMap(), ""); String s = CopyEntity.serialize(copyableFile); CopyEntity de = CopyEntity.deserialize(s); @@ -63,7 +63,7 @@ public class CopyableFileTest { new CopyableFile(null, null, new OwnerAndPermission("owner", "group", FsPermission.getDefault()), Lists.newArrayList(new OwnerAndPermission(null, "group2", FsPermission .getDefault())), "checksum".getBytes(), PreserveAttributes.fromMnemonicString(""), "", 0, 0, - Maps.<String, String>newHashMap()); + Maps.<String, String>newHashMap(), ""); String serialized = CopyEntity.serialize(copyableFile); CopyEntity deserialized = CopyEntity.deserialize(serialized); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java index f0442d9..d8cb938 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java @@ -39,7 +39,7 @@ public class CopyableFileUtils { FileStatus status = new FileStatus(0l, false, 0, 0l, 0l, new Path(resourcePath)); return new CopyableFile(status, new Path(getRandomPath()), null, null, null, - PreserveAttributes.fromMnemonicString(""), "", 0 ,0, Maps.<String, String>newHashMap()); + PreserveAttributes.fromMnemonicString(""), "", 0 ,0, Maps.<String, String>newHashMap(), ""); } public static CopyableFile getTestCopyableFile() { @@ -83,7 +83,7 @@ public class CopyableFileUtils { Path destinationRelativePath = new Path(relativePath); return new CopyableFile(status, new Path(destinationPath), ownerAndPermission, null, null, - PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap()); + PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), ""); } private static String getRandomPath() { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/sla/SlaEventKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/sla/SlaEventKeys.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/sla/SlaEventKeys.java index bf068b3..b7c10d4 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/sla/SlaEventKeys.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/sla/SlaEventKeys.java @@ -35,4 +35,5 @@ public class SlaEventKeys { public static final String SOURCE_URI = "sourceCluster"; public static final String DESTINATION_URI = "destinationCluster"; + public static final String DATASET_OUTPUT_PATH = "datasetOutputPath"; }
