arekusuri commented on a change in pull request #2868: GOBBLIN-1025: Add retry 
for PK-Chuking iterator
URL: https://github.com/apache/incubator-gobblin/pull/2868#discussion_r369859942
 
 

 ##########
 File path: 
gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
 ##########
 @@ -563,92 +547,95 @@ public String getTimestampPredicateCondition(String 
column, long value, String v
     return dataTypeMap;
   }
 
-
   private Boolean isPkChunkingFetchDone = false;
 
-  private Iterator<JsonElement> getRecordSetPkChunking(WorkUnit workUnit) 
throws RuntimeException {
+  private Iterator<JsonElement> fetchRecordSetPkChunking(WorkUnit workUnit) {
     if (isPkChunkingFetchDone) {
       return null; // must return null to represent no more data.
     }
+    log.info("----Get records for pk-chunking----" + 
workUnit.getProp(PK_CHUNKING_JOB_ID));
     isPkChunkingFetchDone = true; // set to true, never come here twice.
+    bulkApiLogin();
+    String jobId = workUnit.getProp(PK_CHUNKING_JOB_ID);
+    String batchIdResultIdPairString = 
workUnit.getProp(PK_CHUNKING_BATCH_RESULT_ID_PAIRS);
+    List<FileIdVO> fileIdList = this.parseBatchIdResultIdString(jobId, 
batchIdResultIdPairString);
+    return new ResultChainingIterator(bulkConnection, fileIdList, retryLimit);
+  }
+
+  private List<FileIdVO> parseBatchIdResultIdString(String jobId, String 
batchIdResultIdString) {
+    return Arrays.stream(batchIdResultIdString.split(","))
+        .map( x -> x.split(":")).map(x -> new FileIdVO(jobId, x[0], x[1]))
+        .collect(Collectors.toList());
+  }
 
+  private Boolean isBulkFetchDone = false;
+
+  private Iterator<JsonElement> fetchRecordSet(
+      String schema,
+      String entity,
+      WorkUnit workUnit,
+      List<Predicate> predicateList
+) {
+    if (isBulkFetchDone) {
+      return null; // need to return null to indicate no more data.
+    }
+    isBulkFetchDone = true;
+    log.info("----Get records for bulk batch job----");
     try {
-      if (!bulkApiLogin()) {
-        throw new IllegalArgumentException("Invalid Login");
-      }
+      // set finish status to false before starting the bulk job
+      this.setBulkJobFinished(false);
+      this.bulkResultIdList = getQueryResultIds(entity, predicateList);
+      log.info("Number of bulk api resultSet Ids:" + 
this.bulkResultIdList.size());
+      List<FileIdVO> fileIdVoList = this.bulkResultIdList.stream()
 
 Review comment:
   FileIdVO: the "VO" suffix stands for "Value Object"; it is a plain object 
used for data transfer.
   fileIdVoList: since "fileIdVOList" looks awkward, I made the "O" lower case.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to