This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 116f462653615acb9473aca942aedc9df4211c29
Author: Duo Zhang <[email protected]>
AuthorDate: Wed Nov 17 23:20:22 2021 +0800

    HBASE-26457 Should not always clear all the failed replicas when getting a flush all request (#3850)

    Signed-off-by: Xiaolin Ha <[email protected]>
---
 .../regionreplication/RegionReplicationSink.java   | 119 +++++++++++++--------
 .../TestRegionReplicationSink.java                 |  87 +++++++++++++++
 2 files changed, 162 insertions(+), 44 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index 9095870..d5e2387 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -22,15 +22,16 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-import org.agrona.collections.IntHashSet;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -123,8 +124,11 @@ public class RegionReplicationSink {
   private final AsyncClusterConnection conn;
 
   // used to track the replicas which we failed to replicate edits to them
-  // will be cleared after we get a flush edit.
-  private final IntHashSet failedReplicas = new IntHashSet();
+  // the key is the replica id, the value is the sequence id of the last failed edit
+  // when we get a flush all request, we will try to remove a replica from this map, the key point
+  // here is the flush sequence number must be greater than the failed sequence id, otherwise we
+  // should not remove the replica from this map
+  private final Map<Integer, Long> failedReplicas = new HashMap<>();
 
   private final Queue<SinkEntry> entries = new ArrayDeque<>();
 
@@ -180,16 +184,16 @@ public class RegionReplicationSink {
       if (error != null) {
         if (maxSequenceId > lastFlushedSequenceId) {
           LOG.warn(
-            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
-              " id of sunk entris is {}, which is greater than the last flush SN {}," +
-              " we will stop replicating for a while and trigger a flush",
+            "Failed to replicate to secondary replica {} for {}, since the max sequence"
+              + " id of sunk entris is {}, which is greater than the last flush SN {},"
+              + " we will stop replicating for a while and trigger a flush",
             replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
           failed.add(replicaId);
         } else {
           LOG.warn(
-            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
-              " id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
-              " we will not stop replicating",
+            "Failed to replicate to secondary replica {} for {}, since the max sequence"
+              + " id of sunk entris is {}, which is less than or equal to the last flush SN {},"
+              + " we will not stop replicating",
             replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
         }
       }
@@ -197,7 +201,9 @@ public class RegionReplicationSink {
     synchronized (entries) {
       pendingSize -= toReleaseSize;
       if (!failed.isEmpty()) {
-        failedReplicas.addAll(failed);
+        for (Integer replicaId : failed) {
+          failedReplicas.put(replicaId, maxSequenceId);
+        }
         flushRequester.requestFlush(maxSequenceId);
       }
       sending = false;
@@ -231,7 +237,7 @@ public class RegionReplicationSink {
     AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
     Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
     for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
-      if (failedReplicas.contains(replicaId)) {
+      if (failedReplicas.containsKey(replicaId)) {
         continue;
       }
       MutableObject<Throwable> error = new MutableObject<>();
@@ -247,7 +253,7 @@ public class RegionReplicationSink {
     }
   }
 
-  private boolean flushAllStores(FlushDescriptor flushDesc) {
+  private boolean isFlushAllStores(FlushDescriptor flushDesc) {
     Set<byte[]> storesFlushed =
       flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
         .collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
@@ -257,6 +263,24 @@ public class RegionReplicationSink {
     return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
   }
 
+  private Optional<FlushDescriptor> getFlushAllDescriptor(Cell metaCell) {
+    if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
+      return Optional.empty();
+    }
+    FlushDescriptor flushDesc;
+    try {
+      flushDesc = WALEdit.getFlushDescriptor(metaCell);
+    } catch (IOException e) {
+      LOG.warn("Failed to parse FlushDescriptor from {}", metaCell);
+      return Optional.empty();
+    }
+    if (flushDesc != null && isFlushAllStores(flushDesc)) {
+      return Optional.of(flushDesc);
+    } else {
+      return Optional.empty();
+    }
+  }
+
   private void clearAllEntries() {
     long toClearSize = 0;
     for (SinkEntry entry : entries) {
@@ -268,6 +292,20 @@ public class RegionReplicationSink {
     manager.decrease(toClearSize);
   }
 
+  private void clearFailedReplica(long flushSequenceNumber) {
+    for (Iterator<Map.Entry<Integer, Long>> iter = failedReplicas.entrySet().iterator(); iter
+      .hasNext();) {
+      Map.Entry<Integer, Long> entry = iter.next();
+      if (entry.getValue().longValue() < flushSequenceNumber) {
+        LOG.debug(
+          "Got a flush all request with sequence id {}, clear failed replica {}"
+            + " with last failed sequence id {}",
+          flushSequenceNumber, entry.getKey(), entry.getValue());
+        iter.remove();
+      }
+    }
+  }
+
   /**
    * Add this edit to replication queue.
    * <p/>
@@ -287,41 +325,34 @@ public class RegionReplicationSink {
       // check whether we flushed all stores, which means we could drop all the previous edits,
       // and also, recover from the previous failure of some replicas
       for (Cell metaCell : edit.getCells()) {
-        if (CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
-          FlushDescriptor flushDesc;
-          try {
-            flushDesc = WALEdit.getFlushDescriptor(metaCell);
-          } catch (IOException e) {
-            LOG.warn("Failed to parse FlushDescriptor from {}", metaCell);
-            continue;
-          }
-          if (flushDesc != null && flushAllStores(flushDesc)) {
-            long flushedSequenceId = flushDesc.getFlushSequenceNumber();
-            int toClearCount = 0;
-            long toClearSize = 0;
-            for (;;) {
-              SinkEntry e = entries.peek();
-              if (e == null) {
-                break;
-              }
-              if (e.key.getSequenceId() < flushedSequenceId) {
-                entries.poll();
-                toClearCount++;
-                toClearSize += e.size;
-              } else {
-                break;
-              }
+        getFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
+          long flushSequenceNumber = flushDesc.getFlushSequenceNumber();
+          int toClearCount = 0;
+          long toClearSize = 0;
+          for (;;) {
+            SinkEntry e = entries.peek();
+            if (e == null) {
+              break;
            }
-            lastFlushedSequenceId = flushedSequenceId;
-            failedReplicas.clear();
+            if (e.key.getSequenceId() < flushSequenceNumber) {
+              entries.poll();
+              toClearCount++;
+              toClearSize += e.size;
+            } else {
+              break;
+            }
+          }
+          lastFlushedSequenceId = flushSequenceNumber;
+          if (LOG.isDebugEnabled()) {
             LOG.debug(
-              "Got a flush all request with sequence id {}, clear failed replicas {}" +
-                " and {} pending entries with size {}",
-              flushedSequenceId, failedReplicas, toClearCount,
+              "Got a flush all request with sequence id {}, clear {} pending"
+                + " entries with size {}",
+              flushSequenceNumber, toClearCount,
               StringUtils.TraditionalBinaryPrefix.long2String(toClearSize, "", 1));
-            flushRequester.recordFlush(flushedSequenceId);
           }
-        }
+          clearFailedReplica(flushSequenceNumber);
+          flushRequester.recordFlush(flushSequenceNumber);
+        });
       }
     }
     if (failedReplicas.size() == regionReplication - 1) {
@@ -340,7 +371,7 @@ public class RegionReplicationSink {
       // failed
       clearAllEntries();
       for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
-        failedReplicas.add(replicaId);
+        failedReplicas.put(replicaId, entry.key.getSequenceId());
       }
       flushRequester.requestFlush(entry.key.getSequenceId());
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
index 248cdba..76a224b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -313,4 +313,91 @@ public class TestRegionReplicationSink {
     // should have send out all so no pending entries.
     assertEquals(0, sink.pendingSize());
   }
+
+  @Test
+  public void testNotClearFailedReplica() {
+    // simulate this scenario:
+    // 1. prepare flush
+    // 2. add one edit broken
+    // 3. commit flush with flush sequence number less than the previous edit(this is the normal
+    // case)
+    // we should not clear the failed replica as we do not flush the broken edit out with this
+    // flush, we need an extra flush to flush it out
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Stream.generate(() -> new CompletableFuture<Void>()).limit(8).collect(Collectors.toList());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+    when(manager.increase(anyLong())).thenReturn(true);
+
+    ServerCall<?> rpcCall1 = mock(ServerCall.class);
+    WALKeyImpl key1 = mock(WALKeyImpl.class);
+    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
+    when(key1.getSequenceId()).thenReturn(1L);
+    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
+      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
+        throw new IllegalStateException();
+      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
+    FlushDescriptor fd =
+      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 1L, committedFiles);
+    WALEdit edit1 = WALEdit.createFlushWALEdit(primary, fd);
+    sink.add(key1, edit1, rpcCall1);
+
+    futures.get(0).complete(null);
+    futures.get(1).complete(null);
+
+    ServerCall<?> rpcCall2 = mock(ServerCall.class);
+    WALKeyImpl key2 = mock(WALKeyImpl.class);
+    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
+    when(key2.getSequenceId()).thenReturn(2L);
+    WALEdit edit2 = mock(WALEdit.class);
+    when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
+    sink.add(key2, edit2, rpcCall2);
+
+    // fail the call to replica 1
+    futures.get(2).completeExceptionally(new IOException("inject error"));
+    futures.get(3).complete(null);
+
+    ServerCall<?> rpcCall3 = mock(ServerCall.class);
+    WALKeyImpl key3 = mock(WALKeyImpl.class);
+    when(key3.estimatedSerializedSizeOf()).thenReturn(300L);
+    when(key3.getSequenceId()).thenReturn(3L);
+    FlushDescriptor fd3 =
+      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 1L, committedFiles);
+    WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd3);
+    sink.add(key3, edit3, rpcCall3);
+
+    // we should only call replicate once for edit3, since replica 1 is marked as failed, and the
+    // flush request can not clean the failed replica since the flush sequence number is not greater
+    // than sequence id of the last failed edit
+    verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+    futures.get(4).complete(null);
+
+    ServerCall<?> rpcCall4 = mock(ServerCall.class);
+    WALKeyImpl key4 = mock(WALKeyImpl.class);
+    when(key4.estimatedSerializedSizeOf()).thenReturn(400L);
+    when(key4.getSequenceId()).thenReturn(4L);
+    WALEdit edit4 = mock(WALEdit.class);
+    when(edit4.estimatedSerializedSizeOf()).thenReturn(4000L);
+    sink.add(key4, edit4, rpcCall4);
+
+    // still, only send to replica 2
+    verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+    futures.get(5).complete(null);
+
+    ServerCall<?> rpcCall5 = mock(ServerCall.class);
+    WALKeyImpl key5 = mock(WALKeyImpl.class);
+    when(key5.estimatedSerializedSizeOf()).thenReturn(300L);
+    when(key5.getSequenceId()).thenReturn(3L);
+    FlushDescriptor fd5 =
+      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 4L, committedFiles);
+    WALEdit edit5 = WALEdit.createFlushWALEdit(primary, fd5);
+    sink.add(key5, edit5, rpcCall5);
+
+    futures.get(6).complete(null);
+    futures.get(7).complete(null);
+    // should have cleared the failed replica because the flush sequence number is greater than than
+    // the sequence id of the last failed edit
+    verify(conn, times(8)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+  }
 }
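For readers skimming the diff, the core behavioral change reduces to one invariant: a replica that failed at sequence id S may only be removed from failedReplicas by a flush-all whose sequence number is strictly greater than S, because only such a flush is guaranteed to have persisted the edit that failed. The following is a minimal, standalone Java sketch of that bookkeeping; the class and method names are hypothetical, not code from this commit.

import java.util.HashMap;
import java.util.Map;

public class FailedReplicaTracking {

  // replica id -> sequence id of the last edit we failed to replicate to it
  private final Map<Integer, Long> failedReplicas = new HashMap<>();

  // record a replication failure for the given replica
  void markFailed(int replicaId, long failedSequenceId) {
    failedReplicas.put(replicaId, failedSequenceId);
  }

  // on a flush-all, clear only replicas whose failed edit is covered by the flush,
  // i.e. whose failed sequence id is strictly less than the flush sequence number
  void onFlushAll(long flushSequenceNumber) {
    failedReplicas.values().removeIf(failedSeqId -> failedSeqId < flushSequenceNumber);
  }

  boolean isFailed(int replicaId) {
    return failedReplicas.containsKey(replicaId);
  }

  public static void main(String[] args) {
    FailedReplicaTracking tracking = new FailedReplicaTracking();
    tracking.markFailed(1, 2L);               // replica 1 failed at sequence id 2
    tracking.onFlushAll(1L);                  // flush at seq 1 does not cover the failed edit
    System.out.println(tracking.isFailed(1)); // true: replica 1 stays failed
    tracking.onFlushAll(4L);                  // flush at seq 4 covers the failed edit
    System.out.println(tracking.isFailed(1)); // false: replica 1 recovered
  }
}

This mirrors testNotClearFailedReplica above: the COMMIT_FLUSH with flush sequence number 1 leaves replica 1 marked failed, while the later flush with sequence number 4 clears it.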
