Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ab034478c -> 6f1a3aff6
[GOBBLIN-353] Fix low watermark overridden by high watermark in SalesforceSource

Closes #2226 from zxcware/sales

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6f1a3aff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6f1a3aff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6f1a3aff

Branch: refs/heads/master
Commit: 6f1a3aff6149d8d09eb5b66f6d7b4402e0c04d3b
Parents: ab03447
Author: zhchen <[email protected]>
Authored: Tue Jan 2 13:12:21 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Tue Jan 2 13:12:21 2018 -0800

----------------------------------------------------------------------
 .../apache/gobblin/salesforce/SalesforceSource.java | 11 ++++-------
 .../gobblin/salesforce/SalesforceSourceTest.java | 14 ++++++++++++++
 2 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6f1a3aff/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index b2de3da..50b3611 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -188,17 +188,14 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
       throw new RuntimeException("Unexpected empty partition list");
     }
 
-    // If the last group is used as the last partition point
-    if (count == 0) {
-      // Exchange the last partition point with global high watermark
-      partitionPoints.set(partitionPoints.size() - 1, Long.toString(expectedHighWatermark));
-    } else {
+    if (count > 0) {
       // Summarize last group
       statistics.addValue(count);
-      // Add global high watermark as last point
-      partitionPoints.add(Long.toString(expectedHighWatermark));
     }
 
+    // Add global high watermark as last point
+    partitionPoints.add(Long.toString(expectedHighWatermark));
+
     log.info("Dynamic partitioning statistics: ");
     log.info("data: " + Arrays.toString(statistics.getValues()));
     log.info(statistics.toString());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6f1a3aff/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
index 8e45c6b..80b0bc3 100644
--- a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
+++ b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
@@ -22,6 +22,20 @@ import org.testng.annotations.Test;
 
 public class SalesforceSourceTest {
   @Test
+  void testGenerateSpecifiedPartitionFromSinglePointHistogram() {
+    SalesforceSource.Histogram histogram = new SalesforceSource.Histogram();
+    histogram.add(new SalesforceSource.HistogramGroup("2014-02-13-00:00:00", 10));
+    SalesforceSource source = new SalesforceSource();
+
+    long expectedHighWatermark = 20170407152123L;
+    long lowWatermark = 20140213000000L;
+    int maxPartitions = 2;
+    String expectedPartitions = "20140213000000,20170407152123";
+    String actualPartitions = source.generateSpecifiedPartitions(histogram, 1, maxPartitions, lowWatermark, expectedHighWatermark);
+    Assert.assertEquals(actualPartitions, expectedPartitions);
+  }
+
+  @Test
   void testGenerateSpecifiedPartition() {
     SalesforceSource.Histogram histogram = new SalesforceSource.Histogram();
     for (String group: HISTOGRAM.split(", ")) {
