This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7d94e47 [GOBBLIN-865] Add feature that enables PK-chunking in partition
7d94e47 is described below
commit 7d94e4745c71f4b01c61828216fc90c626a9521a
Author: Alex Li <[email protected]>
AuthorDate: Wed Dec 11 22:13:51 2019 -0800
[GOBBLIN-865] Add feature that enables PK-chunking in partition
Closes #2722 from arekusuri/GOBBLIN-865-pk-chunking-partition
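For context, enabling the new partition type is purely a job-configuration change. A minimal sketch, assuming a standard Gobblin .pull properties file (the key names are the ones this commit introduces in SalesforceConfigurationKeys; the values are illustrative):

salesforce.partitionType=PK_CHUNKING
# clamped by SalesforceExtractor to [20,000 .. 250,000]; defaults to 250,000
salesforce.partition.pkChunkingSize=250000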
---
.gitignore | 9 +
.../extract/restapi/RestApiExtractor.java | 2 +-
.../apache/gobblin/salesforce/ResultIterator.java | 166 ++++++++++
.../salesforce/SalesforceConfigurationKeys.java | 21 +-
.../gobblin/salesforce/SalesforceExtractor.java | 366 ++++++++++++++-------
.../gobblin/salesforce/SalesforceSource.java | 139 +++++++-
6 files changed, 567 insertions(+), 136 deletions(-)
diff --git a/.gitignore b/.gitignore
index 4982e54..5195ebb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,8 @@
.classpath*
.project*
.settings
+.DS_Store
+*.tar.gz
# Intellij related
**/.idea
@@ -62,5 +64,12 @@ package-lock.json
# generated config files by tests
**/generated-gobblin-cluster.conf
+ligradle/findbugs/findbugsInclude.xml
+ligradle/checkstyle/linkedin-checkstyle.xml
+ligradle/checkstyle/suppressions.xml
+gobblin-core/src/test/resources/serde/output-staging/
+gobblin-integration-test-log-dir/
+gobblin-modules/gobblin-elasticsearch/test-elasticsearch/
temp/
+ligradle/*
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
index b9eeddd..b5fe56d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
@@ -158,7 +158,7 @@ public abstract class RestApiExtractor extends QueryBasedExtractor<JsonArray, Js
this.updatedQuery = buildDataQuery(inputQuery, entity);
log.info("Schema:" + columnArray);
this.setOutputSchema(columnArray);
- } catch (RuntimeException | RestApiConnectionException | RestApiProcessingException | IOException
+ } catch (RuntimeException | RestApiProcessingException | RestApiConnectionException | IOException
| SchemaException e) {
throw new SchemaException("Failed to get schema using rest api; error - " + e.getMessage(), e);
}
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java
new file mode 100644
index 0000000..d6b5566
--- /dev/null
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.sforce.async.BulkConnection;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+
+/**
+ * Result Iterator.
+ * Takes a jobId and 'batchId:resultId,batchId2:resultId2' as input and builds a result-record iterator.
+ */
+@Slf4j
+public class ResultIterator implements Iterator<JsonElement> {
+ private Iterator<ResultStruct> batchIdResultIdIterator;
+ private BulkConnection bulkConnection;
+ private InputStreamCSVReader csvReader;
+ private List<String> csvHeader;
+ private int columnSize;
+ private int urlLoadRetryLimit;
+ private ResultStruct resultStruct;
+ private List<String> currentRecord = null;
+ private Boolean isLoadedCurrentRecord = false;
+ private int currentFileRowCount = 0;
+ private int totalRowCount = 0;
+
+ /**
+ * Constructor.
+ * Needs to initialize the reader and currentRecord.
+ */
+ public ResultIterator(BulkConnection bulkConnection, String jobId, String batchIdResultIdString, int urlLoadRetryLimit) {
+ this.urlLoadRetryLimit = urlLoadRetryLimit;
+ this.bulkConnection = bulkConnection;
+ this.batchIdResultIdIterator = this.parsebatchIdResultIdString(jobId, batchIdResultIdString);
+ if (this.batchIdResultIdIterator.hasNext()) {
+ this.resultStruct = this.batchIdResultIdIterator.next();
+ this.csvReader = this.fetchResultsetAsCsvReader(this.resultStruct); // first file reader
+ } else {
+ throw new RuntimeException("No batch-result id found.");
+ }
+ this.fulfillCurrentRecord();
+ this.csvHeader = this.currentRecord;
+ this.columnSize = this.csvHeader.size();
+ // after fetching the csv header, clean up status
+ this.resetCurrentRecordStatus();
+ }
+
+ /**
+ * Calls csvReader.nextRecord and sets up currentRecord.
+ */
+ private void fulfillCurrentRecord() {
+ if (this.isLoadedCurrentRecord) {
+ return; // skip, since currentRecord was already loaded.
+ }
+ try {
+ this.currentRecord = this.csvReader.nextRecord();
+ if (this.currentRecord == null) { // according to InputStreamCSVReader, it returns null at the end of the reader.
+ log.info("Fetched {} - rows: {}", this.resultStruct, this.currentFileRowCount); // print out log before switching result files
+ this.currentFileRowCount = 0; // clean up
+ if (this.batchIdResultIdIterator.hasNext()) { // if there is a next file, load it.
+ this.resultStruct = this.batchIdResultIdIterator.next();
+ this.csvReader = this.fetchResultsetAsCsvReader(resultStruct);
+ this.csvReader.nextRecord(); // read and ignore the csv header.
+ this.currentRecord = this.csvReader.nextRecord();
+ } else {
+ log.info("---- Fetched {} rows -----", this.totalRowCount); // print out log when all records were fetched.
+ }
+ }
+ this.isLoadedCurrentRecord = true;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void resetCurrentRecordStatus() {
+ this.currentRecord = null;
+ this.isLoadedCurrentRecord = false;
+ }
+
+ @Override
+ public boolean hasNext() {
+ this.fulfillCurrentRecord();
+ return this.currentRecord != null;
+ }
+
+ @Override
+ public JsonElement next() {
+ this.fulfillCurrentRecord();
+ List<String> csvRecord = this.currentRecord;
+ this.resetCurrentRecordStatus();
+ if (csvRecord == null) {
+ throw new NoSuchElementException();
+ }
+ this.currentFileRowCount++;
+ this.totalRowCount++;
+ JsonObject jsonObject = Utils.csvToJsonObject(this.csvHeader, csvRecord, this.columnSize);
+ return jsonObject;
+ }
+
+ /**
+ * resultStruct has all the data which identifies a result set file;
+ * fetch it and convert it to a csvReader.
+ */
+ private InputStreamCSVReader fetchResultsetAsCsvReader(ResultStruct resultStruct) {
+ String jobId = resultStruct.jobId;
+ String batchId = resultStruct.batchId;
+ String resultId = resultStruct.resultId;
+ log.info("PK-Chunking workunit: fetching [jobId={}, batchId={}, resultId={}]", jobId, batchId, resultId);
+ for (int i = 0; i < this.urlLoadRetryLimit; i++) { // retries
+ try {
+ InputStream is = this.bulkConnection.getQueryResultStream(jobId, batchId, resultId);
+ BufferedReader br = new BufferedReader(new InputStreamReader(is, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+ return new InputStreamCSVReader(br);
+ } catch (Exception e) { // skip, for retry
+ }
+ }
+ // tried urlLoadRetryLimit times, always getting an exception
+ throw new RuntimeException("Tried " + this.urlLoadRetryLimit + " times, but couldn't fetch data.");
+ }
+
+ /**
+ * The input string format is "batchId:resultId,batchId2:resultId2";
+ * parse it into an iterator.
+ */
+ private Iterator<ResultStruct> parsebatchIdResultIdString(String jobId, String batchIdResultIdString) {
+ return Arrays.stream(batchIdResultIdString.split(",")).map(x -> x.split(":")).map(x -> new ResultStruct(jobId, x[0], x[1])).iterator();
+ }
+
+ @Data
+ static class ResultStruct {
+ private final String jobId;
+ private final String batchId;
+ private final String resultId;
+ public String toString() {
+ return String.format("[jobId=%s, batchId=%s, resultId=%s]", jobId, batchId, resultId);
+ }
+ }
+
+}
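To illustrate how the extractor is expected to drive this class, a minimal usage sketch (assuming an already-authenticated BulkConnection plus a jobId and comma-separated batchId:resultId pairs produced by SalesforceExtractor; the ids below are placeholders):

package org.apache.gobblin.salesforce;

import com.google.gson.JsonElement;
import com.sforce.async.BulkConnection;
import java.util.Iterator;

class ResultIteratorUsageSketch {
  // Drains every record of a finished PK-chunking job and counts the rows.
  static long countRows(BulkConnection conn) {
    Iterator<JsonElement> it =
        new ResultIterator(conn, "750x000000000AB", "751x01:752x01,751x02:752x02", 5); // 5 = urlLoadRetryLimit
    long rows = 0;
    while (it.hasNext()) {
      it.next(); // each element is one CSV row converted to a JsonObject
      rows++;
    }
    return rows;
  }
}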
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
index e868cd6..d5fe3f7 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
@@ -17,9 +17,24 @@
package org.apache.gobblin.salesforce;
-public class SalesforceConfigurationKeys {
+public final class SalesforceConfigurationKeys {
+ private SalesforceConfigurationKeys() {
+ }
public static final String SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED =
"source.querybased.salesforce.is.soft.deletes.pull.disabled";
- public static final int DEFAULT_SALESFORCE_MAX_CHARS_IN_FILE = 200000000;
- public static final int DEFAULT_SALESFORCE_MAX_ROWS_IN_FILE = 1000000;
+ public static final int DEFAULT_FETCH_RETRY_LIMIT = 5;
+ public static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll";
+
+ // pk-chunking
+ public static final String PK_CHUNKING_TEST_BATCH_ID_LIST = "salesforce.pkChunking.testBatchIdList";
+ public static final String PK_CHUNKING_TEST_JOB_ID = "salesforce.pkChunking.testJobId";
+ public static final String SALESFORCE_PARTITION_TYPE = "salesforce.partitionType";
+ public static final String PARTITION_PK_CHUNKING_SIZE = "salesforce.partition.pkChunkingSize";
+ public static final String PK_CHUNKING_JOB_ID = "_salesforce.job.id";
+ public static final String PK_CHUNKING_BATCH_RESULT_IDS = "_salesforce.result.ids";
+ public static final int MAX_PK_CHUNKING_SIZE = 250_000; // this number is from SFDC's doc - https://tinyurl.com/ycjvgwv2
+ public static final int MIN_PK_CHUNKING_SIZE = 20_000;
+ public static final int DEFAULT_PK_CHUNKING_SIZE = 250_000; // default to max for saving request quota
}
+
+
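A side note on the two bounds above: SalesforceExtractor clamps any configured chunk size into [MIN_PK_CHUNKING_SIZE, MAX_PK_CHUNKING_SIZE] via the usual max/min composition. A tiny self-contained sketch of that clamping (the constants are inlined; the configured value is illustrative):

class PkChunkingSizeClampSketch {
  public static void main(String[] args) {
    final int min = 20_000, max = 250_000; // MIN/MAX_PK_CHUNKING_SIZE above
    int configured = 1_000;                // e.g. a too-small user setting
    int effective = Math.max(min, Math.min(max, configured));
    System.out.println(effective);         // prints 20000
  }
}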
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index b0d444c..36186e5 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.salesforce;
+import com.google.common.collect.Iterators;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -33,7 +34,7 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
-import org.apache.gobblin.salesforce.SalesforceConfigurationKeys;
+import java.util.stream.Stream;
import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.methods.HttpGet;
@@ -71,7 +72,6 @@ import org.apache.gobblin.source.extractor.exception.RestApiConnectionException;
import org.apache.gobblin.source.extractor.exception.SchemaException;
import org.apache.gobblin.source.extractor.extract.Command;
import org.apache.gobblin.source.extractor.extract.CommandOutput;
-import org.apache.gobblin.source.extractor.partition.Partitioner;
import org.apache.gobblin.source.jdbc.SqlQueryUtils;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand;
import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand.RestApiCommandType;
@@ -89,6 +89,8 @@ import org.apache.gobblin.source.workunit.WorkUnit;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.*;
+
/**
* An implementation of salesforce extractor for extracting data from SFDC
@@ -102,20 +104,9 @@ public class SalesforceExtractor extends RestApiExtractor {
private static final String SALESFORCE_HOUR_FORMAT = "HH";
private static final String SALESFORCE_SOAP_SERVICE = "/services/Soap/u";
private static final Gson GSON = new Gson();
- private static final int MAX_PK_CHUNKING_SIZE = 250000;
- private static final int MIN_PK_CHUNKING_SIZE = 100000;
- private static final int DEFAULT_PK_CHUNKING_SIZE = 200000;
- private static final String ENABLE_PK_CHUNKING_KEY = "salesforce.enablePkChunking";
- private static final String PK_CHUNKING_SIZE_KEY = "salesforce.pkChunkingSize";
private static final int MAX_RETRY_INTERVAL_SECS = 600;
- // avoid using too many bulk API calls by only allowing PK chunking only if max partitions is configured <= this
- private static final int PK_CHUNKING_MAX_PARTITIONS_LIMIT = 3;
private static final String FETCH_RETRY_LIMIT_KEY = "salesforce.fetchRetryLimit";
- private static final int DEFAULT_FETCH_RETRY_LIMIT = 5;
- private static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll";
private static final boolean DEFAULT_BULK_API_USE_QUERY_ALL = false;
- private static final String PK_CHUNKING_SKIP_COUNT_CHECK = "salesforce.pkChunkingSkipCountCheck";
- private static final boolean DEFAULT_PK_CHUNKING_SKIP_COUNT_CHECK = false;
private boolean pullStatus = true;
@@ -135,45 +126,29 @@ public class SalesforceExtractor extends RestApiExtractor {
private int prevBulkRecordCount = 0;
private List<String> csvRecord;
- private final boolean pkChunking;
private final int pkChunkingSize;
private final SalesforceConnector sfConnector;
private final int fetchRetryLimit;
private final int batchSize;
- private final boolean pkChunkingSkipCountCheck;
private final boolean bulkApiUseQueryAll;
public SalesforceExtractor(WorkUnitState state) {
super(state);
- this.sfConnector = (SalesforceConnector) this.connector;
-
- // don't allow pk chunking if max partitions too high or have user specified partitions
- if (state.getPropAsBoolean(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, false)
- || state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS,
- ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS) > PK_CHUNKING_MAX_PARTITIONS_LIMIT) {
- if (state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false)) {
- log.warn("Max partitions too high, so PK chunking is not enabled");
- }
-
- this.pkChunking = false;
- } else {
- this.pkChunking = state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false);
- }
+ this.sfConnector = (SalesforceConnector) this.connector;
this.pkChunkingSize =
Math.max(MIN_PK_CHUNKING_SIZE,
- Math.min(MAX_PK_CHUNKING_SIZE, state.getPropAsInt(PK_CHUNKING_SIZE_KEY, DEFAULT_PK_CHUNKING_SIZE)));
+ Math.min(MAX_PK_CHUNKING_SIZE, workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, DEFAULT_PK_CHUNKING_SIZE)));
- this.pkChunkingSkipCountCheck = state.getPropAsBoolean(PK_CHUNKING_SKIP_COUNT_CHECK, DEFAULT_PK_CHUNKING_SKIP_COUNT_CHECK);
- this.bulkApiUseQueryAll = state.getPropAsBoolean(BULK_API_USE_QUERY_ALL, DEFAULT_BULK_API_USE_QUERY_ALL);
+ this.bulkApiUseQueryAll = workUnitState.getPropAsBoolean(BULK_API_USE_QUERY_ALL, DEFAULT_BULK_API_USE_QUERY_ALL);
// Get batch size from .pull file
- int tmpBatchSize = state.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE,
+ int tmpBatchSize = workUnitState.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE,
ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE);
this.batchSize = tmpBatchSize == 0 ? ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE : tmpBatchSize;
- this.fetchRetryLimit = state.getPropAsInt(FETCH_RETRY_LIMIT_KEY, DEFAULT_FETCH_RETRY_LIMIT);
+ this.fetchRetryLimit = workUnitState.getPropAsInt(FETCH_RETRY_LIMIT_KEY, DEFAULT_FETCH_RETRY_LIMIT);
}
@Override
@@ -184,14 +159,14 @@ public class SalesforceExtractor extends RestApiExtractor {
/**
* true is further pull required else false
*/
- public void setPullStatus(boolean pullStatus) {
+ private void setPullStatus(boolean pullStatus) {
this.pullStatus = pullStatus;
}
/**
* url for the next pull from salesforce
*/
- public void setNextUrl(String nextUrl) {
+ private void setNextUrl(String nextUrl) {
this.nextUrl = nextUrl;
}
@@ -203,11 +178,11 @@ public class SalesforceExtractor extends RestApiExtractor {
this.bulkJobFinished = bulkJobFinished;
}
- public boolean isNewBulkResultSet() {
+ private boolean isNewBulkResultSet() {
return this.newBulkResultSet;
}
- public void setNewBulkResultSet(boolean newBulkResultSet) {
+ private void setNewBulkResultSet(boolean newBulkResultSet) {
this.newBulkResultSet = newBulkResultSet;
}
@@ -303,7 +278,7 @@ public class SalesforceExtractor extends RestApiExtractor {
}
query = SqlQueryUtils.addPredicate(query, defaultPredicate);
query = query + defaultSortOrder;
- log.info("QUERY: " + query);
+ log.info("getHighWatermarkMetadata - QUERY: " + query);
try {
return constructGetCommand(this.sfConnector.getFullUri(getSoqlUrl(query)));
@@ -326,7 +301,7 @@ public class SalesforceExtractor extends RestApiExtractor {
}
JsonElement element = GSON.fromJson(output, JsonObject.class);
- long high_ts;
+ long highTs;
try {
JsonObject jsonObject = element.getAsJsonObject();
JsonArray jsonArray = jsonObject.getAsJsonArray("records");
@@ -344,15 +319,15 @@ public class SalesforceExtractor extends RestApiExtractor {
log.error("ParseException: " + e.getMessage(), e);
}
SimpleDateFormat outFormat = new SimpleDateFormat("yyyyMMddHHmmss");
- high_ts = Long.parseLong(outFormat.format(date));
+ highTs = Long.parseLong(outFormat.format(date));
} else {
- high_ts = Long.parseLong(value);
+ highTs = Long.parseLong(value);
}
} catch (Exception e) {
throw new HighWatermarkException("Failed to get high watermark from salesforce; error - " + e.getMessage(), e);
}
- return high_ts;
+ return highTs;
}
@Override
@@ -384,7 +359,7 @@ public class SalesforceExtractor extends RestApiExtractor {
}
query = query + getLimitFromInputQuery(this.updatedQuery);
- log.info("QUERY: " + query);
+ log.info("getCountMetadata - QUERY: " + query);
return constructGetCommand(this.sfConnector.getFullUri(getSoqlUrl(query)));
} catch (Exception e) {
throw new RecordCountException("Failed to get salesforce url for record count; error - " + e.getMessage(), e);
@@ -421,11 +396,11 @@ public class SalesforceExtractor extends RestApiExtractor {
String query = this.updatedQuery;
String url = null;
try {
- if (this.getNextUrl() != null && this.pullStatus == true) {
+ if (this.getNextUrl() != null && this.pullStatus) {
url = this.getNextUrl();
} else {
if (isNullPredicate(predicateList)) {
- log.info("QUERY:" + query);
+ log.info("getDataMetaData null predicate - QUERY:" + query);
return constructGetCommand(this.sfConnector.getFullUri(getSoqlUrl(query)));
}
@@ -443,7 +418,7 @@ public class SalesforceExtractor extends RestApiExtractor {
}
query = query + limitString;
- log.info("QUERY: " + query);
+ log.info("getDataMetadata - QUERY: " + query);
url = this.sfConnector.getFullUri(getSoqlUrl(query));
}
return constructGetCommand(url);
@@ -556,27 +531,27 @@ public class SalesforceExtractor extends RestApiExtractor {
@Override
public String getHourPredicateCondition(String column, long value, String valueFormat, String operator) {
log.info("Getting hour predicate from salesforce");
- String Formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_HOUR_FORMAT);
- return column + " " + operator + " " + Formattedvalue;
+ String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_HOUR_FORMAT);
+ return column + " " + operator + " " + formattedvalue;
}
@Override
public String getDatePredicateCondition(String column, long value, String valueFormat, String operator) {
log.info("Getting date predicate from salesforce");
- String Formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_DATE_FORMAT);
- return column + " " + operator + " " + Formattedvalue;
+ String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_DATE_FORMAT);
+ return column + " " + operator + " " + formattedvalue;
}
@Override
public String getTimestampPredicateCondition(String column, long value, String valueFormat, String operator) {
log.info("Getting timestamp predicate from salesforce");
- String Formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_TIMESTAMP_FORMAT);
- return column + " " + operator + " " + Formattedvalue;
+ String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_TIMESTAMP_FORMAT);
+ return column + " " + operator + " " + formattedvalue;
}
@Override
public Map<String, String> getDataTypeMap() {
- Map<String, String> dataTypeMap = ImmutableMap.<String, String> builder().put("url", "string")
+ Map<String, String> dataTypeMap = ImmutableMap.<String, String>builder().put("url", "string")
.put("textarea", "string").put("reference", "string").put("phone", "string").put("masterrecord", "string")
.put("location", "string").put("id", "string").put("encryptedstring", "string").put("email", "string")
.put("DataCategoryGroupReference", "string").put("calculated", "string").put("anyType", "string")
@@ -588,16 +563,43 @@ public class SalesforceExtractor extends RestApiExtractor {
return dataTypeMap;
}
+
+ private Boolean isPkChunkingFetchDone = false;
+
+ private Iterator<JsonElement> getRecordSetPkChunking(WorkUnit workUnit) throws RuntimeException {
+ if (isPkChunkingFetchDone) {
+ return null; // must return null to represent no more data.
+ }
+ isPkChunkingFetchDone = true; // set to true, so we never come here twice.
+
+ try {
+ if (!bulkApiLogin()) {
+ throw new IllegalArgumentException("Invalid Login");
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
+ String batchIdResultIdString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_IDS);
+ return new ResultIterator(bulkConnection, jobId, batchIdResultIdString, fetchRetryLimit);
+ }
+
@Override
public Iterator<JsonElement> getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit,
List<Predicate> predicateList) throws IOException {
log.debug("Getting salesforce data using bulk api");
- RecordSet<JsonElement> rs = null;
+ // new version of extractor: bulk api with pk-chunking in pre-partitioning of SalesforceSource
+ if (workUnit.contains(PK_CHUNKING_JOB_ID)) {
+ log.info("----pk-chunking get record set----" + workUnit.getProp(PK_CHUNKING_JOB_ID));
+ return getRecordSetPkChunking(workUnit);
+ }
+ log.info("----bulk get record set----");
try {
//Get query result ids in the first run
//result id is used to construct url while fetching data
- if (this.bulkApiInitialRun == true) {
+ if (this.bulkApiInitialRun) {
// set finish status to false before starting the bulk job
this.setBulkJobFinished(false);
this.bulkResultIdList = getQueryResultIds(entity, predicateList);
@@ -607,6 +609,7 @@ public class SalesforceExtractor extends RestApiExtractor {
// Get data from input stream
// If bulk load is not finished, get data from the stream
// Skip empty result sets since they will cause the extractor to terminate early
+ RecordSet<JsonElement> rs = null;
while (!this.isBulkJobFinished() && (rs == null || rs.isEmpty())) {
rs = getBulkData();
}
@@ -615,11 +618,11 @@ public class SalesforceExtractor extends RestApiExtractor {
this.bulkApiInitialRun = false;
// If bulk job is finished, get soft deleted records using Rest API
- boolean isSoftDeletesPullDisabled = Boolean.valueOf(this.workUnit
- .getProp(SalesforceConfigurationKeys.SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED));
- if (rs == null || rs.isEmpty()) {
+ boolean disableSoftDeletePull = this.workUnit.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED);
+ if (rs == null || rs.isEmpty()) { // TODO: Only when the rs is empty, we need to fetch soft deleted records. WHY?
// Get soft delete records only if IsDeleted column exists and soft deletes pull is not disabled
- if (this.columnList.contains("IsDeleted") && !isSoftDeletesPullDisabled) {
+ if (this.columnList.contains("IsDeleted") && !disableSoftDeletePull) {
+ log.info("Pull soft delete records");
return this.getSoftDeletedRecords(schema, entity, workUnit, predicateList);
}
log.info("Ignoring soft delete records");
@@ -634,7 +637,7 @@ public class SalesforceExtractor extends RestApiExtractor {
/**
* Get soft deleted records using Rest Api
- * @return iterator with deleted records
+ * @return iterator with deleted records
*/
private Iterator<JsonElement> getSoftDeletedRecords(String schema, String entity, WorkUnit workUnit,
List<Predicate> predicateList) throws DataRecordException {
@@ -645,7 +648,7 @@ public class SalesforceExtractor extends RestApiExtractor {
* Login to salesforce
* @return login status
*/
- public boolean bulkApiLogin() throws Exception {
+ private boolean bulkApiLogin() throws Exception {
log.info("Authenticating salesforce bulk api");
boolean success = false;
String hostName = this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME);
@@ -705,6 +708,27 @@ public class SalesforceExtractor extends RestApiExtractor {
}
/**
+ * Same as getQueryResultIdsPkChunking, but the arguments are different.
+ * This function can take existing batch ids and return a JobIdAndBatchIdResultIdList.
+ * It is for test/debug: developers may want to skip executing the query on SFDC and use a list of existing batch ids.
+ */
+ public JobIdAndBatchIdResultIdList getQueryResultIdsPkChunkingFetchOnly(String jobId, String batchIdListStr) {
+ try {
+ if (!bulkApiLogin()) {
+ throw new IllegalArgumentException("Invalid Login");
+ }
+ int retryInterval = Math.min(MAX_RETRY_INTERVAL_SECS * 1000, 30 + this.pkChunkingSize * 2);
+ if (batchIdListStr != null) {
+ log.info("The batchId was specified.");
+ return retrievePkChunkingResultIdsByBatchId(this.bulkConnection, jobId, batchIdListStr);
+ } else {
+ return retrievePkChunkingResultIds(this.bulkConnection, jobId, retryInterval);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ /**
* get BulkConnection instance
* @return
*/
@@ -713,10 +737,79 @@ public class SalesforceExtractor extends RestApiExtractor {
}
/**
+ * This function is currently only for pk-chunking; it is derived from getQueryResultIds.
+ * TODO: abstract this function into a common function: arguments need to add connection, header, output-format.
+ * TODO: make it and its related functions pure functions (no side effects). Currently it still unnecessarily changes this.bulkJob.
+ */
+ public JobIdAndBatchIdResultIdList getQueryResultIdsPkChunking(String entity, List<Predicate> predicateList) {
+ try {
+ if (!bulkApiLogin()) {
+ throw new IllegalArgumentException("Invalid Login");
+ }
+ BulkConnection connection = this.bulkConnection;
+ JobInfo jobRequest = new JobInfo();
+ jobRequest.setObject(entity);
+ jobRequest.setOperation(OperationEnum.queryAll);
+ jobRequest.setConcurrencyMode(ConcurrencyMode.Parallel);
+ log.info("Enabling pk chunking with size {}", this.pkChunkingSize);
+ connection.addHeader("Sforce-Enable-PKChunking", "chunkSize=" + this.pkChunkingSize);
+ // Result type as CSV
+ jobRequest.setContentType(ContentType.CSV);
+ JobInfo createdJob = connection.createJob(jobRequest);
+ log.info("Created bulk job: {}", createdJob.getId());
+ this.bulkJob = createdJob; // other functions need to use it. TODO: remove bulkJob from this class
+ String jobId = createdJob.getId();
+ JobInfo jobResponse = connection.getJobStatus(jobId);
+ // Construct query with the predicates
+ String query = this.updatedQuery;
+ if (!isNullPredicate(predicateList)) {
+ String limitString = getLimitFromInputQuery(query);
+ query = query.replace(limitString, "");
+ Iterator<Predicate> i = predicateList.listIterator();
+ while (i.hasNext()) {
+ Predicate predicate = i.next();
+ query = SqlQueryUtils.addPredicate(query, predicate.getCondition());
+ }
+ query = query + limitString;
+ }
+ log.info("Submitting PK Chunking query:" + query);
+ ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes(ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+
+ BatchInfo executeQueryBatch = connection.createBatchFromStream(jobResponse, bout);
+ String pkChunkingBatchId = executeQueryBatch.getId();
+
+ int waitMilliSecond = 60 * 1000;
+
+ // Get batch info with complete resultset (info id - refers to the resultset id corresponding to the entire resultset)
+ BatchInfo batchResponse = connection.getBatchInfo(jobId, pkChunkingBatchId);
+
+ // wait for completion, failure, or formation of PK chunking batches.
+ // The user query will be submitted to SFDC and create the first batch;
+ // it is supposed to create more batches after the initial batch.
+ BatchStateEnum batchState = batchResponse.getState();
+ while (batchState == BatchStateEnum.InProgress || batchState == BatchStateEnum.Queued) {
+ Thread.sleep(waitMilliSecond);
+ batchResponse = connection.getBatchInfo(createdJob.getId(), executeQueryBatch.getId());
+ log.info("Waiting for first batch (jobId={}, pkChunkingBatchId={})", jobId, pkChunkingBatchId);
+ batchState = batchResponse.getState();
+ }
+
+ if (batchResponse.getState() == BatchStateEnum.Failed) {
+ log.error("Bulk batch failed: " + batchResponse.toString());
+ throw new Exception("Failed to get bulk batch info for jobId " + jobId + " error - " + batchResponse.getStateMessage());
+ }
+ JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList = retrievePkChunkingResultIds(connection, jobId, waitMilliSecond);
+ return jobIdAndBatchIdResultIdList;
+ } catch (Exception e) {
+ throw new RuntimeException("getQueryResultIdsPkChunking: error - " + e.getMessage(), e);
+ }
+ }
+
+ /**
* Get Record set using salesforce specific API(Bulk API)
* @param entity/tablename
* @param predicateList of all predicate conditions
- * @return iterator with batch of records
+ * @return iterator with batch of records
*/
private List<BatchIdAndResultId> getQueryResultIds(String entity, List<Predicate> predicateList) throws Exception {
if (!bulkApiLogin()) {
@@ -724,24 +817,17 @@ public class SalesforceExtractor extends RestApiExtractor {
}
try {
- boolean usingPkChunking = false;
-
// Set bulk job attributes
this.bulkJob.setObject(entity);
this.bulkJob.setOperation(this.bulkApiUseQueryAll ? OperationEnum.queryAll : OperationEnum.query);
this.bulkJob.setConcurrencyMode(ConcurrencyMode.Parallel);
- // use pk chunking if pk chunking is configured and the expected record count is larger than the pk chunking size
- if (this.pkChunking && (this.pkChunkingSkipCountCheck || getExpectedRecordCount() > this.pkChunkingSize)) {
- log.info("Enabling pk chunking with size {}", this.pkChunkingSize);
- this.bulkConnection.addHeader("Sforce-Enable-PKChunking", "chunkSize=" + this.pkChunkingSize);
- usingPkChunking = true;
- }
-
// Result type as CSV
this.bulkJob.setContentType(ContentType.CSV);
this.bulkJob = this.bulkConnection.createJob(this.bulkJob);
+ log.info("Created bulk job [jobId={}]", this.bulkJob.getId());
+
this.bulkJob = this.bulkConnection.getJobStatus(this.bulkJob.getId());
// Construct query with the predicates
@@ -759,38 +845,30 @@ public class SalesforceExtractor extends RestApiExtractor {
query = query + limitString;
}
- log.info("QUERY:" + query);
+ log.info("getQueryResultIds - QUERY:" + query);
ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes(ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
BatchInfo bulkBatchInfo = this.bulkConnection.createBatchFromStream(this.bulkJob, bout);
- long expectedSizePerBatch = usingPkChunking ? this.pkChunkingSize : this.getExpectedRecordCount();
-
- int retryInterval = Math.min(MAX_RETRY_INTERVAL_SECS,
- 30 + (int) Math.ceil((float) expectedSizePerBatch / 10000) * 2);
- log.info("Salesforce bulk api retry interval in seconds:" + retryInterval);
+ int waitMilliSeconds = 60 * 1000; // wait 1 minute
// Get batch info with complete resultset (info id - refers to the resultset id corresponding to entire resultset)
bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());
// wait for completion, failure, or formation of PK chunking batches
- while ((bulkBatchInfo.getState() != BatchStateEnum.Completed)
- && (bulkBatchInfo.getState() != BatchStateEnum.Failed)
- && (!usingPkChunking || bulkBatchInfo.getState() != BatchStateEnum.NotProcessed)) {
- Thread.sleep(retryInterval * 1000);
+ // if it is InProgress or Queued, continue to wait.
+ while (bulkBatchInfo.getState() == BatchStateEnum.InProgress || bulkBatchInfo.getState() == BatchStateEnum.Queued) {
+ Thread.sleep(waitMilliSeconds);
bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId());
- log.debug("Bulk Api Batch Info:" + bulkBatchInfo);
log.info("Waiting for bulk resultSetIds");
}
// Wait for pk chunking batches
BatchInfoList batchInfoList = this.bulkConnection.getBatchInfoList(this.bulkJob.getId());
-
- if (usingPkChunking && bulkBatchInfo.getState() == BatchStateEnum.NotProcessed) {
- bulkBatchInfo = waitForPkBatches(batchInfoList, retryInterval);
- }
-
- if (bulkBatchInfo.getState() == BatchStateEnum.Failed) {
+ BatchStateEnum state = bulkBatchInfo.getState();
+ // not sure if the state can be "NotProcessed" in a non-pk-chunking bulk request
+ // SFDC doc says - This state is assigned when a job is aborted while the batch is queued
+ if (state == BatchStateEnum.Failed || state == BatchStateEnum.InProgress) {
log.error("Bulk batch failed: " + bulkBatchInfo.toString());
throw new RuntimeException("Failed to get bulk batch info for jobId " + bulkBatchInfo.getJobId()
+ " error - " + bulkBatchInfo.getStateMessage());
@@ -817,6 +895,7 @@ public class SalesforceExtractor extends RestApiExtractor {
}
}
+
/**
* Get a buffered reader wrapping the query result stream for the result with the specified index
* @param index index the {@link #bulkResultIdList}
@@ -904,6 +983,8 @@ public class SalesforceExtractor extends RestApiExtractor {
}
}
+
+
/**
* Fetch a result batch with retry for network errors
* @param rs the {@link RecordSetList} to fetch into
@@ -938,7 +1019,7 @@ public class SalesforceExtractor extends RestApiExtractor {
/**
* Get data from the bulk api input stream
- * @return record set with each record as a JsonObject
+ * @return record set with each record as a JsonObject
*/
private RecordSet<JsonElement> getBulkData() throws DataRecordException {
log.debug("Processing bulk api batch...");
@@ -987,53 +1068,82 @@ public class SalesforceExtractor extends RestApiExtractor {
@Override
public void closeConnection() throws Exception {
if (this.bulkConnection != null
- && !this.bulkConnection.getJobStatus(this.bulkJob.getId()).getState().toString().equals("Closed")) {
+ && !this.bulkConnection.getJobStatus(this.getBulkJobId()).getState().toString().equals("Closed")) {
log.info("Closing salesforce bulk job connection");
- this.bulkConnection.closeJob(this.bulkJob.getId());
+ this.bulkConnection.closeJob(this.getBulkJobId());
}
}
- public static List<Command> constructGetCommand(String restQuery) {
+ private static List<Command> constructGetCommand(String restQuery) {
return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), RestApiCommandType.GET));
}
+
+ private JobIdAndBatchIdResultIdList retrievePkChunkingResultIdsByBatchId(BulkConnection connection, String jobId, String batchIdListStr) {
+ Iterator<String> batchIds = Arrays.stream(batchIdListStr.split(",")).map(x -> x.trim()).filter(x -> !x.equals("")).iterator();
+ try {
+ List<BatchIdAndResultId> batchIdAndResultIdList = fetchBatchResultIds(connection, jobId, batchIds);
+ return new JobIdAndBatchIdResultIdList(jobId, batchIdAndResultIdList);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/**
* Waits for the PK batches to complete. The wait will stop after all batches are complete or on the first failed batch
- * @param batchInfoList list of batch info
- * @param retryInterval the polling interval
- * @return the last {@link BatchInfo} processed
- * @throws InterruptedException
- * @throws AsyncApiException
*/
- private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int retryInterval)
- throws InterruptedException, AsyncApiException {
- BatchInfo batchInfo = null;
- BatchInfo[] batchInfos = batchInfoList.getBatchInfo();
-
- // Wait for all batches other than the first one. The first one is not processed in PK chunking mode
- for (int i = 1; i < batchInfos.length; i++) {
- BatchInfo bi = batchInfos[i];
-
- // get refreshed job status
- bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId());
-
- while ((bi.getState() != BatchStateEnum.Completed)
- && (bi.getState() != BatchStateEnum.Failed)) {
- Thread.sleep(retryInterval * 1000);
- bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId());
- log.debug("Bulk Api Batch Info:" + bi);
- log.info("Waiting for bulk resultSetIds");
+ private JobIdAndBatchIdResultIdList retrievePkChunkingResultIds(BulkConnection connection, String jobId, int waitMilliSecond) {
+ log.info("Waiting for completion of the bulk job [jobId={}]'s sub-queries.", jobId);
+ try {
+ while (true) {
+ BatchInfoList batchInfoList = connection.getBatchInfoList(jobId);
+ BatchInfo[] batchInfos = batchInfoList.getBatchInfo();
+ if (needContinueToPoll(batchInfos, waitMilliSecond)) {
+ continue; // continue to wait
+ }
+ if (Arrays.stream(batchInfos).filter(x -> x.getState() == BatchStateEnum.NotProcessed).count() != 1) {
+ throw new Exception("PK-Chunking query should have 1 and only 1 batch with state=NotProcessed.");
+ }
+ Stream<BatchInfo> stream = Arrays.stream(batchInfos);
+ Iterator<String> batchIds = stream.filter(x -> x.getNumberRecordsProcessed() != 0).map(x -> x.getId()).iterator();
+ List<BatchIdAndResultId> batchIdAndResultIdList = fetchBatchResultIds(connection, jobId, batchIds);
+ return new JobIdAndBatchIdResultIdList(jobId, batchIdAndResultIdList);
}
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
- batchInfo = bi;
+ private List<BatchIdAndResultId> fetchBatchResultIds(BulkConnection connection, String jobId, Iterator<String> batchIds) throws Exception {
+ List<BatchIdAndResultId> batchIdAndResultIdList = Lists.newArrayList();
+ while (batchIds.hasNext()) {
+ String batchId = batchIds.next();
+ QueryResultList result = connection.getQueryResultList(jobId, batchId);
+ Stream<String> stream = Arrays.stream(result.getResult());
+ Iterator<BatchIdAndResultId> it = stream.map(rId -> new BatchIdAndResultId(batchId, rId)).iterator();
+ Iterators.addAll(batchIdAndResultIdList, it);
+ }
+ return batchIdAndResultIdList;
+ }
- // exit if there was a failure
- if (batchInfo.getState() == BatchStateEnum.Failed) {
- break;
+ private Boolean needContinueToPoll(BatchInfo[] batchInfos, long toWait) {
+ long queued = Arrays.stream(batchInfos).filter(x -> x.getState() == BatchStateEnum.Queued).count();
+ long inProgress = Arrays.stream(batchInfos).filter(x -> x.getState() == BatchStateEnum.InProgress).count();
+ for (BatchInfo bi : batchInfos) {
+ BatchStateEnum state = bi.getState();
+ if (state == BatchStateEnum.InProgress || state == BatchStateEnum.Queued) {
+ try {
+ log.info("Total: {}, queued: {}, InProgress: {}, waiting for [batchId: {}, state: {}]", batchInfos.length, queued, inProgress, bi.getId(), state);
+ Thread.sleep(toWait);
+ } catch (InterruptedException e) { // skip
+ }
+ return true; // need to continue to wait
+ }
+ if (state == BatchStateEnum.Failed) {
+ throw new RuntimeException(String.format("[batchId=%s] failed", bi.getId()));
}
}
-
- return batchInfo;
+ return false; // no need to wait
}
//Moving config creation in a separate method for custom config parameters like setting up transport factory.
@@ -1056,9 +1166,19 @@ public class SalesforceExtractor extends RestApiExtractor {
return config;
}
+ public String getBulkJobId() {
+ return workUnit.getProp(PK_CHUNKING_JOB_ID, this.bulkJob.getId());
+ }
+
@Data
- private static class BatchIdAndResultId {
+ public static class BatchIdAndResultId {
private final String batchId;
private final String resultId;
}
+
+ @Data
+ public static class JobIdAndBatchIdResultIdList {
+ private final String jobId;
+ private final List<BatchIdAndResultId> batchIdAndResultIdList;
+ }
}
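The glue between SalesforceSource (below) and this extractor is the flat batchId:resultId string stored on each work unit. A small self-contained sketch of that round trip, serializing the way createWorkUnits joins the pairs and parsing the way ResultIterator splits them (the ids are placeholders):

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class BatchResultIdWireFormatSketch {
  public static void main(String[] args) {
    // Serialize a partition's (batchId, resultId) pairs into one work-unit property.
    List<String[]> pairs = Arrays.asList(
        new String[]{"751x01", "752x01"}, new String[]{"751x02", "752x02"});
    String prop = pairs.stream().map(p -> p[0] + ":" + p[1]).collect(Collectors.joining(","));
    System.out.println(prop); // 751x01:752x01,751x02:752x02

    // Parse it back, as ResultIterator.parsebatchIdResultIdString does.
    Arrays.stream(prop.split(",")).map(x -> x.split(":"))
        .forEach(x -> System.out.println("batchId=" + x[0] + ", resultId=" + x[1]));
  }
}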
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index aedd97b..96ad4b7 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.salesforce;
+import com.google.common.collect.Lists;
import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
@@ -30,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.text.StrSubstitutor;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
@@ -65,7 +67,9 @@ import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector;
import org.apache.gobblin.source.extractor.partition.Partition;
import org.apache.gobblin.source.extractor.partition.Partitioner;
import org.apache.gobblin.source.extractor.utils.Utils;
+import org.apache.gobblin.source.extractor.watermark.Predicate;
import org.apache.gobblin.source.extractor.watermark.WatermarkType;
+import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.WorkUnit;
import lombok.AllArgsConstructor;
@@ -73,13 +77,16 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import static org.apache.gobblin.configuration.ConfigurationKeys.*;
+import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.*;
+import org.apache.gobblin.salesforce.SalesforceExtractor.JobIdAndBatchIdResultIdList;
+import org.apache.gobblin.salesforce.SalesforceExtractor.BatchIdAndResultId;
/**
* An implementation of {@link QueryBasedSource} for salesforce data sources.
*/
@Slf4j
public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
-
public static final String USE_ALL_OBJECTS = "use.all.objects";
public static final boolean DEFAULT_USE_ALL_OBJECTS = false;
@@ -146,12 +153,121 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
@Override
protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
+ List<WorkUnit> workUnits = null;
+ String partitionType = state.getProp(SALESFORCE_PARTITION_TYPE, "");
+ if (partitionType.equals("PK_CHUNKING")) {
+ // pk-chunking only supports a start time set by source.querybased.start.value, and does not support an end time.
+ // It always ingests data later than or equal to source.querybased.start.value.
+ // We should only create pk-chunking-based work units in the case of snapshot/full ingestion.
+ workUnits = generateWorkUnitsPkChunking(sourceEntity, state, previousWatermark);
+ } else {
+ workUnits = generateWorkUnitsStrategy(sourceEntity, state, previousWatermark);
+ }
+ log.info("====Generated {} workUnit(s)====", workUnits.size());
+ return workUnits;
+ }
+
+ /**
+ * Generate work units for pk-chunking.
+ */
+ private List<WorkUnit> generateWorkUnitsPkChunking(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
+ JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList = executeQueryWithPkChunking(state, previousWatermark);
+ return createWorkUnits(sourceEntity, state, jobIdAndBatchIdResultIdList);
+ }
+
+ private JobIdAndBatchIdResultIdList executeQueryWithPkChunking(
+ SourceState sourceState,
+ long previousWatermark
+ ) throws RuntimeException {
+ State state = new State(sourceState);
+ WorkUnit workUnit = WorkUnit.createEmpty();
+ WorkUnitState workUnitState = new WorkUnitState(workUnit, state);
+ workUnitState.setId("Execute pk-chunking");
+ try {
+ SalesforceExtractor salesforceExtractor = (SalesforceExtractor) this.getExtractor(workUnitState);
+ Partitioner partitioner = new Partitioner(sourceState);
+ if (isEarlyStopEnabled(state) && partitioner.isFullDump()) {
+ throw new UnsupportedOperationException("Early stop mode cannot work with full dump mode.");
+ }
+ Partition partition = partitioner.getGlobalPartition(previousWatermark);
+ String condition = "";
+ Date startDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT);
+ String field = sourceState.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY);
+ // pk-chunking only supports a start time set by source.querybased.start.value, and does not support an end time.
+ // It always ingests data later than or equal to source.querybased.start.value.
+ // We should only create pk-chunking-based work units in the case of snapshot/full ingestion.
+ if (startDate != null && field != null) {
+ String lowWatermarkDate = Utils.dateToString(startDate, SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT);
+ condition = field + " >= " + lowWatermarkDate;
+ }
+ Predicate predicate = new Predicate(null, 0, condition, "", null);
+ List<Predicate> predicateList = Arrays.asList(predicate);
+ String entity = sourceState.getProp(ConfigurationKeys.SOURCE_ENTITY);
+
+ if (state.contains(PK_CHUNKING_TEST_JOB_ID)) {
+ String jobId = state.getProp(PK_CHUNKING_TEST_JOB_ID, "");
+ log.info("---Skip query, fetching result files directly for [jobId={}]", jobId);
+ String batchIdListStr = state.getProp(PK_CHUNKING_TEST_BATCH_ID_LIST);
+ return salesforceExtractor.getQueryResultIdsPkChunkingFetchOnly(jobId, batchIdListStr);
+ } else {
+ log.info("---Pk Chunking query submit.");
+ return salesforceExtractor.getQueryResultIdsPkChunking(entity, predicateList);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Create work units by taking a bulkJobId.
+ * The work units won't contain a query in this case. Instead they will contain a bulkJobId and a list of `batchId:resultId`.
+ * So in the extractor, the work to do is just to fetch the resultSet files.
+ */
+ private List<WorkUnit> createWorkUnits(
+ SourceEntity sourceEntity,
+ SourceState state,
+ JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList
+ ) {
+ String nameSpaceName = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY);
+ Extract.TableType tableType = Extract.TableType.valueOf(state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY).toUpperCase());
+ String outputTableName = sourceEntity.getDestTableName();
+ Extract extract = createExtract(tableType, nameSpaceName, outputTableName);
+
+ List<WorkUnit> workUnits = Lists.newArrayList();
+ int partitionNumber = state.getPropAsInt(SOURCE_MAX_NUMBER_OF_PARTITIONS, 1);
+ List<BatchIdAndResultId> batchResultIds = jobIdAndBatchIdResultIdList.getBatchIdAndResultIdList();
+ int total = batchResultIds.size();
+
+ // size of every partition should be Math.ceil(total/partitionNumber); use a simpler way: (total + partitionNumber - 1) / partitionNumber
+ int sizeOfPartition = (total + partitionNumber - 1) / partitionNumber;
+ List<List<BatchIdAndResultId>> partitionedResultIds = Lists.partition(batchResultIds, sizeOfPartition);
+ log.info("----partition strategy: max-parti={}, size={}, actual-parti={}, total={}", partitionNumber, sizeOfPartition, partitionedResultIds.size(), total);
+
+ for (List<BatchIdAndResultId> resultIds : partitionedResultIds) {
+ WorkUnit workunit = new WorkUnit(extract);
+ String bulkJobId = jobIdAndBatchIdResultIdList.getJobId();
+ workunit.setProp(PK_CHUNKING_JOB_ID, bulkJobId);
+ String resultIdStr = resultIds.stream().map(x -> x.getBatchId() + ":" + x.getResultId()).collect(Collectors.joining(","));
+ workunit.setProp(PK_CHUNKING_BATCH_RESULT_IDS, resultIdStr);
+ workunit.setProp(ConfigurationKeys.SOURCE_ENTITY, sourceEntity.getSourceEntityName());
+ workunit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, sourceEntity.getDestTableName());
+ workunit.setProp(WORK_UNIT_STATE_VERSION_KEY, CURRENT_WORK_UNIT_STATE_VERSION);
+ addLineageSourceInfo(state, sourceEntity, workunit);
+ workUnits.add(workunit);
+ }
+ return workUnits;
+ }
+
+ /**
+ * Generate work units using the original watermark-partitioning strategy.
+ */
+ private List<WorkUnit> generateWorkUnitsStrategy(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
WatermarkType watermarkType = WatermarkType.valueOf(
state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_WATERMARK_TYPE, ConfigurationKeys.DEFAULT_WATERMARK_TYPE)
.toUpperCase());
String watermarkColumn = state.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY);
- int maxPartitions = state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS,
+ int maxPartitions = state.getPropAsInt(SOURCE_MAX_NUMBER_OF_PARTITIONS,
ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS);
int minTargetPartitionSize = state.getPropAsInt(MIN_TARGET_PARTITION_SIZE, DEFAULT_MIN_TARGET_PARTITION_SIZE);
@@ -191,11 +307,13 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
if (histogramAdjust.getGroups().size() < histogram.getGroups().size()) {
HistogramGroup lastPlusOne = histogram.get(histogramAdjust.getGroups().size());
long earlyStopHighWatermark = Long.parseLong(Utils.toDateTimeFormat(lastPlusOne.getKey(), SECONDS_FORMAT, Partitioner.WATERMARKTIMEFORMAT));
- log.info("Job {} will be stopped earlier. [LW : {}, early-stop HW : {}, expected HW : {}]", state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), earlyStopHighWatermark, expectedHighWatermark);
+ log.info("Job {} will be stopped earlier. [LW : {}, early-stop HW : {}, expected HW : {}]",
+ state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), earlyStopHighWatermark, expectedHighWatermark);
this.isEarlyStopped = true;
expectedHighWatermark = earlyStopHighWatermark;
} else {
- log.info("Job {} will be finished in a single run. [LW : {}, expected HW : {}]", state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), expectedHighWatermark);
+ log.info("Job {} will be finished in a single run. [LW : {}, expected HW : {}]",
+ state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), expectedHighWatermark);
}
String specifiedPartitions = generateSpecifiedPartitions(histogramAdjust, minTargetPartitionSize, maxPartitions,
@@ -204,10 +322,13 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
state.setProp(Partitioner.USER_SPECIFIED_PARTITIONS, specifiedPartitions);
state.setProp(Partitioner.IS_EARLY_STOPPED, isEarlyStopped);
- return super.generateWorkUnits(sourceEntity, state, previousWatermark);
+ List<WorkUnit> workUnits = super.generateWorkUnits(sourceEntity, state, previousWatermark);
+ Boolean disableSoft = state.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED, false);
+ workUnits.stream().forEach(x -> x.setProp(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED, disableSoft));
+ return workUnits;
}
- private boolean isEarlyStopEnabled (State state) {
+ private boolean isEarlyStopEnabled(State state) {
return state.getPropAsBoolean(ConfigurationKeys.SOURCE_EARLY_STOP_ENABLED, ConfigurationKeys.DEFAULT_SOURCE_EARLY_STOP_ENABLED);
}
@@ -285,7 +406,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
* Compute the target partition size.
*/
private int computeTargetPartitionSize(Histogram histogram, int minTargetPartitionSize, int maxPartitions) {
- return Math.max(minTargetPartitionSize,
+ return Math.max(minTargetPartitionSize,
DoubleMath.roundToInt((double) histogram.totalRecordCount / maxPartitions, RoundingMode.CEILING));
}
@@ -387,8 +508,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
*/
private Histogram getRefinedHistogram(SalesforceConnector connector, String entity, String watermarkColumn,
SourceState state, Partition partition, Histogram histogram) {
- final int maxPartitions = state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS,
- ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS);
+ final int maxPartitions = state.getPropAsInt(SOURCE_MAX_NUMBER_OF_PARTITIONS, DEFAULT_MAX_NUMBER_OF_PARTITIONS);
final int probeLimit = state.getPropAsInt(DYNAMIC_PROBING_LIMIT, DEFAULT_DYNAMIC_PROBING_LIMIT);
final int minTargetPartitionSize = state.getPropAsInt(MIN_TARGET_PARTITION_SIZE, DEFAULT_MIN_TARGET_PARTITION_SIZE);
final Histogram outputHistogram = new Histogram();
@@ -651,3 +771,4 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
private int probeCount = 0;
}
}
+
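Finally, the work-unit sizing in createWorkUnits above is a plain ceiling division; a minimal sketch with illustrative numbers:

class PartitionSizeSketch {
  public static void main(String[] args) {
    int total = 10;          // e.g. 10 batchId:resultId pairs from PK chunking
    int partitionNumber = 4; // source.max.number.of.partitions
    int sizeOfPartition = (total + partitionNumber - 1) / partitionNumber; // == ceil(10/4) == 3
    // Lists.partition(batchResultIds, 3) then yields groups of 3, 3, 3, 1 -> 4 work units.
    System.out.println(sizeOfPartition);
  }
}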