This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9605b4531f [rest] Refactory RESTTokenFileIO to let it reused in vfs (#5939)
9605b4531f is described below

commit 9605b4531fff1537bff00259430c61b75c94df3d
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Wed Jul 23 13:01:16 2025 +0800

    [rest] Refactory RESTTokenFileIO to let it reused in vfs (#5939)
---
 .../java/org/apache/paimon/rest/RESTToken.java     |   0
 .../org/apache/paimon/rest/RESTTokenFileIO.java    |  43 +++------
 .../java/org/apache/paimon/rest/RESTCatalog.java   |   2 +-
 .../java/org/apache/paimon/vfs/VFSDataToken.java   |  64 -------------
 .../java/org/apache/paimon/vfs/VFSOperations.java  | 103 ++-------------------
 .../paimon/vfs/hadoop/PaimonVirtualFileSystem.java |   5 +-
 6 files changed, 24 insertions(+), 193 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTToken.java b/paimon-api/src/main/java/org/apache/paimon/rest/RESTToken.java
similarity index 100%
rename from paimon-core/src/main/java/org/apache/paimon/rest/RESTToken.java
rename to paimon-api/src/main/java/org/apache/paimon/rest/RESTToken.java
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
similarity index 84%
rename from paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
rename to paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
index cbfb28d6e3..c34e62123d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/rest/RESTTokenFileIO.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.rest;
 
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
@@ -51,7 +50,7 @@ import static org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
 /** A {@link FileIO} to support getting token from REST Server. */
 public class RESTTokenFileIO implements FileIO {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     public static final ConfigOption<Boolean> DATA_TOKEN_ENABLED =
             ConfigOptions.key("data-token.enabled")
@@ -74,25 +73,22 @@ public class RESTTokenFileIO implements FileIO {
 
     private static final Logger LOG = LoggerFactory.getLogger(RESTTokenFileIO.class);
 
-    private final RESTCatalogLoader catalogLoader;
+    private final CatalogContext catalogContext;
     private final Identifier identifier;
     private final Path path;
 
-    // catalog instance before serialization, it will become null after serialization, then we
-    // should create catalog from catalog loader
-    private final transient RESTCatalog catalogInstance;
+    // Api instance before serialization, it will become null after serialization, then we should
+    // create RESTApi from catalogContext
+    private transient volatile RESTApi apiInstance;
 
     // the latest token from REST Server, serializable in order to avoid loading token from the REST
     // Server again after serialization
     private volatile RESTToken token;
 
     public RESTTokenFileIO(
-            RESTCatalogLoader catalogLoader,
-            RESTCatalog catalogInstance,
-            Identifier identifier,
-            Path path) {
-        this.catalogLoader = catalogLoader;
-        this.catalogInstance = catalogInstance;
+            CatalogContext catalogContext, RESTApi apiInstance, Identifier identifier, Path path) {
+        this.catalogContext = catalogContext;
+        this.apiInstance = apiInstance;
         this.identifier = identifier;
         this.path = path;
     }
@@ -165,12 +161,13 @@ public class RESTTokenFileIO implements FileIO {
                 return fileIO;
             }
 
-            CatalogContext context = catalogLoader.context();
-            Options options = context.options();
+            Options options = catalogContext.options();
             // the original options are not overwritten
             options = new Options(RESTUtil.merge(token.token(), options.toMap()));
             options.set(FILE_IO_ALLOW_CACHE, false);
-            context = CatalogContext.create(options, context.preferIO(), context.fallbackIO());
+            CatalogContext context =
+                    CatalogContext.create(
+                            options, catalogContext.preferIO(), catalogContext.fallbackIO());
             try {
                 fileIO = FileIO.get(path, context);
             } catch (IOException e) {
@@ -199,20 +196,10 @@ public class RESTTokenFileIO implements FileIO {
 
     private void refreshToken() {
         LOG.info("begin refresh data token for identifier [{}]", identifier);
-        GetTableTokenResponse response;
-        if (catalogInstance != null) {
-            try {
-                response = catalogInstance.loadTableToken(identifier);
-            } catch (Catalog.TableNotExistException e) {
-                throw new RuntimeException(e);
-            }
-        } else {
-            try (RESTCatalog catalog = catalogLoader.load()) {
-                response = catalog.loadTableToken(identifier);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+        if (apiInstance == null) {
+            apiInstance = new RESTApi(catalogContext.options(), false);
         }
+        GetTableTokenResponse response = apiInstance.loadTableToken(identifier);
         LOG.info(
                 "end refresh data token for identifier [{}] expiresAtMillis 
[{}]",
                 identifier,
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 6f574a1ea5..fcf2d0525f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -971,7 +971,7 @@ public class RESTCatalog implements Catalog {
 
     private FileIO fileIOForData(Path path, Identifier identifier) {
         return dataTokenEnabled
-                ? new RESTTokenFileIO(catalogLoader(), this, identifier, path)
+                ? new RESTTokenFileIO(context, api, identifier, path)
                 : fileIOFromOptions(path);
     }
 
diff --git a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java
deleted file mode 100644
index 730ee591af..0000000000
--- a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSDataToken.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.vfs;
-
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.Map;
-import java.util.Objects;
-
-/** Data token. */
-public class VFSDataToken implements Serializable {
-    private static final long serialVersionUID = 1L;
-    private final Map<String, String> token;
-    private final long expireAtMillis;
-    @Nullable private Integer hash;
-
-    public VFSDataToken(Map<String, String> token, long expireAtMillis) {
-        this.token = token;
-        this.expireAtMillis = expireAtMillis;
-    }
-
-    public Map<String, String> token() {
-        return this.token;
-    }
-
-    public long expireAtMillis() {
-        return this.expireAtMillis;
-    }
-
-    public boolean equals(Object o) {
-        if (o != null && this.getClass() == o.getClass()) {
-            VFSDataToken token1 = (VFSDataToken) o;
-            return this.expireAtMillis == token1.expireAtMillis
-                    && Objects.equals(this.token, token1.token);
-        } else {
-            return false;
-        }
-    }
-
-    public int hashCode() {
-        if (this.hash == null) {
-            this.hash = Objects.hash(new Object[] {this.token, this.expireAtMillis});
-        }
-
-        return this.hash;
-    }
-}
diff --git a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
index ad9cd09122..6ca09dc739 100644
--- a/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
+++ b/paimon-vfs/paimon-vfs-common/src/main/java/org/apache/paimon/vfs/VFSOperations.java
@@ -24,7 +24,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.rest.RESTApi;
-import org.apache.paimon.rest.RESTUtil;
+import org.apache.paimon.rest.RESTTokenFileIO;
 import org.apache.paimon.rest.exceptions.AlreadyExistsException;
 import org.apache.paimon.rest.exceptions.BadRequestException;
 import org.apache.paimon.rest.exceptions.ForbiddenException;
@@ -32,14 +32,7 @@ import org.apache.paimon.rest.exceptions.NoSuchResourceException;
 import org.apache.paimon.rest.exceptions.NotImplementedException;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
-import org.apache.paimon.rest.responses.GetTableTokenResponse;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.ThreadUtils;
-
-import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
-import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
-import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Scheduler;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,41 +42,22 @@ import java.io.IOException;
 import java.nio.file.FileAlreadyExistsException;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.OBJECT_TABLE;
-import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;
-import static org.apache.paimon.rest.RESTApi.TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
 
 /** Wrap over RESTCatalog to provide basic operations for virtual path. */
 public class VFSOperations {
+
     private static final Logger LOG = LoggerFactory.getLogger(VFSOperations.class);
 
     private final RESTApi api;
     private final CatalogContext context;
 
-    // table id -> fileIO
-    private static final Cache<VFSDataToken, FileIO> FILE_IO_CACHE =
-            Caffeine.newBuilder()
-                    .expireAfterAccess(30, TimeUnit.MINUTES)
-                    .maximumSize(1000)
-                    .removalListener(
-                            (ignored, value, cause) -> IOUtils.closeQuietly((FileIO) value))
-                    .scheduler(
-                            Scheduler.forScheduledExecutorService(
-                                    Executors.newSingleThreadScheduledExecutor(
-                                            ThreadUtils.newDaemonThreadFactory(
-                                                    "rest-token-file-io-scheduler"))))
-                    .build();
-
-    private static final Cache<String, VFSDataToken> TOKEN_CACHE =
-            Caffeine.newBuilder().expireAfterAccess(30, TimeUnit.MINUTES).maximumSize(1000).build();
-
-    public VFSOperations(CatalogContext context) {
-        this.context = context;
-        this.api = new RESTApi(context.options());
+    public VFSOperations(Options options) {
+        this.api = new RESTApi(options);
+        // Get the configured options which has been merged from REST Server
+        this.context = CatalogContext.create(api.options());
     }
 
     public VFSIdentifier getVFSIdentifier(String virtualPath) throws IOException {
@@ -116,9 +90,7 @@ public class VFSOperations {
         }
         // Get real path
         StringBuilder realPath = new StringBuilder(table.getPath());
-        boolean isTableRoot = true;
         if (parts.length > 2) {
-            isTableRoot = false;
             if (!table.getPath().endsWith("/")) {
                 realPath.append("/");
             }
@@ -129,9 +101,8 @@ public class VFSOperations {
                 }
             }
         }
-        // Get REST token
-        FileIO fileIO = getFileIO(new Identifier(databaseName, tableName), table);
 
+        FileIO fileIO = new RESTTokenFileIO(context, api, identifier, new Path(table.getPath()));
         if (parts.length == 2) {
             return new VFSTableRootIdentifier(
                     table, realPath.toString(), fileIO, databaseName, tableName);
@@ -254,66 +225,6 @@ public class VFSOperations {
         }
     }
 
-    private FileIO getFileIO(Identifier identifier, GetTableResponse table) throws IOException {
-        VFSDataToken token = TOKEN_CACHE.getIfPresent(table.getId());
-        if (shouldRefresh(token)) {
-            synchronized (TOKEN_CACHE) {
-                token = TOKEN_CACHE.getIfPresent(table.getId());
-                if (shouldRefresh(token)) {
-                    token = refreshToken(identifier);
-                    TOKEN_CACHE.put(table.getId(), token);
-                }
-            }
-        }
-
-        FileIO fileIO = FILE_IO_CACHE.getIfPresent(token);
-        if (fileIO != null) {
-            return fileIO;
-        }
-
-        synchronized (FILE_IO_CACHE) {
-            fileIO = FILE_IO_CACHE.getIfPresent(token);
-            if (fileIO != null) {
-                return fileIO;
-            }
-
-            Options options = context.options();
-            // the original options are not overwritten
-            options = new Options(RESTUtil.merge(token.token(), options.toMap()));
-            options.set(FILE_IO_ALLOW_CACHE, false);
-            CatalogContext fileIOContext = CatalogContext.create(options);
-            fileIO = FileIO.get(new Path(table.getPath()), fileIOContext);
-            FILE_IO_CACHE.put(token, fileIO);
-            return fileIO;
-        }
-    }
-
-    private boolean shouldRefresh(VFSDataToken token) {
-        return token == null
-                || token.expireAtMillis() - System.currentTimeMillis()
-                        < TOKEN_EXPIRATION_SAFE_TIME_MILLIS;
-    }
-
-    private VFSDataToken refreshToken(Identifier identifier) throws IOException {
-        LOG.info("begin refresh data token for identifier [{}]", identifier);
-        GetTableTokenResponse response;
-        try {
-            response = api.loadTableToken(identifier);
-        } catch (NoSuchResourceException e) {
-            throw new FileNotFoundException("Table " + identifier + " not found");
-        } catch (ForbiddenException e) {
-            throw new IOException("No permission to access table " + identifier);
-        }
-
-        LOG.info(
-                "end refresh data token for identifier [{}] expiresAtMillis [{}]",
-                identifier,
-                response.getExpiresAtMillis());
-
-        VFSDataToken token = new VFSDataToken(response.getToken(), response.getExpiresAtMillis());
-        return token;
-    }
-
     private GetTableResponse loadTableMetadata(Identifier identifier) throws IOException {
         // if the table is system table, we need to load table metadata from the system table's data
         // table
diff --git a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
index 3eb1849b23..1c0e8ceb39 100644
--- a/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
+++ b/paimon-vfs/paimon-vfs-hadoop/src/main/java/org/apache/paimon/vfs/hadoop/PaimonVirtualFileSystem.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.vfs.hadoop;
 
-import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
@@ -73,9 +72,7 @@ public class PaimonVirtualFileSystem extends FileSystem {
         Options options = PaimonVirtualFileSystemConfiguration.convertToCatalogOptions(conf);
         // pvfs://catalog_name/database_name/table_name/file, so uri authority is catalog name
         options.set(CatalogOptions.WAREHOUSE, uri.getAuthority());
-
-        CatalogContext catalogContext = CatalogContext.create(options);
-        vfsOperations = new VFSOperations(catalogContext);
+        vfsOperations = new VFSOperations(options);
     }
 
     private String getVirtualPath(Path path) {

Reply via email to