This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1476c99 [GOBBLIN-1202] DSS-26966: Add retry for REST API call
1476c99 is described below
commit 1476c9934d99155854ca3bfdf25e705362a9e399
Author: Alex Li <[email protected]>
AuthorDate: Mon Jun 22 14:09:59 2020 -0700
[GOBBLIN-1202] DSS-26966: Add retry for REST API call
Closes #3049 from arekusuri/GOBBLIN-1202
---
.../gobblin/salesforce/SalesforceExtractor.java | 4 +-
.../gobblin/salesforce/SalesforceSource.java | 50 +++++++++++++++-------
.../org/apache/gobblin/salesforce/SfConfig.java | 19 +++++++-
3 files changed, 54 insertions(+), 19 deletions(-)
diff --git
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
index 868ed54..2021ef8 100644
---
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
+++
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceExtractor.java
@@ -128,8 +128,8 @@ public class SalesforceExtractor extends RestApiExtractor {
this.sfConnector = (SalesforceConnector) this.connector;
this.pkChunkingSize = conf.pkChunkingSize;
- this.retryInterval = workUnitState.getPropAsLong(RETRY_INTERVAL,
RETRY_INTERVAL_DEFAULT);
- this.retryExceedQuotaInterval =
workUnitState.getPropAsLong(RETRY_EXCEED_QUOTA_INTERVAL,
RETRY_EXCEED_QUOTA_INTERVAL_DEFAULT);
+ this.retryInterval = conf.retryInterval;
+ this.retryExceedQuotaInterval = conf.retryExceedQuotaInterval;
this.bulkApiUseQueryAll = conf.bulkApiUseQueryAll;
this.retryLimit = conf.fetchRetryLimit;
}
diff --git
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
index f8fc327..22b5541 100644
---
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
+++
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SalesforceSource.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import lombok.SneakyThrows;
import org.apache.commons.lang3.text.StrSubstitutor;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
@@ -118,6 +119,8 @@ public class SalesforceSource extends
QueryBasedSource<JsonArray, JsonElement> {
private boolean isEarlyStopped = false;
protected SalesforceConnector salesforceConnector = null;
+ private SfConfig workUnitConf;
+
public SalesforceSource() {
this.lineageInfo = Optional.absent();
}
@@ -150,10 +153,10 @@ public class SalesforceSource extends
QueryBasedSource<JsonArray, JsonElement> {
lineageInfo.get().setSource(source, workUnit);
}
}
-
@Override
protected List<WorkUnit> generateWorkUnits(SourceEntity sourceEntity,
SourceState state, long previousWatermark) {
List<WorkUnit> workUnits = null;
+ workUnitConf = new SfConfig(state.getProperties());
String partitionType = state.getProp(SALESFORCE_PARTITION_TYPE, "");
if (partitionType.equals("PK_CHUNKING")) {
// pk-chunking only supports start-time by
source.querybased.start.value, and does not support end-time.
@@ -164,7 +167,12 @@ public class SalesforceSource extends
QueryBasedSource<JsonArray, JsonElement> {
workUnits = generateWorkUnitsStrategy(sourceEntity, state,
previousWatermark);
}
log.info("====Generated {} workUnit(s)====", workUnits.size());
- return workUnits;
+ if (workUnitConf.partitionOnly) {
+ log.info("It is partitionOnly mode, return blank workUnit list");
+ return new ArrayList<>();
+ } else {
+ return workUnits;
+ }
}
/**
@@ -416,24 +424,34 @@ public class SalesforceSource extends
QueryBasedSource<JsonArray, JsonElement> {
/**
* Get a {@link JsonArray} containing the query results
*/
+ @SneakyThrows
private JsonArray getRecordsForQuery(SalesforceConnector connector, String
query) {
- try {
- String soqlQuery = SalesforceExtractor.getSoqlUrl(query);
- List<Command> commands =
RestApiConnector.constructGetCommand(connector.getFullUri(soqlQuery));
- CommandOutput<?, ?> response = connector.getResponse(commands);
+ RestApiProcessingException exception = null;
+ for (int i = 0; i < workUnitConf.restApiRetryLimit; i++) {
+ try {
+ String soqlQuery = SalesforceExtractor.getSoqlUrl(query);
+ List<Command> commands =
RestApiConnector.constructGetCommand(connector.getFullUri(soqlQuery));
+ CommandOutput<?, ?> response = connector.getResponse(commands);
+
+ String output;
+ Iterator<String> itr = (Iterator<String>)
response.getResults().values().iterator();
+ if (itr.hasNext()) {
+ output = itr.next();
+ } else {
+ throw new DataRecordException("Failed to get data from salesforce;
REST response has no output");
+ }
- String output;
- Iterator<String> itr = (Iterator<String>)
response.getResults().values().iterator();
- if (itr.hasNext()) {
- output = itr.next();
- } else {
- throw new DataRecordException("Failed to get data from salesforce;
REST response has no output");
+ return GSON.fromJson(output,
JsonObject.class).getAsJsonArray("records");
+ } catch (RestApiClientException | DataRecordException e) {
+ throw new RuntimeException("Fail to get data from salesforce", e);
+ } catch (RestApiProcessingException e) {
+ exception = e;
+ log.info("Caught RestApiProcessingException, retrying({}) rest query:
{}", i+1, query);
+ Thread.sleep(workUnitConf.restApiRetryInterval);
+ continue;
}
-
- return GSON.fromJson(output, JsonObject.class).getAsJsonArray("records");
- } catch (RestApiClientException | RestApiProcessingException |
DataRecordException e) {
- throw new RuntimeException("Fail to get data from salesforce", e);
}
+ throw new RuntimeException("Fail to get data from salesforce", exception);
}
/**
diff --git
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SfConfig.java
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SfConfig.java
index 4377237..ed46959 100644
---
a/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SfConfig.java
+++
b/gobblin-salesforce/src/main/java/org/apache/gobblin/salesforce/SfConfig.java
@@ -20,9 +20,9 @@ package org.apache.gobblin.salesforce;
import java.util.Properties;
import org.apache.gobblin.typedconfig.Default;
import org.apache.gobblin.typedconfig.Key;
+import org.apache.gobblin.typedconfig.compiletime.EnumOptions;
import org.apache.gobblin.typedconfig.compiletime.IntRange;
-
public class SfConfig extends QueryBasedSourceConfig {
public SfConfig(Properties prop) {
super(prop);
@@ -34,6 +34,23 @@ public class SfConfig extends QueryBasedSourceConfig {
@Key("salesforce.bulkApiUseQueryAll")@Default("false")
public boolean bulkApiUseQueryAll;
+ @Key("salesforce.retry.interval")@Default("60000")
+ public int retryInterval;
+
@Key("salesforce.fetchRetryLimit")@Default("5")
public int fetchRetryLimit;
+
+ @Key("salesforce.retry.exceedQuotaInterval")@Default("300000")
+ public int retryExceedQuotaInterval;
+
+ @Key("sf.rest.api.retryLimit")@Default("3")
+ public int restApiRetryLimit;
+
+ @Key("sf.rest.api.retryInterval")@Default("10000") // 10 seconds
+ public int restApiRetryInterval;
+
+ // For testing only: if true, only the partitioning step runs and an empty work-unit list is returned.
+ @Key("sf.test.partitionOnly")@Default("false")
+ public boolean partitionOnly;
+
}