This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bb18f18cf Core: Improve refresh executor shutdown in REST catalog close (#4991)
bb18f18cf is described below

commit bb18f18cf83e87f33c0891b9b875aca2c155503d
Author: Bryan Keller <[email protected]>
AuthorDate: Sun Jun 12 16:28:25 2022 -0700

    Core: Improve refresh executor shutdown in REST catalog close (#4991)
---
 .../java/org/apache/iceberg/rest/RESTSessionCatalog.java  | 15 +++++++++++++--
 .../java/org/apache/iceberg/rest/auth/OAuth2Util.java     |  2 +-
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index 14a09c1b3..11803b79b 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -29,6 +29,7 @@ import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
@@ -83,7 +84,7 @@ import org.slf4j.LoggerFactory;
 
 public class RESTSessionCatalog extends BaseSessionCatalog implements Configurable<Configuration>, Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(RESTSessionCatalog.class);
-  private static final long MAX_REFRESH_WINDOW_MILLIS = 60_000; // 1 minute
+  private static final long MAX_REFRESH_WINDOW_MILLIS = 300_000; // 5 minutes
   private static final long MIN_REFRESH_WAIT_MILLIS = 10;
   private static final List<String> TOKEN_PREFERENCE_ORDER = ImmutableList.of(
       OAuth2Properties.ID_TOKEN_TYPE, OAuth2Properties.ACCESS_TOKEN_TYPE, OAuth2Properties.JWT_TOKEN_TYPE,
@@ -363,15 +364,25 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
 
   @Override
   public void close() throws IOException {
+    shutdownRefreshExecutor();
+
     if (client != null) {
       client.close();
     }
+  }
 
+  private void shutdownRefreshExecutor() {
     if (refreshExecutor != null) {
       ScheduledExecutorService service = refreshExecutor;
       this.refreshExecutor = null;
 
-      service.shutdown();
+      List<Runnable> tasks = service.shutdownNow();
+      tasks.forEach(task -> {
+        if (task instanceof Future) {
+          ((Future<?>) task).cancel(true);
+        }
+      });
+
       try {
         if (service.awaitTermination(1, TimeUnit.MINUTES)) {
           LOG.warn("Timed out waiting for refresh executor to terminate");
diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
index 8c8a223ee..8906dcea8 100644
--- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
@@ -325,7 +325,7 @@ public class OAuth2Util {
             .run(holder ->
                 holder.set(refreshToken(client, headers(), token, tokenType, OAuth2Properties.CATALOG_SCOPE)));
 
-        if (!isSuccessful) {
+        if (!isSuccessful || ref.get() == null) {
           return null;
         }
 

Reply via email to