This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new bb18f18cf Core: Improve refresh executor shutdown in REST catalog close (#4991)
bb18f18cf is described below
commit bb18f18cf83e87f33c0891b9b875aca2c155503d
Author: Bryan Keller <[email protected]>
AuthorDate: Sun Jun 12 16:28:25 2022 -0700
Core: Improve refresh executor shutdown in REST catalog close (#4991)
---
.../java/org/apache/iceberg/rest/RESTSessionCatalog.java | 15 +++++++++++++--
.../java/org/apache/iceberg/rest/auth/OAuth2Util.java | 2 +-
2 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index 14a09c1b3..11803b79b 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -29,6 +29,7 @@ import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
@@ -83,7 +84,7 @@ import org.slf4j.LoggerFactory;
public class RESTSessionCatalog extends BaseSessionCatalog implements Configurable<Configuration>, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(RESTSessionCatalog.class);
- private static final long MAX_REFRESH_WINDOW_MILLIS = 60_000; // 1 minute
+ private static final long MAX_REFRESH_WINDOW_MILLIS = 300_000; // 5 minutes
private static final long MIN_REFRESH_WAIT_MILLIS = 10;
private static final List<String> TOKEN_PREFERENCE_ORDER = ImmutableList.of(
OAuth2Properties.ID_TOKEN_TYPE, OAuth2Properties.ACCESS_TOKEN_TYPE,
OAuth2Properties.JWT_TOKEN_TYPE,
@@ -363,15 +364,25 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
@Override
public void close() throws IOException {
+ shutdownRefreshExecutor();
+
if (client != null) {
client.close();
}
+ }
+ private void shutdownRefreshExecutor() {
if (refreshExecutor != null) {
ScheduledExecutorService service = refreshExecutor;
this.refreshExecutor = null;
- service.shutdown();
+ List<Runnable> tasks = service.shutdownNow();
+ tasks.forEach(task -> {
+ if (task instanceof Future) {
+ ((Future<?>) task).cancel(true);
+ }
+ });
+
try {
if (service.awaitTermination(1, TimeUnit.MINUTES)) {
LOG.warn("Timed out waiting for refresh executor to terminate");
diff --git a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
index 8c8a223ee..8906dcea8 100644
--- a/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
+++ b/core/src/main/java/org/apache/iceberg/rest/auth/OAuth2Util.java
@@ -325,7 +325,7 @@ public class OAuth2Util {
.run(holder ->
holder.set(refreshToken(client, headers(), token, tokenType,
OAuth2Properties.CATALOG_SCOPE)));
- if (!isSuccessful) {
+ if (!isSuccessful || ref.get() == null) {
return null;
}