This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 4d59cb5d1f3eea5f457958d9b11bc25b0b1c6a9e
Author: Duo Zhang <[email protected]>
AuthorDate: Tue Nov 9 21:41:25 2021 +0800

    HBASE-26412 Handle sink failure in RegionReplicationSink (#3815)

    Signed-off-by: GeorryHuang <[email protected]>
---
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   7 +-
 .../hbase/regionserver/RegionReplicationSink.java  | 147 ++++++++++++++-----
 .../TestRegionReplicaReplicationError.java         | 158 +++++++++++++++++++++
 .../regionserver/TestRegionReplicaReplication.java |   4 +-
 4 files changed, 275 insertions(+), 41 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c611a8a..5de22eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1104,8 +1104,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return;
     }
     status.setStatus("Initializing region replication sink");
-    regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo,
-      regionReplication, td.hasRegionMemStoreReplication(), rss.getAsyncClusterConnection()));
+    regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo, td, () -> {
+      rss.getFlushRequester().requestFlush(this, new ArrayList<>(td.getColumnFamilyNames()),
+        FlushLifeCycleTracker.DUMMY);
+    }, rss.getAsyncClusterConnection()));
+
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
index 6911289..cdd77e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
@@ -17,18 +17,29 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.CompletableFuture;
+import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.ipc.ServerCall;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -39,6 +50,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
+
 /**
  * The class for replicating WAL edits to secondary replicas, one instance per region.
  */
@@ -97,35 +110,39 @@ public class RegionReplicationSink {
 
   private final RegionInfo primary;
 
-  private final int regionReplication;
-
-  private final boolean hasRegionMemStoreReplication;
+  private final TableDescriptor tableDesc;
 
-  private final Queue<SinkEntry> entries = new ArrayDeque<>();
+  private final Runnable flushRequester;
 
   private final AsyncClusterConnection conn;
 
+  // used to track the replicas to which we failed to replicate edits; this
+  // will be cleared after we get a flush edit.
+  private final Set<Integer> failedReplicas = new HashSet<>();
+
+  private final Queue<SinkEntry> entries = new ArrayDeque<>();
+
   private final int retries;
 
   private final long rpcTimeoutNs;
 
   private final long operationTimeoutNs;
 
-  private CompletableFuture<Void> future;
+  private boolean sending;
 
   private boolean stopping;
 
   private boolean stopped;
 
-  RegionReplicationSink(Configuration conf, RegionInfo primary, int regionReplication,
-    boolean hasRegionMemStoreReplication, AsyncClusterConnection conn) {
+  RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
+    Runnable flushRequester, AsyncClusterConnection conn) {
     Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
       primary);
-    Preconditions.checkArgument(regionReplication > 1,
-      "region replication should be greater than 1 but got %s", regionReplication);
+    Preconditions.checkArgument(td.getRegionReplication() > 1,
+      "region replication should be greater than 1 but got %s", td.getRegionReplication());
     this.primary = primary;
-    this.regionReplication = regionReplication;
-    this.hasRegionMemStoreReplication = hasRegionMemStoreReplication;
+    this.tableDesc = td;
+    this.flushRequester = flushRequester;
     this.conn = conn;
     this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
     this.rpcTimeoutNs =
@@ -134,6 +151,36 @@
       .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
   }
 
+  private void onComplete(List<SinkEntry> sent,
+    Map<Integer, MutableObject<Throwable>> replica2Error) {
+    sent.forEach(SinkEntry::replicated);
+    Set<Integer> failed = new HashSet<>();
+    for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
+      Integer replicaId = entry.getKey();
+      Throwable error = entry.getValue().getValue();
+      if (error != null) {
+        LOG.warn("Failed to replicate to secondary replica {} for {}, stop replicating" +
+          " for a while and trigger a flush", replicaId, primary, error);
+        failed.add(replicaId);
+      }
+    }
+    synchronized (entries) {
+      if (!failed.isEmpty()) {
+        failedReplicas.addAll(failed);
+        flushRequester.run();
+      }
+      sending = false;
+      if (stopping) {
+        stopped = true;
+        entries.notifyAll();
+        return;
+      }
+      if (!entries.isEmpty()) {
+        send();
+      }
+    }
+  }
+
   private void send() {
     List<SinkEntry> toSend = new ArrayList<>();
     for (SinkEntry entry;;) {
@@ -143,32 +190,37 @@
       }
       toSend.add(entry);
     }
+    int toSendReplicaCount = tableDesc.getRegionReplication() - 1 - failedReplicas.size();
+    if (toSendReplicaCount <= 0) {
+      return;
+    }
+    sending = true;
     List<WAL.Entry> walEntries =
      toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
-    List<CompletableFuture<Void>> futures = new ArrayList<>();
-    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
+    AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
+    Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
+    for (int replicaId = 1; replicaId < tableDesc.getRegionReplication(); replicaId++) {
+      MutableObject<Throwable> error = new MutableObject<>();
+      replica2Error.put(replicaId, error);
       RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
-      futures.add(conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs));
+      FutureUtils.addListener(
+        conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs), (r, e) -> {
+          error.setValue(e);
+          if (remaining.decrementAndGet() == 0) {
+            onComplete(toSend, replica2Error);
+          }
+        });
     }
-    future = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
-    FutureUtils.addListener(future, (r, e) -> {
-      if (e != null) {
-        // TODO: drop pending edits and issue a flush
-        LOG.warn("Failed to replicate to secondary replicas for {}", primary, e);
-      }
-      toSend.forEach(SinkEntry::replicated);
-      synchronized (entries) {
-        future = null;
-        if (stopping) {
-          stopped = true;
-          entries.notifyAll();
-          return;
-        }
-        if (!entries.isEmpty()) {
-          send();
-        }
-      }
-    });
+  }
+
+  private boolean flushAllStores(FlushDescriptor flushDesc) {
+    Set<byte[]> storesFlushed =
+      flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
+        .collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
+    if (storesFlushed.size() != tableDesc.getColumnFamilyCount()) {
+      return false;
+    }
+    return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
   }
 
   /**
@@ -178,7 +230,7 @@ public class RegionReplicationSink {
    * rpc call has cell scanner, which is off heap.
    */
  public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
-    if (!hasRegionMemStoreReplication && !edit.isMetaEdit()) {
+    if (!tableDesc.hasRegionMemStoreReplication() && !edit.isMetaEdit()) {
       // only replicate meta edit if region memstore replication is not enabled
       return;
     }
@@ -186,10 +238,31 @@ public class RegionReplicationSink {
       if (stopping) {
         return;
       }
+      if (edit.isMetaEdit()) {
+        // check whether we flushed all stores, which means we could drop all the previous edits
+        // and also recover from the previous failure of some replicas
+        for (Cell metaCell : edit.getCells()) {
+          if (CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
+            FlushDescriptor flushDesc;
+            try {
+              flushDesc = WALEdit.getFlushDescriptor(metaCell);
+            } catch (IOException e) {
+              LOG.warn("Failed to parse FlushDescriptor from {}", metaCell);
+              continue;
+            }
+            if (flushDesc != null && flushAllStores(flushDesc)) {
+              LOG.debug("Got a flush all request, clear failed replicas {} and {} pending" +
+                " replication entries", failedReplicas, entries.size());
+              entries.clear();
+              failedReplicas.clear();
+            }
+          }
+        }
+      }
       // TODO: limit the total cached entries here, and we should have a global limitation, not for
       // only this region.
       entries.add(new SinkEntry(key, edit, rpcCall));
-      if (future == null) {
+      if (!sending) {
         send();
       }
     }
@@ -203,7 +276,7 @@ public class RegionReplicationSink {
   void stop() {
     synchronized (entries) {
       stopping = true;
-      if (future == null) {
+      if (!sending) {
         stopped = true;
         entries.notifyAll();
       }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java
new file mode 100644
index 0000000..f88c4a9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaReplicationError.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
+import org.apache.hadoop.hbase.StartTestingClusterOption;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.testclassification.FlakeyTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
+
+/**
+ * Test region replication when errors occur.
+ * <p/>
+ * We cannot simply move the secondary replicas, as a flush of the primary replica is triggered
+ * whenever a secondary replica comes online, which would always bring the data of the two regions
+ * in sync. So here we need to simulate request errors instead.
+ */
+@Category({ FlakeyTests.class, LargeTests.class })
+public class TestRegionReplicaReplicationError {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionReplicaReplicationError.class);
+
+  public static final class ErrorReplayRSRpcServices extends RSRpcServices {
+
+    private final AtomicInteger count = new AtomicInteger(0);
+
+    public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException {
+      super(rs);
+    }
+
+    @Override
+    public ReplicateWALEntryResponse replay(RpcController controller,
+      ReplicateWALEntryRequest request) throws ServiceException {
+      List<WALEntry> entries = request.getEntryList();
+      if (CollectionUtils.isEmpty(entries)) {
+        return ReplicateWALEntryResponse.getDefaultInstance();
+      }
+      ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
+      HRegion region;
+      try {
+        region = server.getRegionByEncodedName(regionName.toStringUtf8());
+      } catch (NotServingRegionException e) {
+        throw new ServiceException(e);
+      }
+      // fail the first several requests
+      if (region.getRegionInfo().getReplicaId() == 1 && count.addAndGet(entries.size()) < 100) {
+        throw new ServiceException("Inject error!");
+      }
+      return super.replay(controller, request);
+    }
+
+  }
+
+  public static final class RSForTest
+    extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
+
+    public RSForTest(Configuration conf) throws IOException, InterruptedException {
+      super(conf);
+    }
+
+    @Override
+    protected RSRpcServices createRpcServices() throws IOException {
+      return new ErrorReplayRSRpcServices(this);
+    }
+  }
+
+  private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
+
+  private static TableName TN = TableName.valueOf("test");
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  private static byte[] CQ = Bytes.toBytes("cq");
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    HTU.getConfiguration().setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY,
+      true);
+    HTU.startMiniCluster(
+      StartTestingClusterOption.builder().rsClass(RSForTest.class).numRegionServers(3).build());
+    TableDescriptor td = TableDescriptorBuilder.newBuilder(TN).setRegionReplication(3)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();
+    HTU.getAdmin().createTable(td);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    HTU.shutdownMiniCluster();
+  }
+
+  private boolean checkReplica(Table table, int replicaId) throws IOException {
+    boolean ret = true;
+    for (int i = 0; i < 500; i++) {
+      Result result = table.get(new Get(Bytes.toBytes(i)).setReplicaId(replicaId));
+      byte[] value = result.getValue(CF, CQ);
+      ret &= value != null && value.length > 0 && Bytes.toInt(value) == i;
+    }
+    return ret;
+  }
+
+  @Test
+  public void test() throws IOException {
+    try (Table table = HTU.getConnection().getTable(TN)) {
+      for (int i = 0; i < 500; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
+      }
+      HTU.waitFor(30000, () -> checkReplica(table, 2));
+      HTU.waitFor(30000, () -> checkReplica(table, 1));
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
index 7dd4255..231c9e1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
@@ -53,8 +53,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
 
 /**
- * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
- * async wal replication replays the edits to the secondary region in various scenarios.
+ * Tests region replication by setting up region replicas and verifying async wal replication
+ * replays the edits to the secondary region in various scenarios.
  */
 @Category({FlakeyTests.class, LargeTests.class})
 public class TestRegionReplicaReplication {
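
The core of this patch is the failure-handling lifecycle RegionReplicationSink now implements: when replication to a secondary replica fails, onComplete() parks the replica id in failedReplicas and asks the primary to flush; send() then skips the failed replicas; and when add() later sees a WAL meta edit whose FlushDescriptor covers every column family (flushAllStores()), it clears both the pending entries and the failed set, since the secondaries can recover from the flushed store files. What follows is a minimal, self-contained sketch of that lifecycle only; every name in it (Replicator, the String edits, the synchronous calls) is an illustrative stand-in rather than an HBase API, and the real sink replicates asynchronously through AsyncClusterConnection.

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

/** Illustrative sketch of the sink's failure-tracking lifecycle; not HBase code. */
public class FailedReplicaTrackingSketch {

  /** Stand-in for the per-replica replicate RPC; returns false on failure. */
  interface Replicator {
    boolean replicate(int replicaId, String edit);
  }

  private final int regionReplication; // total replicas, primary included
  private final Replicator replicator;
  private final Runnable flushRequester; // stand-in for requesting a primary flush
  private final Set<Integer> failedReplicas = new HashSet<>();
  private final Queue<String> entries = new ArrayDeque<>();

  FailedReplicaTrackingSketch(int regionReplication, Replicator replicator,
      Runnable flushRequester) {
    this.regionReplication = regionReplication;
    this.replicator = replicator;
    this.flushRequester = flushRequester;
  }

  /** Queue an edit and push it to every secondary replica we still trust. */
  synchronized void add(String edit, boolean isFlushAllEdit) {
    if (isFlushAllEdit) {
      // A flush covering every store lets secondaries recover from the flushed
      // files, so pending edits and remembered failures can both be dropped.
      entries.clear();
      failedReplicas.clear();
    }
    entries.add(edit);
    send();
  }

  private void send() {
    String edit;
    while ((edit = entries.poll()) != null) {
      for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
        if (failedReplicas.contains(replicaId)) {
          continue; // stop replicating to a failed replica until the next full flush
        }
        if (!replicator.replicate(replicaId, edit)) {
          // Remember the failure and trigger a flush, so the replica can catch
          // up from store files instead of the edits it missed.
          failedReplicas.add(replicaId);
          flushRequester.run();
        }
      }
    }
  }
}

In the patch itself this lifecycle is spread across add(), send(), onComplete() and flushAllStores(), with the flush-all signal parsed from the FlushDescriptor carried by the WAL meta edit.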
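The send()/onComplete() pair also uses a completion-counting idiom worth noting: pre-create one error slot per replica on the submitting thread, fan out one RPC per replica, and let whichever listener decrements the shared counter to zero perform the aggregation, exactly once. Below is a sketch of just that idiom under stated assumptions: plain CompletableFutures stand in for the replicate() calls, and a one-element Throwable[] stands in for the MutableObject slots used in the patch.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Illustrative sketch of the fan-out/aggregate idiom; not HBase code. */
public class FanOutCompletionSketch {

  static void fanOut(Map<Integer, CompletableFuture<Void>> callPerReplica,
      Consumer<Map<Integer, Throwable>> onAllComplete) {
    // Pre-create the slots on the submitting thread; each listener then writes
    // only into its own slot and never mutates the map structure concurrently.
    Map<Integer, Throwable[]> errorSlots = new HashMap<>();
    callPerReplica.keySet().forEach(id -> errorSlots.put(id, new Throwable[1]));
    AtomicInteger remaining = new AtomicInteger(callPerReplica.size());
    callPerReplica.forEach((replicaId, call) -> call.whenComplete((r, e) -> {
      errorSlots.get(replicaId)[0] = e; // null means this replica succeeded
      // decrementAndGet totally orders the listeners, so exactly one of them
      // observes zero, and it sees every earlier slot write.
      if (remaining.decrementAndGet() == 0) {
        Map<Integer, Throwable> errors = new HashMap<>();
        errorSlots.forEach((id, slot) -> errors.put(id, slot[0]));
        onAllComplete.accept(errors);
      }
    }));
  }

  public static void main(String[] args) {
    Map<Integer, CompletableFuture<Void>> calls = new HashMap<>();
    calls.put(1, CompletableFuture.completedFuture(null));
    calls.put(2, CompletableFuture.failedFuture(new RuntimeException("Inject error!")));
    fanOut(calls, errors -> errors.forEach((id, e) ->
      System.out.println("replica " + id + " -> " + (e == null ? "ok" : e.getMessage()))));
  }
}

Pre-creating the slots is the key design choice: the map is never structurally modified after fan-out, and the atomic decrement orders all slot writes before the final aggregation runs.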
