Apache9 commented on a change in pull request #3920:
URL: https://github.com/apache/hbase/pull/3920#discussion_r777216548
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
##########
@@ -1871,48 +1871,55 @@ public void onComplete() {
return failedFuture(e);
}
CompletableFuture<Void> future = new CompletableFuture<>();
- final SnapshotRequest request =
SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
- addListener(this.<Long> newMasterCaller()
- .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse,
Long> call(controller,
- stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
- resp -> resp.getExpectedTimeout()))
- .call(), (expectedTimeout, err) -> {
+ final SnapshotRequest request =
+
SnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
+ .setNonce(ng.newNonce()).build();
+ addListener(this.<SnapshotResponse> newMasterCaller()
+ .action((controller, stub) ->
+ this.<SnapshotRequest, SnapshotResponse, SnapshotResponse>
call(controller, stub,
+ request, (s, c, req, done) -> s.snapshot(c, req, done), resp ->
resp))
+ .call(), (resp, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
- TimerTask pollingTask = new TimerTask() {
- int tries = 0;
- long startTime = EnvironmentEdgeManager.currentTime();
- long endTime = startTime + expectedTimeout;
- long maxPauseTime = expectedTimeout / maxAttempts;
-
- @Override
- public void run(Timeout timeout) throws Exception {
- if (EnvironmentEdgeManager.currentTime() < endTime) {
- addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else if (done) {
- future.complete(null);
- } else {
- // retry again after pauseTime.
- long pauseTime =
-
ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
- pauseTime = Math.min(pauseTime, maxPauseTime);
- AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
- TimeUnit.MILLISECONDS);
- }
- });
- } else {
- future.completeExceptionally(
- new SnapshotCreationException("Snapshot '" +
snapshot.getName() +
- "' wasn't completed in expectedTime:" + expectedTimeout + "
ms", snapshotDesc));
+ if (resp.hasProcId()) {
Review comment:
It would be better to add a comment explaining that this branch exists to keep
compatibility with the old server implementation.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotRegionProcedure.java
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import com.google.errorprone.annotations.RestrictedApi;
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
+import org.apache.hadoop.hbase.master.assignment.ServerState;
+import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
+import
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.regionserver.SnapshotRegionCallable;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotRegionProcedureStateData;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+
+/**
+ * A remote procedure which is used to send region snapshot request to region
server.
+ * The basic logic of SnapshotRegionProcedure is similar like {@link
ServerRemoteProcedure},
+ * only with a little difference, when {@link FailedRemoteDispatchException}
was thrown,
+ * SnapshotRegionProcedure will sleep some time and continue retrying until
success.
+ */
[email protected]
+public class SnapshotRegionProcedure extends Procedure<MasterProcedureEnv>
+ implements TableProcedureInterface, RemoteProcedure<MasterProcedureEnv,
ServerName> {
+ private static final Logger LOG =
LoggerFactory.getLogger(SnapshotRegionProcedure.class);
+
+ private SnapshotDescription snapshot;
+ private ProcedureEvent<?> event;
+ private RegionInfo region;
+ private boolean dispatched;
+ private boolean succ;
+ private RetryCounter retryCounter;
+
+ public SnapshotRegionProcedure() {
+ }
+
+ public SnapshotRegionProcedure(SnapshotDescription snapshot, RegionInfo
region) {
+ this.snapshot = snapshot;
+ this.region = region;
+ }
+
+ @Override
+ protected LockState acquireLock(final MasterProcedureEnv env) {
+ if (env.getProcedureScheduler().waitRegions(this, getTableName(), region))
{
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ return LockState.LOCK_ACQUIRED;
+ }
+
+ @Override
+ protected void releaseLock(final MasterProcedureEnv env) {
+ env.getProcedureScheduler().wakeRegions(this, getTableName(), region);
+ }
+
+ @Override
+ protected boolean holdLock(MasterProcedureEnv env) {
+ return false;
+ }
+
+ @Override
+ public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env,
ServerName serverName) {
+ return Optional.of(new RSProcedureDispatcher.ServerOperation(this,
getProcId(),
+ SnapshotRegionCallable.class,
MasterProcedureProtos.SnapshotRegionParameter.newBuilder()
+
.setRegion(ProtobufUtil.toRegionInfo(region)).setSnapshot(snapshot).build().toByteArray()));
+ }
+
+ @Override
+ public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
IOException e) {
+ complete(env, e);
+ }
+
+ @Override
+ public void remoteOperationCompleted(MasterProcedureEnv env) {
+ complete(env, null);
+ }
+
+ @Override
+ public void remoteOperationFailed(MasterProcedureEnv env,
RemoteProcedureException e) {
+ complete(env, e);
+ }
+
+ // keep retrying until success
+ private void complete(MasterProcedureEnv env, Throwable error) {
+ if (isFinished()) {
+ LOG.info("This procedure {} is already finished, skip the rest
processes", this.getProcId());
+ return;
+ }
+ if (event == null) {
+ LOG.warn("procedure event for {} is null, maybe the procedure is created
when recovery",
+ getProcId());
+ return;
+ }
+ if (error == null) {
+ LOG.info("finish snapshot {} on region {}", snapshot.getName(),
region.getEncodedName());
+ succ = true;
+ }
+
+ event.wake(env.getProcedureScheduler());
+ event = null;
+ }
+
+ @Override
+ public TableName getTableName() {
+ return region.getTable();
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.REGION_SNAPSHOT;
+ }
+
+ @Override
+ protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+ throws ProcedureYieldException, ProcedureSuspendedException,
InterruptedException {
+ if (dispatched) {
+ if (succ) {
+ return null;
+ }
+ dispatched = false;
+ }
+
+ RegionStates regionStates = env.getAssignmentManager().getRegionStates();
+ RegionStateNode regionNode = regionStates.getRegionStateNode(region);
+ regionNode.lock();
+ try {
+ if (regionNode.getProcedure() != null) {
+ setTimeoutForSuspend(env, String.format("region %s has a TRSP attached
%s",
+ region.getRegionNameAsString(), regionNode.getProcedure()));
+ throw new ProcedureSuspendedException();
+ }
+ if (!regionNode.getState().matches(RegionState.State.OPEN)) {
Review comment:
Is this possible if we do not have a TRSP for this region?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.MetricsSnapshot;
+import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
+import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure;
+import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+
+/**
+ * A procedure used to take snapshot on tables.
+ */
[email protected]
+public class SnapshotProcedure
+ extends AbstractStateMachineTableProcedure<SnapshotState> {
+ private static final Logger LOG =
LoggerFactory.getLogger(SnapshotProcedure.class);
+ private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot();
+
+ private Configuration conf;
+ private SnapshotDescription snapshot;
+ private Path rootDir;
+ private Path snapshotDir;
+ private Path workingDir;
+ private FileSystem workingDirFS;
+ private TableName snapshotTable;
+ private MonitoredTask status;
+ private SnapshotManifest snapshotManifest;
+ private TableDescriptor htd;
+
+ private RetryCounter retryCounter;
+
+ public SnapshotProcedure() { }
+
+ public SnapshotProcedure(final MasterProcedureEnv env, final
SnapshotDescription snapshot) {
+ super(env);
+ this.snapshot = snapshot;
+ }
+
+ @Override
+ public TableName getTableName() {
+ return TableName.valueOf(snapshot.getTable());
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.SNAPSHOT;
+ }
+
+ @Override
+ protected LockState acquireLock(MasterProcedureEnv env) {
+ // AbstractStateMachineTableProcedure acquires exclusive table lock by
default,
+ // but we may need to downgrade it to shared lock for some reasons:
+ // a. exclusive lock has a negative effect on assigning region. See
HBASE-21480 for details.
+ // b. we want to support taking multiple different snapshots on same table
on the same time.
+ if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName()))
{
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ return LockState.LOCK_ACQUIRED;
+ }
+
+ @Override
+ protected void releaseLock(MasterProcedureEnv env) {
+ env.getProcedureScheduler().wakeTableSharedLock(this, getTableName());
+ }
+
+ @Override
+ protected boolean holdLock(MasterProcedureEnv env) {
+ // In order to avoid enabling/disabling/modifying/deleting table during
snapshot,
+ // we don't release lock during suspend
+ return true;
+ }
+
+ @Override
+ protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state)
+ throws ProcedureSuspendedException, ProcedureYieldException,
InterruptedException {
+ LOG.info("{} execute state={}", this, state);
+
+ try {
+ switch (state) {
+ case SNAPSHOT_PREPARE:
+ prepareSnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_PRE_OPERATION:
+ preSnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_WRITE_SNAPSHOT_INFO:
+ SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir,
workingDirFS);
+ TableState tableState =
+
env.getMasterServices().getTableStateManager().getTableState(snapshotTable);
+ if (tableState.isEnabled()) {
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS);
+ } else if (tableState.isDisabled()) {
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS);
+ }
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS:
+ addChildProcedure(createRemoteSnapshotProcedures(env));
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS:
+ snapshotOfflineRegions(env);
+ if (MobUtils.hasMobColumns(htd)) {
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
+ } else {
+ setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
+ }
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_SNAPSHOT_MOB_REGION:
+ snapshotMobRegion();
+ setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_CONSOLIDATE_SNAPSHOT:
+ // flush the in-memory state, and write the single manifest
+ status.setStatus("Consolidate snapshot: " + snapshot.getName());
+ snapshotManifest.consolidate();
+ setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_VERIFIER_SNAPSHOT:
+ status.setStatus("Verifying snapshot: " + snapshot.getName());
+ verifySnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_COMPLETE_SNAPSHOT:
+ completeSnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_POST_OPERATION);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_POST_OPERATION:
+ postSnapshot(env);
+ return Flow.NO_MORE_STATE;
+ default:
+ throw new UnsupportedOperationException("unhandled state=" + state);
+ }
+ } catch (ProcedureSuspendedException e) {
+ throw e;
+ } catch (Exception e) {
+ setFailure("master-snapshot", e);
+ LOG.warn("unexpected exception while execute {}. Mark procedure
Failed.", this, e);
+ status.abort("Abort Snapshot " + snapshot.getName() + " on Table " +
snapshotTable);
+ return Flow.NO_MORE_STATE;
+ }
+ }
+
+ @Override
+ protected void rollbackState(MasterProcedureEnv env, SnapshotState state)
+ throws IOException, InterruptedException {
+ if (state == SnapshotState.SNAPSHOT_PRE_OPERATION) {
+ try {
+ if (!workingDirFS.delete(workingDir, true)) {
+ LOG.error("Couldn't delete snapshot working directory {}",
workingDir);
+ }
+ } catch (IOException e) {
+ LOG.error("Couldn't delete snapshot working directory {}", workingDir,
e);
+ }
+ }
+ }
+
+ @Override
+ protected boolean isRollbackSupported(SnapshotState state) {
+ return true;
+ }
+
+ @Override
+ protected SnapshotState getState(final int stateId) {
+ return SnapshotState.forNumber(stateId);
+ }
+
+ @Override
+ protected int getStateId(SnapshotState state) {
+ return state.getNumber();
+ }
+
+ @Override
+ protected SnapshotState getInitialState() {
+ return SnapshotState.SNAPSHOT_PREPARE;
+ }
+
+ private void prepareSnapshot(MasterProcedureEnv env)
+ throws ProcedureSuspendedException, IOException {
+ if (isAnySplitOrMergeProcedureRunning(env)) {
+ if (retryCounter == null) {
+ retryCounter =
ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+ }
+ long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+ LOG.warn("{} waits {} ms for Split/Merge procedure to finish", this,
backoff);
+ setTimeout(Math.toIntExact(backoff));
+ setState(ProcedureState.WAITING_TIMEOUT);
+ skipPersistence();
+ throw new ProcedureSuspendedException();
+ }
+ prepareSnapshotEnv(env);
+ }
+
+ private void prepareSnapshotEnv(MasterProcedureEnv env) throws IOException {
+ this.conf = env.getMasterConfiguration();
+ this.snapshotTable = TableName.valueOf(snapshot.getTable());
+ this.htd = loadTableDescriptorSnapshot(env);
+ this.rootDir = CommonFSUtils.getRootDir(conf);
+ this.snapshotDir =
SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
+ this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot,
rootDir, conf);
+ this.workingDirFS = workingDir.getFileSystem(conf);
+ this.status = TaskMonitor.get()
+ .createStatus("Taking " + snapshot.getType() + " snapshot on table: " +
snapshotTable);
+ ForeignExceptionDispatcher monitor = new
ForeignExceptionDispatcher(snapshot.getName());
+ this.snapshotManifest = SnapshotManifest.create(conf,
+ workingDirFS, workingDir, snapshot, monitor, status);
+ this.snapshotManifest.addTableDescriptor(htd);
+ }
+
+ @Override
+ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+ setState(ProcedureState.RUNNABLE);
+ env.getProcedureScheduler().addFront(this);
+ return false;
+ }
+
+ private boolean isAnySplitOrMergeProcedureRunning(MasterProcedureEnv env) {
+ return
env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream()
+ .filter(p -> !p.isFinished())
+ .filter(p -> p instanceof SplitTableRegionProcedure ||
+ p instanceof MergeTableRegionsProcedure)
+ .anyMatch(p -> ((AbstractStateMachineTableProcedure<?>) p)
+ .getTableName().equals(getTableName()));
+ }
+
+ private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env)
throws IOException {
+ TableDescriptor htd =
env.getMasterServices().getTableDescriptors().get(snapshotTable);
+ if (htd == null) {
Review comment:
Is this actually possible here? If the table exists, its descriptor should be available — if there is a scenario where it can be null, please explain it in a comment.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.MetricsSnapshot;
+import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
+import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure;
+import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+
+/**
+ * A procedure used to take snapshot on tables.
+ */
[email protected]
+public class SnapshotProcedure
+ extends AbstractStateMachineTableProcedure<SnapshotState> {
+ private static final Logger LOG =
LoggerFactory.getLogger(SnapshotProcedure.class);
+ private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot();
+
+ private Configuration conf;
+ private SnapshotDescription snapshot;
+ private Path rootDir;
+ private Path snapshotDir;
+ private Path workingDir;
+ private FileSystem workingDirFS;
+ private TableName snapshotTable;
+ private MonitoredTask status;
+ private SnapshotManifest snapshotManifest;
+ private TableDescriptor htd;
+
+ private RetryCounter retryCounter;
+
+ public SnapshotProcedure() { }
+
+ public SnapshotProcedure(final MasterProcedureEnv env, final
SnapshotDescription snapshot) {
+ super(env);
+ this.snapshot = snapshot;
+ }
+
+ @Override
+ public TableName getTableName() {
+ return TableName.valueOf(snapshot.getTable());
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.SNAPSHOT;
+ }
+
+ @Override
+ protected LockState acquireLock(MasterProcedureEnv env) {
+ // AbstractStateMachineTableProcedure acquires exclusive table lock by
default,
+ // but we may need to downgrade it to shared lock for some reasons:
+ // a. exclusive lock has a negative effect on assigning region. See
HBASE-21480 for details.
+ // b. we want to support taking multiple different snapshots on same table
on the same time.
+ if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName()))
{
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ return LockState.LOCK_ACQUIRED;
+ }
+
+ @Override
+ protected void releaseLock(MasterProcedureEnv env) {
+ env.getProcedureScheduler().wakeTableSharedLock(this, getTableName());
+ }
+
+ @Override
+ protected boolean holdLock(MasterProcedureEnv env) {
+ // In order to avoid enabling/disabling/modifying/deleting table during
snapshot,
+ // we don't release lock during suspend
+ return true;
+ }
+
+ @Override
+ protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state)
+ throws ProcedureSuspendedException, ProcedureYieldException,
InterruptedException {
+ LOG.info("{} execute state={}", this, state);
+
+ try {
+ switch (state) {
+ case SNAPSHOT_PREPARE:
+ prepareSnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_PRE_OPERATION:
+ preSnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_WRITE_SNAPSHOT_INFO:
+ SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir,
workingDirFS);
+ TableState tableState =
+
env.getMasterServices().getTableStateManager().getTableState(snapshotTable);
+ if (tableState.isEnabled()) {
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS);
+ } else if (tableState.isDisabled()) {
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS);
+ }
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS:
+ addChildProcedure(createRemoteSnapshotProcedures(env));
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS:
+ snapshotOfflineRegions(env);
+ if (MobUtils.hasMobColumns(htd)) {
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
+ } else {
+ setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
+ }
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_SNAPSHOT_MOB_REGION:
+ snapshotMobRegion();
+ setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_CONSOLIDATE_SNAPSHOT:
+ // flush the in-memory state, and write the single manifest
+ status.setStatus("Consolidate snapshot: " + snapshot.getName());
+ snapshotManifest.consolidate();
+ setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_VERIFIER_SNAPSHOT:
+ status.setStatus("Verifying snapshot: " + snapshot.getName());
+ verifySnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_COMPLETE_SNAPSHOT:
+ completeSnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_POST_OPERATION);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_POST_OPERATION:
+ postSnapshot(env);
+ return Flow.NO_MORE_STATE;
+ default:
+ throw new UnsupportedOperationException("unhandled state=" + state);
+ }
+ } catch (ProcedureSuspendedException e) {
+ throw e;
+ } catch (Exception e) {
+ setFailure("master-snapshot", e);
+ LOG.warn("unexpected exception while execute {}. Mark procedure
Failed.", this, e);
+ status.abort("Abort Snapshot " + snapshot.getName() + " on Table " +
snapshotTable);
+ return Flow.NO_MORE_STATE;
+ }
+ }
+
+ @Override
+ protected void rollbackState(MasterProcedureEnv env, SnapshotState state)
+ throws IOException, InterruptedException {
+ if (state == SnapshotState.SNAPSHOT_PRE_OPERATION) {
+ try {
+ if (!workingDirFS.delete(workingDir, true)) {
+ LOG.error("Couldn't delete snapshot working directory {}",
workingDir);
+ }
+ } catch (IOException e) {
+ LOG.error("Couldn't delete snapshot working directory {}", workingDir,
e);
+ }
+ }
+ }
+
+ @Override
+ protected boolean isRollbackSupported(SnapshotState state) {
+ return true;
+ }
+
+ @Override
+ protected SnapshotState getState(final int stateId) {
+ return SnapshotState.forNumber(stateId);
+ }
+
+ @Override
+ protected int getStateId(SnapshotState state) {
+ return state.getNumber();
+ }
+
+ @Override
+ protected SnapshotState getInitialState() {
+ return SnapshotState.SNAPSHOT_PREPARE;
+ }
+
+ private void prepareSnapshot(MasterProcedureEnv env)
+ throws ProcedureSuspendedException, IOException {
+ if (isAnySplitOrMergeProcedureRunning(env)) {
+ if (retryCounter == null) {
+ retryCounter =
ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+ }
+ long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+ LOG.warn("{} waits {} ms for Split/Merge procedure to finish", this,
backoff);
+ setTimeout(Math.toIntExact(backoff));
+ setState(ProcedureState.WAITING_TIMEOUT);
+ skipPersistence();
+ throw new ProcedureSuspendedException();
+ }
+ prepareSnapshotEnv(env);
+ }
+
+ private void prepareSnapshotEnv(MasterProcedureEnv env) throws IOException {
+ this.conf = env.getMasterConfiguration();
+ this.snapshotTable = TableName.valueOf(snapshot.getTable());
+ this.htd = loadTableDescriptorSnapshot(env);
+ this.rootDir = CommonFSUtils.getRootDir(conf);
+ this.snapshotDir =
SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
+ this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot,
rootDir, conf);
+ this.workingDirFS = workingDir.getFileSystem(conf);
+ this.status = TaskMonitor.get()
+ .createStatus("Taking " + snapshot.getType() + " snapshot on table: " +
snapshotTable);
+ ForeignExceptionDispatcher monitor = new
ForeignExceptionDispatcher(snapshot.getName());
+ this.snapshotManifest = SnapshotManifest.create(conf,
+ workingDirFS, workingDir, snapshot, monitor, status);
+ this.snapshotManifest.addTableDescriptor(htd);
+ }
+
+ @Override
+ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+ setState(ProcedureState.RUNNABLE);
+ env.getProcedureScheduler().addFront(this);
+ return false;
+ }
+
+ private boolean isAnySplitOrMergeProcedureRunning(MasterProcedureEnv env) {
+ return
env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream()
+ .filter(p -> !p.isFinished())
+ .filter(p -> p instanceof SplitTableRegionProcedure ||
+ p instanceof MergeTableRegionsProcedure)
+ .anyMatch(p -> ((AbstractStateMachineTableProcedure<?>) p)
+ .getTableName().equals(getTableName()));
+ }
+
+ private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env)
throws IOException {
+ TableDescriptor htd =
env.getMasterServices().getTableDescriptors().get(snapshotTable);
+ if (htd == null) {
+ throw new IOException("TableDescriptor missing for " + snapshotTable);
+ }
+ if (htd.getMaxFileSize() == -1 && this.snapshot.getMaxFileSize() > 0) {
+ return
TableDescriptorBuilder.newBuilder(htd).setValue(TableDescriptorBuilder.MAX_FILESIZE,
+ Long.toString(this.snapshot.getMaxFileSize())).build();
+ }
+ return htd;
+ }
+
+ private void preSnapshot(MasterProcedureEnv env) throws IOException {
+
env.getMasterServices().getSnapshotManager().prepareWorkingDirectory(snapshot);
+
+ MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+ if (cpHost != null) {
+ cpHost.preSnapshot(ProtobufUtil.createSnapshotDesc(snapshot), htd);
+ }
+ }
+
+ private void postSnapshot(MasterProcedureEnv env) throws IOException {
+ MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+ if (cpHost != null) {
+ cpHost.postSnapshot(ProtobufUtil.createSnapshotDesc(snapshot), htd);
+ }
+ }
+
+ private void verifySnapshot(MasterProcedureEnv env) throws IOException {
+ int verifyThreshold = env.getMasterConfiguration()
+ .getInt("hbase.snapshot.remote.verify.threshold", 10000);
+ int numRegions = (int) env.getAssignmentManager()
+ .getTableRegions(snapshotTable, false)
+ .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r)).count();
+
+ if (numRegions >= verifyThreshold) {
+ addChildProcedure(createRemoteVerifyProcedures(env));
+ } else {
+ MasterSnapshotVerifier verifier =
+ new MasterSnapshotVerifier(env.getMasterServices(), snapshot);
+ verifier.verifySnapshot();
+ }
+ }
+
+ private void completeSnapshot(MasterProcedureEnv env) throws IOException {
+ // complete the snapshot, atomically moving from tmp to .snapshot dir.
+ SnapshotDescriptionUtils.completeSnapshot(snapshotDir, workingDir,
+ env.getMasterFileSystem().getFileSystem(), workingDirFS, conf);
+ // update metric. when master restarts, the metric value is wrong
+ metricsSnapshot.addSnapshot(status.getCompletionTimestamp() -
status.getStartTime());
+ if (env.getMasterCoprocessorHost() != null) {
+ env.getMasterCoprocessorHost()
+
.postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), htd);
+ }
+ status.markComplete("Snapshot " + snapshot.getName() + " completed");
+ }
+
+ private void snapshotOfflineRegions(MasterProcedureEnv env) throws
IOException {
+ List<RegionInfo> regions = env.getAssignmentManager()
+ .getTableRegions(snapshotTable, false).stream()
+ .filter(r -> RegionReplicaUtil.isDefaultReplica(r))
+ .filter(RegionInfo::isSplit).collect(Collectors.toList());
+
+ ThreadPoolExecutor exec = SnapshotManifest
+ .createExecutor(env.getMasterConfiguration(),
"OfflineRegionsSnapshotPool");
+ try {
+ ModifyRegionUtils.editRegions(exec, regions, new
ModifyRegionUtils.RegionEditTask() {
+ @Override
+ public void editRegion(final RegionInfo regionInfo) throws IOException
{
+ snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir,
snapshotTable), regionInfo);
+ }
+ });
+ } finally {
+ exec.shutdown();
+ }
+ status.setStatus("Completed referencing offline regions of table: " +
snapshotTable);
+ }
+
+ private void snapshotMobRegion() throws IOException {
+ RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName());
+ snapshotManifest.addMobRegion(mobRegionInfo);
+ status.setStatus("Completed referencing HFiles for the mob region of
table: " + snapshotTable);
+ }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer)
throws IOException {
+ super.serializeStateData(serializer);
+ serializer.serialize(SnapshotProcedureStateData
+ .newBuilder().setSnapshot(this.snapshot).build());
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
throws IOException {
+ super.deserializeStateData(serializer);
+ SnapshotProcedureStateData data =
serializer.deserialize(SnapshotProcedureStateData.class);
+ this.snapshot = data.getSnapshot();
+ }
+
+ private Procedure<MasterProcedureEnv>[]
createRemoteSnapshotProcedures(MasterProcedureEnv env) {
+ return env.getAssignmentManager().getTableRegions(snapshotTable, true)
+ .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r))
+ .map(r -> new SnapshotRegionProcedure(snapshot, r))
+ .toArray(SnapshotRegionProcedure[]::new);
+ }
+
+ // here we assign region snapshot manifest verify tasks to all region servers
+ // in cluster with the help of master load balancer.
+ private Procedure<MasterProcedureEnv>[]
createRemoteVerifyProcedures(MasterProcedureEnv env)
+ throws IOException {
+ List<RegionInfo> regions = env
+ .getAssignmentManager().getTableRegions(snapshotTable, false);
+ List<ServerName> servers = env
+ .getMasterServices().getServerManager().getOnlineServersList();
+ return env.getMasterServices().getLoadBalancer()
Review comment:
Do we need to use LoadBalancer here? I suppose we could use something
like the SplitWALManager, where we could configure the max concurrency number
for a region server. If there are more tasks than
number_of_region_servers * max_concurrency, we just do not schedule all the
tasks at once; as tasks finish, we schedule new ones. This would
make the tasks more evenly distributed.
##########
File path: hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
##########
@@ -437,10 +437,13 @@ message IsCleanerChoreEnabledResponse {
message SnapshotRequest {
required SnapshotDescription snapshot = 1;
+ optional uint64 nonce_group = 2 [default = 0];
Review comment:
Why did we not have this in the past?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
##########
@@ -1706,12 +1707,23 @@ public SnapshotResponse snapshot(RpcController
controller,
// get the snapshot information
SnapshotDescription snapshot = SnapshotDescriptionUtils.validate(
request.getSnapshot(), server.getConfiguration());
- server.snapshotManager.takeSnapshot(snapshot);
-
// send back the max amount of time the client should wait for the
snapshot to complete
long waitTime =
SnapshotDescriptionUtils.getMaxMasterTimeout(server.getConfiguration(),
snapshot.getType(), SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
- return
SnapshotResponse.newBuilder().setExpectedTimeout(waitTime).build();
+
+ SnapshotResponse.Builder builder =
SnapshotResponse.newBuilder().setExpectedTimeout(waitTime);
+
+ // just to pass the unit tests for all 3.x versions,
+ // the minimum version maybe needs to be modified later
+ if (VersionInfoUtil.currentClientHasMinimumVersion(2, 10)) {
Review comment:
I think here we could always use the same logic? Just always return the
procId; it is the client's choice whether to make use of it. And we could
just test whether we have nonceGroup and nonce directly, so we do not need to
test the client version.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java
##########
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.regionserver.SnapshotVerifyCallable;
+import org.apache.hadoop.hbase.snapshot.CorruptedSnapshotException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyParameter;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotVerifyProcedureStateData;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+
+/**
+ * A remote procedure which is used to send verify snapshot request to region
server.
+ */
[email protected]
+public class SnapshotVerifyProcedure
+ extends ServerRemoteProcedure implements ServerProcedureInterface {
+ private static final Logger LOG =
LoggerFactory.getLogger(SnapshotVerifyProcedure.class);
+
+ private SnapshotDescription snapshot;
+ private List<RegionInfo> regions;
+ private int expectedNumRegion;
+ private CorruptedSnapshotException remoteException;
+
+ public SnapshotVerifyProcedure() {}
+
+ public SnapshotVerifyProcedure(SnapshotDescription snapshot,
List<RegionInfo> regions,
+ ServerName targetServer, int expectedNumRegion) {
+ this.targetServer = targetServer;
+ this.snapshot = snapshot;
+ this.regions = regions;
+ this.expectedNumRegion = expectedNumRegion;
+ }
+
+ @Override
+ protected void complete(MasterProcedureEnv env, Throwable error) {
+ if (error != null) {
+ Throwable realError = error.getCause();
+ if (realError instanceof CorruptedSnapshotException) {
+ synchronized (this) {
+ this.remoteException = (CorruptedSnapshotException) realError;
+ }
+ this.succ = true;
+ } else {
+ this.succ = false;
+ }
+ } else {
+ this.succ = true;
+ }
+ }
+
+ @Override
+ protected synchronized Procedure<MasterProcedureEnv>[]
execute(MasterProcedureEnv env)
+ throws ProcedureYieldException, ProcedureSuspendedException,
InterruptedException {
+ // Regardless of success or failure, ServerRemoteProcedure returns and
leaves the parent
+ // procedure to find out and handle failures. In this case,
SnapshotProcedure doesn't
+ // care which region server the task is assigned to, so we push down the
choice of
+ // new target server to SnapshotVerifyProcedure.
+ Procedure<MasterProcedureEnv>[] res = super.execute(env);
+ if (res == null) {
+ if (succ) {
+ // remote task has finished, we already known snapshot if snapshot is
corrupted
+ if (remoteException != null) {
+ setFailure("verify-snapshot", remoteException);
+ }
+ return null;
+ } else {
+ // can not send request to remote server, we will try another server
+ ServerName newTargetServer =
env.getMasterServices().getServerManager().randomSelect();
Review comment:
I think we'd better just fail here and let the upper layer reschedule
a procedure for verifying?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.MetricsSnapshot;
+import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
+import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure;
+import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+
+/**
+ * A procedure used to take snapshot on tables.
+ */
[email protected]
+public class SnapshotProcedure
+ extends AbstractStateMachineTableProcedure<SnapshotState> {
+ private static final Logger LOG =
LoggerFactory.getLogger(SnapshotProcedure.class);
+ private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot();
+
+ private Configuration conf;
+ private SnapshotDescription snapshot;
+ private Path rootDir;
+ private Path snapshotDir;
+ private Path workingDir;
+ private FileSystem workingDirFS;
+ private TableName snapshotTable;
+ private MonitoredTask status;
+ private SnapshotManifest snapshotManifest;
+ private TableDescriptor htd;
+
+ private RetryCounter retryCounter;
+
+ public SnapshotProcedure() { }
+
+ public SnapshotProcedure(final MasterProcedureEnv env, final
SnapshotDescription snapshot) {
+ super(env);
+ this.snapshot = snapshot;
+ }
+
+ @Override
+ public TableName getTableName() {
+ return TableName.valueOf(snapshot.getTable());
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.SNAPSHOT;
+ }
+
+ @Override
+ protected LockState acquireLock(MasterProcedureEnv env) {
+ // AbstractStateMachineTableProcedure acquires exclusive table lock by
default,
+ // but we may need to downgrade it to shared lock for some reasons:
+ // a. exclusive lock has a negative effect on assigning region. See
HBASE-21480 for details.
+ // b. we want to support taking multiple different snapshots on same table
on the same time.
+ if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName()))
{
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ return LockState.LOCK_ACQUIRED;
+ }
+
+ @Override
+ protected void releaseLock(MasterProcedureEnv env) {
+ env.getProcedureScheduler().wakeTableSharedLock(this, getTableName());
+ }
+
+ @Override
+ protected boolean holdLock(MasterProcedureEnv env) {
+ // In order to avoid enabling/disabling/modifying/deleting table during
snapshot,
+ // we don't release lock during suspend
+ return true;
+ }
+
+ @Override
+ protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state)
+ throws ProcedureSuspendedException, ProcedureYieldException,
InterruptedException {
+ LOG.info("{} execute state={}", this, state);
+
+ try {
+ switch (state) {
+ case SNAPSHOT_PREPARE:
+ prepareSnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_PRE_OPERATION:
+ preSnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_WRITE_SNAPSHOT_INFO:
+ SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir,
workingDirFS);
+ TableState tableState =
+
env.getMasterServices().getTableStateManager().getTableState(snapshotTable);
+ if (tableState.isEnabled()) {
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS);
+ } else if (tableState.isDisabled()) {
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS);
+ }
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS:
+ addChildProcedure(createRemoteSnapshotProcedures(env));
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS:
+ snapshotOfflineRegions(env);
+ if (MobUtils.hasMobColumns(htd)) {
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
+ } else {
+ setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
+ }
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_SNAPSHOT_MOB_REGION:
+ snapshotMobRegion();
+ setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_CONSOLIDATE_SNAPSHOT:
+ // flush the in-memory state, and write the single manifest
+ status.setStatus("Consolidate snapshot: " + snapshot.getName());
+ snapshotManifest.consolidate();
+ setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_VERIFIER_SNAPSHOT:
+ status.setStatus("Verifying snapshot: " + snapshot.getName());
+ verifySnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_COMPLETE_SNAPSHOT:
+ completeSnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_POST_OPERATION);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_POST_OPERATION:
+ postSnapshot(env);
+ return Flow.NO_MORE_STATE;
+ default:
+ throw new UnsupportedOperationException("unhandled state=" + state);
+ }
+ } catch (ProcedureSuspendedException e) {
+ throw e;
+ } catch (Exception e) {
+ setFailure("master-snapshot", e);
+ LOG.warn("unexpected exception while execute {}. Mark procedure
Failed.", this, e);
+ status.abort("Abort Snapshot " + snapshot.getName() + " on Table " +
snapshotTable);
+ return Flow.NO_MORE_STATE;
+ }
+ }
+
+ @Override
+ protected void rollbackState(MasterProcedureEnv env, SnapshotState state)
+ throws IOException, InterruptedException {
+ if (state == SnapshotState.SNAPSHOT_PRE_OPERATION) {
+ try {
+ if (!workingDirFS.delete(workingDir, true)) {
+ LOG.error("Couldn't delete snapshot working directory {}",
workingDir);
+ }
+ } catch (IOException e) {
+ LOG.error("Couldn't delete snapshot working directory {}", workingDir,
e);
+ }
+ }
+ }
+
+ @Override
+ protected boolean isRollbackSupported(SnapshotState state) {
+ return true;
+ }
+
+ @Override
+ protected SnapshotState getState(final int stateId) {
+ return SnapshotState.forNumber(stateId);
+ }
+
+ @Override
+ protected int getStateId(SnapshotState state) {
+ return state.getNumber();
+ }
+
+ @Override
+ protected SnapshotState getInitialState() {
+ return SnapshotState.SNAPSHOT_PREPARE;
+ }
+
+ private void prepareSnapshot(MasterProcedureEnv env)
+ throws ProcedureSuspendedException, IOException {
+ if (isAnySplitOrMergeProcedureRunning(env)) {
+ if (retryCounter == null) {
+ retryCounter =
ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
+ }
+ long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
+ LOG.warn("{} waits {} ms for Split/Merge procedure to finish", this,
backoff);
+ setTimeout(Math.toIntExact(backoff));
+ setState(ProcedureState.WAITING_TIMEOUT);
+ skipPersistence();
+ throw new ProcedureSuspendedException();
+ }
+ prepareSnapshotEnv(env);
+ }
+
+ private void prepareSnapshotEnv(MasterProcedureEnv env) throws IOException {
+ this.conf = env.getMasterConfiguration();
+ this.snapshotTable = TableName.valueOf(snapshot.getTable());
+ this.htd = loadTableDescriptorSnapshot(env);
+ this.rootDir = CommonFSUtils.getRootDir(conf);
+ this.snapshotDir =
SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
+ this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot,
rootDir, conf);
+ this.workingDirFS = workingDir.getFileSystem(conf);
+ this.status = TaskMonitor.get()
+ .createStatus("Taking " + snapshot.getType() + " snapshot on table: " +
snapshotTable);
+ ForeignExceptionDispatcher monitor = new
ForeignExceptionDispatcher(snapshot.getName());
+ this.snapshotManifest = SnapshotManifest.create(conf,
+ workingDirFS, workingDir, snapshot, monitor, status);
+ this.snapshotManifest.addTableDescriptor(htd);
+ }
+
+ @Override
+ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+ setState(ProcedureState.RUNNABLE);
+ env.getProcedureScheduler().addFront(this);
+ return false;
+ }
+
+ private boolean isAnySplitOrMergeProcedureRunning(MasterProcedureEnv env) {
+ return
env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream()
+ .filter(p -> !p.isFinished())
+ .filter(p -> p instanceof SplitTableRegionProcedure ||
+ p instanceof MergeTableRegionsProcedure)
+ .anyMatch(p -> ((AbstractStateMachineTableProcedure<?>) p)
+ .getTableName().equals(getTableName()));
+ }
+
+ private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env)
throws IOException {
+ TableDescriptor htd =
env.getMasterServices().getTableDescriptors().get(snapshotTable);
+ if (htd == null) {
+ throw new IOException("TableDescriptor missing for " + snapshotTable);
+ }
+ if (htd.getMaxFileSize() == -1 && this.snapshot.getMaxFileSize() > 0) {
Review comment:
Why do we need to treat the maxFileSize specially?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotProcedure.java
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableState;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
+import org.apache.hadoop.hbase.master.MetricsSnapshot;
+import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
+import org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure;
+import org.apache.hadoop.hbase.master.snapshot.MasterSnapshotVerifier;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotProcedureStateData;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SnapshotState;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
+
+/**
+ * A procedure used to take snapshot on tables.
+ */
[email protected]
+public class SnapshotProcedure
+ extends AbstractStateMachineTableProcedure<SnapshotState> {
+ private static final Logger LOG =
LoggerFactory.getLogger(SnapshotProcedure.class);
+ private final MetricsSnapshot metricsSnapshot = new MetricsSnapshot();
+
+ private Configuration conf;
+ private SnapshotDescription snapshot;
+ private Path rootDir;
+ private Path snapshotDir;
+ private Path workingDir;
+ private FileSystem workingDirFS;
+ private TableName snapshotTable;
+ private MonitoredTask status;
+ private SnapshotManifest snapshotManifest;
+ private TableDescriptor htd;
+
+ private RetryCounter retryCounter;
+
+ public SnapshotProcedure() { }
+
+ public SnapshotProcedure(final MasterProcedureEnv env, final
SnapshotDescription snapshot) {
+ super(env);
+ this.snapshot = snapshot;
+ }
+
+ @Override
+ public TableName getTableName() {
+ return TableName.valueOf(snapshot.getTable());
+ }
+
+ @Override
+ public TableOperationType getTableOperationType() {
+ return TableOperationType.SNAPSHOT;
+ }
+
+ @Override
+ protected LockState acquireLock(MasterProcedureEnv env) {
+ // AbstractStateMachineTableProcedure acquires exclusive table lock by
default,
+ // but we may need to downgrade it to shared lock for some reasons:
+ // a. exclusive lock has a negative effect on assigning region. See
HBASE-21480 for details.
+ // b. we want to support taking multiple different snapshots on same table
on the same time.
+ if (env.getProcedureScheduler().waitTableSharedLock(this, getTableName()))
{
+ return LockState.LOCK_EVENT_WAIT;
+ }
+ return LockState.LOCK_ACQUIRED;
+ }
+
+ @Override
+ protected void releaseLock(MasterProcedureEnv env) {
+ env.getProcedureScheduler().wakeTableSharedLock(this, getTableName());
+ }
+
+ @Override
+ protected boolean holdLock(MasterProcedureEnv env) {
+ // In order to avoid enabling/disabling/modifying/deleting table during
snapshot,
+ // we don't release lock during suspend
+ return true;
+ }
+
+ @Override
+ protected Flow executeFromState(MasterProcedureEnv env, SnapshotState state)
+ throws ProcedureSuspendedException, ProcedureYieldException,
InterruptedException {
+ LOG.info("{} execute state={}", this, state);
+
+ try {
+ switch (state) {
+ case SNAPSHOT_PREPARE:
+ prepareSnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_PRE_OPERATION);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_PRE_OPERATION:
+ preSnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_WRITE_SNAPSHOT_INFO);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_WRITE_SNAPSHOT_INFO:
+ SnapshotDescriptionUtils.writeSnapshotInfo(snapshot, workingDir,
workingDirFS);
+ TableState tableState =
+
env.getMasterServices().getTableStateManager().getTableState(snapshotTable);
+ if (tableState.isEnabled()) {
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_ONLINE_REGIONS);
+ } else if (tableState.isDisabled()) {
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS);
+ }
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_SNAPSHOT_ONLINE_REGIONS:
+ addChildProcedure(createRemoteSnapshotProcedures(env));
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_SNAPSHOT_OFFLINE_REGIONS:
+ snapshotOfflineRegions(env);
+ if (MobUtils.hasMobColumns(htd)) {
+ setNextState(SnapshotState.SNAPSHOT_SNAPSHOT_MOB_REGION);
+ } else {
+ setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
+ }
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_SNAPSHOT_MOB_REGION:
+ snapshotMobRegion();
+ setNextState(SnapshotState.SNAPSHOT_CONSOLIDATE_SNAPSHOT);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_CONSOLIDATE_SNAPSHOT:
+ // flush the in-memory state, and write the single manifest
+ status.setStatus("Consolidate snapshot: " + snapshot.getName());
+ snapshotManifest.consolidate();
+ setNextState(SnapshotState.SNAPSHOT_VERIFIER_SNAPSHOT);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_VERIFIER_SNAPSHOT:
+ status.setStatus("Verifying snapshot: " + snapshot.getName());
+ verifySnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_COMPLETE_SNAPSHOT);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_COMPLETE_SNAPSHOT:
+ completeSnapshot(env);
+ setNextState(SnapshotState.SNAPSHOT_POST_OPERATION);
+ return Flow.HAS_MORE_STATE;
+ case SNAPSHOT_POST_OPERATION:
+ postSnapshot(env);
+ return Flow.NO_MORE_STATE;
+ default:
+ throw new UnsupportedOperationException("unhandled state=" + state);
+ }
+ } catch (ProcedureSuspendedException e) {
+ throw e;
+ } catch (Exception e) {
+ setFailure("master-snapshot", e);
+ LOG.warn("unexpected exception while execute {}. Mark procedure
Failed.", this, e);
+ status.abort("Abort Snapshot " + snapshot.getName() + " on Table " +
snapshotTable);
+ return Flow.NO_MORE_STATE;
+ }
+ }
+
+ @Override
+ protected void rollbackState(MasterProcedureEnv env, SnapshotState state)
+ throws IOException, InterruptedException {
+ if (state == SnapshotState.SNAPSHOT_PRE_OPERATION) {
+ try {
+ if (!workingDirFS.delete(workingDir, true)) {
+ LOG.error("Couldn't delete snapshot working directory {}",
workingDir);
+ }
+ } catch (IOException e) {
+ LOG.error("Couldn't delete snapshot working directory {}", workingDir,
e);
+ }
+ }
+ }
+
+ @Override
+ protected boolean isRollbackSupported(SnapshotState state) {
+ return true;
+ }
+
+ @Override
+ protected SnapshotState getState(final int stateId) {
+ return SnapshotState.forNumber(stateId);
+ }
+
+ @Override
+ protected int getStateId(SnapshotState state) {
+ return state.getNumber();
+ }
+
+ @Override
+ protected SnapshotState getInitialState() {
+ return SnapshotState.SNAPSHOT_PREPARE;
+ }
+
  /**
   * First step of the procedure: refuse to start while a split or merge of the
   * target table is in flight. Instead of failing, the procedure suspends
   * itself with an exponential backoff and is re-scheduled via
   * setTimeoutFailure(); once the table is quiescent the transient execution
   * environment is initialized.
   */
  private void prepareSnapshot(MasterProcedureEnv env)
    throws ProcedureSuspendedException, IOException {
    if (isAnySplitOrMergeProcedureRunning(env)) {
      if (retryCounter == null) {
        retryCounter = ProcedureUtil.createRetryCounter(env.getMasterConfiguration());
      }
      long backoff = retryCounter.getBackoffTimeAndIncrementAttempts();
      LOG.warn("{} waits {} ms for Split/Merge procedure to finish", this, backoff);
      setTimeout(Math.toIntExact(backoff));
      setState(ProcedureState.WAITING_TIMEOUT);
      // nothing changed that needs to be persisted before suspending
      skipPersistence();
      throw new ProcedureSuspendedException();
    }
    prepareSnapshotEnv(env);
  }
+
  /**
   * Initialize the transient (non-serialized) fields used by the rest of the
   * procedure: table descriptor, root/working/completed directories, the
   * monitored task status and the snapshot manifest. Also re-run on replay
   * (see afterReplay) because none of these fields survive serialization.
   */
  private void prepareSnapshotEnv(MasterProcedureEnv env) throws IOException {
    this.conf = env.getMasterConfiguration();
    this.snapshotTable = TableName.valueOf(snapshot.getTable());
    this.htd = loadTableDescriptorSnapshot(env);
    this.rootDir = CommonFSUtils.getRootDir(conf);
    // final destination the snapshot is moved to on completion
    this.snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
    // scratch area where the snapshot is assembled first
    this.workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir, conf);
    this.workingDirFS = workingDir.getFileSystem(conf);
    this.status = TaskMonitor.get()
      .createStatus("Taking " + snapshot.getType() + " snapshot on table: " + snapshotTable);
    ForeignExceptionDispatcher monitor = new ForeignExceptionDispatcher(snapshot.getName());
    this.snapshotManifest = SnapshotManifest.create(conf,
      workingDirFS, workingDir, snapshot, monitor, status);
    this.snapshotManifest.addTableDescriptor(htd);
  }
+
  @Override
  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
    // The timeout set in prepareSnapshot() is a retry backoff, not an error:
    // wake the procedure up and put it at the front of the scheduler queue.
    setState(ProcedureState.RUNNABLE);
    env.getProcedureScheduler().addFront(this);
    // false = do not treat the elapsed timeout as a procedure failure
    return false;
  }
