This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new faf133d24a HDDS-11220. Initialize block length using the chunk list
from DataNode before seek (#7221)
faf133d24a is described below
commit faf133d24a4730bdccc967e266015a8a53248b17
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Mon Oct 28 11:11:59 2024 -0700
HDDS-11220. Initialize block length using the chunk list from DataNode
before seek (#7221)
---
.../hadoop/hdds/scm/storage/BlockInputStream.java | 1 +
.../hdds/scm/storage/MultipartInputStream.java | 27 ++++++++++++++-
.../java/org/apache/hadoop/fs/ozone/TestHSync.java | 40 ++++++++++++++++++++++
3 files changed, 67 insertions(+), 1 deletion(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 34fb728be9..d6353be9d2 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -166,6 +166,7 @@ public class BlockInputStream extends
BlockExtendedInputStream {
if (blockInfo != null && blockInfo.isUnderConstruction()) {
// use the block length from DN if block is under construction.
length = blockData.getSize();
+ LOG.debug("Updated block length to {} for block {}", length,
blockID);
}
break;
// If we get a StorageContainerException or an IOException due to
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
index 4bc144f3bd..5f00e83e81 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java
@@ -34,7 +34,7 @@ import java.util.List;
public class MultipartInputStream extends ExtendedInputStream {
private final String key;
- private final long length;
+ private long length;
// List of PartInputStream, one for each part of the key
private final List<? extends PartInputStream> partStreams;
@@ -56,6 +56,8 @@ public class MultipartInputStream extends ExtendedInputStream
{
// can be reset if a new position is seeked.
private int prevPartIndex;
+ private boolean initialized = false;
+
public MultipartInputStream(String keyName,
List<? extends PartInputStream> inputStreams) {
@@ -130,6 +132,9 @@ public class MultipartInputStream extends
ExtendedInputStream {
@Override
public synchronized void seek(long pos) throws IOException {
checkOpen();
+ if (!initialized) {
+ initialize();
+ }
if (pos == 0 && length == 0) {
// It is possible for length and pos to be zero in which case
// seek should return instead of throwing exception
@@ -173,6 +178,26 @@ public class MultipartInputStream extends
ExtendedInputStream {
prevPartIndex = partIndex;
}
+ public synchronized void initialize() throws IOException {
+ // Pre-check that the stream has not been initialized already
+ if (initialized) {
+ return;
+ }
+
+ for (PartInputStream partInputStream : partStreams) {
+ if (partInputStream instanceof BlockInputStream) {
+ ((BlockInputStream) partInputStream).initialize();
+ }
+ }
+
+ long streamLength = 0L;
+ for (PartInputStream partInputStream : partStreams) {
+ streamLength += partInputStream.getLength();
+ }
+ this.length = streamLength;
+ initialized = true;
+ }
+
@Override
public synchronized long getPos() throws IOException {
return length == 0 ? 0 :
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index c39e24571a..f3b4b66989 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -172,6 +172,7 @@ public class TestHSync {
private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE;
private static final int SERVICE_INTERVAL = 100;
private static final int EXPIRE_THRESHOLD_MS = 140;
+ private static final int WAL_HEADER_LEN = 83;
private static OpenKeyCleanupService openKeyCleanupService;
@@ -417,6 +418,45 @@ public class TestHSync {
return chunkPath;
}
+ @Test
+ public void testHSyncSeek() throws Exception {
+ // Set the fs.defaultFS
+ final String rootPath = String.format("%s://%s.%s/",
+ OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName());
+ CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+ final String dir = OZONE_ROOT + bucket.getVolumeName()
+ + OZONE_URI_DELIMITER + bucket.getName();
+ final Path key1 = new Path(dir, "key-hsync-seek");
+
+ final byte[] data = new byte[1024];
+ final byte[] buffer = new byte[1024];
+ ThreadLocalRandom.current().nextBytes(data);
+
+ try (FileSystem fs = FileSystem.get(CONF)) {
+ // Create key1
+ try (FSDataOutputStream os = fs.create(key1, true)) {
+ os.write(data, 0, WAL_HEADER_LEN);
+ // the first hsync will update the correct length in the key info at OM
+ os.hsync();
+ os.write(data, 0, data.length);
+ os.hsync(); // the second hsync will not update the length at OM
+ try (FSDataInputStream in = fs.open(key1)) {
+ // the actual key length is WAL_HEADER_LEN + 1024, but the length in
OM is WAL_HEADER_LEN (83)
+ in.seek(WAL_HEADER_LEN + 1);
+ final int n = in.read(buffer, 1, buffer.length - 1);
+ // expect to read 1023 bytes
+ assertEquals(buffer.length - 1, n);
+ for (int i = 1; i < buffer.length; i++) {
+ assertEquals(data[i], buffer[i], "expected at i=" + i);
+ }
+ }
+ } finally {
+ fs.delete(key1, false);
+ }
+ }
+ }
+
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testO3fsHSync(boolean incrementalChunkList) throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]