This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new afa2f85b31e [HUDI-8501] Improve SizeAwareDataInputStream to implement
idempotent (#12231)
afa2f85b31e is described below
commit afa2f85b31e7d75d416c5b84fc41dd55eb6fc8aa
Author: usberkeley <[email protected]>
AuthorDate: Mon Feb 24 08:10:40 2025 +0800
[HUDI-8501] Improve SizeAwareDataInputStream to implement idempotent
(#12231)
---
.../hudi/common/fs/SizeAwareDataInputStream.java | 14 +--
.../common/fs/TestSizeAwareDataInputStream.java | 115 +++++++++++++++++++++
2 files changed, 123 insertions(+), 6 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareDataInputStream.java
index 63fe79ed27c..8a77d60a546 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareDataInputStream.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareDataInputStream.java
@@ -36,30 +36,32 @@ public class SizeAwareDataInputStream {
   }
 
   public int readInt() throws IOException {
+    int value = dis.readInt();
     numberOfBytesRead.addAndGet(Integer.BYTES);
-    return dis.readInt();
+    return value;
   }
 
   public void readFully(byte[] b, int off, int len) throws IOException {
-    numberOfBytesRead.addAndGet(len);
     dis.readFully(b, off, len);
+    numberOfBytesRead.addAndGet(len);
   }
 
   public void readFully(byte[] b) throws IOException {
-    numberOfBytesRead.addAndGet(b.length);
     dis.readFully(b);
+    numberOfBytesRead.addAndGet(b.length);
   }
 
   public int skipBytes(int n) throws IOException {
-    numberOfBytesRead.addAndGet(n);
-    return dis.skipBytes(n);
+    int numOfBytes = dis.skipBytes(n);
+    numberOfBytesRead.addAndGet(numOfBytes);
+    return numOfBytes;
   }
 
   public void close() throws IOException {
     dis.close();
   }
 
-  public Integer getNumberOfBytesRead() {
+  public int getNumberOfBytesRead() {
     return numberOfBytesRead.get();
   }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestSizeAwareDataInputStream.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestSizeAwareDataInputStream.java
new file mode 100644
index 00000000000..53f2f463058
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestSizeAwareDataInputStream.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Unit tests for the SizeAwareDataInputStream class.
+ */
+public class TestSizeAwareDataInputStream {
+
+  private SizeAwareDataInputStream sizeAwareDataInputStream;
+  private ByteArrayInputStream byteArrayInputStream;
+
+  /**
+   * Initializes the input stream with a predefined byte array.
+   */
+  @BeforeEach
+  void setUp() {
+    byte[] data = {0, 0, 0, 1, 2, 3, 4, 5};
+    byteArrayInputStream = new ByteArrayInputStream(data);
+    sizeAwareDataInputStream = new SizeAwareDataInputStream(new DataInputStream(byteArrayInputStream));
+  }
+
+  /**
+   * Tests the readInt method to ensure it reads an integer correctly and updates the byte count.
+   */
+  @Test
+  void testReadInt() throws IOException {
+    int value = sizeAwareDataInputStream.readInt();
+    assertEquals(1, value);
+    assertEquals(4, sizeAwareDataInputStream.getNumberOfBytesRead());
+  }
+
+  /**
+   * Tests the readFully method to ensure it reads bytes into a buffer and updates the byte count.
+   */
+  @Test
+  void testReadFully() throws IOException {
+    byte[] buffer = new byte[4];
+    sizeAwareDataInputStream.readFully(buffer);
+    assertArrayEquals(new byte[]{0, 0, 0, 1}, buffer);
+    assertEquals(4, sizeAwareDataInputStream.getNumberOfBytesRead());
+  }
+
+  /**
+   * Tests the readFully method with offset to ensure it reads bytes into a specific part of the buffer and updates the byte count.
+   */
+  @Test
+  void testReadFullyWithOffset() throws IOException {
+    byte[] buffer = new byte[6];
+    sizeAwareDataInputStream.readFully(buffer, 2, 4);
+    assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 1}, buffer);
+    assertEquals(4, sizeAwareDataInputStream.getNumberOfBytesRead());
+  }
+
+  /**
+   * Tests the skipBytes method to ensure it skips the correct number of bytes and updates the byte count.
+   */
+  @Test
+  void testSkipBytes() throws IOException {
+    int skipped = sizeAwareDataInputStream.skipBytes(2);
+    assertEquals(2, skipped);
+    assertEquals(2, sizeAwareDataInputStream.getNumberOfBytesRead());
+  }
+
+  /**
+   * Tests the skipBytes method when attempting to skip more bytes than available, ensuring it only skips the available bytes.
+   */
+  @Test
+  void testSkipBytesBeyondAvailable() throws IOException {
+    sizeAwareDataInputStream.readInt();
+    // Try to skip more than available
+    int skipped = sizeAwareDataInputStream.skipBytes(10);
+    assertEquals(4, skipped);
+    assertEquals(8, sizeAwareDataInputStream.getNumberOfBytesRead());
+  }
+
+  /**
+   * Tests exception handling to ensure that the byte count remains consistent after an EOFException.
+   */
+  @Test
+  void testExceptionHandling() {
+    byte[] buffer = new byte[10];
+    assertThrows(EOFException.class, () -> sizeAwareDataInputStream.readFully(buffer));
+    assertEquals(0, sizeAwareDataInputStream.getNumberOfBytesRead(), "Number of bytes read should remain consistent after exception.");
+  }
+}