This is an automated email from the ASF dual-hosted git repository.

zhangshuyan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 54f7a6b12790 HDFS-17293. First packet data + checksum size will be set to 516 bytes when writing to a new block. (#6368). Contributed by farmmamba.
54f7a6b12790 is described below

commit 54f7a6b127908cebedf44f4a96ee06e12e98f0d6
Author: hfutatzhanghb <hfutzhan...@163.com>
AuthorDate: Mon Jan 22 11:50:51 2024 +0800

    HDFS-17293. First packet data + checksum size will be set to 516 bytes when writing to a new block. (#6368). Contributed by farmmamba.
    
    Reviewed-by: He Xiaoqiao <hexiaoq...@apache.org>
    Signed-off-by: Shuyan Zhang <zhangshu...@apache.org>
---
 .../org/apache/hadoop/hdfs/DFSOutputStream.java    |  9 +++--
 .../apache/hadoop/hdfs/TestDFSOutputStream.java    | 41 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 2 deletions(-)

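For context on the title: when the previous block has just been filled, the old code computes psize = min(blockSize - bytesCurBlock, writePacketSize) = 0, so the first packet of the next block collapses to a single checksum chunk of 512 data bytes + 4 CRC32C bytes = 516 bytes. Below is a minimal sketch of that arithmetic and of the fixed branch, assuming the default 512-byte bytes-per-checksum, CRC32C checksums, and 64 KB dfs.client-write-packet-size; the single-chunk fallback and the omission of packet-header accounting are simplifications of computePacketChunkSize, not code from this patch:

    public class FirstPacketSizeSketch {
      public static void main(String[] args) {
        // Assumed defaults: 512-byte checksum chunks, 4-byte CRC32C, 64 KiB write packet size.
        final long blockSize = 1024L * 1024;          // matches the new test below
        final int writePacketSize = 64 * 1024;        // dfs.client-write-packet-size default
        final int chunkSize = 512 + 4;                // data bytes per checksum + checksum size

        // The previous block has just been filled, so bytesCurBlock == blockSize.
        final long bytesCurBlock = blockSize;

        // Old logic: min(blockSize - bytesCurBlock, writePacketSize) == 0, and the
        // packet falls back to a single 516-byte chunk (the reported symptom).
        int oldPsize = (int) Math.min(blockSize - bytesCurBlock, writePacketSize);
        int oldPacketSize = Math.max(oldPsize / chunkSize, 1) * chunkSize;   // 516

        // New logic: at a block boundary, size the first packet of the next block
        // from writePacketSize, so it carries as many chunks as fit.
        int newPsize = (bytesCurBlock == blockSize)
            ? writePacketSize
            : (int) Math.min(blockSize - bytesCurBlock, writePacketSize);
        int newPacketSize = Math.max(newPsize / chunkSize, 1) * chunkSize;   // ~64 KiB of chunks

        System.out.println("old first packet: " + oldPacketSize + " bytes");
        System.out.println("new first packet: " + newPacketSize + " bytes");
      }
    }
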
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index b6634eddc891..a1bfb7f5d594 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -536,8 +536,13 @@ public class DFSOutputStream extends FSOutputSummer
     }
 
     if (!getStreamer().getAppendChunk()) {
-      final int psize = (int) Math
-          .min(blockSize - getStreamer().getBytesCurBlock(), writePacketSize);
+      int psize = 0;
+      if (blockSize == getStreamer().getBytesCurBlock()) {
+        psize = writePacketSize;
+      } else {
+        psize = (int) Math
+            .min(blockSize - getStreamer().getBytesCurBlock(), writePacketSize);
+      }
       computePacketChunkSize(psize, bytesPerChecksum);
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 0f1b965cc264..bdb91f91bc5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -58,6 +59,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.test.Whitebox;
+import org.apache.hadoop.util.DataChecksum;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -508,6 +510,45 @@ public class TestDFSOutputStream {
     }
   }
 
+  @Test(timeout=60000)
+  public void testFirstPacketSizeInNewBlocks() throws IOException {
+    final long blockSize = (long) 1024 * 1024;
+    MiniDFSCluster dfsCluster = cluster;
+    DistributedFileSystem fs = dfsCluster.getFileSystem();
+    Configuration dfsConf = fs.getConf();
+
+    EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE);
+    try(FSDataOutputStream fos = fs.create(new Path("/testfile.dat"),
+        FsPermission.getDefault(),
+        flags, 512, (short)3, blockSize, null)) {
+
+      DataChecksum crc32c = DataChecksum.newDataChecksum(
+          DataChecksum.Type.CRC32C, 512);
+
+      long loop = 0;
+      Random r = new Random();
+      byte[] buf = new byte[(int) blockSize];
+      r.nextBytes(buf);
+      fos.write(buf);
+      fos.hflush();
+
+      int chunkSize = crc32c.getBytesPerChecksum() + crc32c.getChecksumSize();
+      int packetContentSize = (dfsConf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
+          DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT) -
+          PacketHeader.PKT_MAX_HEADER_LEN) / chunkSize * chunkSize;
+
+      while (loop < 20) {
+        r.nextBytes(buf);
+        fos.write(buf);
+        fos.hflush();
+        loop++;
+        Assert.assertEquals(((DFSOutputStream) fos.getWrappedStream()).packetSize,
+            packetContentSize);
+      }
+    }
+    fs.delete(new Path("/testfile.dat"), true);
+  }
+
   @AfterClass
   public static void tearDown() {
     if (cluster != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org
