Apache9 commented on code in PR #4463:
URL: https://github.com/apache/hbase/pull/4463#discussion_r888016362


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java:
##########
@@ -532,82 +513,109 @@ protected boolean isPeerEnabled() {
 
   @Override
   protected void doStop() {
-    disconnect(); // don't call super.doStop()
     // Allow currently running replication tasks to finish
-    exec.shutdown();
-    try {
-      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-    }
-    // Abort if the tasks did not terminate in time
-    if (!exec.isTerminated()) {
-      String errMsg = "HBaseInterClusterReplicationEndpoint termination 
failed. The "
-        + "ThreadPoolExecutor failed to finish all tasks within " + 
maxTerminationWait + "ms. "
-        + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
-      abortable.abort(errMsg, new IOException(errMsg));
-    }
+    this.stopping = true;
+    disconnect(); // don't call super.doStop()
     notifyStopped();
   }
 
-  protected int replicateEntries(List<Entry> entries, int batchIndex, int 
timeout)
-    throws IOException {
+  protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, 
int batchIndex,
+    int timeout) {
+    int entriesHashCode = System.identityHashCode(entries);
+    if (LOG.isTraceEnabled()) {
+      long size = 
entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+      LOG.trace("{} Replicating batch {} of {} entries with total size {} 
bytes to {}", logPeerId(),
+        entriesHashCode, entries.size(), size, replicationClusterId);
+    }
     SinkPeer sinkPeer = null;
+    final CompletableFuture<Integer> resultCompletableFuture = new 
CompletableFuture<Integer>();
     try {
-      int entriesHashCode = System.identityHashCode(entries);
-      if (LOG.isTraceEnabled()) {
-        long size = 
entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
-        LOG.trace("{} Replicating batch {} of {} entries with total size {} 
bytes to {}",
-          logPeerId(), entriesHashCode, entries.size(), size, 
replicationClusterId);
-      }
       sinkPeer = getReplicationSink();
       AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
-      try {
-        ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
-          entries.toArray(new Entry[entries.size()]), replicationClusterId, 
baseNamespaceDir,
-          hfileArchiveDir, timeout);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Completed replicating batch {}", logPeerId(), 
entriesHashCode);
-        }
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Failed replicating batch {}", logPeerId(), 
entriesHashCode, e);
-        }
-        throw e;
-      }
-      reportSinkSuccess(sinkPeer);
-    } catch (IOException ioe) {
+      final SinkPeer sinkPeerToUse = sinkPeer;
+      
FutureUtils.addListener(ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
+        entries.toArray(new Entry[entries.size()]), replicationClusterId, 
baseNamespaceDir,
+        hfileArchiveDir, timeout), (response, exception) -> {
+          if (exception != null) {
+            onReplicateWALEntryException(entriesHashCode, exception, 
sinkPeerToUse);
+            resultCompletableFuture.completeExceptionally(exception);
+            return;
+          }
+          reportSinkSuccess(sinkPeerToUse);
+          resultCompletableFuture.complete(batchIndex);
+        });
+    } catch (Throwable e) {
+      this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
+      resultCompletableFuture.completeExceptionally(e);
+    }
+    return resultCompletableFuture;
+  }
+
+  private void onReplicateWALEntryException(int entriesHashCode, Throwable 
exception,
+    final SinkPeer sinkPeer) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} Failed replicating batch {}", logPeerId(), 
entriesHashCode, exception);
+    }
+    if (exception instanceof IOException) {
       if (sinkPeer != null) {
         reportBadSink(sinkPeer);
       }
-      throw ioe;
     }
-    return batchIndex;
   }
 
