HBASE-21588 Procedure v2 wal splitting implementation
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f02ac310 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f02ac310 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f02ac310 Branch: refs/heads/branch-2 Commit: f02ac310d249fb972705cd6621dfe495512d2d98 Parents: 348c2df Author: Jingyun Tian <[email protected]> Authored: Tue Jan 8 09:49:13 2019 +0800 Committer: Jingyun Tian <[email protected]> Committed: Tue Jan 8 17:26:58 2019 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/HConstants.java | 8 + .../src/main/protobuf/MasterProcedure.proto | 25 ++ .../SplitLogWorkerCoordination.java | 3 - .../ZkSplitLogWorkerCoordination.java | 6 +- .../org/apache/hadoop/hbase/master/HMaster.java | 18 + .../hadoop/hbase/master/MasterServices.java | 7 + .../hadoop/hbase/master/MasterWalManager.java | 6 +- .../hadoop/hbase/master/SplitWALManager.java | 239 ++++++++++++ .../master/procedure/ServerCrashProcedure.java | 76 +++- .../procedure/ServerProcedureInterface.java | 13 +- .../hbase/master/procedure/ServerQueue.java | 2 + .../master/procedure/SplitWALProcedure.java | 199 ++++++++++ .../procedure/SplitWALRemoteProcedure.java | 195 ++++++++++ .../hbase/regionserver/HRegionServer.java | 18 +- .../hbase/regionserver/SplitLogWorker.java | 99 ++--- .../hbase/regionserver/SplitWALCallable.java | 109 ++++++ .../hadoop/hbase/master/AbstractTestDLS.java | 3 +- .../hadoop/hbase/master/TestRestartCluster.java | 21 + .../hadoop/hbase/master/TestRollingRestart.java | 18 +- .../hbase/master/TestSplitWALManager.java | 383 +++++++++++++++++++ .../procedure/TestServerCrashProcedure.java | 21 +- .../master/procedure/TestSplitWALProcedure.java | 133 +++++++ .../hbase/regionserver/TestSplitLogWorker.java | 5 +- 23 files changed, 1537 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 7aa1494..bc184e5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1347,6 +1347,14 @@ public final class HConstants { public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL = "hbase.client.fast.fail.interceptor.impl"; + public static final String HBASE_SPLIT_WAL_COORDINATED_BY_ZK = "hbase.split.wal.zk.coordinated"; + + public static final boolean DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK = true; + + public static final String HBASE_SPLIT_WAL_MAX_SPLITTER = "hbase.regionserver.wal.max.splitters"; + + public static final int DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER = 2; + /** Config key for if the server should send backpressure and if the client should listen to * that backpressure from the server */ public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled"; http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index f96859c..1901282 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -308,6 +308,8 @@ enum ServerCrashState { SERVER_CRASH_WAIT_ON_ASSIGN = 9; SERVER_CRASH_SPLIT_META_LOGS = 10; SERVER_CRASH_ASSIGN_META = 11; + SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR=12; + SERVER_CRASH_DELETE_SPLIT_WALS_DIR=13; 
SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true]; SERVER_CRASH_FINISH = 100; } @@ -502,4 +504,27 @@ message SwitchRpcThrottleStateData { message SwitchRpcThrottleRemoteStateData { required ServerName target_server = 1; required bool rpc_throttle_enabled = 2; +} + +message SplitWALParameter { + required string wal_path = 1; +} + + +message SplitWALData { + required string wal_path = 1; + required ServerName crashed_server = 2; + optional ServerName worker = 3; +} + +message SplitWALRemoteData { + required string wal_path = 1; + required ServerName crashed_server = 2; + required ServerName worker = 3; +} + +enum SplitWALState { + ACQUIRE_SPLIT_WAL_WORKER = 1; + DISPATCH_WAL_TO_WORKER = 2; + RELEASE_SPLIT_WORKER = 3; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java index ab04f60..ad74015 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java @@ -48,9 +48,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti @InterfaceAudience.Private public interface SplitLogWorkerCoordination { -/* SplitLogWorker part */ - int DEFAULT_MAX_SPLITTERS = 2; - /** * Initialize internal values. 
This method should be used when corresponding SplitLogWorker * instance is created http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java index ff555f2..7ceaaec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.coordination; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; + import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -135,7 +138,8 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements this.server = server; this.worker = worker; this.splitTaskExecutor = splitExecutor; - maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS); + maxConcurrentTasks = + conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER); reportPeriod = conf.getInt("hbase.splitlog.report.period", conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index bd5e67a..adbae00 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.master; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; import com.google.protobuf.Descriptors; import com.google.protobuf.Service; @@ -335,6 +337,13 @@ public class HMaster extends HRegionServer implements MasterServices { private MasterFileSystem fileSystemManager; private MasterWalManager walManager; + // manager to manage procedure-based WAL splitting, can be null if current + // is zk-based WAL splitting. SplitWALManager will replace SplitLogManager + // and MasterWalManager, which means zk-based WAL splitting code will be + // useless after we switch to the procedure-based one. our eventual goal + // is to remove all the zk-based WAL splitting code. + private SplitWALManager splitWALManager; + // server manager to deal with region server info private volatile ServerManager serverManager; @@ -942,6 +951,10 @@ public class HMaster extends HRegionServer implements MasterServices { status.setStatus("Initialize ServerManager and schedule SCP for crash servers"); this.serverManager = createServerManager(this); + if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + this.splitWALManager = new SplitWALManager(this); + } createProcedureExecutor(); @SuppressWarnings("rawtypes") Map<Class<? 
extends Procedure>, List<Procedure<MasterProcedureEnv>>> procsByType = @@ -1379,6 +1392,11 @@ public class HMaster extends HRegionServer implements MasterServices { } @Override + public SplitWALManager getSplitWALManager() { + return splitWALManager; + } + + @Override public TableStateManager getTableStateManager() { return tableStateManager; } http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 537e09f..d15197f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -495,4 +495,11 @@ public interface MasterServices extends Server { * @return True if cluster is up; false if cluster is not up (we are shutting down). 
*/ boolean isClusterUp(); + + /** + * @return return null if current is zk-based WAL splitting + */ + default SplitWALManager getSplitWALManager(){ + return null; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java index 5ab1c28..fbf4594 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java @@ -60,7 +60,8 @@ public class MasterWalManager { } }; - final static PathFilter NON_META_FILTER = new PathFilter() { + @VisibleForTesting + public final static PathFilter NON_META_FILTER = new PathFilter() { @Override public boolean accept(Path p) { return !AbstractFSWALProvider.isMetaFile(p); @@ -167,7 +168,6 @@ public class MasterWalManager { /** * @return listing of ServerNames found by parsing WAL directory paths in FS. - * */ public Set<ServerName> getServerNamesFromWALDirPath(final PathFilter filter) throws IOException { FileStatus[] walDirForServerNames = getWALDirPaths(filter); @@ -290,7 +290,7 @@ public class MasterWalManager { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", justification= "We only release this lock when we set it. 
Updates to code that uses it should verify use " + "of the guard boolean.") - private List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException { + List<Path> getLogDirs(final Set<ServerName> serverNames) throws IOException { List<Path> logDirs = new ArrayList<>(); boolean needReleaseLock = false; if (!this.services.isInitialized()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java new file mode 100644 index 0000000..fc50840 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; +import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER; +import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; +import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +/** + * Create {@link SplitWALProcedure} for each WAL which need to split. Manage the workers for each + * {@link SplitWALProcedure}. + * Total number of workers is (number of online servers) * (HBASE_SPLIT_WAL_MAX_SPLITTER). + * Helps assign and release workers for split tasks. + * Provide helper method to delete split WAL file and directory. 
+ * + * The user can get the SplitWALProcedures via splitWALs(crashedServer, splitMeta) + * can get the files that need to split via getWALsToSplit(crashedServer, splitMeta) + * can delete the splitting WAL and directory via deleteSplitWAL(wal) + * and deleteSplitWAL(crashedServer) + * can check if splitting WALs of a crashed server is success via isSplitWALFinished(walPath) + * can acquire and release a worker for splitting WAL via acquireSplitWALWorker(procedure) + * and releaseSplitWALWorker(worker, scheduler) + * + * This class is to replace the zk-based WAL splitting related code, {@link MasterWalManager}, + * {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and + * {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed + * after we switch to procedure-based WAL splitting. + */ [email protected] +public class SplitWALManager { + private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class); + + private final MasterServices master; + private final SplitWorkerAssigner splitWorkerAssigner; + private final Path rootDir; + private final FileSystem fs; + private final Configuration conf; + + public SplitWALManager(MasterServices master) { + this.master = master; + this.conf = master.getConfiguration(); + this.splitWorkerAssigner = new SplitWorkerAssigner(this.master, + conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER)); + this.rootDir = master.getMasterFileSystem().getWALRootDir(); + this.fs = master.getMasterFileSystem().getFileSystem(); + + } + + public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta) + throws IOException { + try { + // 1. list all splitting files + List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, splitMeta); + // 2. 
create corresponding procedures + return createSplitWALProcedures(splittingFiles, crashedServer); + } catch (IOException e) { + LOG.error("failed to create procedures for splitting logs of {}", crashedServer, e); + throw e; + } + } + + public List<FileStatus> getWALsToSplit(ServerName serverName, boolean splitMeta) + throws IOException { + List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName)); + FileStatus[] fileStatuses = + SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER); + LOG.info("size of WALs of {} is {}, isMeta: {}", serverName, fileStatuses.length, splitMeta); + return Lists.newArrayList(fileStatuses); + } + + private Path getWALSplitDir(ServerName serverName) { + Path logDir = + new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); + return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); + } + + public void deleteSplitWAL(String wal) throws IOException { + fs.delete(new Path(wal), false); + } + + public void deleteWALDir(ServerName serverName) throws IOException { + Path splitDir = getWALSplitDir(serverName); + fs.delete(splitDir, false); + } + + public boolean isSplitWALFinished(String walPath) throws IOException { + return !fs.exists(new Path(rootDir, walPath)); + } + + @VisibleForTesting + List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs, + ServerName crashedServer) { + return splittingWALs.stream() + .map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer)) + .collect(Collectors.toList()); + } + + /** + * try to acquire an worker from online servers which is executring + * @param procedure split WAL task + * @return an available region server which could execute this task + * @throws ProcedureSuspendedException if there is no available worker, + * it will throw this exception to let the procedure wait + */ + public ServerName acquireSplitWALWorker(Procedure<?> procedure) + throws 
ProcedureSuspendedException { + Optional<ServerName> worker = splitWorkerAssigner.acquire(); + LOG.debug("acquired a worker {} to split a WAL", worker); + if (worker.isPresent()) { + return worker.get(); + } + splitWorkerAssigner.suspend(procedure); + throw new ProcedureSuspendedException(); + } + + /** + * After the worker finished the split WAL task, it will release the worker, and wake up all the + * suspend procedures in the ProcedureEvent + * @param worker worker which is about to release + * @param scheduler scheduler which is to wake up the procedure event + */ + public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) { + LOG.debug("release a worker {} to split a WAL", worker); + splitWorkerAssigner.release(worker); + splitWorkerAssigner.wake(scheduler); + } + + /** + * When master restart, there will be a new splitWorkerAssigner. But if there are splitting WAL + * tasks running on the region server side, they will not be count by the new splitWorkerAssigner. + * Thus we should add the workers of running tasks to the assigner when we load the procedures + * from MasterProcWALs. 
+ * @param worker region server which is executing a split WAL task + */ + public void addUsedSplitWALWorker(ServerName worker){ + splitWorkerAssigner.addUsedWorker(worker); + } + + /** + * help assign and release a worker for each WAL splitting task + * For each worker, concurrent running splitting task should be no more than maxSplitTasks + * If a task failed to acquire a worker, it will suspend and wait for workers available + * + */ + private static final class SplitWorkerAssigner implements ServerListener { + private int maxSplitTasks; + private final ProcedureEvent<?> event; + private Map<ServerName, Integer> currentWorkers = new HashMap<>(); + private MasterServices master; + + public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) { + this.maxSplitTasks = maxSplitTasks; + this.master = master; + this.event = new ProcedureEvent<>("split-WAL-worker-assigning"); + this.master.getServerManager().registerListener(this); + } + + public synchronized Optional<ServerName> acquire() { + List<ServerName> serverList = master.getServerManager().getOnlineServersList(); + Collections.shuffle(serverList); + Optional<ServerName> worker = serverList.stream().filter( + serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0) + .findAny(); + if (worker.isPresent()) { + currentWorkers.compute(worker.get(), (serverName, + availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); + } + return worker; + } + + public synchronized void release(ServerName serverName) { + currentWorkers.compute(serverName, (k, v) -> v == null ? 
null : v + 1); + } + + public void suspend(Procedure<?> proc) { + event.suspend(); + event.suspendIfNotReady(proc); + } + + public void wake(MasterProcedureScheduler scheduler) { + if (!event.isReady()) { + event.wake(scheduler); + } + } + + @Override + public void serverAdded(ServerName worker) { + this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + } + + public synchronized void addUsedWorker(ServerName worker) { + // load used worker when master restart + currentWorkers.compute(worker, (serverName, + availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index 05bcd28..2072727 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.master.procedure; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -28,9 +31,11 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterWalManager; +import org.apache.hadoop.hbase.master.SplitWALManager; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import 
org.apache.hadoop.hbase.master.assignment.RegionStateNode; import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; @@ -107,6 +112,7 @@ public class ServerCrashProcedure protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state) throws ProcedureSuspendedException, ProcedureYieldException { final MasterServices services = env.getMasterServices(); + final AssignmentManager am = env.getAssignmentManager(); // HBASE-14802 // If we have not yet notified that we are processing a dead server, we should do now. if (!notifiedDeadServer) { @@ -117,6 +123,7 @@ public class ServerCrashProcedure switch (state) { case SERVER_CRASH_START: case SERVER_CRASH_SPLIT_META_LOGS: + case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR: case SERVER_CRASH_ASSIGN_META: break; default: @@ -137,8 +144,24 @@ public class ServerCrashProcedure } break; case SERVER_CRASH_SPLIT_META_LOGS: - splitMetaLogs(env); - setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META); + if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + splitMetaLogs(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META); + } else { + am.getRegionStates().metaLogSplitting(serverName); + addChildProcedure(createSplittingWalProcedures(env, true)); + setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR); + } + break; + case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR: + if(isSplittingDone(env, true)){ + cleanupSplitDir(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META); + am.getRegionStates().metaLogSplit(serverName); + } else { + setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS); + } break; case SERVER_CRASH_ASSIGN_META: 
assignRegions(env, Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO)); @@ -156,8 +179,24 @@ public class ServerCrashProcedure } break; case SERVER_CRASH_SPLIT_LOGS: - splitLogs(env); - setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + splitLogs(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + } else { + am.getRegionStates().logSplitting(this.serverName); + addChildProcedure(createSplittingWalProcedures(env, false)); + setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_WALS_DIR); + } + break; + case SERVER_CRASH_DELETE_SPLIT_WALS_DIR: + if (isSplittingDone(env, false)) { + cleanupSplitDir(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + am.getRegionStates().logSplit(this.serverName); + } else { + setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS); + } break; case SERVER_CRASH_ASSIGN: // If no regions to assign, skip assign and skip to the finish. @@ -179,6 +218,7 @@ public class ServerCrashProcedure setNextState(ServerCrashState.SERVER_CRASH_FINISH); break; case SERVER_CRASH_FINISH: + LOG.info("removed crashed server {} after splitting done", serverName); services.getAssignmentManager().getRegionStates().removeServer(serverName); services.getServerManager().getDeadServers().finish(serverName); return Flow.NO_MORE_STATE; @@ -191,6 +231,34 @@ public class ServerCrashProcedure return Flow.HAS_MORE_STATE; } + private void cleanupSplitDir(MasterProcedureEnv env) { + SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager(); + try { + splitWALManager.deleteWALDir(serverName); + } catch (IOException e) { + LOG.warn("remove WAL directory of server {} failed, ignore...", serverName, e); + } + } + + private boolean isSplittingDone(MasterProcedureEnv env, boolean splitMeta) { + LOG.debug("check if splitting WALs of {} done? 
isMeta: {}", serverName, splitMeta); + SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager(); + try { + return splitWALManager.getWALsToSplit(serverName, splitMeta).size() == 0; + } catch (IOException e) { + LOG.warn("get filelist of serverName {} failed, retry...", serverName, e); + return false; + } + } + + private Procedure[] createSplittingWalProcedures(MasterProcedureEnv env, boolean splitMeta) + throws IOException { + LOG.info("Splitting WALs {}, isMeta: {}", this, splitMeta); + SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager(); + List<Procedure> procedures = splitWALManager.splitWALs(serverName, splitMeta); + return procedures.toArray(new Procedure[procedures.size()]); + } + private boolean filterDefaultMetaRegions() { if (regionsOnCrashedServer == null) { return false; http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java index 7549b13..8162269 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java @@ -27,7 +27,18 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public interface ServerProcedureInterface { public enum ServerOperationType { - CRASH_HANDLER, SWITCH_RPC_THROTTLE + CRASH_HANDLER, SWITCH_RPC_THROTTLE, + /** + * help find a available region server as worker and release worker after task done invoke + * SPLIT_WAL_REMOTE operation to send real WAL splitting request to worker manage the split wal + * task flow, will retry if 
SPLIT_WAL_REMOTE failed + */ + SPLIT_WAL, + + /** + * send the split WAL request to region server and handle the response + */ + SPLIT_WAL_REMOTE } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java index 9e3b311..1659ab5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java @@ -36,6 +36,8 @@ class ServerQueue extends Queue<ServerName> { case CRASH_HANDLER: return true; case SWITCH_RPC_THROTTLE: + case SPLIT_WAL: + case SPLIT_WAL_REMOTE: return false; default: break; http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java new file mode 100644 index 0000000..3b2d0d5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java @@ -0,0 +1,199 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.SplitWALManager; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * The procedure is to split a WAL. It will get an available region server and + * schedule a {@link SplitWALRemoteProcedure} to actually send the request to region + * server to split this WAL. + * It also check if the split wal task really succeed. If the WAL still exists, it will + * schedule another region server to split this WAL. 
+ */ [email protected] +public class SplitWALProcedure + extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState> + implements ServerProcedureInterface { + private static final Logger LOG = LoggerFactory.getLogger(SplitWALProcedure.class); + private String walPath; + private ServerName worker; + private ServerName crashedServer; + private int attempts = 0; + + public SplitWALProcedure() { + } + + public SplitWALProcedure(String walPath, ServerName crashedServer) { + this.walPath = walPath; + this.crashedServer = crashedServer; + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager(); + switch (state) { + case ACQUIRE_SPLIT_WAL_WORKER: + worker = splitWALManager.acquireSplitWALWorker(this); + setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER); + return Flow.HAS_MORE_STATE; + case DISPATCH_WAL_TO_WORKER: + assert worker != null; + addChildProcedure(new SplitWALRemoteProcedure(worker, crashedServer, walPath)); + setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER); + return Flow.HAS_MORE_STATE; + case RELEASE_SPLIT_WORKER: + boolean finished; + try { + finished = splitWALManager.isSplitWALFinished(walPath); + } catch (IOException ioe) { + long backoff = ProcedureUtil.getBackoffTimeMs(attempts++); + LOG.warn( + "Failed to check whether splitting wal {} success, wait {} seconds to retry", + walPath, backoff / 1000, ioe); + throw suspend(backoff); + } + splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler()); + if (!finished) { + LOG.warn("Failed to split wal {} by server {}, retry...", walPath, worker); + setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER); + return Flow.HAS_MORE_STATE; + } + return Flow.NO_MORE_STATE; + default: + 
throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + @Override + protected void rollbackState(MasterProcedureEnv env, + MasterProcedureProtos.SplitWALState splitOneWalState) + throws IOException, InterruptedException { + if (splitOneWalState == getInitialState()) { + return; + } + throw new UnsupportedOperationException(); + } + + @Override + protected MasterProcedureProtos.SplitWALState getState(int stateId) { + return MasterProcedureProtos.SplitWALState.forNumber(stateId); + } + + @Override + protected int getStateId(MasterProcedureProtos.SplitWALState state) { + return state.getNumber(); + } + + @Override + protected MasterProcedureProtos.SplitWALState getInitialState() { + return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + MasterProcedureProtos.SplitWALData.Builder builder = + MasterProcedureProtos.SplitWALData.newBuilder(); + builder.setWalPath(walPath).setCrashedServer(ProtobufUtil.toServerName(crashedServer)); + if (worker != null) { + builder.setWorker(ProtobufUtil.toServerName(worker)); + } + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + MasterProcedureProtos.SplitWALData data = + serializer.deserialize(MasterProcedureProtos.SplitWALData.class); + walPath = data.getWalPath(); + crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); + if (data.hasWorker()) { + worker = ProtobufUtil.toServerName(data.getWorker()); + } + } + + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + protected final ProcedureSuspendedException suspend(long backoff) + 
throws ProcedureSuspendedException { + attempts++; + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } + + public String getWAL() { + return walPath; + } + + @VisibleForTesting + public ServerName getWorker(){ + return worker; + } + + @Override + public ServerName getServerName() { + return this.crashedServer; + } + + @Override + public boolean hasMetaTableRegion() { + return AbstractFSWALProvider.isMetaFile(new Path(walPath)); + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.SPLIT_WAL; + } + + @Override + protected void afterReplay(MasterProcedureEnv env){ + if(worker != null){ + env.getMasterServices().getSplitWALManager().addUsedSplitWALWorker(worker); + } + + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java new file mode 100644 index 0000000..fb2dbd7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java @@ -0,0 +1,195 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.procedure2.NoNodeDispatchException; +import org.apache.hadoop.hbase.procedure2.NoServerDispatchException; +import org.apache.hadoop.hbase.procedure2.NullTargetServerDispatchException; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.regionserver.SplitWALCallable; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; +/** + * A remote procedure which is used to send a split WAL request to a region server. + * It will return null if the task succeeds, or return a DoNotRetryIOException. + * {@link SplitWALProcedure} will help handle the situation that encounters a + * DoNotRetryIOException.
Otherwise it will retry until it succeeds. + */ [email protected] +public class SplitWALRemoteProcedure extends Procedure<MasterProcedureEnv> + implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName>, + ServerProcedureInterface { + private static final Logger LOG = LoggerFactory.getLogger(SplitWALRemoteProcedure.class); + private String walPath; + private ServerName worker; + private ServerName crashedServer; + private boolean dispatched; + private ProcedureEvent<?> event; + private boolean success = false; + + public SplitWALRemoteProcedure() { + } + + public SplitWALRemoteProcedure(ServerName worker, ServerName crashedServer, String wal) { + this.worker = worker; + this.crashedServer = crashedServer; + this.walPath = wal; + } + + @Override + protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (success) { + return null; + } + dispatched = false; + } + try { + env.getRemoteDispatcher().addOperationToNode(worker, this); + } catch (NoNodeDispatchException | NullTargetServerDispatchException + | NoServerDispatchException e) { + // When sent to a wrong target server, it needs to construct a new SplitWALRemoteProcedure. + // Thus return null for this procedure and let SplitWALProcedure handle this. 
+ LOG.warn("dispatch WAL {} to {} failed, will retry on another server", walPath, worker, e); + return null; + } + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + MasterProcedureProtos.SplitWALRemoteData.Builder builder = + MasterProcedureProtos.SplitWALRemoteData.newBuilder(); + builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(worker)) + .setCrashedServer(ProtobufUtil.toServerName(crashedServer)); + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + MasterProcedureProtos.SplitWALRemoteData data = + serializer.deserialize(MasterProcedureProtos.SplitWALRemoteData.class); + walPath = data.getWalPath(); + worker = ProtobufUtil.toServerName(data.getWorker()); + crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); + } + + @Override + public RemoteProcedureDispatcher.RemoteOperation remoteCallBuild(MasterProcedureEnv env, + ServerName serverName) { + return new RSProcedureDispatcher.ServerOperation(this, getProcId(), SplitWALCallable.class, + MasterProcedureProtos.SplitWALParameter.newBuilder().setWalPath(walPath).build() + .toByteArray()); + } + + @Override + public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, + IOException exception) { + complete(env, exception); + } + + @Override + public void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, null); + } + + private void complete(MasterProcedureEnv env, Throwable error) { + if (event == null) { + 
LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", + getProcId()); + return; + } + if (error == null) { + LOG.info("split WAL {} on {} succeeded", walPath, worker); + try { + env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath); + } catch (IOException e){ + LOG.warn("remove WAL {} failed, ignore...", walPath, e); + } + success = true; + } else { + if (error instanceof DoNotRetryIOException) { + LOG.warn("WAL split task of {} send to a wrong server {}, will retry on another server", + walPath, worker, error); + success = true; + } else { + LOG.warn("split WAL {} failed, retry...", walPath, error); + success = false; + } + + } + event.wake(env.getProcedureScheduler()); + event = null; + } + + @Override + public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { + complete(env, error); + } + + public String getWAL() { + return this.walPath; + } + + @Override + public ServerName getServerName() { + // return the crashed server is to use the queue of root ServerCrashProcedure + return this.crashedServer; + } + + @Override + public boolean hasMetaTableRegion() { + return AbstractFSWALProvider.isMetaFile(new Path(walPath)); + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.SPLIT_WAL_REMOTE; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 2582a78..bbae268 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -17,6 +17,11 @@ */ package 
org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; + import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.management.MemoryType; @@ -88,7 +93,6 @@ import org.apache.hadoop.hbase.client.locking.EntityLock; import org.apache.hadoop.hbase.client.locking.LockServiceClient; import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.ConfigurationObserver; -import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.exceptions.RegionMovedException; @@ -616,7 +620,10 @@ public class HRegionServer extends HasThread implements rpcServices.isa.getPort(), this, canCreateBaseZNode()); // If no master in cluster, skip trying to track one or look for a cluster status. 
if (!this.masterless) { - this.csm = new ZkCoordinatedStateManager(this); + if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + this.csm = new ZkCoordinatedStateManager(this); + } masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); masterAddressTracker.start(); @@ -1923,7 +1930,7 @@ public class HRegionServer extends HasThread implements conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); } this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( - "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); + HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER)); // Start the threads for compacted files discharger this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER, conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10)); @@ -1966,9 +1973,10 @@ public class HRegionServer extends HasThread implements sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); - if (this.csm != null) { + if (this.csm != null && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { // SplitLogWorker needs csm. If none, don't start this. 
- this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, + this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory); splitLogWorker.start(); } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index a1c2030..6ecf502 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; +import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.util.CancelableProgressable; @@ -78,58 +79,60 @@ public class SplitLogWorker implements Runnable { coordination.init(server, conf, splitTaskExecutor, this); } - public SplitLogWorker(final Server hserver, final Configuration conf, - final RegionServerServices server, final LastSequenceId sequenceIdChecker, - final WALFactory factory) { - this(hserver, conf, server, new TaskExecutor() { - @Override - public Status exec(String filename, CancelableProgressable p) { - Path walDir; - FileSystem fs; - try { - walDir = FSUtils.getWALRootDir(conf); - fs = walDir.getFileSystem(conf); - } catch (IOException e) { - LOG.warn("could not find root dir or fs", e); - return Status.RESIGNED; - } - // TODO have to correctly figure out when log splitting has been - 
// interrupted or has encountered a transient error and when it has - // encountered a bad non-retry-able persistent error. - try { - if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), - fs, conf, p, sequenceIdChecker, - server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) { - return Status.PREEMPTED; - } - } catch (InterruptedIOException iioe) { - LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe); - return Status.RESIGNED; - } catch (IOException e) { - if (e instanceof FileNotFoundException) { - // A wal file may not exist anymore. Nothing can be recovered so move on - LOG.warn("WAL {} does not exist anymore", filename, e); - return Status.DONE; - } - Throwable cause = e.getCause(); - if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException - || cause instanceof ConnectException - || cause instanceof SocketTimeoutException)) { - LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, " - + "resigning", e); - return Status.RESIGNED; - } else if (cause instanceof InterruptedException) { - LOG.warn("log splitting of " + filename + " interrupted, resigning", e); - return Status.RESIGNED; - } - LOG.warn("log splitting of " + filename + " failed, returning error", e); - return Status.ERR; - } + public SplitLogWorker(Configuration conf, RegionServerServices server, + LastSequenceId sequenceIdChecker, WALFactory factory) { + this(server, conf, server, (f, p) -> splitLog(f, p, conf, server, sequenceIdChecker, factory)); + } + + static Status splitLog(String filename, CancelableProgressable p, Configuration conf, + RegionServerServices server, LastSequenceId sequenceIdChecker, WALFactory factory) { + Path walDir; + FileSystem fs; + try { + walDir = FSUtils.getWALRootDir(conf); + fs = walDir.getFileSystem(conf); + } catch (IOException e) { + LOG.warn("could not find root dir or fs", e); + return Status.RESIGNED; + } + // TODO have 
to correctly figure out when log splitting has been + // interrupted or has encountered a transient error and when it has + // encountered a bad non-retry-able persistent error. + try { + SplitLogWorkerCoordination splitLogWorkerCoordination = + server.getCoordinatedStateManager() == null ? null + : server.getCoordinatedStateManager().getSplitLogWorkerCoordination(); + if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), fs, conf, + p, sequenceIdChecker, splitLogWorkerCoordination, factory)) { + return Status.PREEMPTED; + } + } catch (InterruptedIOException iioe) { + LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe); + return Status.RESIGNED; + } catch (IOException e) { + if (e instanceof FileNotFoundException) { + // A wal file may not exist anymore. Nothing can be recovered so move on + LOG.warn("WAL {} does not exist anymore", filename, e); return Status.DONE; } - }); + Throwable cause = e.getCause(); + if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException + || cause instanceof ConnectException || cause instanceof SocketTimeoutException)) { + LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, " + + "resigning", + e); + return Status.RESIGNED; + } else if (cause instanceof InterruptedException) { + LOG.warn("log splitting of " + filename + " interrupted, resigning", e); + return Status.RESIGNED; + } + LOG.warn("log splitting of " + filename + " failed, returning error", e); + return Status.ERR; + } + return Status.DONE; } + @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java new file mode 100644 index 0000000..b94df22 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java @@ -0,0 +1,109 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.concurrent.locks.Lock; + +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; +import org.apache.hadoop.hbase.util.KeyLocker; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; + +/** + * This callable is used to do the real split WAL task. 
It is called by + * {@link org.apache.hadoop.hbase.master.procedure.SplitWALRemoteProcedure} from master and executed + * by executor service which is in charge of executing the events of EventType.RS_LOG_REPLAY + * + * When execute this callable, it will call SplitLogWorker.splitLog() to split the WAL. + * If the return value is SplitLogWorker.TaskExecutor.Status.DONE, it means the task is successful + * and it will return null to end the call. Otherwise it will throw an exception and let + * {@link org.apache.hadoop.hbase.master.procedure.SplitWALRemoteProcedure} to handle this problem. + * + * This class is to replace the zk-based WAL splitting related code, {@link SplitLogWorker}, + * {@link org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination} and + * {@link org.apache.hadoop.hbase.coordination.ZkSplitLogWorkerCoordination} can be removed after + * we switch to procedure-based WAL splitting. + */ [email protected] +public class SplitWALCallable implements RSProcedureCallable { + private static final Logger LOG = LoggerFactory.getLogger(SplitWALCallable.class); + + private String walPath; + private Exception initError; + private HRegionServer rs; + private final KeyLocker<String> splitWALLocks = new KeyLocker<>(); + private volatile Lock splitWALLock = null; + + + @Override + public void init(byte[] parameter, HRegionServer rs) { + try { + this.rs = rs; + MasterProcedureProtos.SplitWALParameter param = + MasterProcedureProtos.SplitWALParameter.parseFrom(parameter); + this.walPath = param.getWalPath(); + } catch (InvalidProtocolBufferException e) { + LOG.error("parse proto buffer of split WAL request failed ", e); + initError = e; + } + } + + @Override + public EventType getEventType() { + return EventType.RS_LOG_REPLAY; + } + + @Override + public Void call() throws Exception { + if (initError != null) { + throw initError; + } + //grab a lock + splitWALLock = splitWALLocks.acquireLock(walPath); + try{ + splitWal(); + LOG.info("split WAL {} succeed.", 
walPath); + } catch (IOException e){ + LOG.warn("failed to split WAL {}.", walPath, e); + throw e; + } + finally { + splitWALLock.unlock(); + } + return null; + } + + public String getWalPath() { + return this.walPath; + } + + private void splitWal() throws IOException { + SplitLogWorker.TaskExecutor.Status status = + SplitLogWorker.splitLog(walPath, null, rs.getConfiguration(), rs, rs, rs.walFactory); + if (status != SplitLogWorker.TaskExecutor.Status.DONE) { + throw new IOException("Split WAL " + walPath + " failed at server "); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java index 94e9845..e722aad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.master; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete; import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed; import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task; @@ -143,7 +144,7 @@ public abstract class AbstractTestDLS { conf.setInt("zookeeper.recovery.retry", 0); conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing - conf.setInt("hbase.regionserver.wal.max.splitters", 3); + conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3); conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); conf.set("hbase.wal.provider", getWalProvider()); 
StartMiniClusterOption option = StartMiniClusterOption.builder() http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java index e031d34..e1a72a6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -47,12 +49,16 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Threads; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) @Category({ MasterTests.class, LargeTests.class }) public class TestRestartCluster { @@ -63,6 +69,9 @@ public class TestRestartCluster { private static final Logger LOG = LoggerFactory.getLogger(TestRestartCluster.class); private HBaseTestingUtility UTIL = new HBaseTestingUtility(); + @Parameterized.Parameter + public boolean splitWALCoordinatedByZK; + private static final TableName[] TABLES = { TableName.valueOf("restartTableOne"), TableName.valueOf("restartTableTwo"), @@ -70,6 +79,13 @@ public class TestRestartCluster { }; private static final byte[] FAMILY = Bytes.toBytes("family"); + @Before + 
public void setup() throws Exception { + LOG.info("WAL splitting coordinated by zk? {}", splitWALCoordinatedByZK); + UTIL.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + splitWALCoordinatedByZK); + } + @After public void tearDown() throws Exception { UTIL.shutdownMiniCluster(); } @@ -304,4 +320,9 @@ public class TestRestartCluster { Thread.sleep(100); } } + + @Parameterized.Parameters + public static Collection coordinatedByZK() { + return Arrays.asList(false, true); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java index ff8ad0b..7530121 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.NavigableSet; import java.util.Set; @@ -28,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.StartMiniClusterOption; @@ -45,6 +48,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +58,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; /** * Tests the restarting of everything as done during rolling restarts. */ +@RunWith(Parameterized.class) @Category({MasterTests.class, LargeTests.class}) public class TestRollingRestart { @@ -65,6 +71,9 @@ public class TestRollingRestart { @Rule public TestName name = new TestName(); + @Parameterized.Parameter + public boolean splitWALCoordinatedByZK; + @Test public void testBasicRollingRestart() throws Exception { @@ -78,6 +87,8 @@ public class TestRollingRestart { // Start the cluster log("Starting cluster"); Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + splitWALCoordinatedByZK); HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf); StartMiniClusterOption option = StartMiniClusterOption.builder() .numMasters(NUM_MASTERS).numRegionServers(NUM_RS).numDataNodes(NUM_RS).build(); @@ -87,7 +98,8 @@ public class TestRollingRestart { cluster.waitForActiveAndReadyMaster(); // Create a table with regions - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = + TableName.valueOf(name.getMethodName().replaceAll("[\\[|\\]]", "-")); byte [] family = Bytes.toBytes("family"); log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions"); Table ht = TEST_UTIL.createMultiRegionTable(tableName, family, NUM_REGIONS_TO_CREATE); @@ -282,5 +294,9 @@ public class TestRollingRestart { } + @Parameterized.Parameters + public static Collection coordinatedByZK() { + return Arrays.asList(false, true); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/f02ac310/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java new file mode 100644 index 0000000..9e127c0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java @@ -0,0 +1,383 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; +import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; + +@Category({ MasterTests.class, MediumTests.class }) + +public class TestSplitWALManager { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSplitWALManager.class); + + private static HBaseTestingUtility TEST_UTIL; + private HMaster master; + private SplitWALManager splitWALManager; + private TableName TABLE_NAME; + private byte[] FAMILY; + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); + TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1); + TEST_UTIL.startMiniCluster(3); + master = TEST_UTIL.getHBaseCluster().getMaster(); + splitWALManager = master.getSplitWALManager(); + TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager")); + FAMILY = Bytes.toBytes("test"); + } + + @After + public void teardown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testAcquireAndRelease() throws Exception { + List<FakeServerProcedure> testProcedures = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + testProcedures.add(new FakeServerProcedure( + TEST_UTIL.getHBaseCluster().getServerHoldingMeta())); + } + ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0)); + Assert.assertNotNull(server); + Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1))); + Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2))); + + Exception e = null; + try { + splitWALManager.acquireSplitWALWorker(testProcedures.get(3)); + } catch (ProcedureSuspendedException suspendException) { + e = suspendException; + } + Assert.assertNotNull(e); + Assert.assertTrue(e instanceof ProcedureSuspendedException); + + splitWALManager.releaseSplitWALWorker(server, TEST_UTIL.getHBaseCluster().getMaster() + 
.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); + } + + @Test + public void testAddNewServer() throws Exception { + List<FakeServerProcedure> testProcedures = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + testProcedures.add(new FakeServerProcedure( + TEST_UTIL.getHBaseCluster().getServerHoldingMeta())); + } + ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0)); + Assert.assertNotNull(server); + Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1))); + Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2))); + + Exception e = null; + try { + splitWALManager.acquireSplitWALWorker(testProcedures.get(3)); + } catch (ProcedureSuspendedException suspendException) { + e = suspendException; + } + Assert.assertNotNull(e); + Assert.assertTrue(e instanceof ProcedureSuspendedException); + + JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer(); + newServer.waitForServerOnline(); + Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); + } + + @Test + public void testCreateSplitWALProcedures() throws Exception { + TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); + // load table + TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); + ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor(); + ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); + Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(), + AbstractFSWALProvider.getWALDirectoryName(metaServer.toString())); + // Test splitting meta wal + FileStatus[] wals = + TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER); + Assert.assertEquals(1, wals.length); + List<Procedure> testProcedures = + 
splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer); + Assert.assertEquals(1, testProcedures.size()); + ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); + Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); + + // Test splitting wal + wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER); + Assert.assertEquals(1, wals.length); + testProcedures = + splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer); + Assert.assertEquals(1, testProcedures.size()); + ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); + Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); + } + + @Test + public void testAcquireAndReleaseSplitWALWorker() throws Exception { + ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor(); + List<FakeServerProcedure> testProcedures = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + FakeServerProcedure procedure = + new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName()); + testProcedures.add(procedure); + ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE, + HConstants.NO_NONCE); + } + TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired()); + FakeServerProcedure failedProcedure = + new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()); + ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE, + HConstants.NO_NONCE); + TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire()); + Assert.assertFalse(failedProcedure.isWorkerAcquired()); + // let one procedure finish and release worker + testProcedures.get(0).countDown(); + TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired()); + Assert.assertTrue(testProcedures.get(0).isSuccess()); + } + + @Test + public void testGetWALsToSplit() throws 
Exception { + TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); + // load table + TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); + ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); + List<FileStatus> metaWals = splitWALManager.getWALsToSplit(metaServer, true); + Assert.assertEquals(1, metaWals.size()); + List<FileStatus> wals = splitWALManager.getWALsToSplit(metaServer, false); + Assert.assertEquals(1, wals.size()); + ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() + .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny() + .get(); + metaWals = splitWALManager.getWALsToSplit(testServer, true); + Assert.assertEquals(0, metaWals.size()); + } + + @Test + public void testSplitLogs() throws Exception { + TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); + // load table + TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); + ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor(); + ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); + ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() + .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny() + .get(); + List<Procedure> procedures = splitWALManager.splitWALs(testServer, false); + Assert.assertEquals(1, procedures.size()); + ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0)); + Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size()); + + procedures = splitWALManager.splitWALs(metaServer, true); + Assert.assertEquals(1, procedures.size()); + ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0)); + Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size()); + Assert.assertEquals(1, 
splitWALManager.getWALsToSplit(metaServer, false).size()); + } + + @Test + public void testWorkerReloadWhenMasterRestart() throws Exception { + List<FakeServerProcedure> testProcedures = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + FakeServerProcedure procedure = + new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName()); + testProcedures.add(procedure); + ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), procedure, + HConstants.NO_NONCE, HConstants.NO_NONCE); + } + TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired()); + // Kill master + TEST_UTIL.getHBaseCluster().killMaster(master.getServerName()); + TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000); + // restart master + TEST_UTIL.getHBaseCluster().startMaster(); + TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); + this.master = TEST_UTIL.getHBaseCluster().getMaster(); + + FakeServerProcedure failedProcedure = + new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()); + ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), failedProcedure, + HConstants.NO_NONCE, HConstants.NO_NONCE); + TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire()); + Assert.assertFalse(failedProcedure.isWorkerAcquired()); + for (int i = 0; i < 3; i++) { + testProcedures.get(i).countDown(); + } + failedProcedure.countDown(); + } + + public static final class FakeServerProcedure + extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState> + implements ServerProcedureInterface { + + private ServerName serverName; + private ServerName worker; + private CountDownLatch barrier = new CountDownLatch(1); + private boolean triedToAcquire = false; + + public FakeServerProcedure() { + } + + public FakeServerProcedure(ServerName serverName) { + this.serverName = serverName; + } + + public ServerName getServerName() { + return serverName; + } + + 
@Override + public boolean hasMetaTableRegion() { + return false; + } + + @Override + public ServerOperationType getServerOperationType() { + return SPLIT_WAL; + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, + MasterProcedureProtos.SplitWALState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager(); + switch (state) { + case ACQUIRE_SPLIT_WAL_WORKER: + triedToAcquire = true; + worker = splitWALManager.acquireSplitWALWorker(this); + setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER); + return Flow.HAS_MORE_STATE; + case DISPATCH_WAL_TO_WORKER: + barrier.await(); + setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER); + return Flow.HAS_MORE_STATE; + case RELEASE_SPLIT_WORKER: + splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler()); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + public boolean isWorkerAcquired() { + return worker != null; + } + + public boolean isTriedToAcquire() { + return triedToAcquire; + } + + public void countDown() { + this.barrier.countDown(); + } + + @Override + protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state) + throws IOException, InterruptedException { + + } + + @Override + protected MasterProcedureProtos.SplitWALState getState(int stateId) { + return MasterProcedureProtos.SplitWALState.forNumber(stateId); + } + + @Override + protected int getStateId(MasterProcedureProtos.SplitWALState state) { + return state.getNumber(); + } + + @Override + protected MasterProcedureProtos.SplitWALState getInitialState() { + return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER; + } + + @Override + protected boolean holdLock(MasterProcedureEnv env) { + return true; + } + + @Override + protected void 
rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + MasterProcedureProtos.SplitWALData.Builder builder = + MasterProcedureProtos.SplitWALData.newBuilder(); + builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName)); + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + MasterProcedureProtos.SplitWALData data = + serializer.deserialize(MasterProcedureProtos.SplitWALData.class); + serverName = ProtobufUtil.toServerName(data.getCrashedServer()); + } + } +}
