Repository: incubator-gobblin
Updated Branches:
  refs/heads/master bd17f1384 -> d2e43542a
[GOBBLIN-278] Fix sending lineage event for KafkaSource

Closes #2131 from zxcware/lineage

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d2e43542
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d2e43542
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d2e43542

Branch: refs/heads/master
Commit: d2e43542ab7ee3e5814755a8b37e86f07f9115ea
Parents: bd17f13
Author: zhchen <[email protected]>
Authored: Tue Oct 10 09:19:15 2017 -0700
Committer: Hung Tran <[email protected]>
Committed: Tue Oct 10 09:19:15 2017 -0700

----------------------------------------------------------------------
 .../org/apache/gobblin/configuration/State.java | 25 +++++++++++++++
 .../gobblin/configuration/WorkUnitState.java    | 14 +++++++++
 .../org/apache/gobblin/lineage/LineageInfo.java |  2 +-
 .../apache/gobblin/configuration/StateTest.java | 12 +++++++
 .../extractor/extract/kafka/KafkaSource.java    | 13 +++++---
 .../packer/KafkaBiLevelWorkUnitPacker.java      | 19 +++--------
 .../workunit/packer/KafkaWorkUnitPacker.java    | 33 +++++++-------------
 .../gobblin/runtime/SafeDatasetCommit.java      | 25 +++++++++------
 8 files changed, 92 insertions(+), 51 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
index 20b6e17..63ced0a 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/State.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.configuration;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -500,6 +501,30 @@ public class State implements WritableShim {
   }
 
   /**
+   * Remove all properties with a certain key prefix
+   *
+   * @param prefix key prefix
+   */
+  public void removePropsWithPrefix(String prefix) {
+    this.specProperties.entrySet().removeIf(entry -> ((String) entry.getKey()).startsWith(prefix));
+
+    Properties newCommonProperties = null;
+    for (Object key: this.commonProperties.keySet()) {
+      if (((String)key).startsWith(prefix)) {
+        if (newCommonProperties == null) {
+          newCommonProperties = new Properties();
+          newCommonProperties.putAll(this.commonProperties);
+        }
+        newCommonProperties.remove(key);
+      }
+    }
+
+    if (newCommonProperties != null) {
+      this.commonProperties = newCommonProperties;
+    }
+  }
+
+  /**
    * @deprecated Use {@link #getProp(String)}
    */
   @Deprecated

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
index af02d01..0b40399 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/WorkUnitState.java
@@ -367,6 +367,20 @@ public class WorkUnitState extends State {
     return super.contains(key) || this.workUnit.contains(key) || this.jobState.contains(key);
   }
 
+  @Override
+  public void removeProp(String key) {
+    super.removeProp(key);
+    this.workUnit.removeProp(key);
+    this.jobState.removeProp(key);
+  }
+
+  @Override
+  public void removePropsWithPrefix(String prefix) {
+    super.removePropsWithPrefix(prefix);
+    this.workUnit.removePropsWithPrefix(prefix);
+    this.jobState.removePropsWithPrefix(prefix);
+  }
+
   /**
    * Get the {@link org.apache.gobblin.source.workunit.Extract} associated with the {@link WorkUnit}.
    *

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
index 90473d4..7af71df 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/lineage/LineageInfo.java
@@ -49,10 +49,10 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class LineageInfo {
 
-  public static final String LINEAGE_DATASET_URN = "lineage.dataset.urn";
   public static final String LINEAGE_NAME_SPACE = "gobblin.lineage";
   public static final String BRANCH_ID_METADATA_KEY = "branchId";
   private static final String DATASET_PREFIX = LINEAGE_NAME_SPACE + ".";
+  public static final String LINEAGE_DATASET_URN = DATASET_PREFIX + "dataset.urn";
   private static final String BRANCH_PREFIX = DATASET_PREFIX + "branch.";
 
   @Getter
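A note on the LineageInfo change above: LINEAGE_DATASET_URN is now built from DATASET_PREFIX, so its value becomes "gobblin.lineage.dataset.urn" and every lineage property shares the LINEAGE_NAME_SPACE prefix. That is what allows a single removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE) call to clear the dataset URN together with the branch attributes. A minimal sketch, not part of the patch (the class name and topic value are made up):

    import org.apache.gobblin.configuration.State;
    import org.apache.gobblin.lineage.LineageInfo;

    public class LineagePrefixSketch {
      public static void main(String[] args) {
        State state = new State();
        // LINEAGE_DATASET_URN now resolves to "gobblin.lineage.dataset.urn".
        state.setProp(LineageInfo.LINEAGE_DATASET_URN, "PageViewEvent");
        // One prefix removal clears the URN and any other gobblin.lineage.* attributes.
        state.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);
        System.out.println(state.getPropertyNames().isEmpty());  // true
      }
    }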
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-api/src/test/java/org/apache/gobblin/configuration/StateTest.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/test/java/org/apache/gobblin/configuration/StateTest.java b/gobblin-api/src/test/java/org/apache/gobblin/configuration/StateTest.java
index bcfa9a7..35cd2eb 100644
--- a/gobblin-api/src/test/java/org/apache/gobblin/configuration/StateTest.java
+++ b/gobblin-api/src/test/java/org/apache/gobblin/configuration/StateTest.java
@@ -81,4 +81,16 @@ public class StateTest {
       Assert.fail("Concurrency test failed with first exception: " + ExceptionUtils.getFullStackTrace(this.exceptions.poll()));
     }
   }
+
+  @Test
+  public void testRemovePropsWithPrefix() {
+    final State state = new State();
+    final String prefix = "prefix";
+    for (int i = 0; i < 10; i++) {
+      state.setProp("prefix." + i, i);
+    }
+    Assert.assertTrue(state.getPropertyNames().size() == 10);
+    state.removePropsWithPrefix(prefix);
+    Assert.assertTrue(state.getPropertyNames().size() == 0);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 56f81e1..e1494f7 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
+import org.apache.gobblin.lineage.LineageInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -132,6 +133,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   private Extract.TableType tableType;
   private String extractNamespace;
   private boolean isFullExtract;
+  private String kafkaBrokers;
   private boolean shouldEnableDatasetStateStore;
   private AtomicBoolean isDatasetStateEnabled = new AtomicBoolean(false);
   private Set<String> topicsToProcess;
@@ -172,7 +174,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
       extractNamespace = KafkaSource.DEFAULT_NAMESPACE_NAME;
     }
     isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);
-
+    kafkaBrokers = state.getProp(ConfigurationKeys.KAFKA_BROKERS, "");
     this.shouldEnableDatasetStateStore = state.getPropAsBoolean(GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE,
         DEFAULT_GOBBLIN_KAFKA_SHOULD_ENABLE_DATASET_STATESTORE);
 
@@ -538,10 +540,6 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     }
 
     WorkUnit workUnit = WorkUnit.create(extract);
-    if (topicSpecificState.isPresent()) {
-      workUnit.addAll(topicSpecificState.get());
-    }
-
     workUnit.setProp(TOPIC_NAME, partition.getTopicName());
     addDatasetUrnOptionally(workUnit);
     workUnit.setProp(PARTITION_ID, partition.getId());
@@ -549,6 +547,11 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString());
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, offsets.getStartOffset());
     workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset());
+
+    // Add lineage info
+    workUnit.setProp(LineageInfo.LINEAGE_DATASET_URN, partition.getTopicName());
+    LineageInfo.setDatasetLineageAttribute(workUnit, ConfigurationKeys.KAFKA_BROKERS, kafkaBrokers);
+
     LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition,
         offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() - offsets.getStartOffset()));
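With the change above, lineage is attached at the source: each Kafka work unit names its topic as the lineage dataset URN and records the broker list as a dataset-level lineage attribute, so the information rides along into the task states that SafeDatasetCommit later reads. A minimal sketch of the two added calls in isolation, not part of the patch (class name, topic, and broker values are made up):

    import org.apache.gobblin.configuration.ConfigurationKeys;
    import org.apache.gobblin.lineage.LineageInfo;
    import org.apache.gobblin.source.workunit.WorkUnit;

    public class KafkaLineagePropsSketch {
      public static void main(String[] args) {
        WorkUnit workUnit = WorkUnit.createEmpty();
        // Stand-ins for partition.getTopicName() and the job's kafka.brokers setting.
        String topicName = "PageViewEvent";
        String kafkaBrokers = "broker-1:9092,broker-2:9092";
        // Same two calls KafkaSource now makes per work unit.
        workUnit.setProp(LineageInfo.LINEAGE_DATASET_URN, topicName);
        LineageInfo.setDatasetLineageAttribute(workUnit, ConfigurationKeys.KAFKA_BROKERS, kafkaBrokers);
        System.out.println(workUnit.getProp(LineageInfo.LINEAGE_DATASET_URN));  // PageViewEvent
      }
    }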
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
index 7971fa5..9c134b8 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
@@ -24,10 +24,8 @@ import java.util.PriorityQueue;
 
 import com.google.common.collect.Lists;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.extract.AbstractSource;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
@@ -70,27 +68,21 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker {
     double avgGroupSize = totalEstDataSize / numContainers / getPreGroupingSizeFactor(this.state);
 
     List<MultiWorkUnit> mwuGroups = Lists.newArrayList();
-    for (Map.Entry<String, List<WorkUnit>> entry : workUnitsByTopic.entrySet()) {
-      double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(entry.getValue());
+    for (List<WorkUnit> workUnitsForTopic : workUnitsByTopic.values()) {
+      double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(workUnitsForTopic);
       if (estimatedDataSizeForTopic < avgGroupSize) {
         // If the total estimated size of a topic is smaller than group size, put all partitions of this
         // topic in a single group.
         MultiWorkUnit mwuGroup = MultiWorkUnit.createEmpty();
-        mwuGroup.setProp(LineageInfo.LINEAGE_DATASET_URN, entry.getKey());
-        addWorkUnitsToMultiWorkUnit(entry.getValue(), mwuGroup);
+        addWorkUnitsToMultiWorkUnit(workUnitsForTopic, mwuGroup);
         mwuGroups.add(mwuGroup);
       } else {
         // Use best-fit-decreasing to group workunits for a topic into multiple groups.
-        mwuGroups.addAll(bestFitDecreasingBinPacking(entry.getKey(), entry.getValue(), avgGroupSize));
+        mwuGroups.addAll(bestFitDecreasingBinPacking(workUnitsForTopic, avgGroupSize));
       }
     }
 
-    // Add common lineage information
-    for (MultiWorkUnit multiWorkUnit: mwuGroups) {
-      LineageInfo.setDatasetLineageAttribute(multiWorkUnit, ConfigurationKeys.KAFKA_BROKERS, this.state.getProp(ConfigurationKeys.KAFKA_BROKERS, ""));
-    }
-
     List<WorkUnit> groups = squeezeMultiWorkUnits(mwuGroups);
     return worstFitDecreasingBinPacking(groups, numContainers);
   }
@@ -111,7 +103,7 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker {
    * Group {@link WorkUnit}s into groups. Each group is a {@link MultiWorkUnit}. Each group has a capacity of
    * avgGroupSize. If there's a single {@link WorkUnit} whose size is larger than avgGroupSize, it forms a group itself.
    */
-  private static List<MultiWorkUnit> bestFitDecreasingBinPacking(String topic, List<WorkUnit> workUnits, double avgGroupSize) {
+  private static List<MultiWorkUnit> bestFitDecreasingBinPacking(List<WorkUnit> workUnits, double avgGroupSize) {
     // Sort workunits by data size desc
     Collections.sort(workUnits, LOAD_DESC_COMPARATOR);
 
@@ -123,7 +115,6 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker {
         addWorkUnitToMultiWorkUnit(workUnit, bestGroup);
       } else {
         bestGroup = MultiWorkUnit.createEmpty();
-        bestGroup.setProp(LineageInfo.LINEAGE_DATASET_URN, topic);
         addWorkUnitToMultiWorkUnit(workUnit, bestGroup);
       }
       pQueue.add(bestGroup);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index 38d050d..8d03f4f 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -81,25 +81,11 @@ public abstract class KafkaWorkUnitPacker {
 
   protected final AbstractSource<?, ?> source;
   protected final SourceState state;
-  protected final Extract.TableType tableType;
-  protected final String extractNameSpace;
-  protected final boolean isFullExtract;
   protected final KafkaWorkUnitSizeEstimator sizeEstimator;
 
   protected KafkaWorkUnitPacker(AbstractSource<?, ?> source, SourceState state) {
     this.source = source;
     this.state = state;
-    if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) {
-      String tableTypeStr = state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY,
-          KafkaSource.DEFAULT_TABLE_TYPE.toString());
-      tableType = Extract.TableType.valueOf(tableTypeStr);
-      extractNameSpace = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, KafkaSource.DEFAULT_NAMESPACE_NAME);
-    } else {
-      // To be compatible, reject table type and namespace configuration keys as previous implementation
-      tableType = KafkaSource.DEFAULT_TABLE_TYPE;
-      extractNameSpace = KafkaSource.DEFAULT_NAMESPACE_NAME;
-    }
-    isFullExtract = state.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY);
     this.sizeEstimator = getWorkUnitSizeEstimator();
   }
 
@@ -227,13 +213,18 @@ public abstract class KafkaWorkUnitPacker {
     List<KafkaPartition> partitions = getPartitionsFromMultiWorkUnit(multiWorkUnit);
     Preconditions.checkArgument(!partitions.isEmpty(), "There must be at least one partition in the multiWorkUnit");
 
-    Extract extract = this.source.createExtract(tableType, extractNameSpace, partitions.get(0).getTopicName());
-    if (isFullExtract) {
-      extract.setProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY, true);
-    }
-    WorkUnit workUnit = WorkUnit.create(extract, interval);
+    // Squeeze all partitions from the multiWorkUnit into one of the work units, which can be any one
+    WorkUnit workUnit = multiWorkUnit.getWorkUnits().get(0);
+    // Update interval
+    workUnit.removeProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY);
+    workUnit.removeProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY);
+    workUnit.setWatermarkInterval(interval);
+    // Remove the original partition information
+    workUnit.removeProp(KafkaSource.PARTITION_ID);
+    workUnit.removeProp(KafkaSource.LEADER_ID);
+    workUnit.removeProp(KafkaSource.LEADER_HOSTANDPORT);
+    // Add combined partitions information
     populateMultiPartitionWorkUnit(partitions, workUnit);
-    workUnit.setProp(ESTIMATED_WORKUNIT_SIZE, multiWorkUnit.getProp(ESTIMATED_WORKUNIT_SIZE));
     LOG.info(String.format("Created MultiWorkUnit for partitions %s", partitions));
     return workUnit;
   }
@@ -243,9 +234,7 @@ public abstract class KafkaWorkUnitPacker {
    */
  private static void populateMultiPartitionWorkUnit(List<KafkaPartition> partitions, WorkUnit workUnit) {
    Preconditions.checkArgument(!partitions.isEmpty(), "There should be at least one partition");
-    workUnit.setProp(KafkaSource.TOPIC_NAME, partitions.get(0).getTopicName());
     GobblinMetrics.addCustomTagToState(workUnit, new Tag<>("kafkaTopic", partitions.get(0).getTopicName()));
-    workUnit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, partitions.get(0).getTopicName());
     for (int i = 0; i < partitions.size(); i++) {
       workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, i), partitions.get(i).getId());
       workUnit.setProp(KafkaUtils.getPartitionPropName(KafkaSource.LEADER_ID, i),
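With the squeeze change above, the emitted work unit retains the properties KafkaSource set, lineage keys included, because the packer now reuses one of the grouped work units and only rewrites the watermark interval and the per-partition properties, rather than building a fresh WorkUnit from a new Extract. A small sketch of that idea, not part of the patch (class name and topic value are made up):

    import org.apache.gobblin.lineage.LineageInfo;
    import org.apache.gobblin.source.workunit.MultiWorkUnit;
    import org.apache.gobblin.source.workunit.WorkUnit;

    public class SqueezeKeepsLineageSketch {
      public static void main(String[] args) {
        // A work unit the way KafkaSource hands it to the packer.
        WorkUnit original = WorkUnit.createEmpty();
        original.setProp(LineageInfo.LINEAGE_DATASET_URN, "PageViewEvent");

        MultiWorkUnit group = MultiWorkUnit.createEmpty();
        group.addWorkUnit(original);

        // Reusing a grouped work unit keeps the source-set properties, lineage keys included.
        WorkUnit squeezed = group.getWorkUnits().get(0);
        System.out.println(squeezed.getProp(LineageInfo.LINEAGE_DATASET_URN));  // PageViewEvent
      }
    }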
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d2e43542/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
index 0a8984b..07d167b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java
@@ -37,18 +37,18 @@ import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.lineage.LineageException;
 import org.apache.gobblin.lineage.LineageInfo;
+import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.publisher.CommitSequencePublisher;
 import org.apache.gobblin.publisher.DataPublisher;
-import org.apache.gobblin.publisher.NoopPublisher;
 import org.apache.gobblin.publisher.UnpublishedHandling;
 import org.apache.gobblin.runtime.commit.DatasetStateCommitStep;
 import org.apache.gobblin.runtime.task.TaskFactory;
 import org.apache.gobblin.runtime.task.TaskUtils;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
 
-import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
@@ -57,7 +57,7 @@ import lombok.extern.slf4j.Slf4j;
  * {@link DataPublisher#publish(Collection)}. This class is thread-safe if and only if the implementation of
  * {@link DataPublisher} used is also thread-safe.
  */
-@AllArgsConstructor
+@RequiredArgsConstructor
 @Slf4j
 final class SafeDatasetCommit implements Callable<Void> {
 
@@ -71,6 +71,8 @@ final class SafeDatasetCommit implements Callable<Void> {
   private final boolean isMultithreaded;
   private final JobContext jobContext;
 
+  private MetricContext metricContext;
+
   @Override
   public Void call()
       throws Exception {
@@ -78,6 +80,8 @@ final class SafeDatasetCommit implements Callable<Void> {
       log.info(this.datasetUrn + " have been committed.");
       return null;
     }
+    metricContext = Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class);
+
     finalizeDatasetStateBeforeCommit(this.datasetState);
     Class<? extends DataPublisher> dataPublisherClass;
     try (Closer closer = Closer.create()) {
@@ -159,6 +163,7 @@ final class SafeDatasetCommit implements Callable<Void> {
     } finally {
       try {
         finalizeDatasetState(datasetState, datasetUrn);
+        submitLineageEvent(datasetState.getTaskStates());
         if (commitSequenceBuilder.isPresent()) {
           buildAndExecuteCommitSequence(commitSequenceBuilder.get(), datasetState, datasetUrn);
           datasetState.setState(JobState.RunningState.COMMITTED);
@@ -182,9 +187,8 @@ final class SafeDatasetCommit implements Callable<Void> {
     }
 
     TaskState oneWorkUnitState = states.iterator().next();
-    if (oneWorkUnitState.contains(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE) && oneWorkUnitState.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE).equals(
-        NoopPublisher.class.getName())) {
-      // if no publisher configured, each task should be responsible for sending lineage event.
+    if (!oneWorkUnitState.contains(LineageInfo.LINEAGE_DATASET_URN)) {
+      // Do nothing if the dataset is not configured with lineage info
       return;
     }
 
@@ -194,14 +198,18 @@ final class SafeDatasetCommit implements Callable<Void> {
     Collection<Collection<State>> datasetStates = LineageInfo.aggregateByDatasetUrn(states).values();
     for (Collection<State> dataState: datasetStates) {
       Collection<LineageInfo> branchLineages = LineageInfo.load(dataState, LineageInfo.Level.All);
-      EventSubmitter submitter = new EventSubmitter.Builder(Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class),
-          LineageInfo.LINEAGE_NAME_SPACE).build();
+      EventSubmitter submitter = new EventSubmitter.Builder(metricContext, LineageInfo.LINEAGE_NAME_SPACE).build();
       for (LineageInfo info: branchLineages) {
         submitter.submit(info.getId(), info.getLineageMetaData());
       }
     }
   } catch (LineageException e) {
     log.error ("Lineage event submission failed due to :" + e.toString());
+  } finally {
+    for (TaskState taskState: states) {
+      // Remove lineage info from the state to avoid sending duplicate lineage events in the next run
+      taskState.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);
+    }
   }
 }
@@ -222,7 +230,6 @@ final class SafeDatasetCommit implements Callable<Void> {
 
     try {
       publisher.publish(taskStates);
-      submitLineageEvent(taskStates);
     } catch (Throwable t) {
       log.error("Failed to commit dataset", t);
       setTaskFailureException(taskStates, t);
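Putting the pieces together, the commit path now looks roughly like this: skip datasets whose task states never received lineage info, submit one event per dataset URN and branch, and finally strip the gobblin.lineage.* props so a later run does not re-emit the same events. A sketch of that flow, not the committed code (the class and method names are made up, and it assumes LineageInfo.aggregateByDatasetUrn and LineageInfo.load accept the state collections as used in the diff above):

    import java.util.Collection;

    import org.apache.gobblin.configuration.State;
    import org.apache.gobblin.lineage.LineageException;
    import org.apache.gobblin.lineage.LineageInfo;
    import org.apache.gobblin.metrics.MetricContext;
    import org.apache.gobblin.metrics.event.EventSubmitter;

    public class CommitTimeLineageSketch {
      // Roughly the commit-time flow introduced above, stripped of SafeDatasetCommit's surrounding logic.
      static void submitAndCleanUp(Collection<State> states, MetricContext metricContext) throws LineageException {
        if (states.isEmpty() || !states.iterator().next().contains(LineageInfo.LINEAGE_DATASET_URN)) {
          return;  // dataset was never configured with lineage info
        }
        try {
          for (Collection<State> dataState : LineageInfo.aggregateByDatasetUrn(states).values()) {
            EventSubmitter submitter = new EventSubmitter.Builder(metricContext, LineageInfo.LINEAGE_NAME_SPACE).build();
            for (LineageInfo info : LineageInfo.load(dataState, LineageInfo.Level.All)) {
              submitter.submit(info.getId(), info.getLineageMetaData());
            }
          }
        } finally {
          // Drop gobblin.lineage.* props so the next run does not emit duplicate events.
          for (State state : states) {
            state.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE);
          }
        }
      }
    }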
