This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new faf133d24a HDDS-11220. Initialize block length using the chunk list 
from DataNode before seek (#7221)
faf133d24a is described below

commit faf133d24a4730bdccc967e266015a8a53248b17
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Mon Oct 28 11:11:59 2024 -0700

    HDDS-11220. Initialize block length using the chunk list from DataNode 
before seek (#7221)
---
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |  1 +
 .../hdds/scm/storage/MultipartInputStream.java     | 27 ++++++++++++++-
 .../java/org/apache/hadoop/fs/ozone/TestHSync.java | 40 ++++++++++++++++++++++
 3 files changed, 67 insertions(+), 1 deletion(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 34fb728be9..d6353be9d2 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -166,6 +166,7 @@ public class BlockInputStream extends 
BlockExtendedInputStream {
         if (blockInfo != null && blockInfo.isUnderConstruction()) {
           // use the block length from DN if block is under construction.
           length = blockData.getSize();
+          LOG.debug("Updated block length to {} for block {}", length, 
blockID);
         }
         break;
         // If we get a StorageContainerException or an IOException due to
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
index 4bc144f3bd..5f00e83e81 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
@@ -34,7 +34,7 @@ import java.util.List;
 public class MultipartInputStream extends ExtendedInputStream {
 
   private final String key;
-  private final long length;
+  private long length;
 
   // List of PartInputStream, one for each part of the key
   private final List<? extends PartInputStream> partStreams;
@@ -56,6 +56,8 @@ public class MultipartInputStream extends ExtendedInputStream 
{
   // can be reset if a new position is seeked.
   private int prevPartIndex;
 
+  private boolean initialized = false;
+
   public MultipartInputStream(String keyName,
                               List<? extends PartInputStream> inputStreams) {
 
@@ -130,6 +132,9 @@ public class MultipartInputStream extends 
ExtendedInputStream {
   @Override
   public synchronized void seek(long pos) throws IOException {
     checkOpen();
+    if (!initialized) {
+      initialize();
+    }
     if (pos == 0 && length == 0) {
       // It is possible for length and pos to be zero in which case
       // seek should return instead of throwing exception
@@ -173,6 +178,26 @@ public class MultipartInputStream extends 
ExtendedInputStream {
     prevPartIndex = partIndex;
   }
 
+  public synchronized void initialize() throws IOException {
+    // Pre-check that the stream has not been initialized already
+    if (initialized) {
+      return;
+    }
+
+    for (PartInputStream partInputStream : partStreams) {
+      if (partInputStream instanceof BlockInputStream) {
+        ((BlockInputStream) partInputStream).initialize();
+      }
+    }
+
+    long streamLength = 0L;
+    for (PartInputStream partInputStream : partStreams) {
+      streamLength += partInputStream.getLength();
+    }
+    this.length = streamLength;
+    initialized = true;
+  }
+
   @Override
   public synchronized long getPos() throws IOException {
     return length == 0 ? 0 :
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index c39e24571a..f3b4b66989 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -172,6 +172,7 @@ public class TestHSync {
   private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE;
   private static final int SERVICE_INTERVAL = 100;
   private static final int EXPIRE_THRESHOLD_MS = 140;
+  private static final int WAL_HEADER_LEN = 83;
 
   private static OpenKeyCleanupService openKeyCleanupService;
 
@@ -417,6 +418,45 @@ public class TestHSync {
     return chunkPath;
   }
 
+  @Test
+  public void testHSyncSeek() throws Exception {
+    // Set the fs.defaultFS
+    final String rootPath = String.format("%s://%s.%s/",
+        OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName());
+    CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+    final String dir = OZONE_ROOT + bucket.getVolumeName()
+        + OZONE_URI_DELIMITER + bucket.getName();
+    final Path key1 = new Path(dir, "key-hsync-seek");
+
+    final byte[] data = new byte[1024];
+    final byte[] buffer = new byte[1024];
+    ThreadLocalRandom.current().nextBytes(data);
+
+    try (FileSystem fs = FileSystem.get(CONF)) {
+      // Create key1
+      try (FSDataOutputStream os = fs.create(key1, true)) {
+        os.write(data, 0, WAL_HEADER_LEN);
+        // the first hsync will update the correct length in the key info at OM
+        os.hsync();
+        os.write(data, 0, data.length);
+        os.hsync(); // the second hsync will not update the length at OM
+        try (FSDataInputStream in = fs.open(key1)) {
+          // the actual key length is WAL_HEADER_LEN + 1024, but the length in 
OM is WAL_HEADER_LEN (83)
+          in.seek(WAL_HEADER_LEN + 1);
+          final int n = in.read(buffer, 1, buffer.length - 1);
+          // expect to read 1023 bytes
+          assertEquals(buffer.length - 1, n);
+          for (int i = 1; i < buffer.length; i++) {
+            assertEquals(data[i], buffer[i], "expected at i=" + i);
+          }
+        }
+      } finally {
+        fs.delete(key1, false);
+      }
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {false, true})
   public void testO3fsHSync(boolean incrementalChunkList) throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to