This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new f563d676dc HDDS-11391. Frequent Ozone DN Crashes During OM + DN 
Decommission with Freon (#7154)
f563d676dc is described below

commit f563d676dc6b6cb9e0ed5d288b94e7660a2584c1
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Wed Sep 4 18:13:02 2024 -0700

    HDDS-11391. Frequent Ozone DN Crashes During OM + DN Decommission with 
Freon (#7154)
---
 .../ozone/container/keyvalue/KeyValueHandler.java  | 14 ++++--
 .../keyvalue/impl/AbstractTestChunkManager.java    | 56 ++++++++++++++++++++++
 .../keyvalue/impl/CommonChunkManagerTestCases.java | 25 ++++++++++
 .../java/org/apache/hadoop/fs/ozone/TestHSync.java | 28 +++++++++++
 4 files changed, 120 insertions(+), 3 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 402e1be4cd..1bcb64200b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -122,6 +122,7 @@ import static 
org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkV
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerDataProto.State.RECOVERING;
 import static 
org.apache.hadoop.ozone.ClientVersion.EC_REPLICA_INDEX_REQUIRED_IN_BLOCK_REQUEST;
+import static org.apache.hadoop.ozone.OzoneConsts.INCREMENTAL_CHUNK_LIST;
 import static 
org.apache.hadoop.ozone.container.common.interfaces.Container.ScanResult;
 
 import org.apache.hadoop.util.Time;
@@ -547,9 +548,13 @@ public class KeyValueHandler extends Handler {
 
       boolean endOfBlock = false;
       if (!request.getPutBlock().hasEof() || request.getPutBlock().getEof()) {
-        // in EC, we will be doing empty put block.
-        // So, let's flush only when there are any chunks
-        if (!request.getPutBlock().getBlockData().getChunksList().isEmpty()) {
+        // There are two cases where client sends empty put block with eof.
+        // (1) An EC empty file. In this case, the block/chunk file does not 
exist,
+        //     so no need to flush/close the file.
+        // (2) Ratis output stream in incremental chunk list mode may send 
empty put block
+        //     to close the block, in which case we need to flush/close the 
file.
+        if (!request.getPutBlock().getBlockData().getChunksList().isEmpty() ||
+            blockData.getMetadata().containsKey(INCREMENTAL_CHUNK_LIST)) {
           chunkManager.finishWriteChunks(kvContainer, blockData);
         }
         endOfBlock = true;
@@ -903,6 +908,9 @@ public class KeyValueHandler extends Handler {
         // of order.
         blockData.setBlockCommitSequenceId(dispatcherContext.getLogIndex());
         boolean eob = writeChunk.getBlock().getEof();
+        if (eob) {
+          chunkManager.finishWriteChunks(kvContainer, blockData);
+        }
         blockManager.putBlock(kvContainer, blockData, eob);
         blockDataProto = blockData.getProtoBufMessage();
         final long numBytes = blockDataProto.getSerializedSize();
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/AbstractTestChunkManager.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/AbstractTestChunkManager.java
index 0c373cb0db..d9b95f199d 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/AbstractTestChunkManager.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/AbstractTestChunkManager.java
@@ -34,8 +34,13 @@ import 
org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.nio.Buffer;
 import java.nio.ByteBuffer;
 import java.util.UUID;
@@ -53,6 +58,8 @@ import static org.mockito.Mockito.when;
  * Helpers for ChunkManager implementation tests.
  */
 public abstract class AbstractTestChunkManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractTestChunkManager.class);
 
   private HddsVolume hddsVolume;
   private KeyValueContainerData keyValueContainerData;
@@ -128,6 +135,55 @@ public abstract class AbstractTestChunkManager {
     assertEquals(expected, files.length);
   }
 
+  /**
+   * Helper method to check if a file is in use.
+   */
+  public static boolean isFileNotInUse(String filePath) {
+    try {
+      Process process = new ProcessBuilder("fuser", filePath).start();
+      try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(process.getInputStream(), UTF_8))) {
+        String output = reader.readLine();  // If fuser returns no output, the 
file is not in use
+        if (output == null) {
+          return true;
+        }
+        LOG.debug("File is in use: {}", filePath);
+        return false;
+      } finally {
+        process.destroy();
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to check if file is in use: {}", filePath, e);
+      return false;  // On failure, assume the file is in use
+    }
+  }
+
+  protected boolean checkChunkFilesClosed() {
+    return checkChunkFilesClosed(keyValueContainerData.getChunksPath());
+  }
+
+  /**
+   * Check that all files under the chunk path are closed.
+   */
+  public static boolean checkChunkFilesClosed(String path) {
+    // As in setUp, we create the container, so these paths should exist.
+    assertNotNull(path);
+
+    File dir = new File(path);
+    assertTrue(dir.exists());
+
+    File[] files = dir.listFiles();
+    assertNotNull(files);
+    for (File file : files) {
+      assertTrue(file.exists());
+      assertTrue(file.isFile());
+      // check that the file is closed.
+      if (!isFileNotInUse(file.getAbsolutePath())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   protected void checkWriteIOStats(long length, long opCount) {
     VolumeIOStats volumeIOStats = hddsVolume.getVolumeIOStats();
     assertEquals(length, volumeIOStats.getWriteBytes());
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/CommonChunkManagerTestCases.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/CommonChunkManagerTestCases.java
index 47d2487474..d4a12f577e 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/CommonChunkManagerTestCases.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/impl/CommonChunkManagerTestCases.java
@@ -27,6 +27,7 @@ import 
org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
@@ -39,7 +40,9 @@ import static 
org.apache.hadoop.ozone.container.common.ContainerTestUtils.WRITE_
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.when;
 
 /**
  * Common test cases for ChunkManager implementation tests.
@@ -222,4 +225,26 @@ public abstract class CommonChunkManagerTestCases extends 
AbstractTestChunkManag
     checkReadIOStats(len * count, count);
   }
 
+  @Test
+  public void testFinishWrite() throws Exception {
+    // GIVEN
+    ChunkManager chunkManager = createTestSubject();
+    checkChunkFileCount(0);
+    checkWriteIOStats(0, 0);
+
+    chunkManager.writeChunk(getKeyValueContainer(), getBlockID(),
+        getChunkInfo(), getData(),
+        WRITE_STAGE);
+
+    BlockData blockData = Mockito.mock(BlockData.class);
+    when(blockData.getBlockID()).thenReturn(getBlockID());
+
+    chunkManager.finishWriteChunks(getKeyValueContainer(), blockData);
+    assertTrue(checkChunkFilesClosed());
+
+    // THEN
+    checkChunkFileCount(1);
+    checkWriteIOStats(getChunkInfo().getLen(), 1);
+  }
+
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 49b515d53c..72978f8181 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StreamCapabilities;
 
 import org.apache.hadoop.ozone.ClientConfigForTesting;
+import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -83,7 +84,9 @@ import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.TestHelper;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
+import 
org.apache.hadoop.ozone.container.keyvalue.impl.AbstractTestChunkManager;
 import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
 import org.apache.hadoop.ozone.container.metadata.AbstractDatanodeStore;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -93,6 +96,7 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.service.OpenKeyCleanupService;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -320,6 +324,8 @@ public class TestHSync {
   }
 
   @Test
+  // Making this the second test to be run to avoid lingering block files from 
previous tests
+  @Order(2)
   public void testEmptyHsync() throws Exception {
     // Check that deletedTable should not have keys with the same block as in
     // keyTable's when a key is hsync()'ed then close()'d.
@@ -358,10 +364,16 @@ public class TestHSync {
     String data = "random data";
     final Path file = new Path(dir, "file-hsync-then-close");
     try (FileSystem fs = FileSystem.get(CONF)) {
+      String chunkPath;
       try (FSDataOutputStream outputStream = fs.create(file, true)) {
         outputStream.write(data.getBytes(UTF_8), 0, data.length());
         outputStream.hsync();
+        // locate the container chunk path on the first DataNode.
+        chunkPath = getChunkPathOnDataNode(outputStream);
+        assertFalse(AbstractTestChunkManager.checkChunkFilesClosed(chunkPath));
       }
+      // After close, the chunk file should be closed.
+      assertTrue(AbstractTestChunkManager.checkChunkFilesClosed(chunkPath));
     }
 
     OzoneManager ozoneManager = cluster.getOzoneManager();
@@ -387,6 +399,22 @@ public class TestHSync {
     }
   }
 
+  private static String getChunkPathOnDataNode(FSDataOutputStream outputStream)
+      throws IOException {
+    String chunkPath;
+    KeyOutputStream groupOutputStream =
+        ((OzoneFSOutputStream) 
outputStream.getWrappedStream()).getWrappedOutputStream().getKeyOutputStream();
+    List<OmKeyLocationInfo> locationInfoList =
+        groupOutputStream.getLocationInfoList();
+    OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+    HddsDatanodeService dn = TestHelper.getDatanodeService(omKeyLocationInfo, 
cluster);
+    chunkPath = dn.getDatanodeStateMachine()
+        .getContainer().getContainerSet()
+        .getContainer(omKeyLocationInfo.getContainerID()).
+        getContainerData().getChunksPath();
+    return chunkPath;
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {false, true})
   public void testO3fsHSync(boolean incrementalChunkList) throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to