[ https://issues.apache.org/jira/browse/GOBBLIN-1025?focusedWorklogId=375816&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375816 ]
ASF GitHub Bot logged work on GOBBLIN-1025:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/Jan/20 19:12
Start Date: 22/Jan/20 19:12
Worklog Time Spent: 10m
Work Description: htran1 commented on pull request #2868: GOBBLIN-1025: Add retry for PK-Chunking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r369749915
##########
File path:
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
##########
@@ -0,0 +1,115 @@
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import com.sforce.async.BulkConnection;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+
+@Slf4j
+public class BulkResultIterator implements Iterator<JsonElement> {
+  private FileIdVO fileIdVO;
+  private int retryLimit;
+  private BulkConnection conn;
+  private InputStreamCSVReader csvReader;
+  private List<String> header;
+  private int columnSize;
+  private int lineCount = 0; // this is different from currentFileRowCount, since the csv file has a header
+  private List<String> preLoadedLine = null;
+
+  public BulkResultIterator(BulkConnection conn, FileIdVO fileIdVO, int retryLimit) {
+    log.info("create BulkResultIterator: " + fileIdVO);
+    this.conn = conn;
+    this.fileIdVO = fileIdVO;
+    this.retryLimit = retryLimit;
+  }
+
+  /**
+   * read the first data record from csvReader and initialize the header.
+   * not done in the constructor, in order to delay creating the file stream
+   */
+  private void initHeader() {
+    this.header = this.nextLine(); // first line is the header
+    this.columnSize = this.header.size();
+    this.preLoadedLine = this.nextLine(); // initialize: buffer one data record
+  }
+
+  private List<String> nextLine() {
+    Exception exception = null;
+    for (int i = 0; i < retryLimit; i++) {
+      try {
+        if (this.csvReader == null) {
+          this.csvReader = openAndSeekCsvReader();
+        }
+        List<String> line = this.csvReader.nextRecord();
+        this.lineCount++;
+        return line;
+      } catch (InputStreamCSVReader.CSVParseException e) {
+        throw new RuntimeException(e); // don't retry on a parse error
+      } catch (Exception e) { // any other exception may be resolved by a retry
+        exception = e;
+        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
+        this.csvReader = openAndSeekCsvReader();
+      }
+    }
+    throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + " times - ", exception);
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (this.header == null) {
+      initHeader();
+    }
+    return this.preLoadedLine != null;
+  }
+
+  @Override
+  public JsonElement next() {
+    if (this.header == null) {
+      initHeader();
+    }
+    JsonElement jsonObject = Utils.csvToJsonObject(this.header, this.preLoadedLine, this.columnSize);
+    this.preLoadedLine = this.nextLine();
+    if (this.preLoadedLine == null) {
+      log.info("----Record count: [{}] for {}", lineCount - 1, fileIdVO);
+    }
+    return jsonObject;
+  }
+
+  private InputStreamCSVReader openAndSeekCsvReader() {
+    String jobId = fileIdVO.getJobId();
+    String batchId = fileIdVO.getBatchId();
+    String resultId = fileIdVO.getResultId();
+    log.info("Fetching [jobId={}, batchId={}, resultId={}]", jobId, batchId, resultId);
+    closeCsvReader();
+    try {
+      InputStream is = conn.getQueryResultStream(jobId, batchId, resultId);
+      BufferedReader br = new BufferedReader(new InputStreamReader(is, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+      csvReader = new InputStreamCSVReader(br);
+      for (int j = 0; j < lineCount; j++) {
+        csvReader.nextRecord(); // skip records that were already fetched
+      }
+      // if skipping these lines fails, the exception triggers reopening the file on the next retry
+      return csvReader;
Review comment:
I don't see a sanity check that, after skipping, the position matches the prior position. This was the old logic:
for (int i = 0; i < recordsToSkip; i++) {
  lastCsvRecord = reader.nextRecord();
}
// make sure the last record processed before the error was the last record skipped,
// so that the next unprocessed record is processed in the next call to fetchResultBatch()
if (recordsToSkip > 0) {
  if (!this.csvRecord.equals(lastCsvRecord)) {
    throw new RuntimeException("Repositioning after reconnecting did not point to the expected record");
  }
}
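A minimal sketch of how that check could be carried over into the new openAndSeekCsvReader() (illustrative only, not the PR's actual change): it assumes a new lastFetchedLine field that nextLine() would set just before each successful return, and it assumes the truncated tail of the method wraps checked exceptions in a RuntimeException.

  // Hypothetical sketch against the diff above; lastFetchedLine does not exist in
  // the PR, and nextLine() would need to do `this.lastFetchedLine = line` right
  // before returning a record.
  private List<String> lastFetchedLine = null;

  private InputStreamCSVReader openAndSeekCsvReader() {
    String jobId = fileIdVO.getJobId();
    String batchId = fileIdVO.getBatchId();
    String resultId = fileIdVO.getResultId();
    closeCsvReader();
    try {
      InputStream is = conn.getQueryResultStream(jobId, batchId, resultId);
      BufferedReader br = new BufferedReader(new InputStreamReader(is, ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
      csvReader = new InputStreamCSVReader(br);
      List<String> lastSkippedLine = null;
      for (int j = 0; j < lineCount; j++) {
        lastSkippedLine = csvReader.nextRecord(); // skip records already handed out
      }
      // sanity check from the old logic: repositioning must land exactly on the
      // last record this iterator returned, so that the next nextRecord() call
      // yields the first unprocessed record
      if (lineCount > 0 && !lastFetchedLine.equals(lastSkippedLine)) {
        throw new RuntimeException("Repositioning after reconnecting did not point to the expected record");
      }
      return csvReader;
    } catch (Exception e) { // assumed wrapper; the diff is truncated before the real catch
      throw new RuntimeException(e);
    }
  }

Throwing here matches the old behavior quoted above; logging and forcing another reopen would be a softer alternative.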
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 375816)
Time Spent: 1.5h (was: 1h 20m)
> Add retry for PK-Chunking iterator
> ----------------------------------
>
> Key: GOBBLIN-1025
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1025
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Alex Li
> Priority: Major
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> In the SFDC connector, there is a class called `ResultIterator` (I will change
> the name to SalesforceRecordIterator).
> It is currently used only by PK-Chunking. It encapsulates fetching a list
> of result files behind a record iterator.
> However, csvReader.nextRecord() may throw a network IO exception. We
> should retry in this case.
> When a result file is partially fetched and a network IO exception happens, we
> are in a special situation - the first part of the file has already been fetched
> locally, but the rest of the file is still on the data source.
> We need to
> 1. reopen the file stream
> 2. skip all the records that we already fetched, seeking the cursor to the
> record which we haven't fetched yet.
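For illustration, a hypothetical consumer of the iterator (not code from this PR; process is a stand-in for whatever the extractor does with each record) shows the intent of encapsulating the reopen-and-skip steps above: callers simply iterate and never observe reconnects.

    // Hypothetical usage; bulkConnection and fileIdVO are assumed to be set up elsewhere.
    Iterator<JsonElement> records = new BulkResultIterator(bulkConnection, fileIdVO, 3);
    while (records.hasNext()) {
      JsonElement record = records.next(); // transient network errors are retried internally
      process(record); // hypothetical downstream handler
    }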
--
This message was sent by Atlassian Jira
(v8.3.4#803005)