This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 55fdecbd1 [GOBBLIN-2110]Made retry_exception_predicate configurable in
RetryerFactory (#4001)
55fdecbd1 is described below
commit 55fdecbd172695fe24d92ecb7b92c2c8456bb8ae
Author: pratapaditya04 <[email protected]>
AuthorDate: Thu Jul 18 00:44:14 2024 +0530
[GOBBLIN-2110]Made retry_exception_predicate configurable in RetryerFactory
(#4001)
* made retry_exception_predicate configurable in RetryerFactory
---
.../gobblin/configuration/ConfigurationKeys.java | 2 +
.../apache/gobblin/util/retry/RetryerFactory.java | 71 +++++++++++++++++---
.../gobblin/util/retry/RetryerFactoryTest.java | 78 ++++++++++++++++++++++
3 files changed, 140 insertions(+), 11 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 0f4dbd10c..b6f37c315 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1061,6 +1061,8 @@ public class ConfigurationKeys {
// describes a comma separated list of non transient errors that may come in
a gobblin job
// e.g. "invalid_grant,CredentialStoreException"
public static final String GOBBLIN_NON_TRANSIENT_ERRORS =
"gobblin.errorMessages.nonTransientErrors";
+ // Key to store a comma-separated list of exception class names that should
be retried
+ public static final String EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY =
"EXCEPTION_LIST_FOR_RETRY";
/**
* Configuration properties related to Flows
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
index 5ab27a8a1..c018767ed 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
@@ -16,12 +16,16 @@
*/
package org.apache.gobblin.util.retry;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.collections.CollectionUtils;
+
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
@@ -32,8 +36,13 @@ import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.gobblin.exception.NonTransientException;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+
/**
* Factory class that builds Retryer.
@@ -52,11 +61,12 @@ public class RetryerFactory<T> {
public static final String RETRY_TYPE = "retry_type";
// value large or equal to 1
public static final String RETRY_TIMES = "retry_times";
+ public static final Predicate<Throwable> RETRY_EXCEPTION_PREDICATE_DEFAULT;
- private static final Predicate<Throwable> RETRY_EXCEPTION_PREDICATE;
private static final Config DEFAULTS;
+
static {
- RETRY_EXCEPTION_PREDICATE = t -> !(t instanceof NonTransientException);
+ RETRY_EXCEPTION_PREDICATE_DEFAULT = t -> !(t instanceof
NonTransientException);
Map<String, Object> configMap = ImmutableMap.<String, Object>builder()
.put(RETRY_TIME_OUT_MS,
TimeUnit.MINUTES.toMillis(5L))
@@ -87,6 +97,7 @@ public class RetryerFactory<T> {
RetryType type =
RetryType.valueOf(config.getString(RETRY_TYPE).toUpperCase());
RetryerBuilder<T> builder;
+
switch (type) {
case EXPONENTIAL:
builder = newExponentialRetryerBuilder(config);
@@ -104,6 +115,44 @@ public class RetryerFactory<T> {
return builder.build();
}
+ /**
+ Retrieves a retry predicate based on the configuration provided. If the
configuration
+ does not specify any exceptions, a default retry predicate is returned.
+ *
+ @param config the configuration object containing the list of exception
class names
+ @return a Predicate that evaluates to true if the throwable should be
retried, false otherwise
+ */
+ @VisibleForTesting
+ public static Predicate<Throwable>
getRetryPredicateFromConfigOrDefault(Config config) {
+ // Retrieve the list of exception class names from the configuration
+ List<String> exceptionList = ConfigUtils.getStringList(config,
ConfigurationKeys.EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY);
+
+ // If the exception list is null or empty, return the default retry
predicate
+ if (CollectionUtils.isEmpty(exceptionList)) {
+ return RETRY_EXCEPTION_PREDICATE_DEFAULT;
+ }
+
+ // Create a retry predicate by mapping each exception class name to a
Predicate
+ return exceptionList.stream().map(exceptionClassName -> {
+ try {
+ Class<?> clazz = Class.forName(exceptionClassName);
+ if (Exception.class.isAssignableFrom(clazz)) {
+ // Return a Predicate that checks if a Throwable is an instance
of the class
+ return (Predicate<Throwable>) clazz::isInstance;
+ } else {
+ LOG.error("{} is not an exception,ignoring", exceptionClassName);
+ }
+ } catch (ClassNotFoundException ignored) {
+ LOG.error("Class not found for the given exception className
{},ignoring it", exceptionClassName, ignored);
+ } catch (Exception ignored) {
+ LOG.error("Failed to instantiate exception {},ignoring it",
exceptionClassName, ignored);
+ }
+ return null;
+ }).filter(Objects::nonNull) // Filter out any null values
+ .reduce(com.google.common.base.Predicates::or) // Combine all
predicates using logical OR
+ .orElse(RETRY_EXCEPTION_PREDICATE_DEFAULT); // Default to
retryExceptionPredicate if no valid predicates are found
+ }
+
/**
* Creates new instance of retryer based on the config and having no {@link
RetryListener}
*/
@@ -112,24 +161,24 @@ public class RetryerFactory<T> {
}
private static <T> RetryerBuilder<T> newFixedRetryerBuilder(Config config) {
- return RetryerBuilder.<T> newBuilder()
- .retryIfException(RETRY_EXCEPTION_PREDICATE)
+ return RetryerBuilder.<T>newBuilder()
+ .retryIfException(getRetryPredicateFromConfigOrDefault(config))
.withWaitStrategy(WaitStrategies.fixedWait(config.getLong(RETRY_INTERVAL_MS),
TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS),
TimeUnit.MILLISECONDS));
}
private static <T> RetryerBuilder<T> newExponentialRetryerBuilder(Config
config) {
- return RetryerBuilder.<T> newBuilder()
- .retryIfException(RETRY_EXCEPTION_PREDICATE)
-
.withWaitStrategy(WaitStrategies.exponentialWait(config.getLong(RETRY_MULTIPLIER),
-
config.getLong(RETRY_INTERVAL_MS),
-
TimeUnit.MILLISECONDS))
+ return RetryerBuilder.<T>newBuilder()
+ .retryIfException(getRetryPredicateFromConfigOrDefault(config))
+ .withWaitStrategy(
+ WaitStrategies.exponentialWait(config.getLong(RETRY_MULTIPLIER),
config.getLong(RETRY_INTERVAL_MS),
+ TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS),
TimeUnit.MILLISECONDS));
}
private static <T> RetryerBuilder<T>
newFixedAttemptBoundRetryerBuilder(Config config) {
- return RetryerBuilder.<T> newBuilder()
- .retryIfException(RETRY_EXCEPTION_PREDICATE)
+ return RetryerBuilder.<T>newBuilder()
+ .retryIfException(getRetryPredicateFromConfigOrDefault(config))
.withWaitStrategy(WaitStrategies.fixedWait(config.getLong(RETRY_INTERVAL_MS),
TimeUnit.MILLISECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(config.getInt(RETRY_TIMES)));
}
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/retry/RetryerFactoryTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/retry/RetryerFactoryTest.java
new file mode 100644
index 000000000..598016061
--- /dev/null
+++
b/gobblin-utility/src/test/java/org/apache/gobblin/util/retry/RetryerFactoryTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.util.retry;
+
+import java.util.Arrays;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Predicate;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+
+/**
+ * Unit tests for the {@link org.apache.gobblin.util.retry.RetryerFactory}
class.
+ */
+public class RetryerFactoryTest {
+
+ @Test
+ public void testGetRetryPredicateFromConfigOrDefault_withEmptyConfig() {
+ Config config = ConfigFactory.empty();
+ Predicate<Throwable> result =
RetryerFactory.getRetryPredicateFromConfigOrDefault(config);
+
+ Assert.assertEquals(RetryerFactory.RETRY_EXCEPTION_PREDICATE_DEFAULT,
result);
+ }
+
+ @Test
+ public void testGetRetryPredicateFromConfigOrDefault_withValidException() {
+ Config config = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY,
+
ConfigValueFactory.fromAnyRef(Arrays.asList("java.lang.RuntimeException")));
+
+ Predicate<Throwable> result =
RetryerFactory.getRetryPredicateFromConfigOrDefault(config);
+
+ Assert.assertTrue(result.test(new RuntimeException()));
+ Assert.assertFalse(result.test(new Exception()));
+ }
+
+ @Test
+ public void testGetRetryPredicateFromConfigOrDefault_withInvalidException() {
+ Config config = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY,
+
ConfigValueFactory.fromAnyRef(Arrays.asList("non.existent.Exception")));
+
+ Predicate<Throwable> result =
RetryerFactory.getRetryPredicateFromConfigOrDefault(config);
+
+ Assert.assertEquals(RetryerFactory.RETRY_EXCEPTION_PREDICATE_DEFAULT,
result);
+ }
+
+ @Test
+ public void testGetRetryPredicateFromConfigOrDefault_withMixedExceptions() {
+ Config config = ConfigFactory.empty()
+ .withValue(ConfigurationKeys.EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY,
+
ConfigValueFactory.fromAnyRef(Arrays.asList("java.lang.RuntimeException",
"non.existent.Exception")));
+
+ Predicate<Throwable> result =
RetryerFactory.getRetryPredicateFromConfigOrDefault(config);
+
+ Assert.assertTrue(result.test(new RuntimeException()));
+ Assert.assertFalse(result.test(new Exception()));
+ }
+}