+
+ private boolean isAnySplitOrMergeProcedureRunning(MasterProcedureEnv env) {
+ return
env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream()
+ .filter(p -> !p.isFinished())
+ .filter(p -> p instanceof SplitTableRegionProcedure ||
+ p instanceof MergeTableRegionsProcedure)
+ .anyMatch(p -> ((AbstractStateMachineTableProcedure<?>) p)
+ .getTableName().equals(getTableName()));
+ }
+
+ private TableDescriptor loadTableDescriptorSnapshot(MasterProcedureEnv env)
throws IOException {
+ TableDescriptor htd =
env.getMasterServices().getTableDescriptors().get(snapshotTable);
+ if (htd == null) {
+ throw new IOException("TableDescriptor missing for " + snapshotTable);
+ }
+ if (htd.getMaxFileSize() == -1 && this.snapshot.getMaxFileSize() > 0) {
+ return
TableDescriptorBuilder.newBuilder(htd).setValue(TableDescriptorBuilder.MAX_FILESIZE,
+ Long.toString(this.snapshot.getMaxFileSize())).build();
+ }
+ return htd;
+ }
+
  /**
   * Pre-operation step: have the snapshot manager set up the working
   * directory, then run the master coprocessor preSnapshot hook (if any).
   */
  private void preSnapshot(MasterProcedureEnv env) throws IOException {
    env.getMasterServices().getSnapshotManager().prepareWorkingDirectory(snapshot);

    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.preSnapshot(ProtobufUtil.createSnapshotDesc(snapshot), htd);
    }
  }
