comnetwork commented on code in PR #4463:
URL: https://github.com/apache/hbase/pull/4463#discussion_r889490541
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java:
##########
@@ -532,82 +513,109 @@ protected boolean isPeerEnabled() {
  @Override
  protected void doStop() {
-    disconnect(); // don't call super.doStop()
    // Allow currently running replication tasks to finish
-    exec.shutdown();
-    try {
-      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-    }
-    // Abort if the tasks did not terminate in time
-    if (!exec.isTerminated()) {
-      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The "
-        + "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. "
-        + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
-      abortable.abort(errMsg, new IOException(errMsg));
-    }
+    this.stopping = true;
+    disconnect(); // don't call super.doStop()
    notifyStopped();
  }

-  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
-    throws IOException {
+  protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
+    int timeout) {
+    int entriesHashCode = System.identityHashCode(entries);
+    if (LOG.isTraceEnabled()) {
+      long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+      LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", logPeerId(),
+        entriesHashCode, entries.size(), size, replicationClusterId);
+    }
    SinkPeer sinkPeer = null;
+    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
    try {
-      int entriesHashCode = System.identityHashCode(entries);
-      if (LOG.isTraceEnabled()) {
-        long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
-        LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
-          logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
-      }
      sinkPeer = getReplicationSink();
      AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
-      try {
-        ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
-          entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
-          hfileArchiveDir, timeout);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
-        }
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
-        }
-        throw e;
-      }
-      reportSinkSuccess(sinkPeer);
-    } catch (IOException ioe) {
+      final SinkPeer sinkPeerToUse = sinkPeer;
+      FutureUtils.addListener(ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
+        entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
+        hfileArchiveDir, timeout), (response, exception) -> {
+          if (exception != null) {
+            onReplicateWALEntryException(entriesHashCode, exception, sinkPeerToUse);
+            resultCompletableFuture.completeExceptionally(exception);
+            return;
+          }
+          reportSinkSuccess(sinkPeerToUse);
+          resultCompletableFuture.complete(batchIndex);
+        });
+    } catch (Throwable e) {
+      this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
+      resultCompletableFuture.completeExceptionally(e);
+    }
+    return resultCompletableFuture;
+  }
+
+  private void onReplicateWALEntryException(int entriesHashCode, Throwable exception,
+    final SinkPeer sinkPeer) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, exception);
+    }
+    if (exception instanceof IOException) {
      if (sinkPeer != null) {
        reportBadSink(sinkPeer);
      }
-      throw ioe;
    }
-    return batchIndex;
  }

-  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
-    throws IOException {
-    int batchSize = 0, index = 0;
+  private CompletableFuture<Integer> serialReplicateRegionEntries(
+    PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int timeout) {
+    if (!walEntryPeekingIterator.hasNext()) {
+      return CompletableFuture.completedFuture(batchIndex);
+    }
+    int batchSize = 0;
    List<Entry> batch = new ArrayList<>();
-    for (Entry entry : entries) {
+    while (walEntryPeekingIterator.hasNext()) {
+      Entry entry = walEntryPeekingIterator.peek();
      int entrySize = getEstimatedEntrySize(entry);
      if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
-        replicateEntries(batch, index++, timeout);
-        batch.clear();
-        batchSize = 0;
+        // replicateEntries(batch, index++, timeout);
+        // batch.clear();
+        // batchSize = 0;
+        break;
      }
+      walEntryPeekingIterator.next();
      batch.add(entry);
      batchSize += entrySize;
    }
-    if (batchSize > 0) {
-      replicateEntries(batch, index, timeout);
+
+    if (batchSize <= 0) {
+      return CompletableFuture.completedFuture(batchIndex);
    }
-    return batchIndex;
+    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
+    FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout), (response, exception) -> {
+      if (exception != null) {
+        resultCompletableFuture.completeExceptionally(exception);
+        return;
+      }
+      if (!walEntryPeekingIterator.hasNext()) {
+        resultCompletableFuture.complete(batchIndex);
+        return;
+      }
+      FutureUtils.addListener(
Review Comment:
Yes, when `HBaseInterClusterReplicationEndpoint.isSerial` is true, it works as you said.
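
Not the PR code, just to make the serial case concrete: below is a minimal, self-contained sketch of the recursive `CompletableFuture` chaining that `serialReplicateRegionEntries` relies on when `isSerial` is true. The names (`SerialChainSketch`, `shipBatch`, `shipSerially`) are invented for illustration; the real code goes through `FutureUtils.addListener`, a Guava `PeekingIterator` and `replicateEntries`, but the ordering guarantee comes from the same shape: the next batch is only submitted from the completion callback of the previous one.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SerialChainSketch {

  // Stand-in for replicateEntries: pretends to ship one batch asynchronously.
  static CompletableFuture<Integer> shipBatch(List<String> batch, int batchIndex) {
    return CompletableFuture.supplyAsync(() -> {
      System.out.println("shipped " + batch + " for chain " + batchIndex);
      return batchIndex;
    });
  }

  // Ships the remaining batches strictly one after another; the returned future
  // completes only when the iterator is drained or a batch fails.
  static CompletableFuture<Integer> shipSerially(Iterator<List<String>> batches, int batchIndex) {
    if (!batches.hasNext()) {
      return CompletableFuture.completedFuture(batchIndex);
    }
    List<String> batch = batches.next();
    CompletableFuture<Integer> result = new CompletableFuture<>();
    shipBatch(batch, batchIndex).whenComplete((response, exception) -> {
      if (exception != null) {
        // A failed batch fails the whole chain; nothing after it is shipped.
        result.completeExceptionally(exception);
        return;
      }
      // Only now is the next batch submitted, which preserves the ordering.
      shipSerially(batches, batchIndex).whenComplete((last, nextError) -> {
        if (nextError != null) {
          result.completeExceptionally(nextError);
        } else {
          result.complete(last);
        }
      });
    });
    return result;
  }

  public static void main(String[] args) {
    Iterator<List<String>> batches =
      List.of(List.of("e1", "e2"), List.of("e3"), List.of("e4", "e5")).iterator();
    shipSerially(batches, 0).join(); // prints the three batches in order
  }
}
```

Firing `shipBatch` for every remaining batch up front would regain parallelism but lose the ordering, which is exactly the difference between the serial and non-serial paths here.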