Repository: incubator-gobblin Updated Branches: refs/heads/master a34a81a42 -> 3e229db98
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java new file mode 100644 index 0000000..4e711b9 --- /dev/null +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.event.lineage; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.metrics.event.GobblinEventBuilder; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.Lists; + + +public class LineageEventTest { + @Test + public void testEvent() { + final String topic = "testTopic"; + State state0 = new State(); + DatasetDescriptor source = new DatasetDescriptor("kafka", topic); + LineageInfo.setSource(source, state0); + DatasetDescriptor destination0 = new DatasetDescriptor("hdfs", "/data/dbchanges"); + LineageInfo.putDestination(destination0, 0, state0); + DatasetDescriptor destination1 = new DatasetDescriptor("mysql", "kafka.testTopic"); + LineageInfo.putDestination(destination1, 1, state0); + + Map<String, LineageEventBuilder> events = LineageInfo.load(state0); + verify(events.get("0"), topic, source, destination0, 0); + verify(events.get("1"), topic, source, destination1, 1); + + State state1 = new State(); + LineageInfo.setSource(source, state1); + List<State> states = Lists.newArrayList(); + states.add(state0); + states.add(state1); + + // Test only full fledged lineage events are loaded + try { + Collection<LineageEventBuilder> eventsList = LineageInfo.load(states); + Assert.assertTrue(eventsList.size() == 2); + Assert.assertEquals(getLineageEvent(eventsList, 0), events.get("0")); + Assert.assertEquals(getLineageEvent(eventsList, 1), events.get("1")); + } catch (LineageException e) { + Assert.fail("Unexpected exception"); + } + + // There are 3 full fledged lineage events + DatasetDescriptor destination2 = new DatasetDescriptor("mysql", "kafka.testTopic2"); + LineageInfo.putDestination(destination2, 2, state1); + try { + Collection<LineageEventBuilder> eventsList = LineageInfo.load(states); + Assert.assertTrue(eventsList.size() == 3); + Assert.assertEquals(getLineageEvent(eventsList, 0), events.get("0")); + Assert.assertEquals(getLineageEvent(eventsList, 1), events.get("1")); + verify(getLineageEvent(eventsList, 2), topic, source, destination2, 2); + } catch (LineageException e) { + Assert.fail("Unexpected exception"); + } + + // Throw conflict exception when there is a conflict on a branch between 2 states + LineageInfo.putDestination(destination2, 0, state1); + boolean hasLineageException = false; + try { + Collection<LineageEventBuilder> eventsList = LineageInfo.load(states); + } catch (LineageException e) { + Assert.assertTrue(e instanceof LineageException.ConflictException); + hasLineageException = true; + } + Assert.assertTrue(hasLineageException); + } + + private LineageEventBuilder getLineageEvent(Collection<LineageEventBuilder> events, int branchId) { + for (LineageEventBuilder event : events) { + if (event.getDestination().getMetadata().get(LineageInfo.BRANCH).equals(String.valueOf(branchId))) { + return event; + } + } + return null; + } + + private void verify(LineageEventBuilder event, String name, DatasetDescriptor source, DatasetDescriptor destination, int branchId) { + Assert.assertEquals(event.getName(), name); + Assert.assertEquals(event.getNamespace(), LineageEventBuilder.LIENAGE_EVENT_NAMESPACE); + Assert.assertEquals(event.getMetadata().get(GobblinEventBuilder.EVENT_TYPE), LineageEventBuilder.LINEAGE_EVENT_TYPE); + Assert.assertTrue(event.getSource().equals(source)); + + DatasetDescriptor updatedDestination = new DatasetDescriptor(destination); + updatedDestination.addMetadata(LineageInfo.BRANCH, String.valueOf(branchId)); + Assert.assertTrue(event.getDestination().equals(updatedDestination)); + + // It only has eventType info + Assert.assertTrue(event.getMetadata().size() == 1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java index 45223fb..8cd25c2 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java @@ -30,7 +30,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; -import org.apache.gobblin.lineage.LineageInfo; +import org.apache.gobblin.dataset.DatasetConstants; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +55,7 @@ import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient; import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient.GobblinKafkaConsumerClientFactory; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.source.extractor.extract.EventBasedSource; import org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker; import org.apache.gobblin.source.extractor.limiter.LimiterConfigurationKeys; @@ -549,8 +552,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { workUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, offsets.getLatestOffset()); // Add lineage info - workUnit.setProp(LineageInfo.LINEAGE_DATASET_URN, partition.getTopicName()); - LineageInfo.setDatasetLineageAttribute(workUnit, ConfigurationKeys.KAFKA_BROKERS, kafkaBrokers); + DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_KAFKA, partition.getTopicName()); + source.addMetadata(DatasetConstants.BROKERS, kafkaBrokers); + LineageInfo.setSource(source, workUnit); LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition, offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() - offsets.getStartOffset())); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java index 57fdedd..e2292c7 100644 --- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java +++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java @@ -19,12 +19,13 @@ package org.apache.gobblin.source.extractor.extract.jdbc; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.SourceState; -import org.apache.gobblin.lineage.LineageInfo; +import org.apache.gobblin.dataset.DatasetConstants; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.source.extractor.Extractor; import org.apache.gobblin.source.extractor.exception.ExtractPrepareException; import java.io.IOException; -import org.apache.gobblin.source.workunit.WorkUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +34,7 @@ import com.google.gson.JsonElement; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.source.extractor.extract.QueryBasedSource; import org.apache.gobblin.source.jdbc.MysqlExtractor; +import org.apache.gobblin.source.workunit.WorkUnit; /** @@ -55,12 +57,14 @@ public class MysqlSource extends QueryBasedSource<JsonArray, JsonElement> { return extractor; } - protected void addLineageSourceInfo (SourceState sourceState, SourceEntity entity, WorkUnit workUnit) { - super.addLineageSourceInfo(sourceState, entity, workUnit); + protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity, WorkUnit workUnit) { String host = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME); String port = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_PORT); String database = sourceState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_SCHEMA); String connectionUrl = "jdbc:mysql://" + host.trim() + ":" + port + "/" + database.trim(); - LineageInfo.setDatasetLineageAttribute(workUnit, "connectionUrl", connectionUrl); + DatasetDescriptor source = + new DatasetDescriptor(DatasetConstants.PLATFORM_MYSQL, database + "." + entity.getSourceEntityName()); + source.addMetadata(DatasetConstants.CONNECTION_URL, connectionUrl); + LineageInfo.setSource(source, workUnit); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java index d6a1b58..7ff9bb1 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java @@ -22,24 +22,27 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.Callable; +import org.apache.commons.lang.StringUtils; + import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.io.Closer; import org.apache.gobblin.commit.CommitSequence; import org.apache.gobblin.commit.CommitStep; import org.apache.gobblin.commit.DeliverySemantics; import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.instrumented.Instrumented; -import org.apache.gobblin.lineage.LineageException; -import org.apache.gobblin.lineage.LineageInfo; +import org.apache.gobblin.metrics.event.lineage.LineageException; +import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder; import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.metrics.event.FailureEventBuilder; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.publisher.CommitSequencePublisher; import org.apache.gobblin.publisher.DataPublisher; import org.apache.gobblin.publisher.UnpublishedHandling; @@ -170,8 +173,7 @@ final class SafeDatasetCommit implements Callable<Void> { try { finalizeDatasetState(datasetState, datasetUrn); maySubmitFailureEvent(datasetState); - submitLineageEvent(datasetState.getTaskStates()); - + maySubmitLineageEvent(datasetState); if (commitSequenceBuilder.isPresent()) { buildAndExecuteCommitSequence(commitSequenceBuilder.get(), datasetState, datasetUrn); datasetState.setState(JobState.RunningState.COMMITTED); @@ -197,38 +199,45 @@ final class SafeDatasetCommit implements Callable<Void> { } } - private void submitLineageEvent(Collection<TaskState> states) { - if (states.size() == 0) { - return; + private void maySubmitLineageEvent(JobState.DatasetState datasetState) { + Collection<TaskState> allStates = datasetState.getTaskStates(); + Collection<TaskState> states = Lists.newArrayList(); + // Filter out failed states or states that don't have lineage info + for (TaskState state : allStates) { + if (state.getWorkingState() == WorkUnitState.WorkingState.COMMITTED && + LineageInfo.hasLineageInfo(state)) { + states.add(state); + } } - - TaskState oneWorkUnitState = states.iterator().next(); - if (!oneWorkUnitState.contains(LineageInfo.LINEAGE_DATASET_URN)) { - // Do nothing if the dataset is not configured with lineage info + if (states.size() == 0) { return; } try { - // Aggregate states by lineage.dataset.urn, in case datasetUrn may be set to empty so that all task states falls into one empty dataset. - // FixMe: once all dataset.urn attribues are set properly, we don't need this aggregation. - Collection<Collection<State>> datasetStates = LineageInfo.aggregateByDatasetUrn(states).values(); - for (Collection<State> dataState: datasetStates) { - Collection<LineageInfo> branchLineages = LineageInfo.load(dataState, LineageInfo.Level.All); - EventSubmitter submitter = new EventSubmitter.Builder(metricContext, LineageInfo.LINEAGE_NAME_SPACE).build(); - for (LineageInfo info: branchLineages) { - submitter.submit(info.getId(), info.getLineageMetaData()); + if (StringUtils.isEmpty(datasetUrn)) { + // This dataset may contain different kinds of LineageEvent + for (Collection<TaskState> collection : aggregateByLineageEvent(states)) { + submitLineageEvent(collection); } + } else { + submitLineageEvent(states); } } catch (LineageException e) { - log.error ("Lineage event submission failed due to :" + e.toString()); + log.error("Lineage event submission failed due to :" + e.toString()); } finally { - for (TaskState taskState: states) { - // Remove lineage info from the state to avoid sending duplicate lineage events in the next run - taskState.removePropsWithPrefix(LineageInfo.LINEAGE_NAME_SPACE); + // Purge lineage info from all states + for (TaskState taskState : allStates) { + LineageInfo.purgeLineageInfo(taskState); } } } + private void submitLineageEvent(Collection<TaskState> states) throws LineageException { + Collection<LineageEventBuilder> events = LineageInfo.load(states); + // Send events + events.forEach(event -> event.submit(metricContext)); + } + /** * Synchronized version of {@link #commitDataset(Collection, DataPublisher)} used when publisher is not * thread safe. @@ -415,4 +424,15 @@ final class SafeDatasetCommit implements Callable<Void> { return Optional.of(new DatasetStateCommitStep.Builder<>().withProps(datasetState).withDatasetUrn(datasetUrn) .withDatasetState(datasetState).build()); } + + private static Collection<Collection<TaskState>> aggregateByLineageEvent(Collection<TaskState> states) { + Map<String, Collection<TaskState>> statesByEvents = Maps.newHashMap(); + for (TaskState state : states) { + String eventName = LineageInfo.getFullEventName(state); + Collection<TaskState> statesForEvent = statesByEvents.computeIfAbsent(eventName, k -> Lists.newArrayList()); + statesForEvent.add(state); + } + + return statesByEvents.values(); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template b/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template index 4cdf991..7380257 100644 --- a/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template +++ b/gobblin-runtime/src/test/resources/templates/textFileBasedSourceTest.template @@ -2,3 +2,5 @@ source.class=org.apache.gobblin.source.extractor.filebased.TextFileBasedSource writer.builder.class="org.apache.gobblin.writer.test.GobblinTestEventBusWriter$Builder" extract.table.type=APPEND_ONLY + +data.publisher.final.dir=/tmp http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/3e229db9/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties ---------------------------------------------------------------------- diff --git a/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties b/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties index 83a97de..b5f8fef 100644 --- a/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties +++ b/gobblin-test-harness/src/test/resources/runtime_test/skip_workunits_test.properties @@ -24,6 +24,6 @@ job.lock.enabled=false state.store.dir=./gobblin-test-harness/src/test/resources/runtime_test/state_store writer.staging.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_staging writer.output.dir=./gobblin-test-harness/src/test/resources/runtime_test/writer_output - +data.publisher.final.dir=/tmp source.class=org.apache.gobblin.TestSkipWorkUnitsSource
