This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f47535705 HDDS-11239. Fix KeyOutputStream's exception handling when 
calling hsync concurrently (#7047)
7f47535705 is described below

commit 7f47535705ca57b479a644538f1aeb52419df223
Author: Duong Nguyen <[email protected]>
AuthorDate: Wed Aug 21 13:32:45 2024 -0700

    HDDS-11239. Fix KeyOutputStream's exception handling when calling hsync 
concurrently (#7047)
---
 .../org/apache/hadoop/hdds/scm/ErrorInjector.java  |  31 +++++
 .../hadoop/hdds/scm/XceiverClientCreator.java      |   8 +-
 .../hadoop/hdds/scm/XceiverClientManager.java      |   1 +
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java |  16 ++-
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |  82 ++++++-------
 .../server/ratis/ContainerStateMachine.java        |   8 +-
 .../ozone/client/io/BlockOutputStreamEntry.java    |  59 +++++++++-
 .../client/io/BlockOutputStreamEntryPool.java      |  17 +--
 .../client/io/ECBlockOutputStreamEntryPool.java    |   2 +-
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  |   2 +-
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  96 ++++++++++++---
 .../ozone/client/io/TestKeyOutputStream.java       |   4 +-
 .../java/org/apache/hadoop/fs/ozone/TestHSync.java | 131 +++++++++++++++++++++
 13 files changed, 382 insertions(+), 75 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ErrorInjector.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ErrorInjector.java
new file mode 100644
index 0000000000..35806967a5
--- /dev/null
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/ErrorInjector.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientReply;
+
+/**
+ * Client-side error injector that simulates receiving errors from the server 
side.
+ */
+@FunctionalInterface
+public interface ErrorInjector {
+  RaftClientReply getResponse(ContainerProtos.ContainerCommandRequestProto 
request, ClientId id, Pipeline pipeline);
+}
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java
index cd46bc49a1..75ae01c100 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientCreator.java
@@ -31,6 +31,12 @@ import java.io.IOException;
  * Factory for XceiverClientSpi implementations.  Client instances are not 
cached.
  */
 public class XceiverClientCreator implements XceiverClientFactory {
+  private static ErrorInjector errorInjector;
+
+  public static void enableErrorInjection(ErrorInjector injector) {
+    errorInjector = injector;
+  }
+
   private final ConfigurationSource conf;
   private final boolean topologyAwareRead;
   private final ClientTrustManager trustManager;
@@ -60,7 +66,7 @@ public class XceiverClientCreator implements 
XceiverClientFactory {
     XceiverClientSpi client;
     switch (pipeline.getType()) {
     case RATIS:
-      client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, 
trustManager);
+      client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf, 
trustManager, errorInjector);
       break;
     case STAND_ALONE:
       client = new XceiverClientGrpc(pipeline, conf, trustManager);
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index 285a47ec57..07b7044172 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -61,6 +61,7 @@ import org.slf4j.LoggerFactory;
 public class XceiverClientManager extends XceiverClientCreator {
   private static final Logger LOG =
       LoggerFactory.getLogger(XceiverClientManager.class);
+
   private final Cache<String, XceiverClientSpi> clientCache;
   private final CacheMetrics cacheMetrics;
 
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index eb0ed0a885..b0ef85cfbf 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -79,12 +79,12 @@ public final class XceiverClientRatis extends 
XceiverClientSpi {
   public static XceiverClientRatis newXceiverClientRatis(
       org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
       ConfigurationSource ozoneConf) {
-    return newXceiverClientRatis(pipeline, ozoneConf, null);
+    return newXceiverClientRatis(pipeline, ozoneConf, null, null);
   }
 
   public static XceiverClientRatis newXceiverClientRatis(
       org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
-      ConfigurationSource ozoneConf, ClientTrustManager trustManager) {
+      ConfigurationSource ozoneConf, ClientTrustManager trustManager, 
ErrorInjector errorInjector) {
     final String rpcType = ozoneConf
         .get(ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_KEY,
             ScmConfigKeys.HDDS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
@@ -93,7 +93,7 @@ public final class XceiverClientRatis extends 
XceiverClientSpi {
         SecurityConfig(ozoneConf), trustManager);
     return new XceiverClientRatis(pipeline,
         SupportedRpcType.valueOfIgnoreCase(rpcType),
-        retryPolicy, tlsConfig, ozoneConf);
+        retryPolicy, tlsConfig, ozoneConf, errorInjector);
   }
 
   private final Pipeline pipeline;
@@ -110,13 +110,14 @@ public final class XceiverClientRatis extends 
XceiverClientSpi {
       = XceiverClientManager.getXceiverClientMetrics();
   private final RaftProtos.ReplicationLevel watchType;
   private final int majority;
+  private final ErrorInjector errorInjector;
 
   /**
    * Constructs a client.
    */
   private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
       RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
-      ConfigurationSource configuration) {
+      ConfigurationSource configuration, ErrorInjector errorInjector) {
     super();
     this.pipeline = pipeline;
     this.majority = (pipeline.getReplicationConfig().getRequiredNodes() / 2) + 
1;
@@ -142,6 +143,7 @@ public final class XceiverClientRatis extends 
XceiverClientSpi {
       LOG.trace("new XceiverClientRatis for pipeline " + pipeline.getId(),
           new Throwable("TRACE"));
     }
+    this.errorInjector = errorInjector;
   }
 
   private long updateCommitInfosMap(RaftClientReply reply, 
RaftProtos.ReplicationLevel level) {
@@ -248,6 +250,12 @@ public final class XceiverClientRatis extends 
XceiverClientSpi {
 
   private CompletableFuture<RaftClientReply> sendRequestAsync(
       ContainerCommandRequestProto request) {
+    if (errorInjector != null) {
+      RaftClientReply response = errorInjector.getResponse(request, 
getClient().getId(), pipeline);
+      if (response != null) {
+        return CompletableFuture.completedFuture(response);
+      }
+    }
     return TracingUtil.executeInNewSpan(
         "XceiverClientRatis." + request.getCmdType().name(),
         () -> {
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 43ac69818f..e88b097c49 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -102,7 +102,7 @@ public class BlockOutputStream extends OutputStream {
       = new AtomicReference<>();
 
   private final BlockData.Builder containerBlockData;
-  private XceiverClientFactory xceiverClientFactory;
+  private volatile XceiverClientFactory xceiverClientFactory;
   private XceiverClientSpi xceiverClient;
   private OzoneClientConfig config;
   private StreamBufferArgs streamBufferArgs;
@@ -216,7 +216,8 @@ public class BlockOutputStream extends OutputStream {
         this.token.encodeToUrlString();
 
     //number of buffers used before doing a flush
-    refreshCurrentBuffer();
+    currentBuffer = null;
+    currentBufferRemaining = 0;
     flushPeriod = (int) (streamBufferArgs.getStreamBufferFlushSize() / 
streamBufferArgs
         .getStreamBufferSize());
 
@@ -254,12 +255,6 @@ public class BlockOutputStream extends OutputStream {
     return true;
   }
 
-  synchronized void refreshCurrentBuffer() {
-    currentBuffer = bufferPool.getCurrentBuffer();
-    currentBufferRemaining =
-        currentBuffer != null ? currentBuffer.remaining() : 0;
-  }
-
   public BlockID getBlockID() {
     return blockID.get();
   }
@@ -418,42 +413,44 @@ public class BlockOutputStream extends OutputStream {
    * @param len length of data to write
    * @throws IOException if error occurred
    */
-
-  // In this case, the data is already cached in the currentBuffer.
   public synchronized void writeOnRetry(long len) throws IOException {
     if (len == 0) {
       return;
     }
+
+    // In this case, the data from the failing (previous) block is already cached 
in the allocated buffers in
+    // the BufferPool. For each pending buffer in the BufferPool, we 
sequentially flush it and wait synchronously.
+
+    List<ChunkBuffer> allocatedBuffers = bufferPool.getAllocatedBuffers();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Retrying write length {} for blockID {}", len, blockID);
+      LOG.debug("{}: Retrying write length {} on target blockID {}, {} 
buffers", this, len, blockID,
+          allocatedBuffers.size());
     }
     Preconditions.checkArgument(len <= 
streamBufferArgs.getStreamBufferMaxSize());
     int count = 0;
-    List<ChunkBuffer> allocatedBuffers = bufferPool.getAllocatedBuffers();
     while (len > 0) {
       ChunkBuffer buffer = allocatedBuffers.get(count);
       long writeLen = Math.min(buffer.position(), len);
-      if (!buffer.hasRemaining()) {
-        writeChunk(buffer);
-      }
       len -= writeLen;
       count++;
       writtenDataLength += writeLen;
-      // we should not call isBufferFull/shouldFlush here.
-      // The buffer might already be full as whole data is already cached in
-      // the buffer. We should just validate
-      // if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
-      // call for handling full buffer/flush buffer condition.
-      if (writtenDataLength % streamBufferArgs.getStreamBufferFlushSize() == 
0) {
-        // reset the position to zero as now we will be reading the
-        // next buffer in the list
-        updateWriteChunkLength();
-        updatePutBlockLength();
-        CompletableFuture<PutBlockResult> putBlockResultFuture = 
executePutBlock(false, false);
-        recordWatchForCommitAsync(putBlockResultFuture);
+      updateWriteChunkLength();
+      updatePutBlockLength();
+      LOG.debug("Write chunk on retry buffer = {}", buffer);
+      CompletableFuture<PutBlockResult> putBlockFuture;
+      if (allowPutBlockPiggybacking) {
+        putBlockFuture = writeChunkAndPutBlock(buffer, false);
+      } else {
+        writeChunk(buffer);
+        putBlockFuture = executePutBlock(false, false);
       }
-      if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) {
-        handleFullBuffer();
+      CompletableFuture<Void> watchForCommitAsync = 
watchForCommitAsync(putBlockFuture);
+      try {
+        watchForCommitAsync.get();
+      } catch (InterruptedException e) {
+        handleInterruptedException(e, true);
+      } catch (ExecutionException e) {
+        handleExecutionException(e);
       }
     }
   }
@@ -479,14 +476,6 @@ public class BlockOutputStream extends OutputStream {
   void releaseBuffersOnException() {
   }
 
-  // It may happen that once the exception is encountered , we still might
-  // have successfully flushed up to a certain index. Make sure the buffers
-  // only contain data which have not been sufficiently replicated
-  private void adjustBuffersOnException() {
-    releaseBuffersOnException();
-    refreshCurrentBuffer();
-  }
-
   /**
    * Watch for a specific commit index.
    */
@@ -633,6 +622,9 @@ public class BlockOutputStream extends OutputStream {
   protected void handleFlush(boolean close) throws IOException {
     try {
       handleFlushInternal(close);
+      if (close) {
+        waitForAllPendingFlushes();
+      }
     } catch (ExecutionException e) {
       handleExecutionException(e);
     } catch (InterruptedException ex) {
@@ -675,6 +667,17 @@ public class BlockOutputStream extends OutputStream {
     }
   }
 
+  public void waitForAllPendingFlushes() throws IOException {
+    // When closing, must wait for all flush futures to complete.
+    try {
+      allPendingFlushFutures.get();
+    } catch (InterruptedException e) {
+      handleInterruptedException(e, true);
+    } catch (ExecutionException e) {
+      handleExecutionException(e);
+    }
+  }
+
   private synchronized CompletableFuture<Void> 
handleFlushInternalSynchronized(boolean close) throws IOException {
     CompletableFuture<PutBlockResult> putBlockResultFuture = null;
     // flush the last chunk data residing on the currentBuffer
@@ -740,6 +743,7 @@ public class BlockOutputStream extends OutputStream {
         // Preconditions.checkArgument(buffer.position() == 0);
         // bufferPool.checkBufferPoolEmpty();
       } else {
+        waitForAllPendingFlushes();
         cleanup(false);
       }
     }
@@ -783,7 +787,7 @@ public class BlockOutputStream extends OutputStream {
   void cleanup() {
   }
 
-  public void cleanup(boolean invalidateClient) {
+  public synchronized void cleanup(boolean invalidateClient) {
     if (xceiverClientFactory != null) {
       xceiverClientFactory.releaseClient(xceiverClient, invalidateClient);
     }
@@ -811,7 +815,6 @@ public class BlockOutputStream extends OutputStream {
     if (isClosed()) {
       throw new IOException("BlockOutputStream has been closed.");
     } else if (getIoException() != null) {
-      adjustBuffersOnException();
       throw getIoException();
     }
   }
@@ -1148,7 +1151,6 @@ public class BlockOutputStream extends OutputStream {
    */
   private void handleExecutionException(Exception ex) throws IOException {
     setIoException(ex);
-    adjustBuffersOnException();
     throw getIoException();
   }
 
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 28b9e151ff..b3398de07a 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -581,7 +581,10 @@ public class ContainerStateMachine extends 
BaseStateMachine {
     writeChunkFuture.thenApply(r -> {
       if (r.getResult() != ContainerProtos.Result.SUCCESS
           && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
-          && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
+          && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+          // Now that concurrent flushes are allowed on the same key, chunk file 
inconsistencies can happen, and
+          // they should not crash the pipeline.
+          && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) 
{
         StorageContainerException sce =
             new StorageContainerException(r.getMessage(), r.getResult());
         LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" +
@@ -1061,7 +1064,8 @@ public class ContainerStateMachine extends 
BaseStateMachine {
         // unhealthy
         if (r.getResult() != ContainerProtos.Result.SUCCESS
             && r.getResult() != ContainerProtos.Result.CONTAINER_NOT_OPEN
-            && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO) {
+            && r.getResult() != ContainerProtos.Result.CLOSED_CONTAINER_IO
+            && r.getResult() != 
ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) {
           StorageContainerException sce =
               new StorageContainerException(r.getMessage(), r.getResult());
           LOG.error(
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 5e6ecceefa..2ae3e47553 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -22,6 +22,8 @@ import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
 import java.util.function.Supplier;
 
 import org.apache.hadoop.fs.Syncable;
@@ -41,6 +43,8 @@ import org.apache.hadoop.security.token.Token;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.util.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A BlockOutputStreamEntry manages the data writes into the DataNodes.
@@ -51,9 +55,9 @@ import org.apache.ratis.util.JavaUtils;
  * but there can be other implementations that are using a different way.
  */
 public class BlockOutputStreamEntry extends OutputStream {
-
+  public static final Logger LOG = 
LoggerFactory.getLogger(BlockOutputStreamEntry.class);
   private final OzoneClientConfig config;
-  private OutputStream outputStream;
+  private BlockOutputStream outputStream;
   private BlockID blockID;
   private final String key;
   private final XceiverClientFactory xceiverClientManager;
@@ -69,6 +73,18 @@ public class BlockOutputStreamEntry extends OutputStream {
   private final StreamBufferArgs streamBufferArgs;
   private final Supplier<ExecutorService> executorServiceSupplier;
 
+  /**
+   * An indicator that this BlockOutputStream was created to take over writes 
handed off from another, faulty BlockOutputStream.
+   * Once this flag is on, this BlockOutputStream can only handle writeOnRetry.
+   */
+  private volatile boolean isHandlingRetry;
+
+  /**
+   * Records how many calls (write, flush) are currently being handled by this block.
+   */
+  private AtomicInteger inflightCalls = new AtomicInteger();
+
+
   BlockOutputStreamEntry(Builder b) {
     this.config = b.config;
     this.outputStream = null;
@@ -83,6 +99,7 @@ public class BlockOutputStreamEntry extends OutputStream {
     this.clientMetrics = b.clientMetrics;
     this.streamBufferArgs = b.streamBufferArgs;
     this.executorServiceSupplier = b.executorServiceSupplier;
+    this.isHandlingRetry = b.forRetry;
   }
 
   @Override
@@ -102,6 +119,37 @@ public class BlockOutputStreamEntry extends OutputStream {
     }
   }
 
+  /** Register when a call (write or flush) is received on this block. */
+  void registerCallReceived() {
+    inflightCalls.incrementAndGet();
+  }
+
+  /**
+   * Register when a call (write or flush) is finished on this block.
+   * @return true if all the calls are done.
+   */
+  boolean registerCallFinished() {
+    return inflightCalls.decrementAndGet() == 0;
+  }
+
+  void waitForRetryHandling(Condition retryHandlingCond) throws 
InterruptedException {
+    while (isHandlingRetry) {
+      LOG.info("{} : Block to wait for retry handling.", this);
+      retryHandlingCond.await();
+      LOG.info("{} : Done waiting for retry handling.", this);
+    }
+  }
+
+  void finishRetryHandling(Condition retryHandlingCond) {
+    LOG.info("{}: Exiting retry handling mode", this);
+    isHandlingRetry = false;
+    retryHandlingCond.signalAll();
+  }
+
+  void waitForAllPendingFlushes() throws IOException {
+    outputStream.waitForAllPendingFlushes();
+  }
+
   /**
    * Creates the outputStreams that are necessary to start the write.
    * Implementors can override this to instantiate multiple streams instead.
@@ -144,6 +192,7 @@ public class BlockOutputStreamEntry extends OutputStream {
     BlockOutputStream out = (BlockOutputStream) getOutputStream();
     out.writeOnRetry(len);
     incCurrentPosition(len);
+    LOG.info("{}: Finish retrying with len {}, currentPosition {}", this, len, 
currentPosition);
   }
 
   @Override
@@ -368,6 +417,7 @@ public class BlockOutputStreamEntry extends OutputStream {
     private ContainerClientMetrics clientMetrics;
     private StreamBufferArgs streamBufferArgs;
     private Supplier<ExecutorService> executorServiceSupplier;
+    private boolean forRetry;
 
     public Pipeline getPipeline() {
       return pipeline;
@@ -433,6 +483,11 @@ public class BlockOutputStreamEntry extends OutputStream {
       return this;
     }
 
+    public Builder setForRetry(boolean forRetry) {
+      this.forRetry = forRetry;
+      return this;
+    }
+
     public BlockOutputStreamEntry build() {
       return new BlockOutputStreamEntry(this);
     }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 99899c6874..3705a13637 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -141,7 +141,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
     // only the blocks allocated in this open session (block createVersion
     // equals to open session version)
     for (OmKeyLocationInfo subKeyInfo : version.getLocationList(openVersion)) {
-      addKeyLocationInfo(subKeyInfo);
+      addKeyLocationInfo(subKeyInfo, false);
     }
   }
 
@@ -154,7 +154,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
    *                   key to be written.
    * @return a BlockOutputStreamEntry instance that handles how data is 
written.
    */
-  BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
+  BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo, 
boolean forRetry) {
     return
         new BlockOutputStreamEntry.Builder()
             .setBlockID(subKeyInfo.getBlockID())
@@ -168,12 +168,13 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
             .setClientMetrics(clientMetrics)
             .setStreamBufferArgs(streamBufferArgs)
             .setExecutorServiceSupplier(executorServiceSupplier)
+            .setForRetry(forRetry)
             .build();
   }
 
-  private synchronized void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+  private synchronized void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo, 
boolean forRetry) {
     Preconditions.checkNotNull(subKeyInfo.getPipeline());
-    streamEntries.add(createStreamEntry(subKeyInfo));
+    streamEntries.add(createStreamEntry(subKeyInfo, forRetry));
   }
 
   /**
@@ -295,13 +296,13 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
    *
    * @throws IOException
    */
-  private void allocateNewBlock() throws IOException {
+  private void allocateNewBlock(boolean forRetry) throws IOException {
     if (!excludeList.isEmpty()) {
       LOG.debug("Allocating block with {}", excludeList);
     }
     OmKeyLocationInfo subKeyInfo =
         omClient.allocateBlock(keyArgs, openID, excludeList);
-    addKeyLocationInfo(subKeyInfo);
+    addKeyLocationInfo(subKeyInfo, forRetry);
   }
 
   /**
@@ -379,7 +380,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
    * @return the new current open stream to write to
    * @throws IOException if the block allocation failed.
    */
-  synchronized BlockOutputStreamEntry allocateBlockIfNeeded() throws 
IOException {
+  synchronized BlockOutputStreamEntry allocateBlockIfNeeded(boolean forRetry) 
throws IOException {
     BlockOutputStreamEntry streamEntry = getCurrentStreamEntry();
     if (streamEntry != null && streamEntry.isClosed()) {
       // a stream entry gets closed either by :
@@ -391,7 +392,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
       Preconditions.checkNotNull(omClient);
       // allocate a new block, if a exception happens, log an error and
       // throw exception to the caller directly, and the write fails.
-      allocateNewBlock();
+      allocateNewBlock(forRetry);
     }
     // in theory, this condition should never violate due the check above
     // still do a sanity check.
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index 6eb9aed0d3..f891724270 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -37,7 +37,7 @@ public class ECBlockOutputStreamEntryPool extends 
BlockOutputStreamEntryPool {
   }
 
   @Override
-  ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
+  ECBlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo, 
boolean forRetry) {
     final ECBlockOutputStreamEntry.Builder b = new 
ECBlockOutputStreamEntry.Builder();
     b.setBlockID(subKeyInfo.getBlockID())
             .setKey(getKeyName())
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 0de61f8485..ea3a3592a5 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -315,7 +315,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
 
   private void writeDataCells(ECChunkBuffers stripe) throws IOException {
     final ECBlockOutputStreamEntryPool blockOutputStreamEntryPool = 
getBlockOutputStreamEntryPool();
-    blockOutputStreamEntryPool.allocateBlockIfNeeded();
+    blockOutputStreamEntryPool.allocateBlockIfNeeded(false);
     ByteBuffer[] dataCells = stripe.getDataBuffers();
     for (int i = 0; i < numDataBlks; i++) {
       if (dataCells[i].limit() > 0) {
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 549607c59a..4f9e5db49a 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -25,6 +25,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -58,6 +61,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
+import org.apache.ratis.util.function.CheckedRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,14 +113,28 @@ public class KeyOutputStream extends OutputStream
   private boolean atomicKeyCreation;
   private ContainerClientMetrics clientMetrics;
   private OzoneManagerVersion ozoneManagerVersion;
+  private final Lock writeLock = new ReentrantLock();
+  private final Condition retryHandlingCondition = writeLock.newCondition();
 
   private final int maxConcurrentWritePerKey;
   private final KeyOutputStreamSemaphore keyOutputStreamSemaphore;
 
+  @VisibleForTesting
   KeyOutputStreamSemaphore getRequestSemaphore() {
     return keyOutputStreamSemaphore;
   }
 
+  /** Required to spy the object in testing. */
+  @VisibleForTesting
+  @SuppressWarnings("unused")
+  KeyOutputStream() {
+    maxConcurrentWritePerKey = 0;
+    keyOutputStreamSemaphore = null;
+    blockOutputStreamEntryPool = null;
+    retryPolicyMap = null;
+    replication = null;
+  }
+
   public KeyOutputStream(ReplicationConfig replicationConfig, 
BlockOutputStreamEntryPool blockOutputStreamEntryPool) {
     this.replication = replicationConfig;
     closed = false;
@@ -187,7 +205,7 @@ public class KeyOutputStream extends OutputStream
    * @param version the set of blocks that are pre-allocated.
    * @param openVersion the version corresponding to the pre-allocation.
    */
-  public synchronized void addPreallocateBlocks(OmKeyLocationInfoGroup 
version, long openVersion) {
+  public void addPreallocateBlocks(OmKeyLocationInfoGroup version, long 
openVersion) {
     blockOutputStreamEntryPool.addPreallocateBlocks(version, openVersion);
   }
 
@@ -227,22 +245,31 @@ public class KeyOutputStream extends OutputStream
         return;
       }
 
-      synchronized (this) {
+      doInWriteLock(() -> {
         handleWrite(b, off, len, false);
         writeOffset += len;
-      }
+      });
     } finally {
       getRequestSemaphore().release();
     }
   }
 
+  private <E extends Throwable> void doInWriteLock(CheckedRunnable<E> block) 
throws E {
+    writeLock.lock();
+    try {
+      block.run();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
   @VisibleForTesting
   void handleWrite(byte[] b, int off, long len, boolean retry)
       throws IOException {
     while (len > 0) {
       try {
         BlockOutputStreamEntry current =
-            blockOutputStreamEntryPool.allocateBlockIfNeeded();
+            blockOutputStreamEntryPool.allocateBlockIfNeeded(retry);
         // length(len) will be in int range if the call is happening through
         // write API of blockOutputStream. Length can be in long range if it
         // comes via Exception path.
@@ -272,12 +299,18 @@ public class KeyOutputStream extends OutputStream
       boolean retry, long len, byte[] b, int writeLen, int off, long 
currentPos)
       throws IOException {
     try {
+      current.registerCallReceived();
       if (retry) {
         current.writeOnRetry(len);
       } else {
+        waitForRetryHandling(current);
         current.write(b, off, writeLen);
         offset += writeLen;
       }
+      current.registerCallFinished();
+    } catch (InterruptedException e) {
+      current.registerCallFinished();
+      throw new InterruptedIOException();
     } catch (IOException ioe) {
       // for the current iteration, totalDataWritten - currentPos gives the
       // amount of data already written to the buffer
@@ -295,11 +328,24 @@ public class KeyOutputStream extends OutputStream
         offset += writeLen;
       }
       LOG.debug("writeLen {}, total len {}", writeLen, len);
-      handleException(current, ioe);
+      handleException(current, ioe, retry);
     }
     return writeLen;
   }
 
+  private void handleException(BlockOutputStreamEntry entry, IOException exception, boolean fromRetry)
+      throws IOException {
+    doInWriteLock(() -> { // exception handling and the retry-mode exit both happen under writeLock
+      handleExceptionInternal(entry, exception);
+      BlockOutputStreamEntry current = blockOutputStreamEntryPool.getCurrentStreamEntry();
+      if (!fromRetry && entry.registerCallFinished()) {
+        // When the faulty block has finished handling all of its pending calls, the current block can exit
+        // retry handling mode and unblock normal calls.
+        current.finishRetryHandling(retryHandlingCondition);
+      }
+    });
+  }
+
   /**
    * It performs following actions :
    * a. Updates the committed length at datanode for the current stream in
@@ -310,8 +356,15 @@ public class KeyOutputStream extends OutputStream
    * @param exception   actual exception that occurred
    * @throws IOException Throws IOException if Write fails
    */
-  private synchronized void handleException(BlockOutputStreamEntry streamEntry,
-      IOException exception) throws IOException {
+  private void handleExceptionInternal(BlockOutputStreamEntry streamEntry, 
IOException exception) throws IOException {
+    try {
+      // Wait for all pending flushes in the faulty stream. It's possible that 
a prior write is pending completion
+      // successfully. Errors are ignored here and will be handled by the 
individual flush call. We just want to ensure
+      // all the pending are complete before handling exception.
+      streamEntry.waitForAllPendingFlushes();
+    } catch (IOException ignored) {
+    }
+
     Throwable t = HddsClientUtils.checkForException(exception);
     Preconditions.checkNotNull(t);
     boolean retryFailure = checkForRetryFailure(t);
@@ -338,8 +391,6 @@ public class KeyOutputStream extends OutputStream
     }
     Preconditions.checkArgument(
         bufferedDataLen <= streamBufferArgs.getStreamBufferMaxSize());
-    Preconditions.checkArgument(
-        offset - blockOutputStreamEntryPool.getKeyLength() == bufferedDataLen);
     long containerId = streamEntry.getBlockID().getContainerID();
     Collection<DatanodeDetails> failedServers = streamEntry.getFailedServers();
     Preconditions.checkNotNull(failedServers);
@@ -498,12 +549,12 @@ public class KeyOutputStream extends OutputStream
       final long hsyncPos = writeOffset;
       handleFlushOrClose(StreamAction.HSYNC);
 
-      synchronized (this) {
+      doInWriteLock(() -> {
         Preconditions.checkState(offset >= hsyncPos,
             "offset = %s < hsyncPos = %s", offset, hsyncPos);
         MetricUtil.captureLatencyNs(clientMetrics::addHsyncLatency,
             () -> blockOutputStreamEntryPool.hsyncKey(hsyncPos));
-      }
+      });
     } finally {
       getRequestSemaphore().release();
     }
@@ -532,14 +583,23 @@ public class KeyOutputStream extends OutputStream
           BlockOutputStreamEntry entry =
               blockOutputStreamEntryPool.getCurrentStreamEntry();
           if (entry != null) {
+            // If the current block is to handle retries, wait until all the 
retries are done.
+            waitForRetryHandling(entry);
+            entry.registerCallReceived();
             try {
               handleStreamAction(entry, op);
+              entry.registerCallFinished();
             } catch (IOException ioe) {
-              handleException(entry, ioe);
+              handleException(entry, ioe, false);
               continue;
+            } catch (Exception e) {
+              entry.registerCallFinished();
+              throw e;
             }
           }
           return;
+        } catch (InterruptedException e) {
+          throw new InterruptedIOException();
         } catch (Exception e) {
           markStreamClosed();
           throw e;
@@ -548,6 +608,10 @@ public class KeyOutputStream extends OutputStream
     }
   }
 
+  private void waitForRetryHandling(BlockOutputStreamEntry currentEntry) throws InterruptedException {
+    doInWriteLock(() -> currentEntry.waitForRetryHandling(retryHandlingCondition)); // runs under writeLock
+  }
+
   private void handleStreamAction(BlockOutputStreamEntry entry,
                                   StreamAction op) throws IOException {
     Collection<DatanodeDetails> failedServers = entry.getFailedServers();
@@ -583,7 +647,11 @@ public class KeyOutputStream extends OutputStream
    * @throws IOException
    */
   @Override
-  public synchronized void close() throws IOException {
+  public void close() throws IOException {
+    doInWriteLock(this::closeInternal);
+  }
+
+  private void closeInternal() throws IOException {
     if (closed) {
       return;
     }
@@ -781,7 +849,7 @@ public class KeyOutputStream extends OutputStream
    * the last state of the volatile {@link #closed} field.
    * @throws IOException if the connection is closed.
    */
-  private synchronized void checkNotClosed() throws IOException {
+  private void checkNotClosed() throws IOException {
     if (closed) {
       throw new IOException(
           ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
diff --git a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java
index 6b6abceff3..d90a335321 100644
--- a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java
+++ b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/io/TestKeyOutputStream.java
@@ -34,7 +34,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -54,7 +54,7 @@ public class TestKeyOutputStream {
   void testConcurrentWriteLimitOne() throws Exception {
     // Verify the semaphore is working to limit the number of concurrent 
writes allowed.
     KeyOutputStreamSemaphore sema1 = new KeyOutputStreamSemaphore(1);
-    KeyOutputStream keyOutputStream = mock(KeyOutputStream.class);
+    KeyOutputStream keyOutputStream = spy(KeyOutputStream.class);
     when(keyOutputStream.getRequestSemaphore()).thenReturn(sema1);
 
     final AtomicInteger countWrite = new AtomicInteger(0);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index f27ebaa972..49b515d53c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -45,6 +45,9 @@ import org.apache.hadoop.crypto.Encryptor;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.scm.ErrorInjector;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.utils.IOUtils;
@@ -95,6 +98,11 @@ import 
org.apache.hadoop.ozone.om.service.OpenKeyCleanupService;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.MethodOrderer.OrderAnnotation;
@@ -518,6 +526,7 @@ public class TestHSync {
 
   @Test
   public void testUncommittedBlocks() throws Exception {
+    waitForEmptyDeletedTable();
     // Set the fs.defaultFS
     final String rootPath = String.format("%s://%s/",
         OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
@@ -698,6 +707,99 @@ public class TestHSync {
     }, 500, 3000);
   }
 
+
+  /**
+   * Arguments for {@link #testConcurrentExceptionHandling}: the number of syncer threads, then the number of
+   * errors to inject.
+   */
+  public static Stream<Arguments> concurrentExceptionHandling() {
+    return Stream.of(Arguments.of(4, 1), Arguments.of(4, 4), Arguments.of(8, 4));
+  }
+
+  /** Runs concurrent write+hsync with {@code errors} injected errors, then validates the written file. */
+  @ParameterizedTest
+  @MethodSource("concurrentExceptionHandling")
+  public void testConcurrentExceptionHandling(int syncerThreads, int errors) throws Exception {
+    final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
+    CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+    // Install the injector now; its counter stays at zero until the helper below calls start(errors).
+    ErrorInjectorImpl errorInjector = new ErrorInjectorImpl();
+    XceiverClientManager.enableErrorInjection(errorInjector);
+    final String dir = OZONE_ROOT + bucket.getVolumeName() + OZONE_URI_DELIMITER + bucket.getName();
+
+    try (FileSystem fs = FileSystem.get(CONF)) {
+      final Path file = new Path(dir, "exceptionhandling");
+      byte[] data = new byte[8];
+      ThreadLocalRandom.current().nextBytes(data);
+      int writes;
+      try (FSDataOutputStream out = fs.create(file, true)) {
+        writes = runConcurrentWriteHSyncWithException(file, out, data, syncerThreads, errors, errorInjector);
+      }
+      validateWrittenFile(file, fs, data, writes);
+      fs.delete(file, false);
+    }
+  }
+
+  /** Writes and hsyncs {@code data} from several threads for 10s, injecting {@code errors} errors after 3s. */
+  private int runConcurrentWriteHSyncWithException(Path file,
+      final FSDataOutputStream out, byte[] data, int syncThreadsCount, int errors,
+      ErrorInjectorImpl errorInjector) throws Exception {
+
+    // First failure seen by any syncer thread; rethrown to the caller once all threads have finished.
+    AtomicReference<Exception> syncerException = new AtomicReference<>();
+
+    LOG.info("runConcurrentWriteHSyncWithException {} with size {}", file, data.length);
+    AtomicInteger writes = new AtomicInteger();
+    final long start = Time.monotonicNow();
+
+    Runnable syncer = () -> {
+      while ((Time.monotonicNow() - start < 10000)) {
+        try {
+          out.write(data);
+          writes.incrementAndGet();
+          out.hsync();
+        } catch (Exception e) {
+          LOG.error("Error calling hsync", e);
+          syncerException.compareAndSet(null, e);
+          throw new RuntimeException(e);
+        }
+      }
+    };
+
+    Thread[] syncThreads = new Thread[syncThreadsCount];
+    for (int i = 0; i < syncThreadsCount; i++) {
+      syncThreads[i] = new Thread(syncer);
+      syncThreads[i].setName("Syncer-" + i);
+      syncThreads[i].start();
+    }
+
+    // Inject error at 3rd second.
+    Runnable startErrorInjector = () -> {
+      while ((Time.monotonicNow() - start <= 3000)) {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // restore the interrupt flag before giving up
+          throw new RuntimeException(e);
+        }
+      }
+      errorInjector.start(errors);
+      LOG.info("Enabled error injection in XceiverClientRatis");
+    };
+    Thread injectorThread = new Thread(startErrorInjector, "ErrorInjector");
+    injectorThread.start();
+
+    for (Thread sync : syncThreads) {
+      sync.join();
+    }
+    injectorThread.join(); // do not leave the helper thread running after the test returns
+
+    if (syncerException.get() != null) {
+      throw syncerException.get();
+    }
+    return writes.get();
+  }
+
   private int runConcurrentWriteHSync(Path file,
       final FSDataOutputStream out, byte[] data, int syncThreadsCount) throws 
Exception {
 
@@ -1320,4 +1422,33 @@ public class TestHSync {
     }
     return keys;
   }
+
+  /** {@link ErrorInjector} that fails the next {@code count} requests with CLOSED_CONTAINER_IO after start(). */
+  private static class ErrorInjectorImpl implements ErrorInjector {
+    private final AtomicInteger remaining = new AtomicInteger();
+    void start(int count) {
+      remaining.set(count);
+    }
+    @Override
+    public RaftClientReply getResponse(ContainerProtos.ContainerCommandRequestProto request, ClientId clientId,
+        Pipeline pipeline) {
+      int errorNum = remaining.getAndUpdate(n -> n > 0 ? n - 1 : 0) - 1; // clamped at 0, so it cannot underflow
+      if (errorNum < 0) {
+        return null; // no error left to inject
+      }
+      ContainerProtos.ContainerCommandResponseProto proto = ContainerProtos.ContainerCommandResponseProto.newBuilder()
+          .setResult(ContainerProtos.Result.CLOSED_CONTAINER_IO)
+          .setMessage("Simulated error #" + errorNum)
+          .setCmdType(request.getCmdType())
+          .build();
+      // The Raft reply itself is marked successful; the error is carried by the embedded container response.
+      return RaftClientReply.newBuilder()
+          .setSuccess(true)
+          .setMessage(Message.valueOf(proto.toByteString()))
+          .setClientId(clientId)
+          .setServerId(RaftPeerId.getRaftPeerId(pipeline.getLeaderId().toString()))
+          .setGroupId(RaftGroupId.randomId())
+          .build();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to