This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 083ad2f [GOBBLIN-1266] Refactor dataset lineage code to allow Lineage
event emission in streaming mode
083ad2f is described below
commit 083ad2f3f4db731bf6bb8dd0cb91799149c60282
Author: sv2000 <[email protected]>
AuthorDate: Wed Sep 23 14:46:57 2020 -0700
[GOBBLIN-1266] Refactor dataset lineage code to allow Lineage event
emission in streaming mode
Closes #3107 from sv2000/baseDataPublisher
---
.../gobblin/publisher/BaseDataPublisher.java | 2 +-
.../gobblin/writer/PartitionedDataWriter.java | 6 ++-
.../gobblin/metrics/event/lineage/LineageInfo.java | 51 +++++++++++++++++++++-
.../apache/gobblin/runtime/SafeDatasetCommit.java | 34 +++------------
4 files changed, 63 insertions(+), 30 deletions(-)
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 7fb0c28..f105031 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -371,7 +371,7 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
* This method publishes task output data for the given {@link
WorkUnitState}, but if there are output data of
* other tasks in the same folder, it may also publish those data.
*/
- private void publishMultiTaskData(WorkUnitState state, int branchId,
Set<Path> writerOutputPathsMoved)
+ protected void publishMultiTaskData(WorkUnitState state, int branchId,
Set<Path> writerOutputPathsMoved)
throws IOException {
publishData(state, branchId, false, writerOutputPathsMoved);
addLineageInfo(state, branchId);
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index 3ebba0e..c96cd83 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -56,6 +56,7 @@ import
org.apache.gobblin.instrumented.writer.InstrumentedDataWriterDecorator;
import
org.apache.gobblin.instrumented.writer.InstrumentedPartitionedDataWriterDecorator;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.stream.ControlMessage;
+import org.apache.gobblin.stream.FlushControlMessage;
import org.apache.gobblin.stream.MetadataUpdateControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.stream.StreamEntity;
@@ -395,6 +396,9 @@ public class PartitionedDataWriter<S, D> extends
WriterWrapper<D> implements Fin
.getGlobalMetadata().getSchema());
state.setProp(WRITER_LATEST_SCHEMA, ((MetadataUpdateControlMessage)
message)
.getGlobalMetadata().getSchema());
+ } else if (message instanceof FlushControlMessage){
+ //Add Partition info to state to report partition level lineage events
on Flush
+ serializePartitionInfoToState();
}
synchronized (PartitionedDataWriter.this) {
@@ -411,7 +415,7 @@ public class PartitionedDataWriter<S, D> extends
WriterWrapper<D> implements Fin
/**
* Get the serialized key to partitions info in {@link #state}
*/
- private static String getPartitionsKey(int branchId) {
+ public static String getPartitionsKey(int branchId) {
return String.format("writer.%d.partitions", branchId);
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
index cd1f373..b167bac 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
@@ -44,13 +44,16 @@ import
org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.NotConfiguredException;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.dataset.DatasetResolver;
import org.apache.gobblin.dataset.DatasetResolverFactory;
import org.apache.gobblin.dataset.Descriptor;
import org.apache.gobblin.dataset.DescriptorResolver;
import org.apache.gobblin.dataset.NoopDatasetResolver;
+import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.broker.LineageInfoFactory;
+import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.util.ConfigUtils;
@@ -160,7 +163,7 @@ public final class LineageInfo {
resolvedDescriptors.add(resolvedDescriptor);
}
- String destinationKey = getKey(BRANCH, branchId,
LineageEventBuilder.DESTINATION);
+ String destinationKey = getDestinationKey(branchId);
String currentDestinations = state.getProp(destinationKey);
List<Descriptor> allDescriptors = Lists.newArrayList();
if (StringUtils.isNotEmpty(currentDestinations)) {
@@ -303,4 +306,50 @@ public final class LineageInfo {
System.arraycopy(objects, 0, args, 1, objects.length);
return LineageEventBuilder.getKey(args);
}
+
+ private static String getDestinationKey(int branchId) {
+ return getKey(BRANCH, branchId, LineageEventBuilder.DESTINATION);
+ }
+ /**
+ * Remove the destination property from the state object. Used in streaming
mode, where we want to selectively purge
+ * lineage information from the state.
+ * @param state
+ * @param branchId
+ */
+ public static void removeDestinationProp(State state, int branchId) {
+ String destinationKey = getDestinationKey(branchId);
+ if (state.contains(destinationKey)) {
+ state.removeProp(destinationKey);
+ }
+ }
+
+ /**
+ * Group states by lineage event name (i.e the dataset name). Used for
de-duping LineageEvents for a given dataset.
+ * @param states
+ * @return a map of {@link WorkUnitState}s keyed by dataset name.
+ */
+ public static Map<String, Collection<WorkUnitState>>
aggregateByLineageEvent(Collection<? extends WorkUnitState> states) {
+ Map<String, Collection<WorkUnitState>> statesByEvents = Maps.newHashMap();
+ for (WorkUnitState state : states) {
+ String eventName = LineageInfo.getFullEventName(state);
+ Collection<WorkUnitState> statesForEvent =
statesByEvents.computeIfAbsent(eventName, k -> Lists.newArrayList());
+ statesForEvent.add(state);
+ }
+
+ return statesByEvents;
+ }
+
+ /**
+ * Emit lineage events for a given dataset. This method de-dupes the
LineageEvents before submitting to the Lineage
+ * Event reporter
+ * @param dataset dataset name
+ * @param states a set of {@link WorkUnitState}s associated with the dataset
+ * @param metricContext {@link MetricContext} to submit the events to, which
then notifies the Lineage EventReporter.
+ */
+ public static void submitLineageEvent(String dataset, Collection<? extends
WorkUnitState> states, MetricContext metricContext) {
+ Collection<LineageEventBuilder> events = LineageInfo.load(states);
+ // Send events
+ events.forEach(event -> EventSubmitter.submit(metricContext, event));
+ log.info(String.format("Submitted %d lineage events for dataset %s",
events.size(), dataset));
+ }
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 5782196..f649e14 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -29,17 +29,18 @@ import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.io.Closer;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.commit.CommitSequence;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.FailureEventBuilder;
import org.apache.gobblin.metrics.event.lineage.LineageInfo;
@@ -52,10 +53,6 @@ import org.apache.gobblin.runtime.task.TaskFactory;
import org.apache.gobblin.runtime.task.TaskUtils;
import org.apache.gobblin.source.extractor.JobCommitPolicy;
-import lombok.Data;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-
/**
* {@link Callable} that commits a single dataset. The logic in this class is
thread-safe, however, it calls
@@ -228,11 +225,11 @@ final class SafeDatasetCommit implements Callable<Void> {
try {
if (StringUtils.isEmpty(datasetUrn)) {
// This dataset may contain different kinds of LineageEvent
- for (Map.Entry<String, Collection<TaskState>> entry :
aggregateByLineageEvent(states).entrySet()) {
- submitLineageEvent(entry.getKey(), entry.getValue());
+ for (Map.Entry<String, Collection<WorkUnitState>> entry :
LineageInfo.aggregateByLineageEvent(states).entrySet()) {
+ LineageInfo.submitLineageEvent(entry.getKey(), entry.getValue(),
metricContext);
}
} else {
- submitLineageEvent(datasetUrn, states);
+ LineageInfo.submitLineageEvent(datasetUrn, states, metricContext);
}
} finally {
// Purge lineage info from all states
@@ -242,13 +239,6 @@ final class SafeDatasetCommit implements Callable<Void> {
}
}
- private void submitLineageEvent(String dataset, Collection<TaskState>
states) {
- Collection<LineageEventBuilder> events = LineageInfo.load(states);
- // Send events
- events.forEach(event -> EventSubmitter.submit(this.metricContext, event));
- log.info(String.format("Submitted %d lineage events for dataset %s",
events.size(), dataset));
- }
-
/**
* Synchronized version of {@link #commitDataset(Collection, DataPublisher)}
used when publisher is not
* thread safe.
@@ -442,14 +432,4 @@ final class SafeDatasetCommit implements Callable<Void> {
.withDatasetState(datasetState).build());
}
- private static Map<String, Collection<TaskState>>
aggregateByLineageEvent(Collection<TaskState> states) {
- Map<String, Collection<TaskState>> statesByEvents = Maps.newHashMap();
- for (TaskState state : states) {
- String eventName = LineageInfo.getFullEventName(state);
- Collection<TaskState> statesForEvent =
statesByEvents.computeIfAbsent(eventName, k -> Lists.newArrayList());
- statesForEvent.add(state);
- }
-
- return statesByEvents;
- }
}