This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new afa2f85b31e [HUDI-8501] Improve SizeAwareDataInputStream to implement idempotent (#12231)
afa2f85b31e is described below

commit afa2f85b31e7d75d416c5b84fc41dd55eb6fc8aa
Author: usberkeley <[email protected]>
AuthorDate: Mon Feb 24 08:10:40 2025 +0800

    [HUDI-8501] Improve SizeAwareDataInputStream to implement idempotent (#12231)
---
 .../hudi/common/fs/SizeAwareDataInputStream.java   |  14 +--
 .../common/fs/TestSizeAwareDataInputStream.java    | 115 +++++++++++++++++++++
 2 files changed, 123 insertions(+), 6 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareDataInputStream.java
index 63fe79ed27c..8a77d60a546 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareDataInputStream.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareDataInputStream.java
@@ -36,30 +36,32 @@ public class SizeAwareDataInputStream {
   }
 
   public int readInt() throws IOException {
+    int value = dis.readInt();
     numberOfBytesRead.addAndGet(Integer.BYTES);
-    return dis.readInt();
+    return value;
   }
 
   public void readFully(byte[] b, int off, int len) throws IOException {
-    numberOfBytesRead.addAndGet(len);
     dis.readFully(b, off, len);
+    numberOfBytesRead.addAndGet(len);
   }
 
   public void readFully(byte[] b) throws IOException {
-    numberOfBytesRead.addAndGet(b.length);
     dis.readFully(b);
+    numberOfBytesRead.addAndGet(b.length);
   }
 
   public int skipBytes(int n) throws IOException {
-    numberOfBytesRead.addAndGet(n);
-    return dis.skipBytes(n);
+    int numOfBytes = dis.skipBytes(n);
+    numberOfBytesRead.addAndGet(numOfBytes);
+    return numOfBytes;
   }
 
   public void close() throws IOException {
     dis.close();
   }
 
-  public Integer getNumberOfBytesRead() {
+  public int getNumberOfBytesRead() {
     return numberOfBytesRead.get();
   }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestSizeAwareDataInputStream.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestSizeAwareDataInputStream.java
new file mode 100644
index 00000000000..53f2f463058
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestSizeAwareDataInputStream.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Unit tests for the SizeAwareDataInputStream class.
+ */
+public class TestSizeAwareDataInputStream {
+
+  private SizeAwareDataInputStream sizeAwareDataInputStream;
+  private ByteArrayInputStream byteArrayInputStream;
+
+  /**
+   * Initializes the input stream with a predefined byte array.
+   */
+  @BeforeEach
+  void setUp() {
+    byte[] data = {0, 0, 0, 1, 2, 3, 4, 5};
+    byteArrayInputStream = new ByteArrayInputStream(data);
+    sizeAwareDataInputStream = new SizeAwareDataInputStream(new DataInputStream(byteArrayInputStream));
+  }
+
+  /**
+   * Tests the readInt method to ensure it reads an integer correctly and updates the byte count.
+   */
+  @Test
+  void testReadInt() throws IOException {
+    int value = sizeAwareDataInputStream.readInt();
+    assertEquals(1, value);
+    assertEquals(4, sizeAwareDataInputStream.getNumberOfBytesRead());
+  }
+
+  /**
+   * Tests the readFully method to ensure it reads bytes into a buffer and updates the byte count.
+   */
+  @Test
+  void testReadFully() throws IOException {
+    byte[] buffer = new byte[4];
+    sizeAwareDataInputStream.readFully(buffer);
+    assertArrayEquals(new byte[]{0, 0, 0, 1}, buffer);
+    assertEquals(4, sizeAwareDataInputStream.getNumberOfBytesRead());
+  }
+
+  /**
+   * Tests the readFully method with offset to ensure it reads bytes into a specific part of the buffer and updates the byte count.
+   */
+  @Test
+  void testReadFullyWithOffset() throws IOException {
+    byte[] buffer = new byte[6];
+    sizeAwareDataInputStream.readFully(buffer, 2, 4);
+    assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 1}, buffer);
+    assertEquals(4, sizeAwareDataInputStream.getNumberOfBytesRead());
+  }
+
+  /**
+   * Tests the skipBytes method to ensure it skips the correct number of bytes and updates the byte count.
+   */
+  @Test
+  void testSkipBytes() throws IOException {
+    int skipped = sizeAwareDataInputStream.skipBytes(2);
+    assertEquals(2, skipped);
+    assertEquals(2, sizeAwareDataInputStream.getNumberOfBytesRead());
+  }
+
+  /**
+   * Tests the skipBytes method when attempting to skip more bytes than available, ensuring it only skips the available bytes.
+   */
+  @Test
+  void testSkipBytesBeyondAvailable() throws IOException {
+    sizeAwareDataInputStream.readInt();
+    // Try to skip more than available
+    int skipped = sizeAwareDataInputStream.skipBytes(10);
+    assertEquals(4, skipped);
+    assertEquals(8, sizeAwareDataInputStream.getNumberOfBytesRead());
+  }
+
+  /**
+   * Tests exception handling to ensure that the byte count remains consistent after an EOFException.
+   */
+  @Test
+  void testExceptionHandling() {
+    byte[] buffer = new byte[10];
+    assertThrows(EOFException.class, () -> sizeAwareDataInputStream.readFully(buffer));
+    assertEquals(0, sizeAwareDataInputStream.getNumberOfBytesRead(), "Number of bytes read should remain consistent after exception.");
+  }
+}

Reply via email to