This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 55fdecbd1 [GOBBLIN-2110]Made retry_exception_predicate configurable in 
RetryerFactory (#4001)
55fdecbd1 is described below

commit 55fdecbd172695fe24d92ecb7b92c2c8456bb8ae
Author: pratapaditya04 <[email protected]>
AuthorDate: Thu Jul 18 00:44:14 2024 +0530

    [GOBBLIN-2110]Made retry_exception_predicate configurable in RetryerFactory 
(#4001)
    
    * made retry_exception_predicate configurable in RetryerFactory
---
 .../gobblin/configuration/ConfigurationKeys.java   |  2 +
 .../apache/gobblin/util/retry/RetryerFactory.java  | 71 +++++++++++++++++---
 .../gobblin/util/retry/RetryerFactoryTest.java     | 78 ++++++++++++++++++++++
 3 files changed, 140 insertions(+), 11 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 0f4dbd10c..b6f37c315 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -1061,6 +1061,8 @@ public class ConfigurationKeys {
   // describes a comma separated list of non transient errors that may come in 
a gobblin job
   // e.g. "invalid_grant,CredentialStoreException"
   public static final String GOBBLIN_NON_TRANSIENT_ERRORS = 
"gobblin.errorMessages.nonTransientErrors";
+  // Key to store a comma-separated list of exception class names that should 
be retried
+  public static final String EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY = 
"EXCEPTION_LIST_FOR_RETRY";
 
   /**
    * Configuration properties related to Flows
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
index 5ab27a8a1..c018767ed 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
@@ -16,12 +16,16 @@
  */
 package org.apache.gobblin.util.retry;
 
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.commons.collections.CollectionUtils;
+
 
 import com.github.rholder.retry.Retryer;
 import com.github.rholder.retry.RetryerBuilder;
@@ -32,8 +36,13 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.gobblin.exception.NonTransientException;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+
 
 /**
  * Factory class that builds Retryer.
@@ -52,11 +61,12 @@ public class RetryerFactory<T> {
   public static final String RETRY_TYPE = "retry_type";
   // value large or equal to 1
   public static final String RETRY_TIMES = "retry_times";
+  public static final Predicate<Throwable> RETRY_EXCEPTION_PREDICATE_DEFAULT;
 
-  private static final Predicate<Throwable> RETRY_EXCEPTION_PREDICATE;
   private static final Config DEFAULTS;
+
   static {
-    RETRY_EXCEPTION_PREDICATE = t -> !(t instanceof NonTransientException);
+    RETRY_EXCEPTION_PREDICATE_DEFAULT = t -> !(t instanceof 
NonTransientException);
 
     Map<String, Object> configMap = ImmutableMap.<String, Object>builder()
                                                 .put(RETRY_TIME_OUT_MS, 
TimeUnit.MINUTES.toMillis(5L))
@@ -87,6 +97,7 @@ public class RetryerFactory<T> {
     RetryType type = 
RetryType.valueOf(config.getString(RETRY_TYPE).toUpperCase());
 
     RetryerBuilder<T> builder;
+
     switch (type) {
       case EXPONENTIAL:
         builder = newExponentialRetryerBuilder(config);
@@ -104,6 +115,44 @@ public class RetryerFactory<T> {
     return builder.build();
   }
 
+  /**
+   Retrieves a retry predicate based on the configuration provided. If the 
configuration
+   does not specify any exceptions, a default retry predicate is returned.
+   *
+   @param config the configuration object containing the list of exception 
class names
+   @return a Predicate that evaluates to true if the throwable should be 
retried, false otherwise
+   */
+  @VisibleForTesting
+  public static Predicate<Throwable> 
getRetryPredicateFromConfigOrDefault(Config config) {
+    // Retrieve the list of exception class names from the configuration
+    List<String> exceptionList = ConfigUtils.getStringList(config, 
ConfigurationKeys.EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY);
+
+    // If the exception list is null or empty, return the default retry 
predicate
+    if (CollectionUtils.isEmpty(exceptionList)) {
+      return RETRY_EXCEPTION_PREDICATE_DEFAULT;
+    }
+
+    // Create a retry predicate by mapping each exception class name to a 
Predicate
+    return exceptionList.stream().map(exceptionClassName -> {
+          try {
+            Class<?> clazz = Class.forName(exceptionClassName);
+            if (Exception.class.isAssignableFrom(clazz)) {
+              // Return a Predicate that checks if a Throwable is an instance 
of the class
+              return (Predicate<Throwable>) clazz::isInstance;
+            } else {
+              LOG.error("{} is not an exception,ignoring", exceptionClassName);
+            }
+          } catch (ClassNotFoundException ignored) {
+            LOG.error("Class not found for the given exception className 
{},ignoring it", exceptionClassName, ignored);
+          } catch (Exception ignored) {
+            LOG.error("Failed to instantiate exception {},ignoring it", 
exceptionClassName, ignored);
+          }
+          return null;
+        }).filter(Objects::nonNull) // Filter out any null values
+        .reduce(com.google.common.base.Predicates::or) // Combine all 
predicates using logical OR
+        .orElse(RETRY_EXCEPTION_PREDICATE_DEFAULT); // Default to 
retryExceptionPredicate if no valid predicates are found
+  }
+
   /**
    * Creates new instance of retryer based on the config and having no {@link 
RetryListener}
    */
@@ -112,24 +161,24 @@ public class RetryerFactory<T> {
   }
 
   private static <T> RetryerBuilder<T> newFixedRetryerBuilder(Config config) {
-    return RetryerBuilder.<T> newBuilder()
-        .retryIfException(RETRY_EXCEPTION_PREDICATE)
+    return RetryerBuilder.<T>newBuilder()
+        .retryIfException(getRetryPredicateFromConfigOrDefault(config))
         
.withWaitStrategy(WaitStrategies.fixedWait(config.getLong(RETRY_INTERVAL_MS), 
TimeUnit.MILLISECONDS))
         
.withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS),
 TimeUnit.MILLISECONDS));
   }
 
   private static <T> RetryerBuilder<T> newExponentialRetryerBuilder(Config 
config) {
-    return RetryerBuilder.<T> newBuilder()
-        .retryIfException(RETRY_EXCEPTION_PREDICATE)
-        
.withWaitStrategy(WaitStrategies.exponentialWait(config.getLong(RETRY_MULTIPLIER),
-                                                         
config.getLong(RETRY_INTERVAL_MS),
-                                                         
TimeUnit.MILLISECONDS))
+    return RetryerBuilder.<T>newBuilder()
+        .retryIfException(getRetryPredicateFromConfigOrDefault(config))
+        .withWaitStrategy(
+            WaitStrategies.exponentialWait(config.getLong(RETRY_MULTIPLIER), 
config.getLong(RETRY_INTERVAL_MS),
+                TimeUnit.MILLISECONDS))
         
.withStopStrategy(StopStrategies.stopAfterDelay(config.getLong(RETRY_TIME_OUT_MS),
 TimeUnit.MILLISECONDS));
   }
 
   private static <T> RetryerBuilder<T> 
newFixedAttemptBoundRetryerBuilder(Config config) {
-    return RetryerBuilder.<T> newBuilder()
-        .retryIfException(RETRY_EXCEPTION_PREDICATE)
+    return RetryerBuilder.<T>newBuilder()
+        .retryIfException(getRetryPredicateFromConfigOrDefault(config))
         
.withWaitStrategy(WaitStrategies.fixedWait(config.getLong(RETRY_INTERVAL_MS), 
TimeUnit.MILLISECONDS))
         
.withStopStrategy(StopStrategies.stopAfterAttempt(config.getInt(RETRY_TIMES)));
   }
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/retry/RetryerFactoryTest.java
 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/retry/RetryerFactoryTest.java
new file mode 100644
index 000000000..598016061
--- /dev/null
+++ 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/retry/RetryerFactoryTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.util.retry;
+
+import java.util.Arrays;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.google.common.base.Predicate;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+
+/**
+ * Unit tests for the {@link org.apache.gobblin.util.retry.RetryerFactory} 
class.
+ */
+public class RetryerFactoryTest {
+
+  @Test
+  public void testGetRetryPredicateFromConfigOrDefault_withEmptyConfig() {
+    Config config = ConfigFactory.empty();
+    Predicate<Throwable> result = 
RetryerFactory.getRetryPredicateFromConfigOrDefault(config);
+
+    Assert.assertEquals(RetryerFactory.RETRY_EXCEPTION_PREDICATE_DEFAULT, 
result);
+  }
+
+  @Test
+  public void testGetRetryPredicateFromConfigOrDefault_withValidException() {
+    Config config = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY,
+            
ConfigValueFactory.fromAnyRef(Arrays.asList("java.lang.RuntimeException")));
+
+    Predicate<Throwable> result = 
RetryerFactory.getRetryPredicateFromConfigOrDefault(config);
+
+    Assert.assertTrue(result.test(new RuntimeException()));
+    Assert.assertFalse(result.test(new Exception()));
+  }
+
+  @Test
+  public void testGetRetryPredicateFromConfigOrDefault_withInvalidException() {
+    Config config = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY,
+            
ConfigValueFactory.fromAnyRef(Arrays.asList("non.existent.Exception")));
+
+    Predicate<Throwable> result = 
RetryerFactory.getRetryPredicateFromConfigOrDefault(config);
+
+    Assert.assertEquals(RetryerFactory.RETRY_EXCEPTION_PREDICATE_DEFAULT, 
result);
+  }
+
+  @Test
+  public void testGetRetryPredicateFromConfigOrDefault_withMixedExceptions() {
+    Config config = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.EXCEPTION_LIST_FOR_RETRY_CONFIG_KEY,
+            
ConfigValueFactory.fromAnyRef(Arrays.asList("java.lang.RuntimeException", 
"non.existent.Exception")));
+
+    Predicate<Throwable> result = 
RetryerFactory.getRetryPredicateFromConfigOrDefault(config);
+
+    Assert.assertTrue(result.test(new RuntimeException()));
+    Assert.assertFalse(result.test(new Exception()));
+  }
+}

Reply via email to