This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 60a139750 Add config to set close timeout in HiveRegister (#3512)
60a139750 is described below
commit 60a139750ae4a02dda3388d020c9dc30bd60ebef
Author: Jack Moseley <[email protected]>
AuthorDate: Wed May 25 11:20:38 2022 -0700
Add config to set close timeout in HiveRegister (#3512)
---
.../main/java/org/apache/gobblin/hive/HiveRegister.java | 14 +++++++++++---
.../org/apache/gobblin/hive/writer/HiveMetadataWriter.java | 5 ++++-
2 files changed, 15 insertions(+), 4 deletions(-)
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
index 1efde8c5d..ad53c3735 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.reflect.ConstructorUtils;
@@ -68,6 +69,7 @@ public abstract class HiveRegister implements Closeable {
public static final String HIVE_PARTITION_COMPARATOR_TYPE =
"hive.partition.comparator.type";
public static final String DEFAULT_HIVE_PARTITION_COMPARATOR_TYPE =
HivePartitionComparator.class.getName();
public static final String HIVE_METASTORE_URI_KEY = "hive.metastore.uri";
+ public static final String HIVE_REGISTER_CLOSE_TIMEOUT_SECONDS_KEY =
"hiveRegister.close.timeout.seconds";
protected static final String HIVE_DB_EXTENSION = ".db";
@@ -77,6 +79,7 @@ public abstract class HiveRegister implements Closeable {
protected final Optional<String> hiveDbRootDir;
protected final ListeningExecutorService executor;
protected final Map<String, Future<Void>> futures = Maps.newConcurrentMap();
+ protected final long timeOutSeconds;
protected HiveRegister(State state) {
this.props = new HiveRegProps(state);
@@ -84,6 +87,7 @@ public abstract class HiveRegister implements Closeable {
this.executor = ExecutorsUtils.loggingDecorator(ScalingThreadPoolExecutor
.newScalingThreadPool(0, this.props.getNumThreads(),
TimeUnit.SECONDS.toMillis(10),
ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of(getClass().getSimpleName()))));
+ this.timeOutSeconds =
this.props.getPropAsLong(HIVE_REGISTER_CLOSE_TIMEOUT_SECONDS_KEY, -1L);
}
/**
@@ -362,9 +366,13 @@ public abstract class HiveRegister implements Closeable {
throws IOException {
for (Map.Entry<String, Future<Void>> entry : this.futures.entrySet()) {
try {
- entry.getValue().get();
- } catch (InterruptedException | ExecutionException ee) {
- throw new IOException("Failed to finish registration for " +
entry.getKey(), ee.getCause());
+ if (timeOutSeconds > 0L) {
+ entry.getValue().get(timeOutSeconds, TimeUnit.SECONDS);
+ } else {
+ entry.getValue().get();
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException e)
{
+ throw new IOException("Failed to finish registration for " +
entry.getKey(), e.getCause());
}
}
}
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 7911eb275..84f805eb9 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -109,7 +109,6 @@ public class HiveMetadataWriter implements MetadataWriter {
public HiveMetadataWriter(State state) throws IOException {
this.state = state;
- this.hiveRegister = this.closer.register(HiveRegister.get(state));
this.whitelistBlacklist = new
WhitelistBlacklist(state.getProp(HIVE_REGISTRATION_WHITELIST, ""),
state.getProp(HIVE_REGISTRATION_BLACKLIST, ""));
this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties());
@@ -119,6 +118,10 @@ public class HiveMetadataWriter implements MetadataWriter {
this.latestSchemaMap = new HashMap<>();
this.timeOutSeconds =
state.getPropAsLong(HIVE_REGISTRATION_TIMEOUT_IN_SECONDS,
DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS);
+ if (!state.contains(HiveRegister.HIVE_REGISTER_CLOSE_TIMEOUT_SECONDS_KEY))
{
+ state.setProp(HiveRegister.HIVE_REGISTER_CLOSE_TIMEOUT_SECONDS_KEY,
timeOutSeconds);
+ }
+ this.hiveRegister = this.closer.register(HiveRegister.get(state));
List<Tag<?>> tags = Lists.newArrayList();
String clusterIdentifier = ClustersNames.getInstance().getClusterName();
tags.add(new Tag<>(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME,
clusterIdentifier));