This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
     new 138cbe3157 HDDS-11998. BlockDataStreamOutput should decrPendingContainerOpsMetrics (#7636)
138cbe3157 is described below
commit 138cbe31570da5876a682b54a5322e99cb9e4dc0
Author: Ivan Andika <[email protected]>
AuthorDate: Sat Jan 4 19:28:20 2025 +0800
    HDDS-11998. BlockDataStreamOutput should decrPendingContainerOpsMetrics (#7636)
---
.../hdds/scm/storage/BlockDataStreamOutput.java | 5 +-
.../hadoop/hdds/scm/storage/StreamBuffer.java | 4 +-
.../client/rpc/TestBlockDataStreamOutput.java | 77 ++++++++++++++--------
3 files changed, 56 insertions(+), 30 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 48c77f2c86..8c2883a437 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -253,7 +253,7 @@ public class BlockDataStreamOutput implements
ByteBufferStreamOutput {
}
while (len > 0) {
allocateNewBufferIfNeeded();
- int writeLen = Math.min(len, currentBuffer.length());
+ int writeLen = Math.min(len, currentBuffer.remaining());
final StreamBuffer buf = new StreamBuffer(b, off, writeLen);
currentBuffer.put(buf);
writeChunkIfNeeded();
@@ -265,7 +265,7 @@ public class BlockDataStreamOutput implements
ByteBufferStreamOutput {
}
private void writeChunkIfNeeded() throws IOException {
- if (currentBuffer.length() == 0) {
+ if (currentBuffer.remaining() == 0) {
writeChunk(currentBuffer);
currentBuffer = null;
}
@@ -672,6 +672,7 @@ public class BlockDataStreamOutput implements
ByteBufferStreamOutput {
out.writeAsync(buf, StandardWriteOption.SYNC) :
out.writeAsync(buf))
.whenCompleteAsync((r, e) -> {
+
metrics.decrPendingContainerOpsMetrics(ContainerProtos.Type.WriteChunk);
if (e != null || !r.isSuccess()) {
if (e == null) {
e = new IOException("result is not success");
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
index b889aa35e2..5bf6dcee82 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBuffer.java
@@ -39,8 +39,8 @@ public class StreamBuffer {
return buffer.duplicate();
}
- public int length() {
- return buffer.limit() - buffer.position();
+ public int remaining() {
+ return buffer.remaining();
}
public int position() {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
index 5bcf708405..46c3c71627 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockDataStreamOutput.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
@@ -47,13 +46,14 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.PutBlock;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type.WriteChunk;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -77,13 +77,6 @@ public class TestBlockDataStreamOutput {
private static String keyString;
private static final DatanodeVersion DN_OLD_VERSION =
DatanodeVersion.SEPARATE_RATIS_PORTS_AVAILABLE;
- /**
- * Create a MiniDFSCluster for testing.
- * <p>
- * Ozone is made active by setting OZONE_ENABLED = true
- *
- * @throws IOException
- */
@BeforeAll
public static void init() throws Exception {
chunkSize = 100;
@@ -120,7 +113,7 @@ public class TestBlockDataStreamOutput {
client = OzoneClientFactory.getRpcClient(conf);
objectStore = client.getObjectStore();
keyString = UUID.randomUUID().toString();
- volumeName = "testblockoutputstream";
+ volumeName = "testblockdatastreamoutput";
bucketName = volumeName;
objectStore.createVolume(volumeName);
objectStore.getVolume(volumeName).createBucket(bucketName);
@@ -130,9 +123,6 @@ public class TestBlockDataStreamOutput {
return UUID.randomUUID().toString();
}
- /**
- * Shutdown MiniDFSCluster.
- */
@AfterAll
public static void shutdown() {
IOUtils.closeQuietly(client);
@@ -166,6 +156,11 @@ public class TestBlockDataStreamOutput {
}
static void testWrite(int dataLength) throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+    long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics(WriteChunk);
+    long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(PutBlock);
+
String keyName = getKeyName();
OzoneDataStreamOutput key = createKey(
keyName, ReplicationType.RATIS, dataLength);
@@ -174,9 +169,19 @@ public class TestBlockDataStreamOutput {
// now close the stream, It will update the key length.
key.close();
validateData(keyName, data);
+
+ assertEquals(pendingPutBlockCount,
+ metrics.getPendingContainerOpCountMetrics(PutBlock));
+ assertEquals(pendingWriteChunkCount,
+ metrics.getPendingContainerOpCountMetrics(WriteChunk));
}
private void testWriteWithFailure(int dataLength) throws Exception {
+ XceiverClientMetrics metrics =
+ XceiverClientManager.getXceiverClientMetrics();
+    long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics(WriteChunk);
+    long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(PutBlock);
+
String keyName = getKeyName();
OzoneDataStreamOutput key = createKey(
keyName, ReplicationType.RATIS, dataLength);
@@ -195,17 +200,24 @@ public class TestBlockDataStreamOutput {
key.close();
String dataString = new String(data, UTF_8);
validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
+
+ assertEquals(pendingPutBlockCount,
+ metrics.getPendingContainerOpCountMetrics(PutBlock));
+ assertEquals(pendingWriteChunkCount,
+ metrics.getPendingContainerOpCountMetrics(WriteChunk));
}
@Test
public void testPutBlockAtBoundary() throws Exception {
- int dataLength = 500;
+ int dataLength = maxFlushSize + 100;
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
- long putBlockCount = metrics.getContainerOpCountMetrics(
- ContainerProtos.Type.PutBlock);
- long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(
- ContainerProtos.Type.PutBlock);
+ long writeChunkCount = metrics.getContainerOpCountMetrics(WriteChunk);
+ long putBlockCount = metrics.getContainerOpCountMetrics(PutBlock);
+    long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics(WriteChunk);
+    long pendingPutBlockCount = metrics.getPendingContainerOpCountMetrics(PutBlock);
+ long totalOpCount = metrics.getTotalOpCount();
+
String keyName = getKeyName();
OzoneDataStreamOutput key = createKey(
keyName, ReplicationType.RATIS, 0);
@@ -213,14 +225,25 @@ public class TestBlockDataStreamOutput {
ContainerTestHelper.getFixedLengthString(keyString, dataLength)
.getBytes(UTF_8);
key.write(ByteBuffer.wrap(data));
-    assertThat(metrics.getPendingContainerOpCountMetrics(ContainerProtos.Type.PutBlock))
+    assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
.isLessThanOrEqualTo(pendingPutBlockCount + 1);
+ assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
+ .isLessThanOrEqualTo(pendingWriteChunkCount + 5);
key.close();
// Since data length is 500 , first putBlock will be at 400(flush boundary)
// and the other at 500
- assertEquals(
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.PutBlock),
- putBlockCount + 2);
+ assertEquals(putBlockCount + 2,
+ metrics.getContainerOpCountMetrics(PutBlock));
+ // Each chunk is 100 so there will be 500 / 100 = 5 chunks.
+ assertEquals(writeChunkCount + 5,
+ metrics.getContainerOpCountMetrics(WriteChunk));
+ assertEquals(totalOpCount + 7,
+ metrics.getTotalOpCount());
+ assertEquals(pendingPutBlockCount,
+ metrics.getPendingContainerOpCountMetrics(PutBlock));
+ assertEquals(pendingWriteChunkCount,
+ metrics.getPendingContainerOpCountMetrics(WriteChunk));
+
validateData(keyName, data);
}
@@ -242,20 +265,22 @@ public class TestBlockDataStreamOutput {
XceiverClientMetrics metrics =
XceiverClientManager.getXceiverClientMetrics();
OzoneDataStreamOutput key = createKey(keyName, ReplicationType.RATIS, 0);
- long writeChunkCount =
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk);
+ long writeChunkCount = metrics.getContainerOpCountMetrics(WriteChunk);
+    long pendingWriteChunkCount = metrics.getPendingContainerOpCountMetrics(WriteChunk);
byte[] data =
ContainerTestHelper.getFixedLengthString(keyString, chunkSize / 2)
.getBytes(UTF_8);
key.write(ByteBuffer.wrap(data));
// minPacketSize= 100, so first write of 50 wont trigger a writeChunk
assertEquals(writeChunkCount,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ metrics.getContainerOpCountMetrics(WriteChunk));
key.write(ByteBuffer.wrap(data));
assertEquals(writeChunkCount + 1,
- metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+ metrics.getContainerOpCountMetrics(WriteChunk));
// now close the stream, It will update the key length.
key.close();
+ assertEquals(pendingWriteChunkCount,
+ metrics.getPendingContainerOpCountMetrics(WriteChunk));
String dataString = new String(data, UTF_8);
validateData(keyName, dataString.concat(dataString).getBytes(UTF_8));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]