-  private int serialReplicateRegionEntries(List<Entry> entries, int 
batchIndex, int timeout)
-    throws IOException {
-    int batchSize = 0, index = 0;
+  private CompletableFuture<Integer> serialReplicateRegionEntries(
+    PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int 
timeout) {
+    if (!walEntryPeekingIterator.hasNext()) {
+      return CompletableFuture.completedFuture(batchIndex);
+    }
+    int batchSize = 0;
     List<Entry> batch = new ArrayList<>();
-    for (Entry entry : entries) {
+    while (walEntryPeekingIterator.hasNext()) {
+      Entry entry = walEntryPeekingIterator.peek();
       int entrySize = getEstimatedEntrySize(entry);
       if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
-        replicateEntries(batch, index++, timeout);
-        batch.clear();
-        batchSize = 0;
+        // replicateEntries(batch, index++, timeout);

Review Comment:
   Just remove the code if not used?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java:
##########
@@ -532,82 +513,109 @@ protected boolean isPeerEnabled() {
 
   @Override
   protected void doStop() {
-    disconnect(); // don't call super.doStop()
     // Allow currently running replication tasks to finish
-    exec.shutdown();
-    try {
-      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-    }
-    // Abort if the tasks did not terminate in time
-    if (!exec.isTerminated()) {
-      String errMsg = "HBaseInterClusterReplicationEndpoint termination 
failed. The "
-        + "ThreadPoolExecutor failed to finish all tasks within " + 
maxTerminationWait + "ms. "
-        + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
-      abortable.abort(errMsg, new IOException(errMsg));
-    }
+    this.stopping = true;
+    disconnect(); // don't call super.doStop()
     notifyStopped();
   }
 
-  protected int replicateEntries(List<Entry> entries, int batchIndex, int 
timeout)
-    throws IOException {
+  protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, 
int batchIndex,
+    int timeout) {
+    int entriesHashCode = System.identityHashCode(entries);
+    if (LOG.isTraceEnabled()) {
+      long size = 
entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+      LOG.trace("{} Replicating batch {} of {} entries with total size {} 
bytes to {}", logPeerId(),
+        entriesHashCode, entries.size(), size, replicationClusterId);
+    }
     SinkPeer sinkPeer = null;
+    final CompletableFuture<Integer> resultCompletableFuture = new 
CompletableFuture<Integer>();
     try {
-      int entriesHashCode = System.identityHashCode(entries);
-      if (LOG.isTraceEnabled()) {
-        long size = 
entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
-        LOG.trace("{} Replicating batch {} of {} entries with total size {} 
bytes to {}",
-          logPeerId(), entriesHashCode, entries.size(), size, 
replicationClusterId);
-      }
       sinkPeer = getReplicationSink();
       AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
-      try {
-        ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
-          entries.toArray(new Entry[entries.size()]), replicationClusterId, 
baseNamespaceDir,
-          hfileArchiveDir, timeout);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Completed replicating batch {}", logPeerId(), 
entriesHashCode);
-        }
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Failed replicating batch {}", logPeerId(), 
entriesHashCode, e);
-        }
-        throw e;
-      }
-      reportSinkSuccess(sinkPeer);
-    } catch (IOException ioe) {
+      final SinkPeer sinkPeerToUse = sinkPeer;
+      
FutureUtils.addListener(ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
+        entries.toArray(new Entry[entries.size()]), replicationClusterId, 
baseNamespaceDir,
+        hfileArchiveDir, timeout), (response, exception) -> {
+          if (exception != null) {
+            onReplicateWALEntryException(entriesHashCode, exception, 
sinkPeerToUse);
+            resultCompletableFuture.completeExceptionally(exception);
+            return;
+          }
+          reportSinkSuccess(sinkPeerToUse);
+          resultCompletableFuture.complete(batchIndex);
+        });
+    } catch (Throwable e) {

Review Comment:
   Better not to catch Throwable? And where do we throw an exception in the above 
code? getReplicationSink? Then maybe we could give that call its own try-catch 
block, instead of wrapping the FutureUtils.addListener call too?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java:
##########
@@ -394,29 +375,30 @@ List<List<Entry>> filterNotExistColumnFamilyEdits(final 
List<List<Entry>> oldEnt
     return entryList;
   }
 
-  private long parallelReplicate(CompletionService<Integer> pool, 
ReplicateContext replicateContext,
-    List<List<Entry>> batches) throws IOException {
-    int futures = 0;
+  private long parallelReplicate(ReplicateContext replicateContext, 
List<List<Entry>> batches)
+    throws IOException {
+    List<CompletableFuture<Integer>> futures =
+      new ArrayList<CompletableFuture<Integer>>(batches.size());
     for (int i = 0; i < batches.size(); i++) {
       List<Entry> entries = batches.get(i);
-      if (!entries.isEmpty()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), 
entries.size(),
-            replicateContext.getSize());
-        }
-        // RuntimeExceptions encountered here bubble up and are handled in 
ReplicationSource
-        pool.submit(createReplicator(entries, i, 
replicateContext.getTimeout()));
-        futures++;
+      if (entries.isEmpty()) {
+        continue;
       }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), 
entries.size(),
+          replicateContext.getSize());
+      }
+      // RuntimeExceptions encountered here bubble up and are handled in 
ReplicationSource
+      futures.add(createReplicator(entries, i, replicateContext.getTimeout()));
     }
 
     IOException iox = null;
     long lastWriteTime = 0;
