Repository: incubator-gobblin Updated Branches: refs/heads/master da382dbf1 -> 280b1d35e
[GOBBLIN-182] Add lineage event for query based source Closes #2034 from yukuai518/lineage2 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/280b1d35 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/280b1d35 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/280b1d35 Branch: refs/heads/master Commit: 280b1d35edbb66ec9db19d852caa1b8ed43a34ac Parents: da382db Author: Kuai Yu <[email protected]> Authored: Thu Aug 17 11:25:11 2017 -0700 Committer: Issac Buenrostro <[email protected]> Committed: Thu Aug 17 11:25:11 2017 -0700 ---------------------------------------------------------------------- .../gobblin/lineage/LineageException.java | 39 ++++ .../org/apache/gobblin/lineage/LineageInfo.java | 234 +++++++++++++++++++ .../gobblin/publisher/BaseDataPublisher.java | 10 + .../extractor/extract/QueryBasedSource.java | 6 + .../apache/gobblin/lineage/LineageInfoTest.java | 160 +++++++++++++ .../extractor/extract/jdbc/MysqlSource.java | 13 ++ .../gobblin/runtime/SafeDatasetCommit.java | 31 +++ 7 files changed, 493 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java new file mode 100644 index 0000000..8dcf592 --- /dev/null +++ b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.gobblin.lineage; + +/** + * A set of exceptions used by {@link LineageInfo} when lineage information is serialized or deserialized. + */ +public class LineageException extends Exception { + public LineageException(String message) { + super(message); + } + public static class LineageConflictAttributeException extends LineageException { + public LineageConflictAttributeException (String key, String oldValue, String newValue) { + super ("Lineage has conflict value: key=" + key + " value=[1]" + oldValue + " [2]" + newValue); + } + } + + public static class LineageUnsupportedLevelException extends LineageException { + public LineageUnsupportedLevelException (LineageInfo.Level level) { + super (level.toString() + " is not supported"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java new file mode 100644 index 0000000..8d582f2 --- /dev/null +++ b/gobblin-core/src/main/java/org/apache/gobblin/lineage/LineageInfo.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.lineage; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + + +/** + * A class to restore all lineage information from a {@link State} + * All lineage attributes are under LINEAGE_NAME_SPACE namespace. + * + * For example, a typical lineage attributes looks like: + * gobblin.lineage.K1 ---> V1 + * gobblin.lineage.branch.3.K2 ---> V2 + * + * K1 is dataset level attribute, K2 is branch level attribute, and branch id is 3. 
+ */ + +@Slf4j +public class LineageInfo { + + public static final String LINEAGE_NAME_SPACE = "gobblin.lineage"; + public static final String BRANCH_ID_METADATA_KEY = "branchId"; + private static final String DATASET_PREFIX = LINEAGE_NAME_SPACE + "."; + private static final String BRANCH_PREFIX = DATASET_PREFIX + "branch."; + + @Getter + private String datasetUrn; + @Getter + private String jobId; + + private Map<String, String> lineageMetaData; + + public enum Level { + DATASET, + BRANCH, + All + } + + private LineageInfo() { + } + + private LineageInfo(String datasetUrn, String jobId, ImmutableMap<String, String> lineageMetaData) { + Preconditions.checkArgument(datasetUrn != null); + Preconditions.checkArgument(jobId != null); + this.datasetUrn = datasetUrn; + this.jobId = jobId; + this.lineageMetaData = lineageMetaData; + } + + /** + * Retrieve lineage information from a {@link State} by {@link Level} + * @param state A single state + * @param level {@link Level#DATASET} only load dataset level lineage attributes + * {@link Level#BRANCH} only load branch level lineage attributes + * {@link Level#All} load all lineage attributes + * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element. + */ + public static Collection<LineageInfo> load (State state, Level level) throws LineageException { + return load(Collections.singleton(state), level); + } + + /** + * Get all lineage meta data. + */ + public ImmutableMap<String, String> getLineageMetaData() { + return ImmutableMap.copyOf(lineageMetaData); + } + + /** + * Retrieve all lineage information from different {@link State}s. + * This requires the job id and dataset urn to be present in the state, under job.id and dataset.urn. + * A global union operation is applied to combine all <K, V> pairs from the input {@link State}s. If multiple {@link State}s + * share the same K, but have conflicting V, a {@link LineageException} is thrown. 
+ * + * {@link Level} can control if a dataset level or branch level information should be used. When {@link Level#All} is + * specified, all levels of information will be returned; otherwise only specified level of information will be returned. + * + * For instance, assume we have below input states: + * State[0]: gobblin.lineage.K1 ---> V1 + * gobblin.lineage.K2 ---> V2 + * gobblin.lineage.branch.1.K4 ---> V4 + * State[1]: gobblin.lineage.K2 ---> V2 + * gobblin.lineage.K3 ---> V3 + * gobblin.lineage.branch.1.K4 ---> V4 + * gobblin.lineage.branch.1.K5 ---> V5 + * gobblin.lineage.branch.2.K6 ---> V6 + * + * (1) With {@link Level#DATASET} level, the output would be: + * LineageInfo[0]: K1 ---> V1 + * K2 ---> V2 + * K3 ---> V3 + * (2) With {@link Level#All} level, the output would be: (because there are two branches, so there are two LineageInfo) + * LineageInfo[0]: K1 ---> V1 + * K2 ---> V2 + * K3 ---> V3 + * K4 ---> V4 + * K5 ---> V5 + * + * LineageInfo[1]: K1 ---> V1 + * K2 ---> V2 + * K3 ---> V3 + * K6 ---> V6 + * + * (3) With {@link Level#BRANCH} level, the output would be: (only branch level information was returned) + * LineageInfo[0]: K4 ---> V4 + * K5 ---> V5 + * LineageInfo[1]: K6 ---> V6 + * + * @param states All states which belong to the same dataset and share the same jobId. + * @param level {@link Level#DATASET} only load dataset level lineage attributes + * {@link Level#BRANCH} only load branch level lineage attributes + * {@link Level#All} load all lineage attributes + * @return A collection of {@link LineageInfo}s per branch. When level is {@link Level#DATASET}, this list has only single element. + * + * @throws LineageException.LineageConflictAttributeException if two states have same key but not the same value. + */ + public static Collection<LineageInfo> load (Collection<? 
extends State> states, Level level) throws LineageException { + Preconditions.checkArgument(states != null && !states.isEmpty()); + Map<String, String> datasetMetaData = new HashMap<>(); + Map<String, Map<String, String>> branchAggregate = new HashMap<>(); + + State anyOne = states.iterator().next(); + String jobId = anyOne.getProp(ConfigurationKeys.JOB_ID_KEY, ""); + String urn = anyOne.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN); + + for (State state: states) { + for (Map.Entry<Object, Object> entry : state.getProperties().entrySet()) { + if (entry.getKey() instanceof String && ((String) entry.getKey()).startsWith(LINEAGE_NAME_SPACE)) { + + String lineageKey = ((String) entry.getKey()); + String lineageValue = (String) entry.getValue(); + + if (lineageKey.startsWith(BRANCH_PREFIX)) { + String branchPrefixStrip = lineageKey.substring(BRANCH_PREFIX.length()); + String branchId = branchPrefixStrip.substring(0, branchPrefixStrip.indexOf(".")); + String key = branchPrefixStrip.substring(branchPrefixStrip.indexOf(".") + 1); + + if (level == Level.BRANCH || level == Level.All) { + if (!branchAggregate.containsKey(branchId)) { + branchAggregate.put(branchId, new HashMap<>()); + } + Map<String, String> branchMetaData = branchAggregate.get(branchId); + String prev = branchMetaData.put(key, lineageValue); + if (prev != null && !prev.equals(lineageValue)) { + throw new LineageException.LineageConflictAttributeException(lineageKey, prev, lineageValue); + } + } + } else if (lineageKey.startsWith(DATASET_PREFIX)) { + if (level == Level.DATASET || level == Level.All) { + String prev = datasetMetaData.put(lineageKey.substring(DATASET_PREFIX.length()), lineageValue); + if (prev != null && !prev.equals(lineageValue)) { + throw new LineageException.LineageConflictAttributeException(lineageKey, prev, lineageValue); + } + } + } + } + } + } + + Collection<LineageInfo> collection = Sets.newHashSet(); + + if (level == Level.DATASET) { + 
ImmutableMap<String, String> metaData = ImmutableMap.<String, String>builder() + .putAll(datasetMetaData) + .build(); + collection.add(new LineageInfo(urn, jobId, metaData)); + return collection; + } else if (level == Level.BRANCH || level == Level.All){ + if (branchAggregate.isEmpty()) { + if (level == Level.All) { + collection.add(new LineageInfo(urn, jobId, ImmutableMap.<String, String>builder().putAll(datasetMetaData).build())); + } + return collection; + } + for (Map.Entry<String, Map<String, String>> branchMetaDataEntry: branchAggregate.entrySet()) { + String branchId = branchMetaDataEntry.getKey(); + Map<String, String> branchMetaData = branchMetaDataEntry.getValue(); + ImmutableMap.Builder<String, String> metaDataBuilder = ImmutableMap.builder(); + if (level == Level.All) { + metaDataBuilder.putAll(datasetMetaData); + } + metaDataBuilder.putAll(branchMetaData).put(BRANCH_ID_METADATA_KEY, branchId); + collection.add(new LineageInfo(urn, jobId, metaDataBuilder.build())); + } + + return collection; + } else { + throw new LineageException.LineageUnsupportedLevelException(level); + } + } + + public static void setDatasetLineageAttribute (State state, String key, String value) { + state.setProp(DATASET_PREFIX + key, value); + } + + public static void setBranchLineageAttribute (State state, int branchId, String key, String value) { + state.setProp(BRANCH_PREFIX + Joiner.on(".").join(branchId, key), value); + } + + public final String getId() { + return Joiner.on(":::").join(this.datasetUrn, this.jobId); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java index f0d0e32..19314e5 100644 --- 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java @@ -30,6 +30,7 @@ import java.util.Objects; import java.util.Set; import org.apache.commons.io.IOUtils; +import org.apache.gobblin.lineage.LineageInfo; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -97,6 +98,9 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { protected final int parallelRunnerThreads; protected final Map<String, ParallelRunner> parallelRunners = Maps.newHashMap(); protected final Set<Path> publisherOutputDirs = Sets.newHashSet(); + + public static final String PUBLISH_OUTOUT = "publish.output"; + /* Each partition in each branch may have separate metadata. The metadata mergers are responsible * for aggregating this information from all workunits so it can be published. */ @@ -328,6 +332,7 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { if (!replaceFinalOutputDir) { addWriterOutputToExistingDir(writerOutputDir, publisherOutputDir, state, branchId, parallelRunner); writerOutputPathsMoved.add(writerOutputDir); + addPublisherLineageInfo(state, branchId, publisherOutputDir.toString()); return; } @@ -342,9 +347,14 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { movePath(parallelRunner, state, writerOutputDir, publisherOutputDir, branchId); writerOutputPathsMoved.add(writerOutputDir); + addPublisherLineageInfo(state, branchId, publisherOutputDir.toString()); } } + protected void addPublisherLineageInfo(WorkUnitState state, int branchId, String output) { + LineageInfo.setBranchLineageAttribute(state, branchId, PUBLISH_OUTOUT, output); + } + /** * Get the output directory path this {@link BaseDataPublisher} will write to. 
* http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java index fa5a360..d94dede 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; +import org.apache.gobblin.lineage.LineageInfo; import org.slf4j.MDC; import com.google.common.base.Optional; @@ -234,6 +235,7 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> { workunit.setProp(ConfigurationKeys.SOURCE_ENTITY, sourceEntity.getSourceEntityName()); workunit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, sourceEntity.getDestTableName()); workunit.setProp(WORK_UNIT_STATE_VERSION_KEY, CURRENT_WORK_UNIT_STATE_VERSION); + addLineageSourceInfo (state, sourceEntity, workunit); partition.serialize(workunit); workUnits.add(workunit); } @@ -241,6 +243,10 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> { return workUnits; } + protected void addLineageSourceInfo (SourceState sourceState, SourceEntity entity, WorkUnit workUnit) { + workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, entity.destTableName); + } + protected Set<SourceEntity> getFilteredSourceEntities(SourceState state) { Set<SourceEntity> unfilteredEntities = getSourceEntities(state); return getFilteredSourceEntitiesHelper(state, unfilteredEntities); 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java b/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java new file mode 100644 index 0000000..2a7ea15 --- /dev/null +++ b/gobblin-core/src/test/java/org/apache/gobblin/lineage/LineageInfoTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.lineage; + +import java.util.Collection; +import java.util.Map; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.junit.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import gobblin.configuration.State; + + +public class LineageInfoTest { + + @Test + public void testDatasetLevel () { + Collection<LineageInfo> collection = null; + try { + collection = LineageInfo.load(createTestStates(), LineageInfo.Level.DATASET); + } catch (LineageException e) { + Assert.fail(e.toString()); + } + + Assert.assertEquals(1, collection.size()); + LineageInfo info = collection.iterator().next(); + ImmutableMap<String, String> map = info.getLineageMetaData(); + Assert.assertEquals(3, map.size()); + Assert.assertEquals("V1", map.get("K1")); + Assert.assertEquals("V2", map.get("K2")); + Assert.assertEquals("V3", map.get("K3")); + } + + @Test + public void testBranchLevel () { + Collection<LineageInfo> collection = null; + try { + collection = LineageInfo.load(createTestStates(), LineageInfo.Level.BRANCH); + } catch (LineageException e) { + Assert.fail(e.toString()); + } + + Assert.assertEquals(2, collection.size()); + + for (LineageInfo info: collection) { + Map<String, String> map = info.getLineageMetaData(); + String branchId = map.get(LineageInfo.BRANCH_ID_METADATA_KEY); + if (branchId.equals("1")) { + Assert.assertEquals(3, map.size()); // include BRANCH_ID_METADATA_KEY + Assert.assertEquals("V4", map.get("K4")); + Assert.assertEquals("V5", map.get("K5")); + } + + if (branchId.equals("2")) { + Assert.assertEquals(2, map.size()); // include BRANCH_ID_METADATA_KEY + Assert.assertEquals("V6", map.get("K6")); + } + } + } + + @Test + public void testAllLevel () { + Collection<LineageInfo> collection = null; + try { + collection = LineageInfo.load(createTestStates(), LineageInfo.Level.All); + } catch (LineageException e) { + 
Assert.fail(e.toString()); + } + + Assert.assertEquals(2, collection.size()); + for (LineageInfo info: collection) { + Map<String, String> map = info.getLineageMetaData(); + String branchId = map.get(LineageInfo.BRANCH_ID_METADATA_KEY); + if (branchId.equals("1")) { + Assert.assertEquals(6, map.size()); // include BRANCH_ID_METADATA_KEY + Assert.assertEquals("V1", map.get("K1")); + Assert.assertEquals("V2", map.get("K2")); + Assert.assertEquals("V3", map.get("K3")); + Assert.assertEquals("V4", map.get("K4")); + Assert.assertEquals("V5", map.get("K5")); + } + + if (branchId.equals("2")) { + Assert.assertEquals(5, map.size()); // include BRANCH_ID_METADATA_KEY + Assert.assertEquals("V1", map.get("K1")); + Assert.assertEquals("V2", map.get("K2")); + Assert.assertEquals("V3", map.get("K3")); + Assert.assertEquals("V6", map.get("K6")); + } + } + } + + @Test + public void testNoBranchInfo () { + State state = new State(); + state.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456"); + state.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent"); + LineageInfo.setDatasetLineageAttribute(state,"K1", "V1"); + LineageInfo.setDatasetLineageAttribute(state,"K2", "V2"); + Collection<LineageInfo> collection = null; + try { + collection = LineageInfo.load(Lists.newArrayList(state), LineageInfo.Level.BRANCH); + } catch (LineageException e) { + Assert.fail(e.toString()); + } + + Assert.assertEquals(true, collection.isEmpty()); + } + + private Collection<State> createTestStates() { + /* + * State[0]: gobblin.lineage.K1 ---> V1 + * gobblin.lineage.K2 ---> V2 + * gobblin.lineage.branch.1.K4 ---> V4 + * State[1]: gobblin.lineage.K2 ---> V2 + * gobblin.lineage.K3 ---> V3 + * gobblin.lineage.branch.1.K4 ---> V4 + * gobblin.lineage.branch.1.K5 ---> V5 + * gobblin.lineage.branch.2.K6 ---> V6 + */ + State state_1 = new State(); + state_1.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456"); + state_1.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent"); + 
LineageInfo.setDatasetLineageAttribute(state_1,"K1", "V1"); + LineageInfo.setDatasetLineageAttribute(state_1,"K2", "V2"); + LineageInfo.setBranchLineageAttribute(state_1, 1, "K4", "V4"); + + + State state_2 = new State(); + state_2.setProp(ConfigurationKeys.JOB_ID_KEY, "test_job_id_123456"); + state_2.setProp(ConfigurationKeys.DATASET_URN_KEY, "PageViewEvent"); + + LineageInfo.setDatasetLineageAttribute(state_2,"K2", "V2"); + LineageInfo.setDatasetLineageAttribute(state_2,"K3", "V3"); + LineageInfo.setBranchLineageAttribute(state_2, 1, "K4", "V4"); + LineageInfo.setBranchLineageAttribute(state_2, 1, "K5", "V5"); + LineageInfo.setBranchLineageAttribute(state_2, 2, "K6", "V6"); + + return Lists.newArrayList(state_1, state_2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java index 20a0823..57fdedd 100644 --- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java +++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java @@ -17,10 +17,14 @@ package org.apache.gobblin.source.extractor.extract.jdbc; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.lineage.LineageInfo; import org.apache.gobblin.source.extractor.Extractor; import org.apache.gobblin.source.extractor.exception.ExtractPrepareException; import java.io.IOException; +import org.apache.gobblin.source.workunit.WorkUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,4 
+54,13 @@ public class MysqlSource extends QueryBasedSource<JsonArray, JsonElement> { } return extractor; } + + protected void addLineageSourceInfo (SourceState sourceState, SourceEntity entity, WorkUnit workUnit) { + super.addLineageSourceInfo(sourceState, entity, workUnit); + String host = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME); + String port = sourceState.getProp(ConfigurationKeys.SOURCE_CONN_PORT); + String database = sourceState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_SCHEMA); + String connectionUrl = "jdbc:mysql://" + host.trim() + ":" + port + "/" + database.trim(); + LineageInfo.setDatasetLineageAttribute(workUnit, "connectionUrl", connectionUrl); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/280b1d35/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java index 363adf3..9521575 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java @@ -33,8 +33,13 @@ import org.apache.gobblin.commit.CommitStep; import org.apache.gobblin.commit.DeliverySemantics; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.lineage.LineageException; +import org.apache.gobblin.lineage.LineageInfo; +import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.publisher.CommitSequencePublisher; import org.apache.gobblin.publisher.DataPublisher; +import org.apache.gobblin.publisher.NoopPublisher; import org.apache.gobblin.publisher.UnpublishedHandling; import 
org.apache.gobblin.runtime.commit.DatasetStateCommitStep; import org.apache.gobblin.runtime.task.TaskFactory; @@ -159,6 +164,7 @@ final class SafeDatasetCommit implements Callable<Void> { } else if (canPersistStates) { persistDatasetState(datasetUrn, datasetState); } + } catch (IOException | RuntimeException ioe) { log.error(String .format("Failed to persist dataset state for dataset %s of job %s", datasetUrn, this.jobContext.getJobId()), @@ -169,6 +175,30 @@ final class SafeDatasetCommit implements Callable<Void> { return null; } + private void submitLineageEvent(Collection<TaskState> states) { + if (states.size() == 0) { + return; + } + + TaskState oneWorkUnitState = states.iterator().next(); + if (oneWorkUnitState.contains(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE) && oneWorkUnitState.getProp(ConfigurationKeys.JOB_DATA_PUBLISHER_TYPE).equals( + NoopPublisher.class.getName())) { + // if no publisher configured, each task should be responsible for sending lineage event. + return; + } + + try { + Collection<LineageInfo> branchLineages = LineageInfo.load(states, LineageInfo.Level.All); + EventSubmitter submitter = new EventSubmitter.Builder(Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class), + LineageInfo.LINEAGE_NAME_SPACE).build(); + for (LineageInfo info: branchLineages) { + submitter.submit(info.getId(), info.getLineageMetaData()); + } + } catch (LineageException e) { + log.error ("Lineage event submission failed due to :" + e.toString()); + } + } + /** * Synchronized version of {@link #commitDataset(Collection, DataPublisher)} used when publisher is not * thread safe. @@ -186,6 +216,7 @@ final class SafeDatasetCommit implements Callable<Void> { try { publisher.publish(taskStates); + submitLineageEvent(taskStates); } catch (Throwable t) { log.error("Failed to commit dataset", t); setTaskFailureException(taskStates, t);
