Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 06bcc948c -> 859fadcc3


[GOBBLIN-265] Add support for PK chunking to gobblin-salesforce

Closes #2120 from htran1/salesforce_pk_chunking


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/859fadcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/859fadcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/859fadcc

Branch: refs/heads/master
Commit: 859fadcc3ace98789ccab1e965da0ef7818f20b7
Parents: 06bcc94
Author: Hung Tran <[email protected]>
Authored: Wed Sep 27 17:59:52 2017 -0700
Committer: Hung Tran <[email protected]>
Committed: Wed Sep 27 17:59:52 2017 -0700

----------------------------------------------------------------------
 .../gobblin/salesforce/SalesforceExtractor.java | 153 ++++++++++++++++---
 1 file changed, 133 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/859fadcc/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 22a0850..c0c340d 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -47,6 +47,7 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.sforce.async.AsyncApiException;
 import com.sforce.async.BatchInfo;
+import com.sforce.async.BatchInfoList;
 import com.sforce.async.BatchStateEnum;
 import com.sforce.async.BulkConnection;
 import com.sforce.async.ConcurrencyMode;
@@ -68,6 +69,7 @@ import 
org.apache.gobblin.source.extractor.exception.RestApiConnectionException;
 import org.apache.gobblin.source.extractor.exception.SchemaException;
 import org.apache.gobblin.source.extractor.extract.Command;
 import org.apache.gobblin.source.extractor.extract.CommandOutput;
+import org.apache.gobblin.source.extractor.partition.Partitioner;
 import org.apache.gobblin.source.jdbc.SqlQueryUtils;
 import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
 import 
org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand.RestApiCommandType;
@@ -81,6 +83,8 @@ import org.apache.gobblin.source.extractor.utils.Utils;
 import org.apache.gobblin.source.extractor.watermark.Predicate;
 import org.apache.gobblin.source.extractor.watermark.WatermarkType;
 import org.apache.gobblin.source.workunit.WorkUnit;
+
+import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
 
@@ -96,6 +100,14 @@ public class SalesforceExtractor extends RestApiExtractor {
   private static final String SALESFORCE_HOUR_FORMAT = "HH";
   private static final String SALESFORCE_SOAP_AUTH_SERVICE = "/services/Soap/u";
   private static final Gson GSON = new Gson();
+  private static final int MAX_PK_CHUNKING_SIZE = 250000; // upper clamp for the configured pk chunking size
+  private static final int MIN_PK_CHUNKING_SIZE = 100000; // lower clamp for the configured pk chunking size
+  private static final int DEFAULT_PK_CHUNKING_SIZE = 200000; // used when PK_CHUNKING_SIZE_KEY is not set
+  private static final String ENABLE_PK_CHUNKING_KEY = "salesforce.enablePkChunking";
+  private static final String PK_CHUNKING_SIZE_KEY = "salesforce.pkChunkingSize";
+  private static final int MAX_RETRY_INTERVAL_SECS = 600; // cap on the bulk API polling interval, in seconds
+  // avoid using too many bulk API calls by allowing PK chunking only if max partitions is configured <= this
+  private static final int PK_CHUNKING_MAX_PARTITIONS_LIMIT = 3;
 
   private boolean pullStatus = true;
   private String nextUrl;
@@ -103,21 +115,40 @@ public class SalesforceExtractor extends RestApiExtractor {
   private BulkConnection bulkConnection = null;
   private boolean bulkApiInitialRun = true;
   private JobInfo bulkJob = new JobInfo();
-  private BatchInfo bulkBatchInfo = null;
   private BufferedReader bulkBufferedReader = null;
-  private List<String> bulkResultIdList = Lists.newArrayList();
+  private List<BatchIdAndResultId> bulkResultIdList = Lists.newArrayList();
   private int bulkResultIdCount = 0;
   private boolean bulkJobFinished = true;
   private List<String> bulkRecordHeader;
   private int bulkResultColumCount;
   private boolean newBulkResultSet = true;
   private int bulkRecordCount = 0;
+  private int prevBulkRecordCount = 0;
 
+  private final boolean pkChunking;
+  private final int pkChunkingSize;
   private final SalesforceConnector sfConnector;
 
   public SalesforceExtractor(WorkUnitState state) {
     super(state);
     this.sfConnector = (SalesforceConnector) this.connector;
+
+    // don't allow pk chunking if max partitions too high or have user specified partitions
+    if (state.getPropAsBoolean(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, false)
+        || state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS,
+        ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS) > PK_CHUNKING_MAX_PARTITIONS_LIMIT) {
+      if (state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false)) {
+        log.warn("PK chunking is not enabled: user specified partitions are present or max partitions > {}",
+            PK_CHUNKING_MAX_PARTITIONS_LIMIT);
+      }
+      this.pkChunking = false;
+    } else {
+      this.pkChunking = state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false);
+    }
+
+    this.pkChunkingSize =
+        Math.max(MIN_PK_CHUNKING_SIZE,
+            Math.min(MAX_PK_CHUNKING_SIZE, state.getPropAsInt(PK_CHUNKING_SIZE_KEY, DEFAULT_PK_CHUNKING_SIZE)));
   }
 
   @Override
@@ -640,22 +671,30 @@ public class SalesforceExtractor extends RestApiExtractor {
 
   /**
    * Get Record set using salesforce specific API(Bulk API)
-   * @param schema/databasename
    * @param entity/tablename
-   * @param list of all predicate conditions
+   * @param predicateList list of all predicate conditions
      * @return iterator with batch of records
    */
-  private List<String> getQueryResultIds(String entity, List<Predicate> predicateList) throws Exception {
+  private List<BatchIdAndResultId> getQueryResultIds(String entity, List<Predicate> predicateList) throws Exception {
     if (!bulkApiLogin()) {
       throw new IllegalArgumentException("Invalid Login");
     }
 
     try {
+      boolean usingPkChunking = false; // set to true below when the PK chunking header is added
+
       // Set bulk job attributes
       this.bulkJob.setObject(entity);
       this.bulkJob.setOperation(OperationEnum.query);
       this.bulkJob.setConcurrencyMode(ConcurrencyMode.Parallel);
 
+      // use pk chunking only when it is configured and the expected record count exceeds the pk chunking size
+      if (this.pkChunking && getExpectedRecordCount() > this.pkChunkingSize) {
+        log.info("Enabling pk chunking with size {}", this.pkChunkingSize);
+        this.bulkConnection.addHeader("Sforce-Enable-PKChunking", "chunkSize=" + this.pkChunkingSize);
+        usingPkChunking = true;
+      }
+
       // Result type as CSV
       this.bulkJob.setContentType(ContentType.CSV);
 
@@ -680,32 +719,54 @@ public class SalesforceExtractor extends RestApiExtractor {
       log.info("QUERY:" + query);
       ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes(ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
 
-      this.bulkBatchInfo = this.bulkConnection.createBatchFromStream(this.bulkJob, bout);
+      BatchInfo bulkBatchInfo = this.bulkConnection.createBatchFromStream(this.bulkJob, bout);
 
-      int retryInterval = 30 + (int) Math.ceil((float) this.getExpectedRecordCount() / 10000) * 2;
+      long expectedSizePerBatch = usingPkChunking ? this.pkChunkingSize : this.getExpectedRecordCount();
+
+      int retryInterval = Math.min(MAX_RETRY_INTERVAL_SECS,
+          30 + (int) Math.ceil((float) expectedSizePerBatch / 10000) * 2);
       log.info("Salesforce bulk api retry interval in seconds:" + retryInterval);
 
       // Get batch info with complete resultset (info id - refers to the resultset id corresponding to entire resultset)
-      this.bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), this.bulkBatchInfo.getId());
+      bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());
 
-      while ((this.bulkBatchInfo.getState() != BatchStateEnum.Completed)
-          && (this.bulkBatchInfo.getState() != BatchStateEnum.Failed)) {
+      // wait for completion or failure; with PK chunking also stop once the original batch becomes NotProcessed
+      while ((bulkBatchInfo.getState() != BatchStateEnum.Completed)
+          && (bulkBatchInfo.getState() != BatchStateEnum.Failed)
+          && (!usingPkChunking || bulkBatchInfo.getState() != BatchStateEnum.NotProcessed)) {
         Thread.sleep(retryInterval * 1000);
-        this.bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), this.bulkBatchInfo.getId());
-        log.debug("Bulk Api Batch Info:" + this.bulkBatchInfo);
+        bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());
+        log.debug("Bulk Api Batch Info:" + bulkBatchInfo);
         log.info("Waiting for bulk resultSetIds");
       }
 
-      if (this.bulkBatchInfo.getState() == BatchStateEnum.Failed) {
+      // Get all batches of the job; with PK chunking this includes the chunk batches still to be waited for
+      BatchInfoList batchInfoList = this.bulkConnection.getBatchInfoList(this.bulkJob.getId());
+
+      if (usingPkChunking && bulkBatchInfo.getState() == BatchStateEnum.NotProcessed) {
+        bulkBatchInfo = waitForPkBatches(batchInfoList, retryInterval);
+      }
+
+      if (bulkBatchInfo.getState() == BatchStateEnum.Failed) {
         log.error("Bulk batch failed: " + bulkBatchInfo.toString());
-        throw new RuntimeException("Failed to get bulk batch info for jobId " + this.bulkBatchInfo.getJobId()
-            + " error - " + this.bulkBatchInfo.getStateMessage());
+        throw new RuntimeException("Failed to get bulk batch info for jobId " + bulkBatchInfo.getJobId()
+            + " error - " + bulkBatchInfo.getStateMessage());
+      }
+
+      // Get resultset ids of all the batches from the batch info list
+      List<BatchIdAndResultId> batchIdAndResultIdList = Lists.newArrayList();
+
+      for (BatchInfo bi : batchInfoList.getBatchInfo()) {
+        QueryResultList list = this.bulkConnection.getQueryResultList(this.bulkJob.getId(), bi.getId());
+
+        for (String result : list.getResult()) {
+          batchIdAndResultIdList.add(new BatchIdAndResultId(bi.getId(), result));
+        }
       }
 
-      // Get resultset ids from the batch info
-      QueryResultList list = this.bulkConnection.getQueryResultList(this.bulkJob.getId(), this.bulkBatchInfo.getId());
+      log.info("QueryResultList: " + batchIdAndResultIdList);
 
-      return Arrays.asList(list.getResult());
+      return batchIdAndResultIdList;
 
     } catch (RuntimeException | AsyncApiException | InterruptedException e) {
       throw new RuntimeException(
@@ -725,6 +786,12 @@ public class SalesforceExtractor extends RestApiExtractor {
       // if Buffer is empty then get stream for the new resultset id
       if (this.bulkBufferedReader == null || !this.bulkBufferedReader.ready()) {
 
+        // log how many records the previous result set produced, once one has been opened (bulkResultIdCount > 0)
+        if (this.bulkResultIdCount > 0) {
+          log.info("Result set {} had {} records", this.bulkResultIdCount,
+              this.bulkRecordCount - this.prevBulkRecordCount);
+        }
+
         // if there is unprocessed resultset id then get result stream for that id
         if (this.bulkResultIdCount < this.bulkResultIdList.size()) {
           log.info("Stream resultset for resultId:" + this.bulkResultIdList.get(this.bulkResultIdCount));
@@ -732,11 +799,13 @@ public class SalesforceExtractor extends RestApiExtractor {
           this.bulkBufferedReader =
               new BufferedReader(
                   new InputStreamReader(
-                      this.bulkConnection.getQueryResultStream(this.bulkJob.getId(), this.bulkBatchInfo.getId(),
-                          this.bulkResultIdList.get(this.bulkResultIdCount)),
+                      this.bulkConnection.getQueryResultStream(this.bulkJob.getId(),
+                          this.bulkResultIdList.get(this.bulkResultIdCount).getBatchId(),
+                          this.bulkResultIdList.get(this.bulkResultIdCount).getResultId()),
                       ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
 
           this.bulkResultIdCount++;
+          this.prevBulkRecordCount = this.bulkRecordCount;
         } else {
           // if result stream processed for all resultset ids then finish the bulk job
           log.info("Bulk job is finished");
@@ -801,4 +870,48 @@ public class SalesforceExtractor extends RestApiExtractor {
     return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), RestApiCommandType.GET));
   }
 
+  /**
+   * Waits for the PK chunking batches to complete. The wait will stop after all batches are complete or on the first
+   * failed batch. Batches in the NotProcessed state (the original batch in PK chunking mode) are skipped.
+   * @param batchInfoList list of batch info
+   * @param retryInterval the polling interval in seconds
+   * @return the last {@link BatchInfo} processed; never null (IllegalStateException if there is no batch to wait for)
+   * @throws InterruptedException if interrupted while waiting between polls
+   * @throws AsyncApiException if a bulk API call fails
+   */
+  private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int retryInterval)
+      throws InterruptedException, AsyncApiException {
+    BatchInfo batchInfo = null;
+    for (BatchInfo bi : batchInfoList.getBatchInfo()) {
+      // skip the original batch, which is left NotProcessed in PK chunking mode, by state instead of list position
+      if (bi.getState() == BatchStateEnum.NotProcessed) {
+        continue;
+      }
+      // get refreshed batch status and poll until the batch is completed or failed
+      bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId());
+      while ((bi.getState() != BatchStateEnum.Completed) && (bi.getState() != BatchStateEnum.Failed)) {
+        Thread.sleep(retryInterval * 1000);
+        bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId());
+        log.debug("Bulk Api Batch Info:" + bi);
+        log.info("Waiting for bulk resultSetIds");
+      }
+
+      // remember the last batch processed and stop at the first failure
+      batchInfo = bi;
+      if (batchInfo.getState() == BatchStateEnum.Failed) {
+        break;
+      }
+    }
+
+    if (batchInfo == null) {
+      throw new IllegalStateException("No PK chunking batches found for bulk job " + this.bulkJob.getId());
+    }
+    return batchInfo;
+  }
+
+  @Data
+  private static class BatchIdAndResultId {
+    private final String batchId;
+    private final String resultId;
+  }
 }

Reply via email to