This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 9605b4531f [rest] Refactory RESTTokenFileIO to let it reused in vfs (#5939) 9605b4531f is described below commit 9605b4531fff1537bff00259430c61b75c94df3d Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Wed Jul 23 13:01:16 2025 +0800 [rest] Refactory RESTTokenFileIO to let it reused in vfs (#5939) --- .../java/org/apache/paimon/rest/RESTToken.java | 0 .../org/apache/paimon/rest/RESTTokenFileIO.java | 43 +++------ .../java/org/apache/paimon/rest/RESTCatalog.java | 2 +- .../java/org/apache/paimon/vfs/VFSDataToken.java | 64 ------------- .../java/org/apache/paimon/vfs/VFSOperations.java | 103 ++------------------- .../paimon/vfs/hadoop/PaimonVirtualFileSystem.java | 5 +- 6 files changed, 24 insertions(+), 193 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTToken.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTToken.java similarity index 100% rename from paimon-core/src/main/java/org/apache/paimon/rest/RESTToken.java rename to paimon-api/src/main/java/org/apache/paimon/rest/RESTToken.java diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java similarity index 84% rename from paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java rename to paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java index cbfb28d6e3..c34e62123d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java @@ -18,7 +18,6 @@ package org.apache.paimon.rest; -import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.FileIO; @@ -51,7 +50,7 @@ import static org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS; /** A {@link FileIO} to support getting token from REST Server. */ public class RESTTokenFileIO implements FileIO { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; public static final ConfigOption<Boolean> DATA_TOKEN_ENABLED = ConfigOptions.key("data-token.enabled") @@ -74,25 +73,22 @@ public class RESTTokenFileIO implements FileIO { private static final Logger LOG = LoggerFactory.getLogger(RESTTokenFileIO.class); - private final RESTCatalogLoader catalogLoader; + private final CatalogContext catalogContext; private final Identifier identifier; private final Path path; - // catalog instance before serialization, it will become null after serialization, then we - // should create catalog from catalog loader - private final transient RESTCatalog catalogInstance; + // Api instance before serialization, it will become null after serialization, then we should + // create RESTApi from catalogContext + private transient volatile RESTApi apiInstance; // the latest token from REST Server, serializable in order to avoid loading token from the REST // Server again after serialization private volatile RESTToken token; public RESTTokenFileIO( - RESTCatalogLoader catalogLoader, - RESTCatalog catalogInstance, - Identifier identifier, - Path path) { - this.catalogLoader = catalogLoader; - this.catalogInstance = catalogInstance; + CatalogContext catalogContext, RESTApi apiInstance, Identifier identifier, Path path) { + this.catalogContext = catalogContext; + this.apiInstance = apiInstance; this.identifier = identifier; this.path = path; } @@ -165,12 +161,13 @@ public class RESTTokenFileIO implements FileIO { return fileIO; } - CatalogContext context = catalogLoader.context(); - Options options = context.options(); + Options options = catalogContext.options(); // the original options are not overwritten options = new Options(RESTUtil.merge(token.token(), options.toMap())); options.set(FILE_IO_ALLOW_CACHE, false); - context = CatalogContext.create(options, context.preferIO(), context.fallbackIO()); + CatalogContext context = + CatalogContext.create( + options, catalogContext.preferIO(), catalogContext.fallbackIO()); try { fileIO = FileIO.get(path, context); } catch (IOException e) { @@ -199,20 +196,10 @@ public class RESTTokenFileIO implements FileIO { private void refreshToken() { LOG.info("begin refresh data token for identifier [{}]", identifier); - GetTableTokenResponse response; - if (catalogInstance != null) { - try { - response = catalogInstance.loadTableToken(identifier); - } catch (Catalog.TableNotExistException e) { - throw new RuntimeException(e); - } - } else { - try (RESTCatalog catalog = catalogLoader.load()) { - response = catalog.loadTableToken(identifier); - } catch (Exception e) { - throw new RuntimeException(e); - } + if (apiInstance == null) { + apiInstance = new RESTApi(catalogContext.options(), false); } + GetTableTokenResponse response = apiInstance.loadTableToken(identifier); LOG.info( "end refresh data token for identifier [{}] expiresAtMillis [{}]", identifier, diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 6f574a1ea5..fcf2d0525f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -971,7 +971,7 @@ public class RESTCatalog implements Catalog { private FileIO fileIOForData(Path path, Identifier identifier) { return dataTokenEnabled - ? new RESTTokenFileIO(catalogLoader(), this, identifier, path) + ? new RESTTokenFileIO(context, api, identifier, path) : fileIOFromOptions(path); } diff --git a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java deleted file mode 100644 index 730ee591af..0000000000 --- a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.vfs; - -import javax.annotation.Nullable; - -import java.io.Serializable; -import java.util.Map; -import java.util.Objects; - -/** Data token. */ -public class VFSDataToken implements Serializable { - private static final long serialVersionUID = 1L; - private final Map<String, String> token; - private final long expireAtMillis; - @Nullable private Integer hash; - - public VFSDataToken(Map<String, String> token, long expireAtMillis) { - this.token = token; - this.expireAtMillis = expireAtMillis; - } - - public Map<String, String> token() { - return this.token; - } - - public long expireAtMillis() { - return this.expireAtMillis; - } - - public boolean equals(Object o) { - if (o != null && this.getClass() == o.getClass()) { - VFSDataToken token1 = (VFSDataToken) o; - return this.expireAtMillis == token1.expireAtMillis - && Objects.equals(this.token, token1.token); - } else { - return false; - } - } - - public int hashCode() { - if (this.hash == null) { - this.hash = Objects.hash(new Object[] {this.token, this.expireAtMillis}); - } - - return this.hash; - } -} diff --git a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java index ad9cd09122..6ca09dc739 100644 --- a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java +++ b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java @@ -24,7 +24,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.rest.RESTApi; -import org.apache.paimon.rest.RESTUtil; +import org.apache.paimon.rest.RESTTokenFileIO; import org.apache.paimon.rest.exceptions.AlreadyExistsException; import org.apache.paimon.rest.exceptions.BadRequestException; import org.apache.paimon.rest.exceptions.ForbiddenException; @@ -32,14 +32,7 @@ import org.apache.paimon.rest.exceptions.NoSuchResourceException; import org.apache.paimon.rest.exceptions.NotImplementedException; import org.apache.paimon.rest.responses.GetDatabaseResponse; import org.apache.paimon.rest.responses.GetTableResponse; -import org.apache.paimon.rest.responses.GetTableTokenResponse; import org.apache.paimon.schema.Schema; -import org.apache.paimon.utils.IOUtils; -import org.apache.paimon.utils.ThreadUtils; - -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; -import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,41 +42,22 @@ import java.io.IOException; import java.nio.file.FileAlreadyExistsException; import java.util.Collections; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.TableType.OBJECT_TABLE; -import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE; -import static org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS; /** Wrap over RESTCatalog to provide basic operations for virtual path. */ public class VFSOperations { + private static final Logger LOG = LoggerFactory.getLogger(VFSOperations.class); private final RESTApi api; private final CatalogContext context; - // table id -> fileIO - private static final Cache<VFSDataToken, FileIO> FILE_IO_CACHE = - Caffeine.newBuilder() - .expireAfterAccess(30, TimeUnit.MINUTES) - .maximumSize(1000) - .removalListener( - (ignored, value, cause) -> IOUtils.closeQuietly((FileIO) value)) - .scheduler( - Scheduler.forScheduledExecutorService( - Executors.newSingleThreadScheduledExecutor( - ThreadUtils.newDaemonThreadFactory( - "rest-token-file-io-scheduler")))) - .build(); - - private static final Cache<String, VFSDataToken> TOKEN_CACHE = - Caffeine.newBuilder().expireAfterAccess(30, TimeUnit.MINUTES).maximumSize(1000).build(); - - public VFSOperations(CatalogContext context) { - this.context = context; - this.api = new RESTApi(context.options()); + public VFSOperations(Options options) { + this.api = new RESTApi(options); + // Get the configured options which has been merged from REST Server + this.context = CatalogContext.create(api.options()); } public VFSIdentifier getVFSIdentifier(String virtualPath) throws IOException { @@ -116,9 +90,7 @@ public class VFSOperations { } // Get real path StringBuilder realPath = new StringBuilder(table.getPath()); - boolean isTableRoot = true; if (parts.length > 2) { - isTableRoot = false; if (!table.getPath().endsWith("/")) { realPath.append("/"); } @@ -129,9 +101,8 @@ public class VFSOperations { } } } - // Get REST token - FileIO fileIO = getFileIO(new Identifier(databaseName, tableName), table); + FileIO fileIO = new RESTTokenFileIO(context, api, identifier, new Path(table.getPath())); if (parts.length == 2) { return new VFSTableRootIdentifier( table, realPath.toString(), fileIO, databaseName, tableName); @@ -254,66 +225,6 @@ public class VFSOperations { } } - private FileIO getFileIO(Identifier identifier, GetTableResponse table) throws IOException { - VFSDataToken token = TOKEN_CACHE.getIfPresent(table.getId()); - if (shouldRefresh(token)) { - synchronized (TOKEN_CACHE) { - token = TOKEN_CACHE.getIfPresent(table.getId()); - if (shouldRefresh(token)) { - token = refreshToken(identifier); - TOKEN_CACHE.put(table.getId(), token); - } - } - } - - FileIO fileIO = FILE_IO_CACHE.getIfPresent(token); - if (fileIO != null) { - return fileIO; - } - - synchronized (FILE_IO_CACHE) { - fileIO = FILE_IO_CACHE.getIfPresent(token); - if (fileIO != null) { - return fileIO; - } - - Options options = context.options(); - // the original options are not overwritten - options = new Options(RESTUtil.merge(token.token(), options.toMap())); - options.set(FILE_IO_ALLOW_CACHE, false); - CatalogContext fileIOContext = CatalogContext.create(options); - fileIO = FileIO.get(new Path(table.getPath()), fileIOContext); - FILE_IO_CACHE.put(token, fileIO); - return fileIO; - } - } - - private boolean shouldRefresh(VFSDataToken token) { - return token == null - || token.expireAtMillis() - System.currentTimeMillis() - < TOKEN_EXPIRATION_SAFE_TIME_MILLIS; - } - - private VFSDataToken refreshToken(Identifier identifier) throws IOException { - LOG.info("begin refresh data token for identifier [{}]", identifier); - GetTableTokenResponse response; - try { - response = api.loadTableToken(identifier); - } catch (NoSuchResourceException e) { - throw new FileNotFoundException("Table " + identifier + " not found"); - } catch (ForbiddenException e) { - throw new IOException("No permission to access table " + identifier); - } - - LOG.info( - "end refresh data token for identifier [{}] expiresAtMillis [{}]", - identifier, - response.getExpiresAtMillis()); - - VFSDataToken token = new VFSDataToken(response.getToken(), response.getExpiresAtMillis()); - return token; - } - private GetTableResponse loadTableMetadata(Identifier identifier) throws IOException { // if the table is system table, we need to load table metadata from the system table's data // table diff --git a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java index 3eb1849b23..1c0e8ceb39 100644 --- a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java +++ b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java @@ -18,7 +18,6 @@ package org.apache.paimon.vfs.hadoop; -import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; @@ -73,9 +72,7 @@ public class PaimonVirtualFileSystem extends FileSystem { Options options = PaimonVirtualFileSystemConfiguration.convertToCatalogOptions(conf); // pvfs://catalog_name/database_name/table_name/file, so uri authority is catalog name options.set(CatalogOptions.WAREHOUSE, uri.getAuthority()); - - CatalogContext catalogContext = CatalogContext.create(options); - vfsOperations = new VFSOperations(catalogContext); + vfsOperations = new VFSOperations(options); } private String getVirtualPath(Path path) {