Repository: incubator-gobblin Updated Branches: refs/heads/master 1507b6055 -> 520bcf5df
[GOBBLIN-241] Allow lineage event to aggregate on task states based on lineage.dataset.urn Closes #2092 from yukuai518/kafkalineage Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/520bcf5d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/520bcf5d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/520bcf5d Branch: refs/heads/master Commit: 520bcf5df5a7dcc7961f38f7b54b41cc8b2ddd45 Parents: 1507b60 Author: Kuai Yu <[email protected]> Authored: Wed Sep 13 08:41:56 2017 -0700 Committer: Issac Buenrostro <[email protected]> Committed: Wed Sep 13 08:41:56 2017 -0700 ---------------------------------------------------------------------- .../org/apache/gobblin/lineage/LineageInfo.java | 14 +++++++++++++- .../apache/gobblin/runtime/SafeDatasetCommit.java | 16 +++++++++++----- 2 files changed, 24 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/520bcf5d/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java index 8d582f2..90473d4 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java @@ -17,6 +17,7 @@ package org.apache.gobblin.lineage; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -48,7 +49,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class LineageInfo { - + public static final String LINEAGE_DATASET_URN = "lineage.dataset.urn"; public static final String LINEAGE_NAME_SPACE = "gobblin.lineage"; public static final 
String BRANCH_ID_METADATA_KEY = "branchId"; private static final String DATASET_PREFIX = LINEAGE_NAME_SPACE + "."; @@ -228,6 +229,17 @@ public class LineageInfo { state.setProp(BRANCH_PREFIX + Joiner.on(".").join(branchId, key), value); } + public static Map<String, Collection<State>> aggregateByDatasetUrn (Collection<? extends State> states) { + Map<String, Collection<State>> datasetStates = new HashMap<>(); + for (State state: states) { + String urn = state.getProp(LINEAGE_DATASET_URN, state.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN)); + datasetStates.putIfAbsent(urn, new ArrayList<>()); + Collection<State> datasetState = datasetStates.get(urn); + datasetState.add(state); + } + return datasetStates; + } + public final String getId() { return Joiner.on(":::").join(this.datasetUrn, this.jobId); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/520bcf5d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java index c7bd90f..0a8984b 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java @@ -32,6 +32,7 @@ import org.apache.gobblin.commit.CommitSequence; import org.apache.gobblin.commit.CommitStep; import org.apache.gobblin.commit.DeliverySemantics; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.lineage.LineageException; @@ -188,11 +189,16 @@ final class SafeDatasetCommit implements Callable<Void> { } try { - Collection<LineageInfo> 
branchLineages = LineageInfo.load(states, LineageInfo.Level.All); - EventSubmitter submitter = new EventSubmitter.Builder(Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class), - LineageInfo.LINEAGE_NAME_SPACE).build(); - for (LineageInfo info: branchLineages) { - submitter.submit(info.getId(), info.getLineageMetaData()); + // Aggregate states by lineage.dataset.urn, because datasetUrn may be empty, in which case all task states would fall into a single empty dataset. + // FIXME: once all dataset.urn attributes are set properly, this aggregation will no longer be needed. + Collection<Collection<State>> datasetStates = LineageInfo.aggregateByDatasetUrn(states).values(); + for (Collection<State> dataState: datasetStates) { + Collection<LineageInfo> branchLineages = LineageInfo.load(dataState, LineageInfo.Level.All); + EventSubmitter submitter = new EventSubmitter.Builder(Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class), + LineageInfo.LINEAGE_NAME_SPACE).build(); + for (LineageInfo info: branchLineages) { + submitter.submit(info.getId(), info.getLineageMetaData()); + } } } catch (LineageException e) { log.error ("Lineage event submission failed due to :" + e.toString());
