This is an automated email from the ASF dual-hosted git repository.
kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 72306bbfd6 [GOBBLIN-2156] log error after each failed AzkabanClient
retry (#4055)
72306bbfd6 is described below
commit 72306bbfd6490bf8721fd58b73cf91ee0e8729e5
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Fri Sep 13 13:19:14 2024 -0700
[GOBBLIN-2156] log error after each failed AzkabanClient retry (#4055)
---
.../modules/orchestration/AzkabanClient.java | 53 ++++++++++++++--------
1 file changed, 35 insertions(+), 18 deletions(-)
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
index 02e7f27308..5341bb0739 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanClient.java
@@ -17,19 +17,6 @@
package org.apache.gobblin.service.modules.orchestration;
-import com.github.rholder.retry.AttemptTimeLimiters;
-import com.github.rholder.retry.RetryException;
-import com.github.rholder.retry.Retryer;
-import com.github.rholder.retry.RetryerBuilder;
-import com.github.rholder.retry.StopStrategies;
-import com.github.rholder.retry.WaitStrategies;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.io.Closer;
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
@@ -42,7 +29,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import lombok.Builder;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
@@ -61,6 +48,24 @@ import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.github.rholder.retry.Attempt;
+import com.github.rholder.retry.AttemptTimeLimiters;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.io.Closer;
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import lombok.Builder;
+
/**
* A simple http based client that uses Ajax API to communicate with Azkaban server.
@@ -80,9 +85,9 @@ public class AzkabanClient implements Closeable {
protected CloseableHttpClient httpClient;
private ExecutorService executorService;
private Closer closer = Closer.create();
- private Retryer<AzkabanClientStatus> retryer;
- private static Logger log = LoggerFactory.getLogger(AzkabanClient.class);
- private Duration requestTimeout;
+ private Retryer<AzkabanClientStatus<?>> retryer;
+ private static final Logger log = LoggerFactory.getLogger(AzkabanClient.class);
+ private final Duration requestTimeout;
/**
* Child class should have a different builderMethodName.
@@ -109,13 +114,25 @@ public class AzkabanClient implements Closeable {
this.initializeClient();
this.initializeSessionManager();
this.intializeExecutorService();
+ RetryListener retryListener = new RetryListener() {
+ @Override
+ public <V> void onRetry(Attempt<V> attempt) {
+ if (attempt.hasException()) {
+ String msg = String.format("(Likely retryable) failure running Azkaban API [attempt: %d; %s after start]",
+ attempt.getAttemptNumber(), Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
+ log.warn(msg, attempt.getExceptionCause());
+ }
+ }
+ };
+
- this.retryer = RetryerBuilder.<AzkabanClientStatus>newBuilder()
+ this.retryer = RetryerBuilder.<AzkabanClientStatus<?>>newBuilder()
.retryIfExceptionOfType(InvalidSessionException.class)
.withAttemptTimeLimiter(AttemptTimeLimiters.fixedTimeLimit(this.requestTimeout.toMillis(),
TimeUnit.MILLISECONDS,
this.executorService))
.withWaitStrategy(WaitStrategies.exponentialWait(60, TimeUnit.SECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
+ .withRetryListener(retryListener)
.build();
try {
this.sessionId = this.sessionManager.fetchSession();