This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 00992c5e2e [rest] Refactor RESTTokenFileIO to cache FileIO in static
cache (#4965)
00992c5e2e is described below
commit 00992c5e2ef81d7a83c19d6f29c8eb0602a31e07
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jan 21 11:15:24 2025 +0800
[rest] Refactor RESTTokenFileIO to cache FileIO in static cache (#4965)
---
.../src/main/java/org/apache/paimon/fs/FileIO.java | 10 +-
.../org/apache/paimon/catalog/AbstractCatalog.java | 10 +-
.../java/org/apache/paimon/catalog/Catalog.java | 4 -
.../org/apache/paimon/catalog/CatalogUtils.java | 14 +-
.../org/apache/paimon/catalog/DelegateCatalog.java | 6 -
.../java/org/apache/paimon/rest/RESTCatalog.java | 123 +++++------
.../org/apache/paimon/rest/RESTCatalogLoader.java | 17 +-
.../org/apache/paimon/rest/RESTCatalogOptions.java | 6 +-
.../org/apache/paimon/rest/RESTTokenFileIO.java | 230 +++++++++++++++++++++
.../paimon/rest/RefreshCredentialFileIO.java | 148 -------------
.../java/org/apache/paimon/rest/ResourcePaths.java | 4 +-
...alsResponse.java => GetTableTokenResponse.java} | 32 +--
.../org/apache/paimon/rest/MockRESTMessage.java | 8 +-
.../org/apache/paimon/rest/RESTCatalogServer.java | 18 +-
.../org/apache/paimon/rest/RESTCatalogTest.java | 2 +-
.../apache/paimon/rest/RESTObjectMapperTest.java | 12 +-
paimon-open-api/rest-catalog-open-api.yaml | 18 +-
.../paimon/open/api/RESTCatalogController.java | 14 +-
18 files changed, 377 insertions(+), 299 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index 02f846114e..40678fe478 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.ThreadSafe;
import java.io.BufferedReader;
+import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -62,7 +63,7 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
*/
@Public
@ThreadSafe
-public interface FileIO extends Serializable {
+public interface FileIO extends Serializable, Closeable {
Logger LOG = LoggerFactory.getLogger(FileIO.class);
@@ -230,6 +231,13 @@ public interface FileIO extends Serializable {
*/
boolean rename(Path src, Path dst) throws IOException;
+ /**
+ * No-op by default: many FileIO implementation classes rely on static variables
+ * and therefore do not have the ability to close them.
+ */
+ @Override
+ default void close() {}
+
//
-------------------------------------------------------------------------
// utils
//
-------------------------------------------------------------------------
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 72d09b785f..7a72da38e7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -91,8 +91,7 @@ public abstract class AbstractCatalog implements Catalog {
return fileIO;
}
- @Override
- public FileIO fileIO(Path path) {
+ protected FileIO fileIO(Path path) {
return fileIO;
}
@@ -370,7 +369,12 @@ public abstract class AbstractCatalog implements Catalog {
new RenamingSnapshotCommit.Factory(
lockFactory().orElse(null),
lockContext().orElse(null));
return CatalogUtils.loadTable(
- this, identifier, fileIO(), this::loadTableMetadata,
commitFactory);
+ this,
+ identifier,
+ p -> fileIO(),
+ this::fileIO,
+ this::loadTableMetadata,
+ commitFactory);
}
/**
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index a0d78b2688..d0ad86c224 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -20,7 +20,6 @@ package org.apache.paimon.catalog;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -375,9 +374,6 @@ public interface Catalog extends AutoCloseable {
/** {@link FileIO} of this catalog. It can access {@link #warehouse()}
path. */
FileIO fileIO();
- /** {@link FileIO} of this catalog. */
- FileIO fileIO(Path path);
-
/** Catalog options for re-creating this catalog. */
Map<String, String> options();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 7282b308e7..4d1e58c1f3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -43,6 +43,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
import static org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGCY_NAME;
@@ -171,7 +172,8 @@ public class CatalogUtils {
public static Table loadTable(
Catalog catalog,
Identifier identifier,
- FileIO fileIO,
+ Function<Path, FileIO> dataFileIO,
+ Function<Path, FileIO> objectFileIO,
TableMetadata.Loader metadataLoader,
SnapshotCommit.Factory commitFactory)
throws Catalog.TableNotExistException {
@@ -190,10 +192,11 @@ public class CatalogUtils {
new CatalogEnvironment(
identifier, metadata.uuid(), catalog.catalogLoader(),
commitFactory);
Path path = new Path(schema.options().get(PATH.key()));
- FileStoreTable table = FileStoreTableFactory.create(fileIO, path,
schema, catalogEnv);
+ FileStoreTable table =
+ FileStoreTableFactory.create(dataFileIO.apply(path), path,
schema, catalogEnv);
if (options.type() == TableType.OBJECT_TABLE) {
- table = toObjectTable(catalog, table);
+ table = toObjectTable(objectFileIO, table);
}
if (identifier.isSystemTable()) {
@@ -265,10 +268,11 @@ public class CatalogUtils {
.build();
}
- private static ObjectTable toObjectTable(Catalog catalog, FileStoreTable
underlyingTable) {
+ private static ObjectTable toObjectTable(
+ Function<Path, FileIO> fileIOLoader, FileStoreTable
underlyingTable) {
CoreOptions options = underlyingTable.coreOptions();
String objectLocation = options.objectLocation();
- FileIO objectFileIO = catalog.fileIO(new Path(objectLocation));
+ FileIO objectFileIO = fileIOLoader.apply(new Path(objectLocation));
return ObjectTable.builder()
.underlyingTable(underlyingTable)
.objectLocation(objectLocation)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 847485a7a1..aa7852456e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -19,7 +19,6 @@
package org.apache.paimon.catalog;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -62,11 +61,6 @@ public abstract class DelegateCatalog implements Catalog {
return wrapped.fileIO();
}
- @Override
- public FileIO fileIO(Path path) {
- return wrapped.fileIO(path);
- }
-
@Override
public List<String> listDatabases() {
return wrapped.listDatabases();
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index e06ef012b8..9c8100662b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -29,7 +29,6 @@ import org.apache.paimon.catalog.TableMetadata;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.rest.auth.AuthSession;
@@ -56,6 +55,7 @@ import
org.apache.paimon.rest.responses.CreateDatabaseResponse;
import org.apache.paimon.rest.responses.ErrorResponseResourceType;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.rest.responses.GetTableTokenResponse;
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListPartitionsResponse;
@@ -75,10 +75,8 @@ import org.apache.paimon.view.ViewSchema;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -95,6 +93,7 @@ import static
org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static
org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
+import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.apache.paimon.rest.RESTUtil.extractPrefixMap;
import static org.apache.paimon.rest.auth.AuthSession.createAuthSession;
import static
org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
@@ -102,97 +101,73 @@ import static
org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
/** A catalog implementation for REST. */
public class RESTCatalog implements Catalog {
- private static final Logger LOG =
LoggerFactory.getLogger(RESTCatalog.class);
public static final String HEADER_PREFIX = "header.";
private final RESTClient client;
private final ResourcePaths resourcePaths;
private final AuthSession catalogAuth;
- private final Options options;
- private final boolean fileIORefreshCredentialEnable;
+ private final CatalogContext context;
+ private final boolean dataTokenEnabled;
private final FileIO fileIO;
private volatile ScheduledExecutorService refreshExecutor = null;
public RESTCatalog(CatalogContext context) {
- if
(context.options().getOptional(CatalogOptions.WAREHOUSE).isPresent()) {
- throw new IllegalArgumentException("Can not config warehouse in
RESTCatalog.");
- }
+ this(context, true);
+ }
+
+ public RESTCatalog(CatalogContext context, boolean configRequired) {
this.client = new HttpClient(context.options());
this.catalogAuth = createAuthSession(context.options(),
tokenRefreshExecutor());
- Map<String, String> initHeaders =
- RESTUtil.merge(
- extractPrefixMap(context.options(), HEADER_PREFIX),
- catalogAuth.getHeaders());
- this.options =
- new Options(
- client.get(ResourcePaths.V1_CONFIG,
ConfigResponse.class, initHeaders)
- .merge(context.options().toMap()));
- this.resourcePaths = ResourcePaths.forCatalogProperties(options);
-
- this.fileIORefreshCredentialEnable =
-
options.get(RESTCatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE);
- try {
- if (fileIORefreshCredentialEnable) {
- this.fileIO = null;
- } else {
- String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
- this.fileIO =
- FileIO.get(
- new Path(warehouseStr),
- CatalogContext.create(
- options, context.preferIO(),
context.fallbackIO()));
+ Options options = context.options();
+ if (configRequired) {
+ if (context.options().contains(WAREHOUSE)) {
+ throw new IllegalArgumentException("Can not config warehouse
in RESTCatalog.");
}
- } catch (IOException e) {
- LOG.warn("Can not get FileIO from options.");
- throw new RuntimeException(e);
+
+ Map<String, String> initHeaders =
+ RESTUtil.merge(
+ extractPrefixMap(context.options(), HEADER_PREFIX),
+ catalogAuth.getHeaders());
+ options =
+ new Options(
+ client.get(ResourcePaths.V1_CONFIG,
ConfigResponse.class, initHeaders)
+ .merge(context.options().toMap()));
}
- }
- protected RESTCatalog(Options options, FileIO fileIO) {
- this.client = new HttpClient(options);
- this.catalogAuth = createAuthSession(options, tokenRefreshExecutor());
- this.options = options;
+ context = CatalogContext.create(options, context.preferIO(),
context.fallbackIO());
+ this.context = context;
this.resourcePaths = ResourcePaths.forCatalogProperties(options);
- this.fileIO = fileIO;
- this.fileIORefreshCredentialEnable =
-
options.get(RESTCatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE);
+
+ this.dataTokenEnabled =
options.get(RESTCatalogOptions.DATA_TOKEN_ENABLED);
+ this.fileIO = dataTokenEnabled ? null : fileIOFromOptions(new
Path(options.get(WAREHOUSE)));
}
@Override
public String warehouse() {
- return options.get(CatalogOptions.WAREHOUSE);
+ return context.options().get(WAREHOUSE);
}
@Override
public Map<String, String> options() {
- return options.toMap();
+ return context.options().toMap();
}
@Override
public RESTCatalogLoader catalogLoader() {
- return new RESTCatalogLoader(options, fileIO);
+ return new RESTCatalogLoader(context);
}
@Override
public FileIO fileIO() {
- if (fileIORefreshCredentialEnable) {
+ // TODO remove Catalog.fileIO
+ if (dataTokenEnabled) {
throw new UnsupportedOperationException();
}
return fileIO;
}
- @Override
- public FileIO fileIO(Path path) {
- try {
- return FileIO.get(path, CatalogContext.create(options));
- } catch (IOException e) {
- LOG.warn("Can not get FileIO from options.");
- throw new RuntimeException(e);
- }
- }
-
@Override
public List<String> listDatabases() {
ListDatabasesResponse response =
@@ -306,11 +281,33 @@ public class RESTCatalog implements Catalog {
return CatalogUtils.loadTable(
this,
identifier,
- this.fileIO(identifier),
+ path -> fileIOForData(path, identifier),
+ this::fileIOFromOptions,
this::loadTableMetadata,
new RESTSnapshotCommitFactory(catalogLoader()));
}
+ private FileIO fileIOForData(Path path, Identifier identifier) {
+ return dataTokenEnabled
+ ? new RESTTokenFileIO(catalogLoader(), this, identifier, path)
+ : this.fileIO;
+ }
+
+ private FileIO fileIOFromOptions(Path path) {
+ try {
+ return FileIO.get(path, context);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ protected GetTableTokenResponse loadTableToken(Identifier identifier) {
+ return client.get(
+ resourcePaths.tableToken(identifier.getDatabaseName(),
identifier.getObjectName()),
+ GetTableTokenResponse.class,
+ catalogAuth.getHeaders());
+ }
+
public boolean commitSnapshot(Identifier identifier, Snapshot snapshot) {
CommitTableRequest request = new CommitTableRequest(identifier,
snapshot);
CommitTableResponse response =
@@ -630,7 +627,7 @@ public class RESTCatalog implements Catalog {
@Override
public boolean caseSensitive() {
- return options.getOptional(CASE_SENSITIVE).orElse(true);
+ return context.options().getOptional(CASE_SENSITIVE).orElse(true);
}
@Override
@@ -663,12 +660,4 @@ public class RESTCatalog implements Catalog {
return refreshExecutor;
}
-
- private FileIO fileIO(Identifier identifier) {
- if (fileIORefreshCredentialEnable) {
- return new RefreshCredentialFileIO(
- resourcePaths, catalogAuth, options, client, identifier);
- }
- return fileIO;
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java
index f90988d05c..efc5a0b46c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogLoader.java
@@ -18,25 +18,26 @@
package org.apache.paimon.rest;
+import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogLoader;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.options.Options;
/** Loader to create {@link RESTCatalog}. */
public class RESTCatalogLoader implements CatalogLoader {
private static final long serialVersionUID = 1L;
- private final Options options;
- private final FileIO fileIO;
+ private final CatalogContext context;
- public RESTCatalogLoader(Options options, FileIO fileIO) {
- this.options = options;
- this.fileIO = fileIO;
+ public RESTCatalogLoader(CatalogContext context) {
+ this.context = context;
+ }
+
+ public CatalogContext context() {
+ return context;
}
@Override
public RESTCatalog load() {
- return new RESTCatalog(options, fileIO);
+ return new RESTCatalog(context, false);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
index 61aed5f703..35b72469bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogOptions.java
@@ -78,9 +78,9 @@ public class RESTCatalogOptions {
.noDefaultValue()
.withDescription("REST Catalog auth token provider path.");
- public static final ConfigOption<Boolean>
FILE_IO_REFRESH_CREDENTIAL_ENABLE =
- ConfigOptions.key("file-io-refresh-credential.enabled")
+ public static final ConfigOption<Boolean> DATA_TOKEN_ENABLED =
+ ConfigOptions.key("data-token.enabled")
.booleanType()
.defaultValue(false)
- .withDescription("Whether to support file io refresh
credential.");
+ .withDescription("Whether to support data token provided
by the REST server.");
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
new file mode 100644
index 0000000000..220d0eaaee
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.responses.GetTableTokenResponse;
+import org.apache.paimon.utils.ThreadUtils;
+
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/** A {@link FileIO} that supports getting its token from the REST server. */
+public class RESTTokenFileIO implements FileIO {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Cache<Token, FileIO> FILE_IO_CACHE =
+ Caffeine.newBuilder()
+ .expireAfterAccess(30, TimeUnit.MINUTES)
+ .maximumSize(100)
+ .removalListener(
+ (ignored, value, cause) -> {
+ if (value != null) {
+ ((FileIO) value).close();
+ }
+ })
+ .scheduler(
+ Scheduler.forScheduledExecutorService(
+ Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.newDaemonThreadFactory(
+
"rest-token-file-io-scheduler"))))
+ .build();
+
+ private final RESTCatalogLoader catalogLoader;
+ private final Identifier identifier;
+ private final Path path;
+
+ // Catalog instance available before serialization; the field is transient, so it is null
+ // after deserialization and the catalog must then be created from the catalog loader.
+ private final transient RESTCatalog catalogInstance;
+
+ // The latest token from the REST server; kept serializable (not transient) so that the token
+ // does not have to be loaded from the REST server again after deserialization.
+ private volatile Token token;
+
+ public RESTTokenFileIO(
+ RESTCatalogLoader catalogLoader,
+ RESTCatalog catalogInstance,
+ Identifier identifier,
+ Path path) {
+ this.catalogLoader = catalogLoader;
+ this.catalogInstance = catalogInstance;
+ this.identifier = identifier;
+ this.path = path;
+ }
+
+ @Override
+ public void configure(CatalogContext context) {
+ throw new UnsupportedOperationException("RESTTokenFileIO does not
support configuration.");
+ }
+
+ @Override
+ public SeekableInputStream newInputStream(Path path) throws IOException {
+ return fileIO().newInputStream(path);
+ }
+
+ @Override
+ public PositionOutputStream newOutputStream(Path path, boolean overwrite)
throws IOException {
+ return fileIO().newOutputStream(path, overwrite);
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path path) throws IOException {
+ return fileIO().getFileStatus(path);
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path path) throws IOException {
+ return fileIO().listStatus(path);
+ }
+
+ @Override
+ public boolean exists(Path path) throws IOException {
+ return fileIO().exists(path);
+ }
+
+ @Override
+ public boolean delete(Path path, boolean recursive) throws IOException {
+ return fileIO().delete(path, recursive);
+ }
+
+ @Override
+ public boolean mkdirs(Path path) throws IOException {
+ return fileIO().mkdirs(path);
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ return fileIO().rename(src, dst);
+ }
+
+ @Override
+ public boolean isObjectStore() {
+ try {
+ return fileIO().isObjectStore();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private FileIO fileIO() throws IOException {
+ if (shouldRefresh()) {
+ synchronized (this) {
+ if (shouldRefresh()) {
+ refreshToken();
+ }
+ }
+ }
+
+ FileIO fileIO = FILE_IO_CACHE.getIfPresent(token);
+ if (fileIO != null) {
+ return fileIO;
+ }
+
+ synchronized (FILE_IO_CACHE) {
+ fileIO = FILE_IO_CACHE.getIfPresent(token);
+ if (fileIO != null) {
+ return fileIO;
+ }
+
+ CatalogContext context = catalogLoader.context();
+ Options options = context.options();
+ options = new Options(RESTUtil.merge(options.toMap(),
token.token));
+ context = CatalogContext.create(options, context.preferIO(),
context.fallbackIO());
+ try {
+ fileIO = FileIO.get(path, context);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ FILE_IO_CACHE.put(token, fileIO);
+ return fileIO;
+ }
+ }
+
+ private boolean shouldRefresh() {
+ return token == null || System.currentTimeMillis() >
token.expireAtMillis;
+ }
+
+ private void refreshToken() {
+ GetTableTokenResponse response;
+ if (catalogInstance != null) {
+ response = catalogInstance.loadTableToken(identifier);
+ } else {
+ try (RESTCatalog catalog = catalogLoader.load()) {
+ response = catalog.loadTableToken(identifier);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ token = new Token(response.getToken(), response.getExpiresAtMillis());
+ }
+
+ private static class Token implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Map<String, String> token;
+ private final long expireAtMillis;
+
+ /** Cache the hash code. */
+ @Nullable private Integer hash;
+
+ private Token(Map<String, String> token, long expireAtMillis) {
+ this.token = token;
+ this.expireAtMillis = expireAtMillis;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Token token1 = (Token) o;
+ return expireAtMillis == token1.expireAtMillis &&
Objects.equals(token, token1.token);
+ }
+
+ @Override
+ public int hashCode() {
+ if (hash == null) {
+ hash = Objects.hash(token, expireAtMillis);
+ }
+ return hash;
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java
deleted file mode 100644
index b55486887d..0000000000
---
a/paimon-core/src/main/java/org/apache/paimon/rest/RefreshCredentialFileIO.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.rest;
-
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.PositionOutputStream;
-import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.rest.auth.AuthSession;
-import org.apache.paimon.rest.responses.GetTableCredentialsResponse;
-
-import java.io.IOException;
-import java.util.Map;
-
-/** A {@link FileIO} to support refresh credential. */
-public class RefreshCredentialFileIO implements FileIO {
-
- private static final long serialVersionUID = 1L;
-
- private final ResourcePaths resourcePaths;
- private final AuthSession catalogAuth;
- protected Options options;
- private final Identifier identifier;
- private Long expireAtMillis;
- private Map<String, String> credential;
- private final transient RESTClient client;
- private transient volatile FileIO lazyFileIO;
-
- public RefreshCredentialFileIO(
- ResourcePaths resourcePaths,
- AuthSession catalogAuth,
- Options options,
- RESTClient client,
- Identifier identifier) {
- this.resourcePaths = resourcePaths;
- this.catalogAuth = catalogAuth;
- this.options = options;
- this.identifier = identifier;
- this.client = client;
- }
-
- @Override
- public void configure(CatalogContext context) {
- this.options = context.options();
- }
-
- @Override
- public SeekableInputStream newInputStream(Path path) throws IOException {
- return fileIO().newInputStream(path);
- }
-
- @Override
- public PositionOutputStream newOutputStream(Path path, boolean overwrite)
throws IOException {
- return fileIO().newOutputStream(path, overwrite);
- }
-
- @Override
- public FileStatus getFileStatus(Path path) throws IOException {
- return fileIO().getFileStatus(path);
- }
-
- @Override
- public FileStatus[] listStatus(Path path) throws IOException {
- return fileIO().listStatus(path);
- }
-
- @Override
- public boolean exists(Path path) throws IOException {
- return fileIO().exists(path);
- }
-
- @Override
- public boolean delete(Path path, boolean recursive) throws IOException {
- return fileIO().delete(path, recursive);
- }
-
- @Override
- public boolean mkdirs(Path path) throws IOException {
- return fileIO().mkdirs(path);
- }
-
- @Override
- public boolean rename(Path src, Path dst) throws IOException {
- return fileIO().rename(src, dst);
- }
-
- @Override
- public boolean isObjectStore() {
- try {
- return fileIO().isObjectStore();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private FileIO fileIO() throws IOException {
- if (lazyFileIO == null || shouldRefresh()) {
- synchronized (this) {
- if (lazyFileIO == null || shouldRefresh()) {
- GetTableCredentialsResponse response = getCredential();
- expireAtMillis = response.getExpiresAtMillis();
- credential = response.getCredential();
- Map<String, String> conf = RESTUtil.merge(options.toMap(),
credential);
- Options updateCredentialOption = new Options(conf);
- lazyFileIO =
- FileIO.get(
- new
Path(updateCredentialOption.get(CatalogOptions.WAREHOUSE)),
-
CatalogContext.create(updateCredentialOption));
- }
- }
- }
- return lazyFileIO;
- }
-
- // todo: handle exception
- private GetTableCredentialsResponse getCredential() {
- return client.get(
- resourcePaths.tableCredentials(
- identifier.getDatabaseName(),
identifier.getObjectName()),
- GetTableCredentialsResponse.class,
- catalogAuth.getHeaders());
- }
-
- private boolean shouldRefresh() {
- return expireAtMillis != null && expireAtMillis >
System.currentTimeMillis();
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index 1e843f99cb..69560f36e3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -70,8 +70,8 @@ public class ResourcePaths {
return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES,
"commit");
}
- public String tableCredentials(String databaseName, String tableName) {
- return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES,
tableName, "credentials");
+ public String tableToken(String databaseName, String tableName) {
+ return SLASH.join(V1, prefix, DATABASES, databaseName, TABLES,
tableName, "token");
}
public String partitions(String databaseName, String tableName) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableCredentialsResponse.java
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableTokenResponse.java
similarity index 65%
rename from
paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableCredentialsResponse.java
rename to
paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableTokenResponse.java
index 2792940ff6..5a279a3868 100644
---
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableCredentialsResponse.java
+++
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableTokenResponse.java
@@ -27,33 +27,33 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonPro
import java.util.Map;
-/** Response for table credentials. */
+/** Response for table token. */
@JsonIgnoreProperties(ignoreUnknown = true)
-public class GetTableCredentialsResponse implements RESTResponse {
+public class GetTableTokenResponse implements RESTResponse {
- private static final String FIELD_CREDENTIAL = "credential";
- private static final String FIELD_EXPIREAT_MILLIS = "expiresAtMillis";
+ private static final String FIELD_TOKEN = "token";
+ private static final String FIELD_EXPIRES_AT_MILLIS = "expiresAtMillis";
- @JsonProperty(FIELD_CREDENTIAL)
- private final Map<String, String> credential;
+ @JsonProperty(FIELD_TOKEN)
+ private final Map<String, String> token;
- @JsonProperty(FIELD_EXPIREAT_MILLIS)
- private long expiresAtMillis;
+ @JsonProperty(FIELD_EXPIRES_AT_MILLIS)
+ private final long expiresAtMillis;
@JsonCreator
- public GetTableCredentialsResponse(
- @JsonProperty(FIELD_EXPIREAT_MILLIS) long expiresAtMillis,
- @JsonProperty(FIELD_CREDENTIAL) Map<String, String> credential) {
+ public GetTableTokenResponse(
+ @JsonProperty(FIELD_TOKEN) Map<String, String> token,
+ @JsonProperty(FIELD_EXPIRES_AT_MILLIS) long expiresAtMillis) {
+ this.token = token;
this.expiresAtMillis = expiresAtMillis;
- this.credential = credential;
}
- @JsonGetter(FIELD_CREDENTIAL)
- public Map<String, String> getCredential() {
- return credential;
+ @JsonGetter(FIELD_TOKEN)
+ public Map<String, String> getToken() {
+ return token;
}
- @JsonGetter(FIELD_EXPIREAT_MILLIS)
+ @JsonGetter(FIELD_EXPIRES_AT_MILLIS)
public long getExpiresAtMillis() {
return expiresAtMillis;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
index 576f494b88..83fab9a687 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
@@ -32,8 +32,8 @@ import org.apache.paimon.rest.requests.RenameTableRequest;
import org.apache.paimon.rest.responses.AlterDatabaseResponse;
import org.apache.paimon.rest.responses.CreateDatabaseResponse;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
-import org.apache.paimon.rest.responses.GetTableCredentialsResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.rest.responses.GetTableTokenResponse;
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListPartitionsResponse;
@@ -250,9 +250,9 @@ public class MockRESTMessage {
return new ListViewsResponse(ImmutableList.of("view"));
}
- public static GetTableCredentialsResponse getTableCredentialsResponse() {
- return new GetTableCredentialsResponse(
- System.currentTimeMillis(), ImmutableMap.of("key", "value"));
+ public static GetTableTokenResponse getTableCredentialsResponse() {
+ return new GetTableTokenResponse(
+ ImmutableMap.of("key", "value"), System.currentTimeMillis());
}
private static ViewSchema viewSchema() {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 22fde48e99..05f3adb3c4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -42,8 +42,8 @@ import
org.apache.paimon.rest.responses.CreateDatabaseResponse;
import org.apache.paimon.rest.responses.ErrorResponse;
import org.apache.paimon.rest.responses.ErrorResponseResourceType;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
-import org.apache.paimon.rest.responses.GetTableCredentialsResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.rest.responses.GetTableTokenResponse;
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListPartitionsResponse;
@@ -151,10 +151,10 @@ public class RESTCatalogServer {
resources.length == 3
&& "tables".equals(resources[1])
&& "commit".equals(resources[2]);
- boolean isTableCredentials =
+ boolean isTableToken =
resources.length == 4
&& "tables".equals(resources[1])
- && "credentials".equals(resources[3]);
+ && "token".equals(resources[3]);
boolean isPartitions =
resources.length == 4
&& "tables".equals(resources[1])
@@ -208,16 +208,16 @@ public class RESTCatalogServer {
} else if (isPartitions) {
String tableName = resources[2];
return partitionsApiHandler(catalog, request,
databaseName, tableName);
- } else if (isTableCredentials) {
- GetTableCredentialsResponse
getTableCredentialsResponse =
- new GetTableCredentialsResponse(
- System.currentTimeMillis(),
- ImmutableMap.of("key", "value"));
+ } else if (isTableToken) {
+ GetTableTokenResponse getTableTokenResponse =
+ new GetTableTokenResponse(
+ ImmutableMap.of("key", "value"),
+ System.currentTimeMillis());
return new MockResponse()
.setResponseCode(200)
.setBody(
OBJECT_MAPPER.writeValueAsString(
-
getTableCredentialsResponse));
+ getTableTokenResponse));
} else if (isTableRename) {
return renameTableApiHandler(catalog, request);
} else if (isTableCommit) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 752f492078..1fb4dcbcd7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -114,7 +114,7 @@ class RESTCatalogTest extends CatalogTestBase {
options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
options.set(RESTCatalogOptions.TOKEN, initToken);
options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
- options.set(RESTCatalogOptions.FILE_IO_REFRESH_CREDENTIAL_ENABLE,
true);
+ options.set(RESTCatalogOptions.DATA_TOKEN_ENABLED, true);
this.catalog = new RESTCatalog(CatalogContext.create(options));
List<Identifier> identifiers =
Lists.newArrayList(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
index 4c3b622a8c..4d9015ea77 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
@@ -32,8 +32,8 @@ import org.apache.paimon.rest.responses.ConfigResponse;
import org.apache.paimon.rest.responses.CreateDatabaseResponse;
import org.apache.paimon.rest.responses.ErrorResponse;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
-import org.apache.paimon.rest.responses.GetTableCredentialsResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.rest.responses.GetTableTokenResponse;
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListPartitionsResponse;
@@ -276,12 +276,12 @@ public class RESTObjectMapperTest {
}
@Test
- public void getTableCredentialsResponseParseTest() throws Exception {
- GetTableCredentialsResponse response =
MockRESTMessage.getTableCredentialsResponse();
+ public void getTableTokenResponseParseTest() throws Exception {
+ GetTableTokenResponse response =
MockRESTMessage.getTableCredentialsResponse();
String responseStr = OBJECT_MAPPER.writeValueAsString(response);
- GetTableCredentialsResponse parseData =
- OBJECT_MAPPER.readValue(responseStr,
GetTableCredentialsResponse.class);
- assertEquals(response.getCredential(), parseData.getCredential());
+ GetTableTokenResponse parseData =
+ OBJECT_MAPPER.readValue(responseStr,
GetTableTokenResponse.class);
+ assertEquals(response.getToken(), parseData.getToken());
assertEquals(response.getExpiresAtMillis(),
parseData.getExpiresAtMillis());
}
}
diff --git a/paimon-open-api/rest-catalog-open-api.yaml
b/paimon-open-api/rest-catalog-open-api.yaml
index 128514d7a5..c5c4434402 100644
--- a/paimon-open-api/rest-catalog-open-api.yaml
+++ b/paimon-open-api/rest-catalog-open-api.yaml
@@ -432,12 +432,12 @@ paths:
$ref: '#/components/schemas/ErrorResponse'
"500":
description: Internal Server Error
- /v1/{prefix}/databases/{database}/tables/{table}/credentials:
+ /v1/{prefix}/databases/{database}/tables/{table}/token:
get:
tags:
- table
- summary: List credentials
- operationId: listCredentials
+ summary: Get table token
+ operationId: getTableToken
parameters:
- name: prefix
in: path
@@ -460,7 +460,7 @@ paths:
content:
application/json:
schema:
- $ref: '#/components/schemas/GetTableCredentialsResponse'
+ $ref: '#/components/schemas/GetTableDataTokenResponse'
"404":
description: Resource not found
content:
@@ -1203,16 +1203,16 @@ components:
properties:
success:
type: boolean
- GetTableCredentialsResponse:
+ GetTableDataTokenResponse:
type: object
properties:
- expiresAt:
- type: integer
- format: int64
- credentials:
+ token:
type: object
additionalProperties:
type: string
+ expiresAt:
+ type: integer
+ format: int64
AlterDatabaseRequest:
type: object
properties:
diff --git
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
index c8eae97dec..3ffec9a5c3 100644
---
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
+++
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
@@ -37,8 +37,8 @@ import org.apache.paimon.rest.responses.ConfigResponse;
import org.apache.paimon.rest.responses.CreateDatabaseResponse;
import org.apache.paimon.rest.responses.ErrorResponse;
import org.apache.paimon.rest.responses.GetDatabaseResponse;
-import org.apache.paimon.rest.responses.GetTableCredentialsResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.rest.responses.GetTableTokenResponse;
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListPartitionsResponse;
@@ -358,13 +358,13 @@ public class RESTCatalogController {
}
@Operation(
- summary = "List credentials",
+ summary = "Get table token",
tags = {"table"})
@ApiResponses({
@ApiResponse(
responseCode = "200",
content = {
- @Content(schema = @Schema(implementation =
GetTableCredentialsResponse.class))
+ @Content(schema = @Schema(implementation =
GetTableTokenResponse.class))
}),
@ApiResponse(
responseCode = "404",
@@ -374,13 +374,13 @@ public class RESTCatalogController {
responseCode = "500",
content = {@Content(schema = @Schema())})
})
- @GetMapping("/v1/{prefix}/databases/{database}/tables/{table}/credentials")
- public GetTableCredentialsResponse listCredentials(
+ @GetMapping("/v1/{prefix}/databases/{database}/tables/{table}/token")
+ public GetTableTokenResponse getTableToken(
@PathVariable String prefix,
@PathVariable String database,
@PathVariable String table) {
- return new GetTableCredentialsResponse(
- System.currentTimeMillis(), ImmutableMap.of("key", "value"));
+ return new GetTableTokenResponse(
+ ImmutableMap.of("key", "value"), System.currentTimeMillis());
}
@Operation(