Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 6b1201852 -> d98f77c14


[GOBBLIN-513] Add support for queryAll when using the Salesforce bulk API

Closes #2384 from htran1/salesforce_bulk_queryall


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d98f77c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d98f77c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d98f77c1

Branch: refs/heads/master
Commit: d98f77c14a7304d2bd0d68b8a2ca181eb7893cf7
Parents: 6b12018
Author: Hung Tran <[email protected]>
Authored: Mon Jun 11 14:29:33 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Mon Jun 11 14:29:33 2018 -0700

----------------------------------------------------------------------
 .../gobblin/salesforce/SalesforceExtractor.java   | 18 +++++++++++++++---
 gradle/scripts/defaultBuildProperties.gradle      |  2 +-
 2 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d98f77c1/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 4442214..7064062 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -111,6 +111,11 @@ public class SalesforceExtractor extends RestApiExtractor {
   private static final int PK_CHUNKING_MAX_PARTITIONS_LIMIT = 3;
   private static final String FETCH_RETRY_LIMIT_KEY = 
"salesforce.fetchRetryLimit";
   private static final int DEFAULT_FETCH_RETRY_LIMIT = 5;
+  private static final String BULK_API_USE_QUERY_ALL = 
"salesforce.bulkApiUseQueryAll";
+  private static final boolean DEFAULT_BULK_API_USE_QUERY_ALL = false;
+  private static final String PK_CHUNKING_SKIP_COUNT_CHECK = 
"salesforce.pkChunkingSkipCountCheck";
+  private static final boolean DEFAULT_PK_CHUNKING_SKIP_COUNT_CHECK = false;
+
 
   private boolean pullStatus = true;
   private String nextUrl;
@@ -135,6 +140,9 @@ public class SalesforceExtractor extends RestApiExtractor {
   private final int fetchRetryLimit;
   private final int batchSize;
 
+  private final boolean pkChunkingSkipCountCheck;
+  private final boolean bulkApiUseQueryAll;
+
   public SalesforceExtractor(WorkUnitState state) {
     super(state);
     this.sfConnector = (SalesforceConnector) this.connector;
@@ -156,6 +164,9 @@ public class SalesforceExtractor extends RestApiExtractor {
         Math.max(MIN_PK_CHUNKING_SIZE,
             Math.min(MAX_PK_CHUNKING_SIZE, 
state.getPropAsInt(PK_CHUNKING_SIZE_KEY, DEFAULT_PK_CHUNKING_SIZE)));
 
+    this.pkChunkingSkipCountCheck = 
state.getPropAsBoolean(PK_CHUNKING_SKIP_COUNT_CHECK, 
DEFAULT_PK_CHUNKING_SKIP_COUNT_CHECK);
+    this.bulkApiUseQueryAll = state.getPropAsBoolean(BULK_API_USE_QUERY_ALL, 
DEFAULT_BULK_API_USE_QUERY_ALL);
+
     // Get batch size from .pull file
     int tmpBatchSize = 
state.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE,
         ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE);
@@ -639,7 +650,8 @@ public class SalesforceExtractor extends RestApiExtractor {
     String hostName = 
this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
     String apiVersion = 
this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_VERSION);
     if (Strings.isNullOrEmpty(apiVersion)) {
-      apiVersion = "29.0";
+      // queryAll was introduced in API version 39.0, so a newer API version is required when using queryAll with the Bulk API
+      apiVersion = this.bulkApiUseQueryAll ? "42.0" : "29.0";
     }
 
     String soapAuthEndPoint = hostName + SALESFORCE_SOAP_SERVICE + "/" + 
apiVersion;
@@ -716,11 +728,11 @@ public class SalesforceExtractor extends RestApiExtractor {
 
       // Set bulk job attributes
       this.bulkJob.setObject(entity);
-      this.bulkJob.setOperation(OperationEnum.query);
+      this.bulkJob.setOperation(this.bulkApiUseQueryAll ? 
OperationEnum.queryAll : OperationEnum.query);
       this.bulkJob.setConcurrencyMode(ConcurrencyMode.Parallel);
 
       // use pk chunking if pk chunking is configured and the expected record 
count is larger than the pk chunking size
-      if (this.pkChunking && getExpectedRecordCount() > this.pkChunkingSize) {
+      if (this.pkChunking && (this.pkChunkingSkipCountCheck || 
getExpectedRecordCount() > this.pkChunkingSize)) {
         log.info("Enabling pk chunking with size {}", this.pkChunkingSize);
         this.bulkConnection.addHeader("Sforce-Enable-PKChunking", "chunkSize=" 
+ this.pkChunkingSize);
         usingPkChunking = true;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d98f77c1/gradle/scripts/defaultBuildProperties.gradle
----------------------------------------------------------------------
diff --git a/gradle/scripts/defaultBuildProperties.gradle b/gradle/scripts/defaultBuildProperties.gradle
index ce95473..8509cc6 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -38,7 +38,7 @@ def BuildProperties BUILD_PROPERTIES = new BuildProperties(project)
     .register(new BuildProperty("pegasusVersion", "11.0.0", "Pegasus 
dependencies version"))
     .register(new BuildProperty("publishToMaven", false, "Enable publishing of 
artifacts to a central Maven repository"))
     .register(new BuildProperty("publishToNexus", false, "Enable publishing of 
artifacts to Nexus"))
-    .register(new BuildProperty("salesforceVersion", "37.0.3", "Salesforce 
dependencies version"))
+    .register(new BuildProperty("salesforceVersion", "42.0.0", "Salesforce 
dependencies version"))
 
 task buildProperties(description: 'Lists main properties that can be used to 
customize the build') << {
   BUILD_PROPERTIES.printHelp();

Reply via email to