Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 4c06d3be7 -> 95e15f003


[GOBBLIN-334] Implement SharedResourceFactory for LineageInfo

Closes #2187 from zxcware/share


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/95e15f00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/95e15f00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/95e15f00

Branch: refs/heads/master
Commit: 95e15f00386a2509d1631fcdbb5b41e7e1265460
Parents: 4c06d3b
Author: zhchen <[email protected]>
Authored: Fri Dec 8 11:31:22 2017 -0800
Committer: Issac Buenrostro <[email protected]>
Committed: Fri Dec 8 11:31:22 2017 -0800

----------------------------------------------------------------------
 .../broker/StringNameSharedResourceKey.java     | 56 +++++++++++++++++++
 .../gobblin/configuration/SourceState.java      |  6 ++
 .../gobblin/dataset/DatasetResolverFactory.java | 11 ++--
 .../gobblin/dataset/NoopDatasetResolver.java    |  1 +
 .../gobblin/publisher/BaseDataPublisher.java    | 15 ++++-
 .../extractor/extract/QueryBasedSource.java     |  4 +-
 .../publisher/BaseDataPublisherTest.java        | 29 ++++++++--
 .../data/management/copy/CopySource.java        |  9 ++-
 .../copy/publisher/CopyDataPublisher.java       | 15 ++++-
 .../metrics/broker/LineageInfoFactory.java      | 54 ++++++++++++++++++
 .../metrics/event/lineage/LineageInfo.java      | 58 +++++++++++++++++---
 .../metrics/event/lineage/LineageEventTest.java | 36 +++++++++---
 .../extractor/extract/kafka/KafkaSource.java    |  8 ++-
 .../extractor/extract/jdbc/MysqlSource.java     |  4 +-
 .../org/apache/gobblin/runtime/JobContext.java  |  1 +
 .../org/apache/gobblin/broker/EmptyKey.java     |  3 +
 16 files changed, 275 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-api/src/main/java/org/apache/gobblin/broker/StringNameSharedResourceKey.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/broker/StringNameSharedResourceKey.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/broker/StringNameSharedResourceKey.java