-    for (int i = 0; i < futures; i++) {
+
+    for (CompletableFuture<Integer> f : futures) {
       try {
         // wait for all futures, remove successful parts
         // (only the remaining parts will be retried)
-        Future<Integer> f = pool.take();
         int index = f.get();

Review Comment:
   Use FutureUtils.get?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java:
##########
@@ -532,82 +513,109 @@ protected boolean isPeerEnabled() {
 
   @Override
   protected void doStop() {
-    disconnect(); // don't call super.doStop()
     // Allow currently running replication tasks to finish
-    exec.shutdown();
-    try {
-      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-    }
-    // Abort if the tasks did not terminate in time
-    if (!exec.isTerminated()) {
-      String errMsg = "HBaseInterClusterReplicationEndpoint termination 
failed. The "
-        + "ThreadPoolExecutor failed to finish all tasks within " + 
maxTerminationWait + "ms. "
-        + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
-      abortable.abort(errMsg, new IOException(errMsg));
-    }
+    this.stopping = true;
+    disconnect(); // don't call super.doStop()
     notifyStopped();
   }
 
-  protected int replicateEntries(List<Entry> entries, int batchIndex, int 
timeout)
-    throws IOException {
+  protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, 
int batchIndex,
+    int timeout) {
+    int entriesHashCode = System.identityHashCode(entries);
+    if (LOG.isTraceEnabled()) {
+      long size = 
entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+      LOG.trace("{} Replicating batch {} of {} entries with total size {} 
bytes to {}", logPeerId(),
+        entriesHashCode, entries.size(), size, replicationClusterId);
+    }
     SinkPeer sinkPeer = null;
+    final CompletableFuture<Integer> resultCompletableFuture = new 
CompletableFuture<Integer>();
     try {
-      int entriesHashCode = System.identityHashCode(entries);
-      if (LOG.isTraceEnabled()) {
-        long size = 
entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
-        LOG.trace("{} Replicating batch {} of {} entries with total size {} 
bytes to {}",
-          logPeerId(), entriesHashCode, entries.size(), size, 
replicationClusterId);
-      }
       sinkPeer = getReplicationSink();
       AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
-      try {
-        ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
-          entries.toArray(new Entry[entries.size()]), replicationClusterId, 
baseNamespaceDir,
-          hfileArchiveDir, timeout);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Completed replicating batch {}", logPeerId(), 
entriesHashCode);
-        }
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Failed replicating batch {}", logPeerId(), 
entriesHashCode, e);
-        }
-        throw e;
-      }
-      reportSinkSuccess(sinkPeer);
-    } catch (IOException ioe) {
+      final SinkPeer sinkPeerToUse = sinkPeer;
+      
FutureUtils.addListener(ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
+        entries.toArray(new Entry[entries.size()]), replicationClusterId, 
baseNamespaceDir,
+        hfileArchiveDir, timeout), (response, exception) -> {
+          if (exception != null) {
+            onReplicateWALEntryException(entriesHashCode, exception, 
sinkPeerToUse);
+            resultCompletableFuture.completeExceptionally(exception);
+            return;
+          }
+          reportSinkSuccess(sinkPeerToUse);
+          resultCompletableFuture.complete(batchIndex);
+        });
+    } catch (Throwable e) {
+      this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
+      resultCompletableFuture.completeExceptionally(e);
+    }
+    return resultCompletableFuture;
+  }
+
+  private void onReplicateWALEntryException(int entriesHashCode, Throwable 
exception,
+    final SinkPeer sinkPeer) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} Failed replicating batch {}", logPeerId(), 
entriesHashCode, exception);
+    }
+    if (exception instanceof IOException) {
       if (sinkPeer != null) {
         reportBadSink(sinkPeer);
       }
-      throw ioe;
     }
-    return batchIndex;
   }
 
-  private int serialReplicateRegionEntries(List<Entry> entries, int 
batchIndex, int timeout)
-    throws IOException {
-    int batchSize = 0, index = 0;
+  private CompletableFuture<Integer> serialReplicateRegionEntries(
+    PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int 
timeout) {
+    if (!walEntryPeekingIterator.hasNext()) {
+      return CompletableFuture.completedFuture(batchIndex);
+    }
+    int batchSize = 0;
     List<Entry> batch = new ArrayList<>();
-    for (Entry entry : entries) {
+    while (walEntryPeekingIterator.hasNext()) {
+      Entry entry = walEntryPeekingIterator.peek();
       int entrySize = getEstimatedEntrySize(entry);
       if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
-        replicateEntries(batch, index++, timeout);
-        batch.clear();
-        batchSize = 0;
+        // replicateEntries(batch, index++, timeout);
+        // batch.clear();
+        // batchSize = 0;
+        break;
       }
+      walEntryPeekingIterator.next();
       batch.add(entry);
       batchSize += entrySize;
     }
-    if (batchSize > 0) {
-      replicateEntries(batch, index, timeout);
+
+    if (batchSize <= 0) {
+      return CompletableFuture.completedFuture(batchIndex);
     }
-    return batchIndex;
+    final CompletableFuture<Integer> resultCompletableFuture = new 
CompletableFuture<Integer>();
+    FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout), 
(response, exception) -> {
+      if (exception != null) {
+        resultCompletableFuture.completeExceptionally(exception);
+        return;
+      }
+      if (!walEntryPeekingIterator.hasNext()) {
+        resultCompletableFuture.complete(batchIndex);
+        return;
+      }
+      FutureUtils.addListener(

Review Comment:
   So the algorithm here is to iterate over the list; once we have filled a 
batch, we send it out, and in the callback we send the next batch, until all 
entries have been sent out?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to