+
+ private void postSnapshot(MasterProcedureEnv env) throws IOException {
+ MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
+ if (cpHost != null) {
+ cpHost.postSnapshot(ProtobufUtil.createSnapshotDesc(snapshot), htd);
+ }
+ }
+
+ private void verifySnapshot(MasterProcedureEnv env) throws IOException {
+ int verifyThreshold = env.getMasterConfiguration()
+ .getInt("hbase.snapshot.remote.verify.threshold", 10000);
+ int numRegions = (int) env.getAssignmentManager()
+ .getTableRegions(snapshotTable, false)
+ .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r)).count();
+
+ if (numRegions >= verifyThreshold) {
+ addChildProcedure(createRemoteVerifyProcedures(env));
+ } else {
+ MasterSnapshotVerifier verifier =
+ new MasterSnapshotVerifier(env.getMasterServices(), snapshot);
+ verifier.verifySnapshot();
+ }
+ }
+
  /**
   * Finalize the snapshot: atomically move the working directory into the
   * completed snapshot directory, record the duration metric, run the
   * post-completion coprocessor hook and mark the monitored task done.
   */
  private void completeSnapshot(MasterProcedureEnv env) throws IOException {
    // complete the snapshot, atomically moving from tmp to .snapshot dir.
    SnapshotDescriptionUtils.completeSnapshot(snapshotDir, workingDir,
      env.getMasterFileSystem().getFileSystem(), workingDirFS, conf);
    // update metric. when master restarts, the metric value is wrong
    // (status is re-created by afterReplay, so the start time is not the
    // original one — known limitation)
    metricsSnapshot.addSnapshot(status.getCompletionTimestamp() - status.getStartTime());
    if (env.getMasterCoprocessorHost() != null) {
      env.getMasterCoprocessorHost()
        .postCompletedSnapshotAction(ProtobufUtil.createSnapshotDesc(snapshot), htd);
    }
    status.markComplete("Snapshot " + snapshot.getName() + " completed");
  }