new file mode 100644
index 0000000..e7e29e2
--- /dev/null
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/broker/StringNameSharedResourceKey.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.broker;
+
+import org.apache.gobblin.broker.iface.SharedResourceKey;
+
+
+/**
+ * A {@link SharedResourceKey} with only a string name
+ */
+public class StringNameSharedResourceKey implements SharedResourceKey {
+  private final String name;
+
+  public StringNameSharedResourceKey(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public String toConfigurationKey() {
+    return name;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    StringNameSharedResourceKey that = (StringNameSharedResourceKey) o;
+
+    return name != null ? name.equals(that.name) : that.name == null;
+  }
+
+  @Override
+  public int hashCode() {
+    return name != null ? name.hashCode() : 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java
index d64dcb6..fd772ea 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/SourceState.java
@@ -37,10 +37,13 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.source.workunit.Extract;
 
 import lombok.Getter;
+import lombok.Setter;
 
 
 /**
@@ -67,6 +70,9 @@ public class SourceState extends State {
   @Getter
   private final List<WorkUnitState> previousWorkUnitStates = 
Lists.newArrayList();
 
+  @Getter @Setter
+  private SharedResourcesBroker<GobblinScopeTypes> broker;
+
   /**
    * Default constructor.
    */

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java
index eb1b887..ac1cfe9 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetResolverFactory.java
@@ -17,15 +17,14 @@
 
 package org.apache.gobblin.dataset;
 
-import org.apache.gobblin.configuration.State;
-
+import com.typesafe.config.Config;
 
 /**
  * A factory that creates an instance of {@link DatasetResolver}
  */
 public interface DatasetResolverFactory {
-  String NAMESPACE = "DatasetResolverFactory";
-  String CLASS = NAMESPACE + "." + "class";
-
-  DatasetResolver createResolver(State state);
+  /**
+   * Create a {@link DatasetResolver} instance
+   */
+  DatasetResolver createResolver(Config config);
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java
index c54011a..e678d84 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/dataset/NoopDatasetResolver.java
@@ -25,6 +25,7 @@ import org.apache.gobblin.configuration.State;
  */
 public class NoopDatasetResolver implements DatasetResolver {
   public static final NoopDatasetResolver INSTANCE = new NoopDatasetResolver();
+  public static final String FACTORY = "NOOP";
 
   private NoopDatasetResolver() {}
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 89bab2b..2ddcd76 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -54,6 +54,7 @@ import com.typesafe.config.ConfigRenderOptions;
 
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.dataset.DatasetConstants;
@@ -109,6 +110,7 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
   protected final int parallelRunnerThreads;
   protected final Map<String, ParallelRunner> parallelRunners = 
Maps.newHashMap();
   protected final Set<Path> publisherOutputDirs = Sets.newHashSet();
+  protected final Optional<LineageInfo> lineageInfo;
 
   /* Each partition in each branch may have separate metadata. The metadata 
mergers are responsible
    * for aggregating this information from all workunits so it can be 
published.
@@ -144,6 +146,15 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
       conf.set(key, this.getState().getProp(key));
     }
 
+    // Extract LineageInfo from state
+    if (state instanceof SourceState) {
+      lineageInfo = LineageInfo.getLineageInfo(((SourceState) 
state).getBroker());
+    } else if (state instanceof WorkUnitState) {
+      lineageInfo = LineageInfo.getLineageInfo(((WorkUnitState) 
state).getTaskBrokerNullable());
+    } else {
+      lineageInfo = Optional.absent();
+    }
+
     this.numBranches = 
this.getState().getPropAsInt(ConfigurationKeys.FORK_BRANCHES_KEY, 1);
     this.shouldRetry = this.getState().getPropAsBoolean(PUBLISH_RETRY_ENABLED, 
false);
 
@@ -284,7 +295,9 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
 
   private void addLineageInfo(WorkUnitState state, int branchId) {
     DatasetDescriptor destination = createDestinationDescriptor(state, 
branchId);
-    LineageInfo.putDestination(destination, branchId, state);
+    if (this.lineageInfo.isPresent()) {
+      this.lineageInfo.get().putDestination(destination, branchId, state);
+    }
   }
 
   protected DatasetDescriptor createDestinationDescriptor(WorkUnitState state, 
int branchId) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
index d074f3a..29c98d9 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
@@ -47,7 +47,6 @@ import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.configuration.WorkUnitState.WorkingState;
-import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.JobCommitPolicy;
 import org.apache.gobblin.source.extractor.partition.Partition;
@@ -93,6 +92,8 @@ public abstract class QueryBasedSource<S, D> extends 
AbstractSource<S, D> {
    */
   public static final Integer CURRENT_WORK_UNIT_STATE_VERSION = 3;
 
+  protected Optional<LineageInfo> lineageInfo;
+
   /** A class that encapsulates a source entity (aka dataset) to be processed 
*/
   @Data
   public static final class SourceEntity {
@@ -168,6 +169,7 @@ public abstract class QueryBasedSource<S, D> extends 
AbstractSource<S, D> {
   @Override
   public List<WorkUnit> getWorkunits(SourceState state) {
     initLogger(state);
+    lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
 
     List<WorkUnit> workUnits = Lists.newArrayList();
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
index 159786b..d46d6e3 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/publisher/BaseDataPublisherTest.java
@@ -40,7 +40,14 @@ import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.Files;
-
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
+import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.broker.iface.SubscopedBrokerBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
@@ -48,6 +55,7 @@ import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.metadata.MetadataMerger;
 import org.apache.gobblin.metadata.types.GlobalMetadata;
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ForkOperatorUtils;
 import org.apache.gobblin.writer.FsDataWriter;
 import org.apache.gobblin.writer.FsWriterMetrics;
@@ -528,8 +536,9 @@ public class BaseDataPublisherTest {
   public void testPublishSingleTask()
       throws IOException {
     WorkUnitState state = buildTaskState(1);
+    LineageInfo lineageInfo = 
LineageInfo.getLineageInfo(state.getTaskBroker()).get();
     DatasetDescriptor source = new DatasetDescriptor("kafka", "testTopic");
-    LineageInfo.setSource(source, state);
+    lineageInfo.setSource(source, state);
     BaseDataPublisher publisher = new BaseDataPublisher(state);
     publisher.publishData(state);
     
Assert.assertTrue(state.contains("gobblin.event.lineage.branch.0.destination"));
@@ -541,9 +550,10 @@ public class BaseDataPublisherTest {
       throws IOException {
     WorkUnitState state1 = buildTaskState(2);
     WorkUnitState state2 = buildTaskState(2);
+    LineageInfo lineageInfo = 
LineageInfo.getLineageInfo(state1.getTaskBroker()).get();
     DatasetDescriptor source = new DatasetDescriptor("kafka", "testTopic");
-    LineageInfo.setSource(source, state1);
-    LineageInfo.setSource(source, state2);
+    lineageInfo.setSource(source, state1);
+    lineageInfo.setSource(source, state2);
     BaseDataPublisher publisher = new BaseDataPublisher(state1);
     publisher.publishData(ImmutableList.of(state1, state2));
     
Assert.assertTrue(state1.contains("gobblin.event.lineage.branch.0.destination"));
@@ -623,7 +633,16 @@ public class BaseDataPublisherTest {
   }
 
   private WorkUnitState buildTaskState(int numBranches) {
-    WorkUnitState state = new WorkUnitState();
+    SharedResourcesBroker<GobblinScopeTypes> instanceBroker = 
SharedResourcesBrokerFactory
+        .createDefaultTopLevelBroker(ConfigFactory.empty(), 
GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+    SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker
+        .newSubscopedBuilder(new JobScopeInstance("LineageEventTest", 
String.valueOf(System.currentTimeMillis())))
+        .build();
+    SharedResourcesBroker<GobblinScopeTypes> taskBroker = jobBroker
+        .newSubscopedBuilder(new TaskScopeInstance("LineageEventTestTask" + 
String.valueOf(System.currentTimeMillis())))
+        .build();
+
+    WorkUnitState state = new WorkUnitState(WorkUnit.createEmpty(), new 
State(), taskBroker);
 
     state.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, "namespace");
     state.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, "table");

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index a74d425..615d6ad 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -118,6 +118,8 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
 
   public MetricContext metricContext;
 
+  protected Optional<LineageInfo> lineageInfo;
+
   /**
    * <ul>
    * Does the following:
@@ -139,6 +141,7 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
   public List<WorkUnit> getWorkunits(final SourceState state) {
 
     this.metricContext = Instrumented.getMetricContext(state, 
CopySource.class);
+    this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
 
     try {
 
@@ -320,8 +323,10 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
        * a DatasetFinder. Consequently, the source and destination dataset for 
the CopyableFile lineage are expected
        * to be set by the same logic
        */
-      if (copyableFile.getSourceDataset() != null && 
copyableFile.getDestinationDataset() != null) {
-        LineageInfo.setSource(copyableFile.getSourceDataset(), workUnit);
+      if (lineageInfo.isPresent() &&
+          copyableFile.getSourceDataset() != null &&
+          copyableFile.getDestinationDataset() != null) {
+        lineageInfo.get().setSource(copyableFile.getSourceDataset(), workUnit);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index e1ccf65..79b2b6a 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.data.management.copy.publisher;
 
 
+import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
 import java.io.IOException;
@@ -81,6 +82,7 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
   private final FileSystem fs;
   protected final EventSubmitter eventSubmitter;
   protected final RecoveryHelper recoveryHelper;
+  protected final Optional<LineageInfo> lineageInfo;
 
   /**
    * Build a new {@link CopyDataPublisher} from {@link State}. The constructor 
expects the following to be set in the
@@ -93,6 +95,15 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
    */
   public CopyDataPublisher(State state) throws IOException {
     super(state);
+    // Extract LineageInfo from state
+    if (state instanceof SourceState) {
+      lineageInfo = LineageInfo.getLineageInfo(((SourceState) 
state).getBroker());
+    } else if (state instanceof WorkUnitState) {
+      lineageInfo = LineageInfo.getLineageInfo(((WorkUnitState) 
state).getTaskBrokerNullable());
+    } else {
+      lineageInfo = Optional.absent();
+    }
+
     String uri = this.state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, 
ConfigurationKeys.LOCAL_FS_URI);
     this.fs = FileSystem.get(URI.create(uri), 
WriterUtils.getFsConfiguration(state));
 
@@ -214,7 +225,9 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
           if (!fileSetRoot.isPresent() && copyableFile.getDatasetOutputPath() 
!= null) {
             fileSetRoot = Optional.of(copyableFile.getDatasetOutputPath());
           }
-          LineageInfo.putDestination(copyableFile.getDestinationDataset(), 0, 
wus);
+          if (lineageInfo.isPresent()) {
+            
lineageInfo.get().putDestination(copyableFile.getDestinationDataset(), 0, wus);
+          }
         }
         if (datasetOriginTimestamp > copyableFile.getOriginTimestamp()) {
           datasetOriginTimestamp = copyableFile.getOriginTimestamp();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/broker/LineageInfoFactory.java
----------------------------------------------------------------------
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/broker/LineageInfoFactory.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/broker/LineageInfoFactory.java
new file mode 100644
index 0000000..4259582
--- /dev/null
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/broker/LineageInfoFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metrics.broker;
+
+import org.apache.gobblin.broker.EmptyKey;
+import org.apache.gobblin.broker.ResourceInstance;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.ConfigView;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.ScopedConfigView;
+import org.apache.gobblin.broker.iface.SharedResourceFactory;
+import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+
+
+/**
+ * A {@link SharedResourceFactory} to share a job level {@link LineageInfo} 
instance
+ */
+public class LineageInfoFactory implements SharedResourceFactory<LineageInfo, 
EmptyKey, GobblinScopeTypes> {
+  public static final String FACTORY_NAME = "lineageInfo";
+
+  @Override
+  public String getName() {
+    return FACTORY_NAME;
+  }
+
+  @Override
+  public SharedResourceFactoryResponse<LineageInfo> 
createResource(SharedResourcesBroker<GobblinScopeTypes> broker,
+      ScopedConfigView<GobblinScopeTypes, EmptyKey> config)
+      throws NotConfiguredException {
+    return new ResourceInstance<>(new LineageInfo(config.getConfig()));
+  }
+
+  @Override
+  public GobblinScopeTypes 
getAutoScope(SharedResourcesBroker<GobblinScopeTypes> broker, 
ConfigView<GobblinScopeTypes, EmptyKey> config) {
+    return GobblinScopeTypes.JOB;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
----------------------------------------------------------------------
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
index 9a3cc11..8ac9ee3 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/lineage/LineageInfo.java
@@ -22,18 +22,29 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.gson.Gson;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 
+import javax.annotation.Nullable;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.broker.EmptyKey;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.dataset.DatasetResolver;
 import org.apache.gobblin.dataset.DatasetResolverFactory;
 import org.apache.gobblin.dataset.NoopDatasetResolver;
+import org.apache.gobblin.metrics.broker.LineageInfoFactory;
+import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
@@ -53,6 +64,7 @@ import org.apache.gobblin.dataset.NoopDatasetResolver;
  * <p>
  *   The general flow is:
  *   <ol>
+ *     <li> get a {@link LineageInfo} instance with {@link 
LineageInfo#getLineageInfo(SharedResourcesBroker)}</li>
  *     <li> source sets its {@link DatasetDescriptor} to each work unit </li>
  *     <li> destination puts its {@link DatasetDescriptor} to the work unit 
</li>
  *     <li> load and send all lineage events from all states </li>
@@ -62,11 +74,22 @@ import org.apache.gobblin.dataset.NoopDatasetResolver;
  */
 @Slf4j
 public final class LineageInfo {
+  private static final String DATASET_RESOLVER_FACTORY = 
"datasetResolverFactory";
+  private static final String DATASET_RESOLVER_CONFIG_NAMESPACE = 
"datasetResolver";
+
   private static final String BRANCH = "branch";
   private static final Gson GSON = new Gson();
   private static final String NAME_KEY = "name";
 
-  private LineageInfo() {
+  private static final Config FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(DATASET_RESOLVER_FACTORY, NoopDatasetResolver.FACTORY)
+          .build());
+
+  private final DatasetResolver resolver;
+
+  public LineageInfo(Config config) {
+    resolver = getResolver(config.withFallback(FALLBACK));
   }
 
   /**
@@ -80,8 +103,7 @@ public final class LineageInfo {
    * @param state state about a {@link 
org.apache.gobblin.source.workunit.WorkUnit}
    *
    */
-  public static void setSource(DatasetDescriptor source, State state) {
-    DatasetResolver resolver = getResolver(state);
+  public void setSource(DatasetDescriptor source, State state) {
     DatasetDescriptor descriptor = resolver.resolve(source, state);
     if (descriptor == null) {
       return;
@@ -100,14 +122,13 @@ public final class LineageInfo {
    *   the method is implemented to be threadsafe
    * </p>
    */
-  public static void putDestination(DatasetDescriptor destination, int 
branchId, State state) {
+  public void putDestination(DatasetDescriptor destination, int branchId, 
State state) {
     if (!hasLineageInfo(state)) {
       log.warn("State has no lineage info but branch " + branchId + " puts a 
destination: " + GSON.toJson(destination));
       return;
     }
     log.debug(String.format("Put destination %s for branch %d", 
GSON.toJson(destination), branchId));
     synchronized (state.getProp(getKey(NAME_KEY))) {
-      DatasetResolver resolver = getResolver(state);
       DatasetDescriptor descriptor = resolver.resolve(destination, state);
       if (descriptor == null) {
         return;
@@ -193,19 +214,38 @@ public final class LineageInfo {
     return Joiner.on('.').join(LineageEventBuilder.LIENAGE_EVENT_NAMESPACE, 
state.getProp(getKey(NAME_KEY)));
   }
 
+
+  /**
+   * Try to get a {@link LineageInfo} instance from the given {@link 
SharedResourcesBroker}
+   */
+  public static Optional<LineageInfo> getLineageInfo(@Nullable 
SharedResourcesBroker<GobblinScopeTypes> broker) {
+    if (broker == null) {
+      log.warn("Null broker. Will not track data lineage");
+      return Optional.absent();
+    }
+
+    try {
+      LineageInfo lineageInfo = broker.getSharedResource(new 
LineageInfoFactory(), EmptyKey.INSTANCE);
+      return Optional.of(lineageInfo);
+    } catch (NotConfiguredException e) {
+      log.warn("Fail to get LineageInfo instance from broker. Will not track 
data lineage", e);
+      return Optional.absent();
+    }
+  }
+
   /**
    * Get the configured {@link DatasetResolver} from {@link State}
    */
-  public static DatasetResolver getResolver(State state) {
-    String resolverFactory = state.getProp(DatasetResolverFactory.CLASS);
-    if (resolverFactory == null) {
+  private static DatasetResolver getResolver(Config config) {
+    String resolverFactory = config.getString(DATASET_RESOLVER_FACTORY);
+    if (resolverFactory.equals(NoopDatasetResolver.FACTORY)) {
       return NoopDatasetResolver.INSTANCE;
     }
 
     DatasetResolver resolver = NoopDatasetResolver.INSTANCE;
     try {
       DatasetResolverFactory factory = (DatasetResolverFactory) 
Class.forName(resolverFactory).newInstance();
-      resolver = factory.createResolver(state);
+      resolver = factory.createResolver(ConfigUtils.getConfigOrEmpty(config, 
DATASET_RESOLVER_CONFIG_NAMESPACE));
     } catch (InstantiationException | IllegalAccessException | 
ClassNotFoundException e) {
       log.error(String.format("Fail to create a DatasetResolver with factory 
class %s", resolverFactory));
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
index 7388de6..4352e7f 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/test/java/org/apache/gobblin/metrics/event/lineage/LineageEventTest.java
@@ -21,6 +21,11 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
+import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.DatasetDescriptor;
@@ -30,6 +35,7 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
+import com.typesafe.config.ConfigFactory;
 
 
 /**
@@ -45,21 +51,22 @@ public class LineageEventTest {
     final String branch = "branch";
 
     State state0 = new State();
+    LineageInfo lineageInfo = getLineageInfo();
     DatasetDescriptor source = new DatasetDescriptor(kafka, topic);
-    LineageInfo.setSource(source, state0);
+    lineageInfo.setSource(source, state0);
     DatasetDescriptor destination00 = new DatasetDescriptor(hdfs, "/data/dbchanges");
     destination00.addMetadata(branch, "0");
-    LineageInfo.putDestination(destination00, 0, state0);
+    lineageInfo.putDestination(destination00, 0, state0);
     DatasetDescriptor destination01 = new DatasetDescriptor(mysql, "kafka.testTopic");
     destination01.addMetadata(branch, "1");
-    LineageInfo.putDestination(destination01, 1, state0);
+    lineageInfo.putDestination(destination01, 1, state0);
 
     Map<String, LineageEventBuilder> events = LineageInfo.load(state0);
     verify(events.get("0"), topic, source, destination00);
     verify(events.get("1"), topic, source, destination01);
 
     State state1 = new State();
-    LineageInfo.setSource(source, state1);
+    lineageInfo.setSource(source, state1);
     List<State> states = Lists.newArrayList();
     states.add(state0);
     states.add(state1);
@@ -73,7 +80,7 @@ public class LineageEventTest {
     // There are 3 full fledged lineage events
     DatasetDescriptor destination12 = new DatasetDescriptor(mysql, "kafka.testTopic2");
     destination12.addMetadata(branch, "2");
-    LineageInfo.putDestination(destination12, 2, state1);
+    lineageInfo.putDestination(destination12, 2, state1);
     eventsList = LineageInfo.load(states);
     Assert.assertTrue(eventsList.size() == 3);
     Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0"));
@@ -83,10 +90,10 @@ public class LineageEventTest {
 
     // There 5 lineage events put, but only 4 unique lineage events
     DatasetDescriptor destination10 = destination12;
-    LineageInfo.putDestination(destination10, 0, state1);
+    lineageInfo.putDestination(destination10, 0, state1);
     DatasetDescriptor destination11 = new DatasetDescriptor("hive", "kafka.testTopic1");
     destination11.addMetadata(branch, "1");
-    LineageInfo.putDestination(destination11, 1, state1);
+    lineageInfo.putDestination(destination11, 1, state1);
     eventsList = LineageInfo.load(states);
     Assert.assertTrue(eventsList.size() == 4);
     Assert.assertEquals(getLineageEvent(eventsList, 0, hdfs), events.get("0"));
@@ -110,6 +117,21 @@ public class LineageEventTest {
     return null;
   }
 
+  private LineageInfo getLineageInfo() {
+    SharedResourcesBroker<GobblinScopeTypes> instanceBroker = SharedResourcesBrokerFactory
+        .createDefaultTopLevelBroker(ConfigFactory.empty(), GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+    SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker
+        .newSubscopedBuilder(new JobScopeInstance("LineageEventTest", String.valueOf(System.currentTimeMillis())))
+        .build();
+    SharedResourcesBroker<GobblinScopeTypes> taskBroker = jobBroker
+        .newSubscopedBuilder(new TaskScopeInstance("LineageEventTestTask" + String.valueOf(System.currentTimeMillis())))
+        .build();
+    LineageInfo obj1 = LineageInfo.getLineageInfo(jobBroker).get();
+    LineageInfo obj2 = LineageInfo.getLineageInfo(taskBroker).get();
+    Assert.assertTrue(obj1 == obj2);
+    return obj2;
+  }
+
   private void verify(LineageEventBuilder event, String name, DatasetDescriptor source, DatasetDescriptor destination) {
     Assert.assertEquals(event.getName(), name);
     Assert.assertEquals(event.getNamespace(), LineageEventBuilder.LIENAGE_EVENT_NAMESPACE);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 8cd25c2..69ebea6 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -32,7 +32,6 @@ import java.util.regex.Pattern;
 
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.DatasetDescriptor;
-import org.apache.gobblin.metrics.event.lineage.LineageEventBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -143,6 +142,8 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
 
   private MetricContext metricContext;
 
+  protected Optional<LineageInfo> lineageInfo;
+
   private List<String> getLimiterExtractorReportKeys() {
     List<String> keyNames = new ArrayList<>();
     keyNames.add(KafkaSource.TOPIC_NAME);
@@ -163,6 +164,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   @Override
   public List<WorkUnit> getWorkunits(SourceState state) {
     this.metricContext = Instrumented.getMetricContext(state, KafkaSource.class);
+    this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
 
     Map<String, List<WorkUnit>> workUnits = Maps.newConcurrentMap();
     if (state.getPropAsBoolean(KafkaSource.GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION)) {
@@ -554,7 +556,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
     // Add lineage info
     DatasetDescriptor source = new DatasetDescriptor(DatasetConstants.PLATFORM_KAFKA, partition.getTopicName());
     source.addMetadata(DatasetConstants.BROKERS, kafkaBrokers);
-    LineageInfo.setSource(source, workUnit);
+    if (this.lineageInfo.isPresent()) {
+      this.lineageInfo.get().setSource(source, workUnit);
+    }
 
     LOG.info(String.format("Created workunit for partition %s: lowWatermark=%d, highWatermark=%d, range=%d", partition,
         offsets.getStartOffset(), offsets.getLatestOffset(), offsets.getLatestOffset() - offsets.getStartOffset()));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
index e2292c7..0ed291e 100644
--- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/MysqlSource.java
@@ -65,6 +65,8 @@ public class MysqlSource extends QueryBasedSource<JsonArray, JsonElement> {
     DatasetDescriptor source =
         new DatasetDescriptor(DatasetConstants.PLATFORM_MYSQL, database + "." + entity.getSourceEntityName());
     source.addMetadata(DatasetConstants.CONNECTION_URL, connectionUrl);
-    LineageInfo.setSource(source, workUnit);
+    if (lineageInfo.isPresent()) {
+      lineageInfo.get().setSource(source, workUnit);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 0debcac..73d613c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -152,6 +152,7 @@ public class JobContext implements Closeable {
     this.jobState =
         new JobState(jobPropsState, this.datasetStateStore.getLatestDatasetStatesByUrns(this.jobName), this.jobName,
             this.jobId);
+    this.jobState.setBroker(this.jobBroker);
 
     stagingDirProvided = this.jobState.contains(ConfigurationKeys.WRITER_STAGING_DIR);
     outputDirProvided = this.jobState.contains(ConfigurationKeys.WRITER_OUTPUT_DIR);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/95e15f00/gobblin-utility/src/main/java/org/apache/gobblin/broker/EmptyKey.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/EmptyKey.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/EmptyKey.java
index edaa3a4..1df9b2a 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/EmptyKey.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/EmptyKey.java
@@ -27,6 +27,9 @@ import lombok.EqualsAndHashCode;
  */
 @EqualsAndHashCode
 public final class EmptyKey implements SharedResourceKey {
+  /** A singleton instance */
+  public static final EmptyKey INSTANCE = new EmptyKey();
+
   @Override
   public String toConfigurationKey() {
     return null;

Reply via email to