Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 60adccfd9 -> f51bf00b4


[GOBBLIN-430] Add lineage in SalesforceSource

Closes #2308 from zxcware/saleslineage


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f51bf00b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f51bf00b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f51bf00b

Branch: refs/heads/master
Commit: f51bf00b4fc904ecfa3ccdc74cad8e2a237ce3b6
Parents: 60adccf
Author: zhchen <[email protected]>
Authored: Thu Mar 22 14:04:33 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Thu Mar 22 14:04:33 2018 -0700

----------------------------------------------------------------------
 .../gobblin/dataset/DatasetConstants.java       |  1 +
 .../gobblin/salesforce/SalesforceSource.java    | 19 ++++++++++
 .../salesforce/SalesforceSourceTest.java        | 37 ++++++++++++++++++--
 3 files changed, 55 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f51bf00b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
index 35bb09e..03b7fcb 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java
@@ -23,6 +23,7 @@ public class DatasetConstants {
   public static final String PLATFORM_HDFS = "hdfs";
   public static final String PLATFORM_KAFKA = "kafka";
   public static final String PLATFORM_HIVE = "hive";
+  public static final String PLATFORM_SALESFORCE = "salesforce";
   public static final String PLATFORM_MYSQL = "mysql";
 
   /** Common metadata */

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f51bf00b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index 50b3611..872fceb 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -33,7 +33,9 @@ import java.util.Set;
 import org.apache.commons.lang3.text.StrSubstitutor;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
 import com.google.common.base.Strings;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Sets;
@@ -47,6 +49,9 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
 import org.apache.gobblin.source.extractor.DataRecordException;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.exception.ExtractPrepareException;
@@ -101,6 +106,11 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
 
   private static final Gson GSON = new Gson();
 
+  @VisibleForTesting
+  SalesforceSource(LineageInfo lineageInfo) {
+    this.lineageInfo = Optional.fromNullable(lineageInfo);
+  }
+
   @Override
   public Extractor<JsonArray, JsonElement> getExtractor(WorkUnitState state) throws IOException {
     try {
@@ -112,6 +122,15 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
   }
 
   @Override
+  protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity, WorkUnit workUnit) {
+    DatasetDescriptor source =
+        new DatasetDescriptor(DatasetConstants.PLATFORM_SALESFORCE, entity.getSourceEntityName());
+    if (lineageInfo.isPresent()) {
+      lineageInfo.get().setSource(source, workUnit);
+    }
+  }
+
+  @Override
   protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
     WatermarkType watermarkType = WatermarkType.valueOf(
         state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_WATERMARK_TYPE, ConfigurationKeys.DEFAULT_WATERMARK_TYPE)

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f51bf00b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
index 80b0bc3..a03476b 100644
--- a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
+++ b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
@@ -16,16 +16,49 @@
  */
 package org.apache.gobblin.salesforce;
 
+import java.util.List;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.source.extractor.extract.QueryBasedSource;
+import org.apache.gobblin.source.extractor.partition.Partitioner;
+import org.apache.gobblin.source.workunit.WorkUnit;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.gson.Gson;
+import com.typesafe.config.ConfigFactory;
+
 
 public class SalesforceSourceTest {
   @Test
+  void testSourceLineageInfo() {
+    SourceState sourceState = new SourceState();
+    sourceState.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, "salesforce");
+    sourceState.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, "snapshot_append");
+    sourceState.setProp(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, true);
+    sourceState.setProp(Partitioner.USER_SPECIFIED_PARTITIONS, "20140213000000,20170407152123");
+    sourceState.setProp(ConfigurationKeys.SOURCE_QUERYBASED_EXTRACT_TYPE, "SNAPSHOT");
+
+    QueryBasedSource.SourceEntity sourceEntity = QueryBasedSource.SourceEntity.fromSourceEntityName("contacts");
+
+    SalesforceSource source = new SalesforceSource(new LineageInfo(ConfigFactory.empty()));
+    List<WorkUnit> workUnits = source.generateWorkUnits(sourceEntity, sourceState, 20140213000000L);
+    Assert.assertEquals(workUnits.size(), 1);
+
+    DatasetDescriptor sourceDataset = new DatasetDescriptor("salesforce", "contacts");
+    Gson gson = new Gson();
+    Assert.assertEquals(gson.toJson(sourceDataset), workUnits.get(0).getProp("gobblin.event.lineage.source"));
+    Assert.assertEquals(workUnits.get(0).getProp("gobblin.event.lineage.name"), "contacts");
+  }
+
+  @Test
   void testGenerateSpecifiedPartitionFromSinglePointHistogram() {
     SalesforceSource.Histogram histogram = new SalesforceSource.Histogram();
     histogram.add(new SalesforceSource.HistogramGroup("2014-02-13-00:00:00", 10));
-    SalesforceSource source = new SalesforceSource();
+    SalesforceSource source = new SalesforceSource(null);
 
     long expectedHighWatermark = 20170407152123L;
     long lowWatermark = 20140213000000L;
@@ -42,7 +75,7 @@ public class SalesforceSourceTest {
       String[] groupInfo = group.split("::");
       histogram.add(new SalesforceSource.HistogramGroup(groupInfo[0], Integer.parseInt(groupInfo[1])));
     }
-    SalesforceSource source = new SalesforceSource();
+    SalesforceSource source = new SalesforceSource(null);
 
     long expectedHighWatermark = 20170407152123L;
     long lowWatermark = 20140213000000L;

Reply via email to