comnetwork commented on code in PR #4463:
URL: https://github.com/apache/hbase/pull/4463#discussion_r889490541


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java:
##########
@@ -532,82 +513,109 @@ protected boolean isPeerEnabled() {
 
   @Override
   protected void doStop() {
-    disconnect(); // don't call super.doStop()
     // Allow currently running replication tasks to finish
-    exec.shutdown();
-    try {
-      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-    }
-    // Abort if the tasks did not terminate in time
-    if (!exec.isTerminated()) {
-      String errMsg = "HBaseInterClusterReplicationEndpoint termination 
failed. The "
-        + "ThreadPoolExecutor failed to finish all tasks within " + 
maxTerminationWait + "ms. "
-        + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
-      abortable.abort(errMsg, new IOException(errMsg));
-    }
+    this.stopping = true;
+    disconnect(); // don't call super.doStop()
     notifyStopped();
   }
 
-  protected int replicateEntries(List<Entry> entries, int batchIndex, int 
timeout)
-    throws IOException {
+  protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, 
int batchIndex,
+    int timeout) {
+    int entriesHashCode = System.identityHashCode(entries);
+    if (LOG.isTraceEnabled()) {
+      long size = 
entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+      LOG.trace("{} Replicating batch {} of {} entries with total size {} 
bytes to {}", logPeerId(),
+        entriesHashCode, entries.size(), size, replicationClusterId);
+    }
     SinkPeer sinkPeer = null;
+    final CompletableFuture<Integer> resultCompletableFuture = new 
CompletableFuture<Integer>();
     try {
-      int entriesHashCode = System.identityHashCode(entries);
-      if (LOG.isTraceEnabled()) {
-        long size = 
entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
-        LOG.trace("{} Replicating batch {} of {} entries with total size {} 
bytes to {}",
-          logPeerId(), entriesHashCode, entries.size(), size, 
replicationClusterId);
-      }
       sinkPeer = getReplicationSink();
       AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
-      try {
-        ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
-          entries.toArray(new Entry[entries.size()]), replicationClusterId, 
baseNamespaceDir,
-          hfileArchiveDir, timeout);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Completed replicating batch {}", logPeerId(), 
entriesHashCode);
-        }
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Failed replicating batch {}", logPeerId(), 
entriesHashCode, e);
-        }
-        throw e;
-      }
-      reportSinkSuccess(sinkPeer);
-    } catch (IOException ioe) {
+      final SinkPeer sinkPeerToUse = sinkPeer;
+      
FutureUtils.addListener(ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
+        entries.toArray(new Entry[entries.size()]), replicationClusterId, 
baseNamespaceDir,
+        hfileArchiveDir, timeout), (response, exception) -> {
+          if (exception != null) {
+            onReplicateWALEntryException(entriesHashCode, exception, 
sinkPeerToUse);
+            resultCompletableFuture.completeExceptionally(exception);
+            return;
+          }
+          reportSinkSuccess(sinkPeerToUse);
+          resultCompletableFuture.complete(batchIndex);
+        });
+    } catch (Throwable e) {
+      this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
+      resultCompletableFuture.completeExceptionally(e);
+    }
+    return resultCompletableFuture;
+  }
+
+  private void onReplicateWALEntryException(int entriesHashCode, Throwable 
exception,
+    final SinkPeer sinkPeer) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} Failed replicating batch {}", logPeerId(), 
entriesHashCode, exception);
+    }
+    if (exception instanceof IOException) {
       if (sinkPeer != null) {
         reportBadSink(sinkPeer);
       }
-      throw ioe;
     }
-    return batchIndex;
   }
 
-  private int serialReplicateRegionEntries(List<Entry> entries, int 
batchIndex, int timeout)
-    throws IOException {
-    int batchSize = 0, index = 0;
+  private CompletableFuture<Integer> serialReplicateRegionEntries(
+    PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int 
timeout) {
+    if (!walEntryPeekingIterator.hasNext()) {
+      return CompletableFuture.completedFuture(batchIndex);
+    }
+    int batchSize = 0;
     List<Entry> batch = new ArrayList<>();
-    for (Entry entry : entries) {
+    while (walEntryPeekingIterator.hasNext()) {
+      Entry entry = walEntryPeekingIterator.peek();
       int entrySize = getEstimatedEntrySize(entry);
       if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
-        replicateEntries(batch, index++, timeout);
-        batch.clear();
-        batchSize = 0;
+        // replicateEntries(batch, index++, timeout);
+        // batch.clear();
+        // batchSize = 0;
+        break;
       }
+      walEntryPeekingIterator.next();
       batch.add(entry);
       batchSize += entrySize;
     }
-    if (batchSize > 0) {
-      replicateEntries(batch, index, timeout);
+
+    if (batchSize <= 0) {
+      return CompletableFuture.completedFuture(batchIndex);
     }
-    return batchIndex;
+    final CompletableFuture<Integer> resultCompletableFuture = new 
CompletableFuture<Integer>();
+    FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout), 
(response, exception) -> {
+      if (exception != null) {
+        resultCompletableFuture.completeExceptionally(exception);
+        return;
+      }
+      if (!walEntryPeekingIterator.hasNext()) {
+        resultCompletableFuture.complete(batchIndex);
+        return;
+      }
+      FutureUtils.addListener(

Review Comment:
   removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@hbase.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to