[ 
https://issues.apache.org/jira/browse/GOBBLIN-1025?focusedWorklogId=375816&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-375816
 ]

ASF GitHub Bot logged work on GOBBLIN-1025:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Jan/20 19:12
            Start Date: 22/Jan/20 19:12
    Worklog Time Spent: 10m 
      Work Description: htran1 commented on pull request #2868: GOBBLIN-1025: 
Add retry for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r369749915
 
 

 ##########
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/BulkResultIterator.java
 ##########
 @@ -0,0 +1,115 @@
+package org.apache.gobblin.salesforce;
+
+import com.google.gson.JsonElement;
+import com.sforce.async.BulkConnection;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.source.extractor.utils.InputStreamCSVReader;
+import org.apache.gobblin.source.extractor.utils.Utils;
+
+
+@Slf4j
+public class BulkResultIterator implements Iterator<JsonElement> {
+  private FileIdVO fileIdVO;
+  private int retryLimit;
+  private BulkConnection conn;
+  private InputStreamCSVReader csvReader;
+  private List<String> header;
+  private int columnSize;
+  private int lineCount = 0; // this is different than currentFileRowCount. 
cvs file has header
+  private List<String> preLoadedLine = null;
+
+  public BulkResultIterator(BulkConnection conn, FileIdVO fileIdVO, int 
retryLimit) {
+    log.info("create BulkResultIterator: " + fileIdVO);
+    this.conn = conn;
+    this.fileIdVO = fileIdVO;
+    this.retryLimit = retryLimit;
+  }
+
+  /**
+   * read first data record from cvsReader and initiate header
+   * not supposed to do it in constructor function, for delay creating file 
stream
+   */
+  private void initHeader() {
+    this.header = this.nextLine(); // first line is header
+    this.columnSize = this.header.size();
+    this.preLoadedLine = this.nextLine(); // initialize: buffer one record data
+  }
+
+  private List<String> nextLine() {
+    Exception exception = null;
+    for (int i = 0; i < retryLimit; i++) {
+      try {
+        if (this.csvReader == null) {
+          this.csvReader = openAndSeekCsvReader();
+        }
+        List<String> line = this.csvReader.nextRecord();
+        this.lineCount++;
+        return line;
+      } catch (InputStreamCSVReader.CSVParseException e) {
+        throw new RuntimeException(e); // don't retry if it is parse error
+      } catch (Exception e) { // if it is any other exception, retry may 
resolve the issue.
+        exception = e;
+        log.info("***Retrying***: {} - {}", fileIdVO, e.getMessage());
+        this.csvReader = openAndSeekCsvReader();
+      }
+    }
+    throw new RuntimeException("***Retried***: Failed, tried " + retryLimit + 
" times - ", exception);
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (this.header == null) {
+      initHeader();
+    }
+    return this.preLoadedLine != null;
+  }
+
+  @Override
+  public JsonElement next() {
+    if (this.header == null) {
+      initHeader();
+    }
+    JsonElement jsonObject = Utils.csvToJsonObject(this.header, 
this.preLoadedLine, this.columnSize);
+    this.preLoadedLine = this.nextLine();
+    if (this.preLoadedLine == null) {
+      log.info("----Record count: [{}] for {}", lineCount - 1, fileIdVO);
+    }
+    return jsonObject;
+  }
+
+  private InputStreamCSVReader openAndSeekCsvReader() {
+    String jobId = fileIdVO.getJobId();
+    String batchId = fileIdVO.getBatchId();
+    String resultId = fileIdVO.getResultId();
+    log.info("Fetching [jobId={}, batchId={}, resultId={}]", jobId, batchId, 
resultId);
+    closeCsvReader();
+    try {
+      InputStream is = conn.getQueryResultStream(jobId, batchId, resultId);
+      BufferedReader br = new BufferedReader(new InputStreamReader(is, 
ConfigurationKeys.DEFAULT_CHARSET_ENCODING));
+      csvReader = new InputStreamCSVReader(br);
+      for (int j = 0; j < lineCount; j++) {
+        csvReader.nextRecord(); // skip these record
+      }
+      // failed to skip seekLineNumber lines, try to open the file again. same 
time count+1
+      return csvReader;
 
 Review comment:
   I don't see the sanity check that after skipping the position matches the 
prior position. This was the old logic.
   
         for (int i = 0; i < recordsToSkip; i++) {      
           lastCsvRecord = reader.nextRecord(); 
         }      
   
         // make sure the last record processed before the error was the last 
record skipped so that the next   
         // unprocessed record is processed in the next call to 
fetchResultBatch()      
         if (recordsToSkip > 0) {       
           if (!this.csvRecord.equals(lastCsvRecord)) { 
             throw new RuntimeException("Repositioning after reconnecting did 
not point to the expected record");       
           }    
         }      
       }
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 375816)
    Time Spent: 1.5h  (was: 1h 20m)

> Add retry for PK-Chunking iterator
> ----------------------------------
>
>                 Key: GOBBLIN-1025
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1025
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Alex Li
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> In SFDC connector, there is a class called `ResultIterator` (I will change 
> the name to SalesforceRecordIterator).
> It was using by only PK-Chunking currently. It encapsulated fetching a list 
> of result files to a record iterator.
> However, the csvReader.nextRecord() may throw out network IO exception. We 
> should do retry in this case.
> When a result file is fetched partly and one network IO exception happens, we 
> are in a special situation - first half of the file is already fetched to our 
> local, but another half of the file is still on datasource. 
> We need to
> 1. reopen the file stream
> 2. skip all the records that we already fetched, seek the cursor to the 
> record which we haven't fetched yet.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to