http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java index c3900dd..ced7abc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -106,12 +107,10 @@ public class CreateTableProcedure setNextState(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS); break; case CREATE_TABLE_ASSIGN_REGIONS: - setEnablingState(env, getTableName()); - addChildProcedure(env.getAssignmentManager().createAssignProcedures(newRegions)); + assignRegions(env, getTableName(), newRegions); setNextState(CreateTableState.CREATE_TABLE_UPDATE_DESC_CACHE); break; case CREATE_TABLE_UPDATE_DESC_CACHE: - setEnabledState(env, getTableName()); updateTableDescCache(env, getTableName()); setNextState(CreateTableState.CREATE_TABLE_POST_OPERATION); break; @@ -334,21 +333,21 @@ public class CreateTableProcedure protected static List<HRegionInfo> addTableToMeta(final MasterProcedureEnv env, final HTableDescriptor hTableDescriptor, final List<HRegionInfo> regions) throws IOException { - assert (regions != null && regions.size() > 0) : "expected at least 1 region, got " + regions; + if (regions != null && regions.size() > 0) { + ProcedureSyncWait.waitMetaRegions(env); - ProcedureSyncWait.waitMetaRegions(env); + // Add regions to META + addRegionsToMeta(env, hTableDescriptor, regions); + // Add replicas if needed + List<HRegionInfo> newRegions = addReplicas(env, hTableDescriptor, regions); - // Add replicas if needed - List<HRegionInfo> newRegions = addReplicas(env, hTableDescriptor, regions); - - // Add regions to META - addRegionsToMeta(env, hTableDescriptor, newRegions); - - // Setup replication for region replicas if needed - if (hTableDescriptor.getRegionReplication() > 1) { - ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration()); + // Setup replication for region replicas if needed + if (hTableDescriptor.getRegionReplication() > 1) { + ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration()); + } + return newRegions; } - return newRegions; + return regions; } /** @@ -375,16 +374,18 @@ public class CreateTableProcedure return hRegionInfos; } + protected static void assignRegions(final MasterProcedureEnv env, + final TableName tableName, final List<HRegionInfo> regions) throws IOException { + ProcedureSyncWait.waitRegionServers(env); - protected static void setEnablingState(final MasterProcedureEnv env, final TableName tableName) - throws IOException { // Mark the table as Enabling env.getMasterServices().getTableStateManager() .setTableState(tableName, TableState.State.ENABLING); - } - protected static void setEnabledState(final MasterProcedureEnv env, final TableName tableName) - throws IOException { + // Trigger immediate assignment of the regions in round-robin fashion + final AssignmentManager assignmentManager = env.getMasterServices().getAssignmentManager(); + ModifyRegionUtils.assignRegions(assignmentManager, regions); + // Enable table env.getMasterServices().getTableStateManager() .setTableState(tableName, TableState.State.ENABLED);
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java index 78bd715..096172a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; @@ -105,10 +106,7 @@ public class DeleteColumnFamilyProcedure setNextState(DeleteColumnFamilyState.DELETE_COLUMN_FAMILY_REOPEN_ALL_REGIONS); break; case DELETE_COLUMN_FAMILY_REOPEN_ALL_REGIONS: - if (env.getAssignmentManager().isTableEnabled(getTableName())) { - addChildProcedure(env.getAssignmentManager() - .createReopenProcedures(getRegionInfoList(env))); - } + reOpenAllRegionsIfTableIsOnline(env); return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException(this + " unhandled state=" + state); @@ -294,8 +292,7 @@ public class DeleteColumnFamilyProcedure env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor); // Make sure regions are opened after table descriptor is updated. - //reOpenAllRegionsIfTableIsOnline(env); - // TODO: NUKE ROLLBACK!!!! + reOpenAllRegionsIfTableIsOnline(env); } /** @@ -319,6 +316,25 @@ public class DeleteColumnFamilyProcedure } /** + * Last action from the procedure - executed when online schema change is supported. + * @param env MasterProcedureEnv + * @throws IOException + */ + private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException { + // This operation only run when the table is enabled. + if (!env.getMasterServices().getTableStateManager() + .isTableState(getTableName(), TableState.State.ENABLED)) { + return; + } + + if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) { + LOG.info("Completed delete column family operation on table " + getTableName()); + } else { + LOG.warn("Error on reopening the regions on table " + getTableName()); + } + } + + /** * The procedure could be restarted from a different machine. If the variable is null, we need to * retrieve it. * @return traceEnabled @@ -360,8 +376,7 @@ public class DeleteColumnFamilyProcedure private List<HRegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException { if (regionInfoList == null) { - regionInfoList = env.getAssignmentManager().getRegionStates() - .getRegionsOfTable(getTableName()); + regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName()); } return regionInfoList; } http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java index 04dfc60..bda68eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.exceptions.HBaseException; import org.apache.hadoop.hbase.favored.FavoredNodesManager; +import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.mob.MobConstants; @@ -96,8 +97,8 @@ public class DeleteTableProcedure } // TODO: Move out... in the acquireLock() - LOG.debug("Waiting for '" + getTableName() + "' regions in transition"); - regions = env.getAssignmentManager().getRegionStates().getRegionsOfTable(getTableName()); + LOG.debug("waiting for '" + getTableName() + "' regions in transition"); + regions = ProcedureSyncWait.getRegionsFromMeta(env, getTableName()); assert regions != null && !regions.isEmpty() : "unexpected 0 regions"; ProcedureSyncWait.waitRegionInTransition(env, regions); @@ -349,7 +350,8 @@ public class DeleteTableProcedure final TableName tableName) throws IOException { Connection connection = env.getMasterServices().getConnection(); Scan tableScan = MetaTableAccessor.getScanForTableName(connection, tableName); - try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME)) { + try (Table metaTable = + connection.getTable(TableName.META_TABLE_NAME)) { List<Delete> deletes = new ArrayList<>(); try (ResultScanner resScanner = metaTable.getScanner(tableScan)) { for (Result result : resScanner) { @@ -383,9 +385,11 @@ public class DeleteTableProcedure protected static void deleteAssignmentState(final MasterProcedureEnv env, final TableName tableName) throws IOException { + final AssignmentManager am = env.getMasterServices().getAssignmentManager(); + // Clean up regions of the table in RegionStates. LOG.debug("Removing '" + tableName + "' from region states."); - env.getMasterServices().getAssignmentManager().deleteTable(tableName); + am.getRegionStates().tableDeleted(tableName); // If entry for this table states, remove it. LOG.debug("Marking '" + tableName + "' as deleted."); http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java index 409ca26..b53ce45 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java @@ -21,9 +21,12 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.List; +import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; @@ -31,11 +34,17 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.constraint.ConstraintException; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.BulkAssigner; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.master.RegionStates; import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DisableTableState; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.htrace.Trace; @InterfaceAudience.Private public class DisableTableProcedure @@ -107,8 +116,12 @@ public class DisableTableProcedure setNextState(DisableTableState.DISABLE_TABLE_MARK_REGIONS_OFFLINE); break; case DISABLE_TABLE_MARK_REGIONS_OFFLINE: - addChildProcedure(env.getAssignmentManager().createUnassignProcedures(tableName)); - setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE); + if (markRegionsOffline(env, tableName, true) == + MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL) { + setNextState(DisableTableState.DISABLE_TABLE_SET_DISABLED_TABLE_STATE); + } else { + LOG.trace("Retrying later to disable the missing regions"); + } break; case DISABLE_TABLE_SET_DISABLED_TABLE_STATE: setTableStateToDisabled(env, tableName); @@ -236,7 +249,7 @@ public class DisableTableProcedure // set the state later on). A quick state check should be enough for us to move forward. TableStateManager tsm = env.getMasterServices().getTableStateManager(); TableState.State state = tsm.getTableState(tableName); - if (!state.equals(TableState.State.ENABLED)){ + if(!state.equals(TableState.State.ENABLED)){ LOG.info("Table " + tableName + " isn't enabled;is "+state.name()+"; skipping disable"); setFailure("master-disable-table", new TableNotEnabledException( tableName+" state is "+state.name())); @@ -277,6 +290,83 @@ public class DisableTableProcedure } /** + * Mark regions of the table offline with retries + * @param env MasterProcedureEnv + * @param tableName the target table + * @param retryRequired whether to retry if the first run failed + * @return whether the operation is fully completed or being interrupted. + * @throws IOException + */ + protected static MarkRegionOfflineOpResult markRegionsOffline( + final MasterProcedureEnv env, + final TableName tableName, + final Boolean retryRequired) throws IOException { + // Dev consideration: add a config to control max number of retry. For now, it is hard coded. + int maxTry = (retryRequired ? 10 : 1); + MarkRegionOfflineOpResult operationResult = + MarkRegionOfflineOpResult.BULK_ASSIGN_REGIONS_FAILED; + do { + try { + operationResult = markRegionsOffline(env, tableName); + if (operationResult == MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL) { + break; + } + maxTry--; + } catch (Exception e) { + LOG.warn("Received exception while marking regions online. tries left: " + maxTry, e); + maxTry--; + if (maxTry > 0) { + continue; // we still have some retry left, try again. + } + throw e; + } + } while (maxTry > 0); + + if (operationResult != MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL) { + LOG.warn("Some or all regions of the Table '" + tableName + "' were still online"); + } + + return operationResult; + } + + /** + * Mark regions of the table offline + * @param env MasterProcedureEnv + * @param tableName the target table + * @return whether the operation is fully completed or being interrupted. + * @throws IOException + */ + private static MarkRegionOfflineOpResult markRegionsOffline( + final MasterProcedureEnv env, + final TableName tableName) throws IOException { + // Get list of online regions that are of this table. Regions that are + // already closed will not be included in this list; i.e. the returned + // list is not ALL regions in a table, its all online regions according + // to the in-memory state on this master. + MarkRegionOfflineOpResult operationResult = + MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_SUCCESSFUL; + final List<HRegionInfo> regions = + env.getMasterServices().getAssignmentManager().getRegionStates() + .getRegionsOfTable(tableName); + if (regions.size() > 0) { + LOG.info("Offlining " + regions.size() + " regions."); + + BulkDisabler bd = new BulkDisabler(env, tableName, regions); + try { + if (!bd.bulkAssign()) { + operationResult = MarkRegionOfflineOpResult.BULK_ASSIGN_REGIONS_FAILED; + } + } catch (InterruptedException e) { + LOG.warn("Disable was interrupted"); + // Preserve the interrupt. + Thread.currentThread().interrupt(); + operationResult = MarkRegionOfflineOpResult.MARK_ALL_REGIONS_OFFLINE_INTERRUPTED; + } + } + return operationResult; + } + + /** * Mark table state to Disabled * @param env MasterProcedureEnv * @throws IOException @@ -338,4 +428,64 @@ public class DisableTableProcedure } } } + + /** + * Run bulk disable. + */ + private static class BulkDisabler extends BulkAssigner { + private final AssignmentManager assignmentManager; + private final List<HRegionInfo> regions; + private final TableName tableName; + private final int waitingTimeForEvents; + + public BulkDisabler(final MasterProcedureEnv env, final TableName tableName, + final List<HRegionInfo> regions) { + super(env.getMasterServices()); + this.assignmentManager = env.getMasterServices().getAssignmentManager(); + this.tableName = tableName; + this.regions = regions; + this.waitingTimeForEvents = + env.getMasterServices().getConfiguration() + .getInt("hbase.master.event.waiting.time", 1000); + } + + @Override + protected void populatePool(ExecutorService pool) { + RegionStates regionStates = assignmentManager.getRegionStates(); + for (final HRegionInfo region : regions) { + if (regionStates.isRegionInTransition(region) + && !regionStates.isRegionInState(region, RegionState.State.FAILED_CLOSE)) { + continue; + } + pool.execute(Trace.wrap("DisableTableHandler.BulkDisabler", new Runnable() { + @Override + public void run() { + assignmentManager.unassign(region); + } + })); + } + } + + @Override + protected boolean waitUntilDone(long timeout) throws InterruptedException { + long startTime = EnvironmentEdgeManager.currentTime(); + long remaining = timeout; + List<HRegionInfo> regions = null; + long lastLogTime = startTime; + while (!server.isStopped() && remaining > 0) { + Thread.sleep(waitingTimeForEvents); + regions = assignmentManager.getRegionStates().getRegionsOfTable(tableName); + long now = EnvironmentEdgeManager.currentTime(); + // Don't log more than once every ten seconds. Its obnoxious. And only log table regions + // if we are waiting a while for them to go down... + if (LOG.isDebugEnabled() && ((now - lastLogTime) > 10000)) { + lastLogTime = now; + LOG.debug("Disable waiting until done; " + remaining + " ms remaining; " + regions); + } + if (regions.isEmpty()) break; + remaining = timeout - (now - startTime); + } + return regions != null && regions.isEmpty(); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java deleted file mode 100644 index 15ed429..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DispatchMergingRegionsProcedure.java +++ /dev/null @@ -1,584 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.master.procedure; - -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.io.OutputStream; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.RegionLoad; -import org.apache.hadoop.hbase.ServerLoad; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.UnknownRegionException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.exceptions.MergeRegionException; -import org.apache.hadoop.hbase.exceptions.RegionOpeningException; -import org.apache.hadoop.hbase.master.assignment.AssignmentManager; -import org.apache.hadoop.hbase.master.assignment.RegionStates; -import org.apache.hadoop.hbase.master.CatalogJanitor; -import org.apache.hadoop.hbase.master.RegionPlan; -import org.apache.hadoop.hbase.master.RegionState; -import org.apache.hadoop.hbase.master.ServerManager; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.DispatchMergingRegionsState; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; - -/** - * The procedure to Merge a region in a table. - */ [email protected] -public class DispatchMergingRegionsProcedure - extends AbstractStateMachineTableProcedure<DispatchMergingRegionsState> { - private static final Log LOG = LogFactory.getLog(DispatchMergingRegionsProcedure.class); - - private final AtomicBoolean aborted = new AtomicBoolean(false); - private Boolean traceEnabled; - private AssignmentManager assignmentManager; - private int timeout; - private ServerName regionLocation; - private String regionsToMergeListFullName; - private String regionsToMergeListEncodedName; - - private TableName tableName; - private HRegionInfo [] regionsToMerge; - private boolean forcible; - - public DispatchMergingRegionsProcedure() { - this.traceEnabled = isTraceEnabled(); - this.assignmentManager = null; - this.timeout = -1; - this.regionLocation = null; - this.regionsToMergeListFullName = null; - this.regionsToMergeListEncodedName = null; - } - - public DispatchMergingRegionsProcedure( - final MasterProcedureEnv env, - final TableName tableName, - final HRegionInfo [] regionsToMerge, - final boolean forcible) { - super(env); - this.traceEnabled = isTraceEnabled(); - this.assignmentManager = getAssignmentManager(env); - this.tableName = tableName; - // For now, we only merge 2 regions. It could be extended to more than 2 regions in - // the future. - assert(regionsToMerge.length == 2); - this.regionsToMerge = regionsToMerge; - this.forcible = forcible; - - this.timeout = -1; - this.regionsToMergeListFullName = getRegionsToMergeListFullNameString(); - this.regionsToMergeListEncodedName = getRegionsToMergeListEncodedNameString(); - } - - @Override - protected Flow executeFromState( - final MasterProcedureEnv env, - final DispatchMergingRegionsState state) throws InterruptedException { - if (isTraceEnabled()) { - LOG.trace(this + " execute state=" + state); - } - - try { - switch (state) { - case DISPATCH_MERGING_REGIONS_PREPARE: - prepareMergeRegion(env); - setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_PRE_OPERATION); - break; - case DISPATCH_MERGING_REGIONS_PRE_OPERATION: - //Unused for now - reserve to add preMerge coprocessor in the future - setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS); - break; - case DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS: - if (MoveRegionsToSameRS(env)) { - setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS); - } else { - LOG.info("Cancel merging regions " + getRegionsToMergeListFullNameString() - + ", because can't move them to the same RS"); - setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_POST_OPERATION); - } - break; - case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS: - doMergeInRS(env); - setNextState(DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_POST_OPERATION); - break; - case DISPATCH_MERGING_REGIONS_POST_OPERATION: - //Unused for now - reserve to add postCompletedMerge coprocessor in the future - return Flow.NO_MORE_STATE; - default: - throw new UnsupportedOperationException(this + " unhandled state=" + state); - } - } catch (IOException e) { - LOG.warn("Error trying to merge regions " + getRegionsToMergeListFullNameString() + - " in the table " + tableName + " (in state=" + state + ")", e); - - setFailure("master-merge-regions", e); - } - return Flow.HAS_MORE_STATE; - } - - @Override - protected void rollbackState( - final MasterProcedureEnv env, - final DispatchMergingRegionsState state) throws IOException, InterruptedException { - if (isTraceEnabled()) { - LOG.trace(this + " rollback state=" + state); - } - - try { - switch (state) { - case DISPATCH_MERGING_REGIONS_POST_OPERATION: - case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS: - String msg = this + " We are in the " + state + " state." - + " It is complicated to rollback the merge operation that region server is working on." - + " Rollback is not supported and we should let the merge operation to complete"; - LOG.warn(msg); - // PONR - throw new UnsupportedOperationException(this + " unhandled state=" + state); - case DISPATCH_MERGING_REGIONS_MOVE_REGION_TO_SAME_RS: - break; // nothing to rollback - case DISPATCH_MERGING_REGIONS_PRE_OPERATION: - break; // nothing to rollback - case DISPATCH_MERGING_REGIONS_PREPARE: - break; // nothing to rollback - default: - throw new UnsupportedOperationException(this + " unhandled state=" + state); - } - } catch (Exception e) { - // This will be retried. Unless there is a bug in the code, - // this should be just a "temporary error" (e.g. network down) - LOG.warn("Failed rollback attempt step " + state + " for merging the regions " - + getRegionsToMergeListFullNameString() + " in table " + tableName, e); - throw e; - } - } - - @Override - protected DispatchMergingRegionsState getState(final int stateId) { - return DispatchMergingRegionsState.valueOf(stateId); - } - - @Override - protected int getStateId(final DispatchMergingRegionsState state) { - return state.getNumber(); - } - - @Override - protected DispatchMergingRegionsState getInitialState() { - return DispatchMergingRegionsState.DISPATCH_MERGING_REGIONS_PREPARE; - } - - /* - * Check whether we are in the state that can be rollback - */ - @Override - protected boolean isRollbackSupported(final DispatchMergingRegionsState state) { - switch (state) { - case DISPATCH_MERGING_REGIONS_POST_OPERATION: - case DISPATCH_MERGING_REGIONS_DO_MERGE_IN_RS: - // It is not safe to rollback if we reach to these states. - return false; - default: - break; - } - return true; - } - - @Override - public void serializeStateData(final OutputStream stream) throws IOException { - super.serializeStateData(stream); - - MasterProcedureProtos.DispatchMergingRegionsStateData.Builder dispatchMergingRegionsMsg = - MasterProcedureProtos.DispatchMergingRegionsStateData.newBuilder() - .setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser())) - .setTableName(ProtobufUtil.toProtoTableName(tableName)) - .setForcible(forcible); - for (HRegionInfo hri: regionsToMerge) { - dispatchMergingRegionsMsg.addRegionInfo(HRegionInfo.convert(hri)); - } - dispatchMergingRegionsMsg.build().writeDelimitedTo(stream); - } - - @Override - public void deserializeStateData(final InputStream stream) throws IOException { - super.deserializeStateData(stream); - - MasterProcedureProtos.DispatchMergingRegionsStateData dispatchMergingRegionsMsg = - MasterProcedureProtos.DispatchMergingRegionsStateData.parseDelimitedFrom(stream); - setUser(MasterProcedureUtil.toUserInfo(dispatchMergingRegionsMsg.getUserInfo())); - tableName = ProtobufUtil.toTableName(dispatchMergingRegionsMsg.getTableName()); - - assert(dispatchMergingRegionsMsg.getRegionInfoCount() == 2); - regionsToMerge = new HRegionInfo[dispatchMergingRegionsMsg.getRegionInfoCount()]; - for (int i = 0; i < regionsToMerge.length; i++) { - regionsToMerge[i] = HRegionInfo.convert(dispatchMergingRegionsMsg.getRegionInfo(i)); - } - } - - @Override - public void toStringClassDetails(StringBuilder sb) { - sb.append(getClass().getSimpleName()); - sb.append(" (table="); - sb.append(tableName); - sb.append(" regions="); - sb.append(getRegionsToMergeListFullNameString()); - sb.append(" forcible="); - sb.append(forcible); - sb.append(")"); - } - - @Override - protected LockState acquireLock(final MasterProcedureEnv env) { - if (!getTableName().isSystemTable() && env.waitInitialized(this)) { - return LockState.LOCK_EVENT_WAIT; - } - if (env.getProcedureScheduler().waitRegions(this, getTableName(), regionsToMerge)) { - return LockState.LOCK_EVENT_WAIT; - } - return LockState.LOCK_ACQUIRED; - } - - @Override - protected void releaseLock(final MasterProcedureEnv env) { - env.getProcedureScheduler().wakeRegions(this, getTableName(), regionsToMerge[0], regionsToMerge[1]); - } - - @Override - public TableName getTableName() { - return tableName; - } - - @Override - public TableOperationType getTableOperationType() { - return TableOperationType.REGION_MERGE; - } - - /** - * Prepare merge and do some check - * @param env MasterProcedureEnv - * @throws IOException - */ - private void prepareMergeRegion(final MasterProcedureEnv env) throws IOException { - // Note: the following logic assumes that we only have 2 regions to merge. In the future, - // if we want to extend to more than 2 regions, the code needs to modify a little bit. - // - CatalogJanitor catalogJanitor = env.getMasterServices().getCatalogJanitor(); - boolean regionAHasMergeQualifier = !catalogJanitor.cleanMergeQualifier(regionsToMerge[0]); - if (regionAHasMergeQualifier - || !catalogJanitor.cleanMergeQualifier(regionsToMerge[1])) { - String msg = "Skip merging regions " + regionsToMerge[0].getRegionNameAsString() - + ", " + regionsToMerge[1].getRegionNameAsString() + ", because region " - + (regionAHasMergeQualifier ? regionsToMerge[0].getEncodedName() : regionsToMerge[1] - .getEncodedName()) + " has merge qualifier"; - LOG.info(msg); - throw new MergeRegionException(msg); - } - - RegionStates regionStates = getAssignmentManager(env).getRegionStates(); - RegionState regionStateA = regionStates.getRegionState(regionsToMerge[0].getEncodedName()); - RegionState regionStateB = regionStates.getRegionState(regionsToMerge[1].getEncodedName()); - if (regionStateA == null || regionStateB == null) { - throw new UnknownRegionException( - regionStateA == null ? - regionsToMerge[0].getEncodedName() : regionsToMerge[1].getEncodedName()); - } - - if (!regionStateA.isOpened() || !regionStateB.isOpened()) { - throw new MergeRegionException( - "Unable to merge regions not online " + regionStateA + ", " + regionStateB); - } - - if (regionsToMerge[0].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID || - regionsToMerge[1].getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) { - throw new MergeRegionException("Can't merge non-default replicas"); - } - - if (!forcible && !HRegionInfo.areAdjacent(regionsToMerge[0], regionsToMerge[1])) { - throw new MergeRegionException( - "Unable to merge not adjacent regions " - + regionsToMerge[0].getRegionNameAsString() + ", " - + regionsToMerge[1].getRegionNameAsString() - + " where forcible = " + forcible); - } - } - - /** - * Move all regions to the same region server - * @param env MasterProcedureEnv - * @return whether target regions hosted by the same RS - * @throws IOException - */ - private boolean MoveRegionsToSameRS(final MasterProcedureEnv env) throws IOException { - // Make sure regions are on the same regionserver before send merge - // regions request to region server. - // - boolean onSameRS = isRegionsOnTheSameServer(env); - if (!onSameRS) { - // Note: the following logic assumes that we only have 2 regions to merge. In the future, - // if we want to extend to more than 2 regions, the code needs to modify a little bit. - // - RegionStates regionStates = getAssignmentManager(env).getRegionStates(); - ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]); - - RegionLoad loadOfRegionA = getRegionLoad(env, regionLocation, regionsToMerge[0]); - RegionLoad loadOfRegionB = getRegionLoad(env, regionLocation2, regionsToMerge[1]); - if (loadOfRegionA != null && loadOfRegionB != null - && loadOfRegionA.getRequestsCount() < loadOfRegionB.getRequestsCount()) { - // switch regionsToMerge[0] and regionsToMerge[1] - HRegionInfo tmpRegion = this.regionsToMerge[0]; - this.regionsToMerge[0] = this.regionsToMerge[1]; - this.regionsToMerge[1] = tmpRegion; - ServerName tmpLocation = regionLocation; - regionLocation = regionLocation2; - regionLocation2 = tmpLocation; - } - - long startTime = EnvironmentEdgeManager.currentTime(); - - RegionPlan regionPlan = new RegionPlan(regionsToMerge[1], regionLocation2, regionLocation); - LOG.info("Moving regions to same server for merge: " + regionPlan.toString()); - getAssignmentManager(env).moveAsync(regionPlan); - do { - try { - Thread.sleep(20); - // Make sure check RIT first, then get region location, otherwise - // we would make a wrong result if region is online between getting - // region location and checking RIT - boolean isRIT = regionStates.isRegionInTransition(regionsToMerge[1]); - regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[1]); - onSameRS = regionLocation.equals(regionLocation2); - if (onSameRS || !isRIT) { - // Regions are on the same RS, or regionsToMerge[1] is not in - // RegionInTransition any more - break; - } - } catch (InterruptedException e) { - InterruptedIOException iioe = new InterruptedIOException(); - iioe.initCause(e); - throw iioe; - } - } while ((EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env)); - } - return onSameRS; - } - - /** - * Do the real merge operation in the region server that hosts regions - * @param env MasterProcedureEnv - * @throws IOException - */ - private void doMergeInRS(final MasterProcedureEnv env) throws IOException { - long duration = 0; - long startTime = EnvironmentEdgeManager.currentTime(); - do { - try { - if (getServerName(env) == null) { - // The merge probably already happen. Check - RegionState regionState = getAssignmentManager(env).getRegionStates().getRegionState( - regionsToMerge[0].getEncodedName()); - if (regionState.isMerging() || regionState.isMerged()) { - LOG.info("Merge regions " + getRegionsToMergeListEncodedNameString() + - " is in progress or completed. No need to send a new request."); - } else { - LOG.warn("Cannot sending merge to hosting server of the regions " + - getRegionsToMergeListEncodedNameString() + " as the server is unknown"); - } - return; - } - // TODO: the following RPC call is not idempotent. Multiple calls (eg. after master - // failover, re-execute this step) could result in some exception thrown that does not - // paint the correct picture. This behavior is on-par with old releases. Improvement - // could happen in the future. - env.getMasterServices().getServerManager().sendRegionsMerge( - getServerName(env), - regionsToMerge[0], - regionsToMerge[1], - forcible, - getUser()); - LOG.info("Sent merge to server " + getServerName(env) + " for region " + - getRegionsToMergeListEncodedNameString() + ", forcible=" + forcible); - return; - } catch (RegionOpeningException roe) { - // Do a retry since region should be online on RS immediately - LOG.warn("Failed mergering regions in " + getServerName(env) + ", retrying...", roe); - } catch (Exception ie) { - LOG.warn("Failed sending merge to " + getServerName(env) + " for regions " + - getRegionsToMergeListEncodedNameString() + ", forcible=" + forcible, ie); - return; - } - } while ((duration = EnvironmentEdgeManager.currentTime() - startTime) <= getTimeout(env)); - - // If we reaches here, it means that we get timed out. - String msg = "Failed sending merge to " + getServerName(env) + " after " + duration + "ms"; - LOG.warn(msg); - throw new IOException(msg); - } - - private RegionLoad getRegionLoad( - final MasterProcedureEnv env, - final ServerName sn, - final HRegionInfo hri) { - ServerManager serverManager = env.getMasterServices().getServerManager(); - ServerLoad load = serverManager.getLoad(sn); - if (load != null) { - Map<byte[], RegionLoad> regionsLoad = load.getRegionsLoad(); - if (regionsLoad != null) { - return regionsLoad.get(hri.getRegionName()); - } - } - return null; - } - - /** - * The procedure could be restarted from a different machine. If the variable is null, we need to - * retrieve it. - * @param env MasterProcedureEnv - * @return whether target regions hosted by the same RS - */ - private boolean isRegionsOnTheSameServer(final MasterProcedureEnv env) throws IOException{ - Boolean onSameRS = true; - int i = 0; - RegionStates regionStates = getAssignmentManager(env).getRegionStates(); - regionLocation = regionStates.getRegionServerOfRegion(regionsToMerge[i]); - if (regionLocation != null) { - for(i = 1; i < regionsToMerge.length; i++) { - ServerName regionLocation2 = regionStates.getRegionServerOfRegion(regionsToMerge[i]); - if (regionLocation2 != null) { - if (onSameRS) { - onSameRS = regionLocation.equals(regionLocation2); - } - } else { - // At least one region is not online, merge will fail, no need to continue. - break; - } - } - if (i == regionsToMerge.length) { - // Finish checking all regions, return the result; - return onSameRS; - } - } - - // If reaching here, at least one region is not online. - String msg = "Skip merging regions " + getRegionsToMergeListFullNameString() + - ", because region " + regionsToMerge[i].getEncodedName() + " is not online now."; - LOG.warn(msg); - throw new IOException(msg); - } - - /** - * The procedure could be restarted from a different machine. If the variable is null, we need to - * retrieve it. - * @param env MasterProcedureEnv - * @return assignmentManager - */ - private AssignmentManager getAssignmentManager(final MasterProcedureEnv env) { - if (assignmentManager == null) { - assignmentManager = env.getMasterServices().getAssignmentManager(); - } - return assignmentManager; - } - - /** - * The procedure could be restarted from a different machine. If the variable is null, we need to - * retrieve it. - * @param env MasterProcedureEnv - * @return timeout value - */ - private int getTimeout(final MasterProcedureEnv env) { - if (timeout == -1) { - timeout = env.getMasterConfiguration().getInt( - "hbase.master.regionmerge.timeout", regionsToMerge.length * 60 * 1000); - } - return timeout; - } - - /** - * The procedure could be restarted from a different machine. If the variable is null, we need to - * retrieve it. - * @param env MasterProcedureEnv - * @return serverName - */ - private ServerName getServerName(final MasterProcedureEnv env) { - if (regionLocation == null) { - regionLocation = - getAssignmentManager(env).getRegionStates().getRegionServerOfRegion(regionsToMerge[0]); - } - return regionLocation; - } - - /** - * The procedure could be restarted from a different machine. If the variable is null, we need to - * retrieve it. - * @param fullName whether return only encoded name - * @return region names in a list - */ - private String getRegionsToMergeListFullNameString() { - if (regionsToMergeListFullName == null) { - StringBuilder sb = new StringBuilder("["); - int i = 0; - while(i < regionsToMerge.length - 1) { - sb.append(regionsToMerge[i].getRegionNameAsString() + ", "); - i++; - } - sb.append(regionsToMerge[i].getRegionNameAsString() + " ]"); - regionsToMergeListFullName = sb.toString(); - } - return regionsToMergeListFullName; - } - - /** - * The procedure could be restarted from a different machine. If the variable is null, we need to - * retrieve it. - * @return encoded region names - */ - private String getRegionsToMergeListEncodedNameString() { - if (regionsToMergeListEncodedName == null) { - StringBuilder sb = new StringBuilder("["); - int i = 0; - while(i < regionsToMerge.length - 1) { - sb.append(regionsToMerge[i].getEncodedName() + ", "); - i++; - } - sb.append(regionsToMerge[i].getEncodedName() + " ]"); - regionsToMergeListEncodedName = sb.toString(); - } - return regionsToMergeListEncodedName; - } - - /** - * The procedure could be restarted from a different machine. If the variable is null, we need to - * retrieve it. - * @return traceEnabled - */ - private Boolean isTraceEnabled() { - if (traceEnabled == null) { - traceEnabled = LOG.isTraceEnabled(); - } - return traceEnabled; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java index 4f4b5b1..4d67edd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java @@ -21,20 +21,34 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.BulkAssigner; +import org.apache.hadoop.hbase.master.GeneralBulkAssigner; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.RegionStates; +import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.EnableTableState; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @InterfaceAudience.Private public class EnableTableProcedure @@ -100,7 +114,7 @@ public class EnableTableProcedure setNextState(EnableTableState.ENABLE_TABLE_MARK_REGIONS_ONLINE); break; case ENABLE_TABLE_MARK_REGIONS_ONLINE: - addChildProcedure(env.getAssignmentManager().createAssignProcedures(tableName)); + markRegionsOnline(env, tableName, true); setNextState(EnableTableState.ENABLE_TABLE_SET_ENABLED_TABLE_STATE); break; case ENABLE_TABLE_SET_ENABLED_TABLE_STATE: @@ -273,6 +287,137 @@ public class EnableTableProcedure } /** + * Mark offline regions of the table online with retry + * @param env MasterProcedureEnv + * @param tableName the target table + * @param retryRequired whether to retry if the first run failed + * @throws IOException + */ + protected static void markRegionsOnline( + final MasterProcedureEnv env, + final TableName tableName, + final Boolean retryRequired) throws IOException { + // This is best effort approach to make all regions of a table online. If we fail to do + // that, it is ok that the table has some offline regions; user can fix it manually. + + // Dev consideration: add a config to control max number of retry. For now, it is hard coded. + int maxTry = (retryRequired ? 10 : 1); + boolean done = false; + + do { + try { + done = markRegionsOnline(env, tableName); + if (done) { + break; + } + maxTry--; + } catch (Exception e) { + LOG.warn("Received exception while marking regions online. tries left: " + maxTry, e); + maxTry--; + if (maxTry > 0) { + continue; // we still have some retry left, try again. + } + throw e; + } + } while (maxTry > 0); + + if (!done) { + LOG.warn("Some or all regions of the Table '" + tableName + "' were offline"); + } + } + + /** + * Mark offline regions of the table online + * @param env MasterProcedureEnv + * @param tableName the target table + * @return whether the operation is fully completed or being interrupted. + * @throws IOException + */ + private static boolean markRegionsOnline(final MasterProcedureEnv env, final TableName tableName) + throws IOException { + final AssignmentManager assignmentManager = env.getMasterServices().getAssignmentManager(); + final MasterServices masterServices = env.getMasterServices(); + final ServerManager serverManager = masterServices.getServerManager(); + boolean done = false; + // Get the regions of this table. We're done when all listed + // tables are onlined. + List<Pair<HRegionInfo, ServerName>> tableRegionsAndLocations; + + if (TableName.META_TABLE_NAME.equals(tableName)) { + tableRegionsAndLocations = + new MetaTableLocator().getMetaRegionsAndLocations(masterServices.getZooKeeper()); + } else { + tableRegionsAndLocations = + MetaTableAccessor.getTableRegionsAndLocations(masterServices.getConnection(), tableName); + } + + int countOfRegionsInTable = tableRegionsAndLocations.size(); + Map<HRegionInfo, ServerName> regionsToAssign = + regionsToAssignWithServerName(env, tableRegionsAndLocations); + + // need to potentially create some regions for the replicas + List<HRegionInfo> unrecordedReplicas = + AssignmentManager.replicaRegionsNotRecordedInMeta(new HashSet<>( + regionsToAssign.keySet()), masterServices); + Map<ServerName, List<HRegionInfo>> srvToUnassignedRegs = + assignmentManager.getBalancer().roundRobinAssignment(unrecordedReplicas, + serverManager.getOnlineServersList()); + if (srvToUnassignedRegs != null) { + for (Map.Entry<ServerName, List<HRegionInfo>> entry : srvToUnassignedRegs.entrySet()) { + for (HRegionInfo h : entry.getValue()) { + regionsToAssign.put(h, entry.getKey()); + } + } + } + + int offlineRegionsCount = regionsToAssign.size(); + + LOG.info("Table '" + tableName + "' has " + countOfRegionsInTable + " regions, of which " + + offlineRegionsCount + " are offline."); + if (offlineRegionsCount == 0) { + return true; + } + + List<ServerName> onlineServers = serverManager.createDestinationServersList(); + Map<ServerName, List<HRegionInfo>> bulkPlan = + env.getMasterServices().getAssignmentManager().getBalancer() + .retainAssignment(regionsToAssign, onlineServers); + if (bulkPlan != null) { + LOG.info("Bulk assigning " + offlineRegionsCount + " region(s) across " + bulkPlan.size() + + " server(s), retainAssignment=true"); + + BulkAssigner ba = new GeneralBulkAssigner(masterServices, bulkPlan, assignmentManager, true); + try { + if (ba.bulkAssign()) { + done = true; + } + } catch (InterruptedException e) { + LOG.warn("Enable operation was interrupted when enabling table '" + tableName + "'"); + // Preserve the interrupt. + Thread.currentThread().interrupt(); + } + } else { + LOG.info("Balancer was unable to find suitable servers for table " + tableName + + ", leaving unassigned"); + } + return done; + } + + /** + * Mark regions of the table offline during recovery + * @param env MasterProcedureEnv + */ + private void markRegionsOfflineDuringRecovery(final MasterProcedureEnv env) { + try { + // This is a best effort attempt. We will move on even it does not succeed. We will retry + // several times until we giving up. + DisableTableProcedure.markRegionsOffline(env, tableName, true); + } catch (Exception e) { + LOG.debug("Failed to offline all regions of table " + tableName + ". Ignoring", e); + } + } + + /** * Mark table state to Enabled * @param env MasterProcedureEnv * @throws IOException @@ -312,6 +457,31 @@ public class EnableTableProcedure } /** + * @param regionsInMeta + * @return List of regions neither in transition nor assigned. + * @throws IOException + */ + private static Map<HRegionInfo, ServerName> regionsToAssignWithServerName( + final MasterProcedureEnv env, + final List<Pair<HRegionInfo, ServerName>> regionsInMeta) throws IOException { + Map<HRegionInfo, ServerName> regionsToAssign = new HashMap<>(regionsInMeta.size()); + RegionStates regionStates = env.getMasterServices().getAssignmentManager().getRegionStates(); + for (Pair<HRegionInfo, ServerName> regionLocation : regionsInMeta) { + HRegionInfo hri = regionLocation.getFirst(); + ServerName sn = regionLocation.getSecond(); + if (regionStates.isRegionOffline(hri)) { + regionsToAssign.put(hri, sn); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping assign for the region " + hri + " during enable table " + + hri.getTable() + " because its already in tranition or assigned."); + } + } + } + return regionsToAssign; + } + + /** * Coprocessor Action. * @param env MasterProcedureEnv * @param state the procedure state http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java index 31d05a7..4b9a7ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterDDLOperationHelper.java @@ -19,19 +19,32 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.master.AssignmentManager; +import org.apache.hadoop.hbase.master.BulkReOpen; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.util.Bytes; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + /** * Helper class for schema change procedures */ @@ -47,13 +60,16 @@ public final class MasterDDLOperationHelper { public static void deleteColumnFamilyFromFileSystem( final MasterProcedureEnv env, final TableName tableName, - final List<HRegionInfo> regionInfoList, + List<HRegionInfo> regionInfoList, final byte[] familyName, - final boolean hasMob) throws IOException { + boolean hasMob) throws IOException { final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem(); if (LOG.isDebugEnabled()) { LOG.debug("Removing family=" + Bytes.toString(familyName) + " from table=" + tableName); } + if (regionInfoList == null) { + regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, tableName); + } for (HRegionInfo hri : regionInfoList) { // Delete the family directory in FS for all the regions one by one mfs.deleteFamilyFromFS(hri, familyName); @@ -65,4 +81,77 @@ public final class MasterDDLOperationHelper { mfs.deleteFamilyFromFS(mobRootDir, mobRegionInfo, familyName); } } + + /** + * Reopen all regions from a table after a schema change operation. + **/ + public static boolean reOpenAllRegions( + final MasterProcedureEnv env, + final TableName tableName, + final List<HRegionInfo> regionInfoList) throws IOException { + boolean done = false; + LOG.info("Bucketing regions by region server..."); + List<HRegionLocation> regionLocations = null; + Connection connection = env.getMasterServices().getConnection(); + try (RegionLocator locator = connection.getRegionLocator(tableName)) { + regionLocations = locator.getAllRegionLocations(); + } + // Convert List<HRegionLocation> to Map<HRegionInfo, ServerName>. + NavigableMap<HRegionInfo, ServerName> hri2Sn = new TreeMap<>(); + for (HRegionLocation location : regionLocations) { + hri2Sn.put(location.getRegionInfo(), location.getServerName()); + } + TreeMap<ServerName, List<HRegionInfo>> serverToRegions = Maps.newTreeMap(); + List<HRegionInfo> reRegions = new ArrayList<>(); + for (HRegionInfo hri : regionInfoList) { + ServerName sn = hri2Sn.get(hri); + // Skip the offlined split parent region + // See HBASE-4578 for more information. + if (null == sn) { + LOG.info("Skip " + hri); + continue; + } + if (!serverToRegions.containsKey(sn)) { + LinkedList<HRegionInfo> hriList = Lists.newLinkedList(); + serverToRegions.put(sn, hriList); + } + reRegions.add(hri); + serverToRegions.get(sn).add(hri); + } + + LOG.info("Reopening " + reRegions.size() + " regions on " + serverToRegions.size() + + " region servers."); + AssignmentManager am = env.getMasterServices().getAssignmentManager(); + am.setRegionsToReopen(reRegions); + BulkReOpen bulkReopen = new BulkReOpen(env.getMasterServices(), serverToRegions, am); + while (true) { + try { + if (bulkReopen.bulkReOpen()) { + done = true; + break; + } else { + LOG.warn("Timeout before reopening all regions"); + } + } catch (InterruptedException e) { + LOG.warn("Reopen was interrupted"); + // Preserve the interrupt. + Thread.currentThread().interrupt(); + break; + } + } + return done; + } + + /** + * Get the region info list of a table from meta if it is not already known by the caller. + **/ + public static List<HRegionInfo> getRegionInfoList( + final MasterProcedureEnv env, + final TableName tableName, + List<HRegionInfo> regionInfoList) throws IOException { + if (regionInfoList == null) { + regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, tableName); + } + return regionInfoList; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java index f815bea..c21137d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java @@ -29,7 +29,7 @@ public final class MasterProcedureConstants { /** Number of threads used by the procedure executor */ public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads"; - public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 16; + public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 4; /** * Procedure replay sanity check. In case a WAL is missing or unreadable we http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 0f1c40f..2cd5b08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; @@ -94,19 +93,12 @@ public class MasterProcedureEnv implements ConfigurationObserver { } } - private final RSProcedureDispatcher remoteDispatcher; private final MasterProcedureScheduler procSched; private final MasterServices master; public MasterProcedureEnv(final MasterServices master) { - this(master, new RSProcedureDispatcher(master)); - } - - public MasterProcedureEnv(final MasterServices master, - final RSProcedureDispatcher remoteDispatcher) { this.master = master; this.procSched = new MasterProcedureScheduler(master.getConfiguration()); - this.remoteDispatcher = remoteDispatcher; } public User getRequestUser() { @@ -125,10 +117,6 @@ public class MasterProcedureEnv implements ConfigurationObserver { return master.getConfiguration(); } - public AssignmentManager getAssignmentManager() { - return master.getAssignmentManager(); - } - public MasterCoprocessorHost getMasterCoprocessorHost() { return master.getMasterCoprocessorHost(); } @@ -137,12 +125,7 @@ public class MasterProcedureEnv implements ConfigurationObserver { return procSched; } - public RSProcedureDispatcher getRemoteDispatcher() { - return remoteDispatcher; - } - public boolean isRunning() { - if (this.master == null || this.master.getMasterProcedureExecutor() == null) return false; return master.getMasterProcedureExecutor().isRunning(); } @@ -151,18 +134,11 @@ public class MasterProcedureEnv implements ConfigurationObserver { } public boolean waitInitialized(Procedure proc) { - return procSched.waitEvent(master.getInitializedEvent(), proc); + return procSched.waitEvent(((HMaster)master).getInitializedEvent(), proc); } public boolean waitServerCrashProcessingEnabled(Procedure proc) { - if (master instanceof HMaster) { - return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc); - } - return false; - } - - public boolean waitFailoverCleanup(Procedure proc) { - return procSched.waitEvent(master.getAssignmentManager().getFailoverCleanupEvent(), proc); + return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc); } public void setEventReady(ProcedureEvent event, boolean isReady) { @@ -177,4 +153,4 @@ public class MasterProcedureEnv implements ConfigurationObserver { public void onConfigurationChange(Configuration conf) { master.getMasterProcedureExecutor().refreshConfiguration(conf); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 1410748..15b557a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -598,13 +598,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return false; // region operations are using the shared-lock on the table // and then they will grab an xlock on the region. - case REGION_SPLIT: - case REGION_MERGE: - case REGION_ASSIGN: - case REGION_UNASSIGN: + case SPLIT: + case MERGE: + case ASSIGN: + case UNASSIGN: case REGION_EDIT: - case REGION_GC: - case MERGED_REGIONS_GC: return false; default: break; @@ -817,11 +815,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { boolean hasLock = true; final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length]; for (int i = 0; i < regionInfo.length; ++i) { - LOG.info(procedure + " " + table + " " + regionInfo[i].getRegionNameAsString()); - assert table != null; - assert regionInfo[i] != null; - assert regionInfo[i].getTable() != null; - assert regionInfo[i].getTable().equals(table): regionInfo[i] + " " + procedure; + assert regionInfo[i].getTable().equals(table); assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i]; regionLocks[i] = locking.getRegionLock(regionInfo[i].getEncodedName()); @@ -1260,12 +1254,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { */ @VisibleForTesting public String dumpLocks() throws IOException { - schedLock(); - try { - // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter - return this.locking.toString(); - } finally { - schedUnlock(); - } + // TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter + return this.locking.toString(); } }
