Repository: incubator-gobblin
Updated Branches:
  refs/heads/master fdb233d61 -> b89706257
[GOBBLIN-254] Add config key to update watermark when a partition is empty

Closes #2105 from jack-moseley/salesforce_retry

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/b8970625
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/b8970625
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/b8970625

Branch: refs/heads/master
Commit: b8970625772ac64e3e3394ac79dd869f3c720adb
Parents: fdb233d
Author: Jack Moseley <[email protected]>
Authored: Thu Sep 14 17:00:17 2017 -0700
Committer: Hung Tran <[email protected]>
Committed: Thu Sep 14 17:00:17 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/gobblin/configuration/ConfigurationKeys.java | 4 ++++
 .../gobblin/source/extractor/extract/QueryBasedSource.java | 4 +++-
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b8970625/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 224064d..e54a54b 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -483,6 +483,10 @@ public class ConfigurationKeys {
       "source.querybased.promoteUnsignedIntToBigInt";
   public static final boolean DEFAULT_SOURCE_QUERYBASED_PROMOTE_UNSIGNED_INT_TO_BIGINT = false;
 
+  public static final String SOURCE_QUERYBASED_RESET_EMPTY_PARTITION_WATERMARK =
+      "source.querybased.resetEmptyPartitionWatermark";
+  public static final boolean DEFAULT_SOURCE_QUERYBASED_RESET_EMPTY_PARTITION_WATERMARK = true;
+
   public static final String ENABLE_DELIMITED_IDENTIFIER = "enable.delimited.identifier";
   public static final boolean DEFAULT_ENABLE_DELIMITED_IDENTIFIER = false;
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/b8970625/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
index d94dede..c77051d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java
@@ -437,7 +437,9 @@ public abstract class QueryBasedSource<S, D> extends AbstractSource<S, D> {
       if (tablesWithFailedTasks.contains(entry.getKey())) {
         log.info("Resetting low watermark to {} because previous run failed.", entry.getValue());
         result.put(entry.getKey(), entry.getValue());
-      } else if (tablesWithNoUpdatesOnPreviousRun.contains(entry.getKey())) {
+      } else if (tablesWithNoUpdatesOnPreviousRun.contains(entry.getKey())
+          && state.getPropAsBoolean(ConfigurationKeys.SOURCE_QUERYBASED_RESET_EMPTY_PARTITION_WATERMARK,
+          ConfigurationKeys.DEFAULT_SOURCE_QUERYBASED_RESET_EMPTY_PARTITION_WATERMARK)) {
         log.info("Resetting low watermakr to {} because previous run processed no data.", entry.getValue());
         result.put(entry.getKey(), entry.getValue());
       } else {
