Apache9 commented on code in PR #4463:
URL: https://github.com/apache/hbase/pull/4463#discussion_r889504508


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java:
##########
@@ -532,82 +513,109 @@ protected boolean isPeerEnabled() {
 
   @Override
   protected void doStop() {
-    disconnect(); // don't call super.doStop()
     // Allow currently running replication tasks to finish
-    exec.shutdown();
-    try {
-      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-    }
-    // Abort if the tasks did not terminate in time
-    if (!exec.isTerminated()) {
-      String errMsg = "HBaseInterClusterReplicationEndpoint termination 
failed. The "
-        + "ThreadPoolExecutor failed to finish all tasks within " + 
maxTerminationWait + "ms. "
-        + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
-      abortable.abort(errMsg, new IOException(errMsg));
-    }
+    this.stopping = true;
+    disconnect(); // don't call super.doStop()
     notifyStopped();
   }
 
-  protected int replicateEntries(List<Entry> entries, int batchIndex, int 
timeout)
-    throws IOException {
+  protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, 
int batchIndex,
+    int timeout) {
+    int entriesHashCode = System.identityHashCode(entries);
+    if (LOG.isTraceEnabled()) {
+      long size = 
entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+      LOG.trace("{} Replicating batch {} of {} entries with total size {} 
bytes to {}", logPeerId(),
+        entriesHashCode, entries.size(), size, replicationClusterId);
+    }
     SinkPeer sinkPeer = null;
+    final CompletableFuture<Integer> resultCompletableFuture = new 
CompletableFuture<Integer>();
     try {
-      int entriesHashCode = System.identityHashCode(entries);
-      if (LOG.isTraceEnabled()) {
-        long size = 
entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
-        LOG.trace("{} Replicating batch {} of {} entries with total size {} 
bytes to {}",
-          logPeerId(), entriesHashCode, entries.size(), size, 
replicationClusterId);
-      }
       sinkPeer = getReplicationSink();
       AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
-      try {
-        ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
-          entries.toArray(new Entry[entries.size()]), replicationClusterId, 
baseNamespaceDir,
-          hfileArchiveDir, timeout);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Completed replicating batch {}", logPeerId(), 
entriesHashCode);
-        }
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Failed replicating batch {}", logPeerId(), 
entriesHashCode, e);
-        }
-        throw e;
-      }
-      reportSinkSuccess(sinkPeer);
-    } catch (IOException ioe) {
+      final SinkPeer sinkPeerToUse = sinkPeer;
+      
FutureUtils.addListener(ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
+        entries.toArray(new Entry[entries.size()]), replicationClusterId, 
baseNamespaceDir,
+        hfileArchiveDir, timeout), (response, exception) -> {
+          if (exception != null) {
+            onReplicateWALEntryException(entriesHashCode, exception, 
sinkPeerToUse);
+            resultCompletableFuture.completeExceptionally(exception);
+            return;
+          }
+          reportSinkSuccess(sinkPeerToUse);
+          resultCompletableFuture.complete(batchIndex);
+        });
+    } catch (Throwable e) {

Review Comment:
   RuntimeException is a subclass of Exception, so catching Exception is enough :)
   
   Usually you should not catch Throwable in code, as you may also catch 
an Error such as OutOfMemoryError (OOME).
   
   FutureUtils.addListener is designed not to throw any exception, so usually 
you do not need to wrap it inside a try/catch, unless you are doing something 
like a background loop where you wrap everything in it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to