This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-fs-spi in repository https://gitbox.apache.org/repos/asf/doris.git
commit 8733d0e8013b9b5e7bffccd716332eaf6895a11b
Author: morningman <[email protected]>
AuthorDate: Thu Apr 2 00:00:19 2026 +0800

[refactor](fs-spi) P4.8-G3: delete DFSFileSystem and its phantom reference infrastructure

### What problem does this PR solve?

Issue Number: N/A

Problem Summary:

Delete DFSFileSystem (the legacy HDFS wrapper class) and its supporting classes now that all callers have been migrated:

- DFSFileSystem.java: deleted after migrating ExternalCatalog and HMSExternalCatalog away from its static members (PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH constant and getHdfsConf() method). These are now inlined directly in the callers using HdfsConfiguration and the literal string 'ipc.client.fallback-to-simple-auth-allowed'.
- DFSFileSystemPhantomReference.java: helper class for phantom reference tracking, only used within the dfs package
- RemoteFSPhantomManager.java: background cleanup thread for Hadoop FileSystem objects, only called from DFSFileSystem.nativeFileSystem()
- IcebergHadoopCatalogTest.java: @Ignore test with no assertions, purely manual exploration code using DFSFileSystem.nativeFileSystem()

Also removes DFSFileSystem from the GsonUtils reflection array.
### Release note None ### Check List (For Author) - Test: No need to test (deleted classes have no production callers; ExternalCatalog and HMSExternalCatalog behavior is unchanged — same Hadoop config semantics) - Behavior changed: No - Does this need documentation: No Co-authored-by: Copilot <[email protected]> --- .../apache/doris/datasource/ExternalCatalog.java | 9 +- .../doris/datasource/hive/HMSExternalCatalog.java | 3 +- .../apache/doris/fs/remote/dfs/DFSFileSystem.java | 582 --------------------- .../remote/dfs/DFSFileSystemPhantomReference.java | 44 -- .../fs/remote/dfs/RemoteFSPhantomManager.java | 127 ----- .../org/apache/doris/persist/gson/GsonUtils.java | 1 - .../external/iceberg/IcebergHadoopCatalogTest.java | 91 ---- 7 files changed, 7 insertions(+), 850 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 6240c837567..43717911ca1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -55,7 +55,6 @@ import org.apache.doris.datasource.paimon.PaimonExternalDatabase; import org.apache.doris.datasource.test.TestExternalCatalog; import org.apache.doris.datasource.test.TestExternalDatabase; import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalDatabase; -import org.apache.doris.fs.remote.dfs.DFSFileSystem; import org.apache.doris.info.TableNameInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; import org.apache.doris.persist.CreateDbInfo; @@ -79,6 +78,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.logging.log4j.LogManager; import 
org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; @@ -223,7 +223,10 @@ public abstract class ExternalCatalog } private Configuration buildConf() { - Configuration conf = DFSFileSystem.getHdfsConf(ifNotSetFallbackToSimpleAuth()); + Configuration conf = new HdfsConfiguration(); + if (ifNotSetFallbackToSimpleAuth()) { + conf.set("ipc.client.fallback-to-simple-auth-allowed", "true"); + } Map<String, String> catalogProperties = catalogProperty.getHadoopProperties(); for (Map.Entry<String, String> entry : catalogProperties.entrySet()) { conf.set(entry.getKey(), entry.getValue()); @@ -273,7 +276,7 @@ public abstract class ExternalCatalog // we need check auth fallback for kerberos or simple public boolean ifNotSetFallbackToSimpleAuth() { - return catalogProperty.getOrDefault(DFSFileSystem.PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, "").isEmpty(); + return catalogProperty.getOrDefault("ipc.client.fallback-to-simple-auth-allowed", "").isEmpty(); } // Will be called when creating catalog(not replaying). 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java index 1feeb59c241..03246a1f75a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java @@ -34,7 +34,6 @@ import org.apache.doris.datasource.metacache.CacheSpec; import org.apache.doris.datasource.operations.ExternalMetadataOperations; import org.apache.doris.datasource.property.metastore.AbstractHiveProperties; import org.apache.doris.fs.SpiSwitchingFileSystem; -import org.apache.doris.fs.remote.dfs.DFSFileSystem; import org.apache.doris.transaction.TransactionManagerFactory; import com.google.common.annotations.VisibleForTesting; @@ -227,7 +226,7 @@ public class HMSExternalCatalog extends ExternalCatalog { super.setDefaultPropsIfMissing(isReplay); if (ifNotSetFallbackToSimpleAuth()) { // always allow fallback to simple auth, so to support both kerberos and simple auth - catalogProperty.addProperty(DFSFileSystem.PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, "true"); + catalogProperty.addProperty("ipc.client.fallback-to-simple-auth-allowed", "true"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java deleted file mode 100644 index 8d98a7bc5fb..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ /dev/null @@ -1,582 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.fs.remote.dfs; - -import org.apache.doris.backup.Status; -import org.apache.doris.common.UserException; -import org.apache.doris.common.security.authentication.HadoopAuthenticator; -import org.apache.doris.common.util.S3Util; -import org.apache.doris.common.util.URI; -import org.apache.doris.datasource.property.storage.HdfsCompatibleProperties; -import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.foundation.fs.FsStorageType; -import org.apache.doris.fs.operations.HDFSFileOperations; -import org.apache.doris.fs.operations.HDFSOpParams; -import org.apache.doris.fs.operations.OpParams; -import org.apache.doris.fs.remote.RemoteFile; -import org.apache.doris.fs.remote.RemoteFileSystem; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import 
java.nio.ByteBuffer; -import java.nio.file.FileVisitOption; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Set; - -public class DFSFileSystem extends RemoteFileSystem { - - public static final String PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH = "ipc.client.fallback-to-simple-auth-allowed"; - private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class); - private HDFSFileOperations operations = null; - private final HdfsCompatibleProperties hdfsProperties; - protected volatile org.apache.hadoop.fs.FileSystem dfsFileSystem = null; - - public DFSFileSystem(HdfsCompatibleProperties hdfsProperties) { - super(FsStorageType.HDFS.name(), FsStorageType.HDFS); - this.properties.putAll(hdfsProperties.getOrigProps()); - this.hdfsProperties = hdfsProperties; - } - - @Override - public StorageProperties getStorageProperties() { - return hdfsProperties; - } - - @Override - public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) { - try { - Path locatedPath = new Path(remotePath); - org.apache.hadoop.fs.FileSystem fileSystem = nativeFileSystem(locatedPath); - RemoteIterator<LocatedFileStatus> locatedFiles = getLocatedFiles(recursive, fileSystem, locatedPath); - while (locatedFiles.hasNext()) { - LocatedFileStatus fileStatus = locatedFiles.next(); - RemoteFile location = new RemoteFile( - fileStatus.getPath(), fileStatus.isDirectory(), fileStatus.getLen(), - fileStatus.getBlockSize(), fileStatus.getModificationTime(), fileStatus.getBlockLocations()); - result.add(location); - } - } catch (FileNotFoundException e) { - return new Status(Status.ErrCode.NOT_FOUND, e.getMessage()); - } catch (Exception e) { - return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); - } - return Status.OK; - } - - - @Override - public Status listDirectories(String remotePath, Set<String> result) { - try { - Path locatedPath = new Path(remotePath); - 
FileSystem fileSystem = nativeFileSystem(locatedPath); - FileStatus[] fileStatuses = getFileStatuses(locatedPath, fileSystem); - result.addAll( - Arrays.stream(fileStatuses) - .filter(FileStatus::isDirectory) - .map(file -> file.getPath().toString() + "/") - .collect(ImmutableSet.toImmutableSet())); - } catch (Exception e) { - return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); - } - return Status.OK; - } - - public DFSFileSystem(HdfsCompatibleProperties hdfsProperties, FsStorageType storageType) { - super(storageType.name(), storageType); - this.properties.putAll(hdfsProperties.getOrigProps()); - this.hdfsProperties = hdfsProperties; - } - - @VisibleForTesting - public FileSystem nativeFileSystem(Path remotePath) throws IOException { - if (closed.get()) { - throw new IOException("FileSystem is closed."); - } - if (dfsFileSystem == null) { - synchronized (this) { - if (closed.get()) { - throw new IOException("FileSystem is closed."); - } - if (dfsFileSystem == null) { - try { - dfsFileSystem = hdfsProperties.getHadoopAuthenticator().doAs(() -> { - try { - Configuration originalConf = hdfsProperties.getHadoopStorageConfig(); - // Create a copy of the original configuration to avoid modifying global settings - Configuration confCopy = new Configuration(originalConf); - // Disable FileSystem caching to ensure a new instance is created every time - // Reason: We manage the lifecycle of FileSystem instances manually. - // Even if the caller doesn't explicitly close the instance, we will do so when needed. - // However, since Hadoop caches FileSystem instances by default, - // other parts of the system may still be using the same instance. - // If we close the shared instance here, it could break those other users. - // Therefore, we disable the cache to ensure isolated, non-shared instances. 
- confCopy.setBoolean("fs.hdfs.impl.disable.cache", true); - return FileSystem.get(remotePath.toUri(), confCopy); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - operations = new HDFSFileOperations(dfsFileSystem); - RemoteFSPhantomManager.registerPhantomReference(this); - } catch (Exception e) { - throw new IOException("Failed to get dfs FileSystem for " + e.getMessage(), e); - } - } - } - } - return dfsFileSystem; - } - - protected RemoteIterator<LocatedFileStatus> getLocatedFiles(boolean recursive, - FileSystem fileSystem, Path locatedPath) throws IOException { - return hdfsProperties.getHadoopAuthenticator().doAs(() -> fileSystem.listFiles(locatedPath, recursive)); - } - - protected FileStatus[] getFileStatuses(Path remotePath, FileSystem fileSystem) throws IOException { - return hdfsProperties.getHadoopAuthenticator().doAs(() -> fileSystem.listStatus(remotePath)); - } - - public static Configuration getHdfsConf(boolean fallbackToSimpleAuth) { - Configuration hdfsConf = new HdfsConfiguration(); - if (fallbackToSimpleAuth) { - // need support fallback to simple if the cluster is a mixture of kerberos and simple auth. - hdfsConf.set(PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH, "true"); - } - return hdfsConf; - } - - @Override - public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) { - if (LOG.isDebugEnabled()) { - LOG.debug("download from {} to {}, file size: {}.", remoteFilePath, localFilePath, fileSize); - } - final long start = System.currentTimeMillis(); - HDFSOpParams hdfsOpParams = OpParams.of(remoteFilePath); - Status st = operations.openReader(hdfsOpParams); - if (st != Status.OK) { - return st; - } - FSDataInputStream fsDataInputStream = hdfsOpParams.fsDataInputStream(); - LOG.info("finished to open reader. 
download {} to {}.", remoteFilePath, localFilePath); - - // delete local file if exist - File localFile = new File(localFilePath); - if (localFile.exists()) { - try { - Files.walk(Paths.get(localFilePath), FileVisitOption.FOLLOW_LINKS).sorted(Comparator.reverseOrder()) - .map(java.nio.file.Path::toFile).forEach(File::delete); - } catch (IOException e) { - return new Status(Status.ErrCode.COMMON_ERROR, - "failed to delete exist local file: " + localFilePath + ", msg: " + e.getMessage()); - } - } - // create local file - try { - if (!localFile.createNewFile()) { - return new Status(Status.ErrCode.COMMON_ERROR, "failed to create local file: " + localFilePath); - } - } catch (IOException e) { - return new Status(Status.ErrCode.COMMON_ERROR, - "failed to create local file: " + localFilePath + ", msg: " + e.getMessage()); - } - - String lastErrMsg; - Status status = Status.OK; - try (BufferedOutputStream out = new BufferedOutputStream(Files.newOutputStream(localFile.toPath()))) { - final long bufSize = 1024 * 1024; // 1MB - long leftSize = fileSize; - long readOffset = 0; - while (leftSize > 0) { - long readLen = Math.min(leftSize, bufSize); - try { - ByteBuffer data = readStreamBuffer(fsDataInputStream, readOffset, readLen); - if (readLen != data.array().length) { - LOG.warn( - "the actual read length does not equal to " - + "the expected read length: {} vs. {}, file: {}", - data.array().length, readLen, remoteFilePath); - } - // write local file - out.write(data.array()); - readOffset += data.array().length; - leftSize -= data.array().length; - } catch (Exception e) { - lastErrMsg = String.format( - "failed to read. " + "current read offset: %d, read length: %d," - + " file size: %d, file: %s. 
msg: %s", - readOffset, readLen, fileSize, remoteFilePath, e.getMessage()); - LOG.warn(lastErrMsg); - status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); - break; - } - } - } catch (IOException e) { - return new Status(Status.ErrCode.COMMON_ERROR, "Got exception: " + e.getMessage()); - } finally { - Status closeStatus = operations.closeReader(OpParams.of(fsDataInputStream)); - if (!closeStatus.ok()) { - LOG.warn(closeStatus.getErrMsg()); - if (status.ok()) { - // we return close write error only if no other error has been encountered. - status = closeStatus; - } - } - } - - LOG.info("finished to download from {} to {} with size: {}. cost {} ms", remoteFilePath, localFilePath, - fileSize, (System.currentTimeMillis() - start)); - return status; - } - - /** - * read data from fsDataInputStream. - * - * @param fsDataInputStream input stream for read. - * @param readOffset read offset. - * @param length read length. - * @return ByteBuffer - * @throws IOException when read data error. 
- */ - private static ByteBuffer readStreamBuffer(FSDataInputStream fsDataInputStream, long readOffset, long length) - throws IOException { - synchronized (fsDataInputStream) { - long currentStreamOffset; - try { - currentStreamOffset = fsDataInputStream.getPos(); - } catch (IOException e) { - LOG.warn("errors while get file pos from output stream", e); - throw new IOException("errors while get file pos from output stream", e); - } - if (currentStreamOffset != readOffset) { - // it's ok, when reading some format like parquet, it is not a sequential read - if (LOG.isDebugEnabled()) { - LOG.debug("invalid offset, current read offset is " + currentStreamOffset - + " is not equal to request offset " + readOffset + " seek to it"); - } - try { - fsDataInputStream.seek(readOffset); - } catch (IOException e) { - throw new IOException(String.format( - "current read offset %d is not equal to %d, and could not seek to it, msg: %s", - currentStreamOffset, readOffset, e.getMessage())); - } - } - // Avoid using the ByteBuffer based read for Hadoop because some - // FSDataInputStream - // implementations are not ByteBufferReadable, - // See https://issues.apache.org/jira/browse/HADOOP-14603 - byte[] buf; - if (length > HDFSFileOperations.READ_BUFFER_SIZE) { - buf = new byte[HDFSFileOperations.READ_BUFFER_SIZE]; - } else { - buf = new byte[(int) length]; - } - try { - int readLength = readBytesFully(fsDataInputStream, buf); - if (readLength < 0) { - throw new IOException("end of file reached"); - } - if (LOG.isDebugEnabled()) { - LOG.debug( - "read buffer from input stream, buffer size:" + buf.length + ", read length:" + readLength); - } - return ByteBuffer.wrap(buf, 0, readLength); - } catch (IOException e) { - LOG.warn("errors while read data from stream", e); - throw new IOException("errors while read data from stream " + e.getMessage()); - } - } - } - - private static int readBytesFully(FSDataInputStream is, byte[] dest) throws IOException { - int readLength = 0; - while 
(readLength < dest.length) { - int availableReadLength = dest.length - readLength; - int n = is.read(dest, readLength, availableReadLength); - if (n <= 0) { - break; - } - readLength += n; - } - return readLength; - } - - @Override - public Status exists(String remotePath) { - try { - URI pathUri = URI.create(remotePath); - Path inputFilePath = new Path(pathUri.getLocation()); - FileSystem fileSystem = nativeFileSystem(inputFilePath); - boolean isPathExist = hdfsProperties.getHadoopAuthenticator().doAs(() -> fileSystem.exists(inputFilePath)); - if (!isPathExist) { - return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath); - } - return Status.OK; - } catch (Exception e) { - LOG.warn("errors while check path exist " + remotePath, e); - return new Status(Status.ErrCode.COMMON_ERROR, - "failed to check remote path exist: " + remotePath + ". msg: " + e.getMessage()); - } - } - - @Override - public Status directUpload(String content, String remoteFile) { - HDFSOpParams hdfsOpParams = OpParams.of(remoteFile); - Status wst = operations.openWriter(hdfsOpParams); - if (wst != Status.OK) { - return wst; - } - FSDataOutputStream fsDataOutputStream = hdfsOpParams.fsDataOutputStream(); - LOG.info("finished to open writer. 
directly upload to remote path {}.", remoteFile); - - Status status = Status.OK; - try { - fsDataOutputStream.writeBytes(content); - } catch (IOException e) { - LOG.warn("errors while write data to output stream", e); - status = new Status(Status.ErrCode.COMMON_ERROR, "write exception: " + e.getMessage()); - } finally { - Status closeStatus = operations.closeWriter(OpParams.of(fsDataOutputStream)); - if (!closeStatus.ok()) { - LOG.warn(closeStatus.getErrMsg()); - if (status.ok()) { - status = closeStatus; - } - } - } - return status; - } - - @Override - public Status upload(String localPath, String remotePath) { - long start = System.currentTimeMillis(); - if (LOG.isDebugEnabled()) { - LOG.debug("local path {}, remote path {}", localPath, remotePath); - } - HDFSOpParams hdfsOpParams = OpParams.of(remotePath); - Status wst = operations.openWriter(hdfsOpParams); - if (wst != Status.OK) { - return wst; - } - FSDataOutputStream fsDataOutputStream = hdfsOpParams.fsDataOutputStream(); - LOG.info("finished to open writer. directly upload to remote path {}.", remotePath); - // read local file and write remote - File localFile = new File(localPath); - long fileLength = localFile.length(); - byte[] readBuf = new byte[1024]; - Status status = new Status(Status.ErrCode.OK, ""); - try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(localFile))) { - // save the last err msg - String lastErrMsg = null; - // save the current write offset of remote file - long writeOffset = 0; - // read local file, 1MB at a time - int bytesRead; - while ((bytesRead = in.read(readBuf)) != -1) { - try { - fsDataOutputStream.write(readBuf, 0, bytesRead); - } catch (IOException e) { - LOG.warn("errors while write data to output stream", e); - lastErrMsg = String.format( - "failed to write hdfs. 
current write offset: %d, write length: %d, " - + "file length: %d, file: %s, msg: errors while write data to output stream", - writeOffset, bytesRead, fileLength, remotePath); - status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); - break; - } - - // write succeed, update current write offset - writeOffset += bytesRead; - } // end of read local file loop - } catch (FileNotFoundException e1) { - return new Status(Status.ErrCode.COMMON_ERROR, "encounter file not found exception: " + e1.getMessage()); - } catch (IOException e1) { - return new Status(Status.ErrCode.COMMON_ERROR, "encounter io exception: " + e1.getMessage()); - } finally { - Status closeStatus = operations.closeWriter(OpParams.of(fsDataOutputStream)); - if (!closeStatus.ok()) { - LOG.warn(closeStatus.getErrMsg()); - if (status.ok()) { - // we return close write error only if no other error has been encountered. - status = closeStatus; - } - } - } - - if (status.ok()) { - LOG.info("finished to upload {} to remote path {}. 
cost: {} ms", localPath, remotePath, - (System.currentTimeMillis() - start)); - } - return status; - } - - @Override - public Status rename(String srcPath, String destPath) { - long start = System.currentTimeMillis(); - try { - URI srcPathUri = URI.create(srcPath); - URI destPathUri = URI.create(destPath); - if (!srcPathUri.getAuthority().trim().equals(destPathUri.getAuthority().trim())) { - return new Status(Status.ErrCode.COMMON_ERROR, "only allow rename in same file system"); - } - FileSystem fileSystem = nativeFileSystem(new Path(destPath)); - Path srcfilePath = new Path(srcPathUri.getPath()); - Path destfilePath = new Path(destPathUri.getPath()); - boolean isRenameSuccess = hdfsProperties.getHadoopAuthenticator().doAs(() - -> fileSystem.rename(srcfilePath, destfilePath)); - if (!isRenameSuccess) { - return new Status(Status.ErrCode.COMMON_ERROR, "failed to rename " + srcPath + " to " + destPath); - } - } catch (UserException e) { - return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); - } catch (IOException e) { - LOG.warn("errors while rename path from " + srcPath + " to " + destPath); - return new Status(Status.ErrCode.COMMON_ERROR, - "failed to rename remote " + srcPath + " to " + destPath + ", msg: " + e.getMessage()); - } - LOG.info("finished to rename {} to {}. 
cost: {} ms", srcPath, destPath, (System.currentTimeMillis() - start)); - return Status.OK; - } - - @Override - public Status delete(String remotePath) { - try { - URI pathUri = URI.create(remotePath); - Path inputFilePath = new Path(pathUri.getLocation()); - FileSystem fileSystem = nativeFileSystem(inputFilePath); - hdfsProperties.getHadoopAuthenticator().doAs(() -> fileSystem.delete(inputFilePath, true)); - } catch (UserException e) { - return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); - } catch (IOException e) { - LOG.warn("errors while delete path " + remotePath); - return new Status(Status.ErrCode.COMMON_ERROR, - "failed to delete remote path: " + remotePath + ", msg: " + e.getMessage()); - } - LOG.info("finished to delete remote path {}.", remotePath); - return Status.OK; - } - - /** - * get files in remotePath of HDFS. - * - * @param remotePath hdfs://namenode:port/path. - * @param result files in remotePath. - * @param fileNameOnly means get file only in remotePath if true. - * @return Status.OK if success. - */ - @Override - public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) { - try { - URI pathUri = URI.create(remotePath); - Path pathPattern = new Path(S3Util.extendGlobs(pathUri.getLocation())); - FileSystem fileSystem = nativeFileSystem(pathPattern); - FileStatus[] files = hdfsProperties.getHadoopAuthenticator().doAs(() -> fileSystem.globStatus(pathPattern)); - if (files == null) { - LOG.info("no files in path " + remotePath); - return Status.OK; - } - for (FileStatus fileStatus : files) { - RemoteFile remoteFile = new RemoteFile( - fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(), - !fileStatus.isDirectory(), fileStatus.isDirectory() ? 
-1 : fileStatus.getLen(), - fileStatus.getBlockSize(), fileStatus.getModificationTime()); - result.add(remoteFile); - } - } catch (FileNotFoundException e) { - LOG.info("file not found: " + e.getMessage()); - return new Status(Status.ErrCode.NOT_FOUND, "file not found: " + e.getMessage()); - } catch (Exception e) { - LOG.warn("errors while get file status ", e); - return new Status(Status.ErrCode.COMMON_ERROR, "errors while get file status " + e.getMessage()); - } - LOG.info("finish list path {}", remotePath); - return Status.OK; - } - - @Override - public Status makeDir(String remotePath) { - try { - Path locatedPath = new Path(remotePath); - FileSystem fileSystem = nativeFileSystem(locatedPath); - if (!hdfsProperties.getHadoopAuthenticator().doAs(() -> fileSystem.mkdirs(locatedPath))) { - LOG.warn("failed to make dir for " + remotePath); - return new Status(Status.ErrCode.COMMON_ERROR, "failed to make dir for " + remotePath); - } - } catch (Exception e) { - LOG.warn("failed to make dir for {}, exception:", remotePath, e); - return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); - } - return Status.OK; - } - - @VisibleForTesting - public HadoopAuthenticator getAuthenticator() { - return hdfsProperties.getHadoopAuthenticator(); - } - - @Override - public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - try { - if (dfsFileSystem != null) { - dfsFileSystem.close(); - } - } catch (IOException e) { - LOG.warn("Failed to close DFSFileSystem: {}", e.getMessage(), e); - } - } - } - - public FileStatus getFileStatus(Path path) throws IOException { - FileSystem fileSystem = nativeFileSystem(path); - return hdfsProperties.getHadoopAuthenticator().doAs(() -> fileSystem.getFileStatus(path)); - } - - public FSDataInputStream openFile(Path path) throws IOException { - FileSystem fileSystem = nativeFileSystem(path); - return hdfsProperties.getHadoopAuthenticator().doAs(() -> fileSystem.open(path)); - } - - public FSDataOutputStream 
createFile(Path path, boolean overwrite) throws IOException { - FileSystem fileSystem = nativeFileSystem(path); - return hdfsProperties.getHadoopAuthenticator().doAs(() -> fileSystem.create(path, overwrite)); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystemPhantomReference.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystemPhantomReference.java deleted file mode 100644 index fe7c08080e5..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystemPhantomReference.java +++ /dev/null @@ -1,44 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.fs.remote.dfs; - -import org.apache.hadoop.fs.FileSystem; - -import java.lang.ref.PhantomReference; -import java.lang.ref.ReferenceQueue; - -public class DFSFileSystemPhantomReference extends PhantomReference<DFSFileSystem> { - - private FileSystem fs; - - /** - * Creates a new phantom reference that refers to the given object and - * is registered with the given queue. - * - * <p> It is possible to create a phantom reference with a {@code null} - * queue. Such a reference will never be enqueued. 
- * - * @param referent the object the new phantom reference will refer to - * @param q the queue with which the reference is to be registered, - * or {@code null} if registration is not required - */ - public DFSFileSystemPhantomReference(DFSFileSystem referent, ReferenceQueue<? super DFSFileSystem> q) { - super(referent, q); - this.fs = referent.dfsFileSystem; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/RemoteFSPhantomManager.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/RemoteFSPhantomManager.java deleted file mode 100644 index 6299936831c..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/RemoteFSPhantomManager.java +++ /dev/null @@ -1,127 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.fs.remote.dfs; - -import org.apache.doris.common.CustomThreadFactory; -import org.apache.doris.fs.remote.RemoteFileSystem; - -import com.google.common.collect.Sets; -import org.apache.hadoop.fs.FileSystem; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.IOException; -import java.lang.ref.PhantomReference; -import java.lang.ref.Reference; -import java.lang.ref.ReferenceQueue; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * The RemoteFSPhantomManager class is responsible for managing the phantom references - * of RemoteFileSystem objects. It ensures that the associated FileSystem resources are - * automatically cleaned up when the RemoteFileSystem objects are garbage collected. - * <p> - * By utilizing a ReferenceQueue and PhantomReference, this class can monitor the lifecycle - * of RemoteFileSystem objects. When a RemoteFileSystem object is no longer in use and is - * garbage collected, its corresponding FileSystem resource is properly closed to prevent - * resource leaks. - * <p> - * The class provides a thread-safe mechanism to ensure that the cleanup thread is started only once. - * <p> - * Main functionalities include: - * - Registering phantom references of RemoteFileSystem objects. - * - Starting a periodic cleanup thread that automatically closes unused FileSystem resources. 
- */ -public class RemoteFSPhantomManager { - - private static final Logger LOG = LogManager.getLogger(RemoteFSPhantomManager.class); - - // Scheduled executor for periodic resource cleanup - private static ScheduledExecutorService cleanupExecutor; - - // Reference queue for monitoring RemoteFileSystem objects' phantom references - private static final ReferenceQueue<DFSFileSystem> referenceQueue = new ReferenceQueue<>(); - - // Map storing the phantom references and their corresponding FileSystem objects - private static final ConcurrentHashMap<PhantomReference<DFSFileSystem>, FileSystem> referenceMap - = new ConcurrentHashMap<>(); - - private static final Set<FileSystem> fsSet = Sets.newConcurrentHashSet(); - - // Flag indicating whether the cleanup thread has been started - private static final AtomicBoolean isStarted = new AtomicBoolean(false); - - /** - * Registers a phantom reference for a RemoteFileSystem object in the manager. - * If the cleanup thread has not been started, it will be started. - * - * @param remoteFileSystem the RemoteFileSystem object to be registered - */ - public static void registerPhantomReference(DFSFileSystem remoteFileSystem) { - if (!isStarted.get()) { - start(); - isStarted.set(true); - } - if (fsSet.contains(remoteFileSystem.dfsFileSystem)) { - throw new RuntimeException("FileSystem already exists: " + remoteFileSystem.dfsFileSystem.getUri()); - } - DFSFileSystemPhantomReference phantomReference = new DFSFileSystemPhantomReference(remoteFileSystem, - referenceQueue); - referenceMap.put(phantomReference, remoteFileSystem.dfsFileSystem); - fsSet.add(remoteFileSystem.dfsFileSystem); - } - - /** - * Starts the cleanup thread, which periodically checks and cleans up unused FileSystem resources. - * The method uses double-checked locking to ensure thread-safe startup of the cleanup thread. 
- */ - public static void start() { - if (isStarted.compareAndSet(false, true)) { - synchronized (RemoteFSPhantomManager.class) { - LOG.info("Starting cleanup thread for RemoteFileSystem objects"); - if (cleanupExecutor == null) { - CustomThreadFactory threadFactory = new CustomThreadFactory("remote-fs-phantom-cleanup"); - cleanupExecutor = Executors.newScheduledThreadPool(1, threadFactory); - cleanupExecutor.scheduleAtFixedRate(() -> { - Reference<? extends RemoteFileSystem> ref; - while ((ref = referenceQueue.poll()) != null) { - DFSFileSystemPhantomReference phantomRef = (DFSFileSystemPhantomReference) ref; - - FileSystem fs = referenceMap.remove(phantomRef); - if (fs != null) { - try { - fs.close(); - fsSet.remove(fs); - LOG.info("Closed file system: {}", fs.getUri()); - } catch (IOException e) { - LOG.warn("Failed to close file system", e); - } - } - } - }, 0, 1, TimeUnit.MINUTES); - } - } - } - } - -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 8faf29f12d1..32694b8ad40 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -585,7 +585,6 @@ public class GsonUtils { // New metadata uses FileSystemDescriptor, which does not require these registrations. 
String[][] subtypes = { {"BrokerFileSystem", "org.apache.doris.fs.remote.BrokerFileSystem"}, - {"DFSFileSystem", "org.apache.doris.fs.remote.dfs.DFSFileSystem"}, {"S3FileSystem", "org.apache.doris.fs.remote.S3FileSystem"}, {"AzureFileSystem", "org.apache.doris.fs.remote.AzureFileSystem"}, }; diff --git a/fe/fe-core/src/test/java/org/apache/doris/external/iceberg/IcebergHadoopCatalogTest.java b/fe/fe-core/src/test/java/org/apache/doris/external/iceberg/IcebergHadoopCatalogTest.java deleted file mode 100644 index fcf001dd079..00000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/external/iceberg/IcebergHadoopCatalogTest.java +++ /dev/null @@ -1,91 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.external.iceberg; - -import org.apache.doris.common.UserException; -import org.apache.doris.datasource.property.storage.HdfsCompatibleProperties; -import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.fs.remote.dfs.DFSFileSystem; - -import com.google.common.collect.Maps; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.fs.RemoteIterator; -import org.junit.Ignore; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Map; - -public class IcebergHadoopCatalogTest { - FileSystem nativeFs; - private static final PathFilter TABLE_FILTER = (path) -> path.getName().endsWith(".metadata.json"); - - @Ignore - // This logic is same as HadoopCatalog.listNamespaces in Iceberg 1.4.3. - // So that we can use this to test the behavior. - // Set correct properties to test. 
- public void testHadoopCatalogListNamespaces() throws UserException, IOException { - Map<String, String> properties = Maps.newHashMap(); - properties.put("cos.access_key", "xxx"); - properties.put("cos.secret_key", "yyy"); - properties.put("cos.endpoint", "cos.ap-beijing.myqcloud.com"); - properties.put("cos.region", "ap-beijing"); - String pathStr = "cosn://bucket1/namespace"; - DFSFileSystem fs = new DFSFileSystem( - (HdfsCompatibleProperties) StorageProperties.createPrimary(properties)); - nativeFs = fs.nativeFileSystem(new Path(pathStr)); - - RemoteIterator<FileStatus> it = nativeFs.listStatusIterator(new Path(pathStr)); - while (it.hasNext()) { - Path path = (it.next()).getPath(); - if (isNamespace(path)) { - System.out.println(path); - } - } - } - - private boolean isNamespace(Path path) throws IOException { - return this.isDirectory(path) && !this.isTableDir(path); - } - - private boolean isDirectory(Path path) throws IOException { - try { - FileStatus fileStatus = this.nativeFs.getFileStatus(path); - return fileStatus.isDirectory(); - } catch (FileNotFoundException var3) { - return false; - } catch (IOException var4) { - throw var4; - } - } - - private boolean isTableDir(Path path) throws IOException { - Path metadataPath = new Path(path, "metadata"); - - try { - return this.nativeFs.listStatus(metadataPath, TABLE_FILTER).length >= 1; - } catch (FileNotFoundException var4) { - return false; - } catch (IOException var5) { - throw var5; - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
