This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 083a45ee0b Revert "HDDS-10384. RPC client Reusing thread resources.
(#6270)" (#6277)
083a45ee0b is described below
commit 083a45ee0b774c62a481d029d3de3812ccc63e2c
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Feb 27 08:29:22 2024 +0100
Revert "HDDS-10384. RPC client Reusing thread resources. (#6270)" (#6277)
This reverts commit 84c6e4d861d907d1ac39e252aa97e8a512ef247b.
---
.../hadoop/hdds/scm/storage/AbstractCommitWatcher.java | 2 +-
.../apache/hadoop/hdds/scm/storage/BlockOutputStream.java | 5 ++++-
.../ec/reconstruction/ECReconstructionCoordinator.java | 3 ++-
.../org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java | 11 +++++++----
.../java/org/apache/hadoop/ozone/client/rpc/RpcClient.java | 3 ++-
.../hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java | 12 +++---------
6 files changed, 19 insertions(+), 17 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
index 957f761ccb..0c5501c792 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
@@ -73,7 +73,7 @@ abstract class AbstractCommitWatcher<BUFFER> {
return commitIndexMap;
}
- synchronized void updateCommitInfoMap(long index, List<BUFFER> buffers) {
+ void updateCommitInfoMap(long index, List<BUFFER> buffers) {
commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
.addAll(buffers);
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 5c0516d7bd..5ff5da6098 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -181,7 +182,8 @@ public class BlockOutputStream extends OutputStream {
(long) flushPeriod * streamBufferArgs.getStreamBufferSize() ==
streamBufferArgs
.getStreamBufferFlushSize());
- this.responseExecutor = blockOutputStreamResourceProvider.get();
+ // A single thread executor handle the responses of async requests
+ responseExecutor = Executors.newSingleThreadExecutor();
bufferList = null;
totalDataFlushedLength = 0;
writtenDataLength = 0;
@@ -655,6 +657,7 @@ public class BlockOutputStream extends OutputStream {
bufferList.clear();
}
bufferList = null;
+ responseExecutor.shutdown();
}
/**
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 90756bbc88..a45c158448 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -101,7 +101,8 @@ public class ECReconstructionCoordinator implements
Closeable {
private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
- private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5;
+ // TODO: Adjusts to the appropriate value when the ec-reconstruct-writer
thread pool is used.
+ private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 0;
private final ECContainerOperationClient containerOperationClient;
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 0cb3973e04..878558073f 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -43,6 +43,8 @@ import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -64,6 +66,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
private final int numParityBlks;
private final ByteBufferPool bufferPool;
private final RawErasureEncoder encoder;
+ private final ExecutorService flushExecutor;
private final Future<Boolean> flushFuture;
private final AtomicLong flushCheckpoint;
@@ -116,13 +119,12 @@ public final class ECKeyOutputStream extends
KeyOutputStream
this.writeOffset = 0;
this.encoder = CodecUtil.createRawEncoderWithFallback(
builder.getReplicationConfig());
+ this.flushExecutor = Executors.newSingleThreadExecutor();
S3Auth s3Auth = builder.getS3CredentialsProvider().get();
ThreadLocal<S3Auth> s3CredentialsProvider =
builder.getS3CredentialsProvider();
- this.flushFuture = builder.getExecutorServiceSupplier().get().submit(() ->
{
- s3CredentialsProvider.set(s3Auth);
- return flushStripeFromQueue();
- });
+ flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth));
+ this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue);
this.flushCheckpoint = new AtomicLong(0);
this.atomicKeyCreation = builder.getAtomicKeyCreation();
}
@@ -493,6 +495,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
} catch (InterruptedException e) {
throw new IOException("Flushing thread was interrupted", e);
} finally {
+ flushExecutor.shutdownNow();
closeCurrentStreamEntry();
blockOutputStreamEntryPool.cleanup();
}
diff --git
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index a6830ba9f7..74b22e7ca4 100644
---
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -196,7 +196,8 @@ public class RpcClient implements ClientProtocol {
// for reconstruction.
private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
- private static final int WRITE_POOL_MIN_SIZE = 1;
+ // TODO: Adjusts to the appropriate value when the writeThreadPool is used.
+ private static final int WRITE_POOL_MIN_SIZE = 0;
private final ConfigurationSource conf;
private final OzoneManagerClientProtocol ozoneManagerClient;
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
index 44303ed2ff..29cf1bc5e1 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
@@ -213,14 +213,6 @@ class TestOzoneAtRestEncryption {
}
}
- static void reInitClient() throws IOException {
- ozClient = OzoneClientFactory.getRpcClient(conf);
- store = ozClient.getObjectStore();
- TestOzoneRpcClient.setOzClient(ozClient);
- TestOzoneRpcClient.setStore(store);
- }
-
-
@ParameterizedTest
@EnumSource
void testPutKeyWithEncryption(BucketLayout bucketLayout) throws Exception {
@@ -778,7 +770,9 @@ class TestOzoneAtRestEncryption {
KeyProvider kp3 = ozClient.getObjectStore().getKeyProvider();
assertNotEquals(kp3, kpSpy);
- reInitClient();
+ // Restore ozClient and store
+ TestOzoneRpcClient.setOzClient(OzoneClientFactory.getRpcClient(conf));
+ TestOzoneRpcClient.setStore(ozClient.getObjectStore());
}
private static RepeatedOmKeyInfo getMatchedKeyInfo(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]