This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 12445a6 [GOBBLIN-1577] change the multiplier used in
ExponentialWaitStrategy to a reasonable… (#3430)
12445a6 is described below
commit 12445a673f311a1053841082a2d484cfb7b323f9
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Nov 18 17:30:12 2021 -0800
[GOBBLIN-1577] change the multiplier used in ExponentialWaitStrategy to a
reasonable… (#3430)
* Change the multiplier used in ExponentialWaitStrategy to 1 second; the old
multiplier of 2 ms was retrying too fast for some use cases
* .
---
.../gobblin/configuration/ConfigurationKeys.java | 2 -
.../compaction/mapreduce/MRCompactorJobRunner.java | 4 +-
.../gobblin/publisher/BaseDataPublisher.java | 62 +++++++++++-----------
.../publisher/TimePartitionedDataPublisher.java | 2 +-
.../TimePartitionedStreamingDataPublisher.java | 2 +-
.../gobblin/publisher/TimestampDataPublisher.java | 4 +-
.../runtime/KafkaAvroJobStatusMonitorTest.java | 14 ++---
.../service/monitoring/KafkaJobStatusMonitor.java | 8 ++-
.../apache/gobblin/util/retry/RetryerFactory.java | 9 +---
9 files changed, 51 insertions(+), 56 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 07182a8..c838e91 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -888,8 +888,6 @@ public class ConfigurationKeys {
public static final String KAFKA_SOURCE_AVG_FETCH_TIME_CAP =
"kakfa.source.avgFetchTimeCap";
public static final int DEFAULT_KAFKA_SOURCE_AVG_FETCH_TIME_CAP = 100;
public static final String SHARED_KAFKA_CONFIG_PREFIX =
"gobblin.kafka.sharedConfig";
- public static final String KAFKA_JOB_STATUS_MONITOR_RETRY_TIME_OUT_MINUTES =
- "gobblin.kafka.jobStatusMonitor.retry.timeOut.minutes";
/**
* Kafka schema registry HTTP client configuration
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
index c804404..fab2e98 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/MRCompactorJobRunner.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.compaction.mapreduce;
-import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
@@ -55,6 +54,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
@@ -186,7 +186,7 @@ public abstract class MRCompactorJobRunner implements
Runnable, Comparable<MRCom
ImmutableMap.<String, Object>builder()
.put(RETRY_TIME_OUT_MS, TimeUnit.MINUTES.toMillis(2L)) //Overall
retry for 2 minutes
.put(RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(5L)) //Try to
retry 5 seconds
- .put(RETRY_MULTIPLIER, 2L) // Muliply by 2 every attempt
+ .put(RETRY_MULTIPLIER, 2L) // Multiply by 2 every attempt
.put(RETRY_TYPE, RetryType.EXPONENTIAL.name())
.build();
COMPACTION_RETRY_DEFAULTS = ConfigFactory.parseMap(configMap);
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
index 56d7270..03fbf81 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/BaseDataPublisher.java
@@ -17,16 +17,6 @@
package org.apache.gobblin.publisher;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.io.Closer;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigRenderOptions;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
@@ -40,7 +30,29 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+
import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
+
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
@@ -62,15 +74,6 @@ import org.apache.gobblin.writer.FsDataWriter;
import org.apache.gobblin.writer.FsWriterMetrics;
import org.apache.gobblin.writer.PartitionIdentifier;
import org.apache.gobblin.writer.PartitionedDataWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.apache.gobblin.util.retry.RetryerFactory.*;
@@ -122,14 +125,14 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
static final String PUBLISH_RETRY_ENABLED = DATA_PUBLISHER_RETRY_PREFIX +
"enabled";
static final Config PUBLISH_RETRY_DEFAULTS;
- protected final Config retrierConfig;
+ protected final Config retryerConfig;
static {
Map<String, Object> configMap =
ImmutableMap.<String, Object>builder()
.put(RETRY_TIME_OUT_MS, TimeUnit.MINUTES.toMillis(2L)) //Overall
retry for 2 minutes
.put(RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(5L)) //Try to
retry 5 seconds
- .put(RETRY_MULTIPLIER, 2L) // Muliply by 2 every attempt
+ .put(RETRY_MULTIPLIER, 2L) // Multiply by 2 every attempt
.put(RETRY_TYPE, RetryType.EXPONENTIAL.name())
.build();
PUBLISH_RETRY_DEFAULTS = ConfigFactory.parseMap(configMap);
@@ -194,15 +197,14 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
}
if (this.shouldRetry) {
- this.retrierConfig = ConfigBuilder.create()
+ this.retryerConfig = ConfigBuilder.create()
.loadProps(this.getState().getProperties(),
DATA_PUBLISHER_RETRY_PREFIX)
.build()
.withFallback(PUBLISH_RETRY_DEFAULTS);
- LOG.info("Retry enabled for publish with config : "+
retrierConfig.root().render(ConfigRenderOptions.concise()));
-
- }else {
+ LOG.info("Retry enabled for publish with config : " +
retryerConfig.root().render(ConfigRenderOptions.concise()));
+ } else {
LOG.info("Retry disabled for publish.");
- this.retrierConfig = WriterUtils.NO_RETRY_CONFIG;
+ this.retryerConfig = WriterUtils.NO_RETRY_CONFIG;
}
@@ -399,7 +401,7 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
if (publishSingleTaskData) {
// Create final output directory
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
publisherOutputDir,
- this.permissions.get(branchId), retrierConfig);
+ this.permissions.get(branchId), retryerConfig);
if(this.publisherOutputDirOwnerGroupByBranches.get(branchId).isPresent()) {
LOG.info(String.format("Setting path %s group to %s",
publisherOutputDir.toString(),
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get()));
HadoopUtils.setGroup(this.publisherFileSystemByBranches.get(branchId),
publisherOutputDir,
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get());
@@ -431,7 +433,7 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
} else {
// Create the parent directory of the final output directory if it
does not exist
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
- publisherOutputDir.getParent(), this.permissions.get(branchId),
retrierConfig);
+ publisherOutputDir.getParent(), this.permissions.get(branchId),
retryerConfig);
if(this.publisherOutputDirOwnerGroupByBranches.get(branchId).isPresent()) {
LOG.info(String.format("Setting path %s group to %s",
publisherOutputDir.toString(),
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get()));
HadoopUtils.setGroup(this.publisherFileSystemByBranches.get(branchId),
publisherOutputDir,
this.publisherOutputDirOwnerGroupByBranches.get(branchId).get());
@@ -481,7 +483,7 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
.substring(taskOutputFile.indexOf(writerOutputDir.toString()) +
writerOutputDir.toString().length() + 1);
Path publisherOutputPath = new Path(publisherOutputDir, pathSuffix);
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
- publisherOutputPath.getParent(), this.permissions.get(branchId),
retrierConfig);
+ publisherOutputPath.getParent(), this.permissions.get(branchId),
retryerConfig);
movePath(parallelRunner, workUnitState, taskOutputPath,
publisherOutputPath, branchId);
}
@@ -677,7 +679,7 @@ public class BaseDataPublisher extends
SingleTaskDataPublisher {
FileSystem fs = this.metaDataWriterFileSystemByBranches.get(branchId);
if (!fs.exists(metadataOutputPath.getParent())) {
- WriterUtils.mkdirsWithRecursivePermissionWithRetry(fs,
metadataOutputPath, this.permissions.get(branchId), retrierConfig);
+ WriterUtils.mkdirsWithRecursivePermissionWithRetry(fs,
metadataOutputPath, this.permissions.get(branchId), retryerConfig);
}
//Delete the file if metadata already exists
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
index 30b77ea..7f6f12d 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedDataPublisher.java
@@ -60,7 +60,7 @@ public class TimePartitionedDataPublisher extends
BaseDataPublisher {
Path outputPath = new Path(publisherOutput, pathSuffix);
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
outputPath.getParent(),
- this.permissions.get(branchId), this.retrierConfig);
+ this.permissions.get(branchId), this.retryerConfig);
movePath(parallelRunner, workUnitState, status.getPath(), outputPath,
branchId);
}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisher.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisher.java
index 379cd14..bfc3376 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisher.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisher.java
@@ -78,7 +78,7 @@ public class TimePartitionedStreamingDataPublisher extends
TimePartitionedDataPu
// This is used to force the publisher save recordPublisherOutputDirs as
the granularity to be parent of new file paths
// which will be used to do hive registration
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
- publisherOutputDir, this.permissions.get(branchId), retrierConfig);
+ publisherOutputDir, this.permissions.get(branchId), retryerConfig);
}
super.publishData(state, branchId, publishSingleTaskData,
writerOutputPathsMoved);
}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimestampDataPublisher.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimestampDataPublisher.java
index 36bcd49..c484c99 100644
---
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimestampDataPublisher.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimestampDataPublisher.java
@@ -53,7 +53,7 @@ public class TimestampDataPublisher extends BaseDataPublisher
{
Path publisherOutputDir = getPublisherOutputDir(state, branchId);
if
(!this.publisherFileSystemByBranches.get(branchId).exists(publisherOutputDir)) {
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
- publisherOutputDir, this.permissions.get(branchId),
this.retrierConfig);
+ publisherOutputDir, this.permissions.get(branchId),
this.retryerConfig);
}
super.publishData(state, branchId, publishSingleTaskData,
writerOutputPathsMoved);
}
@@ -75,7 +75,7 @@ public class TimestampDataPublisher extends BaseDataPublisher
{
if (!this.publisherFileSystemByBranches.get(branchId).exists(newDst)) {
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
- newDst.getParent(), this.permissions.get(branchId),
this.retrierConfig);
+ newDst.getParent(), this.permissions.get(branchId),
this.retryerConfig);
}
super.movePath(parallelRunner, state, src, newDst, branchId);
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
index ae26d15..fd8f157 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/KafkaAvroJobStatusMonitorTest.java
@@ -16,8 +16,6 @@
*/
package org.apache.gobblin.runtime;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
@@ -30,7 +28,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
-
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.testng.Assert;
@@ -40,7 +37,9 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
@@ -71,6 +70,7 @@ import
org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_MULTIPLIER;
import static org.mockito.Mockito.mock;
@@ -316,7 +316,7 @@ public class KafkaAvroJobStatusMonitorTest {
});
try {
- Thread.sleep(1000);
+ Thread.sleep(1000L);
} catch(InterruptedException ex) {
Thread.currentThread().interrupt();
}
@@ -324,7 +324,8 @@ public class KafkaAvroJobStatusMonitorTest {
Config config =
ConfigFactory.empty().withValue(ConfigurationKeys.KAFKA_BROKERS,
ConfigValueFactory.fromAnyRef("localhost:0000"))
.withValue(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
ConfigValueFactory.fromAnyRef("org.apache.kafka.common.serialization.ByteArrayDeserializer"))
.withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY,
ConfigValueFactory.fromAnyRef(stateStoreDir))
- .withValue("zookeeper.connect",
ConfigValueFactory.fromAnyRef("localhost:2121"));
+ .withValue("zookeeper.connect",
ConfigValueFactory.fromAnyRef("localhost:2121"))
+ .withValue(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX + "." +
RETRY_MULTIPLIER,
ConfigValueFactory.fromAnyRef(TimeUnit.MILLISECONDS.toMillis(1L)));
AtomicBoolean shouldThrowFakeExceptionInParseJobStatusToggle = new
AtomicBoolean(false);
int minNumFakeExceptionsExpected = 10;
@@ -356,7 +357,8 @@ public class KafkaAvroJobStatusMonitorTest {
}, 2, 2, TimeUnit.SECONDS);
Thread mainThread = Thread.currentThread();
// guardrail against excessive retries (befitting this unit test):
- toggleManagementExecutor.scheduleAtFixedRate(mainThread::interrupt, 60, 5,
TimeUnit.SECONDS);
+ toggleManagementExecutor.scheduleAtFixedRate(mainThread::interrupt, 20, 5,
TimeUnit.SECONDS);
+
jobStatusMonitor.processMessage(recordIterator.next());
Assert.assertTrue(jobStatusMonitor.getNumFakeExceptionsFromParseJobStatus() >
minNumFakeExceptionsExpected,
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
index fbe21eb..9c4255e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java
@@ -80,7 +80,7 @@ import static org.apache.gobblin.util.retry.RetryerFactory.*;
*/
@Slf4j
public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[],
byte[]> {
- static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
+ public static final String JOB_STATUS_MONITOR_PREFIX = "jobStatusMonitor";
//We use table suffix that is different from the Gobblin job state store
suffix of jst to avoid confusion.
//gst refers to the state store suffix for GaaS-orchestrated Gobblin jobs.
public static final String GET_AND_SET_JOB_STATUS =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
@@ -131,10 +131,8 @@ public abstract class KafkaJobStatusMonitor extends
HighLevelConsumer<byte[], by
this.flowNameGroupToWorkUnitCount = new ConcurrentHashMap<>();
- Config retryerOverridesConfig =
config.hasPath(ConfigurationKeys.KAFKA_JOB_STATUS_MONITOR_RETRY_TIME_OUT_MINUTES)
- ? ConfigFactory.parseMap(ImmutableMap.of(
- RETRY_TIME_OUT_MS,
-
config.getDuration(ConfigurationKeys.KAFKA_JOB_STATUS_MONITOR_RETRY_TIME_OUT_MINUTES,
TimeUnit.MINUTES)))
+ Config retryerOverridesConfig =
config.hasPath(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
+ ? config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX)
: ConfigFactory.empty();
// log exceptions to expose errors we suffer under and/or guide
intervention when resolution not readily forthcoming
this.persistJobStatusRetryer =
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
index df79d37..5ab27a8 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/retry/RetryerFactory.java
@@ -56,17 +56,12 @@ public class RetryerFactory<T> {
private static final Predicate<Throwable> RETRY_EXCEPTION_PREDICATE;
private static final Config DEFAULTS;
static {
- RETRY_EXCEPTION_PREDICATE = new Predicate<Throwable>() {
- @Override
- public boolean apply(Throwable t) {
- return !(t instanceof NonTransientException);
- }
- };
+ RETRY_EXCEPTION_PREDICATE = t -> !(t instanceof NonTransientException);
Map<String, Object> configMap = ImmutableMap.<String, Object>builder()
.put(RETRY_TIME_OUT_MS,
TimeUnit.MINUTES.toMillis(5L))
.put(RETRY_INTERVAL_MS,
TimeUnit.SECONDS.toMillis(30L))
- .put(RETRY_MULTIPLIER, 2L)
+ .put(RETRY_MULTIPLIER,
TimeUnit.SECONDS.toMillis(1L))
.put(RETRY_TYPE,
RetryType.EXPONENTIAL.name())
.put(RETRY_TIMES, 2)
.build();