This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-fs-spi
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 8733d0e8013b9b5e7bffccd716332eaf6895a11b
Author: morningman <[email protected]>
AuthorDate: Thu Apr 2 00:00:19 2026 +0800

    [refactor](fs-spi) P4.8-G3: delete DFSFileSystem and its phantom reference infrastructure
    
    ### What problem does this PR solve?
    
    Issue Number: N/A
    
    Problem Summary: Delete DFSFileSystem (the legacy HDFS wrapper class) and its
    supporting classes now that all callers have been migrated:
    - DFSFileSystem.java: deleted after migrating ExternalCatalog and
      HMSExternalCatalog away from its static members (the
      PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH constant and the getHdfsConf() method).
      These are now inlined directly in the callers using HdfsConfiguration and
      the literal string 'ipc.client.fallback-to-simple-auth-allowed'.
    - DFSFileSystemPhantomReference.java: helper class for phantom reference
      tracking, only used within the dfs package.
    - RemoteFSPhantomManager.java: background cleanup thread for Hadoop
      FileSystem objects, only called from DFSFileSystem.nativeFileSystem().
    - IcebergHadoopCatalogTest.java: @Ignore test with no assertions, purely
      manual exploration code using DFSFileSystem.nativeFileSystem().
    
    Also removes DFSFileSystem from the GsonUtils reflection array.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test: No need to test (deleted classes have no production callers;
        ExternalCatalog and HMSExternalCatalog behavior is unchanged — same
        Hadoop config semantics)
    - Behavior changed: No
    - Does this need documentation: No
    
    Co-authored-by: Copilot <[email protected]>
---
 .../apache/doris/datasource/ExternalCatalog.java   |   9 +-
 .../doris/datasource/hive/HMSExternalCatalog.java  |   3 +-
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  | 582 ---------------------
 .../remote/dfs/DFSFileSystemPhantomReference.java  |  44 --
 .../fs/remote/dfs/RemoteFSPhantomManager.java      | 127 -----
 .../org/apache/doris/persist/gson/GsonUtils.java   |   1 -
 .../external/iceberg/IcebergHadoopCatalogTest.java |  91 ----
 7 files changed, 7 insertions(+), 850 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 6240c837567..43717911ca1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -55,7 +55,6 @@ import 
org.apache.doris.datasource.paimon.PaimonExternalDatabase;
 import org.apache.doris.datasource.test.TestExternalCatalog;
 import org.apache.doris.datasource.test.TestExternalDatabase;
 import 
org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalDatabase;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.info.TableNameInfo;
 import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo;
 import org.apache.doris.persist.CreateDbInfo;
