Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a34a81a42 -> 3e229db98


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
new file mode 100644
index 0000000..4e711b9
--- /dev/null
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.event.lineage;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.GobblinEventBuilder;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+
+public class LineageEventTest {
+  @Test
+  public void testEvent() {
+    final String topic = "testTopic";
+    State state0 = new State();
+    DatasetDescriptor source = new DatasetDescriptor("kafka", topic);
+    LineageInfo.setSource(source, state0);
+    DatasetDescriptor destination0 = new DatasetDescriptor("hdfs", "/data/dbchanges");
+    LineageInfo.putDestination(destination0, 0, state0);
+    DatasetDescriptor destination1 = new DatasetDescriptor("mysql", "kafka.testTopic");
+    LineageInfo.putDestination(destination1, 1, state0);
+
+    Map<String, LineageEventBuilder> events = LineageInfo.load(state0);
+    verify(events.get("0"), topic, source, destination0, 0);
+    verify(events.get("1"), topic, source, destination1, 1);
+
+    State state1 = new State();
+    LineageInfo.setSource(source, state1);
+    List<State> states = Lists.newArrayList();
+    states.add(state0);
+    states.add(state1);
+
+    // Test only full fledged lineage events are loaded
+    try {
+      Collection<LineageEventBuilder> eventsList = LineageInfo.load(states);
+      Assert.assertTrue(eventsList.size() == 2);
+      Assert.assertEquals(getLineageEvent(eventsList, 0), events.get("0"));
+      Assert.assertEquals(getLineageEvent(eventsList, 1), events.get("1"));
+    } catch (LineageException e) {
+      Assert.fail("Unexpected exception");
+    }
+
+    // There are 3 full fledged lineage events
+    DatasetDescriptor destination2 = new DatasetDescriptor("mysql", "kafka.testTopic2");
+    LineageInfo.putDestination(destination2, 2, state1);
+    try {
+      Collection<LineageEventBuilder> eventsList = LineageInfo.load(states);
+      Assert.assertTrue(eventsList.size() == 3);
+      Assert.assertEquals(getLineageEvent(eventsList, 0), events.get("0"));
+      Assert.assertEquals(getLineageEvent(eventsList, 1), events.get("1"));
+      verify(getLineageEvent(eventsList, 2), topic, source, destination2, 2);
+    } catch (LineageException e) {
+      Assert.fail("Unexpected exception");
+    }
+
+    // Throw conflict exception when there is a conflict on a branch between 2 states
+    LineageInfo.putDestination(destination2, 0, state1);
+    boolean hasLineageException = false;
+    try {
+      Collection<LineageEventBuilder> eventsList = LineageInfo.load(states);
+    } catch (LineageException e) {
+      Assert.assertTrue(e instanceof LineageException.ConflictException);
+      hasLineageException = true;
+    }
+    Assert.assertTrue(hasLineageException);
+  }
+
+  private LineageEventBuilder getLineageEvent(Collection<LineageEventBuilder> events, int branchId) {
+    for (LineageEventBuilder event : events) {
+      if (event.getDestination().getMetadata().get(LineageInfo.BRANCH).equals(String.valueOf(branchId))) {
+        return event;
+      }
+    }
+    return null;
+  }
+
+  private void verify(LineageEventBuilder event, String name, DatasetDescriptor source, DatasetDescriptor destination, int branchId) {
+    Assert.assertEquals(event.getName(), name);
+    Assert.assertEquals(event.getNamespace(), LineageEventBuilder.LIENAGE_EVENT_NAMESPACE);
+    Assert.assertEquals(event.getMetadata().get(GobblinEventBuilder.EVENT_TYPE), LineageEventBuilder.LINEAGE_EVENT_TYPE);
+    Assert.assertTrue(event.getSource().equals(source));
+
+    DatasetDescriptor updatedDestination = new DatasetDescriptor(destination);
+    updatedDestination.addMetadata(LineageInfo.BRANCH, String.valueOf(branchId));
+    Assert.assertTrue(event.getDestination().equals(updatedDestination));
+
+    // It only has eventType info
+    Assert.assertTrue(event.getMetadata().size() == 1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 45223fb..8cd25c2 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -30,7 +30,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
-import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +55,7 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.extract.EventBasedSource;
 import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker;
 import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys;
@@ -549,8 +552,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
 
     // Add lineage info
-    workUnit.setProp(LineageInfo.LINEAGE_DATASET_URN, partition.getTopicName());
-    LineageInfo.setDatasetLineageAttribute(workUnit, ConfigurationKeys.KAFKA_BROKERS, kafkaBrokers);
+    DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_KAFKA, partition.getTopicName());
+    source.addMetadata(DatasetConstants.BROKERS, kafkaBrokers);
+    LineageInfo.setSource(source, workUnit);
 
     LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition,
         offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() - offsets.getStartOffset()));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
