This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 60a139750 Add config to set close timeout in HiveRegister (#3512)
60a139750 is described below

commit 60a139750ae4a02dda3388d020c9dc30bd60ebef
Author: Jack Moseley <[email protected]>
AuthorDate: Wed May 25 11:20:38 2022 -0700

    Add config to set close timeout in HiveRegister (#3512)
---
 .../main/java/org/apache/gobblin/hive/HiveRegister.java    | 14 +++++++++++---
 .../org/apache/gobblin/hive/writer/HiveMetadataWriter.java |  5 ++++-
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
index 1efde8c5d..ad53c3735 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.lang3.reflect.ConstructorUtils;
 
@@ -68,6 +69,7 @@ public abstract class HiveRegister implements Closeable {
   public static final String HIVE_PARTITION_COMPARATOR_TYPE = 
"hive.partition.comparator.type";
   public static final String DEFAULT_HIVE_PARTITION_COMPARATOR_TYPE = 
HivePartitionComparator.class.getName();
   public static final String HIVE_METASTORE_URI_KEY = "hive.metastore.uri";
+  public static final String HIVE_REGISTER_CLOSE_TIMEOUT_SECONDS_KEY = 
"hiveRegister.close.timeout.seconds";
 
   protected static final String HIVE_DB_EXTENSION = ".db";
 
@@ -77,6 +79,7 @@ public abstract class HiveRegister implements Closeable {
   protected final Optional<String> hiveDbRootDir;
   protected final ListeningExecutorService executor;
   protected final Map<String, Future<Void>> futures = Maps.newConcurrentMap();
+  protected final long timeOutSeconds;
 
   protected HiveRegister(State state) {
     this.props = new HiveRegProps(state);
@@ -84,6 +87,7 @@ public abstract class HiveRegister implements Closeable {
     this.executor = ExecutorsUtils.loggingDecorator(ScalingThreadPoolExecutor
         .newScalingThreadPool(0, this.props.getNumThreads(), 
TimeUnit.SECONDS.toMillis(10),
             ExecutorsUtils.newThreadFactory(Optional.of(log), 
Optional.of(getClass().getSimpleName()))));
+    this.timeOutSeconds = 
this.props.getPropAsLong(HIVE_REGISTER_CLOSE_TIMEOUT_SECONDS_KEY, -1L);
   }
 
   /**
@@ -362,9 +366,13 @@ public abstract class HiveRegister implements Closeable {
       throws IOException {
     for (Map.Entry<String, Future<Void>> entry : this.futures.entrySet()) {
       try {
-        entry.getValue().get();
-      } catch (InterruptedException | ExecutionException ee) {
-        throw new IOException("Failed to finish registration for " + 
entry.getKey(), ee.getCause());
+        if (timeOutSeconds > 0L) {
+          entry.getValue().get(timeOutSeconds, TimeUnit.SECONDS);
+        } else {
+          entry.getValue().get();
+        }
+      } catch (InterruptedException | ExecutionException | TimeoutException e) 
{
+        throw new IOException("Failed to finish registration for " + 
entry.getKey(), e.getCause());
       }
     }
   }
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 7911eb275..84f805eb9 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -109,7 +109,6 @@ public class HiveMetadataWriter implements MetadataWriter {
 
   public HiveMetadataWriter(State state) throws IOException {
     this.state = state;
-    this.hiveRegister = this.closer.register(HiveRegister.get(state));
     this.whitelistBlacklist = new 
WhitelistBlacklist(state.getProp(HIVE_REGISTRATION_WHITELIST, ""),
         state.getProp(HIVE_REGISTRATION_BLACKLIST, ""));
     this.schemaRegistry = KafkaSchemaRegistry.get(state.getProperties());
@@ -119,6 +118,10 @@ public class HiveMetadataWriter implements MetadataWriter {
     this.latestSchemaMap = new HashMap<>();
     this.timeOutSeconds =
         state.getPropAsLong(HIVE_REGISTRATION_TIMEOUT_IN_SECONDS, 
DEFAULT_HIVE_REGISTRATION_TIMEOUT_IN_SECONDS);
+    if (!state.contains(HiveRegister.HIVE_REGISTER_CLOSE_TIMEOUT_SECONDS_KEY)) 
{
+      state.setProp(HiveRegister.HIVE_REGISTER_CLOSE_TIMEOUT_SECONDS_KEY, 
timeOutSeconds);
+    }
+    this.hiveRegister = this.closer.register(HiveRegister.get(state));
     List<Tag<?>> tags = Lists.newArrayList();
     String clusterIdentifier = ClustersNames.getInstance().getClusterName();
     tags.add(new Tag<>(MetadataWriterKeys.CLUSTER_IDENTIFIER_KEY_NAME, 
clusterIdentifier));

Reply via email to