This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d94e47  [GOBBLIN-865] Add feature that enables PK-chunking in partition
7d94e47 is described below

commit 7d94e4745c71f4b01c61828216fc90c626a9521a
Author: Alex Li <al...@linkedin.com>
AuthorDate: Wed Dec 11 22:13:51 2019 -0800

    [GOBBLIN-865] Add feature that enables PK-chunking in partition
    
    Closes #2722 from arekusuri/GOBBLIN-865-pk-chunking-partition
---
 .gitignore                                         |   9 +
 .../extract/restapi/RestApiExtractor.java          |   2 +-
 .../apache/gobblin/salesforce/ResultIterator.java  | 166 ++++++++++
 .../salesforce/SalesforceConfigurationKeys.java    |  21 +-
 .../gobblin/salesforce/SalesforceExtractor.java    | 366 ++++++++++++++-------
 .../gobblin/salesforce/SalesforceSource.java       | 139 +++++++-
 6 files changed, 567 insertions(+), 136 deletions(-)

diff --git a/.gitignore b/.gitignore
index 4982e54..5195ebb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,8 @@
 .classpath*
 .project*
 .settings
+.DS_Store
+*.tar.gz
 
 # Intellij related
 **/.idea
@@ -62,5 +64,12 @@ package-lock.json
 
 # generated config files by tests
 **/generated-gobblin-cluster.conf
+ligradle/findbugs/findbugsInclude.xml
+ligradle/checkstyle/linkedin-checkstyle.xml
+ligradle/checkstyle/suppressions.xml
+gobblin-core/src/test/resources/serde/output-staging/
+gobblin-integration-test-log-dir/
+gobblin-modules/gobblin-elasticsearch/test-elasticsearch/
 
 temp/