index 57fdedd..e2292c7 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
@@ -19,12 +19,13 @@ package org.apache.gobblin.source.extractor.extract.jdbc;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
-import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.exception.ExtractPrepareException;
 import java.io.IOException;
 
-import org.apache.gobblin.source.workunit.WorkUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +34,7 @@ import com.google.gson.JsonElement;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.source.extractor.extract.QueryBasedSource;
 import org.apache.gobblin.source.jdbc.MysqlExtractor;
+import org.apache.gobblin.source.workunit.WorkUnit;
 
 
 /**
@@ -55,12 +57,14 @@ public class MysqlSource extends QueryBasedSource<JsonArray, JsonElement> {
     return extractor;
   }
 
-  protected void addLineageSourceInfo (SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
-    super.addLineageSourceInfo(sourceState, entity, workUnit);
+  protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
     String host = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
     String port = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_PORT);
     String database = sourceState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_SCHEMA);
     String connectionUrl = "jdbc:mysql://" + host.trim() + ":" + port + "/" + database.trim();
-    LineageInfo.setDatasetLineageAttribute(workUnit, "connectionUrl", connectionUrl);
+    DatasetDescriptor source =
+        new DatasetDescriptor(DatasetConstants.PLATFORM_MYSQL, database + "." + entity.getSourceEntityName());
+    source.addMetadata(DatasetConstants.CONNECTION_URL, connectionUrl);
+    LineageInfo.setSource(source, workUnit);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index d6a1b58..7ff9bb1 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -22,24 +22,27 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import org.apache.commons.lang.StringUtils;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.io.Closer;
 
 import org.apache.gobblin.commit.CommitSequence;
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.lineage.LineageException;
-import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.metrics.event.lineage.LineageException;
+import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.FailureEventBuilder;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.publisher.CommitSequencePublisher;
 import org.apache.gobblin.publisher.DataPublisher;
 import org.apache.gobblin.publisher.UnpublishedHandling;
@@ -170,8 +173,7 @@ final class SafeDatasetCommit implements Callable<Void> {
       try {
         finalizeDatasetState(datasetState, datasetUrn);
         maySubmitFailureEvent(datasetState);
-        submitLineageEvent(datasetState.getTaskStates());
-
+        maySubmitLineageEvent(datasetState);
         if (commitSequenceBuilder.isPresent()) {
           buildAndExecuteCommitSequence(commitSequenceBuilder.get(), datasetState, datasetUrn);
           datasetState.setState(JobState.RunningState.COMMITTED);
@@ -197,38 +199,45 @@ final class SafeDatasetCommit implements Callable<Void> {
     }
   }
 
-  private void submitLineageEvent(Collection<TaskState> states) {
-    if (states.size() == 0) {
-      return;
+  private void maySubmitLineageEvent(JobState.DatasetState datasetState) {
+    Collection<TaskState> allStates = datasetState.getTaskStates();
+    Collection<TaskState> states = Lists.newArrayList();
+    // Filter out failed states or states that don't have lineage info
+    for (TaskState state : allStates) {
+      if (state.getWorkingState() == WorkUnitState.WorkingState.COMMITTED &&
+          LineageInfo.hasLineageInfo(state)) {
+        states.add(state);
+      }
     }
-
-    TaskState oneWorkUnitState = states.iterator().next();
-    if (!oneWorkUnitState.contains(LineageInfo.LINEAGE_DATASET_URN)) {
-      // Do nothing if the dataset is not configured with lineage info
+    if (states.size() == 0) {
       return;
     }
 
     try {
-      // Aggregate states by lineage.dataset.urn, in case datasetUrn may be set to empty so that all task states falls into one empty dataset.
-      // FixMe: once all dataset.urn attribues are set properly, we don't need this aggregation.
-      Collection<Collection<State>> datasetStates = LineageInfo.aggregateByDatasetUrn(states).values();
-      for (Collection<State> dataState: datasetStates) {
-        Collection<LineageInfo> branchLineages = LineageInfo.load(dataState, LineageInfo.Level.All);
-        EventSubmitter submitter = new EventSubmitter.Builder(metricContext, LineageInfo.LINEAGE_NAME_SPACE).build();
-        for (LineageInfo info: branchLineages) {
-          submitter.submit(info.getId(), info.getLineageMetaData());
+      if (StringUtils.isEmpty(datasetUrn)) {
+        // This dataset may contain different kinds of LineageEvent
+        for (Collection<TaskState> collection : aggregateByLineageEvent(states)) {
+          submitLineageEvent(collection);
         }
+      } else {
+        submitLineageEvent(states);
       }
     } catch (LineageException e) {
-      log.error ("Lineage event submission failed due to :" + e.toString());
+      log.error("Lineage event submission failed due to :" + e.toString());
     } finally {
-      for (TaskState taskState: states) {
-        // Remove lineage info from the state to avoid sending duplicate lineage events in the next run
-        taskState.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);
+      // Purge lineage info from all states
+      for (TaskState taskState : allStates) {
+        LineageInfo.purgeLineageInfo(taskState);
       }
     }
   }
 
+  private void submitLineageEvent(Collection<TaskState> states) throws LineageException {
+    Collection<LineageEventBuilder> events = LineageInfo.load(states);
+    // Send events
+    events.forEach(event -> event.submit(metricContext));
+  }
+
   /**
    * Synchronized version of {@link #commitDataset(Collection, DataPublisher)} used when publisher is not
    * thread safe.
@@ -415,4 +424,15 @@ final class SafeDatasetCommit implements Callable<Void> {
     return Optional.of(new DatasetStateCommitStep.Builder<>().withProps(datasetState).withDatasetUrn(datasetUrn)
         .withDatasetState(datasetState).build());
   }
+
+  private static Collection<Collection<TaskState>> aggregateByLineageEvent(Collection<TaskState> states) {
+    Map<String, Collection<TaskState>> statesByEvents = Maps.newHashMap();
+    for (TaskState state : states) {
+      String eventName = LineageInfo.getFullEventName(state);
+      Collection<TaskState> statesForEvent = statesByEvents.computeIfAbsent(eventName, k -> Lists.newArrayList());
+      statesForEvent.add(state);
+    }
+
+    return statesByEvents.values();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template b/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template
index 4cdf991..7380257 100644
--- a/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template
+++ b/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template
@@ -2,3 +2,5 @@ source.class=org.apache.gobblin.source.extractor.filebased.TextFileBasedSource
 writer.builder.class="org.apache.gobblin.writer.test.GobblinTestEventBusWriter$Builder"
 
 extract.table.type=APPEND_ONLY
+
+data.publisher.final.dir=/tmp

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties
----------------------------------------------------------------------
diff --git a/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties b/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties
index 83a97de..b5f8fef 100644
--- a/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties
+++ b/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties
@@ -24,6 +24,6 @@ job.lock.enabled=false
 state.store.dir=./gobblin-test-harness/src/test/resources/runtime_test/state_store
 writer.staging.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_staging
 writer.output.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_output
-
+data.publisher.final.dir=/tmp
 
 source.class=org.apache.gobblin.TestSkipWorkUnitsSource

Reply via email to