Repository: incubator-gobblin Updated Branches: refs/heads/master 4c06d3be7 -> 95e15f003
[GOBBLIN-334] Implement SharedResourceFactory for LineageInfo Closes #2187 from zxcware/share Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/95e15f00 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/95e15f00 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/95e15f00 Branch: refs/heads/master Commit: 95e15f00386a2509d1631fcdbb5b41e7e1265460 Parents: 4c06d3b Author: zhchen <[email protected]> Authored: Fri Dec 8 11:31:22 2017 -0800 Committer: Issac Buenrostro <[email protected]> Committed: Fri Dec 8 11:31:22 2017 -0800 ---------------------------------------------------------------------- .../broker/StringNameSharedResourceKey.java | 56 +++++++++++++++++++ .../gobblin/configuration/SourceState.java | 6 ++ .../gobblin/dataset/DatasetResolverFactory.java | 11 ++-- .../gobblin/dataset/NoopDatasetResolver.java | 1 + .../gobblin/publisher/BaseDataPublisher.java | 15 ++++- .../extractor/extract/QueryBasedSource.java | 4 +- .../publisher/BaseDataPublisherTest.java | 29 ++++++++-- .../data/management/copy/CopySource.java | 9 ++- .../copy/publisher/CopyDataPublisher.java | 15 ++++- .../metrics/broker/LineageInfoFactory.java | 54 ++++++++++++++++++ .../metrics/event/lineage/LineageInfo.java | 58 +++++++++++++++++--- .../metrics/event/lineage/LineageEventTest.java | 36 +++++++++--- .../extractor/extract/kafka/KafkaSource.java | 8 ++- .../extractor/extract/jdbc/MysqlSource.java | 4 +- .../org/apache/gobblin/runtime/JobContext.java | 1 + .../org/apache/gobblin/broker/EmptyKey.java | 3 + 16 files changed, 275 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-api/src/main/java/org/apache/gobblin/broker/StringNameSharedResourceKey.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/broker/StringNameSharedResourceKey.java b/gobblin-api/src/main/java/org/apache/gobblin/broker/StringNameSharedResourceKey.java new file mode 100644 index 0000000..e7e29e2 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/broker/StringNameSharedResourceKey.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.broker; + +import org.apache.gobblin.broker.iface.SharedResourceKey; + + +/** + * A {@link SharedResourceKey} with only a string name + */ +public class StringNameSharedResourceKey implements SharedResourceKey { + private final String name; + + public StringNameSharedResourceKey(String name) { + this.name = name; + } + + @Override + public String toConfigurationKey() { + return name; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + StringNameSharedResourceKey that = (StringNameSharedResourceKey) o; + + return name != null ? name.equals(that.name) : that.name == null; + } + + @Override + public int hashCode() { + return name != null ? name.hashCode() : 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java index d64dcb6..fd772ea 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java @@ -37,10 +37,13 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.source.workunit.Extract; import lombok.Getter; +import lombok.Setter; /** @@ -67,6 +70,9 @@ public class SourceState extends State { @Getter private final List<WorkUnitState> previousWorkUnitStates = Lists.newArrayList(); + @Getter @Setter + private SharedResourcesBroker<GobblinScopeTypes> broker; + /** * Default constructor. */ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java index eb1b887..ac1cfe9 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java @@ -17,15 +17,14 @@ package org.apache.gobblin.dataset; -import org.apache.gobblin.configuration.State; - +import com.typesafe.config.Config; /** * A factory that creates an instance of {@link DatasetResolver} */ public interface DatasetResolverFactory { - String NAMESPACE = "DatasetResolverFactory"; - String CLASS = NAMESPACE + "." + "class"; - - DatasetResolver createResolver(State state); + /** + * Create a {@link DatasetResolver} instance + */ + DatasetResolver createResolver(Config config); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java index c54011a..e678d84 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java @@ -25,6 +25,7 @@ import org.apache.gobblin.configuration.State; */ public class NoopDatasetResolver implements DatasetResolver { public static final NoopDatasetResolver INSTANCE = new NoopDatasetResolver(); + public static final String FACTORY = "NOOP"; private NoopDatasetResolver() {} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java index 89bab2b..2ddcd76 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java @@ -54,6 +54,7 @@ import com.typesafe.config.ConfigRenderOptions; import org.apache.gobblin.config.ConfigBuilder; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.dataset.DatasetConstants; @@ -109,6 +110,7 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { protected final int parallelRunnerThreads; protected final Map<String, ParallelRunner> parallelRunners = Maps.newHashMap(); protected final Set<Path> publisherOutputDirs = Sets.newHashSet(); + protected final Optional<LineageInfo> lineageInfo; /* Each partition in each branch may have separate metadata. The metadata mergers are responsible * for aggregating this information from all workunits so it can be published. @@ -144,6 +146,15 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { conf.set(key, this.getState().getProp(key)); } + // Extract LineageInfo from state + if (state instanceof SourceState) { + lineageInfo = LineageInfo.getLineageInfo(((SourceState) state).getBroker()); + } else if (state instanceof WorkUnitState) { + lineageInfo = LineageInfo.getLineageInfo(((WorkUnitState) state).getTaskBrokerNullable()); + } else { + lineageInfo = Optional.absent(); + } + this.numBranches = this.getState().getPropAsInt(ConfigurationKeys.FORK_BRANCHES_KEY, 1); this.shouldRetry = this.getState().getPropAsBoolean(PUBLISH_RETRY_ENABLED, false); @@ -284,7 +295,9 @@ public class BaseDataPublisher extends SingleTaskDataPublisher { private void addLineageInfo(WorkUnitState state, int branchId) { DatasetDescriptor destination = createDestinationDescriptor(state, branchId); - LineageInfo.putDestination(destination, branchId, state); + if (this.lineageInfo.isPresent()) { + this.lineageInfo.get().putDestination(destination, branchId, state); + } } protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, int branchId) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java index d074f3a..29c98d9 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java @@ -47,7 +47,6 @@ import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.configuration.WorkUnitState.WorkingState; -import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder; import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.source.extractor.JobCommitPolicy; import org.apache.gobblin.source.extractor.partition.Partition; @@ -93,6 +92,8 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> { */ public static final Integer CURRENT_WORK_UNIT_STATE_VERSION = 3; + protected Optional<LineageInfo> lineageInfo; + /** A class that encapsulates a source entity (aka dataset) to be processed */ @Data public static final class SourceEntity { @@ -168,6 +169,7 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> { @Override public List<WorkUnit> getWorkunits(SourceState state) { initLogger(state); + lineageInfo = LineageInfo.getLineageInfo(state.getBroker()); List<WorkUnit> workUnits = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java index 159786b..d46d6e3 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java @@ -40,7 +40,14 @@ import org.testng.annotations.Test; import com.google.common.collect.ImmutableList; import com.google.common.io.Files; - +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; +import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; +import org.apache.gobblin.broker.iface.SubscopedBrokerBuilder; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; @@ -48,6 +55,7 @@ import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.metadata.MetadataMerger; import org.apache.gobblin.metadata.types.GlobalMetadata; import org.apache.gobblin.metrics.event.lineage.LineageInfo; +import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.util.ForkOperatorUtils; import org.apache.gobblin.writer.FsDataWriter; import org.apache.gobblin.writer.FsWriterMetrics; @@ -528,8 +536,9 @@ public class BaseDataPublisherTest { public void testPublishSingleTask() throws IOException { WorkUnitState state = buildTaskState(1); + LineageInfo lineageInfo = LineageInfo.getLineageInfo(state.getTaskBroker()).get(); DatasetDescriptor source = new DatasetDescriptor("kafka", "testTopic"); - LineageInfo.setSource(source, state); + lineageInfo.setSource(source, state); BaseDataPublisher publisher = new BaseDataPublisher(state); publisher.publishData(state); Assert.assertTrue(state.contains("gobblin.event.lineage.branch.0.destination")); @@ -541,9 +550,10 @@ public class BaseDataPublisherTest { throws IOException { WorkUnitState state1 = buildTaskState(2); WorkUnitState state2 = buildTaskState(2); + LineageInfo lineageInfo = LineageInfo.getLineageInfo(state1.getTaskBroker()).get(); DatasetDescriptor source = new DatasetDescriptor("kafka", "testTopic"); - LineageInfo.setSource(source, state1); - LineageInfo.setSource(source, state2); + lineageInfo.setSource(source, state1); + lineageInfo.setSource(source, state2); BaseDataPublisher publisher = new BaseDataPublisher(state1); publisher.publishData(ImmutableList.of(state1, state2)); Assert.assertTrue(state1.contains("gobblin.event.lineage.branch.0.destination")); @@ -623,7 +633,16 @@ public class BaseDataPublisherTest { } private WorkUnitState buildTaskState(int numBranches) { - WorkUnitState state = new WorkUnitState(); + SharedResourcesBroker<GobblinScopeTypes> instanceBroker = SharedResourcesBrokerFactory + .createDefaultTopLevelBroker(ConfigFactory.empty(), GobblinScopeTypes.GLOBAL.defaultScopeInstance()); + SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker + .newSubscopedBuilder(new JobScopeInstance("LineageEventTest", String.valueOf(System.currentTimeMillis()))) + .build(); + SharedResourcesBroker<GobblinScopeTypes> taskBroker = jobBroker + .newSubscopedBuilder(new TaskScopeInstance("LineageEventTestTask" + String.valueOf(System.currentTimeMillis()))) + .build(); + + WorkUnitState state = new WorkUnitState(WorkUnit.createEmpty(), new State(), taskBroker); state.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, "namespace"); state.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, "table"); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java index a74d425..615d6ad 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java @@ -118,6 +118,8 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { public MetricContext metricContext; + protected Optional<LineageInfo> lineageInfo; + /** * <ul> * Does the following: @@ -139,6 +141,7 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { public List<WorkUnit> getWorkunits(final SourceState state) { this.metricContext = Instrumented.getMetricContext(state, CopySource.class); + this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker()); try { @@ -320,8 +323,10 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> { * a DatasetFinder. Consequently, the source and destination dataset for the CopyableFile lineage are expected * to be set by the same logic */ - if (copyableFile.getSourceDataset() != null && copyableFile.getDestinationDataset() != null) { - LineageInfo.setSource(copyableFile.getSourceDataset(), workUnit); + if (lineageInfo.isPresent() && + copyableFile.getSourceDataset() != null && + copyableFile.getDestinationDataset() != null) { + lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit); } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java index e1ccf65..79b2b6a 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java @@ -18,6 +18,7 @@ package org.apache.gobblin.data.management.copy.publisher; +import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.metrics.event.sla.SlaEventKeys; import java.io.IOException; @@ -81,6 +82,7 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl private final FileSystem fs; protected final EventSubmitter eventSubmitter; protected final RecoveryHelper recoveryHelper; + protected final Optional<LineageInfo> lineageInfo; /** * Build a new {@link CopyDataPublisher} from {@link State}. The constructor expects the following to be set in the @@ -93,6 +95,15 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl */ public CopyDataPublisher(State state) throws IOException { super(state); + // Extract LineageInfo from state + if (state instanceof SourceState) { + lineageInfo = LineageInfo.getLineageInfo(((SourceState) state).getBroker()); + } else if (state instanceof WorkUnitState) { + lineageInfo = LineageInfo.getLineageInfo(((WorkUnitState) state).getTaskBrokerNullable()); + } else { + lineageInfo = Optional.absent(); + } + String uri = this.state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, ConfigurationKeys.LOCAL_FS_URI); this.fs = FileSystem.get(URI.create(uri), WriterUtils.getFsConfiguration(state)); @@ -214,7 +225,9 @@ public class CopyDataPublisher extends DataPublisher implements UnpublishedHandl if (!fileSetRoot.isPresent() && copyableFile.getDatasetOutputPath() != null) { fileSetRoot = Optional.of(copyableFile.getDatasetOutputPath()); } - LineageInfo.putDestination(copyableFile.getDestinationDataset(), 0, wus); + if (lineageInfo.isPresent()) { + lineageInfo.get().putDestination(copyableFile.getDestinationDataset(), 0, wus); + } } if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) { datasetOriginTimestamp = copyableFile.getOriginTimestamp(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/broker/LineageInfoFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/broker/LineageInfoFactory.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/broker/LineageInfoFactory.java new file mode 100644 index 0000000..4259582 --- /dev/null +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/broker/LineageInfoFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metrics.broker; + +import org.apache.gobblin.broker.EmptyKey; +import org.apache.gobblin.broker.ResourceInstance; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.iface.ConfigView; +import org.apache.gobblin.broker.iface.NotConfiguredException; +import org.apache.gobblin.broker.iface.ScopedConfigView; +import org.apache.gobblin.broker.iface.SharedResourceFactory; +import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; + + +/** + * A {@link SharedResourceFactory} to share a job level {@link LineageInfo} instance + */ +public class LineageInfoFactory implements SharedResourceFactory<LineageInfo, EmptyKey, GobblinScopeTypes> { + public static final String FACTORY_NAME = "lineageInfo"; + + @Override + public String getName() { + return FACTORY_NAME; + } + + @Override + public SharedResourceFactoryResponse<LineageInfo> createResource(SharedResourcesBroker<GobblinScopeTypes> broker, + ScopedConfigView<GobblinScopeTypes, EmptyKey> config) + throws NotConfiguredException { + return new ResourceInstance<>(new LineageInfo(config.getConfig())); + } + + @Override + public GobblinScopeTypes getAutoScope(SharedResourcesBroker<GobblinScopeTypes> broker, ConfigView<GobblinScopeTypes, EmptyKey> config) { + return GobblinScopeTypes.JOB; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java index 9a3cc11..8ac9ee3 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java @@ -22,18 +22,29 @@ import java.util.Map; import java.util.Set; import com.google.common.base.Joiner; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.Gson; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.broker.EmptyKey; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.iface.NotConfiguredException; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; import org.apache.gobblin.configuration.State; import org.apache.gobblin.dataset.DatasetDescriptor; import org.apache.gobblin.dataset.DatasetResolver; import org.apache.gobblin.dataset.DatasetResolverFactory; import org.apache.gobblin.dataset.NoopDatasetResolver; +import org.apache.gobblin.metrics.broker.LineageInfoFactory; +import org.apache.gobblin.util.ConfigUtils; /** @@ -53,6 +64,7 @@ import org.apache.gobblin.dataset.NoopDatasetResolver; * <p> * The general flow is: * <ol> + * <li> get a {@link LineageInfo} instance with {@link LineageInfo#getLineageInfo(SharedResourcesBroker)}</li> * <li> source sets its {@link DatasetDescriptor} to each work unit </li> * <li> destination puts its {@link DatasetDescriptor} to the work unit </li> * <li> load and send all lineage events from all states </li> @@ -62,11 +74,22 @@ import org.apache.gobblin.dataset.NoopDatasetResolver; */ @Slf4j public final class LineageInfo { + private static final String DATASET_RESOLVER_FACTORY = "datasetResolverFactory"; + private static final String DATASET_RESOLVER_CONFIG_NAMESPACE = "datasetResolver"; + private static final String BRANCH = "branch"; private static final Gson GSON = new Gson(); private static final String NAME_KEY = "name"; - private LineageInfo() { + private static final Config FALLBACK = + ConfigFactory.parseMap(ImmutableMap.<String, Object>builder() + .put(DATASET_RESOLVER_FACTORY, NoopDatasetResolver.FACTORY) + .build()); + + private final DatasetResolver resolver; + + public LineageInfo(Config config) { + resolver = getResolver(config.withFallback(FALLBACK)); } /** @@ -80,8 +103,7 @@ public final class LineageInfo { * @param state state about a {@link org.apache.gobblin.source.workunit.WorkUnit} * */ - public static void setSource(DatasetDescriptor source, State state) { - DatasetResolver resolver = getResolver(state); + public void setSource(DatasetDescriptor source, State state) { DatasetDescriptor descriptor = resolver.resolve(source, state); if (descriptor == null) { return; @@ -100,14 +122,13 @@ public final class LineageInfo { * the method is implemented to be threadsafe * </p> */ - public static void putDestination(DatasetDescriptor destination, int branchId, State state) { + public void putDestination(DatasetDescriptor destination, int branchId, State state) { if (!hasLineageInfo(state)) { log.warn("State has no lineage info but branch " + branchId + " puts a destination: " + GSON.toJson(destination)); return; } log.debug(String.format("Put destination %s for branch %d", GSON.toJson(destination), branchId)); synchronized (state.getProp(getKey(NAME_KEY))) { - DatasetResolver resolver = getResolver(state); DatasetDescriptor descriptor = resolver.resolve(destination, state); if (descriptor == null) { return; @@ -193,19 +214,38 @@ public final class LineageInfo { return Joiner.on('.').join(LineageEventBuilder.LIENAGE_EVENT_NAMESPACE, state.getProp(getKey(NAME_KEY))); } + + /** + * Try to get a {@link LineageInfo} instance from the given {@link SharedResourcesBroker} + */ + public static Optional<LineageInfo> getLineageInfo(@Nullable SharedResourcesBroker<GobblinScopeTypes> broker) { + if (broker == null) { + log.warn("Null broker. Will not track data lineage"); + return Optional.absent(); + } + + try { + LineageInfo lineageInfo = broker.getSharedResource(new LineageInfoFactory(), EmptyKey.INSTANCE); + return Optional.of(lineageInfo); + } catch (NotConfiguredException e) { + log.warn("Fail to get LineageInfo instance from broker. Will not track data lineage", e); + return Optional.absent(); + } + } + /** * Get the configured {@link DatasetResolver} from {@link State} */ - public static DatasetResolver getResolver(State state) { - String resolverFactory = state.getProp(DatasetResolverFactory.CLASS); - if (resolverFactory == null) { + private static DatasetResolver getResolver(Config config) { + String resolverFactory = config.getString(DATASET_RESOLVER_FACTORY); + if (resolverFactory.equals(NoopDatasetResolver.FACTORY)) { return NoopDatasetResolver.INSTANCE; } DatasetResolver resolver = NoopDatasetResolver.INSTANCE; try { DatasetResolverFactory factory = (DatasetResolverFactory) Class.forName(resolverFactory).newInstance(); - resolver = factory.createResolver(state); + resolver = factory.createResolver(ConfigUtils.getConfigOrEmpty(config, DATASET_RESOLVER_CONFIG_NAMESPACE)); } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { log.error(String.format("Fail to create a DatasetResolver with factory class %s", resolverFactory)); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java index 7388de6..4352e7f 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java @@ -21,6 +21,11 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; +import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; import org.apache.gobblin.configuration.State; import org.apache.gobblin.dataset.DatasetConstants; import org.apache.gobblin.dataset.DatasetDescriptor; @@ -30,6 +35,7 @@ import org.testng.Assert; import org.testng.annotations.Test; import com.google.common.collect.Lists; +import com.typesafe.config.ConfigFactory; /** @@ -45,21 +51,22 @@ public class LineageEventTest { final String branch = "branch"; State state0 = new State(); + LineageInfo lineageInfo = getLineageInfo(); DatasetDescriptor source = new DatasetDescriptor(kafka, topic); - LineageInfo.setSource(source, state0); + lineageInfo.setSource(source, state0); DatasetDescriptor destination00 = new DatasetDescriptor(hdfs, "/data/dbchanges"); destination00.addMetadata(branch, "0"); - LineageInfo.putDestination(destination00, 0, state0); + lineageInfo.putDestination(destination00, 0, state0); DatasetDescriptor destination01 = new DatasetDescriptor(mysql, "kafka.testTopic"); destination01.addMetadata(branch, "1"); - LineageInfo.putDestination(destination01, 1, state0); + lineageInfo.putDestination(destination01, 1, state0); Map<String, LineageEventBuilder> events = LineageInfo.load(state0); verify(events.get("0"), topic, source, destination00); verify(events.get("1"), topic, source, destination01); State state1 = new State(); - LineageInfo.setSource(source, state1); + lineageInfo.setSource(source, state1); List<State> states = Lists.newArrayList(); states.add(state0); states.add(state1); @@ -73,7 +80,7 @@ public class LineageEventTest { // There are 3 full fledged lineage events DatasetDescriptor destination12 = new DatasetDescriptor(mysql, "kafka.testTopic2"); destination12.addMetadata(branch, "2"); - LineageInfo.putDestination(destination12, 2, state1); + lineageInfo.putDestination(destination12, 2, state1); eventsList = LineageInfo.load(states); Assert.assertTrue(eventsList.size() == 3); Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0")); @@ -83,10 +90,10 @@ public class LineageEventTest { // There 5 lineage events put, but only 4 unique lineage events DatasetDescriptor destination10 = destination12; - LineageInfo.putDestination(destination10, 0, state1); + lineageInfo.putDestination(destination10, 0, state1); DatasetDescriptor destination11 = new DatasetDescriptor("hive", "kafka.testTopic1"); destination11.addMetadata(branch, "1"); - LineageInfo.putDestination(destination11, 1, state1); + lineageInfo.putDestination(destination11, 1, state1); eventsList = LineageInfo.load(states); Assert.assertTrue(eventsList.size() == 4); Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0")); @@ -110,6 +117,21 @@ public class LineageEventTest { return null; } + private LineageInfo getLineageInfo() { + SharedResourcesBroker<GobblinScopeTypes> instanceBroker = SharedResourcesBrokerFactory + .createDefaultTopLevelBroker(ConfigFactory.empty(), GobblinScopeTypes.GLOBAL.defaultScopeInstance()); + SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker + .newSubscopedBuilder(new JobScopeInstance("LineageEventTest", String.valueOf(System.currentTimeMillis()))) + .build(); + SharedResourcesBroker<GobblinScopeTypes> taskBroker = jobBroker + .newSubscopedBuilder(new TaskScopeInstance("LineageEventTestTask" + String.valueOf(System.currentTimeMillis()))) + .build(); + LineageInfo obj1 = LineageInfo.getLineageInfo(jobBroker).get(); + LineageInfo obj2 = LineageInfo.getLineageInfo(taskBroker).get(); + Assert.assertTrue(obj1 == obj2); + return obj2; + } + private void verify(LineageEventBuilder event, String name, DatasetDescriptor source, DatasetDescriptor destination) { Assert.assertEquals(event.getName(), name); Assert.assertEquals(event.getNamespace(), LineageEventBuilder.LIENAGE_EVENT_NAMESPACE); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java index 8cd25c2..69ebea6 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java @@ -32,7 +32,6 @@ import java.util.regex.Pattern; import org.apache.gobblin.dataset.DatasetConstants; import org.apache.gobblin.dataset.DatasetDescriptor; -import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -143,6 +142,8 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { private MetricContext metricContext; + protected Optional<LineageInfo> lineageInfo; + private List<String> getLimiterExtractorReportKeys() { List<String> keyNames = new ArrayList<>(); keyNames.add(KafkaSource.TOPIC_NAME); @@ -163,6 +164,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { @Override public List<WorkUnit> getWorkunits(SourceState state) { this.metricContext = Instrumented.getMetricContext(state, KafkaSource.class); + this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker()); Map<String, List<WorkUnit>> workUnits = Maps.newConcurrentMap(); if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) { @@ -554,7 +556,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { // Add lineage info DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_KAFKA, partition.getTopicName()); source.addMetadata(DatasetConstants.BROKERS, kafkaBrokers); - LineageInfo.setSource(source, workUnit); + if (this.lineageInfo.isPresent()) { + this.lineageInfo.get().setSource(source, workUnit); + } LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition, offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() - offsets.getStartOffset())); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java index e2292c7..0ed291e 100644 --- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java +++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java @@ -65,6 +65,8 @@ public class MysqlSource extends QueryBasedSource<JsonArray, JsonElement> { DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_MYSQL, database + "." + entity.getSourceEntityName()); source.addMetadata(DatasetConstants.CONNECTION_URL, connectionUrl); - LineageInfo.setSource(source, workUnit); + if (lineageInfo.isPresent()) { + lineageInfo.get().setSource(source, workUnit); + } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java index 0debcac..73d613c 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java @@ -152,6 +152,7 @@ public class JobContext implements Closeable { this.jobState = new JobState(jobPropsState, this.datasetStateStore.getLatestDatasetStatesByUrns(this.jobName), this.jobName, this.jobId); + this.jobState.setBroker(this.jobBroker); stagingDirProvided = this.jobState.contains(ConfigurationKeys.WRITER_STAGING_DIR); outputDirProvided = this.jobState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-utility/src/main/java/org/apache/gobblin/broker/EmptyKey.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/EmptyKey.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/EmptyKey.java index edaa3a4..1df9b2a 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/EmptyKey.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/EmptyKey.java @@ -27,6 +27,9 @@ import lombok.EqualsAndHashCode; */ @EqualsAndHashCode public final class EmptyKey implements SharedResourceKey { + /** A singleton instance */ + public static final EmptyKey INSTANCE = new EmptyKey(); + @Override public String toConfigurationKey() { return null;
