This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 8083542f2b51241da38d3dc0f448102933657b47
Author: Duo Zhang <[email protected]>
AuthorDate: Sun Nov 14 20:52:04 2021 +0800

    HBASE-26449 The way we add or clear failedReplicas may have race (#3846)

    Signed-off-by: Xin Sun <[email protected]>
---
 .../regionreplication/RegionReplicationSink.java   |  28 ++++-
 .../TestRegionReplicationSink.java                 | 130 ++++++++++++++++++++-
 2 files changed, 150 insertions(+), 8 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index 9c6f6e2..68aa508 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -30,6 +30,7 @@ import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import org.agrona.collections.IntHashSet;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -123,7 +124,7 @@ public class RegionReplicationSink {
 
   // used to track the replicas which we failed to replicate edits to them
   // will be cleared after we get a flush edit.
-  private final Set<Integer> failedReplicas = new HashSet<>();
+  private final IntHashSet failedReplicas = new IntHashSet();
 
   private final Queue<SinkEntry> entries = new ArrayDeque<>();
 
@@ -135,6 +136,8 @@ public class RegionReplicationSink {
 
   private volatile long pendingSize;
 
+  private long lastFlushSequenceNumber;
+
   private boolean sending;
 
   private boolean stopping;
@@ -162,8 +165,10 @@ public class RegionReplicationSink {
 
   private void onComplete(List<SinkEntry> sent,
     Map<Integer, MutableObject<Throwable>> replica2Error) {
+    long maxSequenceId = Long.MIN_VALUE;
     long toReleaseSize = 0;
     for (SinkEntry entry : sent) {
+      maxSequenceId = Math.max(maxSequenceId, entry.key.getSequenceId());
       entry.replicated();
       toReleaseSize += entry.size;
     }
@@ -173,9 +178,20 @@
       Integer replicaId = entry.getKey();
       Throwable error = entry.getValue().getValue();
       if (error != null) {
-        LOG.warn("Failed to replicate to secondary replica {} for {}, stop replicating" +
-          " for a while and trigger a flush", replicaId, primary, error);
-        failed.add(replicaId);
+        if (maxSequenceId > lastFlushSequenceNumber) {
+          LOG.warn(
+            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+              " id of sunk entries is {}, which is greater than the last flush SN {}," +
+              " we will stop replicating for a while and trigger a flush",
+            replicaId, primary, maxSequenceId, lastFlushSequenceNumber, error);
+          failed.add(replicaId);
+        } else {
+          LOG.warn(
+            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+              " id of sunk entries is {}, which is less than or equal to the last flush SN {}," +
+              " we will not stop replicating",
+            replicaId, primary, maxSequenceId, lastFlushSequenceNumber, error);
+        }
       }
     }
     synchronized (entries) {
@@ -215,6 +231,9 @@
     AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
     Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
     for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
+      if (failedReplicas.contains(replicaId)) {
+        continue;
+      }
       MutableObject<Throwable> error = new MutableObject<>();
       replica2Error.put(replicaId, error);
       RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
@@ -292,6 +311,7 @@ public class RegionReplicationSink {
           break;
         }
       }
+      lastFlushSequenceNumber = flushDesc.getFlushSequenceNumber();
       failedReplicas.clear();
       LOG.debug(
         "Got a flush all request with sequence id {}, clear failed replicas {}" +
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
index 19b1698..248cdba 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -28,11 +28,19 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableNameTestRule;
@@ -45,14 +53,20 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.ipc.ServerCall;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
+
 @Category({ RegionServerTests.class, MediumTests.class })
 public class TestRegionReplicationSink {
@@ -72,6 +86,8 @@ public class TestRegionReplicationSink {
 
   private RegionReplicationBufferManager manager;
 
+  private RegionReplicationSink sink;
+
   @Rule
   public final TableNameTestRule name = new TableNameTestRule();
 
@@ -84,15 +100,17 @@ public class TestRegionReplicationSink {
     flushRequester = mock(Runnable.class);
     conn = mock(AsyncClusterConnection.class);
     manager = mock(RegionReplicationBufferManager.class);
+    sink = new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
   }
 
-  private RegionReplicationSink create() {
-    return new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
+  @After
+  public void tearDown() throws InterruptedException {
+    sink.stop();
+    sink.waitUntilStopped();
   }
 
   @Test
   public void testNormal() {
-    RegionReplicationSink sink = create();
     MutableInt next = new MutableInt(0);
     List<CompletableFuture<Void>> futures =
       Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
@@ -129,7 +147,6 @@ public class TestRegionReplicationSink {
 
   @Test
   public void testDropEdits() {
-    RegionReplicationSink sink = create();
     MutableInt next = new MutableInt(0);
     List<CompletableFuture<Void>> futures =
       Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
@@ -191,4 +208,109 @@ public class TestRegionReplicationSink {
     // replicas
     verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
   }
+
+  @Test
+  public void testNotAddToFailedReplicas() {
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Stream.generate(() -> new CompletableFuture<Void>()).limit(4).collect(Collectors.toList());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+
+    ServerCall<?> rpcCall1 = mock(ServerCall.class);
+    WALKeyImpl key1 = mock(WALKeyImpl.class);
+    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+    when(key1.getSequenceId()).thenReturn(1L);
+    WALEdit edit1 = mock(WALEdit.class);
+    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+    sink.add(key1, edit1, rpcCall1);
+
+    ServerCall<?> rpcCall2 = mock(ServerCall.class);
+    WALKeyImpl key2 = mock(WALKeyImpl.class);
+    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
+    when(key2.getSequenceId()).thenReturn(3L);
+
+    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
+      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
+        throw new IllegalStateException();
+      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
+    FlushDescriptor fd =
+      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
+    WALEdit edit2 = WALEdit.createFlushWALEdit(primary, fd);
+    sink.add(key2, edit2, rpcCall2);
+
+    // fail the call to replica 2
+    futures.get(0).complete(null);
+    futures.get(1).completeExceptionally(new IOException("inject error"));
+
+    // the failure should not cause replica 2 to be added to failedReplicas, as we have already
+    // triggered a flush after it
+    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    futures.get(2).complete(null);
+    futures.get(3).complete(null);
+
+    // should have sent out all entries, so nothing is pending
+    assertEquals(0, sink.pendingSize());
+  }
+
+  @Test
+  public void testAddToFailedReplica() {
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Stream.generate(() -> new CompletableFuture<Void>()).limit(5).collect(Collectors.toList());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+
+    ServerCall<?> rpcCall1 = mock(ServerCall.class);
+    WALKeyImpl key1 = mock(WALKeyImpl.class);
+    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+    when(key1.getSequenceId()).thenReturn(1L);
+    WALEdit edit1 = mock(WALEdit.class);
+    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+    sink.add(key1, edit1, rpcCall1);
+
+    ServerCall<?> rpcCall2 = mock(ServerCall.class);
+    WALKeyImpl key2 = mock(WALKeyImpl.class);
+    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
+    when(key2.getSequenceId()).thenReturn(1L);
+    WALEdit edit2 = mock(WALEdit.class);
+    when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
+    when(manager.increase(anyLong())).thenReturn(true);
+    sink.add(key2, edit2, rpcCall2);
+
+    // fail the call to replica 2
+    futures.get(0).complete(null);
+    futures.get(1).completeExceptionally(new IOException("inject error"));
+
+    // we should only call replicate once for edit2, since replica 2 is marked as failed
+    verify(conn, times(3)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+    futures.get(2).complete(null);
+    // should have sent out all entries, so nothing is pending
+    assertEquals(0, sink.pendingSize());
+
+    ServerCall<?> rpcCall3 = mock(ServerCall.class);
+    WALKeyImpl key3 = mock(WALKeyImpl.class);
+    when(key3.estimatedSerializedSizeOf()).thenReturn(200L);
+    when(key3.getSequenceId()).thenReturn(3L);
+    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
+      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
+        throw new IllegalStateException();
+      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
+    FlushDescriptor fd =
+      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
+    WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd);
+    sink.add(key3, edit3, rpcCall3);
+
+    // the flush marker should have cleared the failedReplicas, so we will send the edit to 2
+    // replicas again
+    verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+    futures.get(3).complete(null);
+    futures.get(4).complete(null);
+
+    // should have sent out all entries, so nothing is pending
+    assertEquals(0, sink.pendingSize());
+  }
 }
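
The essence of the change, restated for readers skimming the diff: onComplete now records the largest WAL sequence id in the batch that just completed, and a replica is only added to failedReplicas when that id is greater than lastFlushSequenceNumber. The sketch below restates that decision outside of HBase; the class and method names are illustrative (not HBase API), a plain HashSet stands in for the patch's org.agrona.collections.IntHashSet, and synchronized stands in for the sink's own locking on entries.

// Illustrative sketch only; see the patch above for the real implementation.
import java.util.HashSet;
import java.util.Set;

final class FailedReplicaTracker {

  private final Set<Integer> failedReplicas = new HashSet<>();

  // Mirrors lastFlushSequenceNumber: updated whenever a flush-all edit is seen.
  private long lastFlushSequenceNumber;

  // Mirrors the new branch in onComplete: only mark the replica failed when the
  // failed batch contains edits newer than the last flush.
  synchronized void onReplicationFailure(int replicaId, long maxSequenceIdOfBatch) {
    if (maxSequenceIdOfBatch > lastFlushSequenceNumber) {
      failedReplicas.add(replicaId);
    }
    // else: the failed edits are at or below the last flush point, so the
    // secondary replica will recover them from the flushed files anyway, and
    // there is no need to stop replicating to it.
  }

  // Mirrors the flush handling: remember the flush point, then clear the set.
  synchronized void onFlushAll(long flushSequenceNumber) {
    lastFlushSequenceNumber = flushSequenceNumber;
    failedReplicas.clear();
  }

  // Mirrors the new skip in the send loop: failed replicas get no further
  // edits until a flush clears them.
  synchronized boolean shouldSkip(int replicaId) {
    return failedReplicas.contains(replicaId);
  }
}

Without the lastFlushSequenceNumber guard, a replicate call issued before a flush could complete exceptionally after the flush had already cleared failedReplicas, re-marking a replica whose missing edits the flush already covers; that appears to be the race in the subject line, and it is exactly what testNotAddToFailedReplicas above exercises.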