@@ -79,6 +78,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.jetbrains.annotations.NotNull;
@@ -223,7 +223,10 @@ public abstract class ExternalCatalog
     }
 
     private Configuration buildConf() {
-        Configuration conf = 
DFSFileSystem.getHdfsConf(ifNotSetFallbackToSimpleAuth());
+        Configuration conf = new HdfsConfiguration();
+        if (ifNotSetFallbackToSimpleAuth()) {
+            conf.set("ipc.client.fallback-to-simple-auth-allowed", "true");
+        }
         Map<String, String> catalogProperties = 
catalogProperty.getHadoopProperties();
         for (Map.Entry<String, String> entry : catalogProperties.entrySet()) {
             conf.set(entry.getKey(), entry.getValue());
@@ -273,7 +276,7 @@ public abstract class ExternalCatalog
 
     // we need check auth fallback for kerberos or simple
     public boolean ifNotSetFallbackToSimpleAuth() {
-        return 
catalogProperty.getOrDefault(DFSFileSystem.PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, 
"").isEmpty();
+        return 
catalogProperty.getOrDefault("ipc.client.fallback-to-simple-auth-allowed", 
"").isEmpty();
     }
 
     // Will be called when creating catalog(not replaying).
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 1feeb59c241..03246a1f75a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -34,7 +34,6 @@ import org.apache.doris.datasource.metacache.CacheSpec;
 import org.apache.doris.datasource.operations.ExternalMetadataOperations;
 import org.apache.doris.datasource.property.metastore.AbstractHiveProperties;
 import org.apache.doris.fs.SpiSwitchingFileSystem;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.transaction.TransactionManagerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -227,7 +226,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
         super.setDefaultPropsIfMissing(isReplay);
         if (ifNotSetFallbackToSimpleAuth()) {
             // always allow fallback to simple auth, so to support both 
kerberos and simple auth
-            
catalogProperty.addProperty(DFSFileSystem.PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, 
"true");
+            
catalogProperty.addProperty("ipc.client.fallback-to-simple-auth-allowed", 
"true");
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
deleted file mode 100644
index 8d98a7bc5fb..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ /dev/null
@@ -1,582 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.fs.remote.dfs;
-
-import org.apache.doris.backup.Status;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.security.authentication.HadoopAuthenticator;
-import org.apache.doris.common.util.S3Util;
-import org.apache.doris.common.util.URI;
-import org.apache.doris.datasource.property.storage.HdfsCompatibleProperties;
-import org.apache.doris.datasource.property.storage.StorageProperties;
-import org.apache.doris.foundation.fs.FsStorageType;
-import org.apache.doris.fs.operations.HDFSFileOperations;
-import org.apache.doris.fs.operations.HDFSOpParams;
-import org.apache.doris.fs.operations.OpParams;
-import org.apache.doris.fs.remote.RemoteFile;
-import org.apache.doris.fs.remote.RemoteFileSystem;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.FileVisitOption;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-
-public class DFSFileSystem extends RemoteFileSystem {
-
-    public static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = 
"ipc.client.fallback-to-simple-auth-allowed";
-    private static final Logger LOG = 
LogManager.getLogger(DFSFileSystem.class);
-    private HDFSFileOperations operations = null;
-    private final HdfsCompatibleProperties hdfsProperties;
-    protected volatile org.apache.hadoop.fs.FileSystem dfsFileSystem = null;
-
-    public DFSFileSystem(HdfsCompatibleProperties hdfsProperties) {
-        super(FsStorageType.HDFS.name(), FsStorageType.HDFS);
-        this.properties.putAll(hdfsProperties.getOrigProps());
-        this.hdfsProperties = hdfsProperties;
-    }
-
-    @Override
-    public StorageProperties getStorageProperties() {
-        return hdfsProperties;
-    }
-
-    @Override
-    public Status listFiles(String remotePath, boolean recursive, 
List<RemoteFile> result) {
-        try {
-            Path locatedPath = new Path(remotePath);
-            org.apache.hadoop.fs.FileSystem fileSystem = 
nativeFileSystem(locatedPath);
-            RemoteIterator<LocatedFileStatus> locatedFiles = 
getLocatedFiles(recursive, fileSystem, locatedPath);
-            while (locatedFiles.hasNext()) {
-                LocatedFileStatus fileStatus = locatedFiles.next();
-                RemoteFile location = new RemoteFile(
-                        fileStatus.getPath(), fileStatus.isDirectory(), 
fileStatus.getLen(),
-                        fileStatus.getBlockSize(), 
fileStatus.getModificationTime(), fileStatus.getBlockLocations());
-                result.add(location);
-            }
-        } catch (FileNotFoundException e) {
-            return new Status(Status.ErrCode.NOT_FOUND, e.getMessage());
-        } catch (Exception e) {
-            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
-        }
-        return Status.OK;
-    }
-
-
-    @Override
-    public Status listDirectories(String remotePath, Set<String> result) {
-        try {
-            Path locatedPath = new Path(remotePath);
-            FileSystem fileSystem = nativeFileSystem(locatedPath);
-            FileStatus[] fileStatuses = getFileStatuses(locatedPath, 
fileSystem);
-            result.addAll(
-                    Arrays.stream(fileStatuses)
-                            .filter(FileStatus::isDirectory)
-                            .map(file -> file.getPath().toString() + "/")
-                            .collect(ImmutableSet.toImmutableSet()));
-        } catch (Exception e) {
-            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
-        }
-        return Status.OK;
-    }
-
-    public DFSFileSystem(HdfsCompatibleProperties hdfsProperties, 
FsStorageType storageType) {
-        super(storageType.name(), storageType);
-        this.properties.putAll(hdfsProperties.getOrigProps());
-        this.hdfsProperties = hdfsProperties;
-    }
-
-    @VisibleForTesting
-    public FileSystem nativeFileSystem(Path remotePath) throws IOException {
-        if (closed.get()) {
-            throw new IOException("FileSystem is closed.");
-        }
-        if (dfsFileSystem == null) {
-            synchronized (this) {
-                if (closed.get()) {
-                    throw new IOException("FileSystem is closed.");
-                }
-                if (dfsFileSystem == null) {
-                    try {
-                        dfsFileSystem = 
hdfsProperties.getHadoopAuthenticator().doAs(() -> {
-                            try {
-                                Configuration originalConf = 
hdfsProperties.getHadoopStorageConfig();
-                                // Create a copy of the original configuration 
to avoid modifying global settings
-                                Configuration confCopy = new 
Configuration(originalConf);
-                                // Disable FileSystem caching to ensure a new 
instance is created every time
-                                // Reason: We manage the lifecycle of 
FileSystem instances manually.
-                                // Even if the caller doesn't explicitly close 
the instance, we will do so when needed.
-                                // However, since Hadoop caches FileSystem 
instances by default,
-                                // other parts of the system may still be 
using the same instance.
-                                // If we close the shared instance here, it 
could break those other users.
-                                // Therefore, we disable the cache to ensure 
isolated, non-shared instances.
-                                
confCopy.setBoolean("fs.hdfs.impl.disable.cache", true);
-                                return FileSystem.get(remotePath.toUri(), 
confCopy);
-                            } catch (IOException e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
-                        operations = new HDFSFileOperations(dfsFileSystem);
-                        RemoteFSPhantomManager.registerPhantomReference(this);
-                    } catch (Exception e) {
-                        throw new IOException("Failed to get dfs FileSystem 
for " + e.getMessage(), e);
-                    }
-                }
-            }
-        }
-        return dfsFileSystem;
-    }
-
-    protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean 
recursive,
-            FileSystem fileSystem, Path locatedPath) throws IOException {
-        return hdfsProperties.getHadoopAuthenticator().doAs(() -> 
fileSystem.listFiles(locatedPath, recursive));
-    }
-
-    protected FileStatus[] getFileStatuses(Path remotePath, FileSystem 
fileSystem) throws IOException {
-        return hdfsProperties.getHadoopAuthenticator().doAs(() -> 
fileSystem.listStatus(remotePath));
-    }
-
-    public static Configuration getHdfsConf(boolean fallbackToSimpleAuth) {
-        Configuration hdfsConf = new HdfsConfiguration();
-        if (fallbackToSimpleAuth) {
-            // need support fallback to simple if the cluster is a mixture of  
kerberos and simple auth.
-            hdfsConf.set(PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, "true");
-        }
-        return hdfsConf;
-    }
-
-    @Override
-    public Status downloadWithFileSize(String remoteFilePath, String 
localFilePath, long fileSize) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("download from {} to {}, file size: {}.", 
remoteFilePath, localFilePath, fileSize);
-        }
-        final long start = System.currentTimeMillis();
-        HDFSOpParams hdfsOpParams = OpParams.of(remoteFilePath);
-        Status st = operations.openReader(hdfsOpParams);
-        if (st != Status.OK) {
-            return st;
-        }
-        FSDataInputStream fsDataInputStream = hdfsOpParams.fsDataInputStream();
-        LOG.info("finished to open reader. download {} to {}.", 
remoteFilePath, localFilePath);
-
-        // delete local file if exist
-        File localFile = new File(localFilePath);
-        if (localFile.exists()) {
-            try {
-                Files.walk(Paths.get(localFilePath), 
FileVisitOption.FOLLOW_LINKS).sorted(Comparator.reverseOrder())
-                        .map(java.nio.file.Path::toFile).forEach(File::delete);
-            } catch (IOException e) {
-                return new Status(Status.ErrCode.COMMON_ERROR,
-                        "failed to delete exist local file: " + localFilePath 
+ ", msg: " + e.getMessage());
-            }
-        }
-        // create local file
-        try {
-            if (!localFile.createNewFile()) {
-                return new Status(Status.ErrCode.COMMON_ERROR, "failed to 
create local file: " + localFilePath);
-            }
-        } catch (IOException e) {
-            return new Status(Status.ErrCode.COMMON_ERROR,
-                    "failed to create local file: " + localFilePath + ", msg: 
" + e.getMessage());
-        }
-
-        String lastErrMsg;
-        Status status = Status.OK;
-        try (BufferedOutputStream out = new 
BufferedOutputStream(Files.newOutputStream(localFile.toPath()))) {
-            final long bufSize = 1024 * 1024; // 1MB
-            long leftSize = fileSize;
-            long readOffset = 0;
-            while (leftSize > 0) {
-                long readLen = Math.min(leftSize, bufSize);
-                try {
-                    ByteBuffer data = readStreamBuffer(fsDataInputStream, 
readOffset, readLen);
-                    if (readLen != data.array().length) {
-                        LOG.warn(
-                                "the actual read length does not equal to "
-                                        + "the expected read length: {} vs. 
{}, file: {}",
-                                data.array().length, readLen, remoteFilePath);
-                    }
-                    // write local file
-                    out.write(data.array());
-                    readOffset += data.array().length;
-                    leftSize -= data.array().length;
-                } catch (Exception e) {
-                    lastErrMsg = String.format(
-                            "failed to read. " + "current read offset: %d, 
read length: %d,"
-                                    + " file size: %d, file: %s. msg: %s",
-                            readOffset, readLen, fileSize, remoteFilePath, 
e.getMessage());
-                    LOG.warn(lastErrMsg);
-                    status = new Status(Status.ErrCode.COMMON_ERROR, 
lastErrMsg);
-                    break;
-                }
-            }
-        } catch (IOException e) {
-            return new Status(Status.ErrCode.COMMON_ERROR, "Got exception: " + 
e.getMessage());
-        } finally {
-            Status closeStatus = 
operations.closeReader(OpParams.of(fsDataInputStream));
-            if (!closeStatus.ok()) {
-                LOG.warn(closeStatus.getErrMsg());
-                if (status.ok()) {
-                    // we return close write error only if no other error has 
been encountered.
-                    status = closeStatus;
-                }
-            }
-        }
-
-        LOG.info("finished to download from {} to {} with size: {}. cost {} 
ms", remoteFilePath, localFilePath,
-                fileSize, (System.currentTimeMillis() - start));
-        return status;
-    }
-
-    /**
-     * read data from fsDataInputStream.
-     *
-     * @param fsDataInputStream input stream for read.
-     * @param readOffset read offset.
-     * @param length read length.
-     * @return ByteBuffer
-     * @throws IOException when read data error.
-     */
-    private static ByteBuffer readStreamBuffer(FSDataInputStream 
fsDataInputStream, long readOffset, long length)
-            throws IOException {
-        synchronized (fsDataInputStream) {
-            long currentStreamOffset;
-            try {
-                currentStreamOffset = fsDataInputStream.getPos();
-            } catch (IOException e) {
-                LOG.warn("errors while get file pos from output stream", e);
-                throw new IOException("errors while get file pos from output 
stream", e);
-            }
-            if (currentStreamOffset != readOffset) {
-                // it's ok, when reading some format like parquet, it is not a 
sequential read
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("invalid offset, current read offset is " + 
currentStreamOffset
-                            + " is not equal to request offset " + readOffset 
+ " seek to it");
-                }
-                try {
-                    fsDataInputStream.seek(readOffset);
-                } catch (IOException e) {
-                    throw new IOException(String.format(
-                            "current read offset %d is not equal to %d, and 
could not seek to it, msg: %s",
-                            currentStreamOffset, readOffset, e.getMessage()));
-                }
-            }
-            // Avoid using the ByteBuffer based read for Hadoop because some
-            // FSDataInputStream
-            // implementations are not ByteBufferReadable,
-            // See https://issues.apache.org/jira/browse/HADOOP-14603
-            byte[] buf;
-            if (length > HDFSFileOperations.READ_BUFFER_SIZE) {
-                buf = new byte[HDFSFileOperations.READ_BUFFER_SIZE];
-            } else {
-                buf = new byte[(int) length];
-            }
-            try {
-                int readLength = readBytesFully(fsDataInputStream, buf);
-                if (readLength < 0) {
-                    throw new IOException("end of file reached");
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(
-                            "read buffer from input stream, buffer size:" + 
buf.length + ", read length:" + readLength);
-                }
-                return ByteBuffer.wrap(buf, 0, readLength);
-            } catch (IOException e) {
-                LOG.warn("errors while read data from stream", e);
-                throw new IOException("errors while read data from stream " + 
e.getMessage());
-            }
-        }
-    }
-
-    private static int readBytesFully(FSDataInputStream is, byte[] dest) 
throws IOException {
-        int readLength = 0;
-        while (readLength < dest.length) {
-            int availableReadLength = dest.length - readLength;
-            int n = is.read(dest, readLength, availableReadLength);
-            if (n <= 0) {
-                break;
-            }
-            readLength += n;
-        }
-        return readLength;
-    }
-
-    @Override
-    public Status exists(String remotePath) {
-        try {
-            URI pathUri = URI.create(remotePath);
-            Path inputFilePath = new Path(pathUri.getLocation());
-            FileSystem fileSystem = nativeFileSystem(inputFilePath);
-            boolean isPathExist = 
hdfsProperties.getHadoopAuthenticator().doAs(() -> 
fileSystem.exists(inputFilePath));
-            if (!isPathExist) {
-                return new Status(Status.ErrCode.NOT_FOUND, "remote path does 
not exist: " + remotePath);
-            }
-            return Status.OK;
-        } catch (Exception e) {
-            LOG.warn("errors while check path exist " + remotePath, e);
-            return new Status(Status.ErrCode.COMMON_ERROR,
-                    "failed to check remote path exist: " + remotePath + ". 
msg: " + e.getMessage());
-        }
-    }
-
-    @Override
-    public Status directUpload(String content, String remoteFile) {
-        HDFSOpParams hdfsOpParams = OpParams.of(remoteFile);
-        Status wst = operations.openWriter(hdfsOpParams);
-        if (wst != Status.OK) {
-            return wst;
-        }
-        FSDataOutputStream fsDataOutputStream = 
hdfsOpParams.fsDataOutputStream();
-        LOG.info("finished to open writer. directly upload to remote path 
{}.", remoteFile);
-
-        Status status = Status.OK;
-        try {
-            fsDataOutputStream.writeBytes(content);
-        } catch (IOException e) {
-            LOG.warn("errors while write data to output stream", e);
-            status = new Status(Status.ErrCode.COMMON_ERROR, "write exception: 
" + e.getMessage());
-        } finally {
-            Status closeStatus = 
operations.closeWriter(OpParams.of(fsDataOutputStream));
-            if (!closeStatus.ok()) {
-                LOG.warn(closeStatus.getErrMsg());
-                if (status.ok()) {
-                    status = closeStatus;
-                }
-            }
-        }
-        return status;
-    }
-
-    @Override
-    public Status upload(String localPath, String remotePath) {
-        long start = System.currentTimeMillis();
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("local path {}, remote path {}", localPath, remotePath);
-        }
-        HDFSOpParams hdfsOpParams = OpParams.of(remotePath);
-        Status wst = operations.openWriter(hdfsOpParams);
-        if (wst != Status.OK) {
-            return wst;
-        }
-        FSDataOutputStream fsDataOutputStream = 
hdfsOpParams.fsDataOutputStream();
-        LOG.info("finished to open writer. directly upload to remote path 
{}.", remotePath);
-        // read local file and write remote
-        File localFile = new File(localPath);
-        long fileLength = localFile.length();
-        byte[] readBuf = new byte[1024];
-        Status status = new Status(Status.ErrCode.OK, "");
-        try (BufferedInputStream in = new BufferedInputStream(new 
FileInputStream(localFile))) {
-            // save the last err msg
-            String lastErrMsg = null;
-            // save the current write offset of remote file
-            long writeOffset = 0;
-            // read local file, 1MB at a time
-            int bytesRead;
-            while ((bytesRead = in.read(readBuf)) != -1) {
-                try {
-                    fsDataOutputStream.write(readBuf, 0, bytesRead);
-                } catch (IOException e) {
-                    LOG.warn("errors while write data to output stream", e);
-                    lastErrMsg = String.format(
-                            "failed to write hdfs. current write offset: %d, 
write length: %d, "
-                                    + "file length: %d, file: %s, msg: errors 
while write data to output stream",
-                            writeOffset, bytesRead, fileLength, remotePath);
-                    status = new Status(Status.ErrCode.COMMON_ERROR, 
lastErrMsg);
-                    break;
-                }
-
-                // write succeed, update current write offset
-                writeOffset += bytesRead;
-            } // end of read local file loop
-        } catch (FileNotFoundException e1) {
-            return new Status(Status.ErrCode.COMMON_ERROR, "encounter file not 
found exception: " + e1.getMessage());
-        } catch (IOException e1) {
-            return new Status(Status.ErrCode.COMMON_ERROR, "encounter io 
exception: " + e1.getMessage());
-        } finally {
-            Status closeStatus = 
operations.closeWriter(OpParams.of(fsDataOutputStream));
-            if (!closeStatus.ok()) {
-                LOG.warn(closeStatus.getErrMsg());
-                if (status.ok()) {
-                    // we return close write error only if no other error has 
been encountered.
-                    status = closeStatus;
-                }
-            }
-        }
-
-        if (status.ok()) {
-            LOG.info("finished to upload {} to remote path {}. cost: {} ms", 
localPath, remotePath,
-                    (System.currentTimeMillis() - start));
-        }
-        return status;
-    }
-
-    @Override
-    public Status rename(String srcPath, String destPath) {
-        long start = System.currentTimeMillis();
-        try {
-            URI srcPathUri = URI.create(srcPath);
-            URI destPathUri = URI.create(destPath);
-            if 
(!srcPathUri.getAuthority().trim().equals(destPathUri.getAuthority().trim())) {
-                return new Status(Status.ErrCode.COMMON_ERROR, "only allow 
rename in same file system");
-            }
-            FileSystem fileSystem = nativeFileSystem(new Path(destPath));
-            Path srcfilePath = new Path(srcPathUri.getPath());
-            Path destfilePath = new Path(destPathUri.getPath());
-            boolean isRenameSuccess = 
hdfsProperties.getHadoopAuthenticator().doAs(()
-                    -> fileSystem.rename(srcfilePath, destfilePath));
-            if (!isRenameSuccess) {
-                return new Status(Status.ErrCode.COMMON_ERROR, "failed to 
rename " + srcPath + " to " + destPath);
-            }
-        } catch (UserException e) {
-            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
-        } catch (IOException e) {
-            LOG.warn("errors while rename path from " + srcPath + " to " + 
destPath);
-            return new Status(Status.ErrCode.COMMON_ERROR,
-                    "failed to rename remote " + srcPath + " to " + destPath + 
", msg: " + e.getMessage());
-        }
-        LOG.info("finished to rename {} to  {}. cost: {} ms", srcPath, 
destPath, (System.currentTimeMillis() - start));
-        return Status.OK;
-    }
-
-    @Override
-    public Status delete(String remotePath) {
-        try {
-            URI pathUri = URI.create(remotePath);
-            Path inputFilePath = new Path(pathUri.getLocation());
-            FileSystem fileSystem = nativeFileSystem(inputFilePath);
-            hdfsProperties.getHadoopAuthenticator().doAs(() -> 
fileSystem.delete(inputFilePath, true));
-        } catch (UserException e) {
-            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
-        } catch (IOException e) {
-            LOG.warn("errors while delete path " + remotePath);
-            return new Status(Status.ErrCode.COMMON_ERROR,
-                    "failed to delete remote path: " + remotePath + ", msg: " 
+ e.getMessage());
-        }
-        LOG.info("finished to delete remote path {}.", remotePath);
-        return Status.OK;
-    }
-
-    /**
-     * get files in remotePath of HDFS.
-     *
-     * @param remotePath hdfs://namenode:port/path.
-     * @param result files in remotePath.
-     * @param fileNameOnly means get file only in remotePath if true.
-     * @return Status.OK if success.
-     */
-    @Override
-    public Status globList(String remotePath, List<RemoteFile> result, boolean 
fileNameOnly) {
-        try {
-            URI pathUri = URI.create(remotePath);
-            Path pathPattern = new 
Path(S3Util.extendGlobs(pathUri.getLocation()));
-            FileSystem fileSystem = nativeFileSystem(pathPattern);
-            FileStatus[] files = 
hdfsProperties.getHadoopAuthenticator().doAs(() -> 
fileSystem.globStatus(pathPattern));
-            if (files == null) {
-                LOG.info("no files in path " + remotePath);
-                return Status.OK;
-            }
-            for (FileStatus fileStatus : files) {
-                RemoteFile remoteFile = new RemoteFile(
-                        fileNameOnly ? fileStatus.getPath().getName() : 
fileStatus.getPath().toString(),
-                        !fileStatus.isDirectory(), fileStatus.isDirectory() ? 
-1 : fileStatus.getLen(),
-                        fileStatus.getBlockSize(), 
fileStatus.getModificationTime());
-                result.add(remoteFile);
-            }
-        } catch (FileNotFoundException e) {
-            LOG.info("file not found: " + e.getMessage());
-            return new Status(Status.ErrCode.NOT_FOUND, "file not found: " + 
e.getMessage());
-        } catch (Exception e) {
-            LOG.warn("errors while get file status ", e);
-            return new Status(Status.ErrCode.COMMON_ERROR, "errors while get 
file status " + e.getMessage());
-        }
-        LOG.info("finish list path {}", remotePath);
-        return Status.OK;
-    }
-
-    @Override
-    public Status makeDir(String remotePath) {
-        try {
-            Path locatedPath = new Path(remotePath);
-            FileSystem fileSystem = nativeFileSystem(locatedPath);
-            if (!hdfsProperties.getHadoopAuthenticator().doAs(() -> 
fileSystem.mkdirs(locatedPath))) {
-                LOG.warn("failed to make dir for " + remotePath);
-                return new Status(Status.ErrCode.COMMON_ERROR, "failed to make 
dir for " + remotePath);
-            }
-        } catch (Exception e) {
-            LOG.warn("failed to make dir for {}, exception:", remotePath, e);
-            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
-        }
-        return Status.OK;
-    }
-
-    @VisibleForTesting
-    public HadoopAuthenticator getAuthenticator() {
-        return hdfsProperties.getHadoopAuthenticator();
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (closed.compareAndSet(false, true)) {
-            try {
-                if (dfsFileSystem != null) {
-                    dfsFileSystem.close();
-                }
-            } catch (IOException e) {
-                LOG.warn("Failed to close DFSFileSystem: {}", e.getMessage(), 
e);
-            }
-        }
-    }
-
-    public FileStatus getFileStatus(Path path) throws IOException {
-        FileSystem fileSystem = nativeFileSystem(path);
-        return hdfsProperties.getHadoopAuthenticator().doAs(() -> 
fileSystem.getFileStatus(path));
-    }
-
-    public FSDataInputStream openFile(Path path) throws IOException {
-        FileSystem fileSystem = nativeFileSystem(path);
-        return hdfsProperties.getHadoopAuthenticator().doAs(() -> 
fileSystem.open(path));
-    }
-
-    public FSDataOutputStream createFile(Path path, boolean overwrite) throws 
IOException {
-        FileSystem fileSystem = nativeFileSystem(path);
-        return hdfsProperties.getHadoopAuthenticator().doAs(() -> 
fileSystem.create(path, overwrite));
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystemPhantomReference.java
 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystemPhantomReference.java
deleted file mode 100644
index fe7c08080e5..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystemPhantomReference.java
+++ /dev/null
@@ -1,44 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.fs.remote.dfs;
-
-import org.apache.hadoop.fs.FileSystem;
-
-import java.lang.ref.PhantomReference;
-import java.lang.ref.ReferenceQueue;
-
-public class DFSFileSystemPhantomReference extends 
PhantomReference<DFSFileSystem> {
-
-    private FileSystem fs;
-
-    /**
-     * Creates a new phantom reference that refers to the given object and
-     * is registered with the given queue.
-     *
-     * <p> It is possible to create a phantom reference with a {@code null}
-     * queue.  Such a reference will never be enqueued.
-     *
-     * @param referent the object the new phantom reference will refer to
-     * @param q        the queue with which the reference is to be registered,
-     *                 or {@code null} if registration is not required
-     */
-    public DFSFileSystemPhantomReference(DFSFileSystem referent, 
ReferenceQueue<? super DFSFileSystem> q) {
-        super(referent, q);
-        this.fs = referent.dfsFileSystem;
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/RemoteFSPhantomManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/RemoteFSPhantomManager.java
deleted file mode 100644
index 6299936831c..00000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/RemoteFSPhantomManager.java
+++ /dev/null
@@ -1,127 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.fs.remote.dfs;
-
-import org.apache.doris.common.CustomThreadFactory;
-import org.apache.doris.fs.remote.RemoteFileSystem;
-
-import com.google.common.collect.Sets;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.IOException;
-import java.lang.ref.PhantomReference;
-import java.lang.ref.Reference;
-import java.lang.ref.ReferenceQueue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * The RemoteFSPhantomManager class is responsible for managing the phantom 
references
- * of RemoteFileSystem objects. It ensures that the associated FileSystem 
resources are
- * automatically cleaned up when the RemoteFileSystem objects are garbage 
collected.
- * <p>
- * By utilizing a ReferenceQueue and PhantomReference, this class can monitor 
the lifecycle
- * of RemoteFileSystem objects. When a RemoteFileSystem object is no longer in 
use and is
- * garbage collected, its corresponding FileSystem resource is properly closed 
to prevent
- * resource leaks.
- * <p>
- * The class provides a thread-safe mechanism to ensure that the cleanup 
thread is started only once.
- * <p>
- * Main functionalities include:
- * - Registering phantom references of RemoteFileSystem objects.
- * - Starting a periodic cleanup thread that automatically closes unused 
FileSystem resources.
- */
-public class RemoteFSPhantomManager {
-
-    private static final Logger LOG = 
LogManager.getLogger(RemoteFSPhantomManager.class);
-
-    // Scheduled executor for periodic resource cleanup
-    private static ScheduledExecutorService cleanupExecutor;
-
-    // Reference queue for monitoring RemoteFileSystem objects' phantom 
references
-    private static final ReferenceQueue<DFSFileSystem> referenceQueue = new 
ReferenceQueue<>();
-
-    // Map storing the phantom references and their corresponding FileSystem 
objects
-    private static final ConcurrentHashMap<PhantomReference<DFSFileSystem>, 
FileSystem> referenceMap
-            = new ConcurrentHashMap<>();
-
-    private static final Set<FileSystem> fsSet = Sets.newConcurrentHashSet();
-
-    // Flag indicating whether the cleanup thread has been started
-    private static final AtomicBoolean isStarted = new AtomicBoolean(false);
-
-    /**
-     * Registers a phantom reference for a RemoteFileSystem object in the 
manager.
-     * If the cleanup thread has not been started, it will be started.
-     *
-     * @param remoteFileSystem the RemoteFileSystem object to be registered
-     */
-    public static void registerPhantomReference(DFSFileSystem 
remoteFileSystem) {
-        if (!isStarted.get()) {
-            start();
-            isStarted.set(true);
-        }
-        if (fsSet.contains(remoteFileSystem.dfsFileSystem)) {
-            throw new RuntimeException("FileSystem already exists: " + 
remoteFileSystem.dfsFileSystem.getUri());
-        }
-        DFSFileSystemPhantomReference phantomReference = new 
DFSFileSystemPhantomReference(remoteFileSystem,
-                referenceQueue);
-        referenceMap.put(phantomReference, remoteFileSystem.dfsFileSystem);
-        fsSet.add(remoteFileSystem.dfsFileSystem);
-    }
-
-    /**
-     * Starts the cleanup thread, which periodically checks and cleans up 
unused FileSystem resources.
-     * The method uses double-checked locking to ensure thread-safe startup of 
the cleanup thread.
-     */
-    public static void start() {
-        if (isStarted.compareAndSet(false, true)) {
-            synchronized (RemoteFSPhantomManager.class) {
-                LOG.info("Starting cleanup thread for RemoteFileSystem 
objects");
-                if (cleanupExecutor == null) {
-                    CustomThreadFactory threadFactory = new 
CustomThreadFactory("remote-fs-phantom-cleanup");
-                    cleanupExecutor = Executors.newScheduledThreadPool(1, 
threadFactory);
-                    cleanupExecutor.scheduleAtFixedRate(() -> {
-                        Reference<? extends RemoteFileSystem> ref;
-                        while ((ref = referenceQueue.poll()) != null) {
-                            DFSFileSystemPhantomReference phantomRef = 
(DFSFileSystemPhantomReference) ref;
-
-                            FileSystem fs = referenceMap.remove(phantomRef);
-                            if (fs != null) {
-                                try {
-                                    fs.close();
-                                    fsSet.remove(fs);
-                                    LOG.info("Closed file system: {}", 
fs.getUri());
-                                } catch (IOException e) {
-                                    LOG.warn("Failed to close file system", e);
-                                }
-                            }
-                        }
-                    }, 0, 1, TimeUnit.MINUTES);
-                }
-            }
-        }
-    }
-
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 8faf29f12d1..32694b8ad40 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -585,7 +585,6 @@ public class GsonUtils {
         // New metadata uses FileSystemDescriptor, which does not require 
these registrations.
         String[][] subtypes = {
             {"BrokerFileSystem",  
"org.apache.doris.fs.remote.BrokerFileSystem"},
-            {"DFSFileSystem",     
"org.apache.doris.fs.remote.dfs.DFSFileSystem"},
             {"S3FileSystem",      "org.apache.doris.fs.remote.S3FileSystem"},
             {"AzureFileSystem",   
"org.apache.doris.fs.remote.AzureFileSystem"},
         };
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/external/iceberg/IcebergHadoopCatalogTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/external/iceberg/IcebergHadoopCatalogTest.java
deleted file mode 100644
index fcf001dd079..00000000000
--- 
a/fe/fe-core/src/test/java/org/apache/doris/external/iceberg/IcebergHadoopCatalogTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.external.iceberg;
-
-import org.apache.doris.common.UserException;
-import org.apache.doris.datasource.property.storage.HdfsCompatibleProperties;
-import org.apache.doris.datasource.property.storage.StorageProperties;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
-
-import com.google.common.collect.Maps;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.junit.Ignore;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Map;
-
-public class IcebergHadoopCatalogTest {
-    FileSystem nativeFs;
-    private static final PathFilter TABLE_FILTER = (path) -> 
path.getName().endsWith(".metadata.json");
-
-    @Ignore
-    // This logic is same as HadoopCatalog.listNamespaces in Iceberg 1.4.3.
-    // So that we can use this to test the behavior.
-    // Set correct properties to test.
-    public void testHadoopCatalogListNamespaces() throws UserException, 
IOException {
-        Map<String, String> properties = Maps.newHashMap();
-        properties.put("cos.access_key", "xxx");
-        properties.put("cos.secret_key", "yyy");
-        properties.put("cos.endpoint", "cos.ap-beijing.myqcloud.com");
-        properties.put("cos.region", "ap-beijing");
-        String pathStr = "cosn://bucket1/namespace";
-        DFSFileSystem fs = new DFSFileSystem(
-                (HdfsCompatibleProperties) 
StorageProperties.createPrimary(properties));
-        nativeFs = fs.nativeFileSystem(new Path(pathStr));
-
-        RemoteIterator<FileStatus> it = nativeFs.listStatusIterator(new 
Path(pathStr));
-        while (it.hasNext()) {
-            Path path = (it.next()).getPath();
-            if (isNamespace(path)) {
-                System.out.println(path);
-            }
-        }
-    }
-
-    private boolean isNamespace(Path path) throws IOException {
-        return this.isDirectory(path) && !this.isTableDir(path);
-    }
-
-    private boolean isDirectory(Path path) throws IOException {
-        try {
-            FileStatus fileStatus = this.nativeFs.getFileStatus(path);
-            return fileStatus.isDirectory();
-        } catch (FileNotFoundException var3) {
-            return false;
-        } catch (IOException var4) {
-            throw var4;
-        }
-    }
-
-    private boolean isTableDir(Path path) throws IOException {
-        Path metadataPath = new Path(path, "metadata");
-
-        try {
-            return this.nativeFs.listStatus(metadataPath, TABLE_FILTER).length 
>= 1;
-        } catch (FileNotFoundException var4) {
-            return false;
-        } catch (IOException var5) {
-            throw var5;
-        }
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to