+
  /**
   * Add manifest entries for the table's offline regions — primary replicas
   * that are split parents. These have no hosting region server, so they are
   * referenced directly from the master using a dedicated thread pool.
   */
  private void snapshotOfflineRegions(MasterProcedureEnv env) throws IOException {
    List<RegionInfo> regions = env.getAssignmentManager()
      .getTableRegions(snapshotTable, false).stream()
      .filter(r -> RegionReplicaUtil.isDefaultReplica(r))
      .filter(RegionInfo::isSplit).collect(Collectors.toList());

    ThreadPoolExecutor exec = SnapshotManifest
      .createExecutor(env.getMasterConfiguration(), "OfflineRegionsSnapshotPool");
    try {
      ModifyRegionUtils.editRegions(exec, regions, new ModifyRegionUtils.RegionEditTask() {
        @Override
        public void editRegion(final RegionInfo regionInfo) throws IOException {
          snapshotManifest.addRegion(CommonFSUtils.getTableDir(rootDir, snapshotTable), regionInfo);
        }
      });
    } finally {
      // always release the pool, even if referencing a region failed
      exec.shutdown();
    }
    status.setStatus("Completed referencing offline regions of table: " + snapshotTable);
  }
+
  /**
   * Reference the HFiles of the synthetic mob region in the snapshot
   * manifest. NOTE(review): presumably only called when the table has a
   * mob-enabled column family — the precondition is enforced by the caller.
   */
  private void snapshotMobRegion() throws IOException {
    RegionInfo mobRegionInfo = MobUtils.getMobRegionInfo(htd.getTableName());
    snapshotManifest.addMobRegion(mobRegionInfo);
    status.setStatus("Completed referencing HFiles for the mob region of table: " + snapshotTable);
  }
