Repository: incubator-gobblin
Updated Branches:
  refs/heads/master cba369929 -> 1f6e46f87


[GOBBLIN-615] Generate a valid interval when LWM==HWM in QueryBasedSource

Closes #2482 from yukuai518/interval


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1f6e46f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1f6e46f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1f6e46f8

Branch: refs/heads/master
Commit: 1f6e46f876ac1316cc35de3fa9984ca437244111
Parents: cba3699
Author: Kuai Yu <[email protected]>
Authored: Mon Oct 22 10:53:38 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Mon Oct 22 10:53:38 2018 -0700

----------------------------------------------------------------------
 .../extractor/extract/QueryBasedExtractor.java  | 37 ++++++++++----------
 .../source/extractor/partition/Partitioner.java | 21 +++++++++++
 .../extractor/watermark/DateWatermark.java      | 14 ++++++--
 .../extractor/watermark/HourWatermark.java      | 14 ++++++--
 .../extractor/watermark/SimpleWatermark.java    | 14 +++++---
 .../extractor/watermark/TimestampWatermark.java | 14 ++++++--
 .../extractor/watermark/DateWatermarkTest.java  |  3 +-
 .../extractor/watermark/HourWatermarkTest.java  |  3 +-
 .../watermark/SimpleWatermarkTest.java          |  7 ++++
 .../watermark/TimestampWatermarkTest.java       |  7 ++++
 10 files changed, 100 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedExtractor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedExtractor.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedExtractor.java
index 5345e35..0324ee4 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedExtractor.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedExtractor.java
@@ -367,11 +367,11 @@ public abstract class QueryBasedExtractor<S, D> 
implements Extractor<S, D>, Prot
   /**
    * if snapshot extract, get latest watermark else return work unit high 
watermark
    *
-   * @param watermark column
-   * @param low watermark value
-   * @param high watermark value
-   * @param column format
-   * @return letst watermark
+   * @param watermarkColumn watermark column
+   * @param lwmValue low watermark value
+   * @param hwmValue high watermark value
+   * @param watermarkType watermark type
+   * @return latest watermark
    * @throws IOException
    */
   private long getLatestWatermark(String watermarkColumn, WatermarkType 
watermarkType, long lwmValue, long hwmValue)
@@ -429,8 +429,7 @@ public abstract class QueryBasedExtractor<S, D> implements 
Extractor<S, D>, Prot
 
   /**
    * add predicate to the predicate list
-   * @param Predicate(watermark column,type,format and condition)
-   * @return watermark list
+   * @param predicate watermark predicate (watermark column, type, format and 
condition)
    */
   private void addPredicates(Predicate predicate) {
     if (predicate != null) {
@@ -439,8 +438,8 @@ public abstract class QueryBasedExtractor<S, D> implements 
Extractor<S, D>, Prot
   }
 
   /**
-   * @param given list of watermark columns
-   * @param column name to search for
+   * @param watermarkColumn list of watermark columns
+   * @param columnName name to search for
    * @return true, if column name is part of water mark columns. otherwise, 
return false
    */
   protected boolean isWatermarkColumn(String watermarkColumn, String 
columnName) {
@@ -458,7 +457,7 @@ public abstract class QueryBasedExtractor<S, D> implements 
Extractor<S, D>, Prot
   }
 
   /**
-   * @param given list of watermark columns
+   * @param watermarkColumn list of watermark columns
    * @return true, if there are multiple water mark columns. otherwise, return 
false
    */
   protected boolean hasMultipleWatermarkColumns(String watermarkColumn) {
@@ -470,8 +469,8 @@ public abstract class QueryBasedExtractor<S, D> implements 
Extractor<S, D>, Prot
   }
 
   /**
-   * @param given list of primary key columns
-   * @param column name to search for
+   * @param primarykeyColumn list of primary key columns
+   * @param columnName name to search for
    * @return index of the column if it exist in given list of primary key 
columns. otherwise, return 0
    */
   protected int getPrimarykeyIndex(String primarykeyColumn, String columnName) 
{
@@ -487,8 +486,8 @@ public abstract class QueryBasedExtractor<S, D> implements 
Extractor<S, D>, Prot
   }
 
   /**
-   * @param column name to search for
-   * @param list of metadata columns
+   * @param columnName name to search for
+   * @param columnList list of metadata columns
    * @return true if column is part of metadata columns. otherwise, return 
false.
    */
   protected boolean isMetadataColumn(String columnName, List<String> 
columnList) {
@@ -508,10 +507,10 @@ public abstract class QueryBasedExtractor<S, D> 
implements Extractor<S, D>, Prot
   }
 
   /**
-   * @param column name
-   * @param data type
-   * @param data type of elements
-   * @param elements
+   * @param columnName column name
+   * @param type data type
+   * @param elementType type of elements
+   * @param enumSymbols enum symbols
    * @return converted data type
    */
   protected JsonObject convertDataType(String columnName, String type, String 
elementType, List<String> enumSymbols) {
@@ -534,7 +533,7 @@ public abstract class QueryBasedExtractor<S, D> implements 
Extractor<S, D>, Prot
   }
 
   /**
-   * @param predicate list
+   * @param predicateList predicate list
    * @return true, if there are any predicates. otherwise, return false.
    */
   protected boolean isPredicateExists(List<Predicate> predicateList) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java
index 052d6b4..6180a5f 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java
@@ -54,6 +54,7 @@ public class Partitioner {
   public static final String HAS_USER_SPECIFIED_PARTITIONS = 
"partitioner.hasUserSpecifiedPartitions";
   public static final String USER_SPECIFIED_PARTITIONS = 
"partitioner.userSpecifiedPartitions";
   public static final String IS_EARLY_STOPPED = "partitioner.isEarlyStopped";
+  public static final String ALLOW_EQUAL_WATERMARK_BOUNDARY = 
"partitioner.allowEqualWatermarkBoundary";
 
   public static final Comparator<Partition> ascendingComparator = new 
Comparator<Partition>() {
     @Override
@@ -176,6 +177,26 @@ public class Partitioner {
      */
     HashMap<Long, Long> partitionMap = getPartitions(previousWatermark);
 
+    if (partitionMap.size() == 0) {
+      return partitions;
+    }
+
+    if (partitionMap.size() == 1) {
+      Map.Entry<Long, Long> entry = partitionMap.entrySet().iterator().next();
+      Long lwm = entry.getKey();
+      Long hwm = entry.getValue();
+
+      if (lwm == hwm) {
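+        if (lwm != -1) { // we always allow the [-1, -1] interval because some test 
cases rely on this logic. NOTE(review): lwm/hwm are boxed Longs, so "lwm == hwm" above compares references, not values (only small cached values like -1 compare equal); use lwm.equals(hwm).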
+          boolean allowEqualBoundary = 
state.getPropAsBoolean(ALLOW_EQUAL_WATERMARK_BOUNDARY, false);
+          LOG.info("Single partition with LWM = HWM and allowEqualBoundary=" + 
allowEqualBoundary);
+          if (!allowEqualBoundary) {
+            return partitions;
+          }
+        }
+      }
+    }
+
     /*
      * Can't use highWatermark directly, as the partitionMap may have 
different precision. For example, highWatermark
      * may be specified to seconds, but partitionMap could be specified to 
hour or date.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/DateWatermark.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/DateWatermark.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/DateWatermark.java
index 064087a..d3c1e97 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/DateWatermark.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/DateWatermark.java
@@ -92,6 +92,14 @@ public class DateWatermark implements Watermark {
     LOG.debug("Start time:" + startTime + "; End time:" + endTime);
     long lwm;
     long hwm;
+
+    if (startTime.getTime() == endTime.getTime()) {
+      lwm = Long.parseLong(inputFormatParser.format(startTime));
+      hwm = lwm;
+      intervalMap.put(lwm, hwm);
+      return intervalMap;
+    }
+
     while (startTime.getTime() < endTime.getTime()) {
       lwm = Long.parseLong(inputFormatParser.format(startTime));
       calendar.setTime(startTime);
@@ -108,9 +116,9 @@ public class DateWatermark implements Watermark {
   /**
    * recalculate interval(in hours) if total number of partitions greater than 
maximum number of allowed partitions
    *
-   * @param difference in range
-   * @param hour interval (ex: 4 hours)
-   * @param Maximum number of allowed partitions
+   * @param diffInMilliSecs difference in range
+   * @param hourInterval hour interval (ex: 24 hours)
+   * @param maxIntervals max number of allowed partitions
    * @return calculated interval in days
    */
   private static int getInterval(long diffInMilliSecs, long hourInterval, int 
maxIntervals) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/HourWatermark.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/HourWatermark.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/HourWatermark.java
index 0b3e1af..af6d507 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/HourWatermark.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/HourWatermark.java
@@ -91,6 +91,14 @@ public class HourWatermark implements Watermark {
     LOG.debug("Start time:" + startTime + "; End time:" + endTime);
     long lwm;
     long hwm;
+
+    if (startTime.getTime() == endTime.getTime()) {
+      lwm = Long.parseLong(inputFormatParser.format(startTime));
+      hwm = lwm;
+      intervalMap.put(lwm, hwm);
+      return intervalMap;
+    }
+
     while (startTime.getTime() < endTime.getTime()) {
       lwm = Long.parseLong(inputFormatParser.format(startTime));
       calendar.setTime(startTime);
@@ -107,9 +115,9 @@ public class HourWatermark implements Watermark {
   /**
    * recalculate interval(in hours) if total number of partitions greater than 
maximum number of allowed partitions
    *
-   * @param difference in range
-   * @param hour interval (ex: 4 hours)
-   * @param Maximum number of allowed partitions
+   * @param diffInMilliSecs difference in range
+   * @param hourInterval hour interval (ex: 4 hours)
+   * @param maxIntervals max number of allowed partitions
    * @return calculated interval in hours
    */
   private static int getInterval(long diffInMilliSecs, long hourInterval, int 
maxIntervals) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermark.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermark.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermark.java
index 86e6a4e..5a13db3 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermark.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermark.java
@@ -67,6 +67,12 @@ public class SimpleWatermark implements Watermark {
     long startNum = lowWatermarkValue;
     long endNum = highWatermarkValue;
     boolean longOverflow = false;
+
+    if (startNum == endNum) {
+      intervalMap.put(startNum, endNum);
+      return intervalMap;
+    }
+
     while (startNum < endNum && !longOverflow) {
       longOverflow = (Long.MAX_VALUE - interval < startNum);
       nextNum = longOverflow ? endNum : Math.min(startNum + interval, endNum);
@@ -79,10 +85,10 @@ public class SimpleWatermark implements Watermark {
   /**
    * recalculate interval if total number of partitions greater than maximum 
number of allowed partitions
    *
-   * @param low watermark value
-   * @param high watermark value
-   * @param partition interval
-   * @param Maximum number of allowed partitions
+   * @param lowWatermarkValue low watermark value
+   * @param highWatermarkValue high watermark value
+   * @param partitionInterval partition interval
+   * @param maxIntervals max number of allowed partitions
    * @return calculated interval
    */
   private static long getInterval(long lowWatermarkValue, long 
highWatermarkValue, long partitionInterval,

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermark.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermark.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermark.java
index c325d9a..4d8fa08 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermark.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermark.java
@@ -92,6 +92,14 @@ public class TimestampWatermark implements Watermark {
     LOG.debug("Sart time:" + startTime + "; End time:" + endTime);
     long lwm;
     long hwm;
+
+    if (startTime.getTime() == endTime.getTime()) {
+      lwm = Long.parseLong(inputFormatParser.format(startTime));
+      hwm = lwm;
+      intervalMap.put(lwm, hwm);
+      return intervalMap;
+    }
+
     while (startTime.getTime() < endTime.getTime()) {
       lwm = Long.parseLong(inputFormatParser.format(startTime));
       calendar.setTime(startTime);
@@ -108,9 +116,9 @@ public class TimestampWatermark implements Watermark {
   /**
    * recalculate interval(in hours) if total number of partitions greater than 
maximum number of allowed partitions
    *
-   * @param difference in range
-   * @param hour interval (ex: 4 hours)
-   * @param Maximum number of allowed partitions
+   * @param diffInMilliSecs difference in range
+   * @param hourInterval hour interval (ex: 4 hours)
+   * @param maxIntervals max number of allowed partitions
    * @return calculated interval in hours
    */
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/DateWatermarkTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/DateWatermarkTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/DateWatermarkTest.java
index 2466fdc..0430ae4 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/DateWatermarkTest.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/DateWatermarkTest.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
 
@@ -58,7 +59,7 @@ public class DateWatermarkTest {
     int partition = 30;
     int maxInterval = 4;
     Map<Long, Long> results = datewm.getIntervals(lwm, hwm, partition, 
maxInterval);
-    Map<Long, Long> expected = Maps.newHashMap();
+    Map<Long, Long> expected = ImmutableMap.of(lwm, hwm);
     Assert.assertEquals(results, expected);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/HourWatermarkTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/HourWatermarkTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/HourWatermarkTest.java
index a631cfd..ba650b8 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/HourWatermarkTest.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/HourWatermarkTest.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 
 
@@ -58,7 +59,7 @@ public class HourWatermarkTest {
     int partition = 30;
     int maxInterval = 4;
     Map<Long, Long> results = datewm.getIntervals(lwm, hwm, partition, 
maxInterval);
-    Map<Long, Long> expected = Maps.newHashMap();
+    Map<Long, Long> expected = ImmutableMap.of(lwm, hwm);
     Assert.assertEquals(results, expected);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermarkTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermarkTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermarkTest.java
index d4d8754..13b8fb9 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermarkTest.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermarkTest.java
@@ -24,6 +24,8 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableMap;
+
 import org.apache.gobblin.source.extractor.extract.QueryBasedExtractor;
 
 
@@ -148,6 +150,11 @@ public class SimpleWatermarkTest {
     Map<Long, Long> intervals = new HashMap<Long, Long>();
     if (lowWatermarkValue > highWatermarkValue || partitionInterval <= 0)
       return intervals;
+
+    if (lowWatermarkValue == highWatermarkValue) {
+      return ImmutableMap.of(lowWatermarkValue, highWatermarkValue);
+    }
+
     boolean overflow = false;
     for (Long i = lowWatermarkValue; i < highWatermarkValue && !overflow;) {
       overflow = (Long.MAX_VALUE - partitionInterval < i);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermarkTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermarkTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermarkTest.java
index bc94a4b..d0959a4 100644
--- 
a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermarkTest.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermarkTest.java
@@ -28,6 +28,8 @@ import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableMap;
+
 import org.apache.gobblin.configuration.WorkUnitState;
 
 
@@ -107,6 +109,11 @@ public class TimestampWatermarkTest {
     Map<Long, Long> intervals = new HashMap<Long, Long>();
     if (lowWatermarkValue > highWatermarkValue || partitionInterval <= 0)
       return intervals;
+
+    if (lowWatermarkValue == highWatermarkValue) {
+      return ImmutableMap.of(lowWatermarkValue, highWatermarkValue);
+    }
+
     final SimpleDateFormat inputFormat = new 
SimpleDateFormat(this.watermarkFormat);
     Date startTime = inputFormat.parse(String.valueOf(lowWatermarkValue));
     Date endTime = inputFormat.parse(String.valueOf(highWatermarkValue));

Reply via email to