This is an automated email from the ASF dual-hosted git repository.
chenglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 902bc3ed70a Revert "HBASE-27062 ThreadPool is unnecessary in
HBaseInterClusterReplication… (#4463)" (#4559)
902bc3ed70a is described below
commit 902bc3ed70a332db97d07e41f38fe4bd2686990d
Author: chenglei <[email protected]>
AuthorDate: Wed Jun 22 19:38:48 2022 +0800
Revert "HBASE-27062 ThreadPool is unnecessary in
HBaseInterClusterReplication… (#4463)" (#4559)
---
.../hbase/protobuf/ReplicationProtobufUtil.java | 11 +-
.../HBaseInterClusterReplicationEndpoint.java | 211 ++++++++++-----------
.../hbase/replication/SyncReplicationTestBase.java | 9 +-
.../hbase/replication/TestReplicationEndpoint.java | 13 +-
.../replication/regionserver/TestReplicator.java | 35 ++--
.../TestSerialReplicationEndpoint.java | 11 +-
6 files changed, 136 insertions(+), 154 deletions(-)
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
index cfdf0e12c85..c2e96ead6f7 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@@ -29,6 +28,7 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -37,7 +37,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
-import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@@ -53,12 +52,12 @@ public class ReplicationProtobufUtil {
* @param sourceBaseNamespaceDir Path to source cluster base namespace
directory
* @param sourceHFileArchiveDir Path to the source cluster hfile archive
directory
*/
- public static CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
- AsyncRegionServerAdmin admin, Entry[] entries, String replicationClusterId,
- Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) {
+ public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[]
entries,
+ String replicationClusterId, Path sourceBaseNamespaceDir, Path
sourceHFileArchiveDir,
+ int timeout) throws IOException {
Pair<ReplicateWALEntryRequest, CellScanner> p =
buildReplicateWALEntryRequest(entries, null,
replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
- return admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout);
+ FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(),
timeout));
}
/**
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 39e68bf9eb4..cec360a4c97 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -29,11 +29,18 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -50,7 +57,7 @@ import
org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.ipc.RemoteException;
@@ -58,8 +65,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
-import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
+import
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
implementation for replicating
@@ -76,6 +82,8 @@ public class HBaseInterClusterReplicationEndpoint extends
HBaseReplicationEndpoi
private static final Logger LOG =
LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);
+ private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
+
/** Drop edits for tables that been deleted from the replication source and
target */
public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
"hbase.replication.drop.on.deleted.table";
@@ -89,22 +97,25 @@ public class HBaseInterClusterReplicationEndpoint extends
HBaseReplicationEndpoi
private int maxRetriesMultiplier;
// Socket timeouts require even bolder actions since we don't want to DDOS
private int socketTimeoutMultiplier;
+ // Amount of time for shutdown to wait for all tasks to complete
+ private long maxTerminationWait;
// Size limit for replication RPCs, in bytes
private int replicationRpcLimit;
// Metrics for this source
private MetricsSource metrics;
private boolean peersSelected = false;
private String replicationClusterId = "";
+ private ThreadPoolExecutor exec;
private int maxThreads;
private Path baseNamespaceDir;
private Path hfileArchiveDir;
private boolean replicationBulkLoadDataEnabled;
+ private Abortable abortable;
private boolean dropOnDeletedTables;
private boolean dropOnDeletedColumnFamilies;
private boolean isSerial = false;
// Initialising as 0 to guarantee at least one logging message
private long lastSinkFetchTime = 0;
- private volatile boolean stopping = false;
@Override
public void init(Context context) throws IOException {
@@ -113,11 +124,20 @@ public class HBaseInterClusterReplicationEndpoint extends
HBaseReplicationEndpoi
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.socketTimeoutMultiplier =
this.conf.getInt("replication.source.socketTimeoutMultiplier",
maxRetriesMultiplier);
+ // A Replicator job is bound by the RPC timeout. We will wait this long
for all Replicator
+ // tasks to terminate when doStop() is called.
+ long maxTerminationWaitMultiplier = this.conf.getLong(
+ "replication.source.maxterminationmultiplier",
DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
+ this.maxTerminationWait = maxTerminationWaitMultiplier
+ * this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();
// per sink thread pool
this.maxThreads =
this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
+ this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60,
TimeUnit.SECONDS,
+ new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
+ this.abortable = ctx.getAbortable();
// Set the size limit for replication RPCs to 95% of the max request size.
// We could do with less slop if we have an accurate estimate of encoded
size. Being
// conservative for now.
@@ -374,31 +394,30 @@ public class HBaseInterClusterReplicationEndpoint extends
HBaseReplicationEndpoi
return entryList;
}
- private long parallelReplicate(ReplicateContext replicateContext,
List<List<Entry>> batches)
- throws IOException {
- List<CompletableFuture<Integer>> futures =
- new ArrayList<CompletableFuture<Integer>>(batches.size());
+ private long parallelReplicate(CompletionService<Integer> pool,
ReplicateContext replicateContext,
+ List<List<Entry>> batches) throws IOException {
+ int futures = 0;
for (int i = 0; i < batches.size(); i++) {
List<Entry> entries = batches.get(i);
- if (entries.isEmpty()) {
- continue;
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} Submitting {} entries of total size {}", logPeerId(),
entries.size(),
- replicateContext.getSize());
+ if (!entries.isEmpty()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} Submitting {} entries of total size {}", logPeerId(),
entries.size(),
+ replicateContext.getSize());
+ }
+ // RuntimeExceptions encountered here bubble up and are handled in
ReplicationSource
+ pool.submit(createReplicator(entries, i,
replicateContext.getTimeout()));
+ futures++;
}
- // RuntimeExceptions encountered here bubble up and are handled in
ReplicationSource
- futures.add(asyncReplicate(entries, i, replicateContext.getTimeout()));
}
IOException iox = null;
long lastWriteTime = 0;
-
- for (CompletableFuture<Integer> f : futures) {
+ for (int i = 0; i < futures; i++) {
try {
// wait for all futures, remove successful parts
// (only the remaining parts will be retried)
- int index = FutureUtils.get(f);
+ Future<Integer> f = pool.take();
+ int index = f.get();
List<Entry> batch = batches.get(index);
batches.set(index, Collections.emptyList()); // remove successful batch
// Find the most recent write time in the batch
@@ -406,10 +425,12 @@ public class HBaseInterClusterReplicationEndpoint extends
HBaseReplicationEndpoi
if (writeTime > lastWriteTime) {
lastWriteTime = writeTime;
}
- } catch (IOException e) {
- iox = e;
- } catch (RuntimeException e) {
- iox = new IOException(e);
+ } catch (InterruptedException ie) {
+ iox = new IOException(ie);
+ } catch (ExecutionException ee) {
+ iox = ee.getCause() instanceof IOException
+ ? (IOException) ee.getCause()
+ : new IOException(ee.getCause());
}
}
if (iox != null) {
@@ -424,6 +445,7 @@ public class HBaseInterClusterReplicationEndpoint extends
HBaseReplicationEndpoi
*/
@Override
public boolean replicate(ReplicateContext replicateContext) {
+ CompletionService<Integer> pool = new
ExecutorCompletionService<>(this.exec);
int sleepMultiplier = 1;
if (!peersSelected && this.isRunning()) {
@@ -446,7 +468,7 @@ public class HBaseInterClusterReplicationEndpoint extends
HBaseReplicationEndpoi
}
List<List<Entry>> batches = createBatches(replicateContext.getEntries());
- while (this.isRunning() && !this.stopping) {
+ while (this.isRunning() && !exec.isShutdown()) {
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
@@ -455,7 +477,7 @@ public class HBaseInterClusterReplicationEndpoint extends
HBaseReplicationEndpoi
}
try {
// replicate the batches to sink side.
- parallelReplicate(replicateContext, batches);
+ parallelReplicate(pool, replicateContext, batches);
return true;
} catch (IOException ioe) {
if (ioe instanceof RemoteException) {
@@ -510,117 +532,82 @@ public class HBaseInterClusterReplicationEndpoint
extends HBaseReplicationEndpoi
@Override
protected void doStop() {
- // Allow currently running replication tasks to finish
- this.stopping = true;
disconnect(); // don't call super.doStop()
+ // Allow currently running replication tasks to finish
+ exec.shutdown();
+ try {
+ exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ }
+ // Abort if the tasks did not terminate in time
+ if (!exec.isTerminated()) {
+ String errMsg = "HBaseInterClusterReplicationEndpoint termination
failed. The "
+ + "ThreadPoolExecutor failed to finish all tasks within " +
maxTerminationWait + "ms. "
+ + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
+ abortable.abort(errMsg, new IOException(errMsg));
+ }
notifyStopped();
}
- protected CompletableFuture<Integer> replicateEntries(List<Entry> entries,
int batchIndex,
- int timeout) {
- int entriesHashCode = System.identityHashCode(entries);
- if (LOG.isTraceEnabled()) {
- long size =
entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
- LOG.trace("{} Replicating batch {} of {} entries with total size {}
bytes to {}", logPeerId(),
- entriesHashCode, entries.size(), size, replicationClusterId);
- }
+ protected int replicateEntries(List<Entry> entries, int batchIndex, int
timeout)
+ throws IOException {
SinkPeer sinkPeer = null;
- final CompletableFuture<Integer> resultCompletableFuture = new
CompletableFuture<Integer>();
try {
+ int entriesHashCode = System.identityHashCode(entries);
+ if (LOG.isTraceEnabled()) {
+ long size =
entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+ LOG.trace("{} Replicating batch {} of {} entries with total size {}
bytes to {}",
+ logPeerId(), entriesHashCode, entries.size(), size,
replicationClusterId);
+ }
sinkPeer = getReplicationSink();
- } catch (IOException e) {
- this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
- resultCompletableFuture.completeExceptionally(e);
- return resultCompletableFuture;
- }
- assert sinkPeer != null;
- AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
- final SinkPeer sinkPeerToUse = sinkPeer;
- FutureUtils.addListener(
- ReplicationProtobufUtil.replicateWALEntry(rsAdmin, entries.toArray(new
Entry[entries.size()]),
- replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout),
- (response, exception) -> {
- if (exception != null) {
- onReplicateWALEntryException(entriesHashCode, exception,
sinkPeerToUse);
- resultCompletableFuture.completeExceptionally(exception);
- return;
+ AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
+ try {
+ ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
+ entries.toArray(new Entry[entries.size()]), replicationClusterId,
baseNamespaceDir,
+ hfileArchiveDir, timeout);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} Completed replicating batch {}", logPeerId(),
entriesHashCode);
}
- reportSinkSuccess(sinkPeerToUse);
- resultCompletableFuture.complete(batchIndex);
- });
- return resultCompletableFuture;
- }
-
- private void onReplicateWALEntryException(int entriesHashCode, Throwable
exception,
- final SinkPeer sinkPeer) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} Failed replicating batch {}", logPeerId(),
entriesHashCode, exception);
- }
- if (exception instanceof IOException) {
+ } catch (IOException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} Failed replicating batch {}", logPeerId(),
entriesHashCode, e);
+ }
+ throw e;
+ }
+ reportSinkSuccess(sinkPeer);
+ } catch (IOException ioe) {
if (sinkPeer != null) {
reportBadSink(sinkPeer);
}
+ throw ioe;
}
+ return batchIndex;
}
- /**
- * Here for {@link HBaseInterClusterReplicationEndpoint#isSerialis} is true,
we iterator over the
- * WAL {@link Entry} list, once we reached a batch limit, we send it out,
and in the callback, we
- * send the next batch, until we send all entries out.
- */
- private CompletableFuture<Integer> serialReplicateRegionEntries(
- PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int
timeout) {
- if (!walEntryPeekingIterator.hasNext()) {
- return CompletableFuture.completedFuture(batchIndex);
- }
- int batchSize = 0;
+ private int serialReplicateRegionEntries(List<Entry> entries, int
batchIndex, int timeout)
+ throws IOException {
+ int batchSize = 0, index = 0;
List<Entry> batch = new ArrayList<>();
- while (walEntryPeekingIterator.hasNext()) {
- Entry entry = walEntryPeekingIterator.peek();
+ for (Entry entry : entries) {
int entrySize = getEstimatedEntrySize(entry);
if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
- break;
+ replicateEntries(batch, index++, timeout);
+ batch.clear();
+ batchSize = 0;
}
- walEntryPeekingIterator.next();
batch.add(entry);
batchSize += entrySize;
}
-
- if (batchSize <= 0) {
- return CompletableFuture.completedFuture(batchIndex);
+ if (batchSize > 0) {
+ replicateEntries(batch, index, timeout);
}
- final CompletableFuture<Integer> resultCompletableFuture = new
CompletableFuture<Integer>();
- FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout),
(response, exception) -> {
- if (exception != null) {
- resultCompletableFuture.completeExceptionally(exception);
- return;
- }
- if (!walEntryPeekingIterator.hasNext()) {
- resultCompletableFuture.complete(batchIndex);
- return;
- }
- FutureUtils.addListener(
- serialReplicateRegionEntries(walEntryPeekingIterator, batchIndex,
timeout),
- (currentResponse, currentException) -> {
- if (currentException != null) {
- resultCompletableFuture.completeExceptionally(currentException);
- return;
- }
- resultCompletableFuture.complete(batchIndex);
- });
- });
- return resultCompletableFuture;
+ return batchIndex;
}
- /**
- * Replicate entries to peer cluster by async API.
- */
- protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int
batchIndex,
- int timeout) {
+ protected Callable<Integer> createReplicator(List<Entry> entries, int
batchIndex, int timeout) {
return isSerial
- ?
serialReplicateRegionEntries(Iterators.peekingIterator(entries.iterator()),
batchIndex,
- timeout)
- : replicateEntries(entries, batchIndex, timeout);
+ ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
+ : () -> replicateEntries(entries, batchIndex, timeout);
}
private String logPeerId() {
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index e82d69826d8..011f0a19f0c 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -50,7 +50,6 @@ import
org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -268,14 +267,14 @@ public class SyncReplicationTestBase {
new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0),
new WALEdit());
}
if (!expectedRejection) {
- FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
+ ReplicationProtobufUtil.replicateWALEntry(
connection.getRegionServerAdmin(regionServer.getServerName()),
entries, null, null, null,
- HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
+ HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
} else {
try {
- FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
+ ReplicationProtobufUtil.replicateWALEntry(
connection.getRegionServerAdmin(regionServer.getServerName()),
entries, null, null, null,
- HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
+ HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
fail("Should throw IOException when sync-replication state is in A or
DA");
} catch (RemoteException e) {
assertRejection(e.unwrapRemoteException());
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 9bc632e223b..53512ec2af8 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -30,7 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -556,16 +556,15 @@ public class TestReplicationEndpoint extends
TestReplicationBase {
}
@Override
- protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries,
int ordinal,
- int timeout) {
+ protected Callable<Integer> createReplicator(List<Entry> entries, int
ordinal, int timeout) {
// Fail only once, we don't want to slow down the test.
if (failedOnce) {
- return CompletableFuture.completedFuture(ordinal);
+ return () -> ordinal;
} else {
failedOnce = true;
- CompletableFuture<Integer> future = new CompletableFuture<Integer>();
- future.completeExceptionally(new IOException("Sample Exception: Failed
to replicate."));
- return future;
+ return () -> {
+ throw new IOException("Sample Exception: Failed to replicate.");
+ };
}
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
index c48755fb5f0..803c4278f97 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -228,15 +228,15 @@ public class TestReplicator extends TestReplicationBase {
}
@Override
- protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries,
int ordinal,
- int timeout) {
- return replicateEntries(entries, ordinal,
timeout).whenComplete((response, exception) -> {
+ protected Callable<Integer> createReplicator(List<Entry> entries, int
ordinal, int timeout) {
+ return () -> {
+ int batchIndex = replicateEntries(entries, ordinal, timeout);
entriesCount += entries.size();
int count = batchCount.incrementAndGet();
LOG.info(
"Completed replicating batch " + System.identityHashCode(entries) +
" count=" + count);
- });
-
+ return batchIndex;
+ };
}
}
@@ -245,23 +245,20 @@ public class TestReplicator extends TestReplicationBase {
private final AtomicBoolean failNext = new AtomicBoolean(false);
@Override
- protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries,
int ordinal,
- int timeout) {
-
- if (failNext.compareAndSet(false, true)) {
- return replicateEntries(entries, ordinal,
timeout).whenComplete((response, exception) -> {
+ protected Callable<Integer> createReplicator(List<Entry> entries, int
ordinal, int timeout) {
+ return () -> {
+ if (failNext.compareAndSet(false, true)) {
+ int batchIndex = replicateEntries(entries, ordinal, timeout);
entriesCount += entries.size();
int count = batchCount.incrementAndGet();
LOG.info(
"Completed replicating batch " + System.identityHashCode(entries)
+ " count=" + count);
- });
- } else if (failNext.compareAndSet(true, false)) {
- CompletableFuture<Integer> future = new CompletableFuture<Integer>();
- future.completeExceptionally(new ServiceException("Injected failure"));
- return future;
- }
- return CompletableFuture.completedFuture(ordinal);
-
+ return batchIndex;
+ } else if (failNext.compareAndSet(true, false)) {
+ throw new ServiceException("Injected failure");
+ }
+ return ordinal;
+ };
}
}
}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
index 5f99b88e0a4..c0eace0bbda 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -165,10 +165,11 @@ public class TestSerialReplicationEndpoint {
}
@Override
- protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries,
int ordinal,
- int timeout) {
- entryQueue.addAll(entries);
- return CompletableFuture.completedFuture(ordinal);
+ protected Callable<Integer> createReplicator(List<Entry> entries, int
ordinal, int timeout) {
+ return () -> {
+ entryQueue.addAll(entries);
+ return ordinal;
+ };
}
@Override