This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ccd33b1701 [core] Make HadoopFileIO determine isObjectStore based on the path (#6393)
ccd33b1701 is described below

commit ccd33b1701b619c9d49f77d0bab2fa5b832d0595
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Oct 14 16:28:11 2025 +0800

    [core] Make HadoopFileIO determine isObjectStore based on the path (#6393)
---
 .../org/apache/paimon/fs/hadoop/HadoopFileIO.java  | 16 +++++++++++----
 .../paimon/fs/hadoop/HadoopFileIOLoader.java       |  2 +-
 .../paimon/fs/hadoop/HadoopViewFsFileIOLoader.java |  2 +-
 .../java/org/apache/paimon/utils/FileIOUtils.java  | 23 ++++++++++++++++++++++
 .../paimon/fs/HadoopLocalFileIOBehaviorTest.java   |  4 ++--
 .../org/apache/paimon/fs/HdfsBehaviorTest.java     | 23 ++++++++++++++++++++--
 .../fs/hadoop/HadoopSecuredFileSystemTest.java     |  3 ++-
 .../java/org/apache/paimon/flink/FlinkFileIO.java  | 23 ++--------------------
 8 files changed, 64 insertions(+), 32 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
index 918ef5f5c4..49ca2cdc87 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
@@ -27,6 +27,7 @@ import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.RemoteIterator;
 import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.hadoop.SerializableConfiguration;
+import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.FunctionWithException;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.ReflectionUtils;
@@ -43,6 +44,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
@@ -58,15 +60,21 @@ public class HadoopFileIO implements FileIO {
 
     protected transient volatile Map<Pair<String, String>, FileSystem> fsMap;
 
+    private final Path path;
+
+    public HadoopFileIO(Path path) {
+        this.path = path;
+    }
+
     @VisibleForTesting
-    public void setFileSystem(Path path, FileSystem fs) throws IOException {
-        org.apache.hadoop.fs.Path hadoopPath = path(path);
-        getFileSystem(hadoopPath, p -> fs);
+    public void setFileSystem(FileSystem fs) throws IOException {
+        getFileSystem(path(path), p -> fs);
     }
 
     @Override
     public boolean isObjectStore() {
-        return false;
+        String scheme = path.toUri().getScheme().toLowerCase(Locale.US);
+        return FileIOUtils.isObjectStore(scheme);
     }
 
     @Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIOLoader.java b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIOLoader.java
index 00341ac285..2caea822e0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIOLoader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIOLoader.java
@@ -33,6 +33,6 @@ public class HadoopFileIOLoader implements FileIOLoader {
 
     @Override
     public HadoopFileIO load(Path path) {
-        return new HadoopFileIO();
+        return new HadoopFileIO(path);
     }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopViewFsFileIOLoader.java b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopViewFsFileIOLoader.java
index c60bc35d81..803af22863 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopViewFsFileIOLoader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopViewFsFileIOLoader.java
@@ -33,6 +33,6 @@ public class HadoopViewFsFileIOLoader implements FileIOLoader {
 
     @Override
     public HadoopFileIO load(Path path) {
-        return new HadoopFileIO();
+        return new HadoopFileIO(path);
     }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java
index 31162c3866..0c3559368d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java
@@ -382,4 +382,27 @@ public class FileIOUtils {
         } catch (Exception ignored) {
         }
     }
+
+    public static boolean isObjectStore(String scheme) {
+        if (scheme.startsWith("s3")
+                || scheme.startsWith("emr")
+                || scheme.startsWith("oss")
+                || scheme.startsWith("wasb")
+                || scheme.startsWith("abfs")
+                || scheme.startsWith("gs")
+                || scheme.startsWith("cosn")) {
+            // the Amazon S3 storage or Aliyun OSS storage or Azure Blob Storage
+            // or Google Cloud Storage
+            return true;
+        } else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
+            // file servers instead of file systems
+            // they might actually be consistent, but we have no hard guarantees
+            // currently to rely on that
+            return true;
+        } else {
+            // the remainder should include hdfs, kosmos, ceph, ...
+            // this also includes federated HDFS (viewfs).
+            return false;
+        }
+    }
 }
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/HadoopLocalFileIOBehaviorTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/HadoopLocalFileIOBehaviorTest.java
index df94dd2a80..6f6ed70b1a 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/HadoopLocalFileIOBehaviorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/HadoopLocalFileIOBehaviorTest.java
@@ -38,8 +38,8 @@ class HadoopLocalFileIOBehaviorTest extends FileIOBehaviorTestBase {
     protected FileIO getFileSystem() throws Exception {
         org.apache.hadoop.fs.FileSystem fs = new RawLocalFileSystem();
         fs.initialize(URI.create("file:///"), new Configuration());
-        HadoopFileIO fileIO = new HadoopFileIO();
-        fileIO.setFileSystem(getBasePath(), fs);
+        HadoopFileIO fileIO = new HadoopFileIO(getBasePath());
+        fileIO.setFileSystem(fs);
         return fileIO;
     }
 
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java
index 56bfe052ec..b719038caa 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java
@@ -28,11 +28,14 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.ObjectOutputStream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
 /** Behavior tests for HDFS. */
 class HdfsBehaviorTest extends FileIOBehaviorTestBase {
@@ -62,8 +65,8 @@ class HdfsBehaviorTest extends FileIOBehaviorTestBase {
         org.apache.hadoop.fs.FileSystem hdfs = hdfsCluster.getFileSystem();
 
         basePath = new Path(hdfs.getUri().toString() + "/tests");
-        fs = new HadoopFileIO();
-        fs.setFileSystem(basePath, hdfs);
+        fs = new HadoopFileIO(basePath);
+        fs.setFileSystem(hdfs);
     }
 
     @AfterAll
@@ -102,4 +105,20 @@ class HdfsBehaviorTest extends FileIOBehaviorTestBase {
     public void testAtomicWriteMultipleThreads() throws InterruptedException {
         FileIOTest.testOverwriteFileUtf8(new Path(getBasePath(), randomName()), fs);
     }
+
+    @Test
+    public void testIsObjectStore() {
+        assertThat(fs.isObjectStore()).isEqualTo(false);
+    }
+
+    @Test
+    public void testSerializable() {
+        assertDoesNotThrow(
+                () -> {
+                    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                            ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+                        oos.writeObject(fs);
+                    }
+                });
+    }
 }
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystemTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystemTest.java
index 1737434c8a..10d60f2bc4 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystemTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystemTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.fs.hadoop;
 
 import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 
 import org.junit.jupiter.api.Test;
@@ -41,7 +42,7 @@ public class HadoopSecuredFileSystemTest {
         options.set("security.kerberos.login.principal", "test-user");
         options.set("security.kerberos.login.keytab", keytabFile.getAbsolutePath());
 
-        HadoopFileIO fileIO = new HadoopFileIO();
+        HadoopFileIO fileIO = new HadoopFileIO(new Path("file:///tmp/test"));
         fileIO.configure(CatalogContext.create(options));
         assertThat(fileIO.getFileSystem(new org.apache.hadoop.fs.Path("file:///tmp/test")))
                 .isInstanceOf(HadoopSecuredFileSystem.class);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
index 4819967749..24651644c9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
@@ -24,6 +24,7 @@ import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.utils.FileIOUtils;
 
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -50,27 +51,7 @@ public class FlinkFileIO implements FileIO {
         try {
             FileSystem fs = path.getFileSystem();
             String scheme = fs.getUri().getScheme().toLowerCase(Locale.US);
-
-            if (scheme.startsWith("s3")
-                    || scheme.startsWith("emr")
-                    || scheme.startsWith("oss")
-                    || scheme.startsWith("wasb")
-                    || scheme.startsWith("abfs")
-                    || scheme.startsWith("gs")
-                    || scheme.startsWith("cosn")) {
-                // the Amazon S3 storage or Aliyun OSS storage or Azure Blob Storage
-                // or Google Cloud Storage
-                return true;
-            } else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
-                // file servers instead of file systems
-                // they might actually be consistent, but we have no hard guarantees
-                // currently to rely on that
-                return true;
-            } else {
-                // the remainder should include hdfs, kosmos, ceph, ...
-                // this also includes federated HDFS (viewfs).
-                return false;
-            }
+            return FileIOUtils.isObjectStore(scheme);
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }

Reply via email to