This is an automated email from the ASF dual-hosted git repository. chenglei pushed a commit to branch revert-4463-removetp in repository https://gitbox.apache.org/repos/asf/hbase.git
commit eb3c15ee106e7410f35bd1e9f57f5f86927cc85d Author: chenglei <[email protected]> AuthorDate: Wed Jun 22 19:36:25 2022 +0800 Revert "HBASE-27062 ThreadPool is unnecessary in HBaseInterClusterReplication… (#4463)" This reverts commit 34ba2c51cf6c201ad65d3ddaf4377f81ed354965. --- .../hbase/protobuf/ReplicationProtobufUtil.java | 11 +- .../HBaseInterClusterReplicationEndpoint.java | 211 ++++++++++----------- .../hbase/replication/SyncReplicationTestBase.java | 9 +- .../hbase/replication/TestReplicationEndpoint.java | 13 +- .../replication/regionserver/TestReplicator.java | 35 ++-- .../TestSerialReplicationEndpoint.java | 11 +- 6 files changed, 136 insertions(+), 154 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java index cfdf0e12c85..c2e96ead6f7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.concurrent.CompletableFuture; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; @@ -29,6 +28,7 @@ import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; import org.apache.hadoop.hbase.io.SizedCellScanner; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; @@ -37,7 +37,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; @@ -53,12 +52,12 @@ public class ReplicationProtobufUtil { * @param sourceBaseNamespaceDir Path to source cluster base namespace directory * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory */ - public static CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry( - AsyncRegionServerAdmin admin, Entry[] entries, String replicationClusterId, - Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) { + public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries, + String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, + int timeout) throws IOException { Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir); - return admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout); + FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout)); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 39e68bf9eb4..cec360a4c97 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -29,11 +29,18 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; @@ -50,7 +57,7 @@ import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.ipc.RemoteException; @@ -58,8 +65,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; -import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for replicating @@ -76,6 +82,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private static final Logger LOG = LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class); + private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2; + /** Drop edits for tables that been deleted from the replication source and target */ public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY = "hbase.replication.drop.on.deleted.table"; @@ -89,22 +97,25 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private int maxRetriesMultiplier; // Socket timeouts require even bolder actions since we don't want to DDOS private int socketTimeoutMultiplier; + // Amount of time for shutdown to wait for all tasks to complete + private long maxTerminationWait; // Size limit for replication RPCs, in bytes private int replicationRpcLimit; // Metrics for this source private MetricsSource metrics; private boolean peersSelected = false; private String replicationClusterId = ""; + private ThreadPoolExecutor exec; private int maxThreads; private Path baseNamespaceDir; private Path hfileArchiveDir; private boolean replicationBulkLoadDataEnabled; + private Abortable abortable; private boolean dropOnDeletedTables; private boolean dropOnDeletedColumnFamilies; private boolean isSerial = false; // Initialising as 0 to guarantee at least one logging message private long lastSinkFetchTime = 0; - private volatile boolean stopping = false; @Override public void init(Context context) throws IOException { @@ -113,11 +124,20 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier); + // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator + // tasks to terminate when doStop() is called. + long maxTerminationWaitMultiplier = this.conf.getLong( + "replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER); + this.maxTerminationWait = maxTerminationWaitMultiplier + * this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); this.metrics = context.getMetrics(); // per sink thread pool this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT); + this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build()); + this.abortable = ctx.getAbortable(); // Set the size limit for replication RPCs to 95% of the max request size. // We could do with less slop if we have an accurate estimate of encoded size. Being // conservative for now. @@ -374,31 +394,30 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi return entryList; } - private long parallelReplicate(ReplicateContext replicateContext, List<List<Entry>> batches) - throws IOException { - List<CompletableFuture<Integer>> futures = - new ArrayList<CompletableFuture<Integer>>(batches.size()); + private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext, + List<List<Entry>> batches) throws IOException { + int futures = 0; for (int i = 0; i < batches.size(); i++) { List<Entry> entries = batches.get(i); - if (entries.isEmpty()) { - continue; - } - if (LOG.isTraceEnabled()) { - LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(), - replicateContext.getSize()); + if (!entries.isEmpty()) { + if (LOG.isTraceEnabled()) { + LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(), + replicateContext.getSize()); + } + // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource + pool.submit(createReplicator(entries, i, replicateContext.getTimeout())); + futures++; } - // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource - futures.add(asyncReplicate(entries, i, replicateContext.getTimeout())); } IOException iox = null; long lastWriteTime = 0; - - for (CompletableFuture<Integer> f : futures) { + for (int i = 0; i < futures; i++) { try { // wait for all futures, remove successful parts // (only the remaining parts will be retried) - int index = FutureUtils.get(f); + Future<Integer> f = pool.take(); + int index = f.get(); List<Entry> batch = batches.get(index); batches.set(index, Collections.emptyList()); // remove successful batch // Find the most recent write time in the batch @@ -406,10 +425,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi if (writeTime > lastWriteTime) { lastWriteTime = writeTime; } - } catch (IOException e) { - iox = e; - } catch (RuntimeException e) { - iox = new IOException(e); + } catch (InterruptedException ie) { + iox = new IOException(ie); + } catch (ExecutionException ee) { + iox = ee.getCause() instanceof IOException + ? (IOException) ee.getCause() + : new IOException(ee.getCause()); } } if (iox != null) { @@ -424,6 +445,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi */ @Override public boolean replicate(ReplicateContext replicateContext) { + CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec); int sleepMultiplier = 1; if (!peersSelected && this.isRunning()) { @@ -446,7 +468,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } List<List<Entry>> batches = createBatches(replicateContext.getEntries()); - while (this.isRunning() && !this.stopping) { + while (this.isRunning() && !exec.isShutdown()) { if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { sleepMultiplier++; @@ -455,7 +477,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } try { // replicate the batches to sink side. - parallelReplicate(replicateContext, batches); + parallelReplicate(pool, replicateContext, batches); return true; } catch (IOException ioe) { if (ioe instanceof RemoteException) { @@ -510,117 +532,82 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi @Override protected void doStop() { - // Allow currently running replication tasks to finish - this.stopping = true; disconnect(); // don't call super.doStop() + // Allow currently running replication tasks to finish + exec.shutdown(); + try { + exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + // Abort if the tasks did not terminate in time + if (!exec.isTerminated()) { + String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " + + "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " + + "Aborting to prevent Replication from deadlocking. See HBASE-16081."; + abortable.abort(errMsg, new IOException(errMsg)); + } notifyStopped(); } - protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex, - int timeout) { - int entriesHashCode = System.identityHashCode(entries); - if (LOG.isTraceEnabled()) { - long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum(); - LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", logPeerId(), - entriesHashCode, entries.size(), size, replicationClusterId); - } + protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout) + throws IOException { SinkPeer sinkPeer = null; - final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>(); try { + int entriesHashCode = System.identityHashCode(entries); + if (LOG.isTraceEnabled()) { + long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum(); + LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", + logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId); + } sinkPeer = getReplicationSink(); - } catch (IOException e) { - this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer); - resultCompletableFuture.completeExceptionally(e); - return resultCompletableFuture; - } - assert sinkPeer != null; - AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer(); - final SinkPeer sinkPeerToUse = sinkPeer; - FutureUtils.addListener( - ReplicationProtobufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]), - replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout), - (response, exception) -> { - if (exception != null) { - onReplicateWALEntryException(entriesHashCode, exception, sinkPeerToUse); - resultCompletableFuture.completeExceptionally(exception); - return; + AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer(); + try { + ReplicationProtobufUtil.replicateWALEntry(rsAdmin, + entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir, + hfileArchiveDir, timeout); + if (LOG.isTraceEnabled()) { + LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode); } - reportSinkSuccess(sinkPeerToUse); - resultCompletableFuture.complete(batchIndex); - }); - return resultCompletableFuture; - } - - private void onReplicateWALEntryException(int entriesHashCode, Throwable exception, - final SinkPeer sinkPeer) { - if (LOG.isTraceEnabled()) { - LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, exception); - } - if (exception instanceof IOException) { + } catch (IOException e) { + if (LOG.isTraceEnabled()) { + LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e); + } + throw e; + } + reportSinkSuccess(sinkPeer); + } catch (IOException ioe) { if (sinkPeer != null) { reportBadSink(sinkPeer); } + throw ioe; } + return batchIndex; } - /** - * Here for {@link HBaseInterClusterReplicationEndpoint#isSerialis} is true, we iterator over the - * WAL {@link Entry} list, once we reached a batch limit, we send it out, and in the callback, we - * send the next batch, until we send all entries out. - */ - private CompletableFuture<Integer> serialReplicateRegionEntries( - PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int timeout) { - if (!walEntryPeekingIterator.hasNext()) { - return CompletableFuture.completedFuture(batchIndex); - } - int batchSize = 0; + private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout) + throws IOException { + int batchSize = 0, index = 0; List<Entry> batch = new ArrayList<>(); - while (walEntryPeekingIterator.hasNext()) { - Entry entry = walEntryPeekingIterator.peek(); + for (Entry entry : entries) { int entrySize = getEstimatedEntrySize(entry); if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) { - break; + replicateEntries(batch, index++, timeout); + batch.clear(); + batchSize = 0; } - walEntryPeekingIterator.next(); batch.add(entry); batchSize += entrySize; } - - if (batchSize <= 0) { - return CompletableFuture.completedFuture(batchIndex); + if (batchSize > 0) { + replicateEntries(batch, index, timeout); } - final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>(); - FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout), (response, exception) -> { - if (exception != null) { - resultCompletableFuture.completeExceptionally(exception); - return; - } - if (!walEntryPeekingIterator.hasNext()) { - resultCompletableFuture.complete(batchIndex); - return; - } - FutureUtils.addListener( - serialReplicateRegionEntries(walEntryPeekingIterator, batchIndex, timeout), - (currentResponse, currentException) -> { - if (currentException != null) { - resultCompletableFuture.completeExceptionally(currentException); - return; - } - resultCompletableFuture.complete(batchIndex); - }); - }); - return resultCompletableFuture; + return batchIndex; } - /** - * Replicate entries to peer cluster by async API. - */ - protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int batchIndex, - int timeout) { + protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) { return isSerial - ? serialReplicateRegionEntries(Iterators.peekingIterator(entries.iterator()), batchIndex, - timeout) - : replicateEntries(entries, batchIndex, timeout); + ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout) + : () -> replicateEntries(entries, batchIndex, timeout); } private String logPeerId() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java index e82d69826d8..011f0a19f0c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java @@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKeyImpl; @@ -268,14 +267,14 @@ public class SyncReplicationTestBase { new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit()); } if (!expectedRejection) { - FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry( + ReplicationProtobufUtil.replicateWALEntry( connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null, - HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT)); + HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT); } else { try { - FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry( + ReplicationProtobufUtil.replicateWALEntry( connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null, - HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT)); + HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT); fail("Should throw IOException when sync-replication state is in A or DA"); } catch (RemoteException e) { assertRejection(e.unwrapRemoteException()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 9bc632e223b..53512ec2af8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -30,7 +30,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.UUID; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -556,16 +556,15 @@ public class TestReplicationEndpoint extends TestReplicationBase { } @Override - protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal, - int timeout) { + protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) { // Fail only once, we don't want to slow down the test. if (failedOnce) { - return CompletableFuture.completedFuture(ordinal); + return () -> ordinal; } else { failedOnce = true; - CompletableFuture<Integer> future = new CompletableFuture<Integer>(); - future.completeExceptionally(new IOException("Sample Exception: Failed to replicate.")); - return future; + return () -> { + throw new IOException("Sample Exception: Failed to replicate."); + }; } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java index c48755fb5f0..803c4278f97 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java @@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; import java.util.List; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -228,15 +228,15 @@ public class TestReplicator extends TestReplicationBase { } @Override - protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal, - int timeout) { - return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> { + protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) { + return () -> { + int batchIndex = replicateEntries(entries, ordinal, timeout); entriesCount += entries.size(); int count = batchCount.incrementAndGet(); LOG.info( "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count); - }); - + return batchIndex; + }; } } @@ -245,23 +245,20 @@ public class TestReplicator extends TestReplicationBase { private final AtomicBoolean failNext = new AtomicBoolean(false); @Override - protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal, - int timeout) { - - if (failNext.compareAndSet(false, true)) { - return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> { + protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) { + return () -> { + if (failNext.compareAndSet(false, true)) { + int batchIndex = replicateEntries(entries, ordinal, timeout); entriesCount += entries.size(); int count = batchCount.incrementAndGet(); LOG.info( "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count); - }); - } else if (failNext.compareAndSet(true, false)) { - CompletableFuture<Integer> future = new CompletableFuture<Integer>(); - future.completeExceptionally(new ServiceException("Injected failure")); - return future; - } - return CompletableFuture.completedFuture(ordinal); - + return batchIndex; + } else if (failNext.compareAndSet(true, false)) { + throw new ServiceException("Injected failure"); + } + return ordinal; + }; } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java index 5f99b88e0a4..c0eace0bbda 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java @@ -22,7 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Callable; import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -165,10 +165,11 @@ public class TestSerialReplicationEndpoint { } @Override - protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal, - int timeout) { - entryQueue.addAll(entries); - return CompletableFuture.completedFuture(ordinal); + protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) { + return () -> { + entryQueue.addAll(entries); + return ordinal; + }; } @Override
