This is an automated email from the ASF dual-hosted git repository.
ritesh pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-7593 by this push:
new 509c970054 HDDS-10442. [hsync] Add a Freon tool to measure client to
DataNode round-trip latency (#6297)
509c970054 is described below
commit 509c970054886c15cd7dd425104be9d0e4e59723
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Wed Mar 27 11:28:54 2024 -0700
HDDS-10442. [hsync] Add a Freon tool to measure client to DataNode
round-trip latency (#6297)
---
.../java/org/apache/hadoop/hdds/HddsUtils.java | 1 +
.../ContainerCommandResponseBuilders.java | 28 ++++
.../hdds/scm/storage/ContainerProtocolCalls.java | 38 +++++
.../org/apache/hadoop/ozone/audit/DNAction.java | 3 +-
.../container/common/impl/HddsDispatcher.java | 1 +
.../ozone/container/keyvalue/KeyValueHandler.java | 8 +
.../src/main/proto/DatanodeClientProtocol.proto | 13 ++
.../hdds/scm/cli/ContainerOperationClient.java | 2 +-
.../src/main/smoketest/freon/echoRPCLoad.robot | 21 +++
.../hadoop/hdds/scm/TestContainerSmallFile.java | 19 +++
.../hadoop/ozone/freon/TestDNRPCLoadGenerator.java | 111 +++++++++++++
.../hadoop/ozone/freon/DNRPCLoadGenerator.java | 178 +++++++++++++++++++++
.../java/org/apache/hadoop/ozone/freon/Freon.java | 3 +-
.../hadoop/ozone/freon/OmRPCLoadGenerator.java | 6 +-
14 files changed, 426 insertions(+), 6 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index ee1c9669a1..e1188f1cd1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -424,6 +424,7 @@ public final class HddsUtils {
case ListContainer:
case ListChunk:
case GetCommittedBlockLength:
+ case Echo:
return true;
case CloseContainer:
case WriteChunk:
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
index 9acb0e5c33..86336e9bc7 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.Function;
+
+import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
@@ -42,6 +44,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
import static
org.apache.hadoop.hdds.scm.utils.ClientCommandsUtils.getReadChunkVersion;
@@ -319,6 +322,31 @@ public final class ContainerCommandResponseBuilders {
.build();
}
+ public static ContainerCommandResponseProto getEchoResponse(
+ ContainerCommandRequestProto msg) {
+
+ ContainerProtos.EchoRequestProto echoRequest = msg.getEcho();
+ int responsePayload = echoRequest.getPayloadSizeResp();
+
+ int sleepTimeMs = echoRequest.getSleepTimeMs();
+ try {
+ if (sleepTimeMs > 0) {
+ Thread.sleep(sleepTimeMs);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ ContainerProtos.EchoResponseProto.Builder echo =
+ ContainerProtos.EchoResponseProto
+ .newBuilder()
+
.setPayload(UnsafeByteOperations.unsafeWrap(RandomUtils.nextBytes(responsePayload)));
+
+ return getSuccessResponseBuilder(msg)
+ .setEcho(echo)
+ .build();
+ }
+
private ContainerCommandResponseBuilders() {
throw new UnsupportedOperationException("no instances");
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 58bb326eb0..1453ae56b4 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -57,6 +57,8 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContai
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.FinalizeBlockRequestProto;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.EchoRequestProto;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.EchoResponseProto;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
@@ -65,6 +67,7 @@ import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenExcep
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.security.token.Token;
@@ -661,6 +664,41 @@ public final class ContainerProtocolCalls {
return response.getGetSmallFile();
}
+ /**
+ * Send an echo to DataNode.
+ *
+ * @return EchoResponseProto
+ */
+ public static EchoResponseProto echo(XceiverClientSpi client, String
encodedContainerID,
+ long containerID, ByteString payloadReqBytes, int payloadRespSizeKB, int
sleepTimeMs) throws IOException {
+ ContainerProtos.EchoRequestProto getEcho =
+ EchoRequestProto
+ .newBuilder()
+ .setPayload(payloadReqBytes)
+ .setPayloadSizeResp(payloadRespSizeKB)
+ .setSleepTimeMs(sleepTimeMs)
+ .build();
+ String id = client.getPipeline().getClosestNode().getUuidString();
+
+ ContainerCommandRequestProto.Builder builder = ContainerCommandRequestProto
+ .newBuilder()
+ .setCmdType(Type.Echo)
+ .setContainerID(containerID)
+ .setDatanodeUuid(id)
+ .setEcho(getEcho);
+ if (!encodedContainerID.isEmpty()) {
+ builder.setEncodedToken(encodedContainerID);
+ }
+ String traceId = TracingUtil.exportCurrentSpan();
+ if (traceId != null) {
+ builder.setTraceID(traceId);
+ }
+ ContainerCommandRequestProto request = builder.build();
+ ContainerCommandResponseProto response =
+ client.sendCommand(request, getValidatorList());
+ return response.getEcho();
+ }
+
/**
* Validates a response from a container protocol call. Any non-successful
* return code is mapped to a corresponding exception and thrown.
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
index d271e7d5d4..f7a38e3dec 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
@@ -40,7 +40,8 @@ public enum DNAction implements AuditAction {
CLOSE_CONTAINER,
GET_COMMITTED_BLOCK_LENGTH,
STREAM_INIT,
- FINALIZE_BLOCK;
+ FINALIZE_BLOCK,
+ ECHO;
@Override
public String getAction() {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index f20615d23f..8e68eeac53 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -807,6 +807,7 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH;
case StreamInit : return DNAction.STREAM_INIT;
case FinalizeBlock : return DNAction.FINALIZE_BLOCK;
+ case Echo : return DNAction.ECHO;
default :
LOG.debug("Invalid command type - {}", cmdType);
return null;
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 3a945c2212..3d9214c917 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -103,6 +103,7 @@ import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse;
+import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getEchoResponse;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getFinalizeBlockResponse;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess;
import static
org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse;
@@ -279,6 +280,8 @@ public class KeyValueHandler extends Handler {
return handler.handleGetCommittedBlockLength(request, kvContainer);
case FinalizeBlock:
return handler.handleFinalizeBlock(request, kvContainer);
+ case Echo:
+ return handler.handleEcho(request, kvContainer);
default:
return null;
}
@@ -611,6 +614,11 @@ public class KeyValueHandler extends Handler {
return getFinalizeBlockResponse(request, responseData);
}
+ ContainerCommandResponseProto handleEcho(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+ return getEchoResponse(request);
+ }
+
/**
* Handle Get Block operation. Calls BlockManager to process the request.
*/
diff --git
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 0206a8ea71..ccde261de0 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -107,6 +107,7 @@ enum Type {
StreamWrite = 20;
FinalizeBlock = 21;
+ Echo = 22;
}
@@ -215,6 +216,7 @@ message ContainerCommandRequestProto {
optional uint32 version = 24;
optional FinalizeBlockRequestProto finalizeBlock = 25;
+ optional EchoRequestProto echo = 26;
}
message ContainerCommandResponseProto {
@@ -247,6 +249,7 @@ message ContainerCommandResponseProto {
optional GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21;
optional FinalizeBlockResponseProto finalizeBlock = 22;
+ optional EchoResponseProto echo = 23;
}
message ContainerDataProto {
@@ -390,6 +393,16 @@ message ListBlockResponseProto {
repeated BlockData blockData = 1;
}
+message EchoRequestProto {
+ optional bytes payload = 1;
+ optional int32 payloadSizeResp = 2;
+ optional int32 sleepTimeMs = 3;
+}
+
+message EchoResponseProto {
+ optional bytes payload = 1;
+}
+
// Chunk Operations
message ChunkInfo {
diff --git
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
index 499d58b1ff..7898ed76b1 100644
---
a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
+++
b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/ContainerOperationClient.java
@@ -184,7 +184,7 @@ public class ContainerOperationClient implements ScmClient {
}
}
- private String getEncodedContainerToken(long containerId) throws IOException
{
+ public String getEncodedContainerToken(long containerId) throws IOException {
if (!containerTokenEnabled) {
return "";
}
diff --git a/hadoop-ozone/dist/src/main/smoketest/freon/echoRPCLoad.robot
b/hadoop-ozone/dist/src/main/smoketest/freon/echoRPCLoad.robot
index 32456af488..c6ea4e6346 100644
--- a/hadoop-ozone/dist/src/main/smoketest/freon/echoRPCLoad.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/freon/echoRPCLoad.robot
@@ -23,6 +23,27 @@ ${PREFIX} ${EMPTY}
${n} 1
*** Test Cases ***
+Get Container ID
+ ${result} = Execute ozone admin container create
+ ${containerID} = Execute ozone admin container list --count 1
--state=OPEN | grep -o '"containerID" *: *[^,}]*' | awk -F'[:,]' '{print $2}' |
tr -d '" '
+ Set Suite Variable ${containerID}
+
+[Read] Ozone DataNode Echo RPC Load Generator with request payload and
response payload
+ ${result} = Execute ozone freon dne -t=1 -n=${n}
--payload-req=1 --payload-resp=1 --container-id=${containerID}
+ Should contain ${result} Successful executions: ${n}
+
+[Read] Ozone DataNode Echo RPC Load Generator with request payload and empty
response payload
+ ${result} = Execute ozone freon dne -t=1 -n=${n}
--payload-req=1 --container-id=${containerID}
+ Should contain ${result} Successful executions: ${n}
+
+[Read] Ozone DataNode Echo RPC Load Generator with empty request payload and
response payload
+ ${result} = Execute ozone freon dne -t=1 -n=${n}
--payload-resp=1 --container-id=${containerID}
+ Should contain ${result} Successful executions: ${n}
+
+[Read] Ozone DataNode Echo RPC Load Generator with empty request payload and
empty response payload no sleep time one xceiver client
+ ${result} = Execute ozone freon dne -t=1 -n=${n}
--sleep-time-ms=0 --clients=1 --container-id=${containerID}
+ Should contain ${result} Successful executions: ${n}
+
[Read] Ozone Echo RPC Load Generator with request payload and response payload
${result} = Execute ozone freon ome -t=1 -n=${n}
--payload-req=1 --payload-resp=1
Should contain ${result} Successful executions: ${n}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerSmallFile.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerSmallFile.java
index 30c4e4cd5b..5dab271d9e 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerSmallFile.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestContainerSmallFile.java
@@ -31,6 +31,8 @@ import
org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -188,6 +190,23 @@ public class TestContainerSmallFile {
assertEquals("data123", readData);
xceiverClientManager.releaseClient(client, false);
}
+
+ @Test
+ public void testEcho() throws Exception {
+ ContainerWithPipeline container =
+ storageContainerLocationClient.allocateContainer(
+ SCMTestUtils.getReplicationType(ozoneConfig),
+ HddsProtos.ReplicationFactor.ONE, OzoneConsts.OZONE);
+ XceiverClientSpi client = xceiverClientManager
+ .acquireClient(container.getPipeline());
+ ContainerProtocolCalls.createContainer(client,
+ container.getContainerInfo().getContainerID(), null);
+ ByteString byteString = UnsafeByteOperations.unsafeWrap(new byte[0]);
+ ContainerProtos.EchoResponseProto response =
+ ContainerProtocolCalls.echo(client, "",
container.getContainerInfo().getContainerID(), byteString, 1, 0);
+ assertEquals(1, response.getPayload().size());
+ xceiverClientManager.releaseClient(client, false);
+ }
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDNRPCLoadGenerator.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDNRPCLoadGenerator.java
new file mode 100644
index 0000000000..d049a7e320
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDNRPCLoadGenerator.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.freon;
+
+import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import
org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import picocli.CommandLine;
+
+import java.time.Duration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests Freon, with MiniOzoneCluster and validate data.
+ */
+public class TestDNRPCLoadGenerator {
+
+ private static MiniOzoneCluster cluster = null;
+ private static ContainerWithPipeline container;
+
+ private static void startCluster(OzoneConfiguration conf) throws Exception {
+ DatanodeRatisServerConfig ratisServerConfig =
+ conf.getObject(DatanodeRatisServerConfig.class);
+ ratisServerConfig.setRequestTimeOut(Duration.ofSeconds(3));
+ ratisServerConfig.setWatchTimeOut(Duration.ofSeconds(10));
+ conf.setFromObject(ratisServerConfig);
+
+ RatisClientConfig.RaftConfig raftClientConfig =
+ conf.getObject(RatisClientConfig.RaftConfig.class);
+ raftClientConfig.setRpcRequestTimeout(Duration.ofSeconds(3));
+ raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(10));
+ conf.setFromObject(raftClientConfig);
+
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(5).build();
+ cluster.waitForClusterToBeReady();
+ cluster.waitForPipelineTobeReady(HddsProtos.ReplicationFactor.THREE,
+ 180000);
+
+ StorageContainerLocationProtocolClientSideTranslatorPB
+ storageContainerLocationClient = cluster
+ .getStorageContainerLocationClient();
+ container =
+ storageContainerLocationClient.allocateContainer(
+ SCMTestUtils.getReplicationType(conf),
+ HddsProtos.ReplicationFactor.ONE, OzoneConsts.OZONE);
+ XceiverClientManager xceiverClientManager = new XceiverClientManager(conf);
+ XceiverClientSpi client = xceiverClientManager
+ .acquireClient(container.getPipeline());
+ ContainerProtocolCalls.createContainer(client,
+ container.getContainerInfo().getContainerID(), null);
+ }
+
+ static void shutdownCluster() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @BeforeAll
+ public static void init() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ startCluster(conf);
+ }
+
+ @AfterAll
+ public static void shutdown() {
+ shutdownCluster();
+ }
+
+ @Test
+ public void test() {
+ DNRPCLoadGenerator randomKeyGenerator =
+ new DNRPCLoadGenerator(cluster.getConf());
+ CommandLine cmd = new CommandLine(randomKeyGenerator);
+ int exitCode = cmd.execute(
+ "--container-id",
Long.toString(container.getContainerInfo().getContainerID()),
+ "--clients", "5",
+ "-t", "10");
+ assertEquals(0, exitCode);
+ }
+}
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java
new file mode 100644
index 0000000000..1d1b898a7d
--- /dev/null
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DNRPCLoadGenerator.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
+import
org.apache.hadoop.hdds.security.x509.certificate.client.CACertificateProvider;
+import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.ratis.thirdparty.com.google.protobuf.UnsafeByteOperations;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import static org.apache.hadoop.ozone.common.PayloadUtils.generatePayloadBytes;
+
+/**
+ * Utility to generate RPC request to DN.
+ */
+@Command(name = "dn-echo",
+ aliases = "dne",
+ description =
+ "Generate echo RPC request to DataNode",
+ versionProvider = HddsVersionProvider.class,
+ mixinStandardHelpOptions = true,
+ showDefaultValues = true)
+public class DNRPCLoadGenerator extends BaseFreonGenerator
+ implements Callable<Void> {
+
+ private static final int RPC_PAYLOAD_MULTIPLICATION_FACTOR = 1024;
+ private static final int MAX_SIZE_KB = 2097151;
+ private Timer timer;
+ private OzoneConfiguration configuration;
+ private ByteString payloadReqBytes;
+ private int payloadRespSize;
+ private List<XceiverClientSpi> clients;
+ private String encodedContainerToken;
+ @Option(names = {"--payload-req"},
+ description =
+ "Specifies the size of payload in KB in RPC request. ",
+ defaultValue = "0")
+ private int payloadReqSizeKB = 0;
+
+ @Option(names = {"--payload-resp"},
+ description =
+ "Specifies the size of payload in KB in RPC response. ",
+ defaultValue = "0")
+ private int payloadRespSizeKB = 0;
+
+ @Option(names = {"--container-id"},
+ description = "Send echo to DataNodes associated with this container")
+ private long containerID;
+
+  @Option(names = {"--sleep-time-ms"},
+      description = "Let DataNode pause for a duration (in milliseconds)
for each request",
+      defaultValue = "0")
+ private int sleepTimeMs = 0;
+
+ @Option(names = {"--clients"},
+ description = "number of xceiver clients",
+ defaultValue = "1")
+ private int numClients = 1;
+
+ @CommandLine.ParentCommand
+ private Freon freon;
+
+  // empty constructor for picocli
+ DNRPCLoadGenerator() {
+ }
+
+ @VisibleForTesting
+ DNRPCLoadGenerator(OzoneConfiguration ozoneConfiguration) {
+ this.configuration = ozoneConfiguration;
+ }
+
+ @Override
+ public Void call() throws Exception {
+    Preconditions.checkArgument(payloadReqSizeKB >= 0,
+        "DataNode echo request payload size should be positive value or zero.");
+    Preconditions.checkArgument(payloadRespSizeKB >= 0,
+        "DataNode echo response payload size should be positive value or zero.");
+
+ if (configuration == null) {
+ configuration = freon.createOzoneConfiguration();
+ }
+ ContainerOperationClient scmClient = new
ContainerOperationClient(configuration);
+ ContainerInfo containerInfo = scmClient.getContainer(containerID);
+
+ List<Pipeline> pipelineList = scmClient.listPipelines();
+ Pipeline pipeline = pipelineList.stream()
+ .filter(p -> p.getId().equals(containerInfo.getPipelineID()))
+ .findFirst()
+ .orElse(null);
+ encodedContainerToken = scmClient.getEncodedContainerToken(containerID);
+ XceiverClientFactory xceiverClientManager;
+ if (OzoneSecurityUtil.isSecurityEnabled(configuration)) {
+ CACertificateProvider caCerts = () -> HAUtils.buildCAX509List(null,
configuration);
+ xceiverClientManager = new XceiverClientManager(configuration,
+ configuration.getObject(XceiverClientManager.ScmClientConfig.class),
+ new ClientTrustManager(caCerts, null));
+ } else {
+ xceiverClientManager = new XceiverClientManager(configuration);
+ }
+ clients = new ArrayList<>(numClients);
+ for (int i = 0; i < numClients; i++) {
+ clients.add(xceiverClientManager.acquireClient(pipeline));
+ }
+
+ init();
+ payloadReqBytes =
UnsafeByteOperations.unsafeWrap(generatePayloadBytes(payloadReqSizeKB));
+ payloadRespSize = calculateMaxPayloadSize(payloadRespSizeKB);
+ timer = getMetrics().timer("rpc-payload");
+ try {
+ runTests(this::sendRPCReq);
+ } finally {
+ for (XceiverClientSpi client : clients) {
+ xceiverClientManager.releaseClient(client, false);
+ }
+ xceiverClientManager.close();
+ scmClient.close();
+ }
+ return null;
+ }
+
+ private int calculateMaxPayloadSize(int payloadSizeKB) {
+ if (payloadSizeKB > 0) {
+ return Math.min(
+ Math.toIntExact((long)payloadSizeKB *
+ RPC_PAYLOAD_MULTIPLICATION_FACTOR),
+ MAX_SIZE_KB);
+ }
+ return 0;
+ }
+
+ private void sendRPCReq(long l) throws Exception {
+ timer.time(() -> {
+ int clientIndex = (numClients == 1) ? 0 : (int)l % numClients;
+ ContainerProtos.EchoResponseProto response =
+ ContainerProtocolCalls.echo(clients.get(clientIndex),
encodedContainerToken,
+ containerID, payloadReqBytes, payloadRespSize, sleepTimeMs);
+ return null;
+ });
+ }
+}
+
+
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
index bd5510695f..349887a776 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java
@@ -73,7 +73,8 @@ import static
org.apache.hadoop.hdds.server.http.HttpServer2.setHttpBaseDir;
OzoneClientKeyReadWriteListOps.class,
RangeKeysGenerator.class,
DatanodeSimulator.class,
- OmMetadataGenerator.class
+ OmMetadataGenerator.class,
+ DNRPCLoadGenerator.class
},
versionProvider = HddsVersionProvider.class,
mixinStandardHelpOptions = true)
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmRPCLoadGenerator.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmRPCLoadGenerator.java
index 958df4c11a..90807a0e6f 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmRPCLoadGenerator.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/OmRPCLoadGenerator.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.ozone.freon;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
@@ -27,6 +26,8 @@ import java.util.concurrent.Callable;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
+import static org.apache.hadoop.ozone.common.PayloadUtils.generatePayloadBytes;
+
/**
* Utility to generate RPC request to OM with or without payload.
*/
@@ -88,8 +89,7 @@ public class OmRPCLoadGenerator extends BaseFreonGenerator
}
init();
- payloadReqBytes = RandomUtils.nextBytes(
- calculateMaxPayloadSize(payloadReqSizeKB));
+ payloadReqBytes = generatePayloadBytes(payloadReqSizeKB);
payloadRespSize = calculateMaxPayloadSize(payloadRespSizeKB);
timer = getMetrics().timer("rpc-payload");
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]