This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e105313 [GOBBLIN-1186] explicitly set
source.querybased.salesforce.is.soft.deletes.pull.disabled for simple mode[]
e105313 is described below
commit e1053134b1eb9eb6c71e9be5b74599d8db118771
Author: Alex Li <[email protected]>
AuthorDate: Tue Jun 9 16:36:04 2020 -0700
[GOBBLIN-1186] explicitly set
source.querybased.salesforce.is.soft.deletes.pull.disabled for simple mode[]
Closes #3034 from arekusuri/GOBBLIN-1186
---
.../main/java/org/apache/gobblin/salesforce/SalesforceSource.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
diff --git
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index 637df52..f8fc327 100644
---
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -262,6 +262,8 @@ public class SalesforceSource extends
QueryBasedSource<JsonArray, JsonElement> {
*
*/
private List<WorkUnit> generateWorkUnitsStrategy(SourceEntity sourceEntity,
SourceState state, long previousWatermark) {
+ Boolean disableSoft =
state.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED,
false);
+ log.info("disable soft delete pull: " + disableSoft);
WatermarkType watermarkType = WatermarkType.valueOf(
state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_WATERMARK_TYPE,
ConfigurationKeys.DEFAULT_WATERMARK_TYPE)
.toUpperCase());
@@ -274,7 +276,9 @@ public class SalesforceSource extends
QueryBasedSource<JsonArray, JsonElement> {
// Only support time related watermark
if (watermarkType == WatermarkType.SIMPLE ||
Strings.isNullOrEmpty(watermarkColumn) || !state.getPropAsBoolean(
ENABLE_DYNAMIC_PARTITIONING) || maxPartitions <= 1) {
- return super.generateWorkUnits(sourceEntity, state, previousWatermark);
+ List<WorkUnit> workUnits = super.generateWorkUnits(sourceEntity, state,
previousWatermark);
+ workUnits.stream().forEach(x ->
x.setProp(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED,
disableSoft));
+ return workUnits;
}
Partitioner partitioner = new Partitioner(state);
@@ -323,7 +327,6 @@ public class SalesforceSource extends
QueryBasedSource<JsonArray, JsonElement> {
state.setProp(Partitioner.IS_EARLY_STOPPED, isEarlyStopped);
List<WorkUnit> workUnits = super.generateWorkUnits(sourceEntity, state,
previousWatermark);
- Boolean disableSoft =
state.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED,
false);
workUnits.stream().forEach(x ->
x.setProp(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED,
disableSoft));
return workUnits;
}