Repository: incubator-gobblin Updated Branches: refs/heads/master cba369929 -> 1f6e46f87
[GOBBLIN-615] Generate a valid interval when LWM==HWM in QueryBasedSource Closes #2482 from yukuai518/interval Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1f6e46f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1f6e46f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1f6e46f8 Branch: refs/heads/master Commit: 1f6e46f876ac1316cc35de3fa9984ca437244111 Parents: cba3699 Author: Kuai Yu <[email protected]> Authored: Mon Oct 22 10:53:38 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Oct 22 10:53:38 2018 -0700 ---------------------------------------------------------------------- .../extractor/extract/QueryBasedExtractor.java | 37 ++++++++++---------- .../source/extractor/partition/Partitioner.java | 21 +++++++++++ .../extractor/watermark/DateWatermark.java | 14 ++++++-- .../extractor/watermark/HourWatermark.java | 14 ++++++-- .../extractor/watermark/SimpleWatermark.java | 14 +++++--- .../extractor/watermark/TimestampWatermark.java | 14 ++++++-- .../extractor/watermark/DateWatermarkTest.java | 3 +- .../extractor/watermark/HourWatermarkTest.java | 3 +- .../watermark/SimpleWatermarkTest.java | 7 ++++ .../watermark/TimestampWatermarkTest.java | 7 ++++ 10 files changed, 100 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedExtractor.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedExtractor.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedExtractor.java index 5345e35..0324ee4 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedExtractor.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedExtractor.java @@ -367,11 +367,11 @@ public abstract class QueryBasedExtractor<S, D> implements Extractor<S, D>, Prot /** * if snapshot extract, get latest watermark else return work unit high watermark * - * @param watermark column - * @param low watermark value - * @param high watermark value - * @param column format - * @return letst watermark + * @param watermarkColumn watermark column + * @param lwmValue low watermark value + * @param hwmValue high watermark value + * @param watermarkType watermark type + * @return latest watermark * @throws IOException */ private long getLatestWatermark(String watermarkColumn, WatermarkType watermarkType, long lwmValue, long hwmValue) @@ -429,8 +429,7 @@ public abstract class QueryBasedExtractor<S, D> implements Extractor<S, D>, Prot /** * add predicate to the predicate list - * @param Predicate(watermark column,type,format and condition) - * @return watermark list + * @param predicate watermark predicate(watermark column,type,format and condition) */ private void addPredicates(Predicate predicate) { if (predicate != null) { @@ -439,8 +438,8 @@ public abstract class QueryBasedExtractor<S, D> implements Extractor<S, D>, Prot } /** - * @param given list of watermark columns - * @param column name to search for + * @param watermarkColumn list of watermark columns + * @param columnName name to search for * @return true, if column name is part of water mark columns. otherwise, return false */ protected boolean isWatermarkColumn(String watermarkColumn, String columnName) { @@ -458,7 +457,7 @@ public abstract class QueryBasedExtractor<S, D> implements Extractor<S, D>, Prot } /** - * @param given list of watermark columns + * @param watermarkColumn list of watermark columns * @return true, if there are multiple water mark columns. otherwise, return false */ protected boolean hasMultipleWatermarkColumns(String watermarkColumn) { @@ -470,8 +469,8 @@ public abstract class QueryBasedExtractor<S, D> implements Extractor<S, D>, Prot } /** - * @param given list of primary key columns - * @param column name to search for + * @param primarykeyColumn list of primary key columns + * @param columnName name to search for * @return index of the column if it exist in given list of primary key columns. otherwise, return 0 */ protected int getPrimarykeyIndex(String primarykeyColumn, String columnName) { @@ -487,8 +486,8 @@ public abstract class QueryBasedExtractor<S, D> implements Extractor<S, D>, Prot } /** - * @param column name to search for - * @param list of metadata columns + * @param columnName name to search for + * @param columnList list of metadata columns * @return true if column is part of metadata columns. otherwise, return false. */ protected boolean isMetadataColumn(String columnName, List<String> columnList) { @@ -508,10 +507,10 @@ public abstract class QueryBasedExtractor<S, D> implements Extractor<S, D>, Prot } /** - * @param column name - * @param data type - * @param data type of elements - * @param elements + * @param columnName column name + * @param type data type + * @param elementType type of elements + * @param enumSymbols emum symbols * @return converted data type */ protected JsonObject convertDataType(String columnName, String type, String elementType, List<String> enumSymbols) { @@ -534,7 +533,7 @@ public abstract class QueryBasedExtractor<S, D> implements Extractor<S, D>, Prot } /** - * @param predicate list + * @param predicateList predicate list * @return true, if there are any predicates. otherwise, return false. */ protected boolean isPredicateExists(List<Predicate> predicateList) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java index 052d6b4..6180a5f 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/partition/Partitioner.java @@ -54,6 +54,7 @@ public class Partitioner { public static final String HAS_USER_SPECIFIED_PARTITIONS = "partitioner.hasUserSpecifiedPartitions"; public static final String USER_SPECIFIED_PARTITIONS = "partitioner.userSpecifiedPartitions"; public static final String IS_EARLY_STOPPED = "partitioner.isEarlyStopped"; + public static final String ALLOW_EQUAL_WATERMARK_BOUNDARY = "partitioner.allowEqualWatermarkBoundary"; public static final Comparator<Partition> ascendingComparator = new Comparator<Partition>() { @Override @@ -176,6 +177,26 @@ public class Partitioner { */ HashMap<Long, Long> partitionMap = getPartitions(previousWatermark); + if (partitionMap.size() == 0) { + return partitions; + } + + if (partitionMap.size() == 1) { + Map.Entry<Long, Long> entry = partitionMap.entrySet().iterator().next(); + Long lwm = entry.getKey(); + Long hwm = entry.getValue(); + + if (lwm == hwm) { + if (lwm != -1) { // we always allow [-1, -1] interval due to some test cases relies on this logic. + boolean allowEqualBoundary = state.getPropAsBoolean(ALLOW_EQUAL_WATERMARK_BOUNDARY, false); + LOG.info("Single partition with LWM = HWM and allowEqualBoundary=" + allowEqualBoundary); + if (!allowEqualBoundary) { + return partitions; + } + } + } + } + /* * Can't use highWatermark directly, as the partitionMap may have different precision. For example, highWatermark * may be specified to seconds, but partitionMap could be specified to hour or date. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/DateWatermark.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/DateWatermark.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/DateWatermark.java index 064087a..d3c1e97 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/DateWatermark.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/DateWatermark.java @@ -92,6 +92,14 @@ public class DateWatermark implements Watermark { LOG.debug("Start time:" + startTime + "; End time:" + endTime); long lwm; long hwm; + + if (startTime.getTime() == endTime.getTime()) { + lwm = Long.parseLong(inputFormatParser.format(startTime)); + hwm = lwm; + intervalMap.put(lwm, hwm); + return intervalMap; + } + while (startTime.getTime() < endTime.getTime()) { lwm = Long.parseLong(inputFormatParser.format(startTime)); calendar.setTime(startTime); @@ -108,9 +116,9 @@ public class DateWatermark implements Watermark { /** * recalculate interval(in hours) if total number of partitions greater than maximum number of allowed partitions * - * @param difference in range - * @param hour interval (ex: 4 hours) - * @param Maximum number of allowed partitions + * @param diffInMilliSecs difference in range + * @param hourInterval hour interval (ex: 24 hours) + * @param maxIntervals max number of allowed partitions * @return calculated interval in days */ private static int getInterval(long diffInMilliSecs, long hourInterval, int maxIntervals) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/HourWatermark.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/HourWatermark.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/HourWatermark.java index 0b3e1af..af6d507 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/HourWatermark.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/HourWatermark.java @@ -91,6 +91,14 @@ public class HourWatermark implements Watermark { LOG.debug("Start time:" + startTime + "; End time:" + endTime); long lwm; long hwm; + + if (startTime.getTime() == endTime.getTime()) { + lwm = Long.parseLong(inputFormatParser.format(startTime)); + hwm = lwm; + intervalMap.put(lwm, hwm); + return intervalMap; + } + while (startTime.getTime() < endTime.getTime()) { lwm = Long.parseLong(inputFormatParser.format(startTime)); calendar.setTime(startTime); @@ -107,9 +115,9 @@ public class HourWatermark implements Watermark { /** * recalculate interval(in hours) if total number of partitions greater than maximum number of allowed partitions * - * @param difference in range - * @param hour interval (ex: 4 hours) - * @param Maximum number of allowed partitions + * @param diffInMilliSecs difference in range + * @param hourInterval hour interval (ex: 4 hours) + * @param maxIntervals max number of allowed partitions * @return calculated interval in hours */ private static int getInterval(long diffInMilliSecs, long hourInterval, int maxIntervals) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermark.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermark.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermark.java index 86e6a4e..5a13db3 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermark.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermark.java @@ -67,6 +67,12 @@ public class SimpleWatermark implements Watermark { long startNum = lowWatermarkValue; long endNum = highWatermarkValue; boolean longOverflow = false; + + if (startNum == endNum) { + intervalMap.put(startNum, endNum); + return intervalMap; + } + while (startNum < endNum && !longOverflow) { longOverflow = (Long.MAX_VALUE - interval < startNum); nextNum = longOverflow ? endNum : Math.min(startNum + interval, endNum); @@ -79,10 +85,10 @@ public class SimpleWatermark implements Watermark { /** * recalculate interval if total number of partitions greater than maximum number of allowed partitions * - * @param low watermark value - * @param high watermark value - * @param partition interval - * @param Maximum number of allowed partitions + * @param lowWatermarkValue low watermark value + * @param highWatermarkValue high watermark value + * @param partitionInterval partition interval + * @param maxIntervals max number of allowed partitions * @return calculated interval */ private static long getInterval(long lowWatermarkValue, long highWatermarkValue, long partitionInterval, http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermark.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermark.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermark.java index c325d9a..4d8fa08 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermark.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermark.java @@ -92,6 +92,14 @@ public class TimestampWatermark implements Watermark { LOG.debug("Sart time:" + startTime + "; End time:" + endTime); long lwm; long hwm; + + if (startTime.getTime() == endTime.getTime()) { + lwm = Long.parseLong(inputFormatParser.format(startTime)); + hwm = lwm; + intervalMap.put(lwm, hwm); + return intervalMap; + } + while (startTime.getTime() < endTime.getTime()) { lwm = Long.parseLong(inputFormatParser.format(startTime)); calendar.setTime(startTime); @@ -108,9 +116,9 @@ public class TimestampWatermark implements Watermark { /** * recalculate interval(in hours) if total number of partitions greater than maximum number of allowed partitions * - * @param difference in range - * @param hour interval (ex: 4 hours) - * @param Maximum number of allowed partitions + * @param diffInMilliSecs difference in range + * @param hourInterval hour interval (ex: 4 hours) + * @param maxIntervals max number of allowed partitions * @return calculated interval in hours */ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/DateWatermarkTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/DateWatermarkTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/DateWatermarkTest.java index 2466fdc..0430ae4 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/DateWatermarkTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/DateWatermarkTest.java @@ -22,6 +22,7 @@ import java.util.Map; import org.testng.Assert; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -58,7 +59,7 @@ public class DateWatermarkTest { int partition = 30; int maxInterval = 4; Map<Long, Long> results = datewm.getIntervals(lwm, hwm, partition, maxInterval); - Map<Long, Long> expected = Maps.newHashMap(); + Map<Long, Long> expected = ImmutableMap.of(lwm, hwm); Assert.assertEquals(results, expected); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/HourWatermarkTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/HourWatermarkTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/HourWatermarkTest.java index a631cfd..ba650b8 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/HourWatermarkTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/HourWatermarkTest.java @@ -22,6 +22,7 @@ import java.util.Map; import org.testng.Assert; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -58,7 +59,7 @@ public class HourWatermarkTest { int partition = 30; int maxInterval = 4; Map<Long, Long> results = datewm.getIntervals(lwm, hwm, partition, maxInterval); - Map<Long, Long> expected = Maps.newHashMap(); + Map<Long, Long> expected = ImmutableMap.of(lwm, hwm); Assert.assertEquals(results, expected); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermarkTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermarkTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermarkTest.java index d4d8754..13b8fb9 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermarkTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/SimpleWatermarkTest.java @@ -24,6 +24,8 @@ import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableMap; + import org.apache.gobblin.source.extractor.extract.QueryBasedExtractor; @@ -148,6 +150,11 @@ public class SimpleWatermarkTest { Map<Long, Long> intervals = new HashMap<Long, Long>(); if (lowWatermarkValue > highWatermarkValue || partitionInterval <= 0) return intervals; + + if (lowWatermarkValue == highWatermarkValue) { + return ImmutableMap.of(lowWatermarkValue, highWatermarkValue); + } + boolean overflow = false; for (Long i = lowWatermarkValue; i < highWatermarkValue && !overflow;) { overflow = (Long.MAX_VALUE - partitionInterval < i); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1f6e46f8/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermarkTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermarkTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermarkTest.java index bc94a4b..d0959a4 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermarkTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/watermark/TimestampWatermarkTest.java @@ -28,6 +28,8 @@ import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableMap; + import org.apache.gobblin.configuration.WorkUnitState; @@ -107,6 +109,11 @@ public class TimestampWatermarkTest { Map<Long, Long> intervals = new HashMap<Long, Long>(); if (lowWatermarkValue > highWatermarkValue || partitionInterval <= 0) return intervals; + + if (lowWatermarkValue == highWatermarkValue) { + return ImmutableMap.of(lowWatermarkValue, highWatermarkValue); + } + final SimpleDateFormat inputFormat = new SimpleDateFormat(this.watermarkFormat); Date startTime = inputFormat.parse(String.valueOf(lowWatermarkValue)); Date endTime = inputFormat.parse(String.valueOf(highWatermarkValue));
