This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 083ad2f  [GOBBLIN-1266] Refactor dataset lineage code to allow Lineage event emission in streaming mode
083ad2f is described below

commit 083ad2f3f4db731bf6bb8dd0cb91799149c60282
Author: sv2000 <[email protected]>
AuthorDate: Wed Sep 23 14:46:57 2020 -0700

    [GOBBLIN-1266] Refactor dataset lineage code to allow Lineage event emission in streaming mode
    
    Closes #3107 from sv2000/baseDataPublisher
---
 .../gobblin/publisher/BaseDataPublisher.java       |  2 +-
 .../gobblin/writer/PartitionedDataWriter.java      |  6 ++-
 .../gobblin/metrics/event/lineage/LineageInfo.java | 51 +++++++++++++++++++++-
 .../apache/gobblin/runtime/SafeDatasetCommit.java  | 34 +++------------
 4 files changed, 63 insertions(+), 30 deletions(-)

diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 7fb0c28..f105031 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -371,7 +371,7 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
    * This method publishes task output data for the given {@link 
WorkUnitState}, but if there are output data of
    * other tasks in the same folder, it may also publish those data.
    */
-  private void publishMultiTaskData(WorkUnitState state, int branchId, 
Set<Path> writerOutputPathsMoved)
+  protected void publishMultiTaskData(WorkUnitState state, int branchId, 
Set<Path> writerOutputPathsMoved)
       throws IOException {
     publishData(state, branchId, false, writerOutputPathsMoved);
     addLineageInfo(state, branchId);
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index 3ebba0e..c96cd83 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -56,6 +56,7 @@ import 
org.apache.gobblin.instrumented.writer.InstrumentedDataWriterDecorator;
 import 
org.apache.gobblin.instrumented.writer.InstrumentedPartitionedDataWriterDecorator;
 import org.apache.gobblin.records.ControlMessageHandler;
 import org.apache.gobblin.stream.ControlMessage;
+import org.apache.gobblin.stream.FlushControlMessage;
 import org.apache.gobblin.stream.MetadataUpdateControlMessage;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.stream.StreamEntity;
@@ -395,6 +396,9 @@ public class PartitionedDataWriter<S, D> extends 
WriterWrapper<D> implements Fin
             .getGlobalMetadata().getSchema());
         state.setProp(WRITER_LATEST_SCHEMA, ((MetadataUpdateControlMessage) 
message)
             .getGlobalMetadata().getSchema());
