This is an automated email from the ASF dual-hosted git repository. sunxin pushed a commit to branch HBASE-24666 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit ff16870505d082fe5075cb31e60d6ec045cf2ab6 Author: XinSun <ddu...@gmail.com> AuthorDate: Tue Apr 27 11:13:15 2021 +0800 HBASE-24737 Find a way to resolve WALFileLengthProvider#getLogFileSizeIfBeingWritten problem (#3045) Signed-off-by: Duo Zhang <zhang...@apache.org> --- .../src/main/protobuf/server/region/Admin.proto | 12 ++ .../hbase/client/AsyncRegionServerAdmin.java | 8 ++ .../hadoop/hbase/regionserver/HRegionServer.java | 2 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 24 ++++ .../hbase/replication/HReplicationServer.java | 11 +- .../regionserver/WALFileLengthProvider.java | 3 +- .../RemoteWALFileLengthProvider.java | 73 ++++++++++++ .../org/apache/hadoop/hbase/wal/WALProvider.java | 15 ++- .../hadoop/hbase/master/MockRegionServer.java | 7 ++ .../TestRemoteWALFileLengthProvider.java | 130 +++++++++++++++++++++ 10 files changed, 280 insertions(+), 5 deletions(-) diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto index 0667292..693a809 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto @@ -328,6 +328,15 @@ message ClearSlowLogResponses { required bool is_cleaned = 1; } +message GetLogFileSizeIfBeingWrittenRequest { + required string wal_path = 1; +} + +message GetLogFileSizeIfBeingWrittenResponse { + required bool is_being_written = 1; + optional uint64 length = 2; +} + service AdminService { rpc GetRegionInfo(GetRegionInfoRequest) returns(GetRegionInfoResponse); @@ -399,4 +408,7 @@ service AdminService { rpc GetLogEntries(LogRequest) returns(LogEntry); + rpc GetLogFileSizeIfBeingWritten(GetLogFileSizeIfBeingWrittenRequest) + returns(GetLogFileSizeIfBeingWrittenResponse); + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java index 8ff869f..f18d894 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java @@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProc import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; @@ -216,4 +218,10 @@ public class AsyncRegionServerAdmin { ExecuteProceduresRequest request) { return call((stub, controller, done) -> stub.executeProcedures(controller, request, done)); } + + public CompletableFuture<GetLogFileSizeIfBeingWrittenResponse> getLogFileSizeIfBeingWritten( + GetLogFileSizeIfBeingWrittenRequest request) { + return call((stub, controller, done) -> + stub.getLogFileSizeIfBeingWritten(controller, request, done)); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index c00a8b7..a5eb4e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2323,7 +2323,7 @@ public class HRegionServer extends Thread implements return walRoller; } - WALFactory getWalFactory() { + public WALFactory getWalFactory() { return walFactory; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 91bf9cb..edc33d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; @@ -136,6 +137,7 @@ import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler; import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker; import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.AccessChecker; @@ -189,6 +191,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProc import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenRequest; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; @@ -4055,6 +4059,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler, throw new ServiceException("Invalid request params"); } + @Override + public GetLogFileSizeIfBeingWrittenResponse getLogFileSizeIfBeingWritten( + RpcController controller, GetLogFileSizeIfBeingWrittenRequest request) throws ServiceException { + GetLogFileSizeIfBeingWrittenResponse.Builder builder = + GetLogFileSizeIfBeingWrittenResponse.newBuilder(); + try { + WALFileLengthProvider walLengthProvider = + this.regionServer.getWalFactory().getWALProvider().getWALFileLengthProvider(); + OptionalLong lengthOptional = + walLengthProvider.getLogFileSizeIfBeingWritten(new Path(request.getWalPath())); + if (lengthOptional.isPresent()) { + return builder.setIsBeingWritten(true).setLength(lengthOptional.getAsLong()).build(); + } else { + return builder.setIsBeingWritten(false).build(); + } + } catch (Exception e) { + throw new ServiceException(e); + } + } + public RpcScheduler getRpcScheduler() { return rpcServer.getScheduler(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java index 8d85b85..2654565 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java @@ -56,6 +56,8 @@ import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSour import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import 
org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceFactory; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; +import org.apache.hadoop.hbase.replication.replicationserver.RemoteWALFileLengthProvider; import org.apache.hadoop.hbase.security.SecurityConstants; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; @@ -716,7 +718,7 @@ public class HReplicationServer extends Thread implements Server, ReplicationSou ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId); // init replication source src.init(conf, walFs, walDir, this, queueStorage, replicationPeers.getPeer(peerId), this, - producer, queueId, clusterId, p -> OptionalLong.empty(), metrics); + producer, queueId, clusterId, createWALFileLengthProvider(producer, queueId), metrics); queueStorage.getWALsInQueue(producer, queueId) .forEach(walName -> src.enqueueLog(new Path(walDir, walName))); src.startup(); @@ -744,4 +746,11 @@ public class HReplicationServer extends Thread implements Server, ReplicationSou abort("Failed to operate on replication queue", e); } } + + private WALFileLengthProvider createWALFileLengthProvider(ServerName producer, String queueId) { + if (new ReplicationQueueInfo(queueId).isQueueRecovered()) { + return p -> OptionalLong.empty(); + } + return new RemoteWALFileLengthProvider(asyncClusterConnection, producer); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java index c60faa9..f91dd2c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java @@ -17,6 +17,7 
@@ */ package org.apache.hadoop.hbase.replication.regionserver; +import java.io.IOException; import java.util.OptionalLong; import org.apache.hadoop.fs.Path; @@ -33,5 +34,5 @@ import org.apache.yetus.audience.InterfaceAudience; @FunctionalInterface public interface WALFileLengthProvider { - OptionalLong getLogFileSizeIfBeingWritten(Path path); + OptionalLong getLogFileSizeIfBeingWritten(Path path) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/replicationserver/RemoteWALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/replicationserver/RemoteWALFileLengthProvider.java new file mode 100644 index 0000000..07d216d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/replicationserver/RemoteWALFileLengthProvider.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.replicationserver; + +import java.io.IOException; +import java.util.OptionalLong; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenRequest; + +/** + * Used by ReplicationServer when replication offload is enabled. + * On ReplicationServer, we need to know the length of the WAL being written by the RegionServer + * that holds the WAL. This is achieved through an RPC call. + */ +@InterfaceAudience.Private +public class RemoteWALFileLengthProvider implements WALFileLengthProvider { + + private static final Logger LOG = LoggerFactory.getLogger(RemoteWALFileLengthProvider.class); + + private AsyncClusterConnection conn; + + private ServerName rs; + + public RemoteWALFileLengthProvider(AsyncClusterConnection conn, ServerName rs) { + this.conn = conn; + this.rs = rs; + } + + @Override + public OptionalLong getLogFileSizeIfBeingWritten(Path path) throws IOException { + AsyncRegionServerAdmin rsAdmin = conn.getRegionServerAdmin(rs); + GetLogFileSizeIfBeingWrittenRequest request = + GetLogFileSizeIfBeingWrittenRequest.newBuilder().setWalPath(path.toString()).build(); + try { + AdminProtos.GetLogFileSizeIfBeingWrittenResponse response = + FutureUtils.get(rsAdmin.getLogFileSizeIfBeingWritten(request)); + if (response.getIsBeingWritten()) { + return OptionalLong.of(response.getLength()); + } else { + return OptionalLong.empty(); + } + } catch (IOException e) { + 
LOG.warn("Exceptionally get the length of wal {} from RS {}", path.getName(), rs); + throw e; + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index 01c1d11..a9bd50e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -130,7 +130,18 @@ public interface WALProvider { void addWALActionsListener(WALActionsListener listener); default WALFileLengthProvider getWALFileLengthProvider() { - return path -> getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path)) - .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty()); + return path -> getWALs().stream().map(w -> { + try { + return w.getLogFileSizeIfBeingWritten(path); + } catch (IOException e) { + // Won't go here. For supporting replication offload in HBASE-24737, we introduce + // RemoteWALFileLengthProvider implementing WALFileLengthProvider, it is held by + // ReplicationServer and gets the length of WALs from RS through RPC, it may throw an IOE. + // So we need to declare WALFileLengthProvider.getLogFileSizeIfBeingWritten as throwing IOE. + // But this is safe here, WALProvider is only used by RS, getWALs returns WAL that extends + // WALFileLengthProvider and won't throw IOE. 
+ return OptionalLong.empty(); + } + }).filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 69a7a79..084b5af 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -690,6 +690,13 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface, } @Override + public AdminProtos.GetLogFileSizeIfBeingWrittenResponse getLogFileSizeIfBeingWritten( + RpcController controller, AdminProtos.GetLogFileSizeIfBeingWrittenRequest request) + throws ServiceException { + return null; + } + + @Override public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots( RpcController controller, GetSpaceQuotaSnapshotsRequest request) throws ServiceException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/replicationserver/TestRemoteWALFileLengthProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/replicationserver/TestRemoteWALFileLengthProvider.java new file mode 100644 index 0000000..a9adbec --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/replicationserver/TestRemoteWALFileLengthProvider.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.replicationserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.OptionalLong; +import java.util.stream.Collectors; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ MediumTests.class}) +public class TestRemoteWALFileLengthProvider { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + 
HBaseClassTestRule.forClass(TestRemoteWALFileLengthProvider.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestRemoteWALFileLengthProvider.class); + + @Rule + public final TestName name = new TestName(); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final byte[] CF = Bytes.toBytes("C"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniCluster(); + } + + @AfterClass + public static void teardownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws Exception { + TableName tableName = TableName.valueOf(name.getMethodName()); + Table table = UTIL.createTable(tableName, CF); + UTIL.waitUntilAllRegionsAssigned(tableName); + assertEquals(1, UTIL.getMiniHBaseCluster().getNumLiveRegionServers()); + + // Find the RS which holds test table regions. + HRegionServer rs = + UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream() + .map(JVMClusterUtil.RegionServerThread::getRegionServer) + .filter(s -> !s.getRegions(tableName).isEmpty()) + .findFirst().get(); + assertNotNull(rs); + + // Put some data and request rolling log, make multiple wals. 
+ table.put(new Put(Bytes.toBytes("r1")).addColumn(CF, CF, Bytes.toBytes("v"))); + rs.getWalRoller().requestRollAll(); + table.put(new Put(Bytes.toBytes("r2")).addColumn(CF, CF, Bytes.toBytes("v"))); + UTIL.waitFor(60000, rs::walRollRequestFinished); + + WALFileLengthProvider rsLengthProvider = + rs.getWalFactory().getWALProvider().getWALFileLengthProvider(); + WALFileLengthProvider remoteLengthProvider = + new RemoteWALFileLengthProvider(UTIL.getAsyncConnection(), rs.getServerName()); + + // Check that RegionServer and ReplicationServer can get same result whether the wal is being + // written + boolean foundWalIsBeingWritten = false; + List<Path> wals = getRsWalsOnFs(rs); + assertTrue(wals.size() > 1); + for (Path wal : wals) { + Path path = new Path(rs.getWALRootDir(), wal); + OptionalLong rsWalLength = rsLengthProvider.getLogFileSizeIfBeingWritten(path); + OptionalLong remoteLength = remoteLengthProvider.getLogFileSizeIfBeingWritten(path); + assertEquals(rsWalLength.isPresent(), remoteLength.isPresent()); + if (rsWalLength.isPresent() && remoteLength.isPresent()) { + foundWalIsBeingWritten = true; + assertEquals(rsWalLength.getAsLong(), remoteLength.getAsLong()); + } + } + assertTrue(foundWalIsBeingWritten); + } + + private List<Path> getRsWalsOnFs(HRegionServer rs) throws IOException { + FileSystem fs = rs.getFileSystem(); + FileStatus[] fileStatuses = fs.listStatus(new Path(rs.getWALRootDir(), + AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().toString()))); + return Arrays.stream(fileStatuses).map(FileStatus::getPath).collect(Collectors.toList()); + } +}