This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
     new 7d94e47  [GOBBLIN-865] Add feature that enables PK-chunking in partition
7d94e47 is described below

commit 7d94e4745c71f4b01c61828216fc90c626a9521a
Author: Alex Li <al...@linkedin.com>
AuthorDate: Wed Dec 11 22:13:51 2019 -0800

    [GOBBLIN-865] Add feature that enables PK-chunking in partition

    Closes #2722 from arekusuri/GOBBLIN-865-pk-chunking-partition
---
 .gitignore                                         |   9 +
 .../extract/restapi/RestApiExtractor.java          |   2 +-
 .../apache/gobblin/salesforce/ResultIterator.java  | 166 ++++++++++
 .../salesforce/SalesforceConfigurationKeys.java    |  21 +-
 .../gobblin/salesforce/SalesforceExtractor.java    | 366 ++++++++++++++-------
 .../gobblin/salesforce/SalesforceSource.java       | 139 +++++++-
 6 files changed, 567 insertions(+), 136 deletions(-)

diff --git a/.gitignore b/.gitignore
index 4982e54..5195ebb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,8 @@
 .classpath*
 .project*
 .settings
+.DS_Store
+*.tar.gz
 
 # Intellij related
 **/.idea
@@ -62,5 +64,12 @@ package-lock.json
 
 # generated config files by tests
 **/generated-gobblin-cluster.conf
+ligradle/findbugs/findbugsInclude.xml
+ligradle/checkstyle/linkedin-checkstyle.xml
+ligradle/checkstyle/suppressions.xml
+gobblin-core/src/test/resources/serde/output-staging/
+gobblin-integration-test-log-dir/
+gobblin-modules/gobblin-elasticsearch/test-elasticsearch/
 
 temp/
+ligradle/*
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
index b9eeddd..b5fe56d 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/restapi/RestApiExtractor.java
@@ -158,7 +158,7 @@ public abstract class RestApiExtractor extends QueryBasedExtractor<JsonArray, Js
       this.updatedQuery = buildDataQuery(inputQuery, entity);
       log.info("Schema:" + columnArray);
       this.setOutputSchema(columnArray);
-    } catch (RuntimeException | RestApiConnectionException | RestApiProcessingException | IOException
+    } catch (RuntimeException | RestApiProcessingException | RestApiConnectionException | IOException
         | SchemaException e) {
       throw new SchemaException("Failed to get schema using rest api; error - " + e.getMessage(), e);
     }
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java
new file mode 100644
index 0000000..d6b5566
--- /dev/null
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/ResultIterator.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.sforce.async.BulkConnection;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+
+/**
+ * Result Iterator.
+ * Take jobId and 'batchId:resultId,batchId2:resultId2' as input build a result record iterator.
+ */
+@Slf4j
+public class ResultIterator implements Iterator {
+  private Iterator<ResultStruct> batchIdResultIdIterator;
+  private BulkConnection bulkConnection;
+  private InputStreamCSVReader csvReader;
+  private List<String> csvHeader;
+  private int columnSize;
+  private int urlLoadRetryLimit;
+  private ResultStruct resultStruct;
+  private List<String> currentRecord = null;
+  private Boolean isLoadedCurrentRecord = false;
+  private int currentFileRowCount = 0;
+  private int totalRowCount = 0;
+
+  /**
+   * constructor
+   * need to initiate the reader and currentRecord
+   */
+  public ResultIterator(BulkConnection bulkConnection, String jobId, String batchIdResultIdString, int urlLoadRetryLimit) {
+    this.urlLoadRetryLimit = urlLoadRetryLimit;
+    this.bulkConnection = bulkConnection;
+    this.batchIdResultIdIterator = this.parsebatchIdResultIdString(jobId, batchIdResultIdString);
+    if (this.batchIdResultIdIterator.hasNext()) {
+      this.resultStruct = this.batchIdResultIdIterator.next();
+      this.csvReader = this.fetchResultsetAsCsvReader(this.resultStruct); // first file reader
+    } else {
+      throw new RuntimeException("No batch-result id found.");
+    }
+    this.fulfillCurrentRecord();
+    this.csvHeader = this.currentRecord;
+    this.columnSize = this.csvHeader.size();
+    // after fetching cvs header, clean up status
+    this.resetCurrentRecordStatus();
+  }
+
+  /**
+   * call reader.next and set up currentRecord
+   */
+  private void fulfillCurrentRecord() {
+    if (this.isLoadedCurrentRecord) {
+      return; // skip, since CurrentRecord was loaded.
+    }
+    try {
+      this.currentRecord = this.csvReader.nextRecord();
+      if (this.currentRecord == null) { // according InputStreamCSVReader, it returns null at the end of the reader.
+        log.info("Fetched {} - rows: {}", this.resultStruct, this.currentFileRowCount); // print out log before switch result file
+        this.currentFileRowCount = 0; // clean up
+        if (this.batchIdResultIdIterator.hasNext()) { // if there is next file, load next file.
+          this.resultStruct = this.batchIdResultIdIterator.next();
+          this.csvReader = this.fetchResultsetAsCsvReader(resultStruct);
+          this.csvReader.nextRecord(); // read and ignore the csv header.
+          this.currentRecord = this.csvReader.nextRecord();
+        } else {
+          log.info("---- Fetched {} rows -----", this.totalRowCount); // print out log when all records were fetched.
+        }
+      }
+      this.isLoadedCurrentRecord = true;
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void resetCurrentRecordStatus() {
+    this.currentRecord = null;
+    this.isLoadedCurrentRecord = false;
+  }
+
+  @Override
+  public boolean hasNext() {
+    this.fulfillCurrentRecord();
+    return this.currentRecord != null;
+  }
+
+  @Override
+  public JsonElement next() {
+    this.fulfillCurrentRecord();
+    List<String> csvRecord = this.currentRecord;
+    this.resetCurrentRecordStatus();
+    if (csvRecord == null) {
+      throw new NoSuchElementException();
+    }
+    this.currentFileRowCount++;
+    this.totalRowCount++;
+    JsonObject jsonObject = Utils.csvToJsonObject(this.csvHeader, csvRecord, this.columnSize);
+    return jsonObject;
+  }
+
+  /**
+   * resultStruct has all data which identify a result set file
+   * fetch it and convert to a csvReader
+   */
+  private InputStreamCSVReader fetchResultsetAsCsvReader(ResultStruct resultStruct) {
+    String jobId = resultStruct.jobId;
+    String batchId = resultStruct.batchId;
+    String resultId = resultStruct.resultId;
+    log.info("PK-Chunking workunit: fetching [jobId={}, batchId={}, resultId={}]", jobId, batchId, resultId);
+    for (int i = 0; i < this.urlLoadRetryLimit; i++) { // retries
+      try {
+        InputStream is = this.bulkConnection.getQueryResultStream(jobId, batchId, resultId);
+        BufferedReader br = new BufferedReader(new InputStreamReader(is, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+        return new InputStreamCSVReader(br);
+      } catch (Exception e) { // skip, for retry
+      }
+    }
+    // tried fetchRetryLimit times, always getting exception
+    throw new RuntimeException("Tried " + this.urlLoadRetryLimit + " times, but couldn't fetch data.");
+  }
+
+  /**
+   * input string format is "batchId:resultId,batchId2:resultId2"
+   * parse it to iterator
+   */
+  private Iterator<ResultStruct> parsebatchIdResultIdString(String jobId, String batchIdResultIdString) {
+    return Arrays.stream(batchIdResultIdString.split(",")).map(x -> x.split(":")).map(x -> new ResultStruct(jobId, x[0], x[1])).iterator();
+  }
+
+  @Data
+  static class ResultStruct {
+    private final String jobId;
+    private final String batchId;
+    private final String resultId;
+    public String toString() {
+      return String.format("[jobId=%s, batchId=%s, resultId=%s]", jobId, batchId, resultId);
+    }
+  }
+
+}
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
index e868cd6..d5fe3f7 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceConfigurationKeys.java
@@ -17,9 +17,24 @@
 package org.apache.gobblin.salesforce;
 
-public class SalesforceConfigurationKeys {
+public final class SalesforceConfigurationKeys {
+  private SalesforceConfigurationKeys() {
+  }
   public static final String SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED = "source.querybased.salesforce.is.soft.deletes.pull.disabled";
-  public static final int DEFAULT_SALESFORCE_MAX_CHARS_IN_FILE = 200000000;
-  public static final int DEFAULT_SALESFORCE_MAX_ROWS_IN_FILE = 1000000;
+  public static final int DEFAULT_FETCH_RETRY_LIMIT = 5;
+  public static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll";
+
+  // pk-chunking
+  public static final String PK_CHUNKING_TEST_BATCH_ID_LIST = "salesforce.pkChunking.testBatchIdList";
+  public static final String
PK_CHUNKING_TEST_JOB_ID = "salesforce.pkChunking.testJobId"; + public static final String SALESFORCE_PARTITION_TYPE = "salesforce.partitionType"; + public static final String PARTITION_PK_CHUNKING_SIZE = "salesforce.partition.pkChunkingSize"; + public static final String PK_CHUNKING_JOB_ID = "_salesforce.job.id"; + public static final String PK_CHUNKING_BATCH_RESULT_IDS = "_salesforce.result.ids"; + public static final int MAX_PK_CHUNKING_SIZE = 250_000; // this number is from SFDC's doc - https://tinyurl.com/ycjvgwv2 + public static final int MIN_PK_CHUNKING_SIZE = 20_000; + public static final int DEFAULT_PK_CHUNKING_SIZE = 250_000; // default to max for saving request quota } + + diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java index b0d444c..36186e5 100644 --- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java +++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java @@ -17,6 +17,7 @@ package org.apache.gobblin.salesforce; +import com.google.common.collect.Iterators; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -33,7 +34,7 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; -import org.apache.gobblin.salesforce.SalesforceConfigurationKeys; +import java.util.stream.Stream; import org.apache.http.HttpEntity; import org.apache.http.NameValuePair; import org.apache.http.client.methods.HttpGet; @@ -71,7 +72,6 @@ import org.apache.gobblin.source.extractor.exception.RestApiConnectionException; import org.apache.gobblin.source.extractor.exception.SchemaException; import org.apache.gobblin.source.extractor.extract.Command; import org.apache.gobblin.source.extractor.extract.CommandOutput; -import org.apache.gobblin.source.extractor.partition.Partitioner; import org.apache.gobblin.source.jdbc.SqlQueryUtils; import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand; import org.apache.gobblin.source.extractor.extract.restapi.RestApiCommand.RestApiCommandType; @@ -89,6 +89,8 @@ import org.apache.gobblin.source.workunit.WorkUnit; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.*; + /** * An implementation of salesforce extractor for extracting data from SFDC @@ -102,20 +104,9 @@ public class SalesforceExtractor extends RestApiExtractor { private static final String SALESFORCE_HOUR_FORMAT = "HH"; private static final String SALESFORCE_SOAP_SERVICE = "/services/Soap/u"; private static final Gson GSON = new Gson(); - private static final int MAX_PK_CHUNKING_SIZE = 250000; - private static final int MIN_PK_CHUNKING_SIZE = 100000; - private static final int DEFAULT_PK_CHUNKING_SIZE = 200000; - private static final String ENABLE_PK_CHUNKING_KEY = "salesforce.enablePkChunking"; - private static final String PK_CHUNKING_SIZE_KEY = "salesforce.pkChunkingSize"; private static final int MAX_RETRY_INTERVAL_SECS = 600; - // avoid using too many bulk API calls by only allowing PK chunking only if max partitions is configured <= this - private static final int PK_CHUNKING_MAX_PARTITIONS_LIMIT = 3; private static final String FETCH_RETRY_LIMIT_KEY = "salesforce.fetchRetryLimit"; - private static final int DEFAULT_FETCH_RETRY_LIMIT = 5; - private static final String BULK_API_USE_QUERY_ALL = "salesforce.bulkApiUseQueryAll"; private 
static final boolean DEFAULT_BULK_API_USE_QUERY_ALL = false; - private static final String PK_CHUNKING_SKIP_COUNT_CHECK = "salesforce.pkChunkingSkipCountCheck"; - private static final boolean DEFAULT_PK_CHUNKING_SKIP_COUNT_CHECK = false; private boolean pullStatus = true; @@ -135,45 +126,29 @@ public class SalesforceExtractor extends RestApiExtractor { private int prevBulkRecordCount = 0; private List<String> csvRecord; - private final boolean pkChunking; private final int pkChunkingSize; private final SalesforceConnector sfConnector; private final int fetchRetryLimit; private final int batchSize; - private final boolean pkChunkingSkipCountCheck; private final boolean bulkApiUseQueryAll; public SalesforceExtractor(WorkUnitState state) { super(state); - this.sfConnector = (SalesforceConnector) this.connector; - - // don't allow pk chunking if max partitions too high or have user specified partitions - if (state.getPropAsBoolean(Partitioner.HAS_USER_SPECIFIED_PARTITIONS, false) - || state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS, - ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS) > PK_CHUNKING_MAX_PARTITIONS_LIMIT) { - if (state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false)) { - log.warn("Max partitions too high, so PK chunking is not enabled"); - } - - this.pkChunking = false; - } else { - this.pkChunking = state.getPropAsBoolean(ENABLE_PK_CHUNKING_KEY, false); - } + this.sfConnector = (SalesforceConnector) this.connector; this.pkChunkingSize = Math.max(MIN_PK_CHUNKING_SIZE, - Math.min(MAX_PK_CHUNKING_SIZE, state.getPropAsInt(PK_CHUNKING_SIZE_KEY, DEFAULT_PK_CHUNKING_SIZE))); + Math.min(MAX_PK_CHUNKING_SIZE, workUnitState.getPropAsInt(PARTITION_PK_CHUNKING_SIZE, DEFAULT_PK_CHUNKING_SIZE))); - this.pkChunkingSkipCountCheck = state.getPropAsBoolean(PK_CHUNKING_SKIP_COUNT_CHECK, DEFAULT_PK_CHUNKING_SKIP_COUNT_CHECK); - this.bulkApiUseQueryAll = state.getPropAsBoolean(BULK_API_USE_QUERY_ALL, DEFAULT_BULK_API_USE_QUERY_ALL); + this.bulkApiUseQueryAll = workUnitState.getPropAsBoolean(BULK_API_USE_QUERY_ALL, DEFAULT_BULK_API_USE_QUERY_ALL); // Get batch size from .pull file - int tmpBatchSize = state.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE, + int tmpBatchSize = workUnitState.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE, ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE); this.batchSize = tmpBatchSize == 0 ? 
ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE : tmpBatchSize; - this.fetchRetryLimit = state.getPropAsInt(FETCH_RETRY_LIMIT_KEY, DEFAULT_FETCH_RETRY_LIMIT); + this.fetchRetryLimit = workUnitState.getPropAsInt(FETCH_RETRY_LIMIT_KEY, DEFAULT_FETCH_RETRY_LIMIT); } @Override @@ -184,14 +159,14 @@ public class SalesforceExtractor extends RestApiExtractor { /** * true is further pull required else false */ - public void setPullStatus(boolean pullStatus) { + private void setPullStatus(boolean pullStatus) { this.pullStatus = pullStatus; } /** * url for the next pull from salesforce */ - public void setNextUrl(String nextUrl) { + private void setNextUrl(String nextUrl) { this.nextUrl = nextUrl; } @@ -203,11 +178,11 @@ public class SalesforceExtractor extends RestApiExtractor { this.bulkJobFinished = bulkJobFinished; } - public boolean isNewBulkResultSet() { + private boolean isNewBulkResultSet() { return this.newBulkResultSet; } - public void setNewBulkResultSet(boolean newBulkResultSet) { + private void setNewBulkResultSet(boolean newBulkResultSet) { this.newBulkResultSet = newBulkResultSet; } @@ -303,7 +278,7 @@ public class SalesforceExtractor extends RestApiExtractor { } query = SqlQueryUtils.addPredicate(query, defaultPredicate); query = query + defaultSortOrder; - log.info("QUERY: " + query); + log.info("getHighWatermarkMetadata - QUERY: " + query); try { return constructGetCommand(this.sfConnector.getFullUri(getSoqlUrl(query))); @@ -326,7 +301,7 @@ public class SalesforceExtractor extends RestApiExtractor { } JsonElement element = GSON.fromJson(output, JsonObject.class); - long high_ts; + long highTs; try { JsonObject jsonObject = element.getAsJsonObject(); JsonArray jsonArray = jsonObject.getAsJsonArray("records"); @@ -344,15 +319,15 @@ public class SalesforceExtractor extends RestApiExtractor { log.error("ParseException: " + e.getMessage(), e); } SimpleDateFormat outFormat = new SimpleDateFormat("yyyyMMddHHmmss"); - high_ts = Long.parseLong(outFormat.format(date)); + highTs = Long.parseLong(outFormat.format(date)); } else { - high_ts = Long.parseLong(value); + highTs = Long.parseLong(value); } } catch (Exception e) { throw new HighWatermarkException("Failed to get high watermark from salesforce; error - " + e.getMessage(), e); } - return high_ts; + return highTs; } @Override @@ -384,7 +359,7 @@ public class SalesforceExtractor extends RestApiExtractor { } query = query + getLimitFromInputQuery(this.updatedQuery); - log.info("QUERY: " + query); + log.info("getCountMetadata - QUERY: " + query); return constructGetCommand(this.sfConnector.getFullUri(getSoqlUrl(query))); } catch (Exception e) { throw new RecordCountException("Failed to get salesforce url for record count; error - " + e.getMessage(), e); @@ -421,11 +396,11 @@ public class SalesforceExtractor extends RestApiExtractor { String query = this.updatedQuery; String url = null; try { - if (this.getNextUrl() != null && this.pullStatus == true) { + if (this.getNextUrl() != null && this.pullStatus) { url = this.getNextUrl(); } else { if (isNullPredicate(predicateList)) { - log.info("QUERY:" + query); + log.info("getDataMetaData null predicate - QUERY:" + query); return constructGetCommand(this.sfConnector.getFullUri(getSoqlUrl(query))); } @@ -443,7 +418,7 @@ public class SalesforceExtractor extends RestApiExtractor { } query = query + limitString; - log.info("QUERY: " + query); + log.info("getDataMetadata - QUERY: " + query); url = this.sfConnector.getFullUri(getSoqlUrl(query)); } return constructGetCommand(url); @@ -556,27 +531,27 @@ 
public class SalesforceExtractor extends RestApiExtractor { @Override public String getHourPredicateCondition(String column, long value, String valueFormat, String operator) { log.info("Getting hour predicate from salesforce"); - String Formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_HOUR_FORMAT); - return column + " " + operator + " " + Formattedvalue; + String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_HOUR_FORMAT); + return column + " " + operator + " " + formattedvalue; } @Override public String getDatePredicateCondition(String column, long value, String valueFormat, String operator) { log.info("Getting date predicate from salesforce"); - String Formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_DATE_FORMAT); - return column + " " + operator + " " + Formattedvalue; + String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_DATE_FORMAT); + return column + " " + operator + " " + formattedvalue; } @Override public String getTimestampPredicateCondition(String column, long value, String valueFormat, String operator) { log.info("Getting timestamp predicate from salesforce"); - String Formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_TIMESTAMP_FORMAT); - return column + " " + operator + " " + Formattedvalue; + String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, SALESFORCE_TIMESTAMP_FORMAT); + return column + " " + operator + " " + formattedvalue; } @Override public Map<String, String> getDataTypeMap() { - Map<String, String> dataTypeMap = ImmutableMap.<String, String> builder().put("url", "string") + Map<String, String> dataTypeMap = ImmutableMap.<String, String>builder().put("url", "string") .put("textarea", "string").put("reference", "string").put("phone", "string").put("masterrecord", "string") .put("location", "string").put("id", "string").put("encryptedstring", "string").put("email", "string") .put("DataCategoryGroupReference", "string").put("calculated", "string").put("anyType", "string") @@ -588,16 +563,43 @@ public class SalesforceExtractor extends RestApiExtractor { return dataTypeMap; } + + private Boolean isPkChunkingFetchDone = false; + + private Iterator<JsonElement> getRecordSetPkChunking(WorkUnit workUnit) throws RuntimeException { + if (isPkChunkingFetchDone) { + return null; // must return null to represent no more data. + } + isPkChunkingFetchDone = true; // set to true, never come here twice. 
+ + try { + if (!bulkApiLogin()) { + throw new IllegalArgumentException("Invalid Login"); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID); + String batchIdResultIdString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_IDS); + return new ResultIterator(bulkConnection, jobId, batchIdResultIdString, fetchRetryLimit); + } + @Override public Iterator<JsonElement> getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList) throws IOException { log.debug("Getting salesforce data using bulk api"); - RecordSet<JsonElement> rs = null; + // new version of extractor: bulk api with pk-chunking in pre-partitioning of SalesforceSource + if (workUnit.contains(PK_CHUNKING_JOB_ID)) { + log.info("----pk-chunking get record set----" + workUnit.getProp(PK_CHUNKING_JOB_ID)); + return getRecordSetPkChunking(workUnit); + } + log.info("----bulk get record set----"); try { //Get query result ids in the first run //result id is used to construct url while fetching data - if (this.bulkApiInitialRun == true) { + if (this.bulkApiInitialRun) { // set finish status to false before starting the bulk job this.setBulkJobFinished(false); this.bulkResultIdList = getQueryResultIds(entity, predicateList); @@ -607,6 +609,7 @@ public class SalesforceExtractor extends RestApiExtractor { // Get data from input stream // If bulk load is not finished, get data from the stream // Skip empty result sets since they will cause the extractor to terminate early + RecordSet<JsonElement> rs = null; while (!this.isBulkJobFinished() && (rs == null || rs.isEmpty())) { rs = getBulkData(); } @@ -615,11 +618,11 @@ public class SalesforceExtractor extends RestApiExtractor { this.bulkApiInitialRun = false; // If bulk job is finished, get soft deleted records using Rest API - boolean isSoftDeletesPullDisabled = Boolean.valueOf(this.workUnit - .getProp(SalesforceConfigurationKeys.SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED)); - if (rs == null || rs.isEmpty()) { + boolean disableSoftDeletePull = this.workUnit.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED); + if (rs == null || rs.isEmpty()) { // TODO: Only when the rs is empty, we need to fetch soft deleted records. WHY? 
// Get soft delete records only if IsDeleted column exists and soft deletes pull is not disabled - if (this.columnList.contains("IsDeleted") && !isSoftDeletesPullDisabled) { + if (this.columnList.contains("IsDeleted") && !disableSoftDeletePull) { + log.info("Pull soft delete records"); return this.getSoftDeletedRecords(schema, entity, workUnit, predicateList); } log.info("Ignoring soft delete records"); @@ -634,7 +637,7 @@ public class SalesforceExtractor extends RestApiExtractor { /** * Get soft deleted records using Rest Api - * @return iterator with deleted records + * @return iterator with deleted records */ private Iterator<JsonElement> getSoftDeletedRecords(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList) throws DataRecordException { @@ -645,7 +648,7 @@ public class SalesforceExtractor extends RestApiExtractor { * Login to salesforce * @return login status */ - public boolean bulkApiLogin() throws Exception { + private boolean bulkApiLogin() throws Exception { log.info("Authenticating salesforce bulk api"); boolean success = false; String hostName = this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME); @@ -705,6 +708,27 @@ public class SalesforceExtractor extends RestApiExtractor { } /** + * same as getQueryResultIdsPkChunking but the arguments are different. + * this function can take existing batch ids to return JobIdAndBatchIdResultIdList + * It is for test/debug. developers may want to skip execute query on SFDC, use a list of existing batch ids + */ + public JobIdAndBatchIdResultIdList getQueryResultIdsPkChunkingFetchOnly(String jobId, String batchIdListStr) { + try { + if (!bulkApiLogin()) { + throw new IllegalArgumentException("Invalid Login"); + } + int retryInterval = Math.min(MAX_RETRY_INTERVAL_SECS * 1000, 30 + this.pkChunkingSize * 2); + if (batchIdListStr != null) { + log.info("The batchId was specified."); + return retrievePkChunkingResultIdsByBatchId(this.bulkConnection, jobId, batchIdListStr); + } else { + return retrievePkChunkingResultIds(this.bulkConnection, jobId, retryInterval); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + /** * get BulkConnection instance * @return */ @@ -713,10 +737,79 @@ public class SalesforceExtractor extends RestApiExtractor { } /** + * this function currently only for pk-chunking. it is from getQueryResultIds + * TODO: abstract this function to a common function: arguments need to add connetion, header, output-format + * TODO: make it and its related functions pure function (no side effect). 
Currently still unnecesarily changing this.bulkJob) + */ + public JobIdAndBatchIdResultIdList getQueryResultIdsPkChunking(String entity, List<Predicate> predicateList) { + try { + if (!bulkApiLogin()) { + throw new IllegalArgumentException("Invalid Login"); + } + BulkConnection connection = this.bulkConnection; + JobInfo jobRequest = new JobInfo(); + jobRequest.setObject(entity); + jobRequest.setOperation(OperationEnum.queryAll); + jobRequest.setConcurrencyMode(ConcurrencyMode.Parallel); + log.info("Enabling pk chunking with size {}", this.pkChunkingSize); + connection.addHeader("Sforce-Enable-PKChunking", "chunkSize=" + this.pkChunkingSize); + // Result type as CSV + jobRequest.setContentType(ContentType.CSV); + JobInfo createdJob = connection.createJob(jobRequest); + log.info("Created bulk job: {}", createdJob.getId()); + this.bulkJob = createdJob; // other functions need to use it TODO: remove bulkJob from this class + String jobId = createdJob.getId(); + JobInfo jobResponse = connection.getJobStatus(jobId); + // Construct query with the predicates + String query = this.updatedQuery; + if (!isNullPredicate(predicateList)) { + String limitString = getLimitFromInputQuery(query); + query = query.replace(limitString, ""); + Iterator<Predicate> i = predicateList.listIterator(); + while (i.hasNext()) { + Predicate predicate = i.next(); + query = SqlQueryUtils.addPredicate(query, predicate.getCondition()); + } + query = query + limitString; + } + log.info("Submitting PK Chunking query:" + query); + ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes(ConfigurationKeys.DEFAULT_CHARSET_ENCODING)); + + BatchInfo executeQueryBatch = connection.createBatchFromStream(jobResponse, bout); + String pkChunkingBatchId = executeQueryBatch.getId(); + + int waitMilliSecond = 60 * 1000; + + // Get batch info with complete resultset (info id - refers to the resultset id corresponding to entire resultset) + BatchInfo batchResponse = connection.getBatchInfo(jobId, pkChunkingBatchId); + + // wait for completion, failure, or formation of PK chunking batches + // user query will be submitted to sfdc and create the first batch, + // It is supposed to create more batch after the initial batch + BatchStateEnum batchState = batchResponse.getState(); + while (batchState == BatchStateEnum.InProgress || batchState == BatchStateEnum.Queued) { + Thread.sleep(waitMilliSecond); + batchResponse = connection.getBatchInfo(createdJob.getId(), executeQueryBatch.getId()); + log.info("Waiting for first batch (jobId={}, pkChunkingBatchId={})", jobId, pkChunkingBatchId); + batchState = batchResponse.getState(); + } + + if (batchResponse.getState() == BatchStateEnum.Failed) { + log.error("Bulk batch failed: " + batchResponse.toString()); + throw new Exception("Failed to get bulk batch info for jobId " + jobId + " error - " + batchResponse.getStateMessage()); + } + JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList = retrievePkChunkingResultIds(connection, jobId, waitMilliSecond); + return jobIdAndBatchIdResultIdList; + } catch (Exception e) { + throw new RuntimeException("getQueryResultIdsPkChunking: error - " + e.getMessage(), e); + } + } + + /** * Get Record set using salesforce specific API(Bulk API) * @param entity/tablename * @param predicateList of all predicate conditions - * @return iterator with batch of records + * @return iterator with batch of records */ private List<BatchIdAndResultId> getQueryResultIds(String entity, List<Predicate> predicateList) throws Exception { if (!bulkApiLogin()) { @@ -724,24 
+817,17 @@ public class SalesforceExtractor extends RestApiExtractor { } try { - boolean usingPkChunking = false; - // Set bulk job attributes this.bulkJob.setObject(entity); this.bulkJob.setOperation(this.bulkApiUseQueryAll ? OperationEnum.queryAll : OperationEnum.query); this.bulkJob.setConcurrencyMode(ConcurrencyMode.Parallel); - // use pk chunking if pk chunking is configured and the expected record count is larger than the pk chunking size - if (this.pkChunking && (this.pkChunkingSkipCountCheck || getExpectedRecordCount() > this.pkChunkingSize)) { - log.info("Enabling pk chunking with size {}", this.pkChunkingSize); - this.bulkConnection.addHeader("Sforce-Enable-PKChunking", "chunkSize=" + this.pkChunkingSize); - usingPkChunking = true; - } - // Result type as CSV this.bulkJob.setContentType(ContentType.CSV); this.bulkJob = this.bulkConnection.createJob(this.bulkJob); + log.info("Created bulk job [jobId={}]", this.bulkJob.getId()); + this.bulkJob = this.bulkConnection.getJobStatus(this.bulkJob.getId()); // Construct query with the predicates @@ -759,38 +845,30 @@ public class SalesforceExtractor extends RestApiExtractor { query = query + limitString; } - log.info("QUERY:" + query); + log.info("getQueryResultIds - QUERY:" + query); ByteArrayInputStream bout = new ByteArrayInputStream(query.getBytes(ConfigurationKeys.DEFAULT_CHARSET_ENCODING)); BatchInfo bulkBatchInfo = this.bulkConnection.createBatchFromStream(this.bulkJob, bout); - long expectedSizePerBatch = usingPkChunking ? this.pkChunkingSize : this.getExpectedRecordCount(); - - int retryInterval = Math.min(MAX_RETRY_INTERVAL_SECS, - 30 + (int) Math.ceil((float) expectedSizePerBatch / 10000) * 2); - log.info("Salesforce bulk api retry interval in seconds:" + retryInterval); + int waitMilliSeconds = 60 * 1000; // wait 1 minute // Get batch info with complete resultset (info id - refers to the resultset id corresponding to entire resultset) bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId()); // wait for completion, failure, or formation of PK chunking batches - while ((bulkBatchInfo.getState() != BatchStateEnum.Completed) - && (bulkBatchInfo.getState() != BatchStateEnum.Failed) - && (!usingPkChunking || bulkBatchInfo.getState() != BatchStateEnum.NotProcessed)) { - Thread.sleep(retryInterval * 1000); + // if it is InProgress or Queued, continue to wait. 
+ while (bulkBatchInfo.getState() == BatchStateEnum.InProgress || bulkBatchInfo.getState() == BatchStateEnum.Queued) { + Thread.sleep(waitMilliSeconds); bulkBatchInfo = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bulkBatchInfo.getId()); - log.debug("Bulk Api Batch Info:" + bulkBatchInfo); log.info("Waiting for bulk resultSetIds"); } // Wait for pk chunking batches BatchInfoList batchInfoList = this.bulkConnection.getBatchInfoList(this.bulkJob.getId()); - - if (usingPkChunking && bulkBatchInfo.getState() == BatchStateEnum.NotProcessed) { - bulkBatchInfo = waitForPkBatches(batchInfoList, retryInterval); - } - - if (bulkBatchInfo.getState() == BatchStateEnum.Failed) { + BatchStateEnum state = bulkBatchInfo.getState(); + // not sure if the state can be "NotProcessed" in non-pk-chunking bulk request + // SFDC doc says - This state is assigned when a job is aborted while the batch is queued + if (state == BatchStateEnum.Failed || state == BatchStateEnum.InProgress) { log.error("Bulk batch failed: " + bulkBatchInfo.toString()); throw new RuntimeException("Failed to get bulk batch info for jobId " + bulkBatchInfo.getJobId() + " error - " + bulkBatchInfo.getStateMessage()); @@ -817,6 +895,7 @@ public class SalesforceExtractor extends RestApiExtractor { } } + /** * Get a buffered reader wrapping the query result stream for the result with the specified index * @param index index the {@link #bulkResultIdList} @@ -904,6 +983,8 @@ public class SalesforceExtractor extends RestApiExtractor { } } + + /** * Fetch a result batch with retry for network errors * @param rs the {@link RecordSetList} to fetch into @@ -938,7 +1019,7 @@ public class SalesforceExtractor extends RestApiExtractor { /** * Get data from the bulk api input stream - * @return record set with each record as a JsonObject + * @return record set with each record as a JsonObject */ private RecordSet<JsonElement> getBulkData() throws DataRecordException { log.debug("Processing bulk api batch..."); @@ -987,53 +1068,82 @@ public class SalesforceExtractor extends RestApiExtractor { @Override public void closeConnection() throws Exception { if (this.bulkConnection != null - && !this.bulkConnection.getJobStatus(this.bulkJob.getId()).getState().toString().equals("Closed")) { + && !this.bulkConnection.getJobStatus(this.getBulkJobId()).getState().toString().equals("Closed")) { log.info("Closing salesforce bulk job connection"); - this.bulkConnection.closeJob(this.bulkJob.getId()); + this.bulkConnection.closeJob(this.getBulkJobId()); } } - public static List<Command> constructGetCommand(String restQuery) { + private static List<Command> constructGetCommand(String restQuery) { return Arrays.asList(new RestApiCommand().build(Arrays.asList(restQuery), RestApiCommandType.GET)); } + + private JobIdAndBatchIdResultIdList retrievePkChunkingResultIdsByBatchId(BulkConnection connection, String jobId, String batchIdListStr) { + Iterator<String> batchIds = Arrays.stream(batchIdListStr.split(",")).map(x -> x.trim()).filter(x -> !x.equals("")).iterator(); + try { + List<BatchIdAndResultId> batchIdAndResultIdList = fetchBatchResultIds(connection, jobId, batchIds); + return new JobIdAndBatchIdResultIdList(jobId, batchIdAndResultIdList); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + /** * Waits for the PK batches to complete. 
The wait will stop after all batches are complete or on the first failed batch - * @param batchInfoList list of batch info - * @param retryInterval the polling interval - * @return the last {@link BatchInfo} processed - * @throws InterruptedException - * @throws AsyncApiException */ - private BatchInfo waitForPkBatches(BatchInfoList batchInfoList, int retryInterval) - throws InterruptedException, AsyncApiException { - BatchInfo batchInfo = null; - BatchInfo[] batchInfos = batchInfoList.getBatchInfo(); - - // Wait for all batches other than the first one. The first one is not processed in PK chunking mode - for (int i = 1; i < batchInfos.length; i++) { - BatchInfo bi = batchInfos[i]; - - // get refreshed job status - bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId()); - - while ((bi.getState() != BatchStateEnum.Completed) - && (bi.getState() != BatchStateEnum.Failed)) { - Thread.sleep(retryInterval * 1000); - bi = this.bulkConnection.getBatchInfo(this.bulkJob.getId(), bi.getId()); - log.debug("Bulk Api Batch Info:" + bi); - log.info("Waiting for bulk resultSetIds"); + private JobIdAndBatchIdResultIdList retrievePkChunkingResultIds(BulkConnection connection, String jobId, int waitMilliSecond) { + log.info("Waiting for completion of the the bulk job [jobId={}])'s sub queries.", jobId); + try { + while (true) { + BatchInfoList batchInfoList = connection.getBatchInfoList(jobId); + BatchInfo[] batchInfos = batchInfoList.getBatchInfo(); + if (needContinueToPoll(batchInfos, waitMilliSecond)) { + continue; // continue to wait + } + if (Arrays.stream(batchInfos).filter(x -> x.getState() == BatchStateEnum.NotProcessed).count() != 1) { + throw new Exception("PK-Chunking query should have 1 and only 1 batch with state=NotProcessed."); + } + Stream<BatchInfo> stream = Arrays.stream(batchInfos); + Iterator<String> batchIds = stream.filter(x -> x.getNumberRecordsProcessed() != 0).map(x -> x.getId()).iterator(); + List<BatchIdAndResultId> batchIdAndResultIdList = fetchBatchResultIds(connection, jobId, batchIds); + return new JobIdAndBatchIdResultIdList(jobId, batchIdAndResultIdList); } + } catch (Exception e) { + throw new RuntimeException(e); + } + } - batchInfo = bi; + private List<BatchIdAndResultId> fetchBatchResultIds(BulkConnection connection, String jobId, Iterator<String> batchIds) throws Exception { + List<BatchIdAndResultId> batchIdAndResultIdList = Lists.newArrayList(); + while (batchIds.hasNext()) { + String batchId = batchIds.next(); + QueryResultList result = connection.getQueryResultList(jobId, batchId); + Stream<String> stream = Arrays.stream(result.getResult()); + Iterator<BatchIdAndResultId> it = stream.map(rId -> new BatchIdAndResultId(batchId, rId)).iterator(); + Iterators.addAll(batchIdAndResultIdList, it); + } + return batchIdAndResultIdList; + } - // exit if there was a failure - if (batchInfo.getState() == BatchStateEnum.Failed) { - break; + private Boolean needContinueToPoll(BatchInfo[] batchInfos, long toWait) { + long queued = Arrays.stream(batchInfos).filter(x -> x.getState() == BatchStateEnum.Queued).count(); + long inProgress = Arrays.stream(batchInfos).filter(x -> x.getState() == BatchStateEnum.InProgress).count(); + for (BatchInfo bi : batchInfos) { + BatchStateEnum state = bi.getState(); + if (state == BatchStateEnum.InProgress || state == BatchStateEnum.Queued) { + try { + log.info("Total: {}, queued: {}, InProgress: {}, waiting for [batchId: {}, state: {}]", batchInfos.length, queued, inProgress, bi.getId(), state); + Thread.sleep(toWait); + } catch 
(InterruptedException e) { // skip + } + return true; // need to continue to wait + } + if (state == BatchStateEnum.Failed) { + throw new RuntimeException(String.format("[batchId=%s] failed", bi.getId())); } } - - return batchInfo; + return false; // no need to wait } //Moving config creation in a separate method for custom config parameters like setting up transport factory. @@ -1056,9 +1166,19 @@ public class SalesforceExtractor extends RestApiExtractor { return config; } + public String getBulkJobId() { + return workUnit.getProp(PK_CHUNKING_JOB_ID, this.bulkJob.getId()); + } + @Data - private static class BatchIdAndResultId { + public static class BatchIdAndResultId { private final String batchId; private final String resultId; } + + @Data + public static class JobIdAndBatchIdResultIdList { + private final String jobId; + private final List<BatchIdAndResultId> batchIdAndResultIdList; + } } diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java index aedd97b..96ad4b7 100644 --- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java +++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java @@ -17,6 +17,7 @@ package org.apache.gobblin.salesforce; +import com.google.common.collect.Lists; import java.io.IOException; import java.math.RoundingMode; import java.util.ArrayList; @@ -30,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.lang3.text.StrSubstitutor; import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics; @@ -65,7 +67,9 @@ import org.apache.gobblin.source.extractor.extract.restapi.RestApiConnector; import org.apache.gobblin.source.extractor.partition.Partition; import org.apache.gobblin.source.extractor.partition.Partitioner; import org.apache.gobblin.source.extractor.utils.Utils; +import org.apache.gobblin.source.extractor.watermark.Predicate; import org.apache.gobblin.source.extractor.watermark.WatermarkType; +import org.apache.gobblin.source.workunit.Extract; import org.apache.gobblin.source.workunit.WorkUnit; import lombok.AllArgsConstructor; @@ -73,13 +77,16 @@ import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import static org.apache.gobblin.configuration.ConfigurationKeys.*; +import static org.apache.gobblin.salesforce.SalesforceConfigurationKeys.*; +import org.apache.gobblin.salesforce.SalesforceExtractor.JobIdAndBatchIdResultIdList; +import org.apache.gobblin.salesforce.SalesforceExtractor.BatchIdAndResultId; /** * An implementation of {@link QueryBasedSource} for salesforce data sources. */ @Slf4j public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { - public static final String USE_ALL_OBJECTS = "use.all.objects"; public static final boolean DEFAULT_USE_ALL_OBJECTS = false; @@ -146,12 +153,121 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { @Override protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) { + List<WorkUnit> workUnits = null; + String partitionType = state.getProp(SALESFORCE_PARTITION_TYPE, ""); + if (partitionType.equals("PK_CHUNKING")) { + // pk-chunking only supports start-time by source.querybased.start.value, and does not support end-time. 
+ // always ingest data later than or equal source.querybased.start.value. + // we should only pk chunking based work units only in case of snapshot/full ingestion + workUnits = generateWorkUnitsPkChunking(sourceEntity, state, previousWatermark); + } else { + workUnits = generateWorkUnitsStrategy(sourceEntity, state, previousWatermark); + } + log.info("====Generated {} workUnit(s)====", workUnits.size()); + return workUnits; + } + + /** + * generate workUnit for pk chunking + */ + private List<WorkUnit> generateWorkUnitsPkChunking(SourceEntity sourceEntity, SourceState state, long previousWatermark) { + JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList = executeQueryWithPkChunking(state, previousWatermark); + return createWorkUnits(sourceEntity, state, jobIdAndBatchIdResultIdList); + } + + private JobIdAndBatchIdResultIdList executeQueryWithPkChunking( + SourceState sourceState, + long previousWatermark + ) throws RuntimeException { + State state = new State(sourceState); + WorkUnit workUnit = WorkUnit.createEmpty(); + WorkUnitState workUnitState = new WorkUnitState(workUnit, state); + workUnitState.setId("Execute pk-chunking"); + try { + SalesforceExtractor salesforceExtractor = (SalesforceExtractor) this.getExtractor(workUnitState); + Partitioner partitioner = new Partitioner(sourceState); + if (isEarlyStopEnabled(state) && partitioner.isFullDump()) { + throw new UnsupportedOperationException("Early stop mode cannot work with full dump mode."); + } + Partition partition = partitioner.getGlobalPartition(previousWatermark); + String condition = ""; + Date startDate = Utils.toDate(partition.getLowWatermark(), Partitioner.WATERMARKTIMEFORMAT); + String field = sourceState.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY); + // pk-chunking only supports start-time by source.querybased.start.value, and does not support end-time. + // always ingest data later than or equal source.querybased.start.value. + // we should only pk chunking based work units only in case of snapshot/full ingestion + if (startDate != null && field != null) { + String lowWatermarkDate = Utils.dateToString(startDate, SalesforceExtractor.SALESFORCE_TIMESTAMP_FORMAT); + condition = field + " >= " + lowWatermarkDate; + } + Predicate predicate = new Predicate(null, 0, condition, "", null); + List<Predicate> predicateList = Arrays.asList(predicate); + String entity = sourceState.getProp(ConfigurationKeys.SOURCE_ENTITY); + + if (state.contains(PK_CHUNKING_TEST_JOB_ID)) { + String jobId = state.getProp(PK_CHUNKING_TEST_JOB_ID, ""); + log.info("---Skip query, fetching result files directly for [jobId={}]", jobId); + String batchIdListStr = state.getProp(PK_CHUNKING_TEST_BATCH_ID_LIST); + return salesforceExtractor.getQueryResultIdsPkChunkingFetchOnly(jobId, batchIdListStr); + } else { + log.info("---Pk Chunking query submit."); + return salesforceExtractor.getQueryResultIdsPkChunking(entity, predicateList); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Create work units by taking a bulkJobId. + * The work units won't contain a query in this case. Instead they will contain a BulkJobId and a list of `batchId:resultId` + * So in extractor, the work to do is just to fetch the resultSet files. 
+ */ + private List<WorkUnit> createWorkUnits( + SourceEntity sourceEntity, + SourceState state, + JobIdAndBatchIdResultIdList jobIdAndBatchIdResultIdList + ) { + String nameSpaceName = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY); + Extract.TableType tableType = Extract.TableType.valueOf(state.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY).toUpperCase()); + String outputTableName = sourceEntity.getDestTableName(); + Extract extract = createExtract(tableType, nameSpaceName, outputTableName); + + List<WorkUnit> workUnits = Lists.newArrayList(); + int partitionNumber = state.getPropAsInt(SOURCE_MAX_NUMBER_OF_PARTITIONS, 1); + List<BatchIdAndResultId> batchResultIds = jobIdAndBatchIdResultIdList.getBatchIdAndResultIdList(); + int total = batchResultIds.size(); + + // size of every partition should be: math.ceil(total/partitionNumber), use simpler way: (total+partitionNumber-1)/partitionNumber + int sizeOfPartition = (total + partitionNumber - 1) / partitionNumber; + List<List<BatchIdAndResultId>> partitionedResultIds = Lists.partition(batchResultIds, sizeOfPartition); + log.info("----partition strategy: max-parti={}, size={}, actual-parti={}, total={}", partitionNumber, sizeOfPartition, partitionedResultIds.size(), total); + + for (List<BatchIdAndResultId> resultIds : partitionedResultIds) { + WorkUnit workunit = new WorkUnit(extract); + String bulkJobId = jobIdAndBatchIdResultIdList.getJobId(); + workunit.setProp(PK_CHUNKING_JOB_ID, bulkJobId); + String resultIdStr = resultIds.stream().map(x -> x.getBatchId() + ":" + x.getResultId()).collect(Collectors.joining(",")); + workunit.setProp(PK_CHUNKING_BATCH_RESULT_IDS, resultIdStr); + workunit.setProp(ConfigurationKeys.SOURCE_ENTITY, sourceEntity.getSourceEntityName()); + workunit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, sourceEntity.getDestTableName()); + workunit.setProp(WORK_UNIT_STATE_VERSION_KEY, CURRENT_WORK_UNIT_STATE_VERSION); + addLineageSourceInfo(state, sourceEntity, workunit); + workUnits.add(workunit); + } + return workUnits; + } + + /** + * + */ + private List<WorkUnit> generateWorkUnitsStrategy(SourceEntity sourceEntity, SourceState state, long previousWatermark) { WatermarkType watermarkType = WatermarkType.valueOf( state.getProp(ConfigurationKeys.SOURCE_QUERYBASED_WATERMARK_TYPE, ConfigurationKeys.DEFAULT_WATERMARK_TYPE) .toUpperCase()); String watermarkColumn = state.getProp(ConfigurationKeys.EXTRACT_DELTA_FIELDS_KEY); - int maxPartitions = state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS, + int maxPartitions = state.getPropAsInt(SOURCE_MAX_NUMBER_OF_PARTITIONS, ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS); int minTargetPartitionSize = state.getPropAsInt(MIN_TARGET_PARTITION_SIZE, DEFAULT_MIN_TARGET_PARTITION_SIZE); @@ -191,11 +307,13 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { if (histogramAdjust.getGroups().size() < histogram.getGroups().size()) { HistogramGroup lastPlusOne = histogram.get(histogramAdjust.getGroups().size()); long earlyStopHighWatermark = Long.parseLong(Utils.toDateTimeFormat(lastPlusOne.getKey(), SECONDS_FORMAT, Partitioner.WATERMARKTIMEFORMAT)); - log.info("Job {} will be stopped earlier. [LW : {}, early-stop HW : {}, expected HW : {}]", state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), earlyStopHighWatermark, expectedHighWatermark); + log.info("Job {} will be stopped earlier. 
[LW : {}, early-stop HW : {}, expected HW : {}]", + state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), earlyStopHighWatermark, expectedHighWatermark); this.isEarlyStopped = true; expectedHighWatermark = earlyStopHighWatermark; } else { - log.info("Job {} will be finished in a single run. [LW : {}, expected HW : {}]", state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), expectedHighWatermark); + log.info("Job {} will be finished in a single run. [LW : {}, expected HW : {}]", + state.getProp(ConfigurationKeys.JOB_NAME_KEY), partition.getLowWatermark(), expectedHighWatermark); } String specifiedPartitions = generateSpecifiedPartitions(histogramAdjust, minTargetPartitionSize, maxPartitions, @@ -204,10 +322,13 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { state.setProp(Partitioner.USER_SPECIFIED_PARTITIONS, specifiedPartitions); state.setProp(Partitioner.IS_EARLY_STOPPED, isEarlyStopped); - return super.generateWorkUnits(sourceEntity, state, previousWatermark); + List<WorkUnit> workUnits = super.generateWorkUnits(sourceEntity, state, previousWatermark); + Boolean disableSoft = state.getPropAsBoolean(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED, false); + workUnits.stream().forEach(x -> x.setProp(SOURCE_QUERYBASED_SALESFORCE_IS_SOFT_DELETES_PULL_DISABLED, disableSoft)); + return workUnits; } - private boolean isEarlyStopEnabled (State state) { + private boolean isEarlyStopEnabled(State state) { return state.getPropAsBoolean(ConfigurationKeys.SOURCE_EARLY_STOP_ENABLED, ConfigurationKeys.DEFAULT_SOURCE_EARLY_STOP_ENABLED); } @@ -285,7 +406,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { * Compute the target partition size. */ private int computeTargetPartitionSize(Histogram histogram, int minTargetPartitionSize, int maxPartitions) { - return Math.max(minTargetPartitionSize, + return Math.max(minTargetPartitionSize, DoubleMath.roundToInt((double) histogram.totalRecordCount / maxPartitions, RoundingMode.CEILING)); } @@ -387,8 +508,7 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { */ private Histogram getRefinedHistogram(SalesforceConnector connector, String entity, String watermarkColumn, SourceState state, Partition partition, Histogram histogram) { - final int maxPartitions = state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS, - ConfigurationKeys.DEFAULT_MAX_NUMBER_OF_PARTITIONS); + final int maxPartitions = state.getPropAsInt(SOURCE_MAX_NUMBER_OF_PARTITIONS, DEFAULT_MAX_NUMBER_OF_PARTITIONS); final int probeLimit = state.getPropAsInt(DYNAMIC_PROBING_LIMIT, DEFAULT_DYNAMIC_PROBING_LIMIT); final int minTargetPartitionSize = state.getPropAsInt(MIN_TARGET_PARTITION_SIZE, DEFAULT_MIN_TARGET_PARTITION_SIZE); final Histogram outputHistogram = new Histogram(); @@ -651,3 +771,4 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> { private int probeCount = 0; } } +
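
Editor's note for readers trying the new partition type: the commit introduces the keys salesforce.partitionType and salesforce.partition.pkChunkingSize (see SalesforceConfigurationKeys above). The sketch below is a minimal, hypothetical .pull-style property set assuming a Contact extract; only the two salesforce.* keys and the start-value behavior come from this commit, while the remaining keys and all values are illustrative assumptions, and a real job still needs its usual connection, credential, schema, and writer settings.

    # Hypothetical .pull sketch -- values are assumptions, not part of the commit
    source.class=org.apache.gobblin.salesforce.SalesforceSource
    source.entity=Contact
    extract.delta.fields=SystemModstamp
    # New in this commit: route work-unit generation through PK chunking
    salesforce.partitionType=PK_CHUNKING
    # New in this commit: chunk size, clamped to [20000, 250000], default 250000
    salesforce.partition.pkChunkingSize=250000
    # batchId:resultId pairs are spread over at most this many work units
    source.max.number.of.partitions=4
    # PK chunking honors only the start value; an end value is not supported
    source.querybased.start.value=20190101000000

With settings like these, createWorkUnits splits the batchId:resultId list into groups of size ceil(total / source.max.number.of.partitions); for example, 10 result files with at most 4 partitions yields work units of 3, 3, 3, and 1 result files.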
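On the read side, the following is a minimal sketch (not part of the commit) of how a PK-chunking work unit is consumed; it mirrors SalesforceExtractor.getRecordSetPkChunking and assumes a BulkConnection that has already been authenticated via bulkApiLogin(). The example class and method names are hypothetical; the ResultIterator constructor, the two work-unit properties, and the default retry limit come from the diff above.

    package org.apache.gobblin.salesforce; // hypothetical example file, same package for illustration only

    import com.google.gson.JsonElement;
    import com.sforce.async.BulkConnection;
    import org.apache.gobblin.source.workunit.WorkUnit;

    public class PkChunkingReadSketch {

      /**
       * Drain every record described by a PK-chunking work unit and return the row count.
       * The work unit carries a bulk job id plus a "batchId:resultId,batchId2:resultId2" list
       * instead of a SOQL query, so all that is left to do is stream the result files.
       */
      public static long countRecords(BulkConnection connection, WorkUnit workUnit) {
        String jobId = workUnit.getProp(SalesforceConfigurationKeys.PK_CHUNKING_JOB_ID);
        String batchIdResultIds = workUnit.getProp(SalesforceConfigurationKeys.PK_CHUNKING_BATCH_RESULT_IDS);
        // last argument: how many times to retry fetching a single result stream
        ResultIterator records = new ResultIterator(connection, jobId, batchIdResultIds,
            SalesforceConfigurationKeys.DEFAULT_FETCH_RETRY_LIMIT);
        long count = 0;
        while (records.hasNext()) {
          JsonElement record = records.next(); // one CSV row converted to a JsonObject
          count++;
        }
        return count;
      }
    }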