+      } else if (message instanceof FlushControlMessage){
+        //Add Partition info to state to report partition level lineage events 
on Flush
+        serializePartitionInfoToState();
       }
 
       synchronized (PartitionedDataWriter.this) {
@@ -411,7 +415,7 @@ public class PartitionedDataWriter<S, D> extends 
WriterWrapper<D> implements Fin
   /**
    * Get the serialized key to partitions info in {@link #state}
    */
-  private static String getPartitionsKey(int branchId) {
+  public static String getPartitionsKey(int branchId) {
     return String.format("writer.%d.partitions", branchId);
   }
 
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
index cd1f373..b167bac 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
@@ -44,13 +44,16 @@ import 
org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.iface.NotConfiguredException;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.dataset.DatasetResolver;
 import org.apache.gobblin.dataset.DatasetResolverFactory;
 import org.apache.gobblin.dataset.Descriptor;
 import org.apache.gobblin.dataset.DescriptorResolver;
 import org.apache.gobblin.dataset.NoopDatasetResolver;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.broker.LineageInfoFactory;
+import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -160,7 +163,7 @@ public final class LineageInfo {
         resolvedDescriptors.add(resolvedDescriptor);
       }
 
-      String destinationKey = getKey(BRANCH, branchId, 
LineageEventBuilder.DESTINATION);
+      String destinationKey = getDestinationKey(branchId);
       String currentDestinations = state.getProp(destinationKey);
       List<Descriptor> allDescriptors = Lists.newArrayList();
       if (StringUtils.isNotEmpty(currentDestinations)) {
@@ -303,4 +306,50 @@ public final class LineageInfo {
     System.arraycopy(objects, 0, args, 1, objects.length);
     return LineageEventBuilder.getKey(args);
   }
+
+  private static String getDestinationKey(int branchId) {
+    return getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION);
+  }
+  /**
+   * Remove the destination property from the state object. Used in streaming 
mode, where we want to selectively purge
+   * lineage information from the state.
+   * @param state
+   * @param branchId
+   */
+  public static void removeDestinationProp(State state, int branchId) {
+    String destinationKey = getDestinationKey(branchId);
+    if (state.contains(destinationKey)) {
+      state.removeProp(destinationKey);
+    }
+  }
+
+  /**
+   * Group states by lineage event name (i.e the dataset name). Used for 
de-duping LineageEvents for a given dataset.
+   * @param states
+   * @return a map of {@link WorkUnitState}s keyed by dataset name.
+   */
+  public static Map<String, Collection<WorkUnitState>> 
aggregateByLineageEvent(Collection<? extends WorkUnitState> states) {
+    Map<String, Collection<WorkUnitState>> statesByEvents = Maps.newHashMap();
+    for (WorkUnitState state : states) {
+      String eventName = LineageInfo.getFullEventName(state);
+      Collection<WorkUnitState> statesForEvent = 
statesByEvents.computeIfAbsent(eventName, k -> Lists.newArrayList());
+      statesForEvent.add(state);
+    }
+
+    return statesByEvents;
+  }
+
+  /**
+   * Emit lineage events for a given dataset. This method de-dupes the 
LineageEvents before submitting to the Lineage
+   * Event reporter
+   * @param dataset dataset name
+   * @param states a set of {@link WorkUnitState}s associated with the dataset
+   * @param metricContext {@link MetricContext} to submit the events to, which 
then notifies the Lineage EventReporter.
+   */
+  public static void submitLineageEvent(String dataset, Collection<? extends 
WorkUnitState> states, MetricContext metricContext) {
+    Collection<LineageEventBuilder> events = LineageInfo.load(states);
+    // Send events
+    events.forEach(event -> EventSubmitter.submit(metricContext, event));
+    log.info(String.format("Submitted %d lineage events for dataset %s", 
events.size(), dataset));
+  }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 5782196..f649e14 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -29,17 +29,18 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.io.Closer;
 
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.commit.CommitSequence;
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.FailureEventBuilder;
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
@@ -52,10 +53,6 @@ import org.apache.gobblin.runtime.task.TaskFactory;
 import org.apache.gobblin.runtime.task.TaskUtils;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
 
-import lombok.Data;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
 
 /**
  * {@link Callable} that commits a single dataset. The logic in this class is 
thread-safe, however, it calls
@@ -228,11 +225,11 @@ final class SafeDatasetCommit implements Callable<Void> {
     try {
       if (StringUtils.isEmpty(datasetUrn)) {
         // This dataset may contain different kinds of LineageEvent
-        for (Map.Entry<String, Collection<TaskState>> entry : 
aggregateByLineageEvent(states).entrySet()) {
-          submitLineageEvent(entry.getKey(), entry.getValue());
+        for (Map.Entry<String, Collection<WorkUnitState>> entry : 
LineageInfo.aggregateByLineageEvent(states).entrySet()) {
+          LineageInfo.submitLineageEvent(entry.getKey(), entry.getValue(), 
metricContext);
         }
       } else {
-        submitLineageEvent(datasetUrn, states);
+        LineageInfo.submitLineageEvent(datasetUrn, states, metricContext);
       }
     } finally {
       // Purge lineage info from all states
@@ -242,13 +239,6 @@ final class SafeDatasetCommit implements Callable<Void> {
     }
   }
 
-  private void submitLineageEvent(String dataset, Collection<TaskState> 
states) {
-    Collection<LineageEventBuilder> events = LineageInfo.load(states);
-    // Send events
-    events.forEach(event -> EventSubmitter.submit(this.metricContext, event));
-    log.info(String.format("Submitted %d lineage events for dataset %s", 
events.size(), dataset));
-  }
-
   /**
    * Synchronized version of {@link #commitDataset(Collection, DataPublisher)} 
used when publisher is not
    * thread safe.
@@ -442,14 +432,4 @@ final class SafeDatasetCommit implements Callable<Void> {
         .withDatasetState(datasetState).build());
   }
 
-  private static Map<String, Collection<TaskState>> 
aggregateByLineageEvent(Collection<TaskState> states) {
-    Map<String, Collection<TaskState>> statesByEvents = Maps.newHashMap();
-    for (TaskState state : states) {
-      String eventName = LineageInfo.getFullEventName(state);
-      Collection<TaskState> statesForEvent = 
statesByEvents.computeIfAbsent(eventName, k -> Lists.newArrayList());
-      statesForEvent.add(state);
-    }
-
-    return statesByEvents;
-  }
 }

Reply via email to