Repository: incubator-gobblin Updated Branches: refs/heads/master 60adccfd9 -> f51bf00b4
[GOBBLIN-430] Add lineage in SalesforceSource Closes #2308 from zxcware/saleslineage Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f51bf00b Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f51bf00b Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f51bf00b Branch: refs/heads/master Commit: f51bf00b4fc904ecfa3ccdc74cad8e2a237ce3b6 Parents: 60adccf Author: zhchen <[email protected]> Authored: Thu Mar 22 14:04:33 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Thu Mar 22 14:04:33 2018 -0700 ---------------------------------------------------------------------- .../gobblin/dataset/DatasetConstants.java | 1 + .../gobblin/salesforce/SalesforceSource.java | 19 ++++++++++ .../salesforce/SalesforceSourceTest.java | 37 ++++++++++++++++++-- 3 files changed, 55 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f51bf00b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java index 35bb09e..03b7fcb 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/dataset/DatasetConstants.java @@ -23,6 +23,7 @@ public class DatasetConstants { public static final String PLATFORM_HDFS = "hdfs"; public static final String PLATFORM_KAFKA = "kafka"; public static final String PLATFORM_HIVE = "hive"; + public static final String PLATFORM_SALESFORCE = "salesforce"; public static final String PLATFORM_MYSQL = "mysql"; /** Common metadata */ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f51bf00b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java ---------------------------------------------------------------------- diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java index 50b3611..872fceb 100644 --- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java +++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java @@ -33,7 +33,9 @@ import java.util.Set; import org.apache.commons.lang3.text.StrSubstitutor; import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; +import com.google.common.base.Optional; import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.collect.Sets; @@ -47,6 +49,9 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.SourceState; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.dataset.DatasetConstants; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; import org.apache.gobblin.source.extractor.DataRecordException; import org.apache.gobblin.source.extractor.Extractor; import org.apache.gobblin.source.extractor.exception.ExtractPrepareException; @@ -101,6 +106,11 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { private static final Gson GSON = new Gson(); + @VisibleForTesting + SalesforceSource(LineageInfo lineageInfo) { + this.lineageInfo = Optional.fromNullable(lineageInfo); + } + @Override public Extractor<JsonArray, JsonElement> getExtractor(WorkUnitState state) throws IOException { try { @@ -112,6 +122,15 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { } @Override + protected void addLineageSourceInfo(SourceState sourceState, SourceEntity entity, WorkUnit workUnit) { + DatasetDescriptor source = + new DatasetDescriptor(DatasetConstants.PLATFORM_SALESFORCE, entity.getSourceEntityName()); + if (lineageInfo.isPresent()) { + lineageInfo.get().setSource(source, workUnit); + } + } + + @Override protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) { WatermarkType watermarkType = WatermarkType.valueOf( state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_WATERMARK_TYPE, ConfigurationKeys.DEFAULT_WATERMARK_TYPE) http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f51bf00b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java ---------------------------------------------------------------------- diff --git a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java index 80b0bc3..a03476b 100644 --- a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java +++ b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java @@ -16,16 +16,49 @@ */ package org.apache.gobblin.salesforce; +import java.util.List; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; +import org.apache.gobblin.source.extractor.extract.QueryBasedSource; +import org.apache.gobblin.source.extractor.partition.Partitioner; +import org.apache.gobblin.source.workunit.WorkUnit; import org.testng.Assert; import org.testng.annotations.Test; +import com.google.gson.Gson; +import com.typesafe.config.ConfigFactory; + public class SalesforceSourceTest { @Test + void testSourceLineageInfo() { + SourceState sourceState = new SourceState(); + sourceState.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, "salesforce"); + sourceState.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, "snapshot_append"); + sourceState.setProp(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, true); + sourceState.setProp(Partitioner.USER_SPECIFIED_PARTITIONS, "20140213000000,20170407152123"); + sourceState.setProp(ConfigurationKeys.SOURCE_QUERYBASED_EXTRACT_TYPE, "SNAPSHOT"); + + QueryBasedSource.SourceEntity sourceEntity = QueryBasedSource.SourceEntity.fromSourceEntityName("contacts"); + + SalesforceSource source = new SalesforceSource(new LineageInfo(ConfigFactory.empty())); + List<WorkUnit> workUnits = source.generateWorkUnits(sourceEntity, sourceState, 20140213000000L); + Assert.assertEquals(workUnits.size(), 1); + + DatasetDescriptor sourceDataset = new DatasetDescriptor("salesforce", "contacts"); + Gson gson = new Gson(); + Assert.assertEquals(gson.toJson(sourceDataset), workUnits.get(0).getProp("gobblin.event.lineage.source")); + Assert.assertEquals(workUnits.get(0).getProp("gobblin.event.lineage.name"), "contacts"); + } + + @Test void testGenerateSpecifiedPartitionFromSinglePointHistogram() { SalesforceSource.Histogram histogram = new SalesforceSource.Histogram(); histogram.add(new SalesforceSource.HistogramGroup("2014-02-13-00:00:00", 10)); - SalesforceSource source = new SalesforceSource(); + SalesforceSource source = new SalesforceSource(null); long expectedHighWatermark = 20170407152123L; long lowWatermark = 20140213000000L; @@ -42,7 +75,7 @@ public class SalesforceSourceTest { String[] groupInfo = group.split("::"); histogram.add(new SalesforceSource.HistogramGroup(groupInfo[0], Integer.parseInt(groupInfo[1]))); } - SalesforceSource source = new SalesforceSource(); + SalesforceSource source = new SalesforceSource(null); long expectedHighWatermark = 20170407152123L; long lowWatermark = 20140213000000L;
