comnetwork commented on code in PR #4463:
URL: https://github.com/apache/hbase/pull/4463#discussion_r889492503


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java:
##########
@@ -532,82 +513,109 @@ protected boolean isPeerEnabled() {
 
   @Override
   protected void doStop() {
-    disconnect(); // don't call super.doStop()
     // Allow currently running replication tasks to finish
-    exec.shutdown();
-    try {
-      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-    }
-    // Abort if the tasks did not terminate in time
-    if (!exec.isTerminated()) {
-      String errMsg = "HBaseInterClusterReplicationEndpoint termination 
failed. The "
-        + "ThreadPoolExecutor failed to finish all tasks within " + 
maxTerminationWait + "ms. "
-        + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
-      abortable.abort(errMsg, new IOException(errMsg));
-    }
+    this.stopping = true;
+    disconnect(); // don't call super.doStop()
     notifyStopped();
   }
 
-  protected int replicateEntries(List<Entry> entries, int batchIndex, int 
timeout)
-    throws IOException {
+  protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, 
int batchIndex,
+    int timeout) {
+    int entriesHashCode = System.identityHashCode(entries);
+    if (LOG.isTraceEnabled()) {
+      long size = 
entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+      LOG.trace("{} Replicating batch {} of {} entries with total size {} 
bytes to {}", logPeerId(),
+        entriesHashCode, entries.size(), size, replicationClusterId);
+    }
     SinkPeer sinkPeer = null;
+    final CompletableFuture<Integer> resultCompletableFuture = new 
CompletableFuture<Integer>();
     try {
-      int entriesHashCode = System.identityHashCode(entries);
-      if (LOG.isTraceEnabled()) {
-        long size = 
entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
-        LOG.trace("{} Replicating batch {} of {} entries with total size {} 
bytes to {}",
-          logPeerId(), entriesHashCode, entries.size(), size, 
replicationClusterId);
-      }
       sinkPeer = getReplicationSink();
       AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
-      try {
-        ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
-          entries.toArray(new Entry[entries.size()]), replicationClusterId, 
baseNamespaceDir,
-          hfileArchiveDir, timeout);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Completed replicating batch {}", logPeerId(), 
entriesHashCode);
-        }
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Failed replicating batch {}", logPeerId(), 
entriesHashCode, e);
-        }
-        throw e;
-      }
-      reportSinkSuccess(sinkPeer);
-    } catch (IOException ioe) {
+      final SinkPeer sinkPeerToUse = sinkPeer;
+      
FutureUtils.addListener(ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
+        entries.toArray(new Entry[entries.size()]), replicationClusterId, 
baseNamespaceDir,
+        hfileArchiveDir, timeout), (response, exception) -> {
+          if (exception != null) {
+            onReplicateWALEntryException(entriesHashCode, exception, 
sinkPeerToUse);
+            resultCompletableFuture.completeExceptionally(exception);
+            return;
+          }
+          reportSinkSuccess(sinkPeerToUse);
+          resultCompletableFuture.complete(batchIndex);
+        });
+    } catch (Throwable e) {

Review Comment:
   @Apache9 , yes, `getReplicationSink` throws `IOException`, I have fixed as 
you suggestion, but mind explain a bit more about why here we not catch 
Throwable and not wrap `FutureUtils.addListener` ?  I think other operation may 
also throw `RuntimeException` so we catch `Throwable` and call 
`resultCompletableFuture.completeExceptionally` if there is exception is more 
safe? I'm a little confusing...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to