+
+ @Override
+ protected void serializeStateData(ProcedureStateSerializer serializer)
throws IOException {
+ super.serializeStateData(serializer);
+ serializer.serialize(SnapshotProcedureStateData
+ .newBuilder().setSnapshot(this.snapshot).build());
+ }
+
+ @Override
+ protected void deserializeStateData(ProcedureStateSerializer serializer)
throws IOException {
+ super.deserializeStateData(serializer);
+ SnapshotProcedureStateData data =
serializer.deserialize(SnapshotProcedureStateData.class);
+ this.snapshot = data.getSnapshot();
+ }
+
+ private Procedure<MasterProcedureEnv>[]
createRemoteSnapshotProcedures(MasterProcedureEnv env) {
+ return env.getAssignmentManager().getTableRegions(snapshotTable, true)
+ .stream().filter(r -> RegionReplicaUtil.isDefaultReplica(r))
+ .map(r -> new SnapshotRegionProcedure(snapshot, r))
+ .toArray(SnapshotRegionProcedure[]::new);
+ }
+
+ // here we assign region snapshot manifest verify tasks to all region servers
+ // in cluster with the help of master load balancer.
+ private Procedure<MasterProcedureEnv>[]
createRemoteVerifyProcedures(MasterProcedureEnv env)
+ throws IOException {
+ List<RegionInfo> regions = env
+ .getAssignmentManager().getTableRegions(snapshotTable, false);
+ List<ServerName> servers = env
+ .getMasterServices().getServerManager().getOnlineServersList();
+ return env.getMasterServices().getLoadBalancer()
+ .roundRobinAssignment(regions, servers).entrySet().stream()
+ .map(e -> new SnapshotVerifyProcedure(snapshot, e.getValue(),
e.getKey(), regions.size()))
+ .toArray(SnapshotVerifyProcedure[]::new);
+ }
+
+ @Override
+ public void toStringClassDetails(StringBuilder builder) {
+ builder.append(getClass().getName())
+ .append(", id=").append(getProcId())
+ .append(",
snapshot=").append(ClientSnapshotDescriptionUtils.toString(snapshot));
+ }
+
  /** Returns the protobuf SnapshotDescription this procedure is taking. */
  public SnapshotDescription getSnapshotDesc() {
    return snapshot;
  }
+
+ @Override
+ protected void afterReplay(MasterProcedureEnv env) {
+ try {
+ prepareSnapshotEnv(env);
+ } catch (IOException e) {
+ LOG.error("Failed replaying {}, mark procedure as failed", this, e);
Review comment:
Is this possible?
##########
File path:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
##########
@@ -1871,48 +1871,55 @@ public void onComplete() {
return failedFuture(e);
}
CompletableFuture<Void> future = new CompletableFuture<>();
- final SnapshotRequest request =
SnapshotRequest.newBuilder().setSnapshot(snapshot).build();
- addListener(this.<Long> newMasterCaller()
- .action((controller, stub) -> this.<SnapshotRequest, SnapshotResponse,
Long> call(controller,
- stub, request, (s, c, req, done) -> s.snapshot(c, req, done),
- resp -> resp.getExpectedTimeout()))
- .call(), (expectedTimeout, err) -> {
+ final SnapshotRequest request =
+
SnapshotRequest.newBuilder().setSnapshot(snapshot).setNonceGroup(ng.getNonceGroup())
+ .setNonce(ng.newNonce()).build();
+ addListener(this.<SnapshotResponse> newMasterCaller()
+ .action((controller, stub) ->
+ this.<SnapshotRequest, SnapshotResponse, SnapshotResponse>
call(controller, stub,
+ request, (s, c, req, done) -> s.snapshot(c, req, done), resp ->
resp))
+ .call(), (resp, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
- TimerTask pollingTask = new TimerTask() {
- int tries = 0;
- long startTime = EnvironmentEdgeManager.currentTime();
- long endTime = startTime + expectedTimeout;
- long maxPauseTime = expectedTimeout / maxAttempts;
-
- @Override
- public void run(Timeout timeout) throws Exception {
- if (EnvironmentEdgeManager.currentTime() < endTime) {
- addListener(isSnapshotFinished(snapshotDesc), (done, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else if (done) {
- future.complete(null);
- } else {
- // retry again after pauseTime.
- long pauseTime =
-
ConnectionUtils.getPauseTime(TimeUnit.NANOSECONDS.toMillis(pauseNs), ++tries);
- pauseTime = Math.min(pauseTime, maxPauseTime);
- AsyncConnectionImpl.RETRY_TIMER.newTimeout(this, pauseTime,
- TimeUnit.MILLISECONDS);
- }
- });
- } else {
- future.completeExceptionally(
- new SnapshotCreationException("Snapshot '" +
snapshot.getName() +
- "' wasn't completed in expectedTime:" + expectedTimeout + "
ms", snapshotDesc));
+ if (resp.hasProcId()) {
+ getProcedureResult(resp.getProcId(), future, 0);
+ addListener(future, new
SnapshotProcedureBiConsumer(snapshotDesc.getTableName()));
+ } else {
+ long expectedTimeout = resp.getExpectedTimeout();
Review comment:
Better to extract this code into a separate method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]