adoroszlai commented on code in PR #9461:
URL: https://github.com/apache/ozone/pull/9461#discussion_r2606482945
##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java:
##########
@@ -694,6 +694,27 @@ public final class OzoneConfigKeys {
"ozone.client.elastic.byte.buffer.pool.max.size";
public static final String
OZONE_CLIENT_ELASTIC_BYTE_BUFFER_POOL_MAX_SIZE_DEFAULT = "16GB";
+ public static final String
+ OZONE_CLIENT_STREAM_READ_PRE_READ_SIZE =
+ "ozone.client.stream.read.pre-read-size";
+ public static final long
+ OZONE_CLIENT_STREAM_READ_PRE_READ_SIZE_DEFAULT =
+ 32L << 20;
+
+ public static final String
+ OZONE_CLIENT_STREAM_READ_RESPONSE_DATA_SIZE =
+ "ozone.client.stream.read.response-data-size";
+ public static final int
+ OZONE_CLIENT_STREAM_READ_RESPONSE_DATA_SIZE_DEFAULT =
+ 1 << 20;
+
+ public static final String
+ OZONE_CLIENT_STREAM_READ_TIMEOUT =
+ "ozone.client.stream.read.timeout";
+ public static final int
+ OZONE_CLIENT_STREAM_READ_TIMEOUT_DEFAULT =
+ 10_000;
+
Review Comment:
Sorry if my original comment was confusing. Please remove these, too.
```suggestion
```
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java:
##########
@@ -280,4 +289,38 @@ private void testReadEmptyBlock() throws Exception {
assertEquals(-1, keyInputStream.read());
}
}
+
+ @Test
+ public void testCustomStreamReadConfigIsApplied() throws Exception {
+ // Arrange: create a config with non-default values
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set("ozone.client.stream.read.pre-read-size", "67108864");
+ conf.set("ozone.client.stream.read.response-data-size", "2097152");
+ conf.set("ozone.client.stream.read.timeout", "5s");
+
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+
+ // Sanity check
+ assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout());
+ // Create a dummy BlockID for the test
+ BlockID blockID = new BlockID(1L, 1L);
+ long length = 1024L;
+ // Use Mockito to create a mock Pipeline instance.
+ Pipeline pipeline = Mockito.mock(Pipeline.class);
+
+ Token<OzoneBlockTokenIdentifier> token = null;
+ // Mock XceiverClientFactory since StreamBlockInputStream requires it in the constructor
+ XceiverClientFactory xceiverClientFactory = Mockito.mock(XceiverClientFactory.class);
Review Comment:
nit: please add `import static org.mockito.Mockito.mock;`
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java:
##########
@@ -399,7 +417,16 @@ private ByteBuffer read(int length, boolean preRead) throws IOException {
}
ByteBuffer readFromQueue() throws IOException {
- final ReadBlockResponseProto readBlock = poll(10, TimeUnit.SECONDS);
+ // Convert Duration -> int seconds for poll(...)
+ final int timeoutSeconds;
+ if (readTimeout == null || readTimeout.isZero() || readTimeout.isNegative()) {
+ timeoutSeconds = 0;
+ } else {
+ long sec = readTimeout.getSeconds();
+ // Prevent overflow if client config is extremely large
+ timeoutSeconds = sec > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) sec;
+ }
+ final ReadBlockResponseProto readBlock = poll(timeoutSeconds, TimeUnit.SECONDS);
Review Comment:
- Conversion to seconds is unnecessary, since `poll` then converts to
nanoseconds.
- Validation should be done in `OzoneClientConfig#validate()`
- Calculate `timeoutNanos` in `StreamBlockInputStream` constructor as
`config.getStreamReadTimeout().toNanos()`. `poll` parameters can be removed.
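A minimal sketch of the three points above, using hypothetical stand-in classes (`ReadConfigSketch`, `StreamSketch`) rather than the real `OzoneClientConfig` and `StreamBlockInputStream`. It assumes validation rejects non-positive timeouts by throwing; the real `validate()` may prefer to fall back to the default instead.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the client config; not the real OzoneClientConfig. */
final class ReadConfigSketch {
  private final Duration streamReadTimeout;

  ReadConfigSketch(Duration streamReadTimeout) {
    this.streamReadTimeout = streamReadTimeout;
  }

  /** Reject unusable values once, at configuration time (assumed policy). */
  void validate() {
    if (streamReadTimeout == null
        || streamReadTimeout.isZero()
        || streamReadTimeout.isNegative()) {
      throw new IllegalArgumentException(
          "stream read timeout must be positive: " + streamReadTimeout);
    }
  }

  Duration getStreamReadTimeout() {
    return streamReadTimeout;
  }
}

/** Hypothetical stand-in for the stream; only the timeout handling is shown. */
final class StreamSketch {
  private final BlockingQueue<String> responses = new LinkedBlockingQueue<>();
  private final long timeoutNanos;

  StreamSketch(ReadConfigSketch config) {
    // Computed once in the constructor; no per-read conversion or checks.
    // Duration#toNanos throws ArithmeticException if the value overflows a long.
    this.timeoutNanos = config.getStreamReadTimeout().toNanos();
  }

  void offer(String response) {
    responses.add(response);
  }

  /** No parameters needed: the timeout is already a field. */
  String poll() throws InterruptedException {
    return responses.poll(timeoutNanos, TimeUnit.NANOSECONDS);
  }

  long getTimeoutNanos() {
    return timeoutNanos;
  }
}
```

With this shape, `readFromQueue()` would only call `poll()`, and the null, zero, and negative checks no longer appear on the read path.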
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java:
##########
@@ -280,4 +289,38 @@ private void testReadEmptyBlock() throws Exception {
assertEquals(-1, keyInputStream.read());
}
}
+
+ @Test
+ public void testCustomStreamReadConfigIsApplied() throws Exception {
+ // Arrange: create a config with non-default values
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set("ozone.client.stream.read.pre-read-size", "67108864");
+ conf.set("ozone.client.stream.read.response-data-size", "2097152");
+ conf.set("ozone.client.stream.read.timeout", "5s");
+
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
Review Comment:
`OzoneConfiguration` is not required, please create `OzoneClientConfig`
directly and set values using its methods.
```java
OzoneClientConfig clientConfig = new OzoneClientConfig();
clientConfig.set...(...);
```
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java:
##########
@@ -280,4 +289,38 @@ private void testReadEmptyBlock() throws Exception {
assertEquals(-1, keyInputStream.read());
}
}
+
+ @Test
+ public void testCustomStreamReadConfigIsApplied() throws Exception {
+ // Arrange: create a config with non-default values
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set("ozone.client.stream.read.pre-read-size", "67108864");
+ conf.set("ozone.client.stream.read.response-data-size", "2097152");
+ conf.set("ozone.client.stream.read.timeout", "5s");
+
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+
+ // Sanity check
+ assertEquals(Duration.ofSeconds(5), clientConfig.getStreamReadTimeout());
Review Comment:
To test whether `OzoneClientConfig` applies the new config keys correctly,
add a test case to the existing `TestOzoneClientConfig`.
##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java:
##########
@@ -280,4 +289,38 @@ private void testReadEmptyBlock() throws Exception {
assertEquals(-1, keyInputStream.read());
}
}
+
+ @Test
+ public void testCustomStreamReadConfigIsApplied() throws Exception {
Review Comment:
This should be a unit test, since it does not require a working cluster.
Please move it to a new test class
`hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestStreamBlockInputStream.java`.
##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamBlockInputStream.java:
##########
@@ -70,8 +71,9 @@ public class StreamBlockInputStream extends BlockExtendedInputStream {
private final String name = "stream" + STREAM_ID.getAndIncrement();
private final BlockID blockID;
private final long blockLength;
- private final int responseDataSize = 1 << 20; // 1 MB
- private final long preReadSize = 32 << 20; // 32 MB
+ private final int responseDataSize; // Default size is 1 MB
+ private final long preReadSize; // Default size is 32 MB
+ private final Duration readTimeout; // // Default timeout is 10 second
Review Comment:
Please remove the comments; they easily get outdated. Default values can be
checked in the config.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]