Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 1507b6055 -> 520bcf5df


[GOBBLIN-241] Allow lineage events to aggregate task states based on lineage.dataset.urn

Closes #2092 from yukuai518/kafkalineage


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/520bcf5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/520bcf5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/520bcf5d

Branch: refs/heads/master
Commit: 520bcf5df5a7dcc7961f38f7b54b41cc8b2ddd45
Parents: 1507b60
Author: Kuai Yu <[email protected]>
Authored: Wed Sep 13 08:41:56 2017 -0700
Committer: Issac Buenrostro <[email protected]>
Committed: Wed Sep 13 08:41:56 2017 -0700

----------------------------------------------------------------------
 .../org/apache/gobblin/lineage/LineageInfo.java     | 14 +++++++++++++-
 .../apache/gobblin/runtime/SafeDatasetCommit.java   | 16 +++++++++++-----
 2 files changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/520bcf5d/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java 
b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
index 8d582f2..90473d4 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.lineage;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -48,7 +49,7 @@ import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class LineageInfo {
-
+  public static final String LINEAGE_DATASET_URN = "lineage.dataset.urn";
   public static final String LINEAGE_NAME_SPACE = "gobblin.lineage";
   public static final String BRANCH_ID_METADATA_KEY = "branchId";
   private static final String DATASET_PREFIX =  LINEAGE_NAME_SPACE + ".";
@@ -228,6 +229,17 @@ public class LineageInfo {
     state.setProp(BRANCH_PREFIX + Joiner.on(".").join(branchId, key), value);
   }
 
+  /**
+   * Groups the given states by their dataset URN.
+   *
+   * <p>The URN of each state is read from {@link #LINEAGE_DATASET_URN}; when that property is not
+   * set it falls back to {@link ConfigurationKeys#DATASET_URN_KEY}, and then to
+   * {@link ConfigurationKeys#DEFAULT_DATASET_URN}.
+   *
+   * @param states the states to group; iterated once
+   * @return a new mutable map from dataset URN to the states carrying that URN; within each group
+   *         the states keep the iteration order of {@code states}. Empty if {@code states} is empty.
+   */
+  public static Map<String, Collection<State>> aggregateByDatasetUrn(Collection<? extends State> states) {
+    Map<String, Collection<State>> datasetStates = new HashMap<>();
+    for (State state : states) {
+      String urn = state.getProp(LINEAGE_DATASET_URN,
+          state.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN));
+      // computeIfAbsent allocates a group list only for a URN not seen before and needs a single
+      // map lookup, unlike putIfAbsent(urn, new ArrayList<>()) followed by get(urn).
+      datasetStates.computeIfAbsent(urn, unused -> new ArrayList<>()).add(state);
+    }
+    return datasetStates;
+  }
+
+
   public final String getId() {
     return Joiner.on(":::").join(this.datasetUrn, this.jobId);
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/520bcf5d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index c7bd90f..0a8984b 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -32,6 +32,7 @@ import org.apache.gobblin.commit.CommitSequence;
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.lineage.LineageException;
@@ -188,11 +189,16 @@ final class SafeDatasetCommit implements Callable<Void> {
     }
 
     try {
-      Collection<LineageInfo> branchLineages = LineageInfo.load(states, 
LineageInfo.Level.All);
-      EventSubmitter submitter = new 
EventSubmitter.Builder(Instrumented.getMetricContext(datasetState, 
SafeDatasetCommit.class),
-          LineageInfo.LINEAGE_NAME_SPACE).build();
-      for (LineageInfo info: branchLineages) {
-        submitter.submit(info.getId(), info.getLineageMetaData());
+      // Aggregate states by lineage.dataset.urn: datasetUrn may be left empty, in which case all task states would fall into a single empty dataset.
+      // FixMe: once all dataset.urn attributes are set properly, this aggregation is no longer needed.
+      Collection<Collection<State>> datasetStates = 
LineageInfo.aggregateByDatasetUrn(states).values();
+      for (Collection<State> dataState: datasetStates) {
+        Collection<LineageInfo> branchLineages = LineageInfo.load(dataState, 
LineageInfo.Level.All);
+        EventSubmitter submitter = new 
EventSubmitter.Builder(Instrumented.getMetricContext(datasetState, 
SafeDatasetCommit.class),
+            LineageInfo.LINEAGE_NAME_SPACE).build();
+        for (LineageInfo info: branchLineages) {
+          submitter.submit(info.getId(), info.getLineageMetaData());
+        }
       }
     } catch (LineageException e) {
       log.error ("Lineage event submission failed due to :" + e.toString());

Reply via email to