+ligradle/*
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
index b9eeddd..b5fe56d 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
@@ -158,7 +158,7 @@ public abstract class RestApiExtractor extends 
QueryBasedExtractor<JsonArray, Js
       this.updatedQuery = buildDataQuery(inputQuery, entity);
       log.info("Schema:" + columnArray);
       this.setOutputSchema(columnArray);
-    } catch (RuntimeException | RestApiConnectionException | 
RestApiProcessingException | IOException
+    } catch (RuntimeException | RestApiProcessingException | 
RestApiConnectionException | IOException
         | SchemaException e) {
       throw new SchemaException("Failed to get schema using rest api; error - 
" + e.getMessage(), e);
     }
diff --git 
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java
 
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java
new file mode 100644
index 0000000..d6b5566
--- /dev/null
+++ 
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.sforce.async.BulkConnection;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+
+/**
+ * Result Iterator.
+ * Takes a jobId and a 'batchId:resultId,batchId2:resultId2' string as input and builds an
+ * iterator over the records of all referenced result files, in the order the pairs are listed.
+ * The header row of the first file supplies the column names used for every record; the header
+ * rows of subsequent files are read and discarded.
+ */
+@Slf4j
+public class ResultIterator implements Iterator<JsonElement> {
+  private Iterator<ResultStruct> batchIdResultIdIterator;
+  private BulkConnection bulkConnection;
+  private InputStreamCSVReader csvReader;
+  private List<String> csvHeader;
+  private int columnSize;
+  private int urlLoadRetryLimit;
+  private ResultStruct resultStruct;
+  private List<String> currentRecord = null;
+  // true while currentRecord holds a look-ahead value that next() has not consumed yet
+  private boolean isLoadedCurrentRecord = false;
+  private int currentFileRowCount = 0;
+  private int totalRowCount = 0;
+
+  /**
+   * Opens the first result file and reads its header row.
+   *
+   * @param bulkConnection connection used to download the result files
+   * @param jobId id of the bulk job that owns every batch in {@code batchIdResultIdString}
+   * @param batchIdResultIdString comma separated 'batchId:resultId' pairs
+   * @param urlLoadRetryLimit number of attempts made to open each result file
+   * @throws RuntimeException if no batch-result id is found or a result file cannot be opened
+   */
+  public ResultIterator(BulkConnection bulkConnection, String jobId, String batchIdResultIdString, int urlLoadRetryLimit) {
+    this.urlLoadRetryLimit = urlLoadRetryLimit;
+    this.bulkConnection = bulkConnection;
+    this.batchIdResultIdIterator = this.parsebatchIdResultIdString(jobId, batchIdResultIdString);
+    if (this.batchIdResultIdIterator.hasNext()) {
+      this.resultStruct = this.batchIdResultIdIterator.next();
+      this.csvReader = this.fetchResultsetAsCsvReader(this.resultStruct); // first file reader
+    } else {
+      throw new RuntimeException("No batch-result id found.");
+    }
+    this.fulfillCurrentRecord();
+    this.csvHeader = this.currentRecord;
+    this.columnSize = this.csvHeader.size();
+    // after fetching csv header, clean up status
+    this.resetCurrentRecordStatus();
+  }
+
+  /**
+   * Loads the next record into currentRecord unless one is already loaded.
+   * When the current file is exhausted, moves on to the next result file; files that contain
+   * nothing but a header are skipped so that they do not end the iteration prematurely.
+   * currentRecord is left as null only when every file has been consumed.
+   */
+  private void fulfillCurrentRecord() {
+    if (this.isLoadedCurrentRecord) {
+      return; // skip, since CurrentRecord was loaded.
+    }
+    try {
+      this.currentRecord = this.csvReader.nextRecord();
+      // according to InputStreamCSVReader, null is returned at the end of the reader.
+      while (this.currentRecord == null) {
+        log.info("Fetched {} - rows: {}", this.resultStruct, this.currentFileRowCount); // print out log before switch result file
+        this.currentFileRowCount = 0; // clean up
+        if (!this.batchIdResultIdIterator.hasNext()) {
+          log.info("---- Fetched {} rows -----", this.totalRowCount); // print out log when all records were fetched.
+          break;
+        }
+        // there is a next file: load it and keep looping in case it has no data rows
+        this.resultStruct = this.batchIdResultIdIterator.next();
+        this.csvReader = this.fetchResultsetAsCsvReader(this.resultStruct);
+        this.csvReader.nextRecord(); // read and ignore the csv header.
+        this.currentRecord = this.csvReader.nextRecord();
+      }
+      this.isLoadedCurrentRecord = true;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Drops the look-ahead record so that the next fulfillCurrentRecord call reads a fresh one. */
+  private void resetCurrentRecordStatus() {
+    this.currentRecord = null;
+    this.isLoadedCurrentRecord = false;
+  }
+
+  @Override
+  public boolean hasNext() {
+    this.fulfillCurrentRecord();
+    return this.currentRecord != null;
+  }
+
+  /**
+   * @return the next record converted to a JSON object keyed by the csv header
+   * @throws NoSuchElementException when all result files have been consumed
+   */
+  @Override
+  public JsonElement next() {
+    this.fulfillCurrentRecord();
+    List<String> csvRecord = this.currentRecord;
+    this.resetCurrentRecordStatus();
+    if (csvRecord == null) {
+      throw new NoSuchElementException();
+    }
+    this.currentFileRowCount++;
+    this.totalRowCount++;
+    JsonObject jsonObject = Utils.csvToJsonObject(this.csvHeader, csvRecord, this.columnSize);
+    return jsonObject;
+  }
+
+  /**
+   * resultStruct has all data which identify a result set file;
+   * fetch it and convert it to a csvReader.
+   * Makes up to urlLoadRetryLimit attempts; every failed attempt is logged and the last failure
+   * is attached as the cause of the exception thrown once the attempts are used up.
+   */
+  private InputStreamCSVReader fetchResultsetAsCsvReader(ResultStruct resultStruct) {
+    String jobId = resultStruct.jobId;
+    String batchId = resultStruct.batchId;
+    String resultId = resultStruct.resultId;
+    log.info("PK-Chunking workunit: fetching [jobId={}, batchId={}, resultId={}]", jobId, batchId, resultId);
+    Exception lastFailure = null;
+    for (int attempt = 1; attempt <= this.urlLoadRetryLimit; attempt++) { // retries
+      try {
+        InputStream is = this.bulkConnection.getQueryResultStream(jobId, batchId, resultId);
+        BufferedReader br = new BufferedReader(new InputStreamReader(is, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+        return new InputStreamCSVReader(br);
+      } catch (Exception e) { // remember and report the failure, then retry
+        lastFailure = e;
+        log.warn("Attempt {}/{} to fetch {} failed", attempt, this.urlLoadRetryLimit, resultStruct, e);
+      }
+    }
+    // tried urlLoadRetryLimit times, always getting exception
+    throw new RuntimeException("Tried " + this.urlLoadRetryLimit + " times, but couldn't fetch data.", lastFailure);
+  }
+
+  /**
+   * input string format is "batchId:resultId,batchId2:resultId2"
+   * parse it to iterator
+   * The pairs are parsed lazily, so a malformed pair is reported when the iterator reaches it.
+   */
+  private Iterator<ResultStruct> parsebatchIdResultIdString(String jobId, String batchIdResultIdString) {
+    return Arrays.stream(batchIdResultIdString.split(","))
+        .map(pair -> {
+          String[] ids = pair.split(":");
+          if (ids.length < 2) {
+            throw new IllegalArgumentException("Expected 'batchId:resultId' but got '" + pair + "'");
+          }
+          return new ResultStruct(jobId, ids[0], ids[1]);
+        })
+        .iterator();
+  }
+
+  /** Identifies a single result file of a bulk job. */
+  @Data
+  static class ResultStruct {
+    private final String jobId;
+    private final String batchId;
+    private final String resultId;
+
+    @Override
+    public String toString() {
+      return String.format("[jobId=%s, batchId=%s, resultId=%s]", jobId, batchId, resultId);
+    }
+  }
+
+}
diff --git 
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
 
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
index e868cd6..d5fe3f7 100644
--- 
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
+++ 
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
@@ -17,9 +17,24 @@
 
 package org.apache.gobblin.salesforce;
 
-public class SalesforceConfigurationKeys {
+public final class SalesforceConfigurationKeys {
+  private SalesforceConfigurationKeys() {
+  }
   public static final String 
SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED =
       "source.querybased.salesforce.is.soft.deletes.pull.disabled";
-  public static final int DEFAULT_SALESFORCE_MAX_CHARS_IN_FILE = 200000000;
-  public static final int DEFAULT_SALESFORCE_MAX_ROWS_IN_FILE = 1000000;
+  public static final int DEFAULT_FETCH_RETRY_LIMIT = 5;
+  public static final String BULK_API_USE_QUERY_ALL = 
"salesforce.bulkApiUseQueryAll";
+
+  // pk-chunking
+  public static final String PK_CHUNKING_TEST_BATCH_ID_LIST = 
"salesforce.pkChunking.testBatchIdList";
+  public static final String PK_CHUNKING_TEST_JOB_ID = 
"salesforce.pkChunking.testJobId";
+  public static final String SALESFORCE_PARTITION_TYPE = 
"salesforce.partitionType";
+  public static final String PARTITION_PK_CHUNKING_SIZE = 
"salesforce.partition.pkChunkingSize";
+  public static final String PK_CHUNKING_JOB_ID = "_salesforce.job.id";
+  public static final String PK_CHUNKING_BATCH_RESULT_IDS = 
"_salesforce.result.ids";
+  public static final int MAX_PK_CHUNKING_SIZE = 250_000; // this number is 
from SFDC's doc - https://tinyurl.com/ycjvgwv2
+  public static final int MIN_PK_CHUNKING_SIZE = 20_000;
+  public static final int DEFAULT_PK_CHUNKING_SIZE = 250_000; // default to 
max for saving request quota
 }
+
+
diff --git 
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index b0d444c..36186e5 100644
--- 
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ 
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.salesforce;
 
+import com.google.common.collect.Iterators;
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -33,7 +34,7 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 
-import org.apache.gobblin.salesforce.SalesforceConfigurationKeys;
+import java.util.stream.Stream;
 import org.apache.http.HttpEntity;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.methods.HttpGet;
@@ -71,7 +72,6 @@ import 
org.apache.gobblin.source.extractor.exception.RestApiConnectionException;
 import org.apache.gobblin.source.extractor.exception.SchemaException;
 import org.apache.gobblin.source.extractor.extract.Command;
 import org.apache.gobblin.source.extractor.extract.CommandOutput;
-import org.apache.gobblin.source.extractor.partition.Partitioner;
 import org.apache.gobblin.source.jdbc.SqlQueryUtils;
 import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
 import 
org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand.RestApiCommandType;
@@ -89,6 +89,8 @@ import org.apache.gobblin.source.workunit.WorkUnit;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
+import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.*;
+
 
 /**
  * An implementation of salesforce extractor for extracting data from SFDC
@@ -102,20 +104,9 @@ public class SalesforceExtractor extends RestApiExtractor {
   private static final String SALESFORCE_HOUR_FORMAT = "HH";
   private static final String SALESFORCE_SOAP_SERVICE = "/services/Soap/u";
   private static final Gson GSON = new Gson();
-  private static final int MAX_PK_CHUNKING_SIZE = 250000;
-  private static final int MIN_PK_CHUNKING_SIZE = 100000;
-  private static final int DEFAULT_PK_CHUNKING_SIZE = 200000;
-  private static final String ENABLE_PK_CHUNKING_KEY = 
"salesforce.enablePkChunking";
-  private static final String PK_CHUNKING_SIZE_KEY = 
"salesforce.pkChunkingSize";
   private static final int MAX_RETRY_INTERVAL_SECS = 600;
-  // avoid using too many bulk API calls by only allowing PK chunking only if 
max partitions is configured <= this
-  private static final int PK_CHUNKING_MAX_PARTITIONS_LIMIT = 3;
   private static final String FETCH_RETRY_LIMIT_KEY = 
"salesforce.fetchRetryLimit";
-  private static final int DEFAULT_FETCH_RETRY_LIMIT = 5;
-  private static final String BULK_API_USE_QUERY_ALL = 
"salesforce.bulkApiUseQueryAll";
   private static final boolean DEFAULT_BULK_API_USE_QUERY_ALL = false;
-  private static final String PK_CHUNKING_SKIP_COUNT_CHECK = 
"salesforce.pkChunkingSkipCountCheck";
-  private static final boolean DEFAULT_PK_CHUNKING_SKIP_COUNT_CHECK = false;
 
 
   private boolean pullStatus = true;
@@ -135,45 +126,29 @@ public class SalesforceExtractor extends RestApiExtractor 
{
   private int prevBulkRecordCount = 0;
   private List<String> csvRecord;
 
-  private final boolean pkChunking;
   private final int pkChunkingSize;
   private final SalesforceConnector sfConnector;
   private final int fetchRetryLimit;
   private final int batchSize;
 
-  private final boolean pkChunkingSkipCountCheck;
   private final boolean bulkApiUseQueryAll;
 
   public SalesforceExtractor(WorkUnitState state) {
     super(state);
-    this.sfConnector = (SalesforceConnector) this.connector;
-
-    // don't allow pk chunking if max partitions too high or have user 
specified partitions
-    if (state.getPropAsBoolean(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, 
false)
-        || 
state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS,
-        ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS) > 
PK_CHUNKING_MAX_PARTITIONS_LIMIT) {
-      if (state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false)) {
-        log.warn("Max partitions too high, so PK chunking is not enabled");
-      }
-
-      this.pkChunking = false;
-    } else {
-      this.pkChunking = state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false);
-    }
 
+    this.sfConnector = (SalesforceConnector) this.connector;
     this.pkChunkingSize =
         Math.max(MIN_PK_CHUNKING_SIZE,
-            Math.min(MAX_PK_CHUNKING_SIZE, 
state.getPropAsInt(PK_CHUNKING_SIZE_KEY, DEFAULT_PK_CHUNKING_SIZE)));
+            Math.min(MAX_PK_CHUNKING_SIZE, 
workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, 
DEFAULT_PK_CHUNKING_SIZE)));
 
-    this.pkChunkingSkipCountCheck = 
state.getPropAsBoolean(PK_CHUNKING_SKIP_COUNT_CHECK, 
DEFAULT_PK_CHUNKING_SKIP_COUNT_CHECK);
-    this.bulkApiUseQueryAll = state.getPropAsBoolean(BULK_API_USE_QUERY_ALL, 
DEFAULT_BULK_API_USE_QUERY_ALL);
+    this.bulkApiUseQueryAll = 
workUnitState.getPropAsBoolean(BULK_API_USE_QUERY_ALL, 
DEFAULT_BULK_API_USE_QUERY_ALL);
 
     // Get batch size from .pull file
-    int tmpBatchSize = 
state.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE,
+    int tmpBatchSize = 
workUnitState.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE,
         ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE);
 
     this.batchSize = tmpBatchSize == 0 ? 
ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE : tmpBatchSize;
-    this.fetchRetryLimit = state.getPropAsInt(FETCH_RETRY_LIMIT_KEY, 
DEFAULT_FETCH_RETRY_LIMIT);
+    this.fetchRetryLimit = workUnitState.getPropAsInt(FETCH_RETRY_LIMIT_KEY, 
DEFAULT_FETCH_RETRY_LIMIT);
   }
 
   @Override
@@ -184,14 +159,14 @@ public class SalesforceExtractor extends RestApiExtractor 
{
   /**
    * true is further pull required else false
    */
-  public void setPullStatus(boolean pullStatus) {
+  private void setPullStatus(boolean pullStatus) {
     this.pullStatus = pullStatus;
   }
 
   /**
    * url for the next pull from salesforce
    */
-  public void setNextUrl(String nextUrl) {
+  private void setNextUrl(String nextUrl) {
     this.nextUrl = nextUrl;
   }
 
@@ -203,11 +178,11 @@ public class SalesforceExtractor extends RestApiExtractor 
{
     this.bulkJobFinished = bulkJobFinished;
   }
 
-  public boolean isNewBulkResultSet() {
+  private boolean isNewBulkResultSet() {
     return this.newBulkResultSet;
   }
 
-  public void setNewBulkResultSet(boolean newBulkResultSet) {
+  private void setNewBulkResultSet(boolean newBulkResultSet) {
     this.newBulkResultSet = newBulkResultSet;
   }
 
@@ -303,7 +278,7 @@ public class SalesforceExtractor extends RestApiExtractor {
     }
     query = SqlQueryUtils.addPredicate(query, defaultPredicate);
     query = query + defaultSortOrder;
-    log.info("QUERY: " + query);
+    log.info("getHighWatermarkMetadata - QUERY: " + query);
 
     try {
       return 
constructGetCommand(this.sfConnector.getFullUri(getSoqlUrl(query)));
@@ -326,7 +301,7 @@ public class SalesforceExtractor extends RestApiExtractor {
     }
 
     JsonElement element = GSON.fromJson(output, JsonObject.class);
-    long high_ts;
+    long highTs;
     try {
       JsonObject jsonObject = element.getAsJsonObject();
       JsonArray jsonArray = jsonObject.getAsJsonArray("records");
@@ -344,15 +319,15 @@ public class SalesforceExtractor extends RestApiExtractor 
{
           log.error("ParseException: " + e.getMessage(), e);
         }
         SimpleDateFormat outFormat = new SimpleDateFormat("yyyyMMddHHmmss");
-        high_ts = Long.parseLong(outFormat.format(date));
+        highTs = Long.parseLong(outFormat.format(date));
       } else {
-        high_ts = Long.parseLong(value);
+        highTs = Long.parseLong(value);
       }
 
     } catch (Exception e) {
       throw new HighWatermarkException("Failed to get high watermark from 
salesforce; error - " + e.getMessage(), e);
     }
-    return high_ts;
+    return highTs;
   }
 
   @Override
@@ -384,7 +359,7 @@ public class SalesforceExtractor extends RestApiExtractor {
       }
 
       query = query + getLimitFromInputQuery(this.updatedQuery);
-      log.info("QUERY: " + query);
+      log.info("getCountMetadata - QUERY: " + query);
       return 
constructGetCommand(this.sfConnector.getFullUri(getSoqlUrl(query)));
     } catch (Exception e) {
       throw new RecordCountException("Failed to get salesforce url for record 
count; error - " + e.getMessage(), e);
@@ -421,11 +396,11 @@ public class SalesforceExtractor extends RestApiExtractor 
{
     String query = this.updatedQuery;
     String url = null;
     try {
-      if (this.getNextUrl() != null && this.pullStatus == true) {
+      if (this.getNextUrl() != null && this.pullStatus) {
         url = this.getNextUrl();
       } else {
         if (isNullPredicate(predicateList)) {
-          log.info("QUERY:" + query);
+          log.info("getDataMetaData null predicate - QUERY:" + query);
           return 
constructGetCommand(this.sfConnector.getFullUri(getSoqlUrl(query)));
         }
 
@@ -443,7 +418,7 @@ public class SalesforceExtractor extends RestApiExtractor {
         }
 
         query = query + limitString;
-        log.info("QUERY: " + query);
+        log.info("getDataMetadata - QUERY: " + query);
         url = this.sfConnector.getFullUri(getSoqlUrl(query));
       }
       return constructGetCommand(url);
@@ -556,27 +531,27 @@ public class SalesforceExtractor extends RestApiExtractor 
{
   @Override
   public String getHourPredicateCondition(String column, long value, String 
valueFormat, String operator) {
     log.info("Getting hour predicate from salesforce");
-    String Formattedvalue = Utils.toDateTimeFormat(Long.toString(value), 
valueFormat, SALESFORCE_HOUR_FORMAT);
-    return column + " " + operator + " " + Formattedvalue;
+    String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), 
valueFormat, SALESFORCE_HOUR_FORMAT);
+    return column + " " + operator + " " + formattedvalue;
   }
 
   @Override
   public String getDatePredicateCondition(String column, long value, String 
valueFormat, String operator) {
     log.info("Getting date predicate from salesforce");
-    String Formattedvalue = Utils.toDateTimeFormat(Long.toString(value), 
valueFormat, SALESFORCE_DATE_FORMAT);
-    return column + " " + operator + " " + Formattedvalue;
+    String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), 
valueFormat, SALESFORCE_DATE_FORMAT);
+    return column + " " + operator + " " + formattedvalue;
   }
 
   @Override
   public String getTimestampPredicateCondition(String column, long value, 
String valueFormat, String operator) {
     log.info("Getting timestamp predicate from salesforce");
-    String Formattedvalue = Utils.toDateTimeFormat(Long.toString(value), 
valueFormat, SALESFORCE_TIMESTAMP_FORMAT);
-    return column + " " + operator + " " + Formattedvalue;
+    String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), 
valueFormat, SALESFORCE_TIMESTAMP_FORMAT);
+    return column + " " + operator + " " + formattedvalue;
   }
 
   @Override
   public Map<String, String> getDataTypeMap() {
-    Map<String, String> dataTypeMap = ImmutableMap.<String, String> 
builder().put("url", "string")
+    Map<String, String> dataTypeMap = ImmutableMap.<String, 
String>builder().put("url", "string")
         .put("textarea", "string").put("reference", "string").put("phone", 
"string").put("masterrecord", "string")
         .put("location", "string").put("id", "string").put("encryptedstring", 
"string").put("email", "string")
         .put("DataCategoryGroupReference", "string").put("calculated", 
"string").put("anyType", "string")
@@ -588,16 +563,43 @@ public class SalesforceExtractor extends RestApiExtractor 
{
     return dataTypeMap;
   }
 
+
+  // set once the pk-chunking result iterator has been handed out; later calls then return null
+  private boolean isPkChunkingFetchDone = false;
+
+  /**
+   * Builds the record iterator for a pk-chunking work unit.
+   * The first call logs in to the bulk api and returns a ResultIterator over the result files
+   * named by the work unit's PK_CHUNKING_JOB_ID and PK_CHUNKING_BATCH_RESULT_IDS properties.
+   * Every later call returns null to signal that there is no more data.
+   *
+   * @param workUnit work unit carrying the job id and the batch-result ids
+   * @return the result iterator on the first call, null afterwards
+   * @throws RuntimeException if the bulk api login fails or throws
+   */
+  private Iterator<JsonElement> getRecordSetPkChunking(WorkUnit workUnit) throws RuntimeException {
+    if (isPkChunkingFetchDone) {
+      return null; // must return null to represent no more data.
+    }
+    isPkChunkingFetchDone = true; // set to true, never come here twice.
+
+    try {
+      if (!bulkApiLogin()) {
+        throw new IllegalArgumentException("Invalid Login");
+      }
+    } catch (Exception e) {
+      // note: this also wraps the IllegalArgumentException thrown just above
+      throw new RuntimeException(e);
+    }
+
+    String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
+    String batchIdResultIdString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_IDS);
+    return new ResultIterator(bulkConnection, jobId, batchIdResultIdString, fetchRetryLimit);
+  }
+
   @Override
   public Iterator<JsonElement> getRecordSetFromSourceApi(String schema, String 
entity, WorkUnit workUnit,
       List<Predicate> predicateList) throws IOException {
     log.debug("Getting salesforce data using bulk api");
-    RecordSet<JsonElement> rs = null;
 
+    // new version of extractor: bulk api with pk-chunking in pre-partitioning 
of SalesforceSource
+    if (workUnit.contains(PK_CHUNKING_JOB_ID)) {
+      log.info("----pk-chunking get record set----" + 
workUnit.getProp(PK_CHUNKING_JOB_ID));
+      return getRecordSetPkChunking(workUnit);
+    }
+    log.info("----bulk get record set----");
     try {
       //Get query result ids in the first run
       //result id is used to construct url while fetching data
-      if (this.bulkApiInitialRun == true) {
+      if (this.bulkApiInitialRun) {
         // set finish status to false before starting the bulk job
         this.setBulkJobFinished(false);
         this.bulkResultIdList = getQueryResultIds(entity, predicateList);
@@ -607,6 +609,7 @@ public class SalesforceExtractor extends RestApiExtractor {
       // Get data from input stream
       // If bulk load is not finished, get data from the stream
       // Skip empty result sets since they will cause the extractor to 
terminate early
+      RecordSet<JsonElement> rs = null;
       while (!this.isBulkJobFinished() && (rs == null || rs.isEmpty())) {
         rs = getBulkData();
       }
@@ -615,11 +618,11 @@ public class SalesforceExtractor extends RestApiExtractor 
{
       this.bulkApiInitialRun = false;
 
       // If bulk job is finished, get soft deleted records using Rest API
-      boolean isSoftDeletesPullDisabled = Boolean.valueOf(this.workUnit
-          
.getProp(SalesforceConfigurationKeys.SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED));
-      if (rs == null || rs.isEmpty()) {
+      boolean disableSoftDeletePull = 
this.workUnit.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED);
+      if (rs == null || rs.isEmpty()) { // TODO: Only when the rs is empty, we 
need to fetch soft deleted records. WHY?
         // Get soft delete records only if IsDeleted column exists and soft 
deletes pull is not disabled
-        if (this.columnList.contains("IsDeleted") && 
!isSoftDeletesPullDisabled) {
+        if (this.columnList.contains("IsDeleted") && !disableSoftDeletePull) {
+          log.info("Pull soft delete records");
           return this.getSoftDeletedRecords(schema, entity, workUnit, 
predicateList);
         }
         log.info("Ignoring soft delete records");
@@ -634,7 +637,7 @@ public class SalesforceExtractor extends RestApiExtractor {
 
   /**
    * Get soft deleted records using Rest Api
-     * @return iterator with deleted records
+   * @return iterator with deleted records
    */
   private Iterator<JsonElement> getSoftDeletedRecords(String schema, String 
entity, WorkUnit workUnit,
       List<Predicate> predicateList) throws DataRecordException {
@@ -645,7 +648,7 @@ public class SalesforceExtractor extends RestApiExtractor {
    * Login to salesforce
    * @return login status
    */
-  public boolean bulkApiLogin() throws Exception {
+  private boolean bulkApiLogin() throws Exception {
     log.info("Authenticating salesforce bulk api");
     boolean success = false;
     String hostName = 
this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
@@ -705,6 +708,27 @@ public class SalesforceExtractor extends RestApiExtractor {
   }
 
   /**
+   * Variant of getQueryResultIdsPkChunking that takes an existing job instead of submitting a query.
+   * When batchIdListStr is given, the result ids are retrieved for exactly those batches;
+   * otherwise they are retrieved for the job as a whole.
+   * It is for test/debug: developers may want to skip executing the query on SFDC and reuse a
+   * list of existing batch ids.
+   *
+   * @param jobId id of an existing bulk job
+   * @param batchIdListStr existing batch ids, or null to retrieve results by job id only
+   * @return the job id together with its batch-id/result-id list
+   * @throws RuntimeException wrapping any failure, including an unsuccessful login
+   */
+  public JobIdAndBatchIdResultIdList getQueryResultIdsPkChunkingFetchOnly(String jobId, String batchIdListStr) {
+    try {
+      if (!bulkApiLogin()) {
+        throw new IllegalArgumentException("Invalid Login");
+      }
+      if (batchIdListStr == null) {
+        // NOTE(review): the cap is seconds * 1000 while the other operand is 30 + chunkSize * 2;
+        // the units look mixed - confirm what retrievePkChunkingResultIds expects.
+        int pollInterval = Math.min(MAX_RETRY_INTERVAL_SECS * 1000, 30 + this.pkChunkingSize  * 2);
+        return retrievePkChunkingResultIds(this.bulkConnection, jobId, pollInterval);
+      }
+      log.info("The batchId was specified.");
+      return retrievePkChunkingResultIdsByBatchId(this.bulkConnection, jobId, batchIdListStr);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+  /**
    * get BulkConnection instance
    * @return
    */
@@ -713,10 +737,79 @@ public class SalesforceExtractor extends RestApiExtractor 
{
   }
 
   /**
+   *  this function currently only for pk-chunking. it is from 
getQueryResultIds
+   *  TODO: abstract this function to a common function: arguments need to add 
connetion, header, output-format
+   *  TODO: make it and its related functions pure function (no side effect). 
Currently still unnecesarily changing this.bulkJob)
+   */
+  public JobIdAndBatchIdResultIdList getQueryResultIdsPkChunking(String 
entity, List<Predicate> predicateList) {
+    try {
+      if (!bulkApiLogin()) {
+        throw new IllegalArgumentException("Invalid Login");
+      }
+      BulkConnection connection = this.bulkConnection;
+      JobInfo jobRequest = new JobInfo();
+      jobRequest.setObject(entity);
+      jobRequest.setOperation(OperationEnum.queryAll);
+      jobRequest.setConcurrencyMode(ConcurrencyMode.Parallel);
+      log.info("Enabling pk chunking with size {}", this.pkChunkingSize);
+      connection.addHeader("Sforce-Enable-PKChunking", "chunkSize=" + 
this.pkChunkingSize);
+      // Result type as CSV
+      jobRequest.setContentType(ContentType.CSV);
+      JobInfo createdJob = connection.createJob(jobRequest);
+      log.info("Created bulk job: {}", createdJob.getId());
+      this.bulkJob = createdJob; // other functions need to use it TODO: 
remove bulkJob from this class
+      String jobId = createdJob.getId();
+      JobInfo jobResponse = connection.getJobStatus(jobId);
+      // Construct query with the predicates
+      String query = this.updatedQuery;
+      if (!isNullPredicate(predicateList)) {
+        String limitString = getLimitFromInputQuery(query);
+        query = query.replace(limitString, "");
+        Iterator<Predicate> i = predicateList.listIterator();
+        while (i.hasNext()) {
+          Predicate predicate = i.next();
+          query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
+        }
+        query = query + limitString;
+      }
+      log.info("Submitting PK Chunking query:" + query);
+      ByteArrayInputStream bout = new 
ByteArrayInputStream(query.getBytes(ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+
+      BatchInfo executeQueryBatch = 
connection.createBatchFromStream(jobResponse, bout);
+      String pkChunkingBatchId = executeQueryBatch.getId();
+
+      int waitMilliSecond = 60 * 1000;
+
+      // Get batch info with complete resultset (info id - refers to the 
resultset id corresponding to entire resultset)
+      BatchInfo batchResponse = connection.getBatchInfo(jobId, 
pkChunkingBatchId);
+
+      // wait for completion, failure, or formation of PK chunking batches
+      // user query will be submitted to sfdc and create the first batch,
+      // It is supposed to create more batch after the initial batch
+      BatchStateEnum batchState = batchResponse.getState();
+      while (batchState == BatchStateEnum.InProgress || batchState == 
BatchStateEnum.Queued) {
+        Thread.sleep(waitMilliSecond);
+        batchResponse = connection.getBatchInfo(createdJob.getId(), 
executeQueryBatch.getId());
+        log.info("Waiting for first batch (jobId={}, pkChunkingBatchId={})", 
jobId, pkChunkingBatchId);
+        batchState = batchResponse.getState();
+      }
+
+      if (batchResponse.getState() == BatchStateEnum.Failed) {
+        log.error("Bulk batch failed: " + batchResponse.toString());
+        throw new Exception("Failed to get bulk batch info for jobId " + jobId 
+ " error - " + batchResponse.getStateMessage());
+      }
+      JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList = 
retrievePkChunkingResultIds(connection, jobId, waitMilliSecond);
+      return jobIdAndBatchIdResultIdList;
+    } catch (Exception e) {
+      throw new RuntimeException("getQueryResultIdsPkChunking: error - " + 
e.getMessage(), e);
+    }
+  }
+
+  /**
    * Get Record set using salesforce specific API(Bulk API)
    * @param entity/tablename
    * @param predicateList of all predicate conditions
-     * @return iterator with batch of records
+   * @return iterator with batch of records
    */
   private List<BatchIdAndResultId> getQueryResultIds(String entity, 
List<Predicate> predicateList) throws Exception {
     if (!bulkApiLogin()) {
@@ -724,24 +817,17 @@ public class SalesforceExtractor extends RestApiExtractor 
{
     }
 
     try {
-      boolean usingPkChunking = false;
-
       // Set bulk job attributes
       this.bulkJob.setObject(entity);
       this.bulkJob.setOperation(this.bulkApiUseQueryAll ? 
OperationEnum.queryAll : OperationEnum.query);
       this.bulkJob.setConcurrencyMode(ConcurrencyMode.Parallel);
 
-      // use pk chunking if pk chunking is configured and the expected record 
count is larger than the pk chunking size
-      if (this.pkChunking && (this.pkChunkingSkipCountCheck || 
getExpectedRecordCount() > this.pkChunkingSize)) {
-        log.info("Enabling pk chunking with size {}", this.pkChunkingSize);
-        this.bulkConnection.addHeader("Sforce-Enable-PKChunking", "chunkSize=" 
+ this.pkChunkingSize);
-        usingPkChunking = true;
-      }
-
       // Result type as CSV
       this.bulkJob.setContentType(ContentType.CSV);
 
       this.bulkJob = this.bulkConnection.createJob(this.bulkJob);
+      log.info("Created bulk job [jobId={}]", this.bulkJob.getId());
+
       this.bulkJob = this.bulkConnection.getJobStatus(this.bulkJob.getId());
 
       // Construct query with the predicates
@@ -759,38 +845,30 @@ public class SalesforceExtractor extends RestApiExtractor 
{
         query = query + limitString;
       }
 
-      log.info("QUERY:" + query);
+      log.info("getQueryResultIds - QUERY:" + query);
       ByteArrayInputStream bout = new 
ByteArrayInputStream(query.getBytes(ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
 
       BatchInfo bulkBatchInfo = 
this.bulkConnection.createBatchFromStream(this.bulkJob, bout);
 
-      long expectedSizePerBatch = usingPkChunking ? this.pkChunkingSize : 
this.getExpectedRecordCount();
-
-      int retryInterval = Math.min(MAX_RETRY_INTERVAL_SECS,
-          30 + (int) Math.ceil((float) expectedSizePerBatch / 10000) * 2);
-      log.info("Salesforce bulk api retry interval in seconds:" + 
retryInterval);
+      int waitMilliSeconds = 60 * 1000; // wait 1 minute
 
       // Get batch info with complete resultset (info id - refers to the 
resultset id corresponding to entire resultset)
       bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), 
bulkBatchInfo.getId());
 
       // wait for completion, failure, or formation of PK chunking batches
-      while ((bulkBatchInfo.getState() != BatchStateEnum.Completed)
-          && (bulkBatchInfo.getState() != BatchStateEnum.Failed)
-          && (!usingPkChunking || bulkBatchInfo.getState() != 
BatchStateEnum.NotProcessed)) {
-        Thread.sleep(retryInterval * 1000);
+      // if it is InProgress or Queued, continue to wait.
+      while (bulkBatchInfo.getState() == BatchStateEnum.InProgress || 
bulkBatchInfo.getState() == BatchStateEnum.Queued) {
+        Thread.sleep(waitMilliSeconds);
         bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), 
bulkBatchInfo.getId());
-        log.debug("Bulk Api Batch Info:" + bulkBatchInfo);
         log.info("Waiting for bulk resultSetIds");
       }
 
       // Wait for pk chunking batches
       BatchInfoList batchInfoList = 
this.bulkConnection.getBatchInfoList(this.bulkJob.getId());
-
-      if (usingPkChunking && bulkBatchInfo.getState() == 
BatchStateEnum.NotProcessed) {
-        bulkBatchInfo = waitForPkBatches(batchInfoList, retryInterval);
-      }
-
-      if (bulkBatchInfo.getState() == BatchStateEnum.Failed) {
+      BatchStateEnum state = bulkBatchInfo.getState();
+      // not sure if the state can be "NotProcessed" in non-pk-chunking bulk 
request
+      // SFDC doc says - This state is assigned when a job is aborted while 
the batch is queued
+      if (state == BatchStateEnum.Failed || state == 
BatchStateEnum.InProgress) {
         log.error("Bulk batch failed: " + bulkBatchInfo.toString());
         throw new RuntimeException("Failed to get bulk batch info for jobId " 
+ bulkBatchInfo.getJobId()
             + " error - " + bulkBatchInfo.getStateMessage());
@@ -817,6 +895,7 @@ public class SalesforceExtractor extends RestApiExtractor {
     }
   }
 
+
   /**
    * Get a buffered reader wrapping the query result stream for the result 
with the specified index
    * @param index index the {@link #bulkResultIdList}
@@ -904,6 +983,8 @@ public class SalesforceExtractor extends RestApiExtractor {
     }
   }
 
+
+
   /**
    * Fetch a result batch with retry for network errors
    * @param rs the {@link RecordSetList} to fetch into
@@ -938,7 +1019,7 @@ public class SalesforceExtractor extends RestApiExtractor {
 
   /**
    * Get data from the bulk api input stream
-     * @return record set with each record as a JsonObject
+   * @return record set with each record as a JsonObject
    */
   private RecordSet<JsonElement> getBulkData() throws DataRecordException {
     log.debug("Processing bulk api batch...");
@@ -987,53 +1068,82 @@ public class SalesforceExtractor extends 
RestApiExtractor {
   @Override
   public void closeConnection() throws Exception {
     if (this.bulkConnection != null
-        && 
!this.bulkConnection.getJobStatus(this.bulkJob.getId()).getState().toString().equals("Closed"))
 {
+        && 
!this.bulkConnection.getJobStatus(this.getBulkJobId()).getState().toString().equals("Closed"))
 {
       log.info("Closing salesforce bulk job connection");
-      this.bulkConnection.closeJob(this.bulkJob.getId());
+      this.bulkConnection.closeJob(this.getBulkJobId());
     }
   }
 
-  public static List<Command> constructGetCommand(String restQuery) {
+  private static List<Command> constructGetCommand(String restQuery) {
     return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), 
RestApiCommandType.GET));
   }
 
+
+  private JobIdAndBatchIdResultIdList 
retrievePkChunkingResultIdsByBatchId(BulkConnection connection, String jobId, 
String batchIdListStr) {
+    Iterator<String> batchIds = Arrays.stream(batchIdListStr.split(",")).map(x 
-> x.trim()).filter(x -> !x.equals("")).iterator();
+    try {
+      List<BatchIdAndResultId> batchIdAndResultIdList = 
fetchBatchResultIds(connection, jobId, batchIds);
+      return new JobIdAndBatchIdResultIdList(jobId, batchIdAndResultIdList);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /**
    * Waits for the PK batches to complete. The wait will stop after all 
batches are complete or on the first failed batch
-   * @param batchInfoList list of batch info
-   * @param retryInterval the polling interval
-   * @return the last {@link BatchInfo} processed
-   * @throws InterruptedException
-   * @throws AsyncApiException
    */
-  private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int 
retryInterval)
-      throws InterruptedException, AsyncApiException {
-    BatchInfo batchInfo = null;
-    BatchInfo[] batchInfos = batchInfoList.getBatchInfo();
-
-    // Wait for all batches other than the first one. The first one is not 
processed in PK chunking mode
-    for (int i = 1; i < batchInfos.length; i++) {
-      BatchInfo bi = batchInfos[i];
-
-      // get refreshed job status
-      bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId());
-
-      while ((bi.getState() != BatchStateEnum.Completed)
-          && (bi.getState() != BatchStateEnum.Failed)) {
-        Thread.sleep(retryInterval * 1000);
-        bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), 
bi.getId());
-        log.debug("Bulk Api Batch Info:" + bi);
-        log.info("Waiting for bulk resultSetIds");
+  private JobIdAndBatchIdResultIdList 
retrievePkChunkingResultIds(BulkConnection connection, String jobId, int 
waitMilliSecond) {
+    log.info("Waiting for completion of the the bulk job [jobId={}])'s sub 
queries.", jobId);
+    try {
+      while (true) {
+        BatchInfoList batchInfoList = connection.getBatchInfoList(jobId);
+        BatchInfo[] batchInfos = batchInfoList.getBatchInfo();
+        if (needContinueToPoll(batchInfos, waitMilliSecond)) {
+          continue; // continue to wait
+        }
+        if (Arrays.stream(batchInfos).filter(x -> x.getState() == 
BatchStateEnum.NotProcessed).count() != 1) {
+          throw new Exception("PK-Chunking query should have 1 and only 1 
batch with state=NotProcessed.");
+        }
+        Stream<BatchInfo> stream = Arrays.stream(batchInfos);
+        Iterator<String> batchIds = stream.filter(x -> 
x.getNumberRecordsProcessed() != 0).map(x -> x.getId()).iterator();
+        List<BatchIdAndResultId> batchIdAndResultIdList = 
fetchBatchResultIds(connection, jobId, batchIds);
+        return new JobIdAndBatchIdResultIdList(jobId, batchIdAndResultIdList);
       }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 
-      batchInfo = bi;
+  private List<BatchIdAndResultId> fetchBatchResultIds(BulkConnection 
connection, String jobId, Iterator<String> batchIds) throws Exception {
+    List<BatchIdAndResultId> batchIdAndResultIdList = Lists.newArrayList();
+    while (batchIds.hasNext()) {
+      String batchId = batchIds.next();
+      QueryResultList result = connection.getQueryResultList(jobId, batchId);
+      Stream<String> stream = Arrays.stream(result.getResult());
+      Iterator<BatchIdAndResultId> it = stream.map(rId -> new 
BatchIdAndResultId(batchId, rId)).iterator();
+      Iterators.addAll(batchIdAndResultIdList, it);
+    }
+    return batchIdAndResultIdList;
+  }
 
-      // exit if there was a failure
-      if (batchInfo.getState() == BatchStateEnum.Failed) {
-        break;
+  private Boolean needContinueToPoll(BatchInfo[] batchInfos, long toWait) {
+    long queued = Arrays.stream(batchInfos).filter(x -> x.getState() == 
BatchStateEnum.Queued).count();
+    long inProgress = Arrays.stream(batchInfos).filter(x -> x.getState() == 
BatchStateEnum.InProgress).count();
+    for (BatchInfo bi : batchInfos) {
+      BatchStateEnum state = bi.getState();
+      if (state == BatchStateEnum.InProgress || state == 
BatchStateEnum.Queued) {
+        try {
+          log.info("Total: {}, queued: {}, InProgress: {}, waiting for 
[batchId: {}, state: {}]", batchInfos.length, queued, inProgress, bi.getId(), 
state);
+          Thread.sleep(toWait);
+        } catch (InterruptedException e) { // skip
+        }
+        return true; // need to continue to wait
+      }
+      if (state == BatchStateEnum.Failed) {
+        throw new RuntimeException(String.format("[batchId=%s] failed", 
bi.getId()));
       }
     }
-
-    return batchInfo;
+    return false; // no need to wait
   }
 
   //Moving config creation in a separate method for custom config parameters 
like setting up transport factory.
@@ -1056,9 +1166,19 @@ public class SalesforceExtractor extends 
RestApiExtractor {
     return config;
   }
 
+  public String getBulkJobId() {
+    return workUnit.getProp(PK_CHUNKING_JOB_ID, this.bulkJob.getId());
+  }
+
   @Data
-  private static class BatchIdAndResultId {
+  public static class BatchIdAndResultId {
     private final String batchId;
     private final String resultId;
   }
+
+  @Data
+  public static class JobIdAndBatchIdResultIdList {
+    private final String jobId;
+    private final List<BatchIdAndResultId> batchIdAndResultIdList;
+  }
 }
diff --git 
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
 
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index aedd97b..96ad4b7 100644
--- 
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++ 
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.salesforce;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.ArrayList;
@@ -30,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.text.StrSubstitutor;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 
@@ -65,7 +67,9 @@ import 
org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector;
 import org.apache.gobblin.source.extractor.partition.Partition;
 import org.apache.gobblin.source.extractor.partition.Partitioner;
 import org.apache.gobblin.source.extractor.utils.Utils;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
 import org.apache.gobblin.source.extractor.watermark.WatermarkType;
+import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.WorkUnit;
 
 import lombok.AllArgsConstructor;
@@ -73,13 +77,16 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
+import static org.apache.gobblin.configuration.ConfigurationKeys.*;
+import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.*;
+import 
org.apache.gobblin.salesforce.SalesforceExtractor.JobIdAndBatchIdResultIdList;
+import org.apache.gobblin.salesforce.SalesforceExtractor.BatchIdAndResultId;
 
 /**
  * An implementation of {@link QueryBasedSource} for salesforce data sources.
  */
 @Slf4j
 public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> 
{
-
   public static final String USE_ALL_OBJECTS = "use.all.objects";
   public static final boolean DEFAULT_USE_ALL_OBJECTS = false;
 
@@ -146,12 +153,121 @@ public class SalesforceSource extends 
QueryBasedSource<JsonArray, JsonElement> {
 
   @Override
   protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, 
SourceState state, long previousWatermark) {
+    List<WorkUnit> workUnits = null;
+    String partitionType = state.getProp(SALESFORCE_PARTITION_TYPE, "");
+    if (partitionType.equals("PK_CHUNKING")) {
+      // pk-chunking only supports a start time, set by 
source.querybased.start.value, and does not support an end time.
+      // always ingest data at or after source.querybased.start.value.
+      // we should generate pk-chunking based work units only in case of 
snapshot/full ingestion
+      workUnits = generateWorkUnitsPkChunking(sourceEntity, state, 
previousWatermark);
+    } else {
+      workUnits = generateWorkUnitsStrategy(sourceEntity, state, 
previousWatermark);
+    }
+    log.info("====Generated {} workUnit(s)====", workUnits.size());
+    return workUnits;
+  }
+
+  /**
+   * generate workUnit for pk chunking
+   */
+  private List<WorkUnit> generateWorkUnitsPkChunking(SourceEntity 
sourceEntity, SourceState state, long previousWatermark) {
+    JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList = 
executeQueryWithPkChunking(state, previousWatermark);
+    return createWorkUnits(sourceEntity, state, jobIdAndBatchIdResultIdList);
+  }
+
+  private JobIdAndBatchIdResultIdList executeQueryWithPkChunking(
+      SourceState sourceState,
+      long previousWatermark
+  ) throws RuntimeException {
+    State state = new State(sourceState);
+    WorkUnit workUnit = WorkUnit.createEmpty();
+    WorkUnitState workUnitState = new WorkUnitState(workUnit, state);
+    workUnitState.setId("Execute pk-chunking");
+    try {
+      SalesforceExtractor salesforceExtractor = (SalesforceExtractor) 
this.getExtractor(workUnitState);
+      Partitioner partitioner = new Partitioner(sourceState);
+      if (isEarlyStopEnabled(state) && partitioner.isFullDump()) {
+        throw new UnsupportedOperationException("Early stop mode cannot work 
with full dump mode.");
+      }
+      Partition partition = partitioner.getGlobalPartition(previousWatermark);
+      String condition = "";
+      Date startDate = Utils.toDate(partition.getLowWatermark(), 
Partitioner.WATERMARKTIMEFORMAT);
+      String field = 
sourceState.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY);
+      // pk-chunking only supports a start time, set by 
source.querybased.start.value, and does not support an end time.
+      // always ingest data at or after source.querybased.start.value.
+      // we should generate pk-chunking based work units only in case of 
snapshot/full ingestion
+      if (startDate != null && field != null) {
+        String lowWatermarkDate = Utils.dateToString(startDate, 
SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
+        condition = field + " >= " + lowWatermarkDate;
+      }
+      Predicate predicate = new Predicate(null, 0, condition, "", null);
+      List<Predicate> predicateList = Arrays.asList(predicate);
+      String entity = sourceState.getProp(ConfigurationKeys.SOURCE_ENTITY);
+
+      if (state.contains(PK_CHUNKING_TEST_JOB_ID)) {
+        String jobId = state.getProp(PK_CHUNKING_TEST_JOB_ID, "");
+        log.info("---Skip query, fetching result files directly for 
[jobId={}]", jobId);
+        String batchIdListStr = state.getProp(PK_CHUNKING_TEST_BATCH_ID_LIST);
+        return salesforceExtractor.getQueryResultIdsPkChunkingFetchOnly(jobId, 
batchIdListStr);
+      } else {
+        log.info("---Pk Chunking query submit.");
+        return salesforceExtractor.getQueryResultIdsPkChunking(entity, 
predicateList);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   *  Create work units by taking a bulkJobId.
+   *  The work units won't contain a query in this case. Instead they will 
contain a BulkJobId and a list of `batchId:resultId`
+   *  So in extractor, the work to do is just to fetch the resultSet files.
+   */
+  private List<WorkUnit> createWorkUnits(
+      SourceEntity sourceEntity,
+      SourceState state,
+      JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList
+  ) {
+    String nameSpaceName = 
state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY);
+    Extract.TableType tableType = 
Extract.TableType.valueOf(state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY).toUpperCase());
+    String outputTableName = sourceEntity.getDestTableName();
+    Extract extract = createExtract(tableType, nameSpaceName, outputTableName);
+
+    List<WorkUnit> workUnits = Lists.newArrayList();
+    int partitionNumber = state.getPropAsInt(SOURCE_MAX_NUMBER_OF_PARTITIONS, 
1);
+    List<BatchIdAndResultId> batchResultIds = 
jobIdAndBatchIdResultIdList.getBatchIdAndResultIdList();
+    int total = batchResultIds.size();
+
+    // size of every partition should be: math.ceil(total/partitionNumber), 
use simpler way: (total+partitionNumber-1)/partitionNumber
+    int sizeOfPartition = (total + partitionNumber - 1) / partitionNumber;
+    List<List<BatchIdAndResultId>> partitionedResultIds = 
Lists.partition(batchResultIds, sizeOfPartition);
+    log.info("----partition strategy: max-parti={}, size={}, actual-parti={}, 
total={}", partitionNumber, sizeOfPartition, partitionedResultIds.size(), 
total);
+
+    for (List<BatchIdAndResultId> resultIds : partitionedResultIds) {
+      WorkUnit workunit = new WorkUnit(extract);
+      String bulkJobId = jobIdAndBatchIdResultIdList.getJobId();
+      workunit.setProp(PK_CHUNKING_JOB_ID, bulkJobId);
+      String resultIdStr = resultIds.stream().map(x -> x.getBatchId() + ":" + 
x.getResultId()).collect(Collectors.joining(","));
+      workunit.setProp(PK_CHUNKING_BATCH_RESULT_IDS, resultIdStr);
+      workunit.setProp(ConfigurationKeys.SOURCE_ENTITY, 
sourceEntity.getSourceEntityName());
+      workunit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, 
sourceEntity.getDestTableName());
+      workunit.setProp(WORK_UNIT_STATE_VERSION_KEY, 
CURRENT_WORK_UNIT_STATE_VERSION);
+      addLineageSourceInfo(state, sourceEntity, workunit);
+      workUnits.add(workunit);
+    }
+    return workUnits;
+  }
+
+  /**
+   * generate work units for the non pk-chunking case
+   */
+  private List<WorkUnit> generateWorkUnitsStrategy(SourceEntity sourceEntity, 
SourceState state, long previousWatermark) {
     WatermarkType watermarkType = WatermarkType.valueOf(
         state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_WATERMARK_TYPE, 
ConfigurationKeys.DEFAULT_WATERMARK_TYPE)
             .toUpperCase());
     String watermarkColumn = 
state.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY);
 
-    int maxPartitions = 
state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS,
+    int maxPartitions = state.getPropAsInt(SOURCE_MAX_NUMBER_OF_PARTITIONS,
         ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS);
     int minTargetPartitionSize = state.getPropAsInt(MIN_TARGET_PARTITION_SIZE, 
DEFAULT_MIN_TARGET_PARTITION_SIZE);
 
@@ -191,11 +307,13 @@ public class SalesforceSource extends 
QueryBasedSource<JsonArray, JsonElement> {
     if (histogramAdjust.getGroups().size() < histogram.getGroups().size()) {
       HistogramGroup lastPlusOne = 
histogram.get(histogramAdjust.getGroups().size());
       long earlyStopHighWatermark = 
Long.parseLong(Utils.toDateTimeFormat(lastPlusOne.getKey(), SECONDS_FORMAT, 
Partitioner.WATERMARKTIMEFORMAT));
-      log.info("Job {} will be stopped earlier. [LW : {}, early-stop HW : {}, 
expected HW : {}]", state.getProp(ConfigurationKeys.JOB_NAME_KEY), 
partition.getLowWatermark(), earlyStopHighWatermark, expectedHighWatermark);
+      log.info("Job {} will be stopped earlier. [LW : {}, early-stop HW : {}, 
expected HW : {}]",
+          state.getProp(ConfigurationKeys.JOB_NAME_KEY), 
partition.getLowWatermark(), earlyStopHighWatermark, expectedHighWatermark);
       this.isEarlyStopped = true;
       expectedHighWatermark = earlyStopHighWatermark;
     } else {
-      log.info("Job {} will be finished in a single run. [LW : {}, expected HW 
: {}]", state.getProp(ConfigurationKeys.JOB_NAME_KEY), 
partition.getLowWatermark(), expectedHighWatermark);
+      log.info("Job {} will be finished in a single run. [LW : {}, expected HW 
: {}]",
+          state.getProp(ConfigurationKeys.JOB_NAME_KEY), 
partition.getLowWatermark(), expectedHighWatermark);
     }
 
     String specifiedPartitions = generateSpecifiedPartitions(histogramAdjust, 
minTargetPartitionSize, maxPartitions,
@@ -204,10 +322,13 @@ public class SalesforceSource extends 
QueryBasedSource<JsonArray, JsonElement> {
     state.setProp(Partitioner.USER_SPECIFIED_PARTITIONS, specifiedPartitions);
     state.setProp(Partitioner.IS_EARLY_STOPPED, isEarlyStopped);
 
-    return super.generateWorkUnits(sourceEntity, state, previousWatermark);
+    List<WorkUnit> workUnits = super.generateWorkUnits(sourceEntity, state, 
previousWatermark);
+    Boolean disableSoft = 
state.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED,
 false);
+    workUnits.stream().forEach(x -> 
x.setProp(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED, 
disableSoft));
+    return workUnits;
   }
 
-  private boolean isEarlyStopEnabled (State state) {
+  private boolean isEarlyStopEnabled(State state) {
     return state.getPropAsBoolean(ConfigurationKeys.SOURCE_EARLY_STOP_ENABLED, 
ConfigurationKeys.DEFAULT_SOURCE_EARLY_STOP_ENABLED);
   }
 
@@ -285,7 +406,7 @@ public class SalesforceSource extends 
QueryBasedSource<JsonArray, JsonElement> {
    * Compute the target partition size.
    */
   private int computeTargetPartitionSize(Histogram histogram, int 
minTargetPartitionSize, int maxPartitions) {
-     return Math.max(minTargetPartitionSize,
+    return Math.max(minTargetPartitionSize,
         DoubleMath.roundToInt((double) histogram.totalRecordCount / 
maxPartitions, RoundingMode.CEILING));
   }
 
@@ -387,8 +508,7 @@ public class SalesforceSource extends 
QueryBasedSource<JsonArray, JsonElement> {
    */
   private Histogram getRefinedHistogram(SalesforceConnector connector, String 
entity, String watermarkColumn,
       SourceState state, Partition partition, Histogram histogram) {
-    final int maxPartitions = 
state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS,
-        ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS);
+    final int maxPartitions = 
state.getPropAsInt(SOURCE_MAX_NUMBER_OF_PARTITIONS, 
DEFAULT_MAX_NUMBER_OF_PARTITIONS);
     final int probeLimit = state.getPropAsInt(DYNAMIC_PROBING_LIMIT, 
DEFAULT_DYNAMIC_PROBING_LIMIT);
     final int minTargetPartitionSize = 
state.getPropAsInt(MIN_TARGET_PARTITION_SIZE, 
DEFAULT_MIN_TARGET_PARTITION_SIZE);
     final Histogram outputHistogram = new Histogram();
@@ -651,3 +771,4 @@ public class SalesforceSource extends 
QueryBasedSource<JsonArray, JsonElement> {
     private int probeCount = 0;
   }
 }
+

Reply via email to