This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 7347cbc6352 [Fix](hdfs-fs)The cache expiration should explicitly
release the held fs #38610 (#40504)
7347cbc6352 is described below
commit 7347cbc63523300a2573b1fbd54b81382ec1e2c4
Author: Calvin Kirs <[email protected]>
AuthorDate: Sat Sep 7 23:45:24 2024 +0800
[Fix](hdfs-fs)The cache expiration should explicitly release the held fs
#38610 (#40504)
---
.../doris/fs/remote/RemoteFSPhantomManager.java | 117 +++++++++++++++++++++
.../apache/doris/fs/remote/RemoteFileSystem.java | 21 +++-
.../remote/RemoteFileSystemPhantomReference.java | 44 ++++++++
.../org/apache/doris/fs/remote/S3FileSystem.java | 26 +++--
.../apache/doris/fs/remote/dfs/DFSFileSystem.java | 50 +++++----
5 files changed, 231 insertions(+), 27 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
new file mode 100644
index 00000000000..282361c4cb6
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFSPhantomManager.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs.remote;
+
+import org.apache.doris.common.CustomThreadFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Manages phantom references to {@link RemoteFileSystem} objects so that the Hadoop
+ * {@link FileSystem} each one holds is closed once the owning object has been garbage collected.
+ * <p>
+ * Every registered RemoteFileSystem is tracked through a PhantomReference bound to a shared
+ * ReferenceQueue. A single-threaded scheduled executor polls that queue once per minute and
+ * closes the FileSystem recorded for every reference found there, preventing resource leaks.
+ * <p>
+ * Main functionalities include:
+ * - Registering phantom references of RemoteFileSystem objects.
+ * - Starting, at most once, a periodic cleanup thread that closes unused FileSystem resources.
+ */
+public class RemoteFSPhantomManager {
+
+    private static final Logger LOG = LogManager.getLogger(RemoteFSPhantomManager.class);
+
+    // Scheduled executor for periodic resource cleanup
+    private static ScheduledExecutorService cleanupExecutor;
+
+    // Reference queue for monitoring RemoteFileSystem objects' phantom references
+    private static final ReferenceQueue<RemoteFileSystem> referenceQueue = new ReferenceQueue<>();
+
+    // Map storing the phantom references and their corresponding FileSystem objects.
+    // It also keeps the references themselves strongly reachable until they are processed.
+    private static final ConcurrentHashMap<PhantomReference<RemoteFileSystem>, FileSystem> referenceMap
+            = new ConcurrentHashMap<>();
+
+    // Flag indicating whether the cleanup thread has been started
+    private static final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+    /**
+     * Registers a phantom reference for a RemoteFileSystem object in the manager.
+     * If the cleanup thread has not been started, it will be started.
+     *
+     * @param remoteFileSystem the RemoteFileSystem object to be registered; its
+     *                         {@code dfsFileSystem} must already be initialized because the
+     *                         backing ConcurrentHashMap rejects null values
+     */
+    public static void registerPhantomReference(RemoteFileSystem remoteFileSystem) {
+        // start() is idempotent (guarded by compareAndSet), so no extra flag handling is needed here.
+        start();
+        RemoteFileSystemPhantomReference phantomReference = new RemoteFileSystemPhantomReference(
+                remoteFileSystem, referenceQueue);
+        referenceMap.put(phantomReference, remoteFileSystem.dfsFileSystem);
+    }
+
+    /**
+     * Starts the cleanup thread, which periodically checks and cleans up unused FileSystem resources.
+     * An atomic compare-and-set on the started flag guarantees that the executor is created and
+     * scheduled at most once, no matter how many threads call this method.
+     */
+    public static void start() {
+        if (!isStarted.compareAndSet(false, true)) {
+            return;
+        }
+        synchronized (RemoteFSPhantomManager.class) {
+            LOG.info("Starting cleanup thread for RemoteFileSystem objects");
+            if (cleanupExecutor == null) {
+                CustomThreadFactory threadFactory = new CustomThreadFactory("remote-fs-phantom-cleanup");
+                cleanupExecutor = Executors.newScheduledThreadPool(1, threadFactory);
+                cleanupExecutor.scheduleAtFixedRate(
+                        RemoteFSPhantomManager::closeCollectedFileSystems, 0, 1, TimeUnit.MINUTES);
+            }
+        }
+    }
+
+    /**
+     * Drains the reference queue and closes the FileSystem recorded for each enqueued reference.
+     * Failures are logged per reference and never propagate: an exception escaping a task passed
+     * to scheduleAtFixedRate would suppress every subsequent execution of the cleanup.
+     */
+    private static void closeCollectedFileSystems() {
+        Reference<? extends RemoteFileSystem> ref;
+        while ((ref = referenceQueue.poll()) != null) {
+            FileSystem fs = referenceMap.remove(ref);
+            if (fs == null) {
+                continue;
+            }
+            try {
+                fs.close();
+                LOG.info("Closed file system: {}", fs.getUri());
+            } catch (IOException | RuntimeException e) {
+                LOG.warn("Failed to close file system", e);
+            }
+        }
+    }
+
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
index ffe63f20ac7..2149cdb4e1d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java
@@ -26,13 +26,18 @@ import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
-public abstract class RemoteFileSystem extends PersistentFileSystem {
+public abstract class RemoteFileSystem extends PersistentFileSystem implements
Closeable {
// this field will be visited by multi-threads, better use volatile
qualifier
protected volatile org.apache.hadoop.fs.FileSystem dfsFileSystem = null;
+    // Serializes concurrent close() calls on this instance.
+    private final ReentrantLock fsLock = new ReentrantLock();
+    // Per-instance closed flag. It must not be static: a shared flag would make closing one
+    // RemoteFileSystem mark every other instance in the JVM as closed.
+    protected final AtomicBoolean closed = new AtomicBoolean(false);
public RemoteFileSystem(String name, StorageBackend.StorageType type) {
super(name, type);
@@ -65,4 +70,18 @@ public abstract class RemoteFileSystem extends
PersistentFileSystem {
}
return new RemoteFiles(locations);
}
+
+    /**
+     * Closes the underlying Hadoop file system, if one has been created.
+     * <p>
+     * The closed flag is flipped atomically while holding {@code fsLock}; only the call that
+     * flips it from false to true attempts to close {@code dfsFileSystem}, so repeated calls
+     * are no-ops.
+     *
+     * @throws IOException if closing the underlying file system fails
+     */
+    @Override
+    public void close() throws IOException {
+        fsLock.lock();
+        try {
+            boolean alreadyClosed = closed.getAndSet(true);
+            if (alreadyClosed || dfsFileSystem == null) {
+                return;
+            }
+            dfsFileSystem.close();
+        } finally {
+            fsLock.unlock();
+        }
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystemPhantomReference.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystemPhantomReference.java
new file mode 100644
index 00000000000..89506c7b212
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystemPhantomReference.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs.remote;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+
+/**
+ * A phantom reference to a {@link RemoteFileSystem} that also captures the Hadoop
+ * {@link FileSystem} the referent held when this reference was created.
+ */
+public class RemoteFileSystemPhantomReference extends PhantomReference<RemoteFileSystem> {
+
+    // Captured at construction time and never reassigned; this class itself does not read it.
+    private final FileSystem fs;
+
+    /**
+     * Creates a new phantom reference that refers to the given object and
+     * is registered with the given queue.
+     *
+     * @param referent the RemoteFileSystem to track; must not be null because its
+     *                 {@code dfsFileSystem} field is read here
+     * @param q        the queue with which the reference is to be registered,
+     *                 or {@code null} if registration is not required (such a reference
+     *                 will never be enqueued)
+     */
+    public RemoteFileSystemPhantomReference(RemoteFileSystem referent,
+            ReferenceQueue<? super RemoteFileSystem> q) {
+        super(referent, q);
+        this.fs = referent.dfsFileSystem;
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index f91c50d7099..7d4b9d797ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -57,14 +57,26 @@ public class S3FileSystem extends ObjFileSystem {
@Override
protected FileSystem nativeFileSystem(String remotePath) throws
UserException {
+ //todo Extracting a common method to achieve logic reuse
+ if (closed.get()) {
+ throw new UserException("FileSystem is closed.");
+ }
if (dfsFileSystem == null) {
- Configuration conf = new Configuration();
- System.setProperty("com.amazonaws.services.s3.enableV4", "true");
-
PropertyConverter.convertToHadoopFSProperties(properties).forEach(conf::set);
- try {
- dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(),
conf);
- } catch (Exception e) {
- throw new UserException("Failed to get S3 FileSystem for " +
e.getMessage(), e);
+ synchronized (this) {
+ if (closed.get()) {
+ throw new UserException("FileSystem is closed.");
+ }
+ if (dfsFileSystem == null) {
+ Configuration conf = new Configuration();
+ System.setProperty("com.amazonaws.services.s3.enableV4",
"true");
+
PropertyConverter.convertToHadoopFSProperties(properties).forEach(conf::set);
+ try {
+ dfsFileSystem = FileSystem.get(new
Path(remotePath).toUri(), conf);
+ } catch (Exception e) {
+ throw new UserException("Failed to get S3 FileSystem
for " + e.getMessage(), e);
+ }
+ RemoteFSPhantomManager.registerPhantomReference(this);
+ }
}
}
return dfsFileSystem;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 25ecafda468..ee7fddf7ac6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.util.URI;
import org.apache.doris.fs.operations.HDFSFileOperations;
import org.apache.doris.fs.operations.HDFSOpParams;
import org.apache.doris.fs.operations.OpParams;
+import org.apache.doris.fs.remote.RemoteFSPhantomManager;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
@@ -73,30 +74,41 @@ public class DFSFileSystem extends RemoteFileSystem {
@Override
protected FileSystem nativeFileSystem(String remotePath) throws
UserException {
- if (dfsFileSystem != null) {
- return dfsFileSystem;
+ if (closed.get()) {
+ throw new UserException("FileSystem is closed.");
}
- Configuration conf = new HdfsConfiguration();
- for (Map.Entry<String, String> propEntry : properties.entrySet()) {
- conf.set(propEntry.getKey(), propEntry.getValue());
- }
+ if (dfsFileSystem == null) {
+ synchronized (this) {
+ if (closed.get()) {
+ throw new UserException("FileSystem is closed.");
+ }
+ if (dfsFileSystem == null) {
- UserGroupInformation ugi = login(conf);
- try {
- dfsFileSystem = ugi.doAs((PrivilegedAction<FileSystem>) () -> {
- try {
- return FileSystem.get(new Path(remotePath).toUri(), conf);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ Configuration conf = new HdfsConfiguration();
+ for (Map.Entry<String, String> propEntry :
properties.entrySet()) {
+ conf.set(propEntry.getKey(), propEntry.getValue());
+ }
+
+ UserGroupInformation ugi = login(conf);
+ try {
+ dfsFileSystem =
ugi.doAs((PrivilegedAction<FileSystem>) () -> {
+ try {
+ return FileSystem.get(new
Path(remotePath).toUri(), conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (SecurityException e) {
+ throw new UserException(e);
+ }
+
+ Preconditions.checkNotNull(dfsFileSystem);
+ operations = new HDFSFileOperations(dfsFileSystem);
+ RemoteFSPhantomManager.registerPhantomReference(this);
}
- });
- } catch (SecurityException e) {
- throw new UserException(e);
+ }
}
-
- Preconditions.checkNotNull(dfsFileSystem);
- operations = new HDFSFileOperations(dfsFileSystem);
return dfsFileSystem;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]