This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1476c99  [GOBBLIN-1202] DSS-26966: Add retry for REST API call
1476c99 is described below

commit 1476c9934d99155854ca3bfdf25e705362a9e399
Author: Alex Li <[email protected]>
AuthorDate: Mon Jun 22 14:09:59 2020 -0700

    [GOBBLIN-1202] DSS-26966: Add retry for REST API call
    
    Closes #3049 from arekusuri/GOBBLIN-1202
---
 .../gobblin/salesforce/SalesforceExtractor.java    |  4 +-
 .../gobblin/salesforce/SalesforceSource.java       | 50 +++++++++++++++-------
 .../org/apache/gobblin/salesforce/SfConfig.java    | 19 +++++++-
 3 files changed, 54 insertions(+), 19 deletions(-)

diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 868ed54..2021ef8 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -128,8 +128,8 @@ public class SalesforceExtractor extends RestApiExtractor {
     this.sfConnector = (SalesforceConnector) this.connector;
     this.pkChunkingSize = conf.pkChunkingSize;
 
-    this.retryInterval = workUnitState.getPropAsLong(RETRY_INTERVAL, RETRY_INTERVAL_DEFAULT);
-    this.retryExceedQuotaInterval = workUnitState.getPropAsLong(RETRY_EXCEED_QUOTA_INTERVAL, RETRY_EXCEED_QUOTA_INTERVAL_DEFAULT);
+    this.retryInterval = conf.retryInterval;
+    this.retryExceedQuotaInterval = conf.retryExceedQuotaInterval;
     this.bulkApiUseQueryAll = conf.bulkApiUseQueryAll;
     this.retryLimit = conf.fetchRetryLimit;
   }
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index f8fc327..22b5541 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 
 import java.util.stream.Collectors;
+import lombok.SneakyThrows;
 import org.apache.commons.lang3.text.StrSubstitutor;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 
@@ -118,6 +119,8 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
   private boolean isEarlyStopped = false;
   protected SalesforceConnector salesforceConnector = null;
 
+  private SfConfig workUnitConf;
+
   public SalesforceSource() {
     this.lineageInfo = Optional.absent();
   }
@@ -150,10 +153,10 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
       lineageInfo.get().setSource(source, workUnit);
     }
   }
-
   @Override
   protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity, SourceState state, long previousWatermark) {
     List<WorkUnit> workUnits = null;
+    workUnitConf = new SfConfig(state.getProperties());
     String partitionType = state.getProp(SALESFORCE_PARTITION_TYPE, "");
     if (partitionType.equals("PK_CHUNKING")) {
       // pk-chunking only supports start-time by source.querybased.start.value, and does not support end-time.
@@ -164,7 +167,12 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
       workUnits = generateWorkUnitsStrategy(sourceEntity, state, previousWatermark);
     }
     log.info("====Generated {} workUnit(s)====", workUnits.size());
-    return workUnits;
+    if (workUnitConf.partitionOnly) {
+      log.info("It is partitionOnly mode, return blank workUnit list");
+      return new ArrayList<>();
+    } else {
+      return workUnits;
+    }
   }
 
   /**
@@ -416,24 +424,34 @@ public class SalesforceSource extends QueryBasedSource<JsonArray, JsonElement> {
   /**
    * Get a {@link JsonArray} containing the query results
    */
+  @SneakyThrows
   private JsonArray getRecordsForQuery(SalesforceConnector connector, String query) {
-    try {
-      String soqlQuery = SalesforceExtractor.getSoqlUrl(query);
-      List<Command> commands = RestApiConnector.constructGetCommand(connector.getFullUri(soqlQuery));
-      CommandOutput<?, ?> response = connector.getResponse(commands);
+    RestApiProcessingException exception = null;
+    for (int i = 0; i < workUnitConf.restApiRetryLimit; i++) {
+      try {
+        String soqlQuery = SalesforceExtractor.getSoqlUrl(query);
+        List<Command> commands = RestApiConnector.constructGetCommand(connector.getFullUri(soqlQuery));
+        CommandOutput<?, ?> response = connector.getResponse(commands);
+
+        String output;
+        Iterator<String> itr = (Iterator<String>) response.getResults().values().iterator();
+        if (itr.hasNext()) {
+          output = itr.next();
+        } else {
+          throw new DataRecordException("Failed to get data from salesforce; REST response has no output");
+        }
 
-      String output;
-      Iterator<String> itr = (Iterator<String>) response.getResults().values().iterator();
-      if (itr.hasNext()) {
-        output = itr.next();
-      } else {
-        throw new DataRecordException("Failed to get data from salesforce; REST response has no output");
+        return GSON.fromJson(output, JsonObject.class).getAsJsonArray("records");
+      } catch (RestApiClientException | DataRecordException e) {
+        throw new RuntimeException("Fail to get data from salesforce", e);
+      } catch (RestApiProcessingException e) {
+        exception = e;
+        log.info("Caught RestApiProcessingException, retrying({}) rest query: {}", i+1, query);
+        Thread.sleep(workUnitConf.restApiRetryInterval);
+        continue;
       }
-
-      return GSON.fromJson(output, JsonObject.class).getAsJsonArray("records");
-    } catch (RestApiClientException | RestApiProcessingException | DataRecordException e) {
-      throw new RuntimeException("Fail to get data from salesforce", e);
     }
+    throw new RuntimeException("Fail to get data from salesforce", exception);
   }
 
   /**
diff --git a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SfConfig.java b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SfConfig.java
index 4377237..ed46959 100644
--- a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SfConfig.java
+++ b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SfConfig.java
@@ -20,9 +20,9 @@ package org.apache.gobblin.salesforce;
 import java.util.Properties;
 import org.apache.gobblin.typedconfig.Default;
 import org.apache.gobblin.typedconfig.Key;
+import org.apache.gobblin.typedconfig.compiletime.EnumOptions;
 import org.apache.gobblin.typedconfig.compiletime.IntRange;
 
-
 public class SfConfig extends QueryBasedSourceConfig {
   public SfConfig(Properties prop) {
     super(prop);
@@ -34,6 +34,23 @@ public class SfConfig extends QueryBasedSourceConfig {
   @Key("salesforce.bulkApiUseQueryAll")@Default("false")
   public boolean bulkApiUseQueryAll;
 
+  @Key("salesforce.retry.interval")@Default("60000")
+  public int retryInterval;
+
   @Key("salesforce.fetchRetryLimit")@Default("5")
   public int fetchRetryLimit;
+
+  @Key("salesforce.retry.exceedQuotaInterval")@Default("300000")
+  public int retryExceedQuotaInterval;
+
+  @Key("sf.rest.api.retryLimit")@Default("3")
+  public int restApiRetryLimit;
+
+  @Key("sf.rest.api.retryInterval")@Default("10000") // 10 seconds
+  public int restApiRetryInterval;
+
+  // it is for test. if true, it will only execute partition part and stop.
+  @Key("sf.test.partitionOnly")@Default("false")
+  public boolean partitionOnly;
+
 }

Reply via email to