This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 083a45ee0b Revert "HDDS-10384. RPC client Reusing thread resources. (#6270)" (#6277)
083a45ee0b is described below

commit 083a45ee0b774c62a481d029d3de3812ccc63e2c
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Feb 27 08:29:22 2024 +0100

    Revert "HDDS-10384. RPC client Reusing thread resources. (#6270)" (#6277)
    
    This reverts commit 84c6e4d861d907d1ac39e252aa97e8a512ef247b.
---
 .../hadoop/hdds/scm/storage/AbstractCommitWatcher.java       |  2 +-
 .../apache/hadoop/hdds/scm/storage/BlockOutputStream.java    |  5 ++++-
 .../ec/reconstruction/ECReconstructionCoordinator.java       |  3 ++-
 .../org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java | 11 +++++++----
 .../java/org/apache/hadoop/ozone/client/rpc/RpcClient.java   |  3 ++-
 .../hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java   | 12 +++---------
 6 files changed, 19 insertions(+), 17 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
index 957f761ccb..0c5501c792 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
@@ -73,7 +73,7 @@ abstract class AbstractCommitWatcher<BUFFER> {
     return commitIndexMap;
   }
 
-  synchronized void updateCommitInfoMap(long index, List<BUFFER> buffers) {
+  void updateCommitInfoMap(long index, List<BUFFER> buffers) {
     commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
         .addAll(buffers);
   }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 5c0516d7bd..5ff5da6098 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -181,7 +182,8 @@ public class BlockOutputStream extends OutputStream {
             (long) flushPeriod * streamBufferArgs.getStreamBufferSize() == streamBufferArgs
                 .getStreamBufferFlushSize());
 
-    this.responseExecutor = blockOutputStreamResourceProvider.get();
+    // A single thread executor handle the responses of async requests
+    responseExecutor = Executors.newSingleThreadExecutor();
     bufferList = null;
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
@@ -655,6 +657,7 @@ public class BlockOutputStream extends OutputStream {
       bufferList.clear();
     }
     bufferList = null;
+    responseExecutor.shutdown();
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 90756bbc88..a45c158448 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -101,7 +101,8 @@ public class ECReconstructionCoordinator implements Closeable {
 
   private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
 
-  private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5;
+  // TODO: Adjusts to the appropriate value when the ec-reconstruct-writer thread pool is used.
+  private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 0;
 
   private final ECContainerOperationClient containerOperationClient;
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 0cb3973e04..878558073f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -43,6 +43,8 @@ import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -64,6 +66,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
   private final int numParityBlks;
   private final ByteBufferPool bufferPool;
   private final RawErasureEncoder encoder;
+  private final ExecutorService flushExecutor;
   private final Future<Boolean> flushFuture;
   private final AtomicLong flushCheckpoint;
 
@@ -116,13 +119,12 @@ public final class ECKeyOutputStream extends KeyOutputStream
     this.writeOffset = 0;
     this.encoder = CodecUtil.createRawEncoderWithFallback(
         builder.getReplicationConfig());
+    this.flushExecutor = Executors.newSingleThreadExecutor();
     S3Auth s3Auth = builder.getS3CredentialsProvider().get();
     ThreadLocal<S3Auth> s3CredentialsProvider =
         builder.getS3CredentialsProvider();
-    this.flushFuture = builder.getExecutorServiceSupplier().get().submit(() -> {
-      s3CredentialsProvider.set(s3Auth);
-      return flushStripeFromQueue();
-    });
+    flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth));
+    this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue);
     this.flushCheckpoint = new AtomicLong(0);
     this.atomicKeyCreation = builder.getAtomicKeyCreation();
   }
@@ -493,6 +495,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
     } catch (InterruptedException e) {
       throw new IOException("Flushing thread was interrupted", e);
     } finally {
+      flushExecutor.shutdownNow();
       closeCurrentStreamEntry();
       blockOutputStreamEntryPool.cleanup();
     }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index a6830ba9f7..74b22e7ca4 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -196,7 +196,8 @@ public class RpcClient implements ClientProtocol {
   // for reconstruction.
   private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
 
-  private static final int WRITE_POOL_MIN_SIZE = 1;
+  // TODO: Adjusts to the appropriate value when the writeThreadPool is used.
+  private static final int WRITE_POOL_MIN_SIZE = 0;
 
   private final ConfigurationSource conf;
   private final OzoneManagerClientProtocol ozoneManagerClient;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
index 44303ed2ff..29cf1bc5e1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
@@ -213,14 +213,6 @@ class TestOzoneAtRestEncryption {
     }
   }
 
-  static void reInitClient() throws IOException {
-    ozClient = OzoneClientFactory.getRpcClient(conf);
-    store = ozClient.getObjectStore();
-    TestOzoneRpcClient.setOzClient(ozClient);
-    TestOzoneRpcClient.setStore(store);
-  }
-
-
   @ParameterizedTest
   @EnumSource
   void testPutKeyWithEncryption(BucketLayout bucketLayout) throws Exception {
@@ -778,7 +770,9 @@ class TestOzoneAtRestEncryption {
 
     KeyProvider kp3 = ozClient.getObjectStore().getKeyProvider();
     assertNotEquals(kp3, kpSpy);
-    reInitClient();
+    // Restore ozClient and store
+    TestOzoneRpcClient.setOzClient(OzoneClientFactory.getRpcClient(conf));
+    TestOzoneRpcClient.setStore(ozClient.getObjectStore());
   }
 
   private static RepeatedOmKeyInfo getMatchedKeyInfo(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to