Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ce60d2c7c -> 42ea018e5


[GOBBLIN-189] add dataset path to 'dataset published' events

Closes #2047 from arjun4084346/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/42ea018e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/42ea018e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/42ea018e

Branch: refs/heads/master
Commit: 42ea018e5690a55c4293b42842093e68c6fae304
Parents: ce60d2c
Author: Arjun <ab...@linkedin.com>
Authored: Thu Aug 10 08:55:40 2017 -0700
Committer: Issac Buenrostro <ibuen...@apache.org>
Committed: Thu Aug 10 08:55:40 2017 -0700

----------------------------------------------------------------------
 .../apache/gobblin/data/management/copy/CopyableFile.java | 10 ++++++++--
 .../data/management/copy/RecursiveCopyableDataset.java    |  2 +-
 .../data/management/copy/hive/HivePartitionFileSet.java   |  3 ++-
 .../management/copy/hive/UnpartitionedTableFileSet.java   |  2 +-
 .../data/management/copy/publisher/CopyDataPublisher.java |  8 ++++++++
 .../copy/ConcurrentBoundedWorkUnitListTest.java           |  2 +-
 .../management/copy/CopySourcePrioritizationTest.java     |  2 +-
 .../gobblin/data/management/copy/CopyableFileTest.java    |  4 ++--
 .../gobblin/data/management/copy/CopyableFileUtils.java   |  4 ++--
 .../apache/gobblin/metrics/event/sla/SlaEventKeys.java    |  1 +
 10 files changed, 27 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 9c729e3..cec06f2 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -58,6 +58,9 @@ public class CopyableFile extends CopyEntity implements File {
   /** Complete destination {@link Path} of the file. */
   private Path destination;
 
+  /** Common path for dataset to which this CopyableFile belongs. */
+  public String datasetOutputPath;
+
   /** Desired {@link OwnerAndPermission} of the destination path. */
   private OwnerAndPermission destinationOwnerAndPermission;
 
@@ -89,7 +92,8 @@ public class CopyableFile extends CopyEntity implements File {
   @lombok.Builder(builderClassName = "Builder", builderMethodName = 
"_hiddenBuilder")
   public CopyableFile(FileStatus origin, Path destination, OwnerAndPermission 
destinationOwnerAndPermission,
       List<OwnerAndPermission> ancestorsOwnerAndPermission, byte[] checksum, 
PreserveAttributes preserve,
-      String fileSet, long originTimestamp, long upstreamTimestamp, 
Map<String, String> additionalMetadata) {
+      String fileSet, long originTimestamp, long upstreamTimestamp, 
Map<String, String> additionalMetadata,
+      String datasetOutputPath) {
     super(fileSet, additionalMetadata);
     this.origin = origin;
     this.destination = destination;
@@ -99,6 +103,7 @@ public class CopyableFile extends CopyEntity implements File 
{
     this.preserve = preserve;
     this.originTimestamp = originTimestamp;
     this.upstreamTimestamp = upstreamTimestamp;
+    this.datasetOutputPath = datasetOutputPath;
   }
 
   /**
@@ -145,6 +150,7 @@ public class CopyableFile extends CopyEntity implements 
File {
     private CopyConfiguration configuration;
     private FileSystem originFs;
     private Map<String, String> additionalMetadata;
+    private String datasetOutputPath;
 
     private Builder originFS(FileSystem originFs) {
       this.originFs = originFs;
@@ -219,7 +225,7 @@ public class CopyableFile extends CopyEntity implements 
File {
 
       return new CopyableFile(this.origin, this.destination, 
this.destinationOwnerAndPermission,
           this.ancestorsOwnerAndPermission, this.checksum, this.preserve, 
this.fileSet, this.originTimestamp,
-          this.upstreamTimestamp, this.additionalMetadata);
+          this.upstreamTimestamp, this.additionalMetadata, 
this.datasetOutputPath);
     }
 
     private List<OwnerAndPermission> 
replicateAncestorsOwnerAndPermission(FileSystem originFs, Path originPath,

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index 34428f1..0f18f68 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -136,7 +136,7 @@ public class RecursiveCopyableDataset implements 
CopyableDataset, FileSystemData
       Path thisTargetPath = new Path(configuration.getPublishDir(), 
filePathRelativeToSearchPath);
 
       copyableFiles.add(CopyableFile.fromOriginAndDestination(this.fs, file, 
thisTargetPath, configuration)
-          .fileSet(datasetURN())
+          .fileSet(datasetURN()).datasetOutputPath(thisTargetPath.toString())
           
.ancestorsOwnerAndPermission(CopyableFile.resolveReplicatedOwnerAndPermissionsRecursively(this.fs,
               file.getPath().getParent(), nonGlobSearchPath, configuration))
           .build());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
index c57f55b..a9982bf 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
@@ -149,7 +149,8 @@ public class HivePartitionFileSet extends HiveFileSet {
       multiTimer.nextStage(HiveCopyEntityHelper.Stages.CREATE_COPY_UNITS);
       for (CopyableFile.Builder builder : 
hiveCopyEntityHelper.getCopyableFilesFromPaths(diffPathSet.filesToCopy,
           hiveCopyEntityHelper.getConfiguration(), 
Optional.of(this.partition))) {
-        copyEntities.add(builder.fileSet(fileSet).checksum(new 
byte[0]).build());
+        copyEntities.add(builder.fileSet(fileSet).checksum(new byte[0])
+            
.datasetOutputPath(desiredTargetLocation.location.toString()).build());
       }
 
       log.info("Created {} copy entities for partition {}", 
copyEntities.size(), this.partition.getCompleteName());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
index e1421e7..a796a2b 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
@@ -120,7 +120,7 @@ public class UnpartitionedTableFileSet extends HiveFileSet {
 
     for (CopyableFile.Builder builder : 
this.helper.getCopyableFilesFromPaths(diffPathSet.filesToCopy, 
this.helper.getConfiguration(),
         Optional.<Partition> absent())) {
-      copyEntities.add(builder.fileSet(fileSet).build());
+      
copyEntities.add(builder.fileSet(fileSet).datasetOutputPath(desiredTargetLocation.location.toString()).build());
     }
 
     multiTimer.close();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index dc57039..8092de6 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -195,6 +195,7 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
 
     long datasetOriginTimestamp = Long.MAX_VALUE;
     long datasetUpstreamTimestamp = Long.MAX_VALUE;
+    Optional<String> fileSetRoot = Optional.<String>absent();
 
     for (WorkUnitState wus : datasetWorkUnitStates) {
       if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
@@ -205,6 +206,12 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
         CopyableFile copyableFile = (CopyableFile) copyEntity;
         if (wus.getWorkingState() == WorkingState.COMMITTED) {
           
CopyEventSubmitterHelper.submitSuccessfulFilePublish(this.eventSubmitter, 
copyableFile, wus);
+          // Dataset Output path is injected in each copyableFile.
+          // This can be optimized by having a dataset level equivalent class 
for copyable entities
+          // and storing dataset related information, e.g. dataset output 
path, there.
+          if (!fileSetRoot.isPresent()) {
+            fileSetRoot = Optional.of(copyableFile.getDatasetOutputPath());
+          }
         }
         if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) {
           datasetOriginTimestamp = copyableFile.getOriginTimestamp();
@@ -226,6 +233,7 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
 
     additionalMetadata.put(SlaEventKeys.SOURCE_URI, 
this.state.getProp(SlaEventKeys.SOURCE_URI));
     additionalMetadata.put(SlaEventKeys.DESTINATION_URI, 
this.state.getProp(SlaEventKeys.DESTINATION_URI));
+    additionalMetadata.put(SlaEventKeys.DATASET_OUTPUT_PATH, 
fileSetRoot.or("Unknown"));
     
CopyEventSubmitterHelper.submitSuccessfulDatasetPublish(this.eventSubmitter, 
datasetAndPartition,
         Long.toString(datasetOriginTimestamp), 
Long.toString(datasetUpstreamTimestamp), additionalMetadata);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/ConcurrentBoundedWorkUnitListTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/ConcurrentBoundedWorkUnitListTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/ConcurrentBoundedWorkUnitListTest.java
index 0dc9bc3..faa39bb 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/ConcurrentBoundedWorkUnitListTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/ConcurrentBoundedWorkUnitListTest.java
@@ -145,7 +145,7 @@ public class ConcurrentBoundedWorkUnitListTest {
 
     return new CopyableFile(origin, targetPath, new OwnerAndPermission(null, 
null, null),
         Lists.<OwnerAndPermission>newArrayList(), null, 
PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps
-        .<String, String>newHashMap());
+        .<String, String>newHashMap(), "");
 
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourcePrioritizationTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourcePrioritizationTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourcePrioritizationTest.java
index 2b55bd4..5fdc532 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourcePrioritizationTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourcePrioritizationTest.java
@@ -220,7 +220,7 @@ public class CopySourcePrioritizationTest {
   private static CopyableFile createCopyableFile(String path, String fileSet) {
     return new CopyableFile(new FileStatus(0, false, 0, 0, 0, new Path(path)), 
new Path(path),
         new OwnerAndPermission("owner", "group", FsPermission.getDefault()), 
null, null,
-        PreserveAttributes.fromMnemonicString(""), fileSet, 0, 0, 
Maps.<String, String>newHashMap());
+        PreserveAttributes.fromMnemonicString(""), fileSet, 0, 0, 
Maps.<String, String>newHashMap(), "");
   }
 
   public static class MyPrioritizer implements FileSetComparator {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
index c523c35..30ba0af 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
@@ -48,7 +48,7 @@ public class CopyableFileTest {
             new OwnerAndPermission("owner", "group", 
FsPermission.getDefault()),
             Lists.newArrayList(new OwnerAndPermission("owner2", "group2", 
FsPermission.getDefault())),
             "checksum".getBytes(), PreserveAttributes.fromMnemonicString(""), 
"", 0, 0, Maps
-            .<String, String>newHashMap());
+            .<String, String>newHashMap(), "");
 
     String s = CopyEntity.serialize(copyableFile);
     CopyEntity de = CopyEntity.deserialize(s);
@@ -63,7 +63,7 @@ public class CopyableFileTest {
         new CopyableFile(null, null, new OwnerAndPermission("owner", "group",
             FsPermission.getDefault()), Lists.newArrayList(new 
OwnerAndPermission(null, "group2", FsPermission
             .getDefault())), "checksum".getBytes(), 
PreserveAttributes.fromMnemonicString(""), "", 0, 0,
-            Maps.<String, String>newHashMap());
+            Maps.<String, String>newHashMap(), "");
 
     String serialized = CopyEntity.serialize(copyableFile);
     CopyEntity deserialized = CopyEntity.deserialize(serialized);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java
index f0442d9..d8cb938 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java
@@ -39,7 +39,7 @@ public class CopyableFileUtils {
     FileStatus status = new FileStatus(0l, false, 0, 0l, 0l, new 
Path(resourcePath));
 
     return new CopyableFile(status, new Path(getRandomPath()), null, null, 
null,
-        PreserveAttributes.fromMnemonicString(""), "", 0 ,0, Maps.<String, 
String>newHashMap());
+        PreserveAttributes.fromMnemonicString(""), "", 0 ,0, Maps.<String, 
String>newHashMap(), "");
   }
 
   public static CopyableFile getTestCopyableFile() {
@@ -83,7 +83,7 @@ public class CopyableFileUtils {
     Path destinationRelativePath = new Path(relativePath);
 
     return new CopyableFile(status, new Path(destinationPath), 
ownerAndPermission, null, null,
-        PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, 
String>newHashMap());
+        PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, 
String>newHashMap(), "");
   }
 
   private static String getRandomPath() {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42ea018e/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/sla/SlaEventKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/sla/SlaEventKeys.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/sla/SlaEventKeys.java
index bf068b3..b7c10d4 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/sla/SlaEventKeys.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/sla/SlaEventKeys.java
@@ -35,4 +35,5 @@ public class SlaEventKeys {
 
   public static final String SOURCE_URI = "sourceCluster";
   public static final String DESTINATION_URI = "destinationCluster";
+  public static final String DATASET_OUTPUT_PATH = "datasetOutputPath";
 }

Reply via email to