Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ab034478c -> 6f1a3aff6


[GOBBLIN-353] Fix low watermark overridden by high watermark in SalesforceSource

Closes #2226 from zxcware/sales


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6f1a3aff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6f1a3aff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6f1a3aff

Branch: refs/heads/master
Commit: 6f1a3aff6149d8d09eb5b66f6d7b4402e0c04d3b
Parents: ab03447
Author: zhchen <[email protected]>
Authored: Tue Jan 2 13:12:21 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Tue Jan 2 13:12:21 2018 -0800

----------------------------------------------------------------------
 .../apache/gobblin/salesforce/SalesforceSource.java   | 11 ++++-------
 .../gobblin/salesforce/SalesforceSourceTest.java      | 14 ++++++++++++++
 2 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6f1a3aff/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
----------------------------------------------------------------------
diff --git 
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
 
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index b2de3da..50b3611 100644
--- 
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++ 
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -188,17 +188,14 @@ public class SalesforceSource extends 
QueryBasedSource<JsonArray, JsonElement> {
       throw new RuntimeException("Unexpected empty partition list");
     }
 
-    // If the last group is used as the last partition point
-    if (count == 0) {
-      // Exchange the last partition point with global high watermark
-      partitionPoints.set(partitionPoints.size() - 1, 
Long.toString(expectedHighWatermark));
-    } else {
+    if (count > 0) {
       // Summarize last group
       statistics.addValue(count);
-      // Add global high watermark as last point
-      partitionPoints.add(Long.toString(expectedHighWatermark));
     }
 
+    // Add global high watermark as last point
+    partitionPoints.add(Long.toString(expectedHighWatermark));
+
     log.info("Dynamic partitioning statistics: ");
     log.info("data: " + Arrays.toString(statistics.getValues()));
     log.info(statistics.toString());

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6f1a3aff/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
 
b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
index 8e45c6b..80b0bc3 100644
--- 
a/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
+++ 
b/gobblin-salesforce/src/test/java/org/apache/gobblin/salesforce/SalesforceSourceTest.java
@@ -22,6 +22,20 @@ import org.testng.annotations.Test;
 
 public class SalesforceSourceTest {
   @Test
+  void testGenerateSpecifiedPartitionFromSinglePointHistogram() {
+    SalesforceSource.Histogram histogram = new SalesforceSource.Histogram();
+    histogram.add(new SalesforceSource.HistogramGroup("2014-02-13-00:00:00", 
10));
+    SalesforceSource source = new SalesforceSource();
+
+    long expectedHighWatermark = 20170407152123L;
+    long lowWatermark = 20140213000000L;
+    int maxPartitions = 2;
+    String expectedPartitions = "20140213000000,20170407152123";
+    String actualPartitions = source.generateSpecifiedPartitions(histogram, 1, 
maxPartitions, lowWatermark, expectedHighWatermark);
+    Assert.assertEquals(actualPartitions, expectedPartitions);
+  }
+
+  @Test
   void testGenerateSpecifiedPartition() {
     SalesforceSource.Histogram histogram = new SalesforceSource.Histogram();
     for (String group: HISTOGRAM.split(", ")) {

Reply via email to