http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java new file mode 100644 index 0000000..887e272 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java @@ -0,0 +1,541 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import com.google.common.collect.ArrayListMultimap; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.List; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.ServerListener; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * A remote procedure dispatcher for regionservers. 
+ */ +public class RSProcedureDispatcher + extends RemoteProcedureDispatcher<MasterProcedureEnv, ServerName> + implements ServerListener { + private static final Log LOG = LogFactory.getLog(RSProcedureDispatcher.class); + + public static final String RS_RPC_STARTUP_WAIT_TIME_CONF_KEY = + "hbase.regionserver.rpc.startup.waittime"; + private static final int DEFAULT_RS_RPC_STARTUP_WAIT_TIME = 60000; + + private static final int RS_VERSION_WITH_EXEC_PROCS = 0x0201000; // 2.1 + + protected final MasterServices master; + protected final long rsStartupWaitTime; + + public RSProcedureDispatcher(final MasterServices master) { + super(master.getConfiguration()); + + this.master = master; + this.rsStartupWaitTime = master.getConfiguration().getLong( + RS_RPC_STARTUP_WAIT_TIME_CONF_KEY, DEFAULT_RS_RPC_STARTUP_WAIT_TIME); + } + + @Override + public boolean start() { + if (!super.start()) { + return false; + } + + master.getServerManager().registerListener(this); + for (ServerName serverName: master.getServerManager().getOnlineServersList()) { + addNode(serverName); + } + return true; + } + + @Override + public boolean stop() { + if (!super.stop()) { + return false; + } + + master.getServerManager().unregisterListener(this); + return true; + } + + @Override + protected void remoteDispatch(final ServerName serverName, + final Set<RemoteProcedure> operations) { + final int rsVersion = master.getAssignmentManager().getServerVersion(serverName); + if (rsVersion >= RS_VERSION_WITH_EXEC_PROCS) { + LOG.info(String.format( + "Using procedure batch rpc execution for serverName=%s version=%s", + serverName, rsVersion)); + submitTask(new ExecuteProceduresRemoteCall(serverName, operations)); + } else { + LOG.info(String.format( + "Fallback to compat rpc execution for serverName=%s version=%s", + serverName, rsVersion)); + submitTask(new CompatRemoteProcedureResolver(serverName, operations)); + } + } + + protected void abortPendingOperations(final ServerName serverName, + final 
Set<RemoteProcedure> operations) { + // TODO: Replace with a ServerNotOnlineException() + final IOException e = new DoNotRetryIOException("server not online " + serverName); + final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); + for (RemoteProcedure proc: operations) { + proc.remoteCallFailed(env, serverName, e); + } + } + + public void serverAdded(final ServerName serverName) { + addNode(serverName); + } + + public void serverRemoved(final ServerName serverName) { + removeNode(serverName); + } + + /** + * Base remote call + */ + protected abstract class AbstractRSRemoteCall implements Callable<Void> { + private final ServerName serverName; + + private int numberOfAttemptsSoFar = 0; + private long maxWaitTime = -1; + + public AbstractRSRemoteCall(final ServerName serverName) { + this.serverName = serverName; + } + + public abstract Void call(); + + protected AdminService.BlockingInterface getRsAdmin() throws IOException { + final AdminService.BlockingInterface admin = master.getServerManager().getRsAdmin(serverName); + if (admin == null) { + throw new IOException("Attempting to send OPEN RPC to server " + getServerName() + + " failed because no RPC connection found to this server"); + } + return admin; + } + + protected ServerName getServerName() { + return serverName; + } + + protected boolean scheduleForRetry(final IOException e) { + // Should we wait a little before retrying? If the server is starting it's yes. 
+ final boolean hold = (e instanceof ServerNotRunningYetException); + if (hold) { + LOG.warn(String.format("waiting a little before trying on the same server=%s try=%d", + serverName, numberOfAttemptsSoFar), e); + long now = EnvironmentEdgeManager.currentTime(); + if (now < getMaxWaitTime()) { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("server is not yet up; waiting up to %dms", + (getMaxWaitTime() - now)), e); + } + submitTask(this, 100, TimeUnit.MILLISECONDS); + return true; + } + + LOG.warn(String.format("server %s is not up for a while; try a new one", serverName), e); + return false; + } + + // In case socket is timed out and the region server is still online, + // the openRegion RPC could have been accepted by the server and + // just the response didn't go through. So we will retry to + // open the region on the same server. + final boolean retry = !hold && (e instanceof SocketTimeoutException + && master.getServerManager().isServerOnline(serverName)); + if (retry) { + // we want to retry as many times as needed as long as the RS is not dead. + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Retrying to same RegionServer %s because: %s", + serverName, e.getMessage()), e); + } + submitTask(this); + return true; + } + + // trying to send the request elsewhere instead + LOG.warn(String.format("the request should be tried elsewhere instead; server=%s try=%d", + serverName, numberOfAttemptsSoFar), e); + return false; + } + + private long getMaxWaitTime() { + if (this.maxWaitTime < 0) { + // This is the max attempts, not retries, so it should be at least 1. 
+ this.maxWaitTime = EnvironmentEdgeManager.currentTime() + rsStartupWaitTime; + } + return this.maxWaitTime; + } + + protected IOException unwrapException(IOException e) { + if (e instanceof RemoteException) { + e = ((RemoteException)e).unwrapRemoteException(); + } + return e; + } + } + + private interface RemoteProcedureResolver { + void dispatchOpenRequests(MasterProcedureEnv env, List<RegionOpenOperation> operations); + void dispatchCloseRequests(MasterProcedureEnv env, List<RegionCloseOperation> operations); + } + + public void splitAndResolveOperation(final ServerName serverName, + final Set<RemoteProcedure> operations, final RemoteProcedureResolver resolver) { + final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); + final ArrayListMultimap<Class<?>, RemoteOperation> reqsByType = + buildAndGroupRequestByType(env, serverName, operations); + + final List<RegionOpenOperation> openOps = fetchType(reqsByType, RegionOpenOperation.class); + if (!openOps.isEmpty()) resolver.dispatchOpenRequests(env, openOps); + + final List<RegionCloseOperation> closeOps = fetchType(reqsByType, RegionCloseOperation.class); + if (!closeOps.isEmpty()) resolver.dispatchCloseRequests(env, closeOps); + + if (!reqsByType.isEmpty()) { + LOG.warn("unknown request type in the queue: " + reqsByType); + } + } + + // ========================================================================== + // Compatibility calls + // ========================================================================== + protected class ExecuteProceduresRemoteCall extends AbstractRSRemoteCall + implements RemoteProcedureResolver { + private final Set<RemoteProcedure> operations; + + private ExecuteProceduresRequest.Builder request = null; + + public ExecuteProceduresRemoteCall(final ServerName serverName, + final Set<RemoteProcedure> operations) { + super(serverName); + this.operations = operations; + } + + public Void call() { + final MasterProcedureEnv env = 
master.getMasterProcedureExecutor().getEnvironment(); + + request = ExecuteProceduresRequest.newBuilder(); + splitAndResolveOperation(getServerName(), operations, this); + + try { + final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build()); + remoteCallCompleted(env, response); + } catch (IOException e) { + e = unwrapException(e); + // TODO: In the future some operation may want to bail out early. + // TODO: How many times should we retry (use numberOfAttemptsSoFar) + if (!scheduleForRetry(e)) { + remoteCallFailed(env, e); + } + } + return null; + } + + public void dispatchOpenRequests(final MasterProcedureEnv env, + final List<RegionOpenOperation> operations) { + request.addOpenRegion(buildOpenRegionRequest(env, getServerName(), operations)); + } + + public void dispatchCloseRequests(final MasterProcedureEnv env, + final List<RegionCloseOperation> operations) { + for (RegionCloseOperation op: operations) { + request.addCloseRegion(op.buildCloseRegionRequest(getServerName())); + } + } + + protected ExecuteProceduresResponse sendRequest(final ServerName serverName, + final ExecuteProceduresRequest request) throws IOException { + try { + return getRsAdmin().executeProcedures(null, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + + + private void remoteCallCompleted(final MasterProcedureEnv env, + final ExecuteProceduresResponse response) { + /* + for (RemoteProcedure proc: operations) { + proc.remoteCallCompleted(env, getServerName(), response); + }*/ + } + + private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { + for (RemoteProcedure proc: operations) { + proc.remoteCallFailed(env, getServerName(), e); + } + } + } + + // ========================================================================== + // Compatibility calls + // Since we don't have a "batch proc-exec" request on the target RS + // we have to chunk the requests by type and dispatch the specific 
request. + // ========================================================================== + private static OpenRegionRequest buildOpenRegionRequest(final MasterProcedureEnv env, + final ServerName serverName, final List<RegionOpenOperation> operations) { + final OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder(); + builder.setServerStartCode(serverName.getStartcode()); + builder.setMasterSystemTime(EnvironmentEdgeManager.currentTime()); + for (RegionOpenOperation op: operations) { + builder.addOpenInfo(op.buildRegionOpenInfoRequest(env)); + } + return builder.build(); + } + + private final class OpenRegionRemoteCall extends AbstractRSRemoteCall { + private final List<RegionOpenOperation> operations; + + public OpenRegionRemoteCall(final ServerName serverName, + final List<RegionOpenOperation> operations) { + super(serverName); + this.operations = operations; + } + + @Override + public Void call() { + final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); + final OpenRegionRequest request = buildOpenRegionRequest(env, getServerName(), operations); + + try { + OpenRegionResponse response = sendRequest(getServerName(), request); + remoteCallCompleted(env, response); + } catch (IOException e) { + e = unwrapException(e); + // TODO: In the future some operation may want to bail out early. 
+ // TODO: How many times should we retry (use numberOfAttemptsSoFar) + if (!scheduleForRetry(e)) { + remoteCallFailed(env, e); + } + } + return null; + } + + private OpenRegionResponse sendRequest(final ServerName serverName, + final OpenRegionRequest request) throws IOException { + try { + return getRsAdmin().openRegion(null, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + + private void remoteCallCompleted(final MasterProcedureEnv env, + final OpenRegionResponse response) { + int index = 0; + for (RegionOpenOperation op: operations) { + OpenRegionResponse.RegionOpeningState state = response.getOpeningState(index++); + op.setFailedOpen(state == OpenRegionResponse.RegionOpeningState.FAILED_OPENING); + op.getRemoteProcedure().remoteCallCompleted(env, getServerName(), op); + } + } + + private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { + for (RegionOpenOperation op: operations) { + op.getRemoteProcedure().remoteCallFailed(env, getServerName(), e); + } + } + } + + private final class CloseRegionRemoteCall extends AbstractRSRemoteCall { + private final RegionCloseOperation operation; + + public CloseRegionRemoteCall(final ServerName serverName, + final RegionCloseOperation operation) { + super(serverName); + this.operation = operation; + } + + @Override + public Void call() { + final MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); + final CloseRegionRequest request = operation.buildCloseRegionRequest(getServerName()); + try { + CloseRegionResponse response = sendRequest(getServerName(), request); + remoteCallCompleted(env, response); + } catch (IOException e) { + e = unwrapException(e); + // TODO: In the future some operation may want to bail out early. 
+ // TODO: How many times should we retry (use numberOfAttemptsSoFar) + if (!scheduleForRetry(e)) { + remoteCallFailed(env, e); + } + } + return null; + } + + private CloseRegionResponse sendRequest(final ServerName serverName, + final CloseRegionRequest request) throws IOException { + try { + return getRsAdmin().closeRegion(null, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + + private void remoteCallCompleted(final MasterProcedureEnv env, + final CloseRegionResponse response) { + operation.setClosed(response.getClosed()); + operation.getRemoteProcedure().remoteCallCompleted(env, getServerName(), operation); + } + + private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { + operation.getRemoteProcedure().remoteCallFailed(env, getServerName(), e); + } + } + + protected class CompatRemoteProcedureResolver implements Callable<Void>, RemoteProcedureResolver { + private final Set<RemoteProcedure> operations; + private final ServerName serverName; + + public CompatRemoteProcedureResolver(final ServerName serverName, + final Set<RemoteProcedure> operations) { + this.serverName = serverName; + this.operations = operations; + } + + @Override + public Void call() { + splitAndResolveOperation(serverName, operations, this); + return null; + } + + public void dispatchOpenRequests(final MasterProcedureEnv env, + final List<RegionOpenOperation> operations) { + submitTask(new OpenRegionRemoteCall(serverName, operations)); + } + + public void dispatchCloseRequests(final MasterProcedureEnv env, + final List<RegionCloseOperation> operations) { + for (RegionCloseOperation op: operations) { + submitTask(new CloseRegionRemoteCall(serverName, op)); + } + } + } + + // ========================================================================== + // RPC Messages + // - ServerOperation: refreshConfig, grant, revoke, ... + // - RegionOperation: open, close, flush, snapshot, ... 
+ // ========================================================================== + public static abstract class ServerOperation extends RemoteOperation { + protected ServerOperation(final RemoteProcedure remoteProcedure) { + super(remoteProcedure); + } + } + + public static abstract class RegionOperation extends RemoteOperation { + private final HRegionInfo regionInfo; + + protected RegionOperation(final RemoteProcedure remoteProcedure, + final HRegionInfo regionInfo) { + super(remoteProcedure); + this.regionInfo = regionInfo; + } + + public HRegionInfo getRegionInfo() { + return this.regionInfo; + } + } + + public static class RegionOpenOperation extends RegionOperation { + private final List<ServerName> favoredNodes; + private final boolean openForReplay; + private boolean failedOpen; + + public RegionOpenOperation(final RemoteProcedure remoteProcedure, + final HRegionInfo regionInfo, final List<ServerName> favoredNodes, + final boolean openForReplay) { + super(remoteProcedure, regionInfo); + this.favoredNodes = favoredNodes; + this.openForReplay = openForReplay; + } + + protected void setFailedOpen(final boolean failedOpen) { + this.failedOpen = failedOpen; + } + + public boolean isFailedOpen() { + return failedOpen; + } + + public OpenRegionRequest.RegionOpenInfo buildRegionOpenInfoRequest( + final MasterProcedureEnv env) { + return RequestConverter.buildRegionOpenInfo(getRegionInfo(), + env.getAssignmentManager().getFavoredNodes(getRegionInfo()), false); + } + } + + public static class RegionCloseOperation extends RegionOperation { + private final ServerName destinationServer; + private boolean closed = false; + + public RegionCloseOperation(final RemoteProcedure remoteProcedure, + final HRegionInfo regionInfo, final ServerName destinationServer) { + super(remoteProcedure, regionInfo); + this.destinationServer = destinationServer; + } + + public ServerName getDestinationServer() { + return destinationServer; + } + + protected void setClosed(final boolean 
closed) { + this.closed = closed; + } + + public boolean isClosed() { + return closed; + } + + public CloseRegionRequest buildCloseRegionRequest(final ServerName serverName) { + return ProtobufUtil.buildCloseRegionRequest(serverName, + getRegionInfo().getRegionName(), getDestinationServer()); + } + } +}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java index 21709f8..cfd9df9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RestoreSnapshotProcedure.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.MetricsSnapshot; -import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -416,17 +415,7 @@ public class RestoreSnapshotProcedure try { Connection conn = env.getMasterServices().getConnection(); - // 1. Forces all the RegionStates to be offline - // - // The AssignmentManager keeps all the region states around - // with no possibility to remove them, until the master is restarted. - // This means that a region marked as SPLIT before the restore will never be assigned again. - // To avoid having all states around all the regions are switched to the OFFLINE state, - // which is the same state that the regions will be after a delete table. - forceRegionsOffline(env, regionsToAdd); - forceRegionsOffline(env, regionsToRestore); - forceRegionsOffline(env, regionsToRemove); - + // 1. Prepare to restore getMonitorStatus().setStatus("Preparing to restore each region"); // 2. 
Applies changes to hbase:meta @@ -496,20 +485,6 @@ public class RestoreSnapshotProcedure } /** - * Make sure that region states of the region list is in OFFLINE state. - * @param env MasterProcedureEnv - * @param hris region info list - **/ - private void forceRegionsOffline(final MasterProcedureEnv env, final List<HRegionInfo> hris) { - RegionStates states = env.getMasterServices().getAssignmentManager().getRegionStates(); - if (hris != null) { - for (HRegionInfo hri: hris) { - states.regionOffline(hri); - } - } - } - - /** * The procedure could be restarted from a different machine. If the variable is null, we need to * retrieve it. * @return traceEnabled http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java new file mode 100644 index 0000000..ca351f69 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashException.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Passed as Exception by {@link ServerCrashProcedure} + * notifying on-going RIT that server has failed. + */ +@InterfaceAudience.Private +@SuppressWarnings("serial") +public class ServerCrashException extends HBaseIOException { + private final long procId; + private final ServerName serverName; + + /** + * @param serverName The server that crashed. + */ + public ServerCrashException(long procId, ServerName serverName) { + this.procId = procId; + this.serverName = serverName; + } + + @Override + public String getMessage() { + return "ServerCrashProcedure pid=" + this.procId + ", server=" + this.serverName; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/5e6ec199/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index 2703947..71c6b89 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -19,55 +19,40 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; import java.io.InputStream; -import java.io.InterruptedIOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; +import java.util.Iterator; import java.util.List; -import java.util.Set; -import 
java.util.concurrent.locks.Lock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionReplicaUtil; -import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterWalManager; -import org.apache.hadoop.hbase.master.RegionState; -import org.apache.hadoop.hbase.master.RegionStates; +import org.apache.hadoop.hbase.master.assignment.AssignProcedure; +import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.assignment.RegionTransitionProcedure; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ServerCrashState; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; -import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.hadoop.util.StringUtils; -import org.apache.zookeeper.KeeperException; /** * Handle crashed server. This is a port to ProcedureV2 of what used to be euphemistically called * ServerShutdownHandler. 
* - * <p>The procedure flow varies dependent on whether meta is assigned, if we are - * doing distributed log replay versus distributed log splitting, and if we are to split logs at - * all. - * - * <p>This procedure asks that all crashed servers get processed equally; we yield after the - * completion of each successful flow step. We do this so that we do not 'deadlock' waiting on - * a region assignment so we can replay edits which could happen if a region moved there are edits - * on two servers for replay. + * <p>The procedure flow varies dependent on whether meta is assigned and if we are to split logs. * - * <p>TODO: ASSIGN and WAIT_ON_ASSIGN (at least) are not idempotent. Revisit when assign is pv2. - * TODO: We do not have special handling for system tables. + * <p>We come in here after ServerManager has noticed a server has expired. Procedures + * queued on the rpc should have been notified about fail and should be concurrently + * getting themselves ready to assign elsewhere. */ public class ServerCrashProcedure extends StateMachineProcedure<MasterProcedureEnv, ServerCrashState> @@ -75,36 +60,6 @@ implements ServerProcedureInterface { private static final Log LOG = LogFactory.getLog(ServerCrashProcedure.class); /** - * Configuration key to set how long to wait in ms doing a quick check on meta state. - */ - public static final String KEY_SHORT_WAIT_ON_META = - "hbase.master.servercrash.short.wait.on.meta.ms"; - - public static final int DEFAULT_SHORT_WAIT_ON_META = 1000; - - /** - * Configuration key to set how many retries to cycle before we give up on meta. - * Each attempt will wait at least {@link #KEY_SHORT_WAIT_ON_META} milliseconds. - */ - public static final String KEY_RETRIES_ON_META = - "hbase.master.servercrash.meta.retries"; - - public static final int DEFAULT_RETRIES_ON_META = 10; - - /** - * Configuration key to set how long to wait in ms on regions in transition. 
- */ - public static final String KEY_WAIT_ON_RIT = - "hbase.master.servercrash.wait.on.rit.ms"; - - public static final int DEFAULT_WAIT_ON_RIT = 30000; - - private static final Set<HRegionInfo> META_REGION_SET = new HashSet<>(); - static { - META_REGION_SET.add(HRegionInfo.FIRST_META_REGIONINFO); - } - - /** * Name of the crashed server to process. */ private ServerName serverName; @@ -117,14 +72,8 @@ implements ServerProcedureInterface { /** * Regions that were on the crashed server. */ - private Set<HRegionInfo> regionsOnCrashedServer; + private List<HRegionInfo> regionsOnCrashedServer; - /** - * Regions assigned. Usually some subset of {@link #regionsOnCrashedServer}. - */ - private List<HRegionInfo> regionsAssigned; - - private boolean distributedLogReplay = false; private boolean carryingMeta = false; private boolean shouldSplitWal; @@ -164,20 +113,11 @@ implements ServerProcedureInterface { super(); } - private void throwProcedureYieldException(final String msg) throws ProcedureYieldException { - String logMsg = msg + "; cycle=" + this.cycles + ", running for " + - StringUtils.formatTimeDiff(System.currentTimeMillis(), getSubmittedTime()); - // The procedure executor logs ProcedureYieldException at trace level. For now, log these - // yields for server crash processing at DEBUG. Revisit when stable. - if (LOG.isDebugEnabled()) LOG.debug(logMsg); - throw new ProcedureYieldException(logMsg); - } - @Override protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state) - throws ProcedureYieldException { + throws ProcedureSuspendedException, ProcedureYieldException { if (LOG.isTraceEnabled()) { - LOG.trace(state); + LOG.trace(state + " " + this + "; cycles=" + this.cycles); } // Keep running count of cycles if (state.ordinal() != this.previousState) { @@ -186,11 +126,7 @@ implements ServerProcedureInterface { } else { this.cycles++; } - MasterServices services = env.getMasterServices(); - // Is master fully online? If not, yield. 
No processing of servers unless master is up - if (!services.getAssignmentManager().isFailoverCleanupDone()) { - throwProcedureYieldException("Waiting on master failover to complete"); - } + final MasterServices services = env.getMasterServices(); // HBASE-14802 // If we have not yet notified that we are processing a dead server, we should do now. if (!notifiedDeadServer) { @@ -201,102 +137,61 @@ implements ServerProcedureInterface { try { switch (state) { case SERVER_CRASH_START: - LOG.info("Start processing crashed " + this.serverName); + LOG.info("Start " + this); start(env); // If carrying meta, process it first. Else, get list of regions on crashed server. - if (this.carryingMeta) setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META); - else setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS); + if (this.carryingMeta) { + setNextState(ServerCrashState.SERVER_CRASH_PROCESS_META); + } else { + setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS); + } break; case SERVER_CRASH_GET_REGIONS: // If hbase:meta is not assigned, yield. - if (!isMetaAssignedQuickTest(env)) { - // isMetaAssignedQuickTest does not really wait. Let's delay a little before - // another round of execution. - long wait = - env.getMasterConfiguration().getLong(KEY_SHORT_WAIT_ON_META, - DEFAULT_SHORT_WAIT_ON_META); - wait = wait / 10; - Thread.sleep(wait); - throwProcedureYieldException("Waiting on hbase:meta assignment"); + if (env.getAssignmentManager().waitMetaInitialized(this)) { + throw new ProcedureSuspendedException(); } - this.regionsOnCrashedServer = - services.getAssignmentManager().getRegionStates().getServerRegions(this.serverName); - // Where to go next? Depends on whether we should split logs at all or if we should do - // distributed log splitting (DLS) vs distributed log replay (DLR). + + this.regionsOnCrashedServer = services.getAssignmentManager().getRegionStates() + .getServerRegionInfoSet(serverName); + // Where to go next? 
Depends on whether we should split logs at all or + // if we should do distributed log splitting. if (!this.shouldSplitWal) { setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); - } else if (this.distributedLogReplay) { - setNextState(ServerCrashState.SERVER_CRASH_PREPARE_LOG_REPLAY); } else { setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS); } break; case SERVER_CRASH_PROCESS_META: - // If we fail processing hbase:meta, yield. - if (!processMeta(env)) { - throwProcedureYieldException("Waiting on regions-in-transition to clear"); - } + processMeta(env); setNextState(ServerCrashState.SERVER_CRASH_GET_REGIONS); break; - case SERVER_CRASH_PREPARE_LOG_REPLAY: - prepareLogReplay(env, this.regionsOnCrashedServer); - setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); - break; - case SERVER_CRASH_SPLIT_LOGS: splitLogs(env); - // If DLR, go to FINISH. Otherwise, if DLS, go to SERVER_CRASH_CALC_REGIONS_TO_ASSIGN - if (this.distributedLogReplay) setNextState(ServerCrashState.SERVER_CRASH_FINISH); - else setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); break; case SERVER_CRASH_ASSIGN: - List<HRegionInfo> regionsToAssign = calcRegionsToAssign(env); - - // Assign may not be idempotent. SSH used to requeue the SSH if we got an IOE assigning - // which is what we are mimicing here but it looks prone to double assignment if assign - // fails midway. TODO: Test. - // If no regions to assign, skip assign and skip to the finish. - boolean regions = regionsToAssign != null && !regionsToAssign.isEmpty(); - if (regions) { - this.regionsAssigned = regionsToAssign; - if (!assign(env, regionsToAssign)) { - throwProcedureYieldException("Failed assign; will retry"); + // Filter out meta regions. Those are handled elsewhere in this procedure. + // Filter changes this.regionsOnCrashedServer. 
+ if (filterDefaultMetaRegions(regionsOnCrashedServer)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Assigning regions " + + HRegionInfo.getShortNameToLog(regionsOnCrashedServer) + ", " + this + + "; cycles=" + this.cycles); } + handleRIT(env, regionsOnCrashedServer); + addChildProcedure(env.getAssignmentManager(). + createAssignProcedures(regionsOnCrashedServer, true)); } - if (this.shouldSplitWal && distributedLogReplay) { - // Take this route even if there are apparently no regions assigned. This may be our - // second time through here; i.e. we assigned and crashed just about here. On second - // time through, there will be no regions because we assigned them in the previous step. - // Even though no regions, we need to go through here to clean up the DLR zk markers. - setNextState(ServerCrashState.SERVER_CRASH_WAIT_ON_ASSIGN); - } else { - setNextState(ServerCrashState.SERVER_CRASH_FINISH); - } - break; - - case SERVER_CRASH_WAIT_ON_ASSIGN: - // TODO: The list of regionsAssigned may be more than we actually assigned. See down in - // AM #1629 around 'if (regionStates.wasRegionOnDeadServer(encodedName)) {' where where we - // will skip assigning a region because it is/was on a dead server. Should never happen! - // It was on this server. Worst comes to worst, we'll still wait here till other server is - // processed. - - // If the wait on assign failed, yield -- if we have regions to assign. 
- if (this.regionsAssigned != null && !this.regionsAssigned.isEmpty()) { - if (!waitOnAssign(env, this.regionsAssigned)) { - throwProcedureYieldException("Waiting on region assign"); - } - } - setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS); + setNextState(ServerCrashState.SERVER_CRASH_FINISH); break; case SERVER_CRASH_FINISH: - LOG.info("Finished processing of crashed " + serverName); services.getServerManager().getDeadServers().finish(serverName); return Flow.NO_MORE_STATE; @@ -304,11 +199,7 @@ implements ServerProcedureInterface { throw new UnsupportedOperationException("unhandled state=" + state); } } catch (IOException e) { - LOG.warn("Failed serverName=" + this.serverName + ", state=" + state + "; retry", e); - } catch (InterruptedException e) { - // TODO: Make executor allow IEs coming up out of execute. - LOG.warn("Interrupted serverName=" + this.serverName + ", state=" + state + "; retry", e); - Thread.currentThread().interrupt(); + LOG.warn("Failed state=" + state + ", retry " + this + "; cycles=" + this.cycles, e); } return Flow.HAS_MORE_STATE; } @@ -318,96 +209,60 @@ implements ServerProcedureInterface { * @param env * @throws IOException */ - private void start(final MasterProcedureEnv env) throws IOException { - MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); - // Set recovery mode late. This is what the old ServerShutdownHandler used do. - mwm.setLogRecoveryMode(); - this.distributedLogReplay = mwm.getLogRecoveryMode() == RecoveryMode.LOG_REPLAY; - } + private void start(final MasterProcedureEnv env) throws IOException {} /** * @param env - * @return False if we fail to assign and split logs on meta ('process'). 
* @throws IOException * @throws InterruptedException */ - private boolean processMeta(final MasterProcedureEnv env) - throws IOException { + private void processMeta(final MasterProcedureEnv env) throws IOException { if (LOG.isDebugEnabled()) LOG.debug("Processing hbase:meta that was on " + this.serverName); - MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); - AssignmentManager am = env.getMasterServices().getAssignmentManager(); - HRegionInfo metaHRI = HRegionInfo.FIRST_META_REGIONINFO; + if (this.shouldSplitWal) { - if (this.distributedLogReplay) { - prepareLogReplay(env, META_REGION_SET); - } else { - // TODO: Matteo. We BLOCK here but most important thing to be doing at this moment. - mwm.splitMetaLog(serverName); - am.getRegionStates().logSplit(metaHRI); - } + // TODO: Matteo. We BLOCK here but most important thing to be doing at this moment. + env.getMasterServices().getMasterWalManager().splitMetaLog(serverName); } // Assign meta if still carrying it. Check again: region may be assigned because of RIT timeout - boolean processed = true; - if (am.isCarryingMeta(serverName)) { - // TODO: May block here if hard time figuring state of meta. - am.regionOffline(HRegionInfo.FIRST_META_REGIONINFO); - verifyAndAssignMetaWithRetries(env); - if (this.shouldSplitWal && distributedLogReplay) { - int timeout = env.getMasterConfiguration().getInt(KEY_WAIT_ON_RIT, DEFAULT_WAIT_ON_RIT); - if (!waitOnRegionToClearRegionsInTransition(am, metaHRI, timeout)) { - processed = false; - } else { - // TODO: Matteo. We BLOCK here but most important thing to be doing at this moment. 
- mwm.splitMetaLog(serverName); - } - } + final AssignmentManager am = env.getMasterServices().getAssignmentManager(); + for (HRegionInfo hri: am.getRegionStates().getServerRegionInfoSet(serverName)) { + if (!isDefaultMetaRegion(hri)) continue; + + am.offlineRegion(hri); + addChildProcedure(am.createAssignProcedure(hri, true)); } - return processed; } - /** - * @return True if region cleared RIT, else false if we timed out waiting. - * @throws InterruptedIOException - */ - private boolean waitOnRegionToClearRegionsInTransition(AssignmentManager am, - final HRegionInfo hri, final int timeout) - throws InterruptedIOException { - try { - if (!am.waitOnRegionToClearRegionsInTransition(hri, timeout)) { - // Wait here is to avoid log replay hits current dead server and incur a RPC timeout - // when replay happens before region assignment completes. - LOG.warn("Region " + hri.getEncodedName() + " didn't complete assignment in time"); - return false; + private boolean filterDefaultMetaRegions(final List<HRegionInfo> regions) { + if (regions == null) return false; + final Iterator<HRegionInfo> it = regions.iterator(); + while (it.hasNext()) { + final HRegionInfo hri = it.next(); + if (isDefaultMetaRegion(hri)) { + it.remove(); } - } catch (InterruptedException ie) { - throw new InterruptedIOException("Caught " + ie + - " during waitOnRegionToClearRegionsInTransition for " + hri); } - return true; + return !regions.isEmpty(); } - private void prepareLogReplay(final MasterProcedureEnv env, final Set<HRegionInfo> regions) - throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Mark " + size(this.regionsOnCrashedServer) + " regions-in-recovery from " + - this.serverName); - } - MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); - AssignmentManager am = env.getMasterServices().getAssignmentManager(); - mwm.prepareLogReplay(this.serverName, regions); - am.getRegionStates().logSplit(this.serverName); + private boolean isDefaultMetaRegion(final 
HRegionInfo hri) { + return hri.getTable().equals(TableName.META_TABLE_NAME) && + RegionReplicaUtil.isDefaultReplica(hri); } private void splitLogs(final MasterProcedureEnv env) throws IOException { if (LOG.isDebugEnabled()) { - LOG.debug("Splitting logs from " + serverName + "; region count=" + - size(this.regionsOnCrashedServer)); + LOG.debug("Splitting WALs " + this); } MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); AssignmentManager am = env.getMasterServices().getAssignmentManager(); // TODO: For Matteo. Below BLOCKs!!!! Redo so can relinquish executor while it is running. + // PROBLEM!!! WE BLOCK HERE. mwm.splitLog(this.serverName); + if (LOG.isDebugEnabled()) { + LOG.debug("Done splitting WALs " + this); + } am.getRegionStates().logSplit(this.serverName); } @@ -415,124 +270,6 @@ implements ServerProcedureInterface { return hris == null? 0: hris.size(); } - /** - * Figure out what we need to assign. Should be idempotent. - * @param env - * @return List of calculated regions to assign; may be empty or null. - * @throws IOException - */ - private List<HRegionInfo> calcRegionsToAssign(final MasterProcedureEnv env) - throws IOException { - AssignmentManager am = env.getMasterServices().getAssignmentManager(); - List<HRegionInfo> regionsToAssignAggregator = new ArrayList<>(); - int replicaCount = env.getMasterConfiguration().getInt(HConstants.META_REPLICAS_NUM, - HConstants.DEFAULT_META_REPLICA_NUM); - for (int i = 1; i < replicaCount; i++) { - HRegionInfo metaHri = - RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, i); - if (am.isCarryingMetaReplica(this.serverName, metaHri)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Reassigning meta replica" + metaHri + " that was on " + this.serverName); - } - regionsToAssignAggregator.add(metaHri); - } - } - // Clean out anything in regions in transition. 
- List<HRegionInfo> regionsInTransition = am.cleanOutCrashedServerReferences(serverName); - if (LOG.isDebugEnabled()) { - LOG.debug("Reassigning " + size(this.regionsOnCrashedServer) + - " region(s) that " + (serverName == null? "null": serverName) + - " was carrying (and " + regionsInTransition.size() + - " regions(s) that were opening on this server)"); - } - regionsToAssignAggregator.addAll(regionsInTransition); - - // Iterate regions that were on this server and figure which of these we need to reassign - if (this.regionsOnCrashedServer != null && !this.regionsOnCrashedServer.isEmpty()) { - RegionStates regionStates = am.getRegionStates(); - for (HRegionInfo hri: this.regionsOnCrashedServer) { - if (regionsInTransition.contains(hri)) continue; - String encodedName = hri.getEncodedName(); - Lock lock = am.acquireRegionLock(encodedName); - try { - RegionState rit = regionStates.getRegionTransitionState(hri); - if (processDeadRegion(hri, am)) { - ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri); - if (addressFromAM != null && !addressFromAM.equals(this.serverName)) { - // If this region is in transition on the dead server, it must be - // opening or pending_open, which should have been covered by - // AM#cleanOutCrashedServerReferences - LOG.info("Skip assigning " + hri.getRegionNameAsString() - + " because opened on " + addressFromAM.getServerName()); - continue; - } - if (rit != null) { - if (rit.getServerName() != null && !rit.isOnServer(this.serverName)) { - // Skip regions that are in transition on other server - LOG.info("Skip assigning region in transition on other server" + rit); - continue; - } - LOG.info("Reassigning region " + rit + " and clearing zknode if exists"); - regionStates.updateRegionState(hri, RegionState.State.OFFLINE); - } else if (regionStates.isRegionInState( - hri, RegionState.State.SPLITTING_NEW, RegionState.State.MERGING_NEW)) { - regionStates.updateRegionState(hri, RegionState.State.OFFLINE); - } - 
regionsToAssignAggregator.add(hri); - // TODO: The below else if is different in branch-1 from master branch. - } else if (rit != null) { - if ((rit.isClosing() || rit.isFailedClose() || rit.isOffline()) - && am.getTableStateManager().isTableState(hri.getTable(), - TableState.State.DISABLED, TableState.State.DISABLING) || - am.getReplicasToClose().contains(hri)) { - // If the table was partially disabled and the RS went down, we should clear the - // RIT and remove the node for the region. - // The rit that we use may be stale in case the table was in DISABLING state - // but though we did assign we will not be clearing the znode in CLOSING state. - // Doing this will have no harm. See HBASE-5927 - regionStates.updateRegionState(hri, RegionState.State.OFFLINE); - am.offlineDisabledRegion(hri); - } else { - LOG.warn("THIS SHOULD NOT HAPPEN: unexpected region in transition " - + rit + " not to be assigned by SSH of server " + serverName); - } - } - } finally { - lock.unlock(); - } - } - } - return regionsToAssignAggregator; - } - - private boolean assign(final MasterProcedureEnv env, final List<HRegionInfo> hris) - throws InterruptedIOException { - AssignmentManager am = env.getMasterServices().getAssignmentManager(); - try { - am.assign(hris); - } catch (InterruptedException ie) { - LOG.error("Caught " + ie + " during round-robin assignment"); - throw (InterruptedIOException)new InterruptedIOException().initCause(ie); - } catch (IOException ioe) { - LOG.info("Caught " + ioe + " during region assignment, will retry"); - return false; - } - return true; - } - - private boolean waitOnAssign(final MasterProcedureEnv env, final List<HRegionInfo> hris) - throws InterruptedIOException { - int timeout = env.getMasterConfiguration().getInt(KEY_WAIT_ON_RIT, DEFAULT_WAIT_ON_RIT); - for (HRegionInfo hri: hris) { - // TODO: Blocks here. 
- if (!waitOnRegionToClearRegionsInTransition(env.getMasterServices().getAssignmentManager(), - hri, timeout)) { - return false; - } - } - return true; - } - @Override protected void rollbackState(MasterProcedureEnv env, ServerCrashState state) throws IOException { @@ -580,11 +317,11 @@ implements ServerProcedureInterface { @Override public void toStringClassDetails(StringBuilder sb) { sb.append(getClass().getSimpleName()); - sb.append(" serverName="); - sb.append(this.serverName); - sb.append(", shouldSplitWal="); + sb.append(" server="); + sb.append(serverName); + sb.append(", splitWal="); sb.append(shouldSplitWal); - sb.append(", carryingMeta="); + sb.append(", meta="); sb.append(carryingMeta); } @@ -595,7 +332,6 @@ implements ServerProcedureInterface { MasterProcedureProtos.ServerCrashStateData.Builder state = MasterProcedureProtos.ServerCrashStateData.newBuilder(). setServerName(ProtobufUtil.toServerName(this.serverName)). - setDistributedLogReplay(this.distributedLogReplay). setCarryingMeta(this.carryingMeta). setShouldSplitWal(this.shouldSplitWal); if (this.regionsOnCrashedServer != null && !this.regionsOnCrashedServer.isEmpty()) { @@ -603,11 +339,6 @@ implements ServerProcedureInterface { state.addRegionsOnCrashedServer(HRegionInfo.convert(hri)); } } - if (this.regionsAssigned != null && !this.regionsAssigned.isEmpty()) { - for (HRegionInfo hri: this.regionsAssigned) { - state.addRegionsAssigned(HRegionInfo.convert(hri)); - } - } state.build().writeDelimitedTo(stream); } @@ -618,142 +349,16 @@ implements ServerProcedureInterface { MasterProcedureProtos.ServerCrashStateData state = MasterProcedureProtos.ServerCrashStateData.parseDelimitedFrom(stream); this.serverName = ProtobufUtil.toServerName(state.getServerName()); - this.distributedLogReplay = state.hasDistributedLogReplay()? - state.getDistributedLogReplay(): false; this.carryingMeta = state.hasCarryingMeta()? 
state.getCarryingMeta(): false; // shouldSplitWAL has a default over in pb so this invocation will always work. this.shouldSplitWal = state.getShouldSplitWal(); int size = state.getRegionsOnCrashedServerCount(); if (size > 0) { - this.regionsOnCrashedServer = new HashSet<>(size); + this.regionsOnCrashedServer = new ArrayList<HRegionInfo>(size); for (RegionInfo ri: state.getRegionsOnCrashedServerList()) { this.regionsOnCrashedServer.add(HRegionInfo.convert(ri)); } } - size = state.getRegionsAssignedCount(); - if (size > 0) { - this.regionsAssigned = new ArrayList<>(size); - for (RegionInfo ri: state.getRegionsOnCrashedServerList()) { - this.regionsAssigned.add(HRegionInfo.convert(ri)); - } - } - } - - /** - * Process a dead region from a dead RS. Checks if the region is disabled or - * disabling or if the region has a partially completed split. - * @param hri - * @param assignmentManager - * @return Returns true if specified region should be assigned, false if not. - * @throws IOException - */ - private static boolean processDeadRegion(HRegionInfo hri, AssignmentManager assignmentManager) - throws IOException { - boolean tablePresent = assignmentManager.getTableStateManager().isTablePresent(hri.getTable()); - if (!tablePresent) { - LOG.info("The table " + hri.getTable() + " was deleted. Hence not proceeding."); - return false; - } - // If table is not disabled but the region is offlined, - boolean disabled = assignmentManager.getTableStateManager().isTableState(hri.getTable(), - TableState.State.DISABLED); - if (disabled){ - LOG.info("The table " + hri.getTable() + " was disabled. Hence not proceeding."); - return false; - } - if (hri.isOffline() && hri.isSplit()) { - // HBASE-7721: Split parent and daughters are inserted into hbase:meta as an atomic operation. - // If the meta scanner saw the parent split, then it should see the daughters as assigned - // to the dead server. We don't have to do anything. 
- return false; - } - boolean disabling = assignmentManager.getTableStateManager().isTableState(hri.getTable(), - TableState.State.DISABLING); - if (disabling) { - LOG.info("The table " + hri.getTable() + " is disabled. Hence not assigning region" + - hri.getEncodedName()); - return false; - } - return true; - } - - /** - * If hbase:meta is not assigned already, assign. - * @throws IOException - */ - private void verifyAndAssignMetaWithRetries(final MasterProcedureEnv env) throws IOException { - MasterServices services = env.getMasterServices(); - int iTimes = services.getConfiguration().getInt(KEY_RETRIES_ON_META, DEFAULT_RETRIES_ON_META); - // Just reuse same time as we have for short wait on meta. Adding another config is overkill. - long waitTime = - services.getConfiguration().getLong(KEY_SHORT_WAIT_ON_META, DEFAULT_SHORT_WAIT_ON_META); - int iFlag = 0; - while (true) { - try { - verifyAndAssignMeta(env); - break; - } catch (KeeperException e) { - services.abort("In server shutdown processing, assigning meta", e); - throw new IOException("Aborting", e); - } catch (Exception e) { - if (iFlag >= iTimes) { - services.abort("verifyAndAssignMeta failed after" + iTimes + " retries, aborting", e); - throw new IOException("Aborting", e); - } - try { - Thread.sleep(waitTime); - } catch (InterruptedException e1) { - LOG.warn("Interrupted when is the thread sleep", e1); - Thread.currentThread().interrupt(); - throw (InterruptedIOException)new InterruptedIOException().initCause(e1); - } - iFlag++; - } - } - } - - /** - * If hbase:meta is not assigned already, assign. 
- * @throws InterruptedException - * @throws IOException - * @throws KeeperException - */ - private void verifyAndAssignMeta(final MasterProcedureEnv env) - throws InterruptedException, IOException, KeeperException { - MasterServices services = env.getMasterServices(); - if (!isMetaAssignedQuickTest(env)) { - services.getAssignmentManager().assignMeta(HRegionInfo.FIRST_META_REGIONINFO); - } else if (serverName.equals(services.getMetaTableLocator(). - getMetaRegionLocation(services.getZooKeeper()))) { - throw new IOException("hbase:meta is onlined on the dead server " + this.serverName); - } else { - LOG.info("Skip assigning hbase:meta because it is online at " - + services.getMetaTableLocator().getMetaRegionLocation(services.getZooKeeper())); - } - } - - /** - * A quick test that hbase:meta is assigned; blocks for short time only. - * @return True if hbase:meta location is available and verified as good. - * @throws InterruptedException - * @throws IOException - */ - private boolean isMetaAssignedQuickTest(final MasterProcedureEnv env) - throws InterruptedException, IOException { - ZooKeeperWatcher zkw = env.getMasterServices().getZooKeeper(); - MetaTableLocator mtl = env.getMasterServices().getMetaTableLocator(); - boolean metaAssigned = false; - // Is hbase:meta location available yet? - if (mtl.isLocationAvailable(zkw)) { - ClusterConnection connection = env.getMasterServices().getClusterConnection(); - // Is hbase:meta location good yet? - long timeout = - env.getMasterConfiguration().getLong(KEY_SHORT_WAIT_ON_META, DEFAULT_SHORT_WAIT_ON_META); - if (mtl.verifyMetaRegionLocation(connection, zkw, timeout)) { - metaAssigned = true; - } - } - return metaAssigned; } @Override @@ -789,4 +394,46 @@ implements ServerProcedureInterface { // the client does not know about this procedure. return false; } -} + + /** + * Handle any outstanding RIT that are up against this.serverName, the crashed server. + * Notify them of crash. 
Remove assign entries from the passed in <code>regions</code> + * otherwise we have two assigns going on and they will fight over who has lock. + * Notify Unassigns also. + * @param crashedServer Server that crashed. + * @param regions Regions that were on crashed server + * @return Subset of <code>regions</code> that were RIT against <code>crashedServer</code> + */ + private void handleRIT(final MasterProcedureEnv env, final List<HRegionInfo> regions) { + if (regions == null) return; + AssignmentManager am = env.getMasterServices().getAssignmentManager(); + final Iterator<HRegionInfo> it = regions.iterator(); + ServerCrashException sce = null; + while (it.hasNext()) { + final HRegionInfo hri = it.next(); + RegionTransitionProcedure rtp = am.getRegionStates().getRegionTransitionProcedure(hri); + if (rtp == null) continue; + // Make sure the RIT is against this crashed server. In the case where there are many + // processings of a crashed server -- backed up for whatever reason (slow WAL split) -- + // then a previous SCP may have already failed an assign, etc., and it may have a new + // location target; DO NOT fail these else we make for assign flux. + ServerName rtpServerName = rtp.getServer(env); + if (rtpServerName == null) { + LOG.warn("RIT with ServerName null! " + rtp); + continue; + } + if (!rtpServerName.equals(this.serverName)) continue; + LOG.info("pid=" + getProcId() + " found RIT " + rtp + "; " + + rtp.getRegionState(env).toShortString()); + // Notify RIT on server crash. + if (sce == null) { + sce = new ServerCrashException(getProcId(), getServerName()); + } + rtp.remoteCallFailed(env, this.serverName, sce); + if (rtp instanceof AssignProcedure) { + // If an assign, include it in our return and remove from passed-in list of regions. + it.remove(); + } + } + } +} \ No newline at end of file
