Repository: incubator-gobblin
Updated Branches:
  refs/heads/master e65f1316d -> d6c7fe79b


[GOBBLIN-261] Add Kafka lineage information

Closes #2113 from yukuai518/logging


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d6c7fe79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d6c7fe79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d6c7fe79

Branch: refs/heads/master
Commit: d6c7fe79b2f193521eb00a56abf49ffd511d7308
Parents: e65f131
Author: Kuai Yu <[email protected]>
Authored: Fri Sep 22 16:01:12 2017 -0700
Committer: Issac Buenrostro <[email protected]>
Committed: Fri Sep 22 16:01:12 2017 -0700

----------------------------------------------------------------------
 .../gobblin/lineage/LineageException.java       |  39 +++
 .../org/apache/gobblin/lineage/LineageInfo.java | 246 +++++++++++++++++++
 .../gobblin/lineage/LineageException.java       |  39 ---
 .../org/apache/gobblin/lineage/LineageInfo.java | 246 -------------------
 .../publisher/TimePartitionedDataPublisher.java |  14 ++
 .../packer/KafkaBiLevelWorkUnitPacker.java      |  22 +-
 6 files changed, 314 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6c7fe79/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java 
b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java
new file mode 100644
index 0000000..8dcf592
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.gobblin.lineage;
+
+/**
+ * A set of exceptions used by {@link LineageInfo} when lineage information is 
serialized or deserialized.
+ */
+public class LineageException extends Exception {
+  public LineageException(String message) {
+    super(message);
+  }
+  public static class LineageConflictAttributeException extends 
LineageException {
+    public LineageConflictAttributeException (String key, String oldValue, 
String newValue) {
+      super ("Lineage has conflict value: key=" + key + " value=[1]" + 
oldValue + " [2]" + newValue);
+    }
+  }
+
+  public static class LineageUnsupportedLevelException extends 
LineageException {
+    public LineageUnsupportedLevelException (LineageInfo.Level level) {
+      super (level.toString() + " is not supported");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6c7fe79/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java 
b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
new file mode 100644
index 0000000..90473d4
--- /dev/null
+++ b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.lineage;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A class to restore all lineage information from a {@link State}
+ * All lineage attributes are under LINEAGE_NAME_SPACE namespace.
+ *
+ * For example, typical lineage attributes look like:
+ *    gobblin.lineage.K1          ---> V1
+ *    gobblin.lineage.branch.3.K2 ---> V2
+ *
+ * K1 is a dataset-level attribute, K2 is a branch-level attribute, and the 
branch id is 3.
+ */
+
+@Slf4j
+public class LineageInfo {
+  public static final String LINEAGE_DATASET_URN = "lineage.dataset.urn";
+  public static final String LINEAGE_NAME_SPACE = "gobblin.lineage";
+  public static final String BRANCH_ID_METADATA_KEY = "branchId";
+  private static final String DATASET_PREFIX =  LINEAGE_NAME_SPACE + ".";
+  private static final String BRANCH_PREFIX = DATASET_PREFIX + "branch.";
+
+  @Getter
+  private String datasetUrn;
+  @Getter
+  private String jobId;
+
+  private Map<String, String> lineageMetaData;
+
+  public enum Level {
+    DATASET,
+    BRANCH,
+    All
+  }
+
+  private LineageInfo() {
+  }
+
+  private LineageInfo(String datasetUrn, String jobId, ImmutableMap<String, 
String> lineageMetaData) {
+    Preconditions.checkArgument(datasetUrn != null);
+    Preconditions.checkArgument(jobId != null);
+    this.datasetUrn = datasetUrn;
+    this.jobId = jobId;
+    this.lineageMetaData = lineageMetaData;
+  }
+
+  /**
+   * Retrieve lineage information from a {@link State} by {@link Level}
+   * @param state A single state
+   * @param level {@link Level#DATASET}  only load dataset level lineage 
attributes
+   *              {@link Level#BRANCH}   only load branch level lineage 
attributes
+   *              {@link Level#All}      load all lineage attributes
+   * @return A collection of {@link LineageInfo}s per branch. When level is 
{@link Level#DATASET}, this list has only single element.
+   */
+  public static Collection<LineageInfo> load (State state, Level level) throws 
LineageException {
+    return load(Collections.singleton(state), level);
+  }
+
+  /**
+   * Get all lineage meta data.
+   */
+  public ImmutableMap<String, String> getLineageMetaData() {
+    return ImmutableMap.copyOf(lineageMetaData);
+  }
+
+  /**
+   * Retrieve all lineage information from different {@link State}s.
+   * This requires the job id and dataset urn to be present in the state, 
under job.id and dataset.urn.
+   * A global union operation is applied to combine all <K, V> pairs from the 
input {@link State}s. If multiple {@link State}s
+   * share the same K, but have conflicting V, a {@link LineageException} is 
thrown.
+   *
+   * {@link Level} can control if a dataset level or branch level information 
should be used. When {@link Level#All} is
+   * specified, all levels of information will be returned; otherwise only 
specified level of information will be returned.
+   *
+   * For instance, assume we have below input states:
+   *    State[0]: gobblin.lineage.K1          ---> V1
+   *              gobblin.lineage.K2          ---> V2
+   *              gobblin.lineage.branch.1.K4 ---> V4
+   *    State[1]: gobblin.lineage.K2          ---> V2
+   *              gobblin.lineage.K3          ---> V3
+   *              gobblin.lineage.branch.1.K4 ---> V4
+   *              gobblin.lineage.branch.1.K5 ---> V5
+   *              gobblin.lineage.branch.2.K6 ---> V6
+   *
+   *  (1) With {@link Level#DATASET} level, the output would be:
+   *      LineageInfo[0]:   K1 ---> V1
+   *                        K2 ---> V2
+   *                        K3 ---> V3
+   *  (2) With {@link Level#All} level, the output would be: (because there 
are two branches, so there are two LineageInfo)
+   *      LineageInfo[0]:   K1 ---> V1
+   *                        K2 ---> V2
+   *                        K3 ---> V3
+   *                        K4 ---> V4
+   *                        K5 ---> V5
+   *
+   *      LineageInfo[1]:   K1 ---> V1
+   *                        K2 ---> V2
+   *                        K3 ---> V3
+   *                        K6 ---> V6
+   *
+   *   (3) With {@link Level#BRANCH} level, the output would be: (only branch 
level information was returned)
+   *      LineageInfo[0]:   K4 ---> V4
+   *                        K5 ---> V5
+   *      LineageInfo[1]:   K6 ---> V6
+   *
+   * @param states All states which belong to the same dataset and share the 
same jobId.
+   * @param level {@link Level#DATASET}  only load dataset level lineage 
attributes
+   *              {@link Level#BRANCH}   only load branch level lineage 
attributes
+   *              {@link Level#All}      load all lineage attributes
+   * @return A collection of {@link LineageInfo}s per branch. When level is 
{@link Level#DATASET}, this list has only single element.
+   *
+   * @throws LineageException.LineageConflictAttributeException if two states 
have same key but not the same value.
+   */
+  public static Collection<LineageInfo> load (Collection<? extends State> 
states, Level level) throws LineageException {
+    Preconditions.checkArgument(states != null && !states.isEmpty());
+    Map<String, String> datasetMetaData = new HashMap<>();
+    Map<String, Map<String, String>> branchAggregate = new HashMap<>();
+
+    State anyOne = states.iterator().next();
+    String jobId = anyOne.getProp(ConfigurationKeys.JOB_ID_KEY, "");
+    String urn = anyOne.getProp(ConfigurationKeys.DATASET_URN_KEY, 
ConfigurationKeys.DEFAULT_DATASET_URN);
+
+    for (State state: states) {
+      for (Map.Entry<Object, Object> entry : state.getProperties().entrySet()) 
{
+        if (entry.getKey() instanceof String && ((String) 
entry.getKey()).startsWith(LINEAGE_NAME_SPACE)) {
+
+          String lineageKey = ((String) entry.getKey());
+          String lineageValue = (String) entry.getValue();
+
+          if (lineageKey.startsWith(BRANCH_PREFIX)) {
+            String branchPrefixStrip = 
lineageKey.substring(BRANCH_PREFIX.length());
+            String branchId = branchPrefixStrip.substring(0, 
branchPrefixStrip.indexOf("."));
+            String key = 
branchPrefixStrip.substring(branchPrefixStrip.indexOf(".") + 1);
+
+            if (level == Level.BRANCH || level == Level.All) {
+              if (!branchAggregate.containsKey(branchId)) {
+                branchAggregate.put(branchId, new HashMap<>());
+              }
+              Map<String, String> branchMetaData = 
branchAggregate.get(branchId);
+              String prev = branchMetaData.put(key, lineageValue);
+              if (prev != null && !prev.equals(lineageValue)) {
+                throw new 
LineageException.LineageConflictAttributeException(lineageKey, prev, 
lineageValue);
+              }
+            }
+          } else if (lineageKey.startsWith(DATASET_PREFIX)) {
+            if (level == Level.DATASET || level == Level.All) {
+              String prev = 
datasetMetaData.put(lineageKey.substring(DATASET_PREFIX.length()), 
lineageValue);
+              if (prev != null && !prev.equals(lineageValue)) {
+                throw new 
LineageException.LineageConflictAttributeException(lineageKey, prev, 
lineageValue);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    Collection<LineageInfo> collection = Sets.newHashSet();
+
+    if (level == Level.DATASET) {
+      ImmutableMap<String, String> metaData = ImmutableMap.<String, 
String>builder()
+          .putAll(datasetMetaData)
+          .build();
+      collection.add(new LineageInfo(urn, jobId, metaData));
+      return collection;
+    } else if (level == Level.BRANCH || level == Level.All){
+      if (branchAggregate.isEmpty()) {
+        if (level == Level.All) {
+          collection.add(new LineageInfo(urn, jobId, ImmutableMap.<String, 
String>builder().putAll(datasetMetaData).build()));
+        }
+        return collection;
+      }
+      for (Map.Entry<String, Map<String, String>> branchMetaDataEntry: 
branchAggregate.entrySet()) {
+        String branchId = branchMetaDataEntry.getKey();
+        Map<String, String> branchMetaData = branchMetaDataEntry.getValue();
+        ImmutableMap.Builder<String, String> metaDataBuilder = 
ImmutableMap.builder();
+        if (level == Level.All) {
+          metaDataBuilder.putAll(datasetMetaData);
+        }
+        metaDataBuilder.putAll(branchMetaData).put(BRANCH_ID_METADATA_KEY, 
branchId);
+        collection.add(new LineageInfo(urn, jobId, metaDataBuilder.build()));
+      }
+
+      return collection;
+    } else {
+      throw new LineageException.LineageUnsupportedLevelException(level);
+    }
+  }
+
+  public static void setDatasetLineageAttribute (State state, String key, 
String value) {
+    state.setProp(DATASET_PREFIX + key, value);
+  }
+
+  public static void setBranchLineageAttribute (State state, int branchId, 
String key, String value) {
+    state.setProp(BRANCH_PREFIX + Joiner.on(".").join(branchId, key), value);
+  }
+
+  public static Map<String, Collection<State>> aggregateByDatasetUrn 
(Collection<? extends State> states) {
+    Map<String, Collection<State>> datasetStates = new HashMap<>();
+    for (State state: states) {
+      String urn = state.getProp(LINEAGE_DATASET_URN, 
state.getProp(ConfigurationKeys.DATASET_URN_KEY, 
ConfigurationKeys.DEFAULT_DATASET_URN));
+      datasetStates.putIfAbsent(urn, new ArrayList<>());
+      Collection<State> datasetState = datasetStates.get(urn);
+      datasetState.add(state);
+    }
+    return datasetStates;
+  }
+
+  public final String getId() {
+    return Joiner.on(":::").join(this.datasetUrn, this.jobId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6c7fe79/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java 
b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java
deleted file mode 100644
index 8dcf592..0000000
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.gobblin.lineage;
-
-/**
- * A set of exceptions used by {@link LineageInfo} when lineage information is 
serialized or deserialized.
- */
-public class LineageException extends Exception {
-  public LineageException(String message) {
-    super(message);
-  }
-  public static class LineageConflictAttributeException extends 
LineageException {
-    public LineageConflictAttributeException (String key, String oldValue, 
String newValue) {
-      super ("Lineage has conflict value: key=" + key + " value=[1]" + 
oldValue + " [2]" + newValue);
-    }
-  }
-
-  public static class LineageUnsupportedLevelException extends 
LineageException {
-    public LineageUnsupportedLevelException (LineageInfo.Level level) {
-      super (level.toString() + " is not supported");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6c7fe79/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java 
b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
deleted file mode 100644
index 90473d4..0000000
--- a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.lineage;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-
-/**
- * A class to restore all lineage information from a {@link State}
- * All lineage attributes are under LINEAGE_NAME_SPACE namespace.
- *
- * For example, a typical lineage attributes looks like:
- *    gobblin.lineage.K1          ---> V1
- *    gobblin.lineage.branch.3.K2 ---> V2
- *
- * K1 is dataset level attribute, K2 is branch level attribute, and branch id 
is 3.
- */
-
-@Slf4j
-public class LineageInfo {
-  public static final String LINEAGE_DATASET_URN = "lineage.dataset.urn";
-  public static final String LINEAGE_NAME_SPACE = "gobblin.lineage";
-  public static final String BRANCH_ID_METADATA_KEY = "branchId";
-  private static final String DATASET_PREFIX =  LINEAGE_NAME_SPACE + ".";
-  private static final String BRANCH_PREFIX = DATASET_PREFIX + "branch.";
-
-  @Getter
-  private String datasetUrn;
-  @Getter
-  private String jobId;
-
-  private Map<String, String> lineageMetaData;
-
-  public enum Level {
-    DATASET,
-    BRANCH,
-    All
-  }
-
-  private LineageInfo() {
-  }
-
-  private LineageInfo(String datasetUrn, String jobId, ImmutableMap<String, 
String> lineageMetaData) {
-    Preconditions.checkArgument(datasetUrn != null);
-    Preconditions.checkArgument(jobId != null);
-    this.datasetUrn = datasetUrn;
-    this.jobId = jobId;
-    this.lineageMetaData = lineageMetaData;
-  }
-
-  /**
-   * Retrieve lineage information from a {@link State} by {@link Level}
-   * @param state A single state
-   * @param level {@link Level#DATASET}  only load dataset level lineage 
attributes
-   *              {@link Level#BRANCH}   only load branch level lineage 
attributes
-   *              {@link Level#All}      load all lineage attributes
-   * @return A collection of {@link LineageInfo}s per branch. When level is 
{@link Level#DATASET}, this list has only single element.
-   */
-  public static Collection<LineageInfo> load (State state, Level level) throws 
LineageException {
-    return load(Collections.singleton(state), level);
-  }
-
-  /**
-   * Get all lineage meta data.
-   */
-  public ImmutableMap<String, String> getLineageMetaData() {
-    return ImmutableMap.copyOf(lineageMetaData);
-  }
-
-  /**
-   * Retrieve all lineage information from different {@link State}s.
-   * This requires the job id and dataset urn to be present in the state, 
under job.id and dataset.urn.
-   * A global union operation is applied to combine all <K, V> pairs from the 
input {@link State}s. If multiple {@link State}s
-   * share the same K, but have conflicting V, a {@link LineageException} is 
thrown.
-   *
-   * {@link Level} can control if a dataset level or branch level information 
should be used. When {@link Level#All} is
-   * specified, all levels of information will be returned; otherwise only 
specified level of information will be returned.
-   *
-   * For instance, assume we have below input states:
-   *    State[0]: gobblin.lineage.K1          ---> V1
-   *              gobblin.lineage.K2          ---> V2
-   *              gobblin.lineage.branch.1.K4 ---> V4
-   *    State[1]: gobblin.lineage.K2          ---> V2
-   *              gobblin.lineage.K3          ---> V3
-   *              gobblin.lineage.branch.1.K4 ---> V4
-   *              gobblin.lineage.branch.1.K5 ---> V5
-   *              gobblin.lineage.branch.2.K6 ---> V6
-   *
-   *  (1) With {@link Level#DATASET} level, the output would be:
-   *      LinieageInfo[0]:  K1 ---> V1
-   *                        K2 ---> V2
-   *                        K3 ---> V3
-   *  (2) With {@link Level#All} level, the output would be: (because there 
are two branches, so there are two LineageInfo)
-   *      LineageInfo[0]:   K1 ---> V1
-   *                        K2 ---> V2
-   *                        K3 ---> V3
-   *                        K4 ---> V4
-   *                        K5 ---> V5
-   *
-   *      LineageInfo[1]:   K1 ---> V1
-   *                        K2 ---> V2
-   *                        K3 ---> V3
-   *                        K6 ---> V6
-   *
-   *   (3) With {@link Level#BRANCH} level, the output would be: (only branch 
level information was returned)
-   *      LineageInfo[0]:   K4 ---> V4
-   *                        K5 ---> V5
-   *      LineageInfo[1]:   K6 ---> V6
-   *
-   * @param states All states which belong to the same dataset and share the 
same jobId.
-   * @param level {@link Level#DATASET}  only load dataset level lineage 
attributes
-   *              {@link Level#BRANCH}   only load branch level lineage 
attributes
-   *              {@link Level#All}      load all lineage attributes
-   * @return A collection of {@link LineageInfo}s per branch. When level is 
{@link Level#DATASET}, this list has only single element.
-   *
-   * @throws LineageException.LineageConflictAttributeException if two states 
have same key but not the same value.
-   */
-  public static Collection<LineageInfo> load (Collection<? extends State> 
states, Level level) throws LineageException {
-    Preconditions.checkArgument(states != null && !states.isEmpty());
-    Map<String, String> datasetMetaData = new HashMap<>();
-    Map<String, Map<String, String>> branchAggregate = new HashMap<>();
-
-    State anyOne = states.iterator().next();
-    String jobId = anyOne.getProp(ConfigurationKeys.JOB_ID_KEY, "");
-    String urn = anyOne.getProp(ConfigurationKeys.DATASET_URN_KEY, 
ConfigurationKeys.DEFAULT_DATASET_URN);
-
-    for (State state: states) {
-      for (Map.Entry<Object, Object> entry : state.getProperties().entrySet()) 
{
-        if (entry.getKey() instanceof String && ((String) 
entry.getKey()).startsWith(LINEAGE_NAME_SPACE)) {
-
-          String lineageKey = ((String) entry.getKey());
-          String lineageValue = (String) entry.getValue();
-
-          if (lineageKey.startsWith(BRANCH_PREFIX)) {
-            String branchPrefixStrip = 
lineageKey.substring(BRANCH_PREFIX.length());
-            String branchId = branchPrefixStrip.substring(0, 
branchPrefixStrip.indexOf("."));
-            String key = 
branchPrefixStrip.substring(branchPrefixStrip.indexOf(".") + 1);
-
-            if (level == Level.BRANCH || level == Level.All) {
-              if (!branchAggregate.containsKey(branchId)) {
-                branchAggregate.put(branchId, new HashMap<>());
-              }
-              Map<String, String> branchMetaData = 
branchAggregate.get(branchId);
-              String prev = branchMetaData.put(key, lineageValue);
-              if (prev != null && !prev.equals(lineageValue)) {
-                throw new 
LineageException.LineageConflictAttributeException(lineageKey, prev, 
lineageValue);
-              }
-            }
-          } else if (lineageKey.startsWith(DATASET_PREFIX)) {
-            if (level == Level.DATASET || level == Level.All) {
-              String prev = 
datasetMetaData.put(lineageKey.substring(DATASET_PREFIX.length()), 
lineageValue);
-              if (prev != null && !prev.equals(lineageValue)) {
-                throw new 
LineageException.LineageConflictAttributeException(lineageKey, prev, 
lineageValue);
-              }
-            }
-          }
-        }
-      }
-    }
-
-    Collection<LineageInfo> collection = Sets.newHashSet();
-
-    if (level == Level.DATASET) {
-      ImmutableMap<String, String> metaData = ImmutableMap.<String, 
String>builder()
-          .putAll(datasetMetaData)
-          .build();
-      collection.add(new LineageInfo(urn, jobId, metaData));
-      return collection;
-    } else if (level == Level.BRANCH || level == Level.All){
-      if (branchAggregate.isEmpty()) {
-        if (level == Level.All) {
-          collection.add(new LineageInfo(urn, jobId, ImmutableMap.<String, 
String>builder().putAll(datasetMetaData).build()));
-        }
-        return collection;
-      }
-      for (Map.Entry<String, Map<String, String>> branchMetaDataEntry: 
branchAggregate.entrySet()) {
-        String branchId = branchMetaDataEntry.getKey();
-        Map<String, String> branchMetaData = branchMetaDataEntry.getValue();
-        ImmutableMap.Builder<String, String> metaDataBuilder = 
ImmutableMap.builder();
-        if (level == Level.All) {
-          metaDataBuilder.putAll(datasetMetaData);
-        }
-        metaDataBuilder.putAll(branchMetaData).put(BRANCH_ID_METADATA_KEY, 
branchId);
-        collection.add(new LineageInfo(urn, jobId, metaDataBuilder.build()));
-      }
-
-      return collection;
-    } else {
-      throw new LineageException.LineageUnsupportedLevelException(level);
-    }
-  }
-
-  public static void setDatasetLineageAttribute (State state, String key, 
String value) {
-    state.setProp(DATASET_PREFIX + key, value);
-  }
-
-  public static void setBranchLineageAttribute (State state, int branchId, 
String key, String value) {
-    state.setProp(BRANCH_PREFIX + Joiner.on(".").join(branchId, key), value);
-  }
-
-  public static Map<String, Collection<State>> aggregateByDatasetUrn 
(Collection<? extends State> states) {
-    Map<String, Collection<State>> datasetStates = new HashMap<>();
-    for (State state: states) {
-      String urn = state.getProp(LINEAGE_DATASET_URN, 
state.getProp(ConfigurationKeys.DATASET_URN_KEY, 
ConfigurationKeys.DEFAULT_DATASET_URN));
-      datasetStates.putIfAbsent(urn, new ArrayList<>());
-      Collection<State> datasetState = datasetStates.get(urn);
-      datasetState.add(state);
-    }
-    return datasetStates;
-  }
-
-  public final String getId() {
-    return Joiner.on(":::").join(this.datasetUrn, this.jobId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6c7fe79/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
index 30b77ea..90e241a 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
@@ -18,7 +18,10 @@
 package org.apache.gobblin.publisher;
 
 import java.io.IOException;
+import java.util.Set;
 
+import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.writer.partitioner.TimeBasedWriterPartitioner;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
@@ -65,4 +68,15 @@ public class TimePartitionedDataPublisher extends 
BaseDataPublisher {
       movePath(parallelRunner, workUnitState, status.getPath(), outputPath, 
branchId);
     }
   }
+
+  @Override
+  protected void publishData(WorkUnitState state, int branchId, boolean 
publishSingleTaskData, Set<Path> writerOutputPathsMoved) throws IOException {
+    super.publishData(state, branchId, publishSingleTaskData, 
writerOutputPathsMoved);
+    if (publishSingleTaskData) {
+      // Add lineage event for destination. Make sure all workunits belonging 
to the same dataset have exactly the same value
+      Path publisherOutputDir = getPublisherOutputDir(state, branchId);
+      String timePrefix = 
state.getProp(TimeBasedWriterPartitioner.WRITER_PARTITION_PREFIX, "");
+      LineageInfo.setBranchLineageAttribute(state, branchId, PUBLISH_OUTOUT, 
new Path(publisherOutputDir, timePrefix).toString());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d6c7fe79/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
index ae06c67..7971fa5 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
@@ -24,8 +24,10 @@ import java.util.PriorityQueue;
 
 import com.google.common.collect.Lists;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.extract.AbstractSource;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
@@ -41,7 +43,7 @@ import org.apache.gobblin.source.workunit.WorkUnit;
  * algorithm (used by the second level) may not achieve a good balance if the 
number of items
  * is less than 3 times the number of bins.
  *
- * In the second level, these grouped {@link WorkUnit}s are assembled into 
{@link MultiWorkunit}s
+ * In the second level, these grouped {@link WorkUnit}s are assembled into 
{@link MultiWorkUnit}s
  * using worst-fit-decreasing.
  *
  * Bi-level bin packing has two advantages: (1) reduce the number of small 
output files since it tends to pack
@@ -68,22 +70,27 @@ public class KafkaBiLevelWorkUnitPacker extends 
KafkaWorkUnitPacker {
     double avgGroupSize = totalEstDataSize / numContainers / 
getPreGroupingSizeFactor(this.state);
 
     List<MultiWorkUnit> mwuGroups = Lists.newArrayList();
-    for (List<WorkUnit> workUnitsForTopic : workUnitsByTopic.values()) {
-      double estimatedDataSizeForTopic = 
calcTotalEstSizeForTopic(workUnitsForTopic);
+    for (Map.Entry<String, List<WorkUnit>> entry : 
workUnitsByTopic.entrySet()) {
+      double estimatedDataSizeForTopic = 
calcTotalEstSizeForTopic(entry.getValue());
       if (estimatedDataSizeForTopic < avgGroupSize) {
 
         // If the total estimated size of a topic is smaller than group size, 
put all partitions of this
         // topic in a single group.
         MultiWorkUnit mwuGroup = MultiWorkUnit.createEmpty();
-        addWorkUnitsToMultiWorkUnit(workUnitsForTopic, mwuGroup);
+        mwuGroup.setProp(LineageInfo.LINEAGE_DATASET_URN, entry.getKey());
+        addWorkUnitsToMultiWorkUnit(entry.getValue(), mwuGroup);
         mwuGroups.add(mwuGroup);
       } else {
-
         // Use best-fit-decreasing to group workunits for a topic into 
multiple groups.
-        mwuGroups.addAll(bestFitDecreasingBinPacking(workUnitsForTopic, 
avgGroupSize));
+        mwuGroups.addAll(bestFitDecreasingBinPacking(entry.getKey(), 
entry.getValue(), avgGroupSize));
       }
     }
 
+    // Add common lineage information
+    for (MultiWorkUnit multiWorkUnit: mwuGroups) {
+      LineageInfo.setDatasetLineageAttribute(multiWorkUnit, 
ConfigurationKeys.KAFKA_BROKERS, 
this.state.getProp(ConfigurationKeys.KAFKA_BROKERS, ""));
+    }
+
     List<WorkUnit> groups = squeezeMultiWorkUnits(mwuGroups);
     return worstFitDecreasingBinPacking(groups, numContainers);
   }
@@ -104,7 +111,7 @@ public class KafkaBiLevelWorkUnitPacker extends 
KafkaWorkUnitPacker {
    * Group {@link WorkUnit}s into groups. Each group is a {@link 
MultiWorkUnit}. Each group has a capacity of
    * avgGroupSize. If there's a single {@link WorkUnit} whose size is larger 
than avgGroupSize, it forms a group itself.
    */
-  private static List<MultiWorkUnit> 
bestFitDecreasingBinPacking(List<WorkUnit> workUnits, double avgGroupSize) {
+  private static List<MultiWorkUnit> bestFitDecreasingBinPacking(String topic, 
List<WorkUnit> workUnits, double avgGroupSize) {
 
     // Sort workunits by data size desc
     Collections.sort(workUnits, LOAD_DESC_COMPARATOR);
@@ -116,6 +123,7 @@ public class KafkaBiLevelWorkUnitPacker extends 
KafkaWorkUnitPacker {
         addWorkUnitToMultiWorkUnit(workUnit, bestGroup);
       } else {
         bestGroup = MultiWorkUnit.createEmpty();
+        bestGroup.setProp(LineageInfo.LINEAGE_DATASET_URN, topic);
         addWorkUnitToMultiWorkUnit(workUnit, bestGroup);
       }
       pQueue.add(bestGroup);

Reply via email to