/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureResult;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;

/**
 * Helper to synchronously wait on conditions.
 * This will be removed in the future (mainly when the AssignmentManager will be
 * replaced with a Procedure version) by using ProcedureYieldException,
 * and the queue will handle waiting and scheduling based on events.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class ProcedureSyncWait {
  private static final Log LOG = LogFactory.getLog(ProcedureSyncWait.class);

  // Utility class; never instantiated.
  private ProcedureSyncWait() {}

  /**
   * A condition to be re-evaluated periodically by {@link #waitFor}.
   * The wait terminates when {@code evaluate()} returns a non-null,
   * non-Boolean.FALSE value.
   */
  @InterfaceAudience.Private
  public interface Predicate<T> {
    T evaluate() throws IOException;
  }

  /**
   * Submits the procedure and blocks until it completes.
   * @param procExec the executor to submit to
   * @param proc the procedure to run
   * @return the serialized result of the completed procedure
   * @throws IOException if the procedure failed, was not found, or the master is aborting
   */
  public static byte[] submitAndWaitProcedure(ProcedureExecutor<MasterProcedureEnv> procExec,
      final Procedure proc) throws IOException {
    long procId = procExec.submitProcedure(proc);
    return waitForProcedureToComplete(procExec, procId);
  }

  /**
   * Blocks until the given procedure completes (or the executor stops running),
   * then returns its result or rethrows its failure.
   * @param procExec the executor running the procedure
   * @param procId the id of the procedure to wait on
   * @return the serialized result of the completed procedure
   * @throws IOException if the procedure failed, was not found, or the master is aborting
   */
  public static byte[] waitForProcedureToComplete(ProcedureExecutor<MasterProcedureEnv> procExec,
      final long procId) throws IOException {
    while (!procExec.isFinished(procId) && procExec.isRunning()) {
      // TODO: add a config to make it tunable
      // Dev Consideration: are we waiting forever, or we can set up some timeout value?
      Threads.sleepWithoutInterrupt(250);
    }
    ProcedureResult result = procExec.getResult(procId);
    if (result != null) {
      if (result.isFailed()) {
        // If the procedure fails, we should always have an exception captured. Throw it.
        throw result.getException().unwrapRemoteException();
      }
      return result.getResult();
    } else {
      if (procExec.isRunning()) {
        // Executor is alive but has no record of this procedure id.
        throw new IOException("Procedure " + procId + " not found");
      } else {
        throw new IOException("The Master is Aborting");
      }
    }
  }

  /**
   * Waits for the predicate using the configured defaults:
   * "hbase.master.wait.on.region" (total wait, default 5 min) and
   * "hbase.master.event.waiting.time" (poll interval, default 1s).
   * @param env the master procedure environment (source of the Configuration)
   * @param purpose human-readable description used in log/exception messages
   * @param predicate the condition to wait on
   * @return the first non-null, non-FALSE value returned by the predicate
   * @throws IOException from the predicate, on interrupt, or TimeoutIOException on timeout
   */
  public static <T> T waitFor(MasterProcedureEnv env, String purpose, Predicate<T> predicate)
      throws IOException {
    final Configuration conf = env.getMasterConfiguration();
    final long waitTime = conf.getLong("hbase.master.wait.on.region", 5 * 60 * 1000);
    final long waitingTimeForEvents = conf.getInt("hbase.master.event.waiting.time", 1000);
    return waitFor(env, waitTime, waitingTimeForEvents, purpose, predicate);
  }

  /**
   * Polls the predicate until it yields a non-null, non-FALSE value or the
   * deadline passes.
   * @param env the master procedure environment (checked for shutdown via isRunning())
   * @param waitTime total time to wait, in ms
   * @param waitingTimeForEvents sleep between predicate evaluations, in ms
   * @param purpose human-readable description used in log/exception messages
   * @param predicate the condition to wait on
   * @return the first non-null, non-FALSE value returned by the predicate
   * @throws InterruptedIOException if interrupted while sleeping
   * @throws TimeoutIOException if the deadline passes (also thrown when the
   *         environment stops running before the deadline — NOTE(review):
   *         possibly misleading in the shutdown case; confirm callers are ok with it)
   */
  public static <T> T waitFor(MasterProcedureEnv env, long waitTime, long waitingTimeForEvents,
      String purpose, Predicate<T> predicate) throws IOException {
    final long done = EnvironmentEdgeManager.currentTime() + waitTime;
    do {
      T result = predicate.evaluate();
      if (result != null && !result.equals(Boolean.FALSE)) {
        return result;
      }
      try {
        Thread.sleep(waitingTimeForEvents);
      } catch (InterruptedException e) {
        LOG.warn("Interrupted while sleeping, waiting on " + purpose);
        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
      }
      LOG.debug("Waiting on " + purpose);
    } while (EnvironmentEdgeManager.currentTime() < done && env.isRunning());

    throw new TimeoutIOException("Timed out while waiting on " + purpose);
  }

  /**
   * Blocks until the hbase:meta region location is known, up to
   * "hbase.client.catalog.timeout" ms (default 10s).
   * @throws NotAllMetaRegionsOnlineException if meta is not located in time
   * @throws InterruptedIOException if interrupted while waiting
   */
  protected static void waitMetaRegions(final MasterProcedureEnv env) throws IOException {
    int timeout = env.getMasterConfiguration().getInt("hbase.client.catalog.timeout", 10000);
    try {
      if (env.getMasterServices().getMetaTableLocator().waitMetaRegionLocation(
          env.getMasterServices().getZooKeeper(), timeout) == null) {
        throw new NotAllMetaRegionsOnlineException();
      }
    } catch (InterruptedException e) {
      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
    }
  }

  /**
   * Blocks until at least one region server is available as an assignment
   * destination.
   */
  protected static void waitRegionServers(final MasterProcedureEnv env) throws IOException {
    final ServerManager sm = env.getMasterServices().getServerManager();
    ProcedureSyncWait.waitFor(env, "server to assign region(s)",
        new ProcedureSyncWait.Predicate<Boolean>() {
          @Override
          public Boolean evaluate() throws IOException {
            List<ServerName> servers = sm.createDestinationServersList();
            return servers != null && !servers.isEmpty();
          }
        });
  }

  /**
   * Fetches the regions of the given table from hbase:meta, waiting until the
   * lookup yields a non-null result. hbase:meta itself is resolved via ZooKeeper
   * since it is not listed in meta.
   * @return the list of HRegionInfo for the table
   */
  protected static List<HRegionInfo> getRegionsFromMeta(final MasterProcedureEnv env,
      final TableName tableName) throws IOException {
    return ProcedureSyncWait.waitFor(env, "regions of table=" + tableName + " from meta",
        new ProcedureSyncWait.Predicate<List<HRegionInfo>>() {
          @Override
          public List<HRegionInfo> evaluate() throws IOException {
            if (TableName.META_TABLE_NAME.equals(tableName)) {
              return new MetaTableLocator().getMetaRegions(env.getMasterServices().getZooKeeper());
            }
            return MetaTableAccessor.getTableRegions(env.getMasterServices().getConnection(),
                tableName);
          }
        });
  }

  /**
   * Blocks until none of the given regions is in transition. Regions stuck in
   * FAILED_OPEN are force-offlined so the wait can complete.
   */
  protected static void waitRegionInTransition(final MasterProcedureEnv env,
      final List<HRegionInfo> regions) throws IOException, CoordinatedStateException {
    final AssignmentManager am = env.getMasterServices().getAssignmentManager();
    final RegionStates states = am.getRegionStates();
    for (final HRegionInfo region : regions) {
      ProcedureSyncWait.waitFor(env, "region " + region.getRegionNameAsString() + " in transition",
          new ProcedureSyncWait.Predicate<Boolean>() {
            @Override
            public Boolean evaluate() throws IOException {
              if (states.isRegionInState(region, State.FAILED_OPEN)) {
                // Unblock the wait: take the permanently-failed region offline.
                am.regionOffline(region);
              }
              return !states.isRegionInTransition(region);
            }
          });
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a7f9b6dc/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java new file mode 100644 index 0000000..76ca094 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableProcedureInterface.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.TableName; + +/** + * Procedures that operates on a specific Table (e.g. create, delete, snapshot, ...) + * must implement this interface to allow the system handle the lock/concurrency problems. 
+ */ [email protected] [email protected] +public interface TableProcedureInterface { + public enum TableOperationType { CREATE, DELETE, EDIT, READ }; + + /** + * @return the name of the table the procedure is operating on + */ + TableName getTableName(); + + /** + * Given an operation type we can take decisions about what to do with pending operations. + * e.g. if we get a delete and we have some table operation pending (e.g. add column) + * we can abort those operations. + * @return the operation type that the procedure is executing. + */ + TableOperationType getTableOperationType(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a7f9b6dc/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java index 9893fc8..5fe5f8c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.master.handler.CreateTableHandler; +import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure; import org.apache.hadoop.hbase.namespace.NamespaceAuditor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetQuotaRequest; @@ -444,14 +444,11 @@ public class MasterQuotaManager implements RegionStateListener { new HRegionInfo(QuotaUtil.QUOTA_TABLE_NAME) }; - masterServices.getExecutorService() - .submit(new CreateTableHandler(masterServices, - 
masterServices.getMasterFileSystem(), - QuotaUtil.QUOTA_TABLE_DESC, - masterServices.getConfiguration(), - newRegions, - masterServices) - .prepare()); + masterServices.getMasterProcedureExecutor() + .submitProcedure(new CreateTableProcedure( + masterServices.getMasterProcedureExecutor().getEnvironment(), + QuotaUtil.QUOTA_TABLE_DESC, + newRegions)); } private static class NamedLock<T> { http://git-wip-us.apache.org/repos/asf/hbase/blob/a7f9b6dc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index a515f8e..f15eb1b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -981,13 +981,14 @@ public class HRegionServer extends HasThread implements // Send interrupts to wake up threads if sleeping so they notice shutdown. // TODO: Should we check they are alive? 
If OOME could have exited already - if(this.hMemManager != null) this.hMemManager.stop(); + if (this.hMemManager != null) this.hMemManager.stop(); if (this.cacheFlusher != null) this.cacheFlusher.interruptIfNecessary(); if (this.compactSplitThread != null) this.compactSplitThread.interruptIfNecessary(); if (this.compactionChecker != null) this.compactionChecker.cancel(true); if (this.healthCheckChore != null) this.healthCheckChore.cancel(true); if (this.nonceManagerChore != null) this.nonceManagerChore.cancel(true); if (this.storefileRefresher != null) this.storefileRefresher.cancel(true); + sendShutdownInterrupt(); // Stop the quota manager if (rsQuotaManager != null) { @@ -2073,6 +2074,12 @@ public class HRegionServer extends HasThread implements } /** + * Called on stop/abort before closing the cluster connection and meta locator. + */ + protected void sendShutdownInterrupt() { + } + + /** * Wait on all threads to finish. Presumption is that all closes and stops * have already been called. 
*/ http://git-wip-us.apache.org/repos/asf/hbase/blob/a7f9b6dc/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java index 95d8a17..347cad5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java @@ -67,6 +67,30 @@ public abstract class ModifyRegionUtils { void editRegion(final HRegionInfo region) throws IOException; } + public static HRegionInfo[] createHRegionInfos(HTableDescriptor hTableDescriptor, + byte[][] splitKeys) { + long regionId = System.currentTimeMillis(); + HRegionInfo[] hRegionInfos = null; + if (splitKeys == null || splitKeys.length == 0) { + hRegionInfos = new HRegionInfo[]{ + new HRegionInfo(hTableDescriptor.getTableName(), null, null, false, regionId) + }; + } else { + int numRegions = splitKeys.length + 1; + hRegionInfos = new HRegionInfo[numRegions]; + byte[] startKey = null; + byte[] endKey = null; + for (int i = 0; i < numRegions; i++) { + endKey = (i == splitKeys.length) ? null : splitKeys[i]; + hRegionInfos[i] = + new HRegionInfo(hTableDescriptor.getTableName(), startKey, endKey, + false, regionId); + startKey = endKey; + } + } + return hRegionInfos; + } + /** * Create new set of regions on the specified file-system. * NOTE: that you should add the regions to hbase:meta after this operation. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/a7f9b6dc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java index 8ed49ff..2c13f39 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java @@ -63,6 +63,8 @@ import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLog import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.master.CatalogJanitor.SplitParentFirstComparator; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; @@ -261,6 +263,11 @@ public class TestCatalogJanitor { } @Override + public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { + return null; + } + + @Override public ServerManager getServerManager() { return null; } @@ -912,7 +919,7 @@ public class TestCatalogJanitor { MasterServices services = new MockMasterServices(server); // create the janitor - + CatalogJanitor janitor = new CatalogJanitor(server, services); // Create regions. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/a7f9b6dc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java new file mode 100644 index 0000000..d6c19e1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableDescriptor; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.util.ModifyRegionUtils; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class MasterProcedureTestingUtility { + private static final Log LOG = LogFactory.getLog(MasterProcedureTestingUtility.class); + + private MasterProcedureTestingUtility() { + } + + public static HTableDescriptor createHTD(final TableName tableName, final String... family) { + HTableDescriptor htd = new HTableDescriptor(tableName); + for (int i = 0; i < family.length; ++i) { + htd.addFamily(new HColumnDescriptor(family[i])); + } + return htd; + } + + public static HRegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec, + final TableName tableName, final byte[][] splitKeys, String... 
family) throws IOException { + HTableDescriptor htd = createHTD(tableName, family); + HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys); + long procId = ProcedureTestingUtility.submitAndWait(procExec, + new CreateTableProcedure(procExec.getEnvironment(), htd, regions)); + ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId)); + return regions; + } + + public static void validateTableCreation(final HMaster master, final TableName tableName, + final HRegionInfo[] regions, String... family) throws IOException { + validateTableCreation(master, tableName, regions, true, family); + } + + public static void validateTableCreation(final HMaster master, final TableName tableName, + final HRegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException { + // check filesystem + final FileSystem fs = master.getMasterFileSystem().getFileSystem(); + final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); + assertTrue(fs.exists(tableDir)); + List<Path> allRegionDirs = FSUtils.getRegionDirs(fs, tableDir); + for (int i = 0; i < regions.length; ++i) { + Path regionDir = new Path(tableDir, regions[i].getEncodedName()); + assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir)); + assertTrue(allRegionDirs.remove(regionDir)); + List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir); + for (int j = 0; j < family.length; ++j) { + final Path familyDir = new Path(regionDir, family[j]); + if (hasFamilyDirs) { + assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir)); + assertTrue(allFamilyDirs.remove(familyDir)); + } else { + // TODO: WARN: Modify Table/Families does not create a family dir + if (!fs.exists(familyDir)) { + LOG.warn(family[j] + " family dir does not exist"); + } + allFamilyDirs.remove(familyDir); + } + } + assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty()); + } + assertTrue("found extraneous 
regions: " + allRegionDirs, allRegionDirs.isEmpty()); + + // check meta + assertTrue(MetaTableAccessor.tableExists(master.getConnection(), tableName)); + assertEquals(regions.length, countMetaRegions(master, tableName)); + + // check htd + TableDescriptor tableDesc = master.getTableDescriptors().getDescriptor(tableName); + assertTrue("table descriptor not found", tableDesc != null); + HTableDescriptor htd = tableDesc.getHTableDescriptor(); + assertTrue("table descriptor not found", htd != null); + for (int i = 0; i < family.length; ++i) { + assertTrue("family not found " + family[i], htd.getFamily(Bytes.toBytes(family[i])) != null); + } + assertEquals(family.length, htd.getFamilies().size()); + } + + public static void validateTableDeletion(final HMaster master, final TableName tableName, + final HRegionInfo[] regions, String... family) throws IOException { + // check filesystem + final FileSystem fs = master.getMasterFileSystem().getFileSystem(); + final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); + assertFalse(fs.exists(tableDir)); + + // check meta + assertFalse(MetaTableAccessor.tableExists(master.getConnection(), tableName)); + assertEquals(0, countMetaRegions(master, tableName)); + + // check htd + assertTrue("found htd of deleted table", + master.getTableDescriptors().getDescriptor(tableName) == null); + } + + private static int countMetaRegions(final HMaster master, final TableName tableName) + throws IOException { + final AtomicInteger actualRegCount = new AtomicInteger(0); + final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() { + @Override + public boolean visit(Result rowResult) throws IOException { + RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult); + if (list == null) { + LOG.warn("No serialized HRegionInfo in " + rowResult); + return true; + } + HRegionLocation l = list.getRegionLocation(); + if (l == null) { + return true; + } + if 
(!l.getRegionInfo().getTable().equals(tableName)) { + return false; + } + if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true; + HRegionLocation[] locations = list.getRegionLocations(); + for (HRegionLocation location : locations) { + if (location == null) continue; + ServerName serverName = location.getServerName(); + // Make sure that regions are assigned to server + if (serverName != null && serverName.getHostAndPort() != null) { + actualRegCount.incrementAndGet(); + } + } + return true; + } + }; + MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName); + return actualRegCount.get(); + } + + public static <TState> void testRecoveryAndDoubleExecution( + final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, + final int numSteps, final TState[] states) throws Exception { + ProcedureTestingUtility.waitProcedure(procExec, procId); + assertEquals(false, procExec.isRunning()); + // Restart the executor and execute the step twice + // execute step N - kill before store update + // restart executor/store + // execute step N - save on store + for (int i = 0; i < numSteps; ++i) { + LOG.info("Restart "+ i +" exec state: " + states[i]); + ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); + ProcedureTestingUtility.restart(procExec); + ProcedureTestingUtility.waitProcedure(procExec, procId); + } + assertEquals(true, procExec.isRunning()); + ProcedureTestingUtility.assertProcNotFailed(procExec, procId); + } + + public static <TState> void testRollbackAndDoubleExecution( + final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, + final int lastStep, final TState[] states) throws Exception { + ProcedureTestingUtility.waitProcedure(procExec, procId); + + // Restart the executor and execute the step twice + // execute step N - kill before store update + // restart executor/store + // execute step N - save on store + for (int i = 0; i < lastStep; ++i) { + LOG.info("Restart 
"+ i +" exec state: " + states[i]); + ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); + ProcedureTestingUtility.restart(procExec); + ProcedureTestingUtility.waitProcedure(procExec, procId); + } + + // Restart the executor and rollback the step twice + // rollback step N - kill before store update + // restart executor/store + // rollback step N - save on store + MasterProcedureTestingUtility.InjectAbortOnLoadListener abortListener = + new MasterProcedureTestingUtility.InjectAbortOnLoadListener(procExec); + procExec.registerListener(abortListener); + try { + for (int i = lastStep + 1; i >= 0; --i) { + LOG.info("Restart " + i +" rollback state: "+ states[i]); + ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); + ProcedureTestingUtility.restart(procExec); + ProcedureTestingUtility.waitProcedure(procExec, procId); + } + } finally { + assertTrue(procExec.unregisterListener(abortListener)); + } + + ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId)); + } + + public static <TState> void testRollbackAndDoubleExecutionAfterPONR( + final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, + final int lastStep, final TState[] states) throws Exception { + ProcedureTestingUtility.waitProcedure(procExec, procId); + + // Restart the executor and execute the step twice + // execute step N - kill before store update + // restart executor/store + // execute step N - save on store + for (int i = 0; i < lastStep; ++i) { + LOG.info("Restart "+ i +" exec state: " + states[i]); + ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); + ProcedureTestingUtility.restart(procExec); + ProcedureTestingUtility.waitProcedure(procExec, procId); + } + + // try to inject the abort + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); + MasterProcedureTestingUtility.InjectAbortOnLoadListener abortListener = + new MasterProcedureTestingUtility.InjectAbortOnLoadListener(procExec); + 
procExec.registerListener(abortListener); + try { + ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); + ProcedureTestingUtility.restart(procExec); + LOG.info("Restart and execute"); + ProcedureTestingUtility.waitProcedure(procExec, procId); + } finally { + assertTrue(procExec.unregisterListener(abortListener)); + } + + assertEquals(true, procExec.isRunning()); + ProcedureTestingUtility.assertProcNotFailed(procExec, procId); + } + + public static <TState> void testRollbackRetriableFailure( + final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, + final int lastStep, final TState[] states) throws Exception { + ProcedureTestingUtility.waitProcedure(procExec, procId); + + // Restart the executor and execute the step twice + // execute step N - kill before store update + // restart executor/store + // execute step N - save on store + for (int i = 0; i < lastStep; ++i) { + LOG.info("Restart "+ i +" exec state: " + states[i]); + ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); + ProcedureTestingUtility.restart(procExec); + ProcedureTestingUtility.waitProcedure(procExec, procId); + } + + // execute the rollback + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); + MasterProcedureTestingUtility.InjectAbortOnLoadListener abortListener = + new MasterProcedureTestingUtility.InjectAbortOnLoadListener(procExec); + procExec.registerListener(abortListener); + try { + ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); + ProcedureTestingUtility.restart(procExec); + LOG.info("Restart and rollback"); + ProcedureTestingUtility.waitProcedure(procExec, procId); + } finally { + assertTrue(procExec.unregisterListener(abortListener)); + } + + ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId)); + } + + public static class InjectAbortOnLoadListener + implements ProcedureExecutor.ProcedureExecutorListener { + private final ProcedureExecutor<MasterProcedureEnv> procExec; 
+ + public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) { + this.procExec = procExec; + } + + @Override + public void procedureLoaded(long procId) { + procExec.abort(procId); + } + + @Override + public void procedureAdded(long procId) { /* no-op */ } + + @Override + public void procedureFinished(long procId) { /* no-op */ } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a7f9b6dc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java new file mode 100644 index 0000000..0ac4e4f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestCreateTableProcedure.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ModifyRegionUtils; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({MasterTests.class, MediumTests.class}) +public class TestCreateTableProcedure { + private static final Log LOG = LogFactory.getLog(TestCreateTableProcedure.class); + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static void setupConf(Configuration conf) { + conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1); + } + + @BeforeClass + public static void setupCluster() throws Exception { + setupConf(UTIL.getConfiguration()); + UTIL.startMiniCluster(1); + } + + @AfterClass + public static void cleanupTest() throws Exception { + try { + UTIL.shutdownMiniCluster(); + } catch (Exception e) { + LOG.warn("failure shutting down cluster", e); + } 
+ } + + @Before + public void setup() throws Exception { + resetProcExecutorTestingKillFlag(); + } + + @After + public void tearDown() throws Exception { + resetProcExecutorTestingKillFlag(); + for (HTableDescriptor htd: UTIL.getHBaseAdmin().listTables()) { + LOG.info("Tear down, remove table=" + htd.getTableName()); + UTIL.deleteTable(htd.getTableName()); + } + } + + private void resetProcExecutorTestingKillFlag() { + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); + assertTrue("expected executor to be running", procExec.isRunning()); + } + + @Test(timeout=60000) + public void testSimpleCreate() throws Exception { + final TableName tableName = TableName.valueOf("testSimpleCreate"); + final byte[][] splitKeys = null; + testSimpleCreate(tableName, splitKeys); + } + + @Test(timeout=60000) + public void testSimpleCreateWithSplits() throws Exception { + final TableName tableName = TableName.valueOf("testSimpleCreateWithSplits"); + final byte[][] splitKeys = new byte[][] { + Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c") + }; + testSimpleCreate(tableName, splitKeys); + } + + private void testSimpleCreate(final TableName tableName, byte[][] splitKeys) throws Exception { + HRegionInfo[] regions = MasterProcedureTestingUtility.createTable( + getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2"); + MasterProcedureTestingUtility.validateTableCreation( + UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2"); + } + + @Test(timeout=60000, expected=TableExistsException.class) + public void testCreateExisting() throws Exception { + final TableName tableName = TableName.valueOf("testCreateExisting"); + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + final HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f"); + final HRegionInfo[] regions = 
ModifyRegionUtils.createHRegionInfos(htd, null); + + // create the table + long procId1 = procExec.submitProcedure( + new CreateTableProcedure(procExec.getEnvironment(), htd, regions)); + + // create another with the same name + ProcedurePrepareLatch latch2 = new ProcedurePrepareLatch.CompatibilityLatch(); + long procId2 = procExec.submitProcedure( + new CreateTableProcedure(procExec.getEnvironment(), htd, regions, latch2)); + + ProcedureTestingUtility.waitProcedure(procExec, procId1); + ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId1)); + + ProcedureTestingUtility.waitProcedure(procExec, procId2); + latch2.await(); + } + + @Test(timeout=60000) + public void testRecoveryAndDoubleExecution() throws Exception { + final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecution"); + + // create the table + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true); + + // Start the Create procedure && kill the executor + byte[][] splitKeys = null; + HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f1", "f2"); + HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys); + long procId = procExec.submitProcedure( + new CreateTableProcedure(procExec.getEnvironment(), htd, regions)); + + // Restart the executor and execute the step twice + // NOTE: the 6 (number of CreateTableState steps) is hardcoded, + // so you have to look at this test at least once when you add a new step. 
+ MasterProcedureTestingUtility.testRecoveryAndDoubleExecution( + procExec, procId, 6, CreateTableState.values()); + + MasterProcedureTestingUtility.validateTableCreation( + UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2"); + } + + @Test(timeout=90000) + public void testRollbackAndDoubleExecution() throws Exception { + final TableName tableName = TableName.valueOf("testRollbackAndDoubleExecution"); + + // create the table + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true); + + // Start the Create procedure && kill the executor + final byte[][] splitKeys = new byte[][] { + Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c") + }; + HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f1", "f2"); + HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys); + long procId = procExec.submitProcedure( + new CreateTableProcedure(procExec.getEnvironment(), htd, regions)); + + // NOTE: the 4 (number of CreateTableState steps) is hardcoded, + // so you have to look at this test at least once when you add a new step. + MasterProcedureTestingUtility.testRollbackAndDoubleExecution( + procExec, procId, 4, CreateTableState.values()); + + MasterProcedureTestingUtility.validateTableDeletion( + UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2"); + + // are we able to create the table after a rollback? 
+ resetProcExecutorTestingKillFlag(); + testSimpleCreate(tableName, splitKeys); + } + + @Test(timeout=90000) + public void testRollbackRetriableFailure() throws Exception { + final TableName tableName = TableName.valueOf("testRollbackRetriableFailure"); + + // create the table + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true); + + // Start the Create procedure && kill the executor + final byte[][] splitKeys = new byte[][] { + Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c") + }; + HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f1", "f2"); + HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys); + long procId = procExec.submitProcedure( + new FaultyCreateTableProcedure(procExec.getEnvironment(), htd, regions)); + + // NOTE: the 4 (number of CreateTableState steps) is hardcoded, + // so you have to look at this test at least once when you add a new step. + MasterProcedureTestingUtility.testRollbackRetriableFailure( + procExec, procId, 4, CreateTableState.values()); + + MasterProcedureTestingUtility.validateTableDeletion( + UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2"); + + // are we able to create the table after a rollback? 
+ resetProcExecutorTestingKillFlag(); + testSimpleCreate(tableName, splitKeys); + } + + private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { + return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(); + } + + public static class FaultyCreateTableProcedure extends CreateTableProcedure { + private int retries = 0; + + public FaultyCreateTableProcedure() { + // Required by the Procedure framework to create the procedure on replay + } + + public FaultyCreateTableProcedure(final MasterProcedureEnv env, + final HTableDescriptor hTableDescriptor, final HRegionInfo[] newRegions) + throws IOException { + super(env, hTableDescriptor, newRegions); + } + + @Override + protected void rollbackState(final MasterProcedureEnv env, final CreateTableState state) + throws IOException { + if (retries++ < 3) { + LOG.info("inject rollback failure state=" + state); + throw new IOException("injected failure number " + retries); + } else { + super.rollbackState(env, state); + retries = 0; + } + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a7f9b6dc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteTableProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteTableProcedure.java new file mode 100644 index 0000000..6795b22 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteTableProcedure.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotDisabledException; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureResult; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({MasterTests.class, MediumTests.class}) +public class TestDeleteTableProcedure { + 
private static final Log LOG = LogFactory.getLog(TestDeleteTableProcedure.class); + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static void setupConf(Configuration conf) { + conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1); + } + + @BeforeClass + public static void setupCluster() throws Exception { + setupConf(UTIL.getConfiguration()); + UTIL.startMiniCluster(1); + } + + @AfterClass + public static void cleanupTest() throws Exception { + try { + UTIL.shutdownMiniCluster(); + } catch (Exception e) { + LOG.warn("failure shutting down cluster", e); + } + } + + @Before + public void setup() throws Exception { + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); + assertTrue("expected executor to be running", procExec.isRunning()); + } + + @After + public void tearDown() throws Exception { + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false); + for (HTableDescriptor htd: UTIL.getHBaseAdmin().listTables()) { + LOG.info("Tear down, remove table=" + htd.getTableName()); + UTIL.deleteTable(htd.getTableName()); + } + } + + @Test(timeout=60000, expected=TableNotFoundException.class) + public void testDeleteNotExistentTable() throws Exception { + final TableName tableName = TableName.valueOf("testDeleteNotExistentTable"); + + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + ProcedurePrepareLatch latch = new ProcedurePrepareLatch.CompatibilityLatch(); + long procId = ProcedureTestingUtility.submitAndWait(procExec, + new DeleteTableProcedure(procExec.getEnvironment(), tableName, latch)); + latch.await(); + } + + @Test(timeout=60000, expected=TableNotDisabledException.class) + public void testDeleteNotDisabledTable() throws Exception { + final TableName tableName = TableName.valueOf("testDeleteNotDisabledTable"); + + final 
ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + MasterProcedureTestingUtility.createTable(procExec, tableName, null, "f"); + + ProcedurePrepareLatch latch = new ProcedurePrepareLatch.CompatibilityLatch(); + long procId = ProcedureTestingUtility.submitAndWait(procExec, + new DeleteTableProcedure(procExec.getEnvironment(), tableName, latch)); + latch.await(); + } + + @Test(timeout=60000) + public void testDeleteDeletedTable() throws Exception { + final TableName tableName = TableName.valueOf("testDeleteDeletedTable"); + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + + HRegionInfo[] regions = MasterProcedureTestingUtility.createTable( + procExec, tableName, null, "f"); + UTIL.getHBaseAdmin().disableTable(tableName); + + // delete the table (that exists) + long procId1 = procExec.submitProcedure( + new DeleteTableProcedure(procExec.getEnvironment(), tableName)); + // delete the table (that will no longer exist) + long procId2 = procExec.submitProcedure( + new DeleteTableProcedure(procExec.getEnvironment(), tableName)); + + // Wait the completion + ProcedureTestingUtility.waitProcedure(procExec, procId1); + ProcedureTestingUtility.waitProcedure(procExec, procId2); + + // First delete should succeed + ProcedureTestingUtility.assertProcNotFailed(procExec, procId1); + MasterProcedureTestingUtility.validateTableDeletion( + UTIL.getHBaseCluster().getMaster(), tableName, regions, "f"); + + // Second delete should fail with TableNotFound + ProcedureResult result = procExec.getResult(procId2); + assertTrue(result.isFailed()); + LOG.debug("Delete failed with exception: " + result.getException()); + assertTrue(result.getException().getCause() instanceof TableNotFoundException); + } + + @Test(timeout=60000) + public void testSimpleDelete() throws Exception { + final TableName tableName = TableName.valueOf("testSimpleDelete"); + final byte[][] splitKeys = null; + testSimpleDelete(tableName, splitKeys); + } + + 
@Test(timeout=60000) + public void testSimpleDeleteWithSplits() throws Exception { + final TableName tableName = TableName.valueOf("testSimpleDeleteWithSplits"); + final byte[][] splitKeys = new byte[][] { + Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c") + }; + testSimpleDelete(tableName, splitKeys); + } + + private void testSimpleDelete(final TableName tableName, byte[][] splitKeys) throws Exception { + HRegionInfo[] regions = MasterProcedureTestingUtility.createTable( + getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2"); + UTIL.getHBaseAdmin().disableTable(tableName); + + // delete the table + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + long procId = ProcedureTestingUtility.submitAndWait(procExec, + new DeleteTableProcedure(procExec.getEnvironment(), tableName)); + ProcedureTestingUtility.assertProcNotFailed(procExec, procId); + MasterProcedureTestingUtility.validateTableDeletion( + UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2"); + } + + @Test(timeout=60000) + public void testRecoveryAndDoubleExecution() throws Exception { + final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecution"); + + // create the table + byte[][] splitKeys = null; + HRegionInfo[] regions = MasterProcedureTestingUtility.createTable( + getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2"); + UTIL.getHBaseAdmin().disableTable(tableName); + + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + ProcedureTestingUtility.waitNoProcedureRunning(procExec); + ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true); + + // Start the Delete procedure && kill the executor + long procId = procExec.submitProcedure( + new DeleteTableProcedure(procExec.getEnvironment(), tableName)); + + // Restart the executor and execute the step twice + // NOTE: the 6 (number of DeleteTableState steps) is hardcoded, + // so you have to look at this test 
at least once when you add a new step. + MasterProcedureTestingUtility.testRecoveryAndDoubleExecution( + procExec, procId, 6, DeleteTableState.values()); + + MasterProcedureTestingUtility.validateTableDeletion( + UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2"); + } + + private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { + return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a7f9b6dc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java new file mode 100644 index 0000000..faf7845 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterFailoverWithProcedures.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.util.concurrent.CountDownLatch; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.CreateTableState; +import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.DeleteTableState; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.ModifyRegionUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({MasterTests.class, LargeTests.class}) +public class TestMasterFailoverWithProcedures { + private static final Log LOG = LogFactory.getLog(TestMasterFailoverWithProcedures.class); + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static void setupConf(Configuration conf) { + } + + @Before + public void setup() throws 
Exception { + setupConf(UTIL.getConfiguration()); + UTIL.startMiniCluster(2, 1); + + final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, false); + ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, false); + } + + @After + public void tearDown() throws Exception { + try { + UTIL.shutdownMiniCluster(); + } catch (Exception e) { + LOG.warn("failure shutting down cluster", e); + } + } + + @Test(timeout=60000) + public void testWalRecoverLease() throws Exception { + final ProcedureStore masterStore = getMasterProcedureExecutor().getStore(); + assertTrue("expected WALStore for this test", masterStore instanceof WALProcedureStore); + + HMaster firstMaster = UTIL.getHBaseCluster().getMaster(); + // Abort Latch for the master store + final CountDownLatch masterStoreAbort = new CountDownLatch(1); + masterStore.registerListener(new ProcedureStore.ProcedureStoreListener() { + @Override + public void abortProcess() { + LOG.debug("Abort store of Master"); + masterStoreAbort.countDown(); + } + }); + + // startup a fake master the new WAL store will take the lease + // and the active master should abort. 
+ HMaster backupMaster3 = Mockito.mock(HMaster.class); + Mockito.doReturn(firstMaster.getConfiguration()).when(backupMaster3).getConfiguration(); + Mockito.doReturn(true).when(backupMaster3).isActiveMaster(); + final WALProcedureStore backupStore3 = new WALProcedureStore(firstMaster.getConfiguration(), + firstMaster.getMasterFileSystem().getFileSystem(), + ((WALProcedureStore)masterStore).getLogDir(), + new MasterProcedureEnv.WALStoreLeaseRecovery(backupMaster3)); + // Abort Latch for the test store + final CountDownLatch backupStore3Abort = new CountDownLatch(1); + backupStore3.registerListener(new ProcedureStore.ProcedureStoreListener() { + @Override + public void abortProcess() { + LOG.debug("Abort store of backupMaster3"); + backupStore3Abort.countDown(); + backupStore3.stop(true); + } + }); + backupStore3.start(1); + backupStore3.recoverLease(); + + // Try to trigger a command on the master (WAL lease expired on the active one) + HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(TableName.valueOf("mtb"), "f"); + HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, null); + LOG.debug("submit proc"); + getMasterProcedureExecutor().submitProcedure( + new CreateTableProcedure(getMasterProcedureExecutor().getEnvironment(), htd, regions)); + LOG.debug("wait master store abort"); + masterStoreAbort.await(); + + // Now the real backup master should start up + LOG.debug("wait backup master to startup"); + waitBackupMaster(UTIL, firstMaster); + assertEquals(true, firstMaster.isStopped()); + + // wait the store in here to abort (the test will fail due to timeout if it doesn't) + LOG.debug("wait the store to abort"); + backupStore3.getStoreTracker().setDeleted(1, false); + backupStore3.delete(1); + backupStore3Abort.await(); + } + + // ========================================================================== + // Test Create Table + // ========================================================================== + @Test(timeout=60000) + public void 
testCreateWithFailover() throws Exception { + // TODO: Should we try every step? (master failover takes long time) + // It is already covered by TestCreateTableProcedure + // but without the master restart, only the executor/store is restarted. + // Without Master restart we may not find bug in the procedure code + // like missing "wait" for resources to be available (e.g. RS) + testCreateWithFailoverAtStep(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS.ordinal()); + } + + private void testCreateWithFailoverAtStep(final int step) throws Exception { + final TableName tableName = TableName.valueOf("testCreateWithFailoverAtStep" + step); + + // create the table + ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, true); + ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, true); + + // Start the Create procedure && kill the executor + byte[][] splitKeys = null; + HTableDescriptor htd = MasterProcedureTestingUtility.createHTD(tableName, "f1", "f2"); + HRegionInfo[] regions = ModifyRegionUtils.createHRegionInfos(htd, splitKeys); + long procId = procExec.submitProcedure( + new CreateTableProcedure(procExec.getEnvironment(), htd, regions)); + testRecoveryAndDoubleExecution(UTIL, procId, step, CreateTableState.values()); + + MasterProcedureTestingUtility.validateTableCreation( + UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2"); + } + + // ========================================================================== + // Test Delete Table + // ========================================================================== + @Test(timeout=60000) + public void testDeleteWithFailover() throws Exception { + // TODO: Should we try every step? (master failover takes long time) + // It is already covered by TestDeleteTableProcedure + // but without the master restart, only the executor/store is restarted. 
+ // Without Master restart we may not find bug in the procedure code + // like missing "wait" for resources to be available (e.g. RS) + testDeleteWithFailoverAtStep(DeleteTableState.DELETE_TABLE_UNASSIGN_REGIONS.ordinal()); + } + + private void testDeleteWithFailoverAtStep(final int step) throws Exception { + final TableName tableName = TableName.valueOf("testDeleteWithFailoverAtStep" + step); + + // create the table + byte[][] splitKeys = null; + HRegionInfo[] regions = MasterProcedureTestingUtility.createTable( + getMasterProcedureExecutor(), tableName, splitKeys, "f1", "f2"); + Path tableDir = FSUtils.getTableDir(getRootDir(), tableName); + MasterProcedureTestingUtility.validateTableCreation( + UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2"); + UTIL.getHBaseAdmin().disableTable(tableName); + + ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor(); + ProcedureTestingUtility.setKillBeforeStoreUpdate(procExec, true); + ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExec, true); + + // Start the Delete procedure && kill the executor + long procId = procExec.submitProcedure( + new DeleteTableProcedure(procExec.getEnvironment(), tableName)); + testRecoveryAndDoubleExecution(UTIL, procId, step, DeleteTableState.values()); + + MasterProcedureTestingUtility.validateTableDeletion( + UTIL.getHBaseCluster().getMaster(), tableName, regions, "f1", "f2"); + } + + // ========================================================================== + // Test Helpers + // ========================================================================== + public static <TState> void testRecoveryAndDoubleExecution(final HBaseTestingUtility testUtil, + final long procId, final int lastStepBeforeFailover, TState[] states) throws Exception { + ProcedureExecutor<MasterProcedureEnv> procExec = + testUtil.getHBaseCluster().getMaster().getMasterProcedureExecutor(); + ProcedureTestingUtility.waitProcedure(procExec, procId); + + for (int i = 0; i < 
lastStepBeforeFailover; ++i) { + LOG.info("Restart "+ i +" exec state: " + states[i]); + ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); + ProcedureTestingUtility.restart(procExec); + ProcedureTestingUtility.waitProcedure(procExec, procId); + } + ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); + + LOG.info("Trigger master failover"); + masterFailover(testUtil); + + procExec = testUtil.getHBaseCluster().getMaster().getMasterProcedureExecutor(); + ProcedureTestingUtility.waitProcedure(procExec, procId); + ProcedureTestingUtility.assertProcNotFailed(procExec, procId); + } + + // ========================================================================== + // Master failover utils + // ========================================================================== + public static void masterFailover(final HBaseTestingUtility testUtil) + throws Exception { + MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster(); + + // Kill the master + HMaster oldMaster = cluster.getMaster(); + cluster.killMaster(cluster.getMaster().getServerName()); + + // Wait the secondary + waitBackupMaster(testUtil, oldMaster); + } + + public static void waitBackupMaster(final HBaseTestingUtility testUtil, + final HMaster oldMaster) throws Exception { + MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster(); + + HMaster newMaster = cluster.getMaster(); + while (newMaster == null || newMaster == oldMaster) { + Thread.sleep(250); + newMaster = cluster.getMaster(); + } + + while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) { + Thread.sleep(250); + } + } + + // ========================================================================== + // Helpers + // ========================================================================== + private MasterProcedureEnv getMasterProcedureEnv() { + return getMasterProcedureExecutor().getEnvironment(); + } + + private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() { + return 
UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(); + } + + private FileSystem getFileSystem() { + return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem(); + } + + private Path getRootDir() { + return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir(); + } + + private Path getTempDir() { + return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getTempDir(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a7f9b6dc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java new file mode 100644 index 0000000..d22930f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java @@ -0,0 +1,433 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
 * Tests for MasterProcedureQueue: per-table FIFO queues with table
 * read/write locking and queue deletion semantics.
 */
@Category({MasterTests.class, SmallTests.class})
public class TestMasterProcedureQueue {
  private static final Log LOG = LogFactory.getLog(TestMasterProcedureQueue.class);

  // Fresh queue per test; tearDown asserts each test drained it.
  private MasterProcedureQueue queue;
  private Configuration conf;

  @Before
  public void setUp() throws IOException {
    conf = HBaseConfiguration.create();
    queue = new MasterProcedureQueue(conf, new TableLockManager.NullTableLockManager());
  }

  @After
  public void tearDown() throws IOException {
    // Every test must leave the queue empty.
    assertEquals(0, queue.size());
  }

  /**
   * Verify simple create/insert/fetch/delete of the table queue.
   */
  @Test
  public void testSimpleTableOpsQueues() throws Exception {
    final int NUM_TABLES = 10;
    final int NUM_ITEMS = 10;

    int count = 0;
    for (int i = 1; i <= NUM_TABLES; ++i) {
      TableName tableName = TableName.valueOf(String.format("test-%04d", i));
      // insert items
      for (int j = 1; j <= NUM_ITEMS; ++j) {
        // proc ids are encoded as (table * 1000 + item) so the poll order
        // below can be checked exactly.
        queue.addBack(new TestTableProcedure(i * 1000 + j, tableName,
          TableProcedureInterface.TableOperationType.EDIT));
        assertEquals(++count, queue.size());
      }
    }
    assertEquals(NUM_TABLES * NUM_ITEMS, queue.size());

    // The assertion below expects the queue to round-robin across tables:
    // item j of every table is returned before item j+1 of any table.
    for (int j = 1; j <= NUM_ITEMS; ++j) {
      for (int i = 1; i <= NUM_TABLES; ++i) {
        Long procId = queue.poll();
        assertEquals(--count, queue.size());
        assertEquals(i * 1000 + j, procId.longValue());
      }
    }
    assertEquals(0, queue.size());

    for (int i = 1; i <= NUM_TABLES; ++i) {
      TableName tableName = TableName.valueOf(String.format("test-%04d", i));
      // complete the table deletion
      assertTrue(queue.markTableAsDeleted(tableName));
    }
  }

  /**
   * Check that the table queue is not deletable until every procedure
   * in-progress is completed (this is a special case for write-locks).
   */
  @Test
  public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
    TableName tableName = TableName.valueOf("testtb");

    queue.addBack(new TestTableProcedure(1, tableName,
          TableProcedureInterface.TableOperationType.EDIT));

    // table can't be deleted because one item is in the queue
    assertFalse(queue.markTableAsDeleted(tableName));

    // fetch item and take a lock
    assertEquals(1, queue.poll().longValue());
    // take the xlock
    assertTrue(queue.tryAcquireTableWrite(tableName, "write"));
    // table can't be deleted because we have the lock
    assertEquals(0, queue.size());
    assertFalse(queue.markTableAsDeleted(tableName));
    // release the xlock
    queue.releaseTableWrite(tableName);
    // complete the table deletion
    assertTrue(queue.markTableAsDeleted(tableName));
  }

  /**
   * Check that the table queue is not deletable until every procedure
   * in-progress is completed (this is a special case for read-locks).
   */
  @Test
  public void testCreateDeleteTableOperationsWithReadLock() throws Exception {
    final TableName tableName = TableName.valueOf("testtb");
    final int nitems = 2;

    for (int i = 1; i <= nitems; ++i) {
      queue.addBack(new TestTableProcedure(i, tableName,
            TableProcedureInterface.TableOperationType.READ));
    }

    // table can't be deleted because one item is in the queue
    assertFalse(queue.markTableAsDeleted(tableName));

    for (int i = 1; i <= nitems; ++i) {
      // fetch item and take a lock
      assertEquals(i, queue.poll().longValue());
      // take the rlock (read locks are shared, so all acquisitions succeed)
      assertTrue(queue.tryAcquireTableRead(tableName, "read " + i));
      // table can't be deleted because we have locks and/or items in the queue
      assertFalse(queue.markTableAsDeleted(tableName));
    }

    for (int i = 1; i <= nitems; ++i) {
      // table can't be deleted because we have locks
      assertFalse(queue.markTableAsDeleted(tableName));
      // release the rlock
      queue.releaseTableRead(tableName);
    }

    // there are no items and no lock in the queue
    assertEquals(0, queue.size());
    // complete the table deletion
    assertTrue(queue.markTableAsDeleted(tableName));
  }

  /**
   * Verify the correct logic of read/write locks on the queue:
   * a write lock excludes everything, read locks are shared.
   */
  @Test
  public void testVerifyRwLocks() throws Exception {
    TableName tableName = TableName.valueOf("testtb");
    // Queue an interleaving of write (EDIT) and read operations.
    queue.addBack(new TestTableProcedure(1, tableName,
          TableProcedureInterface.TableOperationType.EDIT));
    queue.addBack(new TestTableProcedure(2, tableName,
          TableProcedureInterface.TableOperationType.READ));
    queue.addBack(new TestTableProcedure(3, tableName,
          TableProcedureInterface.TableOperationType.EDIT));
    queue.addBack(new TestTableProcedure(4, tableName,
          TableProcedureInterface.TableOperationType.READ));
    queue.addBack(new TestTableProcedure(5, tableName,
          TableProcedureInterface.TableOperationType.READ));

    // Fetch the 1st item and take the write lock
    Long procId = queue.poll();
    assertEquals(1, procId.longValue());
    assertEquals(true, queue.tryAcquireTableWrite(tableName, "write " + procId));

    // Fetch the 2nd item and verify that the lock can't be acquired
    // (poll returns null while the table is write-locked)
    assertEquals(null, queue.poll());

    // Release the write lock so the next (read) operation can proceed
    queue.releaseTableWrite(tableName);

    // Fetch the 2nd item and take the read lock
    procId = queue.poll();
    assertEquals(2, procId.longValue());
    assertEquals(true, queue.tryAcquireTableRead(tableName, "read " + procId));

    // Fetch the 3rd item and verify that the lock can't be acquired
    // (a write lock cannot be taken while a read lock is held)
    procId = queue.poll();
    assertEquals(3, procId.longValue());
    assertEquals(false, queue.tryAcquireTableWrite(tableName, "write " + procId));

    // release the rdlock of item 2 and take the wrlock for the 3d item
    queue.releaseTableRead(tableName);
    assertEquals(true, queue.tryAcquireTableWrite(tableName, "write " + procId));

    // Fetch 4th item and verify that the lock can't be acquired
    assertEquals(null, queue.poll());

    // Release the write lock so the remaining read operations can proceed
    queue.releaseTableWrite(tableName);

    // Fetch the 4th item and take the read lock
    procId = queue.poll();
    assertEquals(4, procId.longValue());
    assertEquals(true, queue.tryAcquireTableRead(tableName, "read " + procId));

    // Fetch the 5th item and take the read lock (shared with the 4th)
    procId = queue.poll();
    assertEquals(5, procId.longValue());
    assertEquals(true, queue.tryAcquireTableRead(tableName, "read " + procId));

    // Release 4th and 5th read-lock
    queue.releaseTableRead(tableName);
    queue.releaseTableRead(tableName);

    // remove table queue
    assertEquals(0, queue.size());
    assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName));
  }

  /**
   * Verify that "write" operations for a single table are serialized,
   * but different tables can be executed in parallel.
   */
  @Test(timeout=90000)
  public void testConcurrentWriteOps() throws Exception {
    final TestTableProcSet procSet = new TestTableProcSet(queue);

    final int NUM_ITEMS = 10;
    final int NUM_TABLES = 4;
    final AtomicInteger opsCount = new AtomicInteger(0);
    for (int i = 0; i < NUM_TABLES; ++i) {
      TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
      // NOTE(review): "j < NUM_ITEMS" enqueues NUM_ITEMS-1 items per table;
      // opsCount tracks the actual number so the test is self-consistent,
      // but "j <= NUM_ITEMS" may have been intended — confirm.
      for (int j = 1; j < NUM_ITEMS; ++j) {
        procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
          TableProcedureInterface.TableOperationType.EDIT));
        opsCount.incrementAndGet();
      }
    }
    assertEquals(opsCount.get(), queue.size());

    // More workers than tables, so some workers must contend.
    final Thread[] threads = new Thread[NUM_TABLES * 2];
    final HashSet<TableName> concurrentTables = new HashSet<TableName>();
    final ArrayList<String> failures = new ArrayList<String>();
    final AtomicInteger concurrentCount = new AtomicInteger(0);
    for (int i = 0; i < threads.length; ++i) {
      threads[i] = new Thread() {
        @Override
        public void run() {
          while (opsCount.get() > 0) {
            try {
              TableProcedureInterface proc = procSet.acquire();
              if (proc == null) {
                // Nothing acquirable right now: wake the other workers and
                // retry while operations remain.
                queue.signalAll();
                if (opsCount.get() > 0) {
                  continue;
                }
                break;
              }
              synchronized
(concurrentTables) { + assertTrue("unexpected concurrency on " + proc.getTableName(), + concurrentTables.add(proc.getTableName())); + } + assertTrue(opsCount.decrementAndGet() >= 0); + try { + long procId = ((Procedure)proc).getProcId(); + TableName tableId = proc.getTableName(); + int concurrent = concurrentCount.incrementAndGet(); + assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES, + concurrent >= 1 && concurrent <= NUM_TABLES); + LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent); + Thread.sleep(2000); + concurrent = concurrentCount.decrementAndGet(); + LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent); + assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES); + } finally { + synchronized (concurrentTables) { + assertTrue(concurrentTables.remove(proc.getTableName())); + } + procSet.release(proc); + } + } catch (Throwable e) { + LOG.error("Failed " + e.getMessage(), e); + synchronized (failures) { + failures.add(e.getMessage()); + } + } finally { + queue.signalAll(); + } + } + } + }; + threads[i].start(); + } + for (int i = 0; i < threads.length; ++i) { + threads[i].join(); + } + assertTrue(failures.toString(), failures.isEmpty()); + assertEquals(0, opsCount.get()); + assertEquals(0, queue.size()); + + for (int i = 1; i <= NUM_TABLES; ++i) { + TableName table = TableName.valueOf(String.format("testtb-%04d", i)); + assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table)); + } + } + + public static class TestTableProcSet { + private final MasterProcedureQueue queue; + private Map<Long, TableProcedureInterface> procsMap = + new ConcurrentHashMap<Long, TableProcedureInterface>(); + + public TestTableProcSet(final MasterProcedureQueue queue) { + this.queue = queue; + } + + public void addBack(TableProcedureInterface tableProc) { + Procedure proc = (Procedure)tableProc; + procsMap.put(proc.getProcId(), tableProc); + 
queue.addBack(proc); + } + + public void addFront(TableProcedureInterface tableProc) { + Procedure proc = (Procedure)tableProc; + procsMap.put(proc.getProcId(), tableProc); + queue.addFront(proc); + } + + public TableProcedureInterface acquire() { + TableProcedureInterface proc = null; + boolean avail = false; + while (!avail) { + Long procId = queue.poll(); + proc = procId != null ? procsMap.remove(procId) : null; + if (proc == null) break; + switch (proc.getTableOperationType()) { + case CREATE: + case DELETE: + case EDIT: + avail = queue.tryAcquireTableWrite(proc.getTableName(), + "op="+ proc.getTableOperationType()); + break; + case READ: + avail = queue.tryAcquireTableRead(proc.getTableName(), + "op="+ proc.getTableOperationType()); + break; + } + if (!avail) { + addFront(proc); + LOG.debug("yield procId=" + procId); + } + } + return proc; + } + + public void release(TableProcedureInterface proc) { + switch (proc.getTableOperationType()) { + case CREATE: + case DELETE: + case EDIT: + queue.releaseTableWrite(proc.getTableName()); + break; + case READ: + queue.releaseTableRead(proc.getTableName()); + break; + } + } + } + + public static class TestTableProcedure extends Procedure<Void> + implements TableProcedureInterface { + private final TableOperationType opType; + private final TableName tableName; + + public TestTableProcedure() { + throw new UnsupportedOperationException("recovery should not be triggered here"); + } + + public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) { + this.tableName = tableName; + this.opType = opType; + setProcId(procId); + } + + @Override + public TableName getTableName() { + return tableName; + } + + @Override + public TableOperationType getTableOperationType() { + return opType; + } + + @Override + protected Procedure[] execute(Void env) { + return null; + } + + @Override + protected void rollback(Void env) { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean 
    abort(Void env) {
      // abort is never expected to be invoked on this stub procedure
      throw new UnsupportedOperationException();
    }

    // No state to persist: serialization hooks are intentionally empty.
    @Override
    protected void serializeStateData(final OutputStream stream) throws IOException {}

    @Override
    protected void deserializeStateData(final InputStream stream) throws IOException {}
  }
}
