Repository: incubator-gobblin Updated Branches: refs/heads/master becb2b786 -> c5e83a331
[GOBBLIN-284] Add retry in SalesforceExtractor to handle transient ne… Closes #2137 from htran1/salesforce_fetch_fixes Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c5e83a33 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c5e83a33 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c5e83a33 Branch: refs/heads/master Commit: c5e83a3317e6d76c2399dd8d64d876a4247e25b6 Parents: becb2b7 Author: Hung Tran <[email protected]> Authored: Wed Oct 11 15:47:09 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Oct 11 15:47:09 2017 -0700 ---------------------------------------------------------------------- .../gobblin/salesforce/SalesforceExtractor.java | 183 ++++++++++++++----- 1 file changed, 140 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c5e83a33/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java ---------------------------------------------------------------------- diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java index 38b4d1b..0c16051 100644 --- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java +++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java @@ -108,6 +108,8 @@ public class SalesforceExtractor extends RestApiExtractor { private static final int MAX_RETRY_INTERVAL_SECS = 600; // avoid using too many bulk API calls by only allowing PK chunking only if max partitions is configured <= this private static final int PK_CHUNKING_MAX_PARTITIONS_LIMIT = 3; + private static final String FETCH_RETRY_LIMIT_KEY = "salesforce.fetchRetryLimit"; + private static 
final int DEFAULT_FETCH_RETRY_LIMIT = 5; private boolean pullStatus = true; private String nextUrl; @@ -124,10 +126,13 @@ public class SalesforceExtractor extends RestApiExtractor { private boolean newBulkResultSet = true; private int bulkRecordCount = 0; private int prevBulkRecordCount = 0; + private List<String> csvRecord; private final boolean pkChunking; private final int pkChunkingSize; private final SalesforceConnector sfConnector; + private final int fetchRetryLimit; + private final int batchSize; public SalesforceExtractor(WorkUnitState state) { super(state); @@ -149,6 +154,13 @@ public class SalesforceExtractor extends RestApiExtractor { this.pkChunkingSize = Math.max(MIN_PK_CHUNKING_SIZE, Math.min(MAX_PK_CHUNKING_SIZE, state.getPropAsInt(PK_CHUNKING_SIZE_KEY, DEFAULT_PK_CHUNKING_SIZE))); + + // Get batch size from .pull file + int tmpBatchSize = state.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE, + ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE); + + this.batchSize = tmpBatchSize == 0 ? 
ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE : tmpBatchSize; + this.fetchRetryLimit = state.getPropAsInt(FETCH_RETRY_LIMIT_KEY, DEFAULT_FETCH_RETRY_LIMIT); } @Override @@ -581,7 +593,8 @@ public class SalesforceExtractor extends RestApiExtractor { // Get data from input stream // If bulk load is not finished, get data from the stream - if (!this.isBulkJobFinished()) { + // Skip empty result sets since they will cause the extractor to terminate early + while (!this.isBulkJobFinished() && (rs == null || rs.isEmpty())) { rs = getBulkData(); } @@ -775,6 +788,125 @@ public class SalesforceExtractor extends RestApiExtractor { } /** + * Get a buffered reader wrapping the query result stream for the result with the specified index + * @param index index the {@link #bulkResultIdList} + * @return a {@link BufferedReader} + * @throws AsyncApiException + */ + private BufferedReader getBulkBufferedReader(int index) throws AsyncApiException { + return new BufferedReader(new InputStreamReader( + this.bulkConnection.getQueryResultStream(this.bulkJob.getId(), this.bulkResultIdList.get(index).getBatchId(), + this.bulkResultIdList.get(index).getResultId()), ConfigurationKeys.DEFAULT_CHARSET_ENCODING)); + } + + /** + * Fetch records into a {@link RecordSetList} up to the configured batch size {@link #batchSize}. This batch is not + * the entire Salesforce result batch. It is an internal batch in the extractor for buffering a subset of the result + * stream that comes from a Salesforce batch for more efficient processing. + * @param rs the record set to fetch into + * @param initialRecordCount Initial record count to use. This should correspond to the number of records already in rs. + * This is used to limit the number of records returned in rs to {@link #batchSize}. 
+ * @throws DataRecordException + * @throws IOException + */ + private void fetchResultBatch(RecordSetList<JsonElement> rs, int initialRecordCount) + throws DataRecordException, IOException { + int recordCount = initialRecordCount; + + // Stream the resultset through CSV reader to identify columns in each record + InputStreamCSVReader reader = new InputStreamCSVReader(this.bulkBufferedReader); + + // Get header if it is first run of a new resultset + if (this.isNewBulkResultSet()) { + this.bulkRecordHeader = reader.nextRecord(); + this.bulkResultColumCount = this.bulkRecordHeader.size(); + this.setNewBulkResultSet(false); + } + + // Get record from CSV reader stream + while ((this.csvRecord = reader.nextRecord()) != null) { + // Convert CSV record to JsonObject + JsonObject jsonObject = Utils.csvToJsonObject(this.bulkRecordHeader, this.csvRecord, this.bulkResultColumCount); + rs.add(jsonObject); + recordCount++; + this.bulkRecordCount++; + + // Insert records in record set until it reaches the batch size + if (recordCount >= batchSize) { + log.info("Total number of records processed so far: " + this.bulkRecordCount); + break; + } + } + } + + /** + * Reinitialize the state of {@link #bulkBufferedReader} to handle network disconnects + * @throws IOException + * @throws AsyncApiException + */ + private void reinitializeBufferedReader() throws IOException, AsyncApiException { + // close reader and get a new input stream to reconnect to resolve intermittent network errors + this.bulkBufferedReader.close(); + this.bulkBufferedReader = getBulkBufferedReader(this.bulkResultIdCount - 1); + + // if the result set is partially processed then we need to skip over processed records + if (!isNewBulkResultSet()) { + List<String> lastCsvRecord = null; + InputStreamCSVReader reader = new InputStreamCSVReader(this.bulkBufferedReader); + + // skip header + reader.nextRecord(); + + int recordsToSkip = this.bulkRecordCount - this.prevBulkRecordCount; + log.info("Skipping {} records on 
retry: ", recordsToSkip); + + for (int i = 0; i < recordsToSkip; i++) { + lastCsvRecord = reader.nextRecord(); + } + + // make sure the last record processed before the error was the last record skipped so that the next + // unprocessed record is processed in the next call to fetchResultBatch() + if (recordsToSkip > 0) { + if (!this.csvRecord.equals(lastCsvRecord)) { + throw new RuntimeException("Repositioning after reconnecting did not point to the expected record"); + } + } + } + } + + /** + * Fetch a result batch with retry for network errors + * @param rs the {@link RecordSetList} to fetch into + */ + private void fetchResultBatchWithRetry(RecordSetList<JsonElement> rs) + throws AsyncApiException, DataRecordException, IOException { + boolean success = false; + int retryCount = 0; + int recordCountBeforeFetch = this.bulkRecordCount; + + do { + try { + // reinitialize the reader to establish a new connection to handle transient network errors + if (retryCount > 0) { + reinitializeBufferedReader(); + } + + // on retries there may already be records in rs, so pass the number of records as the initial count + fetchResultBatch(rs, this.bulkRecordCount - recordCountBeforeFetch); + success = true; + } catch (IOException e) { + if (retryCount < this.fetchRetryLimit) { + log.info("Exception while fetching data, retrying: " + e.getMessage(), e); + retryCount++; + } else { + log.error("Exception while fetching data: " + e.getMessage(), e); + throw e; + } + } + } while (!success); + } + + /** * Get data from the bulk api input stream * @return record set with each record as a JsonObject */ @@ -796,14 +928,12 @@ public class SalesforceExtractor extends RestApiExtractor { if (this.bulkResultIdCount < this.bulkResultIdList.size()) { log.info("Stream resultset for resultId:" + this.bulkResultIdList.get(this.bulkResultIdCount)); this.setNewBulkResultSet(true); - this.bulkBufferedReader = - new BufferedReader( - new InputStreamReader( - 
this.bulkConnection.getQueryResultStream(this.bulkJob.getId(), - this.bulkResultIdList.get(this.bulkResultIdCount).getBatchId(), - this.bulkResultIdList.get(this.bulkResultIdCount).getResultId()), - ConfigurationKeys.DEFAULT_CHARSET_ENCODING)); + if (this.bulkBufferedReader != null) { + this.bulkBufferedReader.close(); + } + + this.bulkBufferedReader = getBulkBufferedReader(this.bulkResultIdCount); this.bulkResultIdCount++; this.prevBulkRecordCount = bulkRecordCount; } else { @@ -814,41 +944,8 @@ public class SalesforceExtractor extends RestApiExtractor { } } - // if Buffer stream has data then process the same - - // Get batch size from .pull file - int batchSize = Utils.getAsInt(this.workUnitState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_FETCH_SIZE)); - if (batchSize == 0) { - batchSize = ConfigurationKeys.DEFAULT_SOURCE_FETCH_SIZE; - } - - // Stream the resultset through CSV reader to identify columns in each record - InputStreamCSVReader reader = new InputStreamCSVReader(this.bulkBufferedReader); - - // Get header if it is first run of a new resultset - if (this.isNewBulkResultSet()) { - this.bulkRecordHeader = reader.nextRecord(); - this.bulkResultColumCount = this.bulkRecordHeader.size(); - this.setNewBulkResultSet(false); - } - - List<String> csvRecord; - int recordCount = 0; - - // Get record from CSV reader stream - while ((csvRecord = reader.nextRecord()) != null) { - // Convert CSV record to JsonObject - JsonObject jsonObject = Utils.csvToJsonObject(this.bulkRecordHeader, csvRecord, this.bulkResultColumCount); - rs.add(jsonObject); - recordCount++; - this.bulkRecordCount++; - - // Insert records in record set until it reaches the batch size - if (recordCount >= batchSize) { - log.info("Total number of records processed so far: " + this.bulkRecordCount); - break; - } - } + // fetch a batch of results with retry for network errors + fetchResultBatchWithRetry(rs); } catch (Exception e) { throw new DataRecordException("Failed to get records from 
salesforce; error - " + e.getMessage(), e);
