This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 12445a6  [GOBBLIN-1577] change the multiplier used in 
ExponentialWaitStrategy to a reasonable… (#3430)
12445a6 is described below

commit 12445a673f311a1053841082a2d484cfb7b323f9
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Nov 18 17:30:12 2021 -0800

    [GOBBLIN-1577] change the multiplier used in ExponentialWaitStrategy to a 
reasonable… (#3430)
    
    * change the multiplier used in ExponentialWaitStrategy to 1 second. old 
multiplier 2ms was retrying too fast for some use cases
    
    * .
---
 .../gobblin/configuration/ConfigurationKeys.java   |  2 -
 .../compaction/mapreduce/MRCompactorJobRunner.java |  4 +-
 .../gobblin/publisher/BaseDataPublisher.java       | 62 +++++++++++-----------
 .../publisher/TimePartitionedDataPublisher.java    |  2 +-
 .../TimePartitionedStreamingDataPublisher.java     |  2 +-
 .../gobblin/publisher/TimestampDataPublisher.java  |  4 +-
 .../runtime/KafkaAvroJobStatusMonitorTest.java     | 14 ++---
 .../service/monitoring/KafkaJobStatusMonitor.java  |  8 ++-
 .../apache/gobblin/util/retry/RetryerFactory.java  |  9 +---
 9 files changed, 51 insertions(+), 56 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 07182a8..c838e91 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -888,8 +888,6 @@ public class ConfigurationKeys {
   public static final String KAFKA_SOURCE_AVG_FETCH_TIME_CAP = 
"kakfa.source.avgFetchTimeCap";
   public static final int DEFAULT_KAFKA_SOURCE_AVG_FETCH_TIME_CAP = 100;
   public static final String SHARED_KAFKA_CONFIG_PREFIX = 
"gobblin.kafka.sharedConfig";
-  public static final String KAFKA_JOB_STATUS_MONITOR_RETRY_TIME_OUT_MINUTES =
-      "gobblin.kafka.jobStatusMonitor.retry.timeOut.minutes";
 
   /**
    * Kafka schema registry HTTP client configuration
diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
index c804404..fab2e98 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.compaction.mapreduce;
 
-import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
@@ -55,6 +54,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -186,7 +186,7 @@ public abstract class MRCompactorJobRunner implements 
Runnable, Comparable<MRCom
         ImmutableMap.<String, Object>builder()
             .put(RETRY_TIME_OUT_MS, TimeUnit.MINUTES.toMillis(2L))   //Overall 
retry for 2 minutes
             .put(RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(5L)) //Try to 
retry 5 seconds
-            .put(RETRY_MULTIPLIER, 2L) // Muliply by 2 every attempt
+            .put(RETRY_MULTIPLIER, 2L) // Multiply by 2 every attempt
             .put(RETRY_TYPE, RetryType.EXPONENTIAL.name())
             .build();
     COMPACTION_RETRY_DEFAULTS = ConfigFactory.parseMap(configMap);
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 56d7270..03fbf81 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -17,16 +17,6 @@
 
 package org.apache.gobblin.publisher;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.io.Closer;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigRenderOptions;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
@@ -40,7 +30,29 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
+
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
@@ -62,15 +74,6 @@ import org.apache.gobblin.writer.FsDataWriter;
 import org.apache.gobblin.writer.FsWriterMetrics;
 import org.apache.gobblin.writer.PartitionIdentifier;
 import org.apache.gobblin.writer.PartitionedDataWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.gobblin.util.retry.RetryerFactory.*;
 
@@ -122,14 +125,14 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
   static final String PUBLISH_RETRY_ENABLED = DATA_PUBLISHER_RETRY_PREFIX + 
"enabled";
 
   static final Config PUBLISH_RETRY_DEFAULTS;
-  protected final Config retrierConfig;
+  protected final Config retryerConfig;
 
   static {
     Map<String, Object> configMap =
         ImmutableMap.<String, Object>builder()
             .put(RETRY_TIME_OUT_MS, TimeUnit.MINUTES.toMillis(2L))   //Overall 
retry for 2 minutes
             .put(RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(5L)) //Try to 
retry 5 seconds
-            .put(RETRY_MULTIPLIER, 2L) // Muliply by 2 every attempt
+            .put(RETRY_MULTIPLIER, 2L) // Multiply by 2 every attempt
             .put(RETRY_TYPE, RetryType.EXPONENTIAL.name())
             .build();
     PUBLISH_RETRY_DEFAULTS = ConfigFactory.parseMap(configMap);
@@ -194,15 +197,14 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
     }
 
     if (this.shouldRetry) {
-      this.retrierConfig = ConfigBuilder.create()
+      this.retryerConfig = ConfigBuilder.create()
           .loadProps(this.getState().getProperties(), 
DATA_PUBLISHER_RETRY_PREFIX)
           .build()
           .withFallback(PUBLISH_RETRY_DEFAULTS);
-      LOG.info("Retry enabled for publish with config : "+ 
retrierConfig.root().render(ConfigRenderOptions.concise()));
-
-    }else {
+      LOG.info("Retry enabled for publish with config : " + 
retryerConfig.root().render(ConfigRenderOptions.concise()));
+    } else {
       LOG.info("Retry disabled for publish.");
-      this.retrierConfig = WriterUtils.NO_RETRY_CONFIG;
+      this.retryerConfig = WriterUtils.NO_RETRY_CONFIG;
     }
 
 
@@ -399,7 +401,7 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
     if (publishSingleTaskData) {
       // Create final output directory
       
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
 publisherOutputDir,
-          this.permissions.get(branchId), retrierConfig);
+          this.permissions.get(branchId), retryerConfig);
       
if(this.publisherOutputDirOwnerGroupByBranches.get(branchId).isPresent()) {
         LOG.info(String.format("Setting path %s group to %s", 
publisherOutputDir.toString(), 
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get()));
         HadoopUtils.setGroup(this.publisherFileSystemByBranches.get(branchId), 
publisherOutputDir, 
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get());
@@ -431,7 +433,7 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
       } else {
         // Create the parent directory of the final output directory if it 
does not exist
         
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
-            publisherOutputDir.getParent(), this.permissions.get(branchId), 
retrierConfig);
+            publisherOutputDir.getParent(), this.permissions.get(branchId), 
retryerConfig);
         
if(this.publisherOutputDirOwnerGroupByBranches.get(branchId).isPresent()) {
           LOG.info(String.format("Setting path %s group to %s", 
publisherOutputDir.toString(), 
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get()));
           
HadoopUtils.setGroup(this.publisherFileSystemByBranches.get(branchId), 
publisherOutputDir, 
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get());
@@ -481,7 +483,7 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
           .substring(taskOutputFile.indexOf(writerOutputDir.toString()) + 
writerOutputDir.toString().length() + 1);
       Path publisherOutputPath = new Path(publisherOutputDir, pathSuffix);
       
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
-          publisherOutputPath.getParent(), this.permissions.get(branchId), 
retrierConfig);
+          publisherOutputPath.getParent(), this.permissions.get(branchId), 
retryerConfig);
 
       movePath(parallelRunner, workUnitState, taskOutputPath, 
publisherOutputPath, branchId);
     }
@@ -677,7 +679,7 @@ public class BaseDataPublisher extends 
SingleTaskDataPublisher {
       FileSystem fs = this.metaDataWriterFileSystemByBranches.get(branchId);
 
         if (!fs.exists(metadataOutputPath.getParent())) {
-          WriterUtils.mkdirsWithRecursivePermissionWithRetry(fs, 
metadataOutputPath, this.permissions.get(branchId), retrierConfig);
+          WriterUtils.mkdirsWithRecursivePermissionWithRetry(fs, 
metadataOutputPath, this.permissions.get(branchId), retryerConfig);
         }
 
       //Delete the file if metadata already exists
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
index 30b77ea..7f6f12d 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
@@ -60,7 +60,7 @@ public class TimePartitionedDataPublisher extends 
BaseDataPublisher {
       Path outputPath = new Path(publisherOutput, pathSuffix);
 
       
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
 outputPath.getParent(),
-            this.permissions.get(branchId), this.retrierConfig);
+            this.permissions.get(branchId), this.retryerConfig);
 
       movePath(parallelRunner, workUnitState, status.getPath(), outputPath, 
branchId);
     }
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisher.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisher.java
index 379cd14..bfc3376 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisher.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisher.java
@@ -78,7 +78,7 @@ public class TimePartitionedStreamingDataPublisher extends 
TimePartitionedDataPu
       // This is used to force the publisher save recordPublisherOutputDirs as 
the granularity to be parent of new file paths
       // which will be used to do hive registration
       
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
-          publisherOutputDir, this.permissions.get(branchId), retrierConfig);
+          publisherOutputDir, this.permissions.get(branchId), retryerConfig);
     }
     super.publishData(state, branchId, publishSingleTaskData, 
writerOutputPathsMoved);
   }
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimestampDataPublisher.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimestampDataPublisher.java
index 36bcd49..c484c99 100644
--- 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimestampDataPublisher.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimestampDataPublisher.java
@@ -53,7 +53,7 @@ public class TimestampDataPublisher extends BaseDataPublisher 
{
     Path publisherOutputDir = getPublisherOutputDir(state, branchId);
     if 
(!this.publisherFileSystemByBranches.get(branchId).exists(publisherOutputDir)) {
       
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
-          publisherOutputDir, this.permissions.get(branchId), 
this.retrierConfig);
+          publisherOutputDir, this.permissions.get(branchId), 
this.retryerConfig);
     }
     super.publishData(state, branchId, publishSingleTaskData, 
writerOutputPathsMoved);
   }
@@ -75,7 +75,7 @@ public class TimestampDataPublisher extends BaseDataPublisher 
{
 
     if (!this.publisherFileSystemByBranches.get(branchId).exists(newDst)) {
       
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
-          newDst.getParent(), this.permissions.get(branchId), 
this.retrierConfig);
+          newDst.getParent(), this.permissions.get(branchId), 
this.retryerConfig);
     }
 
     super.movePath(parallelRunner, state, src, newDst, branchId);
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index ae26d15..fd8f157 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.gobblin.runtime;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
@@ -30,7 +28,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.io.FileUtils;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.testng.Assert;
@@ -40,7 +37,9 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
@@ -71,6 +70,7 @@ import 
org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
 import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
 
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_MULTIPLIER;
 import static org.mockito.Mockito.mock;
 
 
@@ -316,7 +316,7 @@ public class KafkaAvroJobStatusMonitorTest {
     });
 
     try {
-      Thread.sleep(1000);
+      Thread.sleep(1000L);
     } catch(InterruptedException ex) {
       Thread.currentThread().interrupt();
     }
@@ -324,7 +324,8 @@ public class KafkaAvroJobStatusMonitorTest {
     Config config = 
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS, 
ConfigValueFactory.fromAnyRef("localhost:0000"))
         
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY, 
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
         .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, 
ConfigValueFactory.fromAnyRef(stateStoreDir))
-        .withValue("zookeeper.connect", 
ConfigValueFactory.fromAnyRef("localhost:2121"));
+        .withValue("zookeeper.connect", 
ConfigValueFactory.fromAnyRef("localhost:2121"))
+        .withValue(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX + "." + 
RETRY_MULTIPLIER, 
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.toMillis(1L)));
 
     AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle = new 
AtomicBoolean(false);
     int minNumFakeExceptionsExpected = 10;
@@ -356,7 +357,8 @@ public class KafkaAvroJobStatusMonitorTest {
     }, 2, 2, TimeUnit.SECONDS);
     Thread mainThread = Thread.currentThread();
     // guardrail against excessive retries (befitting this unit test):
-    toggleManagementExecutor.scheduleAtFixedRate(mainThread::interrupt, 60, 5, 
TimeUnit.SECONDS);
+    toggleManagementExecutor.scheduleAtFixedRate(mainThread::interrupt, 20, 5, 
TimeUnit.SECONDS);
+
     jobStatusMonitor.processMessage(recordIterator.next());
 
     
Assert.assertTrue(jobStatusMonitor.getNumFakeExceptionsFromParseJobStatus() > 
minNumFakeExceptionsExpected,
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index fbe21eb..9c4255e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -80,7 +80,7 @@ import static org.apache.gobblin.util.retry.RetryerFactory.*;
  */
 @Slf4j
 public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], 
byte[]> {
-  static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
+  public static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
   //We use table suffix that is different from the Gobblin job state store 
suffix of jst to avoid confusion.
   //gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
   public static final String GET_AND_SET_JOB_STATUS = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
@@ -131,10 +131,8 @@ public abstract class KafkaJobStatusMonitor extends 
HighLevelConsumer<byte[], by
 
     this.flowNameGroupToWorkUnitCount = new ConcurrentHashMap<>();
 
-    Config retryerOverridesConfig = 
config.hasPath(ConfigurationKeys.KAFKA_JOB_STATUS_MONITOR_RETRY_TIME_OUT_MINUTES)
-        ? ConfigFactory.parseMap(ImmutableMap.of(
-                RETRY_TIME_OUT_MS,
-                
config.getDuration(ConfigurationKeys.KAFKA_JOB_STATUS_MONITOR_RETRY_TIME_OUT_MINUTES,
 TimeUnit.MINUTES)))
+    Config retryerOverridesConfig = 
config.hasPath(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
+        ? config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
         : ConfigFactory.empty();
     // log exceptions to expose errors we suffer under and/or guide 
intervention when resolution not readily forthcoming
     this.persistJobStatusRetryer =
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
index df79d37..5ab27a8 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
@@ -56,17 +56,12 @@ public class RetryerFactory<T> {
   private static final Predicate<Throwable> RETRY_EXCEPTION_PREDICATE;
   private static final Config DEFAULTS;
   static {
-    RETRY_EXCEPTION_PREDICATE = new Predicate<Throwable>() {
-      @Override
-      public boolean apply(Throwable t) {
-        return !(t instanceof NonTransientException);
-      }
-    };
+    RETRY_EXCEPTION_PREDICATE = t -> !(t instanceof NonTransientException);
 
     Map<String, Object> configMap = ImmutableMap.<String, Object>builder()
                                                 .put(RETRY_TIME_OUT_MS, 
TimeUnit.MINUTES.toMillis(5L))
                                                 .put(RETRY_INTERVAL_MS, 
TimeUnit.SECONDS.toMillis(30L))
-                                                .put(RETRY_MULTIPLIER, 2L)
+                                                .put(RETRY_MULTIPLIER, 
TimeUnit.SECONDS.toMillis(1L))
                                                 .put(RETRY_TYPE, 
RetryType.EXPONENTIAL.name())
                                                 .put(RETRY_TIMES, 2)
                                                 .build();

Reply via email to