Repository: incubator-gobblin Updated Branches: refs/heads/master 6b1201852 -> d98f77c14
[GOBBLIN-513] Add support for queryAll when using the Salesforce bulk API Closes #2384 from htran1/salesforce_bulk_queryall Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d98f77c1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d98f77c1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d98f77c1 Branch: refs/heads/master Commit: d98f77c14a7304d2bd0d68b8a2ca181eb7893cf7 Parents: 6b12018 Author: Hung Tran <[email protected]> Authored: Mon Jun 11 14:29:33 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Jun 11 14:29:33 2018 -0700 ---------------------------------------------------------------------- .../gobblin/salesforce/SalesforceExtractor.java | 18 +++++++++++++++--- gradle/scripts/defaultBuildProperties.gradle | 2 +- 2 files changed, 16 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d98f77c1/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ---------------------------------------------------------------------- diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java index 4442214..7064062 100644 --- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java +++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java @@ -111,6 +111,11 @@ public class SalesforceExtractor extends RestApiExtractor { private static final int PK_CHUNKING_MAX_PARTITIONS_LIMIT = 3; private static final String FETCH_RETRY_LIMIT_KEY = "salesforce.fetchRetryLimit"; private static final int DEFAULT_FETCH_RETRY_LIMIT = 5; + private static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll"; + private static final boolean DEFAULT_BULK_API_USE_QUERY_ALL = false; + private static final String PK_CHUNKING_SKIP_COUNT_CHECK = "salesforce.pkChunkingSkipCountCheck"; + private static final boolean DEFAULT_PK_CHUNKING_SKIP_COUNT_CHECK = false; + private boolean pullStatus = true; private String nextUrl; @@ -135,6 +140,9 @@ public class SalesforceExtractor extends RestApiExtractor { private final int fetchRetryLimit; private final int batchSize; + private final boolean pkChunkingSkipCountCheck; + private final boolean bulkApiUseQueryAll; + public SalesforceExtractor(WorkUnitState state) { super(state); this.sfConnector = (SalesforceConnector) this.connector; @@ -156,6 +164,9 @@ public class SalesforceExtractor extends RestApiExtractor { Math.max(MIN_PK_CHUNKING_SIZE, Math.min(MAX_PK_CHUNKING_SIZE, state.getPropAsInt(PK_CHUNKING_SIZE_KEY, DEFAULT_PK_CHUNKING_SIZE))); + this.pkChunkingSkipCountCheck = state.getPropAsBoolean(PK_CHUNKING_SKIP_COUNT_CHECK, DEFAULT_PK_CHUNKING_SKIP_COUNT_CHECK); + this.bulkApiUseQueryAll = state.getPropAsBoolean(BULK_API_USE_QUERY_ALL, DEFAULT_BULK_API_USE_QUERY_ALL); + // Get batch size from .pull file int tmpBatchSize = state.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE, ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE); @@ -639,7 +650,8 @@ public class SalesforceExtractor extends RestApiExtractor { String hostName = this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME); String apiVersion = this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_VERSION); if (Strings.isNullOrEmpty(apiVersion)) { - apiVersion = "29.0"; + // queryAll was introduced in version 39.0, so need to use a higher version when using queryAll with the bulk api + apiVersion = this.bulkApiUseQueryAll ? "42.0" : "29.0"; } String soapAuthEndPoint = hostName + SALESFORCE_SOAP_SERVICE + "/" + apiVersion; @@ -716,11 +728,11 @@ public class SalesforceExtractor extends RestApiExtractor { // Set bulk job attributes this.bulkJob.setObject(entity); - this.bulkJob.setOperation(OperationEnum.query); + this.bulkJob.setOperation(this.bulkApiUseQueryAll ? OperationEnum.queryAll : OperationEnum.query); this.bulkJob.setConcurrencyMode(ConcurrencyMode.Parallel); // use pk chunking if pk chunking is configured and the expected record count is larger than the pk chunking size - if (this.pkChunking && getExpectedRecordCount() > this.pkChunkingSize) { + if (this.pkChunking && (this.pkChunkingSkipCountCheck || getExpectedRecordCount() > this.pkChunkingSize)) { log.info("Enabling pk chunking with size {}", this.pkChunkingSize); this.bulkConnection.addHeader("Sforce-Enable-PKChunking", "chunkSize=" + this.pkChunkingSize); usingPkChunking = true; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d98f77c1/gradle/scripts/defaultBuildProperties.gradle ---------------------------------------------------------------------- diff --git a/gradle/scripts/defaultBuildProperties.gradle b/gradle/scripts/defaultBuildProperties.gradle index ce95473..8509cc6 100644 --- a/gradle/scripts/defaultBuildProperties.gradle +++ b/gradle/scripts/defaultBuildProperties.gradle @@ -38,7 +38,7 @@ def BuildProperties BUILD_PROPERTIES = new BuildProperties(project) .register(new BuildProperty("pegasusVersion", "11.0.0", "Pegasus dependencies version")) .register(new BuildProperty("publishToMaven", false, "Enable publishing of artifacts to a central Maven repository")) .register(new BuildProperty("publishToNexus", false, "Enable publishing of artifacts to Nexus")) - .register(new BuildProperty("salesforceVersion", "37.0.3", "Salesforce dependencies version")) + .register(new BuildProperty("salesforceVersion", "42.0.0", "Salesforce dependencies version")) task buildProperties(description: 'Lists main properties that can be used to customize the build') << { BUILD_PROPERTIES.printHelp();
