This is an automated email from the ASF dual-hosted git repository.
captainzmc pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 0db32a02233d84481b84513f1a6f7416c55753cf Author: Sadanand Shenoy <[email protected]> AuthorDate: Thu Jan 13 11:24:12 2022 +0530 HDDS-6139. [Ozone-Streaming] Fix incorrect computation of totalAckDataLength. (#2978) --- .../hdds/scm/storage/BlockDataStreamOutput.java | 9 ++++++++- .../ozone/client/io/BlockDataStreamOutputEntry.java | 2 +- .../ozone/client/rpc/TestBlockDataStreamOutput.java | 19 +++++++++++++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 84968b68d5..6ef59dd6d8 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -133,6 +133,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { private long syncPosition = 0; private StreamBuffer currentBuffer; private XceiverClientMetrics metrics; + // buffers for which putBlock is yet to be executed + private List<StreamBuffer> buffersForPutBlock; /** * Creates a new BlockDataStreamOutput. 
* @@ -287,6 +289,10 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { private void writeChunk(StreamBuffer sb) throws IOException { bufferList.add(sb); + if (buffersForPutBlock == null) { + buffersForPutBlock = new ArrayList<>(); + } + buffersForPutBlock.add(sb); ByteBuffer dup = sb.duplicate(); dup.position(0); dup.limit(sb.position()); @@ -392,7 +398,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { final List<StreamBuffer> byteBufferList; if (!force) { Preconditions.checkNotNull(bufferList); - byteBufferList = bufferList; + byteBufferList = buffersForPutBlock; + buffersForPutBlock = null; Preconditions.checkNotNull(byteBufferList); } else { byteBufferList = null; diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java index 2cd5630549..4e5a35a539 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntry.java @@ -156,7 +156,7 @@ public final class BlockDataStreamOutputEntry } } - long getTotalAckDataLength() { + public long getTotalAckDataLength() { if (byteBufferStreamOutput != null) { BlockDataStreamOutput out = (BlockDataStreamOutput) this.byteBufferStreamOutput; diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java index c9242df8b1..c6a3c32d2b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java @@ -31,6 +31,7 @@ import 
org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.client.ObjectStore; import org.apache.hadoop.ozone.client.OzoneClient; import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.client.io.BlockDataStreamOutputEntry; import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput; import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput; import org.apache.hadoop.ozone.container.ContainerTestHelper; @@ -256,4 +257,22 @@ public class TestBlockDataStreamOutput { validateData(keyName, dataString.concat(dataString).getBytes(UTF_8)); } + @Test + public void testTotalAckDataLength() throws Exception { + int dataLength = 400; + String keyName = getKeyName(); + OzoneDataStreamOutput key = createKey( + keyName, ReplicationType.RATIS, 0); + byte[] data = + ContainerTestHelper.getFixedLengthString(keyString, dataLength) + .getBytes(UTF_8); + KeyDataStreamOutput keyDataStreamOutput = + (KeyDataStreamOutput) key.getByteBufStreamOutput(); + BlockDataStreamOutputEntry stream = + keyDataStreamOutput.getStreamEntries().get(0); + key.write(ByteBuffer.wrap(data)); + key.close(); + Assert.assertEquals(dataLength, stream.getTotalAckDataLength()); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
