This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-10685
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-10685 by this push:
     new cd3e1c7ca0 HDDS-12883. Close file descriptor of block file in datanode (#8319)
cd3e1c7ca0 is described below

commit cd3e1c7ca04ad2f5b6b446a91a81541cf1a0f801
Author: Sammi Chen <[email protected]>
AuthorDate: Thu Apr 24 15:28:22 2025 +0800

    HDDS-12883. Close file descriptor of block file in datanode (#8319)
---
 .../ozone/container/common/interfaces/Handler.java |  4 +--
 .../common/transport/server/Receiver.java          | 30 +++++++++++++---------
 .../ozone/container/keyvalue/KeyValueHandler.java  | 20 +++++++--------
 .../keyvalue/impl/ChunkManagerDispatcher.java      |  4 +--
 .../keyvalue/impl/FilePerBlockStrategy.java        | 15 ++++++-----
 .../keyvalue/interfaces/ChunkManager.java          |  8 +++---
 6 files changed, 45 insertions(+), 36 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index d79e962a47..88f5055f54 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -20,10 +20,10 @@
 import static org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.io.FileDescriptor;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.RandomAccessFile;
 import java.time.Clock;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -232,6 +232,6 @@ public void setClusterID(String clusterID) {
     this.clusterId = clusterID;
   }
 
-  public abstract FileDescriptor getBlockFileDescriptor(ContainerCommandRequestProto request)
+  public abstract RandomAccessFile getBlockFile(ContainerCommandRequestProto request)
       throws IOException;
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/Receiver.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/Receiver.java
index 594ac75e7d..27b1dc13dd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/Receiver.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/Receiver.java
@@ -35,6 +35,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.RandomAccessFile;
 import java.net.SocketTimeoutException;
 import java.nio.channels.ClosedChannelException;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -208,10 +209,10 @@ public void run() {
         if (responseProto.getResult() == ContainerProtos.Result.SUCCESS && type == ContainerProtos.Type.GetBlock) {
           // get FileDescriptor
           Handler handler = dispatcher.getHandler(ContainerProtos.ContainerType.KeyValueContainer);
-          FileDescriptor fd = handler.getBlockFileDescriptor(request);
-          Preconditions.checkNotNull(fd,
-              "Failed to get block InputStream for block " + 
request.getGetBlock().getBlockID());
-          entry.setFD(fd);
+          RandomAccessFile file = handler.getBlockFile(request);
+          Preconditions.checkNotNull(file,
+              "Failed to get block file for block " + 
request.getGetBlock().getBlockID());
+          entry.setFile(file);
         }
         entry.setResponse(responseProto);
         sendResponse(entry);
@@ -232,7 +233,7 @@ void sendResponse(TaskEntry entry) {
     lock.lock();
     try {
       entry.setSendStartTimeNs();
-      FileDescriptor fd = entry.getFD();
+      RandomAccessFile file = entry.getFile();
       DataOutputStream output = new DataOutputStream(new BufferedOutputStream(socketOut, bufferSize));
       output.writeShort(DATA_TRANSFER_VERSION);
       output.writeShort(type.getNumber());
@@ -242,10 +243,10 @@ void sendResponse(TaskEntry entry) {
             getBlockMapKey(entry.getRequest()), peer.getDomainSocket());
       }
       output.flush();
-      if (fd != null) {
+      if (file != null) {
         // send FileDescriptor
         FileDescriptor[] fds = new FileDescriptor[1];
-        fds[0] = fd;
+        fds[0] = file.getFD();
         DomainSocket sock = peer.getDomainSocket();
         // this API requires send at least one byte buf.
         sock.sendFileDescriptors(fds, buf, 0, buf.length);
@@ -258,6 +259,11 @@ void sendResponse(TaskEntry entry) {
     } finally {
       lock.unlock();
       entry.setSendFinishTimeNs();
+      try {
+        if (entry.getFile() != null) entry.getFile().close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close block file for {}", getBlockMapKey(entry.getRequest()), e);
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Request {} {}:{}, receive {} ns, in queue {} ns, " +
                 " handle {} ns, send out {} ns, total {} ns", type, 
responseProto.getClientId().toStringUtf8(),
@@ -277,7 +283,7 @@ private boolean isSupportedCmdType(ContainerProtos.Type type) {
   static class TaskEntry {
     private ContainerCommandRequestProto request;
     private ContainerCommandResponseProto response;
-    private FileDescriptor fileDescriptor;
+    private RandomAccessFile file;
     private long receiveStartTimeNs;
     private long inQueueStartTimeNs;
     private long outQueueStartTimeNs;
@@ -293,8 +299,8 @@ public ContainerCommandResponseProto getResponse() {
       return response;
     }
 
-    public FileDescriptor getFD() {
-      return fileDescriptor;
+    public RandomAccessFile getFile() {
+      return file;
     }
 
     public ContainerCommandRequestProto getRequest() {
@@ -329,8 +335,8 @@ public void setResponse(ContainerCommandResponseProto responseProto) {
       this.response = responseProto;
     }
 
-    public void setFD(FileDescriptor fd) {
-      this.fileDescriptor = fd;
+    public void setFile(RandomAccessFile f) {
+      this.file = f;
     }
 
     public void setSendStartTimeNs() {
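
A note on the Receiver change above: DomainSocket#sendFileDescriptors makes the kernel duplicate the descriptor into the receiving process, so once the send returns, the datanode can close its RandomAccessFile without invalidating the client's copy. That is what the new close in the finally block relies on, and since setFile is only called for successful GetBlock responses, the close has to tolerate a null file. A minimal sketch of the send-then-close lifecycle, with hypothetical class and method names:

import java.io.FileDescriptor;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.hadoop.net.unix.DomainSocket;

// Hedged sketch of the send-then-close lifecycle (names are hypothetical,
// not from the patch). The kernel duplicates the descriptor during
// sendFileDescriptors(), so closing the sender's file afterwards cannot
// invalidate the receiver's copy.
final class FdHandoffSketch {
  static void sendAndClose(DomainSocket sock, RandomAccessFile blockFile) throws IOException {
    if (blockFile == null) {
      return; // only GetBlock responses carry an open block file
    }
    try {
      FileDescriptor[] fds = { blockFile.getFD() };
      byte[] payload = new byte[] { 0 }; // the API requires at least one data byte
      sock.sendFileDescriptors(fds, payload, 0, payload.length);
    } finally {
      blockFile.close(); // always release the datanode-side descriptor
    }
  }
}
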
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 07a226ca76..1c17d34400 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -55,11 +55,11 @@
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Striped;
 import java.io.File;
-import java.io.FileDescriptor;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
@@ -153,8 +153,8 @@ public class KeyValueHandler extends Handler {
   // A striped lock that is held during container creation.
   private final Striped<Lock> containerCreationLocks;
   private static FaultInjector injector;
-  // map temporarily carries the FileDescriptor for short-circuit read requests
-  private final Map<String, FileDescriptor> streamMap = new ConcurrentHashMap<>();
+  // map temporarily carries the RandomAccessFile for short-circuit read requests
+  private final Map<String, RandomAccessFile> blockFileMap = new ConcurrentHashMap<>();
   private OzoneContainer ozoneContainer;
   private final Clock clock;
 
@@ -729,10 +729,10 @@ ContainerCommandResponseProto handleGetBlock(
             && ozoneContainer.getReadDomainSocketChannel() != null
             && ozoneContainer.getReadDomainSocketChannel().isStarted();
         if (domainSocketServerEnabled) {
-          FileDescriptor fd = chunkManager.getShortCircuitFd(kvContainer, blockID);
-          Preconditions.checkState(fd != null);
+          RandomAccessFile file = chunkManager.getShortCircuitFd(kvContainer, blockID);
+          Preconditions.checkState(file != null);
           String mapKey = getBlockMapKey(request);
-          streamMap.put(mapKey, fd);
+          blockFileMap.put(mapKey, file);
           shortCircuitGranted = true;
         }
       }
@@ -754,17 +754,17 @@ ContainerCommandResponseProto handleGetBlock(
   }
 
   @Override
-  public FileDescriptor getBlockFileDescriptor(ContainerCommandRequestProto request) throws IOException {
+  public RandomAccessFile getBlockFile(ContainerCommandRequestProto request) throws IOException {
     if (request.getCmdType() != Type.GetBlock) {
       throw new StorageContainerException("Request type mismatch, expected " + 
 Type.GetBlock +
           ", received " + request.getCmdType(), 
ContainerProtos.Result.MALFORMED_REQUEST);
     }
     String mapKey = getBlockMapKey(request);
-    FileDescriptor fd = streamMap.remove(mapKey);
+    RandomAccessFile file = blockFileMap.remove(mapKey);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("streamMap remove stream for {}", mapKey);
+      LOG.debug("File removed from blockFileMap for {}", mapKey);
     }
-    return fd;
+    return file;
   }
 
   /**
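
The blockFileMap introduced above is a one-shot handoff between the dispatch path and the Receiver thread: handleGetBlock registers the open file under a per-request key, and getBlockFile claims it with remove(), so exactly one party ends up owning the file and the duty to close it. A minimal sketch of that ownership transfer, with hypothetical names:

import java.io.RandomAccessFile;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of the one-shot handoff behind blockFileMap (names are
// hypothetical, not from the patch). ConcurrentHashMap#remove is atomic,
// so at most one caller claims a given file, and with it the duty to
// close it.
final class OpenFileHandoff {
  private final Map<String, RandomAccessFile> pending = new ConcurrentHashMap<>();

  void offer(String requestKey, RandomAccessFile file) {
    pending.put(requestKey, file);
  }

  // Claims and unregisters the file; null if never registered or already claimed.
  RandomAccessFile claim(String requestKey) {
    return pending.remove(requestKey);
  }
}
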
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
index 00c9151f50..7de714158b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
@@ -23,8 +23,8 @@
 
 import com.google.common.base.Preconditions;
 import jakarta.annotation.Nonnull;
-import java.io.FileDescriptor;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.EnumMap;
 import java.util.Map;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -142,7 +142,7 @@ public void deleteChunks(Container container, BlockData blockData)
   }
 
   @Override
-  public FileDescriptor getShortCircuitFd(Container container, BlockID blockID)
+  public RandomAccessFile getShortCircuitFd(Container container, BlockID blockID)
       throws StorageContainerException {
     return selectHandler(container).getShortCircuitFd(container, blockID);
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index 8cd94ab9dc..5d1ae28f85 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -31,7 +31,6 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import java.io.File;
-import java.io.FileDescriptor;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -44,7 +43,6 @@
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.common.ChunkBufferToByteString;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
@@ -58,6 +56,7 @@
 import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import org.apache.hadoop.util.Shell;
 import org.apache.ratis.statemachine.StateMachine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -213,13 +212,17 @@ public ChunkBufferToByteString readChunk(Container container, BlockID blockID,
   }
 
   @Override
-  public FileDescriptor getShortCircuitFd(Container container, BlockID blockID) throws StorageContainerException {
+  public RandomAccessFile getShortCircuitFd(Container container, BlockID blockID) throws StorageContainerException {
     checkLayoutVersion(container);
     final File chunkFile = getChunkFile(container, blockID);
-    FileDescriptor fd = null;
     try {
-      fd = NativeIO.getShareDeleteFileDescriptor(chunkFile, 0);
-      return fd;
+      if (!Shell.WINDOWS) {
+        RandomAccessFile rf = new RandomAccessFile(chunkFile, "r");
+        return rf;
+      } else {
+        throw new StorageContainerException("Operation is not supported for 
platform "
+            + System.getProperty("os.name"), UNSUPPORTED_REQUEST);
+      }
     } catch (Exception e) {
       LOG.warn("getShortCircuitFds failed", e);
       throw new StorageContainerException("getShortCircuitFds " +
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
index ef41f6e8ed..64fa8f682c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -19,8 +19,8 @@
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
 
-import java.io.FileDescriptor;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -79,14 +79,14 @@ ChunkBufferToByteString readChunk(Container container, BlockID blockID, ChunkInf
       DispatcherContext dispatcherContext) throws StorageContainerException;
 
   /**
-   * Get the FileDescriptor of a given chunk, to share with client for short circuit read.
+   * Get the RandomAccessFile of a given chunk, to share with the client for short-circuit read.
    *
    * @param container - Container for the chunk
    * @param blockID - ID of the block.
-   * @return FileDescriptor  - input file descriptor of block file
+   * @return RandomAccessFile - open file for the block
    * @throws StorageContainerException
    */
-  default FileDescriptor getShortCircuitFd(Container container, BlockID blockID)
+  default RandomAccessFile getShortCircuitFd(Container container, BlockID blockID)
       throws StorageContainerException {
     throw new StorageContainerException("Operation is not supported for " + 
this.getClass().getSimpleName(),
         UNSUPPORTED_REQUEST);
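
For context on the other end of this exchange (not part of this commit): Hadoop's DomainSocket API pairs sendFileDescriptors with recvFileInputStreams, which hands the receiver its kernel-duplicated descriptor wrapped in a FileInputStream. A hedged sketch of how a short-circuit reader might claim the block file, with hypothetical names:

import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.net.unix.DomainSocket;

// Hedged client-side sketch (names hypothetical, not from the patch). The
// received FileInputStream wraps the client's own duplicate of the
// descriptor, so the datanode closing its RandomAccessFile does not
// invalidate this stream.
final class BlockFdReceiver {
  static FileInputStream receiveBlockFd(DomainSocket sock) throws IOException {
    FileInputStream[] streams = new FileInputStream[1];
    byte[] buf = new byte[1]; // matches the one-byte payload the sender includes
    int read = sock.recvFileInputStreams(streams, buf, 0, buf.length);
    if (read < 0 || streams[0] == null) {
      throw new IOException("Expected a file descriptor over the domain socket");
    }
    return streams[0];
  }
}
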


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
