[
https://issues.apache.org/jira/browse/GOBBLIN-1025?focusedWorklogId=375815&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375815
]
ASF GitHub Bot logged work on GOBBLIN-1025:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/Jan/20 19:12
Start Date: 22/Jan/20 19:12
Worklog Time Spent: 10m
Work Description: htran1 commented on pull request #2868: GOBBLIN-1025:
Add retry for PK-Chunking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r369747865
##########
File path:
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
##########
@@ -563,92 +547,95 @@ public String getTimestampPredicateCondition(String column, long value, String v
return dataTypeMap;
}
-
private Boolean isPkChunkingFetchDone = false;
- private Iterator<JsonElement> getRecordSetPkChunking(WorkUnit workUnit) throws RuntimeException {
+ private Iterator<JsonElement> fetchRecordSetPkChunking(WorkUnit workUnit) {
if (isPkChunkingFetchDone) {
return null; // must return null to represent no more data.
}
+ log.info("----Get records for pk-chunking----" + workUnit.getProp(PK_CHUNKING_JOB_ID));
isPkChunkingFetchDone = true; // set to true, never come here twice.
+ bulkApiLogin();
+ String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
+ String batchIdResultIdPairString = workUnit.getProp(PK_CHUNKING_BATCH_RESULT_ID_PAIRS);
+ List<FileIdVO> fileIdList = this.parseBatchIdResultIdString(jobId, batchIdResultIdPairString);
+ return new ResultChainingIterator(bulkConnection, fileIdList, retryLimit);
+ }
+
+ private List<FileIdVO> parseBatchIdResultIdString(String jobId, String batchIdResultIdString) {
+ return Arrays.stream(batchIdResultIdString.split(","))
+ .map( x -> x.split(":")).map(x -> new FileIdVO(jobId, x[0], x[1]))
+ .collect(Collectors.toList());
+ }
+ private Boolean isBulkFetchDone = false;
+
+ private Iterator<JsonElement> fetchRecordSet(
+ String schema,
+ String entity,
+ WorkUnit workUnit,
+ List<Predicate> predicateList
+) {
+ if (isBulkFetchDone) {
+ return null; // need to return null to indicate no more data.
+ }
+ isBulkFetchDone = true;
+ log.info("----Get records for bulk batch job----");
try {
- if (!bulkApiLogin()) {
- throw new IllegalArgumentException("Invalid Login");
- }
+ // set finish status to false before starting the bulk job
+ this.setBulkJobFinished(false);
+ this.bulkResultIdList = getQueryResultIds(entity, predicateList);
+ log.info("Number of bulk api resultSet Ids:" + this.bulkResultIdList.size());
+ List<FileIdVO> fileIdVoList = this.bulkResultIdList.stream()
Review comment:
Is the `o` in fileIdVoList intentional or is it supposed to be a zero?
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 375815)
Time Spent: 1h 20m (was: 1h 10m)
> Add retry for PK-Chunking iterator
> ----------------------------------
>
> Key: GOBBLIN-1025
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1025
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Alex Li
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> In the SFDC connector, there is a class called `ResultIterator` (I will
> rename it to SalesforceRecordIterator).
> It is currently used only by PK-Chunking. It encapsulates fetching a list
> of result files behind a record iterator.
> However, csvReader.nextRecord() may throw a network IO exception. We
> should retry in this case.
> When a result file has been partly fetched and a network IO exception occurs,
> we are in a special situation: the first part of the file has already been
> fetched locally, but the rest of the file is still on the data source.
> We need to
> 1. reopen the file stream
> 2. skip all the records that we have already fetched, moving the cursor to
> the first record that we haven't fetched yet.
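
The two steps above (reopen, then skip what was already fetched) can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the pull request: `RecordSource`, `RecordReader`, `RetryingRecordIterator` and `FlakySource` are hypothetical names, and the reader is assumed to return null at end of file and to yield records in the same order every time the source is reopened.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical: a result file that can be reopened from its beginning. */
interface RecordSource<T> {
  RecordReader<T> open() throws IOException;
}

/** Hypothetical: sequential reader; returns null at end of file. */
interface RecordReader<T> {
  T nextRecord() throws IOException;
}

/** Sketch of an iterator that reopens the source and skips fetched records on IO errors. */
class RetryingRecordIterator<T> implements Iterator<T> {
  private final RecordSource<T> source;
  private final int retryLimit;   // retries allowed per record, on top of the first attempt
  private RecordReader<T> reader; // null means "must (re)open before reading"
  private long fetchedCount = 0;  // records successfully read so far
  private T next;
  private boolean done = false;

  RetryingRecordIterator(RecordSource<T> source, int retryLimit) {
    if (retryLimit < 0) {
      throw new IllegalArgumentException("retryLimit must be >= 0");
    }
    this.source = source;
    this.retryLimit = retryLimit;
  }

  @Override
  public boolean hasNext() {
    if (next == null && !done) {
      next = fetchWithRetry();
      if (next == null) {
        done = true;
      } else {
        fetchedCount++;
      }
    }
    return next != null;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    T record = next;
    next = null;
    return record;
  }

  private T fetchWithRetry() {
    IOException last = null;
    for (int attempt = 0; attempt <= retryLimit; attempt++) {
      try {
        if (reader == null) {
          reader = source.open();                   // step 1: reopen the stream
          for (long i = 0; i < fetchedCount; i++) {
            reader.nextRecord();                    // step 2: skip fetched records
          }
        }
        return reader.nextRecord();
      } catch (IOException e) {
        last = e;
        reader = null; // drop the broken reader so the next attempt reopens
      }
    }
    throw new UncheckedIOException("Retry limit exceeded", last);
  }
}

/** Test double: fails once with an IOException when asked for the record at index failAt. */
class FlakySource implements RecordSource<String> {
  private final List<String> records;
  private final int failAt;
  private boolean failed = false;
  int opens = 0;

  FlakySource(List<String> records, int failAt) {
    this.records = records;
    this.failAt = failAt;
  }

  @Override
  public RecordReader<String> open() {
    opens++;
    Iterator<String> it = records.iterator();
    int[] pos = {0};
    return () -> {
      if (!failed && pos[0] == failAt) {
        failed = true;
        throw new IOException("simulated network error");
      }
      pos[0]++;
      return it.hasNext() ? it.next() : null;
    };
  }
}
```

Counting records rather than bytes keeps the skip logic independent of how the stream is buffered, at the cost of re-reading the already fetched part of the file after each failure.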