This is an automated email from the ASF dual-hosted git repository. zhangduo pushed a commit to branch branch-3 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-3 by this push: new da508f1dae4 HBASE-26974 Introduce a LogRollProcedure (#5408) da508f1dae4 is described below commit da508f1dae4bad4f3aa6074b8db44febebefa60e Author: Ruanhui <32773751+frostr...@users.noreply.github.com> AuthorDate: Fri Sep 12 23:09:34 2025 +0800 HBASE-26974 Introduce a LogRollProcedure (#5408) Co-authored-by: huiruan <huir...@tencent.com> Signed-off-by: Duo Zhang <zhang...@apache.org> (cherry picked from commit ffed09d96bbaccdc83e1d7df66d640cf10b2f191) --- .../hbase/backup/impl/FullTableBackupClient.java | 7 +- .../backup/impl/IncrementalBackupManager.java | 10 +- .../hadoop/hbase/backup/util/BackupUtils.java | 51 ++++++ .../apache/hadoop/hbase/backup/TestBackupBase.java | 7 +- .../hadoop/hbase/backup/TestBackupMerge.java | 19 +-- .../java/org/apache/hadoop/hbase/client/Admin.java | 10 ++ .../hadoop/hbase/client/AdminOverAsyncAdmin.java | 5 + .../org/apache/hadoop/hbase/client/AsyncAdmin.java | 9 ++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 5 + .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 138 ++++++++++++---- .../hbase/shaded/protobuf/RequestConverter.java | 6 + .../org/apache/hadoop/hbase/util/FutureUtils.java | 2 +- .../procedure2/RemoteProcedureDispatcher.java | 2 +- .../src/main/protobuf/HBase.proto | 4 + .../src/main/protobuf/server/master/Master.proto | 12 ++ .../protobuf/server/master/MasterProcedure.proto | 18 +++ .../server/master/RegionServerStatus.proto | 1 + .../src/main/protobuf/server/region/Admin.proto | 1 - .../apache/hadoop/hbase/executor/EventType.java | 8 +- .../apache/hadoop/hbase/executor/ExecutorType.java | 3 +- .../org/apache/hadoop/hbase/master/HMaster.java | 24 ++- .../hadoop/hbase/master/MasterRpcServices.java | 19 ++- .../apache/hadoop/hbase/master/MasterServices.java | 6 + .../apache/hadoop/hbase/master/ServerManager.java | 8 + .../assignment/RegionRemoteProcedureBase.java | 2 +- .../assignment/RegionTransitionProcedure.java | 2 +- 
.../master/procedure/FlushRegionProcedure.java | 2 +- .../hbase/master/procedure/LogRollProcedure.java | 178 +++++++++++++++++++++ .../master/procedure/LogRollRemoteProcedure.java | 113 +++++++++++++ .../master/procedure/ServerProcedureInterface.java | 5 + .../hadoop/hbase/master/procedure/ServerQueue.java | 1 + .../master/procedure/ServerRemoteProcedure.java | 3 +- .../master/procedure/SnapshotRegionProcedure.java | 2 +- .../hbase/procedure2/BaseRSProcedureCallable.java | 7 +- .../hbase/procedure2/RSProcedureCallable.java | 2 +- .../hbase/regionserver/FlushRegionCallable.java | 3 +- .../hadoop/hbase/regionserver/HRegionServer.java | 11 +- .../hadoop/hbase/regionserver/LogRollCallable.java | 84 ++++++++++ .../hadoop/hbase/regionserver/RSRpcServices.java | 2 +- .../hbase/regionserver/ReloadQuotasCallable.java | 3 +- .../RemoteProcedureResultReporter.java | 7 +- .../hbase/regionserver/SnapshotRegionCallable.java | 3 +- .../hbase/regionserver/SnapshotVerifyCallable.java | 3 +- .../hbase/regionserver/SplitWALCallable.java | 3 +- .../regionserver/handler/RSProcedureHandler.java | 8 +- .../ClaimReplicationQueueCallable.java | 3 +- .../regionserver/RefreshPeerCallable.java | 3 +- .../ReplaySyncReplicationWALCallable.java | 3 +- .../SwitchRpcThrottleRemoteCallable.java | 3 +- .../apache/hadoop/hbase/wal/AbstractWALRoller.java | 2 +- .../hbase/master/MockNoopMasterServices.java | 5 + .../master/procedure/TestLogRollProcedure.java | 104 ++++++++++++ .../procedure/TestServerRemoteProcedure.java | 3 +- .../TestRegisterPeerWorkerWhenRestarting.java | 4 +- .../hbase/rsgroup/VerifyingRSGroupAdmin.java | 5 + hbase-shell/src/main/ruby/hbase/admin.rb | 6 + hbase-shell/src/main/ruby/shell.rb | 1 + .../src/main/ruby/shell/commands/wal_roll_all.rb | 37 +++++ .../hadoop/hbase/thrift2/client/ThriftAdmin.java | 4 + 59 files changed, 901 insertions(+), 101 deletions(-) diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java 
b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java index f21ced9bf2f..2293fd4f814 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/FullTableBackupClient.java @@ -25,7 +25,6 @@ import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CON import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.TableName; @@ -36,7 +35,6 @@ import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; import org.apache.hadoop.hbase.backup.BackupRequest; import org.apache.hadoop.hbase.backup.BackupRestoreFactory; import org.apache.hadoop.hbase.backup.BackupType; -import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; @@ -158,10 +156,7 @@ public class FullTableBackupClient extends TableBackupClient { // snapshots for the same reason as the log rolls. 
List<BulkLoad> bulkLoadsToDelete = backupManager.readBulkloadRows(tableList); - Map<String, String> props = new HashMap<>(); - props.put("backupRoot", backupInfo.getBackupRootDir()); - admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, - LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props); + BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf); newTimestamps = backupManager.readRegionServerLastLogRollResult(); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java index c92c0747e83..20884edf836 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/IncrementalBackupManager.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.backup.impl; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -29,9 +28,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.backup.util.BackupUtils; -import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -84,13 +81,8 @@ public class IncrementalBackupManager extends BackupManager { } LOG.info("Execute roll log procedure for incremental backup ..."); - HashMap<String, String> props = new HashMap<>(); - props.put("backupRoot", backupInfo.getBackupRootDir()); + BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf); - try (Admin 
admin = conn.getAdmin()) { - admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, - LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props); - } newTimestamps = readRegionServerLastLogRollResult(); logList = getLogFilesForNewBackup(previousTimestampMins, newTimestamps, conf, savedStartCode); diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java index 15159ed73e4..183cc2054f1 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java @@ -49,6 +49,8 @@ import org.apache.hadoop.hbase.backup.HBackupFileSystem; import org.apache.hadoop.hbase.backup.RestoreRequest; import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage; +import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.RegionInfo; @@ -65,6 +67,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Splitter; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; import org.apache.hbase.thirdparty.com.google.common.collect.Iterators; @@ -770,4 +773,52 @@ public final class BackupUtils { return BackupRestoreConstants.BACKUPID_PREFIX + recentTimestamp; } + /** + * roll WAL writer for all region servers and record the newest log roll result + */ + public static void logRoll(Connection conn, String backupRootDir, Configuration conf) + throws IOException { + boolean legacy = 
conf.getBoolean("hbase.backup.logroll.legacy.used", false); + if (legacy) { + logRollV1(conn, backupRootDir); + } else { + logRollV2(conn, backupRootDir); + } + } + + private static void logRollV1(Connection conn, String backupRootDir) throws IOException { + try (Admin admin = conn.getAdmin()) { + admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, + LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, + ImmutableMap.of("backupRoot", backupRootDir)); + } + } + + private static void logRollV2(Connection conn, String backupRootDir) throws IOException { + BackupSystemTable backupSystemTable = new BackupSystemTable(conn); + HashMap<String, Long> lastLogRollResult = + backupSystemTable.readRegionServerLastLogRollResult(backupRootDir); + try (Admin admin = conn.getAdmin()) { + Map<ServerName, Long> newLogRollResult = admin.rollAllWALWriters(); + + for (Map.Entry<ServerName, Long> entry : newLogRollResult.entrySet()) { + ServerName serverName = entry.getKey(); + long newHighestWALFilenum = entry.getValue(); + + String address = serverName.getAddress().toString(); + Long lastHighestWALFilenum = lastLogRollResult.get(address); + if (lastHighestWALFilenum != null && lastHighestWALFilenum > newHighestWALFilenum) { + LOG.warn("Won't update last roll log result for server {}: current = {}, new = {}", + serverName, lastHighestWALFilenum, newHighestWALFilenum); + } else { + backupSystemTable.writeRegionServerLastLogRollResult(address, newHighestWALFilenum, + backupRootDir); + if (LOG.isDebugEnabled()) { + LOG.debug("updated last roll log result for {} from {} to {}", serverName, + lastHighestWALFilenum, newHighestWALFilenum); + } + } + } + } + } } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java index b5f58508441..a14fce59faf 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java +++ 
b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.backup; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -45,7 +44,6 @@ import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.backup.impl.FullTableBackupClient; import org.apache.hadoop.hbase.backup.impl.IncrementalBackupManager; import org.apache.hadoop.hbase.backup.impl.IncrementalTableBackupClient; -import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; @@ -239,10 +237,7 @@ public class TestBackupBase { // the snapshot. LOG.info("Execute roll log procedure for full backup ..."); - Map<String, String> props = new HashMap<>(); - props.put("backupRoot", backupInfo.getBackupRootDir()); - admin.execProcedure(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE, - LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, props); + BackupUtils.logRoll(conn, backupInfo.getBackupRootDir(), conf); failStageIf(Stage.stage_2); newTimestamps = backupManager.readRegionServerLastLogRollResult(); diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java index 38204f68e31..b9197632544 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupMerge.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.util.List; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; import 
org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; import org.apache.hadoop.hbase.backup.util.BackupUtils; @@ -70,17 +71,17 @@ public class TestBackupMerge extends TestBackupBase { // #2 - insert some data to table1 Table t1 = insertIntoTable(conn, table1, famName, 1, ADD_ROWS); - LOG.debug("writing " + ADD_ROWS + " rows to " + table1); + LOG.debug("writing {} rows to {}", ADD_ROWS, table1); - Assert.assertEquals(TEST_UTIL.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS); + Assert.assertEquals(HBaseTestingUtil.countRows(t1), NB_ROWS_IN_BATCH + ADD_ROWS); t1.close(); - LOG.debug("written " + ADD_ROWS + " rows to " + table1); + LOG.debug("written {} rows to {}", ADD_ROWS, table1); Table t2 = insertIntoTable(conn, table2, famName, 1, ADD_ROWS); - Assert.assertEquals(TEST_UTIL.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS); + Assert.assertEquals(HBaseTestingUtil.countRows(t2), NB_ROWS_IN_BATCH + ADD_ROWS); t2.close(); - LOG.debug("written " + ADD_ROWS + " rows to " + table2); + LOG.debug("written {} rows to {}", ADD_ROWS, table2); // #3 - incremental backup for multiple tables tables = Lists.newArrayList(table1, table2); @@ -112,15 +113,15 @@ public class TestBackupMerge extends TestBackupBase { tablesRestoreIncMultiple, tablesMapIncMultiple, true)); Table hTable = conn.getTable(table1_restore); - LOG.debug("After incremental restore: " + hTable.getDescriptor()); - int countRows = TEST_UTIL.countRows(hTable, famName); - LOG.debug("f1 has " + countRows + " rows"); + LOG.debug("After incremental restore: {}", hTable.getDescriptor()); + int countRows = HBaseTestingUtil.countRows(hTable, famName); + LOG.debug("f1 has {} rows", countRows); Assert.assertEquals(NB_ROWS_IN_BATCH + 2 * ADD_ROWS, countRows); hTable.close(); hTable = conn.getTable(table2_restore); - Assert.assertEquals(TEST_UTIL.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS); + Assert.assertEquals(HBaseTestingUtil.countRows(hTable), NB_ROWS_IN_BATCH + 2 * ADD_ROWS); 
hTable.close(); admin.close(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 75dd2ef07b3..43a004a471c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -1404,6 +1404,16 @@ public interface Admin extends Abortable, Closeable { */ void rollWALWriter(ServerName serverName) throws IOException, FailedLogCloseException; + /** + * Roll log writer for all RegionServers. Note that unlike + * {@link Admin#rollWALWriter(ServerName)}, this method is synchronous, which means it will block + * until all RegionServers have completed the log roll, or a RegionServer fails due to an + * exception that retry will not work. + * @return server and the highest wal filenum of server before performing log roll + * @throws IOException if a remote or network exception occurs + */ + Map<ServerName, Long> rollAllWALWriters() throws IOException; + /** * Helper that delegates to getClusterMetrics().getMasterCoprocessorNames(). 
* @return an array of master coprocessors diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java index c13dfc33e3d..c866f434e63 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AdminOverAsyncAdmin.java @@ -635,6 +635,11 @@ class AdminOverAsyncAdmin implements Admin { get(admin.rollWALWriter(serverName)); } + @Override + public Map<ServerName, Long> rollAllWALWriters() throws IOException { + return get(admin.rollAllWALWriters()); + } + @Override public CompactionState getCompactionState(TableName tableName) throws IOException { return get(admin.getCompactionState(tableName)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 331aa4a254a..d808aecc815 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -1270,6 +1270,15 @@ public interface AsyncAdmin { */ CompletableFuture<Void> rollWALWriter(ServerName serverName); + /** + * Roll log writer for all RegionServers. Note that unlike + * {@link Admin#rollWALWriter(ServerName)}, this method is synchronous, which means it will block + * until all RegionServers have completed the log roll, or a RegionServer fails due to an + * exception that retry will not work. + * @return server and the highest wal filenum of server before performing log roll + */ + CompletableFuture<Map<ServerName, Long>> rollAllWALWriters(); + /** * Clear compacting queues on a region server. * @param serverName The servername of the region server. 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 69f35360003..33ac47c73d6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -691,6 +691,11 @@ class AsyncHBaseAdmin implements AsyncAdmin { return wrap(rawAdmin.rollWALWriter(serverName)); } + @Override + public CompletableFuture<Map<ServerName, Long>> rollAllWALWriters() { + return wrap(rawAdmin.rollAllWALWriters()); + } + @Override public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) { return wrap(rawAdmin.clearCompactionQueues(serverName, queues)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 79adce33a13..d7501ccedd8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -105,6 +105,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; @@ -149,6 +150,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerR import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; @@ -263,6 +265,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.Recommissi import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; @@ -497,28 +501,70 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return future; } + /** + * short-circuit call for + * {@link RawAsyncHBaseAdmin#procedureCall(Object, MasterRpcCall, Converter, Converter, ProcedureBiConsumer)} + * by ignoring procedure result + */ private <PREQ, PRESP> CompletableFuture<Void> procedureCall(PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, - ProcedureBiConsumer consumer) { + ProcedureBiConsumer<Void> consumer) { + return procedureCall(preq, rpcCall, respConverter, result -> null, consumer); + } + + /** + * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter, + * ProcedureBiConsumer) by skip setting priority for request + */ + private <PREQ, 
PRESP, PRES> CompletableFuture<PRES> procedureCall(PREQ preq, + MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, + Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) { return procedureCall(b -> { - }, preq, rpcCall, respConverter, consumer); + }, preq, rpcCall, respConverter, resultConverter, consumer); } + /** + * short-circuit call for procedureCall(TableName, Object, MasterRpcCall, Converter, Converter, + * ProcedureBiConsumer) by ignoring procedure result + */ private <PREQ, PRESP> CompletableFuture<Void> procedureCall(TableName tableName, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, - ProcedureBiConsumer consumer) { - return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer); + ProcedureBiConsumer<Void> consumer) { + return procedureCall(tableName, preq, rpcCall, respConverter, result -> null, consumer); + } + + /** + * short-circuit call for procedureCall(Consumer, Object, MasterRpcCall, Converter, Converter, + * ProcedureBiConsumer) by skip setting priority for request + */ + private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall(TableName tableName, PREQ preq, + MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, + Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) { + return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, resultConverter, + consumer); } - private <PREQ, PRESP> CompletableFuture<Void> procedureCall( + /** + * @param <PREQ> type of request + * @param <PRESP> type of response + * @param <PRES> type of procedure call result + * @param prioritySetter prioritySetter set priority by table for request + * @param preq procedure call request + * @param rpcCall procedure rpc call + * @param respConverter extract proc id from procedure call response + * @param resultConverter extract result from procedure call result + * @param consumer 
action performs on result + * @return procedure call result, null if procedure is void + */ + private <PREQ, PRESP, PRES> CompletableFuture<PRES> procedureCall( Consumer<MasterRequestCallerBuilder<?>> prioritySetter, PREQ preq, MasterRpcCall<PRESP, PREQ> rpcCall, Converter<Long, PRESP> respConverter, - ProcedureBiConsumer consumer) { - MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller().action((controller, - stub) -> this.<PREQ, PRESP, Long> call(controller, stub, preq, rpcCall, respConverter)); + Converter<PRES, ByteString> resultConverter, ProcedureBiConsumer<PRES> consumer) { + MasterRequestCallerBuilder<Long> builder = this.<Long> newMasterCaller() + .action((controller, stub) -> this.call(controller, stub, preq, rpcCall, respConverter)); prioritySetter.accept(builder); CompletableFuture<Long> procFuture = builder.call(); - CompletableFuture<Void> future = waitProcedureResult(procFuture); + CompletableFuture<PRES> future = waitProcedureResult(procFuture, resultConverter); addListener(future, consumer); return future; } @@ -1932,7 +1978,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return failedFuture(new ReplicationException("tableCfs is null")); } - CompletableFuture<Void> future = new CompletableFuture<Void>(); + CompletableFuture<Void> future = new CompletableFuture<>(); addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { if (!completeExceptionally(future, error)) { ReplicationPeerConfig newPeerConfig = @@ -1954,7 +2000,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return failedFuture(new ReplicationException("tableCfs is null")); } - CompletableFuture<Void> future = new CompletableFuture<Void>(); + CompletableFuture<Void> future = new CompletableFuture<>(); addListener(getReplicationPeerConfig(id), (peerConfig, error) -> { if (!completeExceptionally(future, error)) { ReplicationPeerConfig newPeerConfig = null; @@ -2053,7 +2099,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private void 
waitSnapshotFinish(SnapshotDescription snapshot, CompletableFuture<Void> future, SnapshotResponse resp) { if (resp.hasProcId()) { - getProcedureResult(resp.getProcId(), future, 0); + getProcedureResult(resp.getProcId(), src -> null, future, 0); addListener(future, new SnapshotProcedureBiConsumer(snapshot.getTableName())); } else { long expectedTimeout = resp.getExpectedTimeout(); @@ -2269,7 +2315,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { .action((controller, stub) -> this.<RestoreSnapshotRequest, RestoreSnapshotResponse, Long> call(controller, stub, builder.build(), (s, c, req, done) -> s.restoreSnapshot(c, req, done), (resp) -> resp.getProcId())) - .call()); + .call(), result -> null); } @Override @@ -2681,14 +2727,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private static abstract class ProcedureBiConsumer implements BiConsumer<Void, Throwable> { + private static abstract class ProcedureBiConsumer<T> implements BiConsumer<T, Throwable> { abstract void onFinished(); abstract void onError(Throwable error); @Override - public void accept(Void v, Throwable error) { + public void accept(T value, Throwable error) { if (error != null) { onError(error); return; @@ -2697,7 +2743,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer { + private static abstract class TableProcedureBiConsumer extends ProcedureBiConsumer<Void> { protected final TableName tableName; TableProcedureBiConsumer(TableName tableName) { @@ -2722,7 +2768,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer { + private static abstract class NamespaceProcedureBiConsumer extends ProcedureBiConsumer<Void> { protected final String namespaceName; NamespaceProcedureBiConsumer(String namespaceName) { @@ -2737,12 +2783,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override void onFinished() { 
- LOG.info(getDescription() + " completed"); + LOG.info("{} completed", getDescription()); } @Override void onError(Throwable error) { - LOG.info(getDescription() + " failed with " + error.getMessage()); + LOG.info("{} failed with {}", getDescription(), error.getMessage()); } } @@ -2981,7 +3027,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } } - private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer { + private static class ReplicationProcedureBiConsumer extends ProcedureBiConsumer<Void> { private final String peerId; private final Supplier<String> getOperation; @@ -2996,28 +3042,44 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override void onFinished() { - LOG.info(getDescription() + " completed"); + LOG.info("{} completed", getDescription()); } @Override void onError(Throwable error) { - LOG.info(getDescription() + " failed with " + error.getMessage()); + LOG.info("{} failed with {}", getDescription(), error.getMessage()); } } - private CompletableFuture<Void> waitProcedureResult(CompletableFuture<Long> procFuture) { - CompletableFuture<Void> future = new CompletableFuture<>(); + private static final class RollAllWALWritersBiConsumer + extends ProcedureBiConsumer<Map<ServerName, Long>> { + + @Override + void onFinished() { + LOG.info("Rolling all WAL writers completed"); + } + + @Override + void onError(Throwable error) { + LOG.warn("Rolling all WAL writers failed with {}", error.getMessage()); + } + } + + private <T> CompletableFuture<T> waitProcedureResult(CompletableFuture<Long> procFuture, + Converter<T, ByteString> converter) { + CompletableFuture<T> future = new CompletableFuture<>(); addListener(procFuture, (procId, error) -> { if (error != null) { future.completeExceptionally(error); return; } - getProcedureResult(procId, future, 0); + getProcedureResult(procId, converter, future, 0); }); return future; } - private void getProcedureResult(long procId, CompletableFuture<Void> future, int retries) { + private <T> 
void getProcedureResult(long procId, Converter<T, ByteString> converter, + CompletableFuture<T> future, int retries) { addListener( this.<GetProcedureResultResponse> newMasterCaller() .action((controller, stub) -> this.<GetProcedureResultRequest, GetProcedureResultResponse, @@ -3029,12 +3091,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { if (error != null) { LOG.warn("failed to get the procedure result procId={}", procId, ConnectionUtils.translateException(error)); - retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), + retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1), ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); return; } if (response.getState() == GetProcedureResultResponse.State.RUNNING) { - retryTimer.newTimeout(t -> getProcedureResult(procId, future, retries + 1), + retryTimer.newTimeout(t -> getProcedureResult(procId, converter, future, retries + 1), ConnectionUtils.getPauseTime(pauseNs, retries), TimeUnit.NANOSECONDS); return; } @@ -3042,7 +3104,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { IOException ioe = ForeignExceptionUtil.toIOException(response.getException()); future.completeExceptionally(ioe); } else { - future.complete(null); + try { + future.complete(converter.convert(response.getResult())); + } catch (IOException e) { + future.completeExceptionally(e); + } } }); } @@ -3185,6 +3251,20 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { .serverName(serverName).call(); } + @Override + public CompletableFuture<Map<ServerName, Long>> rollAllWALWriters() { + return this + .<RollAllWALWritersRequest, RollAllWALWritersResponse, + Map<ServerName, + Long>> procedureCall( + RequestConverter.buildRollAllWALWritersRequest(ng.getNonceGroup(), ng.newNonce()), + (s, c, req, done) -> s.rollAllWALWriters(c, req, done), resp -> resp.getProcId(), + result -> LastHighestWalFilenum.parseFrom(result.toByteArray()).getFileNumMap() + 
.entrySet().stream().collect(Collectors + .toUnmodifiableMap(e -> ServerName.valueOf(e.getKey()), Map.Entry::getValue)), + new RollAllWALWritersBiConsumer()); + } + @Override public CompletableFuture<Void> clearCompactionQueues(ServerName serverName, Set<String> queues) { return this.<Void> newAdminCaller() diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 3bbfac500ce..37fdb1ba6fe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -139,6 +139,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeR import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.OfflineRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RecommissionRegionServerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RegionSpecifierAndState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetBalancerRunningRequest; @@ -860,6 +861,11 @@ public final class RequestConverter { return RollWALWriterRequest.getDefaultInstance(); } + public static RollAllWALWritersRequest buildRollAllWALWritersRequest(long nonceGroup, + long nonce) { + return RollAllWALWritersRequest.newBuilder().setNonceGroup(nonceGroup).setNonce(nonce).build(); + } + /** * Create a new GetServerInfoRequest * @return a GetServerInfoRequest diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java index 4f8a7320fb4..37292d5feef 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/FutureUtils.java @@ -65,7 +65,7 @@ public final class FutureUtils { try { // See this post on stack overflow(shorten since the url is too long), // https://s.apache.org/completionexception - // For a chain of CompleableFuture, only the first child CompletableFuture can get the + // For a chain of CompletableFuture, only the first child CompletableFuture can get the // original exception, others will get a CompletionException, which wraps the original // exception. So here we unwrap it before passing it to the callback action. action.accept(resp, unwrapCompletionException(error)); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java index e6a9d8fb2bd..6e68ce5f190 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -262,7 +262,7 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable * Called when RS tells the remote procedure is succeeded through the * {@code reportProcedureDone} method. 
*/ - void remoteOperationCompleted(TEnv env); + void remoteOperationCompleted(TEnv env, byte[] remoteResultData); /** * Called when RS tells the remote procedure is failed through the {@code reportProcedureDone} diff --git a/hbase-protocol-shaded/src/main/protobuf/HBase.proto b/hbase-protocol-shaded/src/main/protobuf/HBase.proto index 0fd3d667d4d..c66ee7eb979 100644 --- a/hbase-protocol-shaded/src/main/protobuf/HBase.proto +++ b/hbase-protocol-shaded/src/main/protobuf/HBase.proto @@ -289,3 +289,7 @@ message RotateFileData { required int64 timestamp = 1; required bytes data = 2; } + +message LastHighestWalFilenum { + map<string, uint64> file_num = 1; +} diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto index a8adaa27453..768a1d7544e 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto @@ -799,6 +799,15 @@ message ModifyColumnStoreFileTrackerResponse { message FlushMasterStoreRequest {} message FlushMasterStoreResponse {} +message RollAllWALWritersRequest { + optional uint64 nonce_group = 1 [default = 0]; + optional uint64 nonce = 2 [default = 0]; +} + +message RollAllWALWritersResponse { + optional uint64 proc_id = 1; +} + service MasterService { /** Used by the client to get the number of regions that have received the updated schema */ rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest) @@ -1270,6 +1279,9 @@ service MasterService { rpc FlushTable(FlushTableRequest) returns(FlushTableResponse); + + rpc rollAllWALWriters(RollAllWALWritersRequest) + returns(RollAllWALWritersResponse); } // HBCK Service definitions. 
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto index e3b43afd66a..554d7ec9c41 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/MasterProcedure.proto @@ -839,3 +839,21 @@ message ReloadQuotasProcedureStateData { required ServerName target_server = 1; optional ForeignExceptionMessage error = 2; } + +enum LogRollProcedureState { + LOG_ROLL_ROLL_LOG_ON_RS = 1; + LOG_ROLL_COLLECT_RS_HIGHEST_WAL_FILENUM = 2; + LOG_ROLL_UNREGISTER_SERVER_LISTENER = 3; +} + +message LogRollRemoteProcedureStateData { + required ServerName target_server = 1; +} + +message RSLogRollParameter { +} + +message LogRollRemoteProcedureResult { + optional ServerName server_name = 1; + optional uint64 last_highest_wal_filenum = 2; +} diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/RegionServerStatus.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/RegionServerStatus.proto index e68ba8e7286..3d2d8c6ff5f 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/RegionServerStatus.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/RegionServerStatus.proto @@ -160,6 +160,7 @@ message RemoteProcedureResult { optional ForeignExceptionMessage error = 3; // Master active time as fencing token optional int64 initiating_master_active_time = 4; + optional bytes proc_result_data = 5; } message ReportProcedureDoneRequest { repeated RemoteProcedureResult result = 1; diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto index 230795f2747..30eb328fd3c 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto @@ -420,5 +420,4 @@ service AdminService { rpc 
GetCachedFilesList(GetCachedFilesListRequest) returns(GetCachedFilesListResponse); - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index fce32333577..fee132b7a4d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -303,7 +303,13 @@ public enum EventType { * RS reload quotas.<br> * RS_RELOAD_QUOTAS */ - RS_RELOAD_QUOTAS(90, ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS); + RS_RELOAD_QUOTAS(90, ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS), + + /** + * RS log roll.<br> + * RS_LOG_ROLL + */ + RS_LOG_ROLL(91, ExecutorType.RS_LOG_ROLL); private final int code; private final ExecutorType executor; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index 1d689d276aa..668cd701c0d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -56,7 +56,8 @@ public enum ExecutorType { RS_CLAIM_REPLICATION_QUEUE(35), RS_SNAPSHOT_OPERATIONS(36), RS_FLUSH_OPERATIONS(37), - RS_RELOAD_QUOTAS_OPERATIONS(38); + RS_RELOAD_QUOTAS_OPERATIONS(38), + RS_LOG_ROLL(39); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 1cda553a81d..6f235b2156f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -160,6 +160,7 @@ import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; import 
org.apache.hadoop.hbase.master.procedure.FlushTableProcedure; import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure; +import org.apache.hadoop.hbase.master.procedure.LogRollProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; @@ -4201,11 +4202,11 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste return (RemoteProcedure<MasterProcedureEnv, ?>) procedure; } - public void remoteProcedureCompleted(long procId) { + public void remoteProcedureCompleted(long procId, byte[] remoteResultData) { LOG.debug("Remote procedure done, pid={}", procId); RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId); if (procedure != null) { - procedure.remoteOperationCompleted(procedureExecutor.getEnvironment()); + procedure.remoteOperationCompleted(procedureExecutor.getEnvironment(), remoteResultData); } } @@ -4539,7 +4540,7 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste @Override protected void run() throws IOException { getMaster().getMasterCoprocessorHost().preTableFlush(tableName); - LOG.info(getClientIdAuditPrefix() + " flush " + tableName); + LOG.info("{} flush {}", getClientIdAuditPrefix(), tableName); submitProcedure( new FlushTableProcedure(procedureExecutor.getEnvironment(), tableName, columnFamilies)); getMaster().getMasterCoprocessorHost().postTableFlush(tableName); @@ -4551,4 +4552,21 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste } }); } + + @Override + public long rollAllWALWriters(long nonceGroup, long nonce) throws IOException { + return MasterProcedureUtil + .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) { + @Override + protected void run() { + LOG.info("{} roll all wal writers", getClientIdAuditPrefix()); + 
submitProcedure(new LogRollProcedure()); + } + + @Override + protected String getDescription() { + return "RollAllWALWriters"; + } + }); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index fc246d38d51..de911b54ee9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -321,6 +321,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.Recommissi import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RegionSpecifierAndState; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RestoreSnapshotResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RollAllWALWritersResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCatalogScanResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.RunCleanerChoreRequest; @@ -1372,7 +1374,7 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster> @Override public GetProcedureResultResponse getProcedureResult(RpcController controller, GetProcedureResultRequest request) throws ServiceException { - LOG.debug("Checking to see if procedure is done pid=" + request.getProcId()); + LOG.debug("Checking to see if procedure is done pid={}", request.getProcId()); try { server.checkInitialized(); GetProcedureResultResponse.Builder builder = GetProcedureResultResponse.newBuilder(); @@ -2575,7 +2577,9 @@ public class MasterRpcServices extends 
HBaseRpcServicesBase<HMaster> } request.getResultList().forEach(result -> { if (result.getStatus() == RemoteProcedureResult.Status.SUCCESS) { - server.remoteProcedureCompleted(result.getProcId()); + byte[] remoteResultData = + result.hasProcResultData() ? result.getProcResultData().toByteArray() : null; + server.remoteProcedureCompleted(result.getProcId(), remoteResultData); } else { server.remoteProcedureFailed(result.getProcId(), RemoteProcedureException.fromProto(result.getError())); @@ -3662,4 +3666,15 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster> throw new ServiceException(ioe); } } + + @Override + public RollAllWALWritersResponse rollAllWALWriters(RpcController rpcController, + RollAllWALWritersRequest request) throws ServiceException { + try { + long procId = server.rollAllWALWriters(request.getNonceGroup(), request.getNonce()); + return RollAllWALWritersResponse.newBuilder().setProcId(procId).build(); + } catch (IOException ioe) { + throw new ServiceException(ioe); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index e9c98d62446..0573b1a7562 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -515,4 +515,10 @@ public interface MasterServices extends Server { * @return procedure Id */ long truncateRegion(RegionInfo regionInfo, long nonceGroup, long nonce) throws IOException; + + /** + * Roll WAL writer for all RegionServers + * @return procedure id + */ + long rollAllWALWriters(long nonceGroup, long nonce) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 55cfc28bb53..b99f0448e8f 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -236,6 +236,14 @@ public class ServerManager implements ConfigurationObserver { return this.listeners.remove(listener); } + /** + * Removes all of the ServerListeners of this collection that satisfy the given predicate. + * @param filter a predicate which returns true for ServerListener to be removed + */ + public boolean unregisterListenerIf(final Predicate<ServerListener> filter) { + return this.listeners.removeIf(filter); + } + /** * Let the server manager know a new regionserver has come online * @param request the startup request diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java index a828b5b668f..cb3b91ca0e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionRemoteProcedureBase.java @@ -103,7 +103,7 @@ public abstract class RegionRemoteProcedureBase extends Procedure<MasterProcedur newRemoteOperation(MasterProcedureEnv env); @Override - public void remoteOperationCompleted(MasterProcedureEnv env) { + public void remoteOperationCompleted(MasterProcedureEnv env, byte[] remoteResultData) { // should not be called since we use reportRegionStateTransition to report the result throw new UnsupportedOperationException(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java index e0712f1d2aa..4cf685f50a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java @@ -166,7 +166,7 @@ public abstract class RegionTransitionProcedure extends Procedure<MasterProcedur } @Override - public void remoteOperationCompleted(MasterProcedureEnv env) { + public void remoteOperationCompleted(MasterProcedureEnv env, byte[] remoteResultData) { // should not be called for region operation until we modified the open/close region procedure throw new UnsupportedOperationException(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java index 7c67f0e3ee9..af482aeff28 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FlushRegionProcedure.java @@ -149,7 +149,7 @@ public class FlushRegionProcedure extends Procedure<MasterProcedureEnv> } @Override - public void remoteOperationCompleted(MasterProcedureEnv env) { + public void remoteOperationCompleted(MasterProcedureEnv env, byte[] remoteResultData) { complete(env, null); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollProcedure.java new file mode 100644 index 00000000000..a61b2c4afa5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollProcedure.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.ServerListener; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.LogRollProcedureState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.LogRollRemoteProcedureResult; + +/** + * The procedure to perform WAL rolling on all RegionServers. 
+ */ +@InterfaceAudience.Private +public class LogRollProcedure + extends StateMachineProcedure<MasterProcedureEnv, LogRollProcedureState> + implements GlobalProcedureInterface { + + private static final Logger LOG = LoggerFactory.getLogger(LogRollProcedure.class); + + public LogRollProcedure() { + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, LogRollProcedureState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + LOG.info("{} execute state={}", this, state); + + final ServerManager serverManager = env.getMasterServices().getServerManager(); + + try { + switch (state) { + case LOG_ROLL_ROLL_LOG_ON_RS: + // avoid potential new region server missing + serverManager.registerListener(new NewServerWALRoller(env)); + + final List<LogRollRemoteProcedure> subProcedures = + serverManager.getOnlineServersList().stream().map(LogRollRemoteProcedure::new).toList(); + addChildProcedure(subProcedures.toArray(new LogRollRemoteProcedure[0])); + setNextState(LogRollProcedureState.LOG_ROLL_COLLECT_RS_HIGHEST_WAL_FILENUM); + return Flow.HAS_MORE_STATE; + case LOG_ROLL_COLLECT_RS_HIGHEST_WAL_FILENUM: + // get children procedure + List<LogRollRemoteProcedure> children = + env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream() + .filter(p -> p instanceof LogRollRemoteProcedure) + .filter(p -> p.getParentProcId() == getProcId()).map(p -> (LogRollRemoteProcedure) p) + .toList(); + LastHighestWalFilenum.Builder builder = LastHighestWalFilenum.newBuilder(); + for (Procedure<MasterProcedureEnv> child : children) { + LogRollRemoteProcedureResult result = + LogRollRemoteProcedureResult.parseFrom(child.getResult()); + builder.putFileNum(ProtobufUtil.toServerName(result.getServerName()).toString(), + result.getLastHighestWalFilenum()); + } + setResult(builder.build().toByteArray()); + setNextState(LogRollProcedureState.LOG_ROLL_UNREGISTER_SERVER_LISTENER); + return Flow.HAS_MORE_STATE; + case 
LOG_ROLL_UNREGISTER_SERVER_LISTENER: + serverManager.unregisterListenerIf(l -> l instanceof NewServerWALRoller); + return Flow.NO_MORE_STATE; + } + } catch (Exception e) { + setFailure("log-roll", e); + } + return Flow.NO_MORE_STATE; + } + + @Override + public String getGlobalId() { + return getClass().getSimpleName(); + } + + private static final class NewServerWALRoller implements ServerListener { + + private final MasterProcedureEnv env; + + public NewServerWALRoller(MasterProcedureEnv env) { + this.env = env; + } + + @Override + public void serverAdded(ServerName server) { + env.getMasterServices().getMasterProcedureExecutor() + .submitProcedure(new LogRollRemoteProcedure(server)); + } + } + + @Override + protected void rollbackState(MasterProcedureEnv env, LogRollProcedureState state) { + // nothing to rollback + } + + @Override + protected LogRollProcedureState getState(int stateId) { + return LogRollProcedureState.forNumber(stateId); + } + + @Override + protected int getStateId(LogRollProcedureState state) { + return state.getNumber(); + } + + @Override + protected LogRollProcedureState getInitialState() { + return LogRollProcedureState.LOG_ROLL_ROLL_LOG_ON_RS; + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + + if (getResult() != null && getResult().length > 0) { + serializer.serialize(LastHighestWalFilenum.parseFrom(getResult())); + } else { + serializer.serialize(LastHighestWalFilenum.getDefaultInstance()); + } + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + + if (getResult() == null) { + LastHighestWalFilenum lastHighestWalFilenum = + serializer.deserialize(LastHighestWalFilenum.class); + if (lastHighestWalFilenum != null) { + if ( + 
lastHighestWalFilenum.getFileNumMap().isEmpty() + && getCurrentState() == LogRollProcedureState.LOG_ROLL_UNREGISTER_SERVER_LISTENER + ) { + LOG.warn("pid = {}, current state is the last state, but rsHighestWalFilenumMap is " + + "empty, this should not happen. Are all region servers down ?", getProcId()); + } else { + setResult(lastHighestWalFilenum.toByteArray()); + } + } + } + } + + @Override + protected void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollRemoteProcedure.java new file mode 100644 index 00000000000..df8e02ed601 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/LogRollRemoteProcedure.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.Optional; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; +import org.apache.hadoop.hbase.regionserver.LogRollCallable; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.LogRollRemoteProcedureStateData; + +/** + * The remote procedure to perform WAL rolling on the specific RegionServer without retrying. + */ +@InterfaceAudience.Private +public class LogRollRemoteProcedure extends ServerRemoteProcedure + implements ServerProcedureInterface { + + public LogRollRemoteProcedure() { + } + + public LogRollRemoteProcedure(ServerName targetServer) { + this.targetServer = targetServer; + } + + @Override + protected void rollback(MasterProcedureEnv env) { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + serializer.serialize(LogRollRemoteProcedureStateData.newBuilder() + .setTargetServer(ProtobufUtil.toServerName(targetServer)).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + LogRollRemoteProcedureStateData data = + serializer.deserialize(LogRollRemoteProcedureStateData.class); + this.targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + } + + @Override + public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName serverName) { + return Optional.of(new ServerOperation(this, getProcId(), 
LogRollCallable.class, + LogRollRemoteProcedureStateData.getDefaultInstance().toByteArray(), + env.getMasterServices().getMasterActiveTime())); + } + + @Override + public ServerName getServerName() { + return targetServer; + } + + @Override + public boolean hasMetaTableRegion() { + return false; + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.LOG_ROLL; + } + + @Override + protected boolean complete(MasterProcedureEnv env, Throwable error) { + // Do not retry; just return. + if (error != null) { + LOG.warn("Failed to roll wal for {}", targetServer, error); + return false; + } else { + return true; + } + } + + @Override + public synchronized void remoteOperationCompleted(MasterProcedureEnv env, + byte[] remoteResultData) { + setResult(remoteResultData); + super.remoteOperationCompleted(env, remoteResultData); + } + + @Override + protected void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()).append(" targetServer=").append(targetServer); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java index e73b23a3f96..b7ff6db67db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java @@ -62,6 +62,11 @@ public interface ServerProcedureInterface { * Re-read the hbase:quotas table and update {@link QuotaCache}. */ RELOAD_QUOTAS, + + /** + * Send roll log request to region server and handle the response. + */ + LOG_ROLL } /** Returns Name of this server instance. 
*/ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java index 57912f41903..55920bd47b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java @@ -44,6 +44,7 @@ class ServerQueue extends Queue<ServerName> { case CLAIM_REPLICATION_QUEUE_REMOTE: case VERIFY_SNAPSHOT: case RELOAD_QUOTAS: + case LOG_ROLL: return false; default: break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java index 0c89b639641..563961d765e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerRemoteProcedure.java @@ -123,7 +123,8 @@ public abstract class ServerRemoteProcedure extends Procedure<MasterProcedureEnv } @Override - public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { + public synchronized void remoteOperationCompleted(MasterProcedureEnv env, + byte[] remoteResultData) { state = MasterProcedureProtos.ServerRemoteProcedureState.SERVER_REMOTE_PROCEDURE_REPORT_SUCCEED; remoteOperationDone(env, null); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java index 05621767e7f..f4df40b168f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java @@ -108,7 +108,7 @@ public class SnapshotRegionProcedure extends 
Procedure<MasterProcedureEnv> } @Override - public void remoteOperationCompleted(MasterProcedureEnv env) { + public void remoteOperationCompleted(MasterProcedureEnv env, byte[] remoteResultData) { complete(env, null); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java index 68aac1ef6e2..7ea98d00cc7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/BaseRSProcedureCallable.java @@ -28,12 +28,11 @@ public abstract class BaseRSProcedureCallable implements RSProcedureCallable { private Exception initError; @Override - public final Void call() throws Exception { + public final byte[] call() throws Exception { if (initError != null) { throw initError; } - doCall(); - return null; + return doCall(); } @Override @@ -46,7 +45,7 @@ public abstract class BaseRSProcedureCallable implements RSProcedureCallable { } } - protected abstract void doCall() throws Exception; + protected abstract byte[] doCall() throws Exception; protected abstract void initParameter(byte[] parameter) throws Exception; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java index 635d2b6f87a..7ed9ff7664b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java @@ -26,7 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience; * A general interface for a sub procedure runs at RS side. 
*/ @InterfaceAudience.Private -public interface RSProcedureCallable extends Callable<Void> { +public interface RSProcedureCallable extends Callable<byte[]> { /** * Initialize the callable diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java index 3dd932a1736..e39317290bb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java @@ -43,7 +43,7 @@ public class FlushRegionCallable extends BaseRSProcedureCallable { private List<byte[]> columnFamilies; @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { HRegion region = rs.getRegion(regionInfo.getEncodedName()); if (region == null) { throw new NotServingRegionException("region=" + regionInfo.getRegionNameAsString()); @@ -64,6 +64,7 @@ public class FlushRegionCallable extends BaseRSProcedureCallable { LOG.debug("Closing region operation on {}", region); region.closeRegionOperation(); } + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 350baca36f4..cd49ceb753e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1969,6 +1969,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices> executorService.startExecutorService( executorService.new ExecutorConfig().setExecutorType(ExecutorType.RS_RELOAD_QUOTAS_OPERATIONS) .setCorePoolSize(rsRefreshQuotasThreads)); + final int logRollThreads = conf.getInt("hbase.regionserver.executor.log.roll.threads", 1); + 
executorService.startExecutorService(executorService.new ExecutorConfig() + .setExecutorType(ExecutorType.RS_LOG_ROLL).setCorePoolSize(logRollThreads)); Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller", uncaughtExceptionHandler); @@ -2203,7 +2206,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices> */ public void stop(final String msg, final boolean force, final User user) { if (!this.stopped) { - LOG.info("***** STOPPING region server '" + this + "' *****"); + LOG.info("***** STOPPING region server '{}' *****", this); if (this.rsHost != null) { // when forced via abort don't allow CPs to override try { @@ -3551,9 +3554,9 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices> .submit(new RSProcedureHandler(this, procId, initiatingMasterActiveTime, callable)); } - public void remoteProcedureComplete(long procId, long initiatingMasterActiveTime, - Throwable error) { - procedureResultReporter.complete(procId, initiatingMasterActiveTime, error); + public void remoteProcedureComplete(long procId, long initiatingMasterActiveTime, Throwable error, + byte[] procResultData) { + procedureResultReporter.complete(procId, initiatingMasterActiveTime, error, procResultData); } void reportProcedureDone(ReportProcedureDoneRequest request) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRollCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRollCallable.java new file mode 100644 index 00000000000..11dc28c2a68 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRollCallable.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.AbstractWALRoller; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.LogRollRemoteProcedureResult; + +@InterfaceAudience.Private +public class LogRollCallable extends BaseRSProcedureCallable { + + private static final Logger LOG = LoggerFactory.getLogger(LogRollCallable.class); + + private int maxRollRetry; + + @Override + protected byte[] doCall() throws Exception { + for (int nAttempt = 0; nAttempt < maxRollRetry; nAttempt++) { + try { + Pair<Long, Long> filenumPairBefore = getFilenumPair(); + + rs.getWalRoller().requestRollAll(); + rs.getWalRoller().waitUntilWalRollFinished(); + + Pair<Long, Long> filenumPairAfter = getFilenumPair(); + LOG.info( + "Before rolling log, highest filenum = {} default WAL filenum = {}, After " + + "rolling log, highest filenum = {} default WAL filenum = {}", + filenumPairBefore.getFirst(), 
filenumPairBefore.getSecond(), filenumPairAfter.getFirst(), + filenumPairAfter.getSecond()); + return LogRollRemoteProcedureResult.newBuilder() + .setServerName(ProtobufUtil.toServerName(rs.getServerName())) + .setLastHighestWalFilenum(filenumPairBefore.getFirst()).build().toByteArray(); + } catch (Exception e) { + LOG.warn("Failed rolling log on attempt={}", nAttempt, e); + if (nAttempt == maxRollRetry - 1) { + throw e; + } + } + } + return null; + } + + private Pair<Long, Long> getFilenumPair() throws IOException { + long highestFilenum = rs.getWALs().stream() + .mapToLong(wal -> ((AbstractFSWAL<?>) wal).getFilenum()).max().orElse(-1L); + long defaultWALFilenum = ((AbstractFSWAL<?>) rs.getWAL(null)).getFilenum(); + return Pair.newPair(highestFilenum, defaultWALFilenum); + } + + @Override + protected void initParameter(byte[] parameter) throws Exception { + this.maxRollRetry = rs.getConfiguration().getInt(AbstractWALRoller.WAL_ROLL_RETRIES, 1); + } + + @Override + public EventType getEventType() { + return EventType.RS_LOG_ROLL; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index bd232addcec..d325c67a82a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -3966,7 +3966,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer> LOG.warn("Failed to instantiating remote procedure {}, pid={}", request.getProcClass(), request.getProcId(), e); server.remoteProcedureComplete(request.getProcId(), request.getInitiatingMasterActiveTime(), - e); + e, null); return; } callable.init(request.getProcData().toByteArray(), server); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReloadQuotasCallable.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReloadQuotasCallable.java index e134dfda7ac..de23db37856 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReloadQuotasCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReloadQuotasCallable.java @@ -29,9 +29,10 @@ public class ReloadQuotasCallable extends BaseRSProcedureCallable { private static final Logger LOG = LoggerFactory.getLogger(ReloadQuotasCallable.class); @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { LOG.info("Reloading quotas"); rs.getRegionServerRpcQuotaManager().reload(); + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java index 21016fe59dd..7fcf363a919 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RemoteProcedureResultReporter.java @@ -28,6 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult; @@ -51,7 +52,8 @@ class RemoteProcedureResultReporter extends Thread { this.server = server; } - public void complete(long procId, long initiatingMasterActiveTime, Throwable error) { + public void complete(long procId, long initiatingMasterActiveTime, Throwable error, + byte[] procReturnValue) { RemoteProcedureResult.Builder builder = RemoteProcedureResult.newBuilder().setProcId(procId) .setInitiatingMasterActiveTime(initiatingMasterActiveTime); if (error != null) { 
@@ -62,6 +64,9 @@ class RemoteProcedureResultReporter extends Thread { LOG.debug("Successfully complete execution of pid={}", procId); builder.setStatus(RemoteProcedureResult.Status.SUCCESS); } + if (procReturnValue != null) { + builder.setProcResultData(ByteString.copyFrom(procReturnValue)); + } results.add(builder.build()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java index 0693aee8750..7158671efb1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotRegionCallable.java @@ -41,7 +41,7 @@ public class SnapshotRegionCallable extends BaseRSProcedureCallable { private ForeignExceptionDispatcher monitor; @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { HRegion region = rs.getRegion(regionInfo.getEncodedName()); if (region == null) { throw new NotServingRegionException( @@ -78,6 +78,7 @@ public class SnapshotRegionCallable extends BaseRSProcedureCallable { LOG.debug("Closing snapshot operation on {}", region); region.closeRegionOperation(Region.Operation.SNAPSHOT); } + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java index db7908d81be..76a3c1cf84e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SnapshotVerifyCallable.java @@ -32,8 +32,9 @@ public class SnapshotVerifyCallable extends BaseRSProcedureCallable { private RegionInfo region; @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { 
rs.getRsSnapshotVerifier().verifyRegion(snapshot, region); + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java index 151c865db79..e6ae50f6e9a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java @@ -79,7 +79,7 @@ public class SplitWALCallable extends BaseRSProcedureCallable { } @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { // grab a lock splitWALLock = splitWALLocks.acquireLock(walPath); try { @@ -97,6 +97,7 @@ public class SplitWALCallable extends BaseRSProcedureCallable { } finally { splitWALLock.unlock(); } + return null; } public String getWalPath() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java index 6eacc6b78e6..3e150144f2c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java @@ -51,14 +51,16 @@ public class RSProcedureHandler extends EventHandler { @Override public void process() { Throwable error = null; + byte[] procResultData = null; try { MDC.put("pid", Long.toString(procId)); - callable.call(); + procResultData = callable.call(); } catch (Throwable t) { - LOG.error("pid=" + this.procId, t); + LOG.error("pid={}", this.procId, t); error = t; } finally { - ((HRegionServer) server).remoteProcedureComplete(procId, initiatingMasterActiveTime, error); + ((HRegionServer) server).remoteProcedureComplete(procId, initiatingMasterActiveTime, error, + procResultData); } } } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java index 2b7e14f9f7a..73fa2976618 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ClaimReplicationQueueCallable.java @@ -39,9 +39,10 @@ public class ClaimReplicationQueueCallable extends BaseRSProcedureCallable { } @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { PeerProcedureHandler handler = rs.getReplicationSourceService().getPeerProcedureHandler(); handler.claimReplicationQueue(queueId); + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java index 094a61dcdd1..5d4454c1448 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshPeerCallable.java @@ -43,7 +43,7 @@ public class RefreshPeerCallable extends BaseRSProcedureCallable { private int stage; @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { LOG.info("Received a peer change event, peerId=" + peerId + ", type=" + type); PeerProcedureHandler handler = rs.getReplicationSourceService().getPeerProcedureHandler(); switch (type) { @@ -68,6 +68,7 @@ public class RefreshPeerCallable extends BaseRSProcedureCallable { default: throw new IllegalArgumentException("Unknown peer modification type: " + type); } + return null; } @Override diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java index 427fe80b0c3..ed368e18981 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplaySyncReplicationWALCallable.java @@ -69,7 +69,7 @@ public class ReplaySyncReplicationWALCallable extends BaseRSProcedureCallable { private final KeyLocker<String> peersLock = new KeyLocker<>(); @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { LOG.info("Received a replay sync replication wals {} event, peerId={}", wals, peerId); if (rs.getReplicationSinkService() != null) { Lock peerLock = peersLock.acquireLock(wals.get(0)); @@ -81,6 +81,7 @@ public class ReplaySyncReplicationWALCallable extends BaseRSProcedureCallable { peerLock.unlock(); } } + return null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java index d09c821b9ed..fd35464e686 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SwitchRpcThrottleRemoteCallable.java @@ -34,8 +34,9 @@ public class SwitchRpcThrottleRemoteCallable extends BaseRSProcedureCallable { private boolean rpcThrottleEnabled; @Override - protected void doCall() throws Exception { + protected byte[] doCall() throws Exception { rs.getRegionServerRpcQuotaManager().switchRpcThrottle(rpcThrottleEnabled); + return null; } @Override diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java index c900333af9e..5e645721134 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractWALRoller.java @@ -69,7 +69,7 @@ public abstract class AbstractWALRoller<T extends Abortable> extends Thread impl * Configure for the max count of log rolling retry. The real retry count is also limited by the * timeout of log rolling via {@link #WAL_ROLL_WAIT_TIMEOUT} */ - protected static final String WAL_ROLL_RETRIES = "hbase.regionserver.logroll.retries"; + public static final String WAL_ROLL_RETRIES = "hbase.regionserver.logroll.retries"; protected final ConcurrentMap<WAL, RollController> wals = new ConcurrentHashMap<>(); protected final T abortable; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index e78ca7d0cdb..daaa2e5c2b9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -568,4 +568,9 @@ public class MockNoopMasterServices implements MasterServices { long nonce) throws IOException { return 0; } + + @Override + public long rollAllWALWriters(long nonceGroup, long nonce) throws IOException { + return 0; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestLogRollProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestLogRollProcedure.java new file mode 100644 index 00000000000..1b587097dda --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestLogRollProcedure.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import static org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY; +import static org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.DISPATCH_MAX_QUEUE_SIZE_CONF_KEY; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.SingleProcessHBaseCluster; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category(MediumTests.class) +public class TestLogRollProcedure { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + 
HBaseClassTestRule.forClass(TestLogRollProcedure.class); + + @Rule + public TestName name = new TestName(); + + private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + + private Configuration conf; + + @Before + public void setUp() throws Exception { + conf = TEST_UTIL.getConfiguration(); + conf.set(DISPATCH_DELAY_CONF_KEY, "2000"); + conf.set(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, "128"); + TEST_UTIL.startMiniCluster(2); + } + + @After + public void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testSimpleLogRoll() throws IOException { + HRegionServer rs = TEST_UTIL.getHBaseCluster().getRegionServer(0); + long fileNumBefore = ((AbstractFSWAL<?>) rs.getWAL(null)).getFilenum(); + + TEST_UTIL.getAdmin().rollAllWALWriters(); + + long fileNumAfter = ((AbstractFSWAL<?>) rs.getWAL(null)).getFilenum(); + assertTrue(fileNumAfter > fileNumBefore); + } + + @Test + public void testMasterRestarts() throws IOException { + SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); + HRegionServer rs = cluster.getRegionServer(0); + long fileNumBefore = ((AbstractFSWAL<?>) rs.getWAL(null)).getFilenum(); + + LogRollProcedure procedure = new LogRollProcedure(); + long procId = cluster.getMaster().getMasterProcedureExecutor().submitProcedure(procedure); + + TEST_UTIL.waitFor(60000, () -> cluster.getMaster().getMasterProcedureExecutor().getProcedures() + .stream().anyMatch(p -> p instanceof LogRollRemoteProcedure)); + ServerName serverName = cluster.getMaster().getServerName(); + cluster.killMaster(serverName); + cluster.waitForMasterToStop(serverName, 30000); + cluster.startMaster(); + cluster.waitForActiveAndReadyMaster(); + + ProcedureExecutor<MasterProcedureEnv> exec = cluster.getMaster().getMasterProcedureExecutor(); + TEST_UTIL.waitFor(30000, () -> exec.isRunning() && exec.isFinished(procId)); + + long fileNumAfter = ((AbstractFSWAL<?>) rs.getWAL(null)).getFilenum(); + + assertTrue(fileNumAfter > fileNumBefore); 
+ } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java index 1500a3c00cd..f828f5ce1ba 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerRemoteProcedure.java @@ -188,7 +188,8 @@ public class TestServerRemoteProcedure { } @Override - public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { + public synchronized void remoteOperationCompleted(MasterProcedureEnv env, + byte[] remoteResultData) { complete(env, null); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestRegisterPeerWorkerWhenRestarting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestRegisterPeerWorkerWhenRestarting.java index 1c4abd15eaf..c0a37c20e88 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestRegisterPeerWorkerWhenRestarting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestRegisterPeerWorkerWhenRestarting.java @@ -57,14 +57,14 @@ public class TestRegisterPeerWorkerWhenRestarting extends SyncReplicationTestBas } @Override - public void remoteProcedureCompleted(long procId) { + public void remoteProcedureCompleted(long procId, byte[] data) { if ( FAIL && getMasterProcedureExecutor() .getProcedure(procId) instanceof SyncReplicationReplayWALRemoteProcedure ) { throw new RuntimeException("Inject error"); } - super.remoteProcedureCompleted(procId); + super.remoteProcedureCompleted(procId, data); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java index 35c868413e1..4d592b49d0d 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rsgroup/VerifyingRSGroupAdmin.java @@ -526,6 +526,11 @@ public class VerifyingRSGroupAdmin implements Admin, Closeable { admin.rollWALWriter(serverName); } + @Override + public Map<ServerName, Long> rollAllWALWriters() throws IOException { + return admin.rollAllWALWriters(); + } + public CompactionState getCompactionState(TableName tableName) throws IOException { return admin.getCompactionState(tableName); } diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb index 5ceaf2a08c7..93cc312338c 100644 --- a/hbase-shell/src/main/ruby/hbase/admin.rb +++ b/hbase-shell/src/main/ruby/hbase/admin.rb @@ -179,6 +179,12 @@ module Hbase # TODO: remove older hlog_roll version alias hlog_roll wal_roll + #---------------------------------------------------------------------------------------------- + # Requests all region servers to roll wal writer + def wal_roll_all + @admin.rollAllWALWriters + end + #---------------------------------------------------------------------------------------------- # Requests a table or region split def split(table_or_region_name, split_point = nil) diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 46b38dd96b8..6be3854b8a5 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -467,6 +467,7 @@ Shell.load_command_group( unassign zk_dump wal_roll + wal_roll_all hbck_chore_run catalogjanitor_run catalogjanitor_switch diff --git a/hbase-shell/src/main/ruby/shell/commands/wal_roll_all.rb b/hbase-shell/src/main/ruby/shell/commands/wal_roll_all.rb new file mode 100644 index 00000000000..13d76449565 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/wal_roll_all.rb @@ -0,0 +1,37 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license 
agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +module Shell + module Commands + class WalRollAll < Command + def help + <<-EOF +Request all region servers to roll wal writer. Note that this method is synchronous, +which means it will block until all RegionServers have completed the log roll, +or a RegionServer fails due to an exception that retry will not work. Here is how +you would run the command in the hbase shell: + hbase> wal_roll_all +EOF + end + + def command + admin.wal_roll_all + end + end + end +end diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java index 0eff84bba7c..a0d73dcca21 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java @@ -826,7 +826,11 @@ public class ThriftAdmin implements Admin { @Override public void rollWALWriter(ServerName serverName) { throw new NotImplementedException("rollWALWriter not supported in ThriftAdmin"); + } + @Override + public Map<ServerName, Long> rollAllWALWriters() { + throw new NotImplementedException("rollAllWALWriters not supported in ThriftAdmin"); } @Override