http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java new file mode 100644 index 0000000..9a3714f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -0,0 +1,1241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.master.TableLockManager; +import org.apache.hadoop.hbase.master.TableLockManager.TableLock; +import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet; + +/** + * ProcedureRunnableSet for the Master Procedures. + * This RunnableSet tries to provide to the ProcedureExecutor procedures + * that can be executed without having to wait on a lock. + * Most of the master operations can be executed concurrently, if they + * are operating on different tables (e.g. two create table can be performed + * at the same, time assuming table A and table B) or against two different servers; say + * two servers that crashed at about the same time. + * + * <p>Each procedure should implement an interface providing information for this queue. + * for example table related procedures should implement TableProcedureInterface. + * each procedure will be pushed in its own queue, and based on the operation type + * we may take smarter decision. e.g. we can abort all the operations preceding + * a delete table, or similar. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class MasterProcedureScheduler implements ProcedureRunnableSet { + private static final Log LOG = LogFactory.getLog(MasterProcedureScheduler.class); + + private final TableLockManager lockManager; + private final ReentrantLock schedLock = new ReentrantLock(); + private final Condition schedWaitCond = schedLock.newCondition(); + + private final FairQueue<ServerName> serverRunQueue = new FairQueue<ServerName>(); + private final FairQueue<TableName> tableRunQueue = new FairQueue<TableName>(); + private int queueSize = 0; + + private final Object[] serverBuckets = new Object[128]; + private Queue<String> namespaceMap = null; + private Queue<TableName> tableMap = null; + + private final int metaTablePriority; + private final int userTablePriority; + private final int sysTablePriority; + + // TODO: metrics + private long pollCalls = 0; + private long nullPollCalls = 0; + + public MasterProcedureScheduler(final Configuration conf, final TableLockManager lockManager) { + this.lockManager = lockManager; + + // TODO: should this be part of the HTD? 
+ metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3); + sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2); + userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1); + } + + @Override + public void addFront(Procedure proc) { + doAdd(proc, true); + } + + @Override + public void addBack(Procedure proc) { + doAdd(proc, false); + } + + @Override + public void yield(final Procedure proc) { + doAdd(proc, isTableProcedure(proc)); + } + + private void doAdd(final Procedure proc, final boolean addFront) { + schedLock.lock(); + try { + if (isTableProcedure(proc)) { + doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); + } else if (isServerProcedure(proc)) { + doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront); + } else { + // TODO: at the moment we only have Table and Server procedures + // if you are implementing a non-table/non-server procedure, you have two options: create + // a group for all the non-table/non-server procedures or try to find a key for your + // non-table/non-server procedures and implement something similar to the TableRunQueue. 
+ throw new UnsupportedOperationException( + "RQs for non-table/non-server procedures are not implemented yet"); + } + schedWaitCond.signal(); + } finally { + schedLock.unlock(); + } + } + + private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq, + final Queue<T> queue, final Procedure proc, final boolean addFront) { + queue.add(proc, addFront); + if (!(queue.isSuspended() || queue.hasExclusiveLock())) { + if (queue.size() == 1 && !IterableList.isLinked(queue)) { + fairq.add(queue); + } + queueSize++; + } + } + + @Override + public Procedure poll() { + return poll(-1); + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") + Procedure poll(long waitNsec) { + Procedure pollResult = null; + schedLock.lock(); + try { + if (queueSize == 0) { + if (waitNsec < 0) { + schedWaitCond.await(); + } else { + schedWaitCond.awaitNanos(waitNsec); + } + if (queueSize == 0) { + return null; + } + } + + // For now, let server handling have precedence over table handling; presumption is that it + // is more important handling crashed servers than it is running the + // enabling/disabling tables, etc. + pollResult = doPoll(serverRunQueue); + if (pollResult == null) { + pollResult = doPoll(tableRunQueue); + } + + // update metrics + pollCalls++; + nullPollCalls += (pollResult == null) ? 
1 : 0; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + schedLock.unlock(); + } + return pollResult; + } + + private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) { + Queue<T> rq = fairq.poll(); + if (rq == null || !rq.isAvailable()) { + return null; + } + + assert !rq.isSuspended() : "rq=" + rq + " is suspended"; + Procedure pollResult = rq.poll(); + this.queueSize--; + if (rq.isEmpty() || rq.requireExclusiveLock(pollResult)) { + removeFromRunQueue(fairq, rq); + } + return pollResult; + } + + @Override + public void clear() { + // NOTE: USED ONLY FOR TESTING + schedLock.lock(); + try { + // Remove Servers + for (int i = 0; i < serverBuckets.length; ++i) { + clear((ServerQueue)serverBuckets[i], serverRunQueue); + serverBuckets[i] = null; + } + + // Remove Tables + clear(tableMap, tableRunQueue); + tableMap = null; + + assert queueSize == 0 : "expected queue size to be 0, got " + queueSize; + } finally { + schedLock.unlock(); + } + } + + private <T extends Comparable<T>> void clear(Queue<T> treeMap, FairQueue<T> fairq) { + while (treeMap != null) { + Queue<T> node = AvlTree.getFirst(treeMap); + assert !node.isSuspended() : "can't clear suspended " + node.getKey(); + treeMap = AvlTree.remove(treeMap, node.getKey()); + removeFromRunQueue(fairq, node); + } + } + + @Override + public void signalAll() { + schedLock.lock(); + try { + schedWaitCond.signalAll(); + } finally { + schedLock.unlock(); + } + } + + @Override + public int size() { + schedLock.lock(); + try { + return queueSize; + } finally { + schedLock.unlock(); + } + } + + @Override + public void completionCleanup(Procedure proc) { + if (proc instanceof TableProcedureInterface) { + TableProcedureInterface iProcTable = (TableProcedureInterface)proc; + boolean tableDeleted; + if (proc.hasException()) { + IOException procEx = proc.getException().unwrapRemoteException(); + if (iProcTable.getTableOperationType() == TableOperationType.CREATE) { + // 
create failed because the table already exist + tableDeleted = !(procEx instanceof TableExistsException); + } else { + // the operation failed because the table does not exist + tableDeleted = (procEx instanceof TableNotFoundException); + } + } else { + // the table was deleted + tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE); + } + if (tableDeleted) { + markTableAsDeleted(iProcTable.getTableName()); + return; + } + } else { + // No cleanup for ServerProcedureInterface types, yet. + return; + } + } + + private <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue) { + if (IterableList.isLinked(queue)) return; + if (!queue.isEmpty()) { + fairq.add(queue); + queueSize += queue.size(); + } + } + + private <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, Queue<T> queue) { + if (!IterableList.isLinked(queue)) return; + fairq.remove(queue); + queueSize -= queue.size(); + } + + // ============================================================================ + // TODO: Metrics + // ============================================================================ + public long getPollCalls() { + return pollCalls; + } + + public long getNullPollCalls() { + return nullPollCalls; + } + + // ============================================================================ + // Event Helpers + // ============================================================================ + public boolean waitEvent(ProcedureEvent event, Procedure procedure) { + return waitEvent(event, procedure, false); + } + + public boolean waitEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) { + synchronized (event) { + if (event.isReady()) { + return false; + } + + // TODO: Suspend single procedure not implemented yet, fallback to suspending the queue + if (!suspendQueue) suspendQueue = true; + + if (isTableProcedure(procedure)) { + suspendTableQueue(event, getTableName(procedure)); + } else if 
(isServerProcedure(procedure)) { + suspendServerQueue(event, getServerName(procedure)); + } else { + // TODO: at the moment we only have Table and Server procedures + // if you are implementing a non-table/non-server procedure, you have two options: create + // a group for all the non-table/non-server procedures or try to find a key for your + // non-table/non-server procedures and implement something similar to the TableRunQueue. + throw new UnsupportedOperationException( + "RQs for non-table/non-server procedures are not implemented yet"); + } + } + return true; + } + + private void suspendTableQueue(ProcedureEvent event, TableName tableName) { + schedLock.lock(); + try { + TableQueue queue = getTableQueue(tableName); + if (!queue.setSuspended(true)) return; + + if (LOG.isDebugEnabled()) { + LOG.debug("Suspend table queue " + tableName); + } + removeFromRunQueue(tableRunQueue, queue); + event.suspendTableQueue(queue); + } finally { + schedLock.unlock(); + } + } + + private void suspendServerQueue(ProcedureEvent event, ServerName serverName) { + schedLock.lock(); + try { + // TODO: This will change once we have the new AM + ServerQueue queue = getServerQueue(serverName); + if (!queue.setSuspended(true)) return; + + if (LOG.isDebugEnabled()) { + LOG.debug("Suspend server queue " + serverName); + } + removeFromRunQueue(serverRunQueue, queue); + event.suspendServerQueue(queue); + } finally { + schedLock.unlock(); + } + } + + public void suspend(ProcedureEvent event) { + synchronized (event) { + event.setReady(false); + if (LOG.isDebugEnabled()) { + LOG.debug("Suspend event " + event); + } + } + } + + public void wake(ProcedureEvent event) { + synchronized (event) { + event.setReady(true); + if (LOG.isDebugEnabled()) { + LOG.debug("Wake event " + event); + } + + schedLock.lock(); + try { + while (event.hasWaitingTables()) { + Queue<TableName> queue = event.popWaitingTable(); + addToRunQueue(tableRunQueue, queue); + } + // TODO: This will change once we have the new AM 
+ while (event.hasWaitingServers()) { + Queue<ServerName> queue = event.popWaitingServer(); + addToRunQueue(serverRunQueue, queue); + } + + if (queueSize > 1) { + schedWaitCond.signalAll(); + } else if (queueSize > 0) { + schedWaitCond.signal(); + } + } finally { + schedLock.unlock(); + } + } + } + + public static class ProcedureEvent { + private final String description; + + private Queue<ServerName> waitingServers = null; + private Queue<TableName> waitingTables = null; + private boolean ready = false; + + public ProcedureEvent(String description) { + this.description = description; + } + + public synchronized boolean isReady() { + return ready; + } + + private synchronized void setReady(boolean isReady) { + this.ready = isReady; + } + + private void suspendTableQueue(Queue<TableName> queue) { + waitingTables = IterableList.append(waitingTables, queue); + } + + private void suspendServerQueue(Queue<ServerName> queue) { + waitingServers = IterableList.append(waitingServers, queue); + } + + private boolean hasWaitingTables() { + return waitingTables != null; + } + + private Queue<TableName> popWaitingTable() { + Queue<TableName> node = waitingTables; + waitingTables = IterableList.remove(waitingTables, node); + node.setSuspended(false); + return node; + } + + private boolean hasWaitingServers() { + return waitingServers != null; + } + + private Queue<ServerName> popWaitingServer() { + Queue<ServerName> node = waitingServers; + waitingServers = IterableList.remove(waitingServers, node); + node.setSuspended(false); + return node; + } + + @Override + public String toString() { + return String.format("ProcedureEvent(%s)", description); + } + } + + // ============================================================================ + // Table Queue Lookup Helpers + // ============================================================================ + private TableQueue getTableQueueWithLock(TableName tableName) { + schedLock.lock(); + try { + return getTableQueue(tableName); + } 
finally { + schedLock.unlock(); + } + } + + private TableQueue getTableQueue(TableName tableName) { + Queue<TableName> node = AvlTree.get(tableMap, tableName); + if (node != null) return (TableQueue)node; + + node = new TableQueue(tableName, getTablePriority(tableName)); + tableMap = AvlTree.insert(tableMap, node); + return (TableQueue)node; + } + + private void removeTableQueue(TableName tableName) { + tableMap = AvlTree.remove(tableMap, tableName); + } + + private int getTablePriority(TableName tableName) { + if (tableName.equals(TableName.META_TABLE_NAME)) { + return metaTablePriority; + } else if (tableName.isSystemTable()) { + return sysTablePriority; + } + return userTablePriority; + } + + private static boolean isTableProcedure(Procedure proc) { + return proc instanceof TableProcedureInterface; + } + + private static TableName getTableName(Procedure proc) { + return ((TableProcedureInterface)proc).getTableName(); + } + + // ============================================================================ + // Server Queue Lookup Helpers + // ============================================================================ + private ServerQueue getServerQueueWithLock(ServerName serverName) { + schedLock.lock(); + try { + return getServerQueue(serverName); + } finally { + schedLock.unlock(); + } + } + + private ServerQueue getServerQueue(ServerName serverName) { + int index = getBucketIndex(serverBuckets, serverName.hashCode()); + Queue<ServerName> root = getTreeRoot(serverBuckets, index); + Queue<ServerName> node = AvlTree.get(root, serverName); + if (node != null) return (ServerQueue)node; + + node = new ServerQueue(serverName); + serverBuckets[index] = AvlTree.insert(root, node); + return (ServerQueue)node; + } + + private void removeServerQueue(ServerName serverName) { + int index = getBucketIndex(serverBuckets, serverName.hashCode()); + serverBuckets[index] = AvlTree.remove((ServerQueue)serverBuckets[index], serverName); + } + + @SuppressWarnings("unchecked") + 
private static <T extends Comparable<T>> Queue<T> getTreeRoot(Object[] buckets, int index) { + return (Queue<T>) buckets[index]; + } + + private static int getBucketIndex(Object[] buckets, int hashCode) { + return Math.abs(hashCode) % buckets.length; + } + + private static boolean isServerProcedure(Procedure proc) { + return proc instanceof ServerProcedureInterface; + } + + private static ServerName getServerName(Procedure proc) { + return ((ServerProcedureInterface)proc).getServerName(); + } + + // ============================================================================ + // Table and Server Queue Implementation + // ============================================================================ + public static class ServerQueue extends QueueImpl<ServerName> { + public ServerQueue(ServerName serverName) { + super(serverName); + } + + public boolean requireExclusiveLock(Procedure proc) { + ServerProcedureInterface spi = (ServerProcedureInterface)proc; + switch (spi.getServerOperationType()) { + case CRASH_HANDLER: + return true; + default: + break; + } + throw new UnsupportedOperationException("unexpected type " + spi.getServerOperationType()); + } + } + + public static class TableQueue extends QueueImpl<TableName> { + private TableLock tableLock = null; + + public TableQueue(TableName tableName, int priority) { + super(tableName, priority); + } + + // TODO: We can abort pending/in-progress operation if the new call is + // something like drop table. We can Override addBack(), + // check the type and abort all the in-flight procedurs. 
+ private boolean canAbortPendingOperations(Procedure proc) { + TableProcedureInterface tpi = (TableProcedureInterface)proc; + switch (tpi.getTableOperationType()) { + case DELETE: + return true; + default: + return false; + } + } + + public boolean requireExclusiveLock(Procedure proc) { + TableProcedureInterface tpi = (TableProcedureInterface)proc; + switch (tpi.getTableOperationType()) { + case CREATE: + case DELETE: + case DISABLE: + case EDIT: + case ENABLE: + return true; + case READ: + return false; + default: + break; + } + throw new UnsupportedOperationException("unexpected type " + tpi.getTableOperationType()); + } + + private synchronized boolean trySharedLock(final TableLockManager lockManager, + final String purpose) { + if (hasExclusiveLock()) return false; + + // Take zk-read-lock + TableName tableName = getKey(); + tableLock = lockManager.readLock(tableName, purpose); + try { + tableLock.acquire(); + } catch (IOException e) { + LOG.error("failed acquire read lock on " + tableName, e); + tableLock = null; + return false; + } + + trySharedLock(); + return true; + } + + private synchronized void releaseSharedLock(final TableLockManager lockManager) { + releaseTableLock(lockManager, isSingleSharedLock()); + releaseSharedLock(); + } + + private synchronized boolean tryZkExclusiveLock(final TableLockManager lockManager, + final String purpose) { + // Take zk-write-lock + TableName tableName = getKey(); + tableLock = lockManager.writeLock(tableName, purpose); + try { + tableLock.acquire(); + } catch (IOException e) { + LOG.error("failed acquire write lock on " + tableName, e); + tableLock = null; + return false; + } + return true; + } + + private synchronized void releaseZkExclusiveLock(final TableLockManager lockManager) { + releaseTableLock(lockManager, true); + } + + private void releaseTableLock(final TableLockManager lockManager, boolean reset) { + for (int i = 0; i < 3; ++i) { + try { + tableLock.release(); + if (reset) { + tableLock = null; + } + 
break; + } catch (IOException e) { + LOG.warn("Could not release the table write-lock", e); + } + } + } + } + + // ============================================================================ + // Locking Helpers + // ============================================================================ + /** + * Try to acquire the exclusive lock on the specified table. + * other operations in the table-queue will be executed after the lock is released. + * @param table Table to lock + * @param purpose Human readable reason for locking the table + * @return true if we were able to acquire the lock on the table, otherwise false. + */ + public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) { + schedLock.lock(); + TableQueue queue = getTableQueue(table); + boolean hasXLock = queue.tryExclusiveLock(); + if (!hasXLock) { + schedLock.unlock(); + return false; + } + + removeFromRunQueue(tableRunQueue, queue); + schedLock.unlock(); + + // Zk lock is expensive... + hasXLock = queue.tryZkExclusiveLock(lockManager, purpose); + if (!hasXLock) { + schedLock.lock(); + queue.releaseExclusiveLock(); + addToRunQueue(tableRunQueue, queue); + schedLock.unlock(); + } + return hasXLock; + } + + /** + * Release the exclusive lock taken with tryAcquireTableExclusiveLock() + * @param table the name of the table that has the exclusive lock + */ + public void releaseTableExclusiveLock(final TableName table) { + schedLock.lock(); + TableQueue queue = getTableQueue(table); + schedLock.unlock(); + + // Zk lock is expensive... + queue.releaseZkExclusiveLock(lockManager); + + schedLock.lock(); + queue.releaseExclusiveLock(); + addToRunQueue(tableRunQueue, queue); + schedLock.unlock(); + } + + /** + * Try to acquire the shared lock on the specified table. 
+ * other "read" operations in the table-queue may be executed concurrently, + * @param table Table to lock + * @param purpose Human readable reason for locking the table + * @return true if we were able to acquire the lock on the table, otherwise false. + */ + public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) { + return getTableQueueWithLock(table).trySharedLock(lockManager, purpose); + } + + /** + * Release the shared lock taken with tryAcquireTableSharedLock() + * @param table the name of the table that has the shared lock + */ + public void releaseTableSharedLock(final TableName table) { + getTableQueueWithLock(table).releaseSharedLock(lockManager); + } + + /** + * Tries to remove the queue and the table-lock of the specified table. + * If there are new operations pending (e.g. a new create), + * the remove will not be performed. + * @param table the name of the table that should be marked as deleted + * @return true if deletion succeeded, false otherwise meaning that there are + * other new operations pending for that table (e.g. a new create). 
+ */ + protected boolean markTableAsDeleted(final TableName table) { + final ReentrantLock l = schedLock; + l.lock(); + try { + TableQueue queue = getTableQueue(table); + if (queue == null) return true; + + if (queue.isEmpty() && queue.acquireDeleteLock()) { + // remove the table from the run-queue and the map + if (IterableList.isLinked(queue)) { + tableRunQueue.remove(queue); + } + + // Remove the table lock + try { + lockManager.tableDeleted(table); + } catch (IOException e) { + LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical + } + + removeTableQueue(table); + } else { + // TODO: If there are no create, we can drop all the other ops + return false; + } + } finally { + l.unlock(); + } + return true; + } + + // ============================================================================ + // Server Locking Helpers + // ============================================================================ + /** + * Try to acquire the exclusive lock on the specified server. + * @see #releaseServerExclusiveLock(ServerName) + * @param serverName Server to lock + * @return true if we were able to acquire the lock on the server, otherwise false. + */ + public boolean tryAcquireServerExclusiveLock(final ServerName serverName) { + schedLock.lock(); + try { + ServerQueue queue = getServerQueue(serverName); + if (queue.tryExclusiveLock()) { + removeFromRunQueue(serverRunQueue, queue); + return true; + } + } finally { + schedLock.unlock(); + } + return false; + } + + /** + * Release the exclusive lock + * @see #tryAcquireServerExclusiveLock(ServerName) + * @param serverName the server that has the exclusive lock + */ + public void releaseServerExclusiveLock(final ServerName serverName) { + schedLock.lock(); + try { + ServerQueue queue = getServerQueue(serverName); + queue.releaseExclusiveLock(); + addToRunQueue(serverRunQueue, queue); + } finally { + schedLock.unlock(); + } + } + + /** + * Try to acquire the shared lock on the specified server. 
+ * @see #releaseServerSharedLock(ServerName) + * @param serverName Server to lock + * @return true if we were able to acquire the lock on the server, otherwise false. + */ + public boolean tryAcquireServerSharedLock(final ServerName serverName) { + return getServerQueueWithLock(serverName).trySharedLock(); + } + + /** + * Release the shared lock taken + * @see #tryAcquireServerSharedLock(ServerName) + * @param serverName the server that has the shared lock + */ + public void releaseServerSharedLock(final ServerName serverName) { + getServerQueueWithLock(serverName).releaseSharedLock(); + } + + // ============================================================================ + // Generic Helpers + // ============================================================================ + private static interface QueueInterface { + boolean isAvailable(); + boolean isEmpty(); + int size(); + void add(Procedure proc, boolean addFront); + boolean requireExclusiveLock(Procedure proc); + Procedure poll(); + + boolean isSuspended(); + } + + private static abstract class Queue<TKey extends Comparable<TKey>> implements QueueInterface { + private Queue<TKey> avlRight = null; + private Queue<TKey> avlLeft = null; + private int avlHeight = 1; + + private Queue<TKey> iterNext = null; + private Queue<TKey> iterPrev = null; + private boolean suspended = false; + + private boolean exclusiveLock = false; + private int sharedLock = 0; + + private final TKey key; + private final int priority; + + public Queue(TKey key) { + this(key, 1); + } + + public Queue(TKey key, int priority) { + this.key = key; + this.priority = priority; + } + + protected TKey getKey() { + return key; + } + + protected int getPriority() { + return priority; + } + + /** + * True if the queue is not in the run-queue and it is owned by an event. 
+ */ + public boolean isSuspended() { + return suspended; + } + + protected boolean setSuspended(boolean isSuspended) { + if (this.suspended == isSuspended) return false; + this.suspended = isSuspended; + return true; + } + + // ====================================================================== + // Read/Write Locking helpers + // ====================================================================== + public synchronized boolean isLocked() { + return hasExclusiveLock() || sharedLock > 0; + } + + public synchronized boolean hasExclusiveLock() { + return this.exclusiveLock; + } + + public synchronized boolean trySharedLock() { + if (hasExclusiveLock()) return false; + sharedLock++; + return true; + } + + public synchronized void releaseSharedLock() { + sharedLock--; + } + + protected synchronized boolean isSingleSharedLock() { + return sharedLock == 1; + } + + public synchronized boolean tryExclusiveLock() { + if (isLocked()) return false; + exclusiveLock = true; + return true; + } + + public synchronized void releaseExclusiveLock() { + exclusiveLock = false; + } + + public synchronized boolean acquireDeleteLock() { + return tryExclusiveLock(); + } + + // This should go away when we have the new AM and its events + // and we move xlock to the lock-event-queue. 
+ public synchronized boolean isAvailable() { + return !exclusiveLock && !isEmpty(); + } + + // ====================================================================== + // Generic Helpers + // ====================================================================== + public int compareKey(TKey cmpKey) { + return key.compareTo(cmpKey); + } + + public int compareTo(Queue<TKey> other) { + return compareKey(other.key); + } + + @Override + public String toString() { + return String.format("%s(%s)", getClass().getSimpleName(), key); + } + } + + // ====================================================================== + // Helper Data Structures + // ====================================================================== + private static abstract class QueueImpl<TKey extends Comparable<TKey>> extends Queue<TKey> { + private final ArrayDeque<Procedure> runnables = new ArrayDeque<Procedure>(); + + public QueueImpl(TKey key) { + super(key); + } + + public QueueImpl(TKey key, int priority) { + super(key, priority); + } + + public void add(final Procedure proc, final boolean addToFront) { + if (addToFront) { + addFront(proc); + } else { + addBack(proc); + } + } + + protected void addFront(final Procedure proc) { + runnables.addFirst(proc); + } + + protected void addBack(final Procedure proc) { + runnables.addLast(proc); + } + + @Override + public Procedure poll() { + return runnables.poll(); + } + + @Override + public boolean isEmpty() { + return runnables.isEmpty(); + } + + public int size() { + return runnables.size(); + } + } + + private static class FairQueue<T extends Comparable<T>> { + private final int quantum; + + private Queue<T> currentQueue = null; + private Queue<T> queueHead = null; + private int currentQuantum = 0; + + public FairQueue() { + this(1); + } + + public FairQueue(int quantum) { + this.quantum = quantum; + } + + public void add(Queue<T> queue) { + queueHead = IterableList.append(queueHead, queue); + if (currentQueue == null) setNextQueue(queueHead); + } + 
+ public void remove(Queue<T> queue) { + Queue<T> nextQueue = queue.iterNext; + queueHead = IterableList.remove(queueHead, queue); + if (currentQueue == queue) { + setNextQueue(queueHead != null ? nextQueue : null); + } + } + + public Queue<T> poll() { + if (currentQuantum == 0) { + if (!nextQueue()) { + return null; // nothing here + } + currentQuantum = calculateQuantum(currentQueue) - 1; + } else { + currentQuantum--; + } + + // This should go away when we have the new AM and its events + if (!currentQueue.isAvailable()) { + Queue<T> lastQueue = currentQueue; + do { + if (!nextQueue()) + return null; + } while (currentQueue != lastQueue && !currentQueue.isAvailable()); + + currentQuantum = calculateQuantum(currentQueue) - 1; + } + return currentQueue; + } + + private boolean nextQueue() { + if (currentQueue == null) return false; + currentQueue = currentQueue.iterNext; + return currentQueue != null; + } + + private void setNextQueue(Queue<T> queue) { + currentQueue = queue; + if (queue != null) { + currentQuantum = calculateQuantum(currentQueue); + } else { + currentQuantum = 0; + } + } + + private int calculateQuantum(final Queue queue) { + return Math.max(1, queue.getPriority() * quantum); // TODO + } + } + + private static class AvlTree { + public static <T extends Comparable<T>> Queue<T> get(Queue<T> root, T key) { + while (root != null) { + int cmp = root.compareKey(key); + if (cmp > 0) { + root = root.avlLeft; + } else if (cmp < 0) { + root = root.avlRight; + } else { + return root; + } + } + return null; + } + + public static <T extends Comparable<T>> Queue<T> getFirst(Queue<T> root) { + if (root != null) { + while (root.avlLeft != null) { + root = root.avlLeft; + } + } + return root; + } + + public static <T extends Comparable<T>> Queue<T> getLast(Queue<T> root) { + if (root != null) { + while (root.avlRight != null) { + root = root.avlRight; + } + } + return root; + } + + public static <T extends Comparable<T>> Queue<T> insert(Queue<T> root, Queue<T> 
node) { + if (root == null) return node; + if (node.compareTo(root) < 0) { + root.avlLeft = insert(root.avlLeft, node); + } else { + root.avlRight = insert(root.avlRight, node); + } + return balance(root); + } + + private static <T extends Comparable<T>> Queue<T> removeMin(Queue<T> p) { + if (p.avlLeft == null) + return p.avlRight; + p.avlLeft = removeMin(p.avlLeft); + return balance(p); + } + + public static <T extends Comparable<T>> Queue<T> remove(Queue<T> root, T key) { + if (root == null) return null; + + int cmp = root.compareKey(key); + if (cmp == 0) { + Queue<T> q = root.avlLeft; + Queue<T> r = root.avlRight; + if (r == null) return q; + Queue<T> min = getFirst(r); + min.avlRight = removeMin(r); + min.avlLeft = q; + return balance(min); + } else if (cmp > 0) { + root.avlLeft = remove(root.avlLeft, key); + } else /* if (cmp < 0) */ { + root.avlRight = remove(root.avlRight, key); + } + return balance(root); + } + + private static <T extends Comparable<T>> Queue<T> balance(Queue<T> p) { + fixHeight(p); + int balance = balanceFactor(p); + if (balance == 2) { + if (balanceFactor(p.avlRight) < 0) { + p.avlRight = rotateRight(p.avlRight); + } + return rotateLeft(p); + } else if (balance == -2) { + if (balanceFactor(p.avlLeft) > 0) { + p.avlLeft = rotateLeft(p.avlLeft); + } + return rotateRight(p); + } + return p; + } + + private static <T extends Comparable<T>> Queue<T> rotateRight(Queue<T> p) { + Queue<T> q = p.avlLeft; + p.avlLeft = q.avlRight; + q.avlRight = p; + fixHeight(p); + fixHeight(q); + return q; + } + + private static <T extends Comparable<T>> Queue<T> rotateLeft(Queue<T> q) { + Queue<T> p = q.avlRight; + q.avlRight = p.avlLeft; + p.avlLeft = q; + fixHeight(q); + fixHeight(p); + return p; + } + + private static <T extends Comparable<T>> void fixHeight(Queue<T> node) { + int heightLeft = height(node.avlLeft); + int heightRight = height(node.avlRight); + node.avlHeight = 1 + Math.max(heightLeft, heightRight); + } + + private static <T extends 
Comparable<T>> int height(Queue<T> node) { + return node != null ? node.avlHeight : 0; + } + + private static <T extends Comparable<T>> int balanceFactor(Queue<T> node) { + return height(node.avlRight) - height(node.avlLeft); + } + } + + private static class IterableList { + public static <T extends Comparable<T>> Queue<T> prepend(Queue<T> head, Queue<T> node) { + assert !isLinked(node) : node + " is already linked"; + if (head != null) { + Queue<T> tail = head.iterPrev; + tail.iterNext = node; + head.iterPrev = node; + node.iterNext = head; + node.iterPrev = tail; + } else { + node.iterNext = node; + node.iterPrev = node; + } + return node; + } + + public static <T extends Comparable<T>> Queue<T> append(Queue<T> head, Queue<T> node) { + assert !isLinked(node) : node + " is already linked"; + if (head != null) { + Queue<T> tail = head.iterPrev; + tail.iterNext = node; + node.iterNext = head; + node.iterPrev = tail; + head.iterPrev = node; + return head; + } + node.iterNext = node; + node.iterPrev = node; + return node; + } + + public static <T extends Comparable<T>> Queue<T> appendList(Queue<T> head, Queue<T> otherHead) { + if (head == null) return otherHead; + if (otherHead == null) return head; + + Queue<T> tail = head.iterPrev; + Queue<T> otherTail = otherHead.iterPrev; + tail.iterNext = otherHead; + otherHead.iterPrev = tail; + otherTail.iterNext = head; + head.iterPrev = otherTail; + return head; + } + + private static <T extends Comparable<T>> Queue<T> remove(Queue<T> head, Queue<T> node) { + assert isLinked(node) : node + " is not linked"; + if (node != node.iterNext) { + node.iterPrev.iterNext = node.iterNext; + node.iterNext.iterPrev = node.iterPrev; + head = (head == node) ? 
node.iterNext : head; + } else { + head = null; + } + node.iterNext = null; + node.iterPrev = null; + return head; + } + + private static <T extends Comparable<T>> boolean isLinked(Queue<T> node) { + return node.iterPrev != null && node.iterNext != null; + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java index b858e0c..3a30527 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyColumnFamilyProcedure.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -182,10 +181,8 @@ public class ModifyColumnFamilyProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - tableName, - EventType.C_M_MODIFY_FAMILY.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "modify family"); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java index a6300dd..6663e46 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos; @@ -215,10 +214,8 @@ public class ModifyTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - getTableName(), - EventType.C_M_MODIFY_TABLE.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "modify table"); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index bdcd89c..970c9c9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -553,13 +553,13 @@ implements ServerProcedureInterface { @Override protected boolean acquireLock(final 
MasterProcedureEnv env) { - if (!env.getMasterServices().isServerCrashProcessingEnabled()) return false; - return env.getProcedureQueue().tryAcquireServerExclusiveLock(this); + if (env.waitServerCrashProcessingEnabled(this)) return false; + return env.getProcedureQueue().tryAcquireServerExclusiveLock(getServerName()); } @Override protected void releaseLock(final MasterProcedureEnv env) { - env.getProcedureQueue().releaseServerExclusiveLock(this); + env.getProcedureQueue().releaseServerExclusiveLock(getServerName()); } @Override @@ -751,6 +751,11 @@ implements ServerProcedureInterface { return this.carryingMeta; } + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.CRASH_HANDLER; + } + /** * For this procedure, yield at end of each successful flow step so that all crashed servers * can make progress rather than do the default which has each procedure running to completion http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java index 5b0c45f..b5c24ff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java @@ -28,6 +28,10 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Evolving public interface ServerProcedureInterface { + public enum ServerOperationType { + CRASH_HANDLER + }; + /** * @return Name of this server instance. 
*/ @@ -37,4 +41,12 @@ public interface ServerProcedureInterface { * @return True if this server has an hbase:meta table region. */ boolean hasMetaTableRegion(); -} \ No newline at end of file + + /** + * Given an operation type we can take decisions about what to do with pending operations. + * e.g. if we get a crash handler and we have some assignment operation pending + * we can abort those operations. + * @return the operation type that the procedure is executing. + */ + ServerOperationType getServerOperationType(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java index 2e39b80..0d17bf6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TruncateTableProcedure.java @@ -182,7 +182,7 @@ public class TruncateTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; + if (env.waitInitialized(this)) return false; return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "truncate table"); } http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java index 4d0093c..99e0e3f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java @@ -128,14 +128,14 @@ public class TestMaster { MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); HMaster m = cluster.getMaster(); try { - m.initialized = false; // fake it, set back later + m.setInitialized(false); // fake it, set back later HRegionInfo meta = HRegionInfo.FIRST_META_REGIONINFO; m.move(meta.getEncodedNameAsBytes(), null); fail("Region should not be moved since master is not initialized"); } catch (IOException ioe) { assertTrue(ioe instanceof PleaseHoldException); } finally { - m.initialized = true; + m.setInitialized(true); } } @@ -172,13 +172,13 @@ public class TestMaster { try { List<HRegionInfo> tableRegions = admin.getTableRegions(tableName); - master.initialized = false; // fake it, set back later + master.setInitialized(false); // fake it, set back later admin.move(tableRegions.get(0).getEncodedNameAsBytes(), null); fail("Region should not be moved since master is not initialized"); } catch (IOException ioe) { assertTrue(StringUtils.stringifyException(ioe).contains("PleaseHoldException")); } finally { - master.initialized = true; + master.setInitialized(true); TEST_UTIL.deleteTable(tableName); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java index 398a898..cafee7a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterNoCluster.java @@ -306,7 +306,7 @@ public class TestMasterNoCluster { try { // Wait till master is initialized. 
- while (!master.initialized) Threads.sleep(10); + while (!master.isInitialized()) Threads.sleep(10); LOG.info("Master is initialized"); assertFalse("The dead server should not be pulled in", http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java new file mode 100644 index 0000000..af8d6ba --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureEvents.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@Category({MasterTests.class, MediumTests.class}) +public class TestMasterProcedureEvents { + private static final Log LOG = LogFactory.getLog(TestCreateTableProcedure.class); + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static long nonceGroup = HConstants.NO_NONCE; + private static long nonce = HConstants.NO_NONCE; + + private static void setupConf(Configuration conf) { + conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8); + conf.setBoolean("hbase.procedure.store.wal.use.hsync", false); + } + + @BeforeClass + public static void setupCluster() throws Exception { + setupConf(UTIL.getConfiguration()); + UTIL.startMiniCluster(3); + } + + @AfterClass + public static void cleanupTest() throws Exception { + try { + 
UTIL.shutdownMiniCluster(); + } catch (Exception e) { + LOG.warn("failure shutting down cluster", e); + } + } + + @Test + public void testMasterInitializedEvent() throws Exception { + TableName tableName = TableName.valueOf("testMasterInitializedEvent"); + HMaster master = UTIL.getMiniHBaseCluster().getMaster(); + ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor(); + MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue(); + + HRegionInfo hri = new HRegionInfo(tableName); + HTableDescriptor htd = new HTableDescriptor(tableName); + HColumnDescriptor hcd = new HColumnDescriptor("f"); + htd.addFamily(hcd); + + while (!master.isInitialized()) Thread.sleep(250); + master.setInitialized(false); // fake it, set back later + + CreateTableProcedure proc = new CreateTableProcedure( + procExec.getEnvironment(), htd, new HRegionInfo[] { hri }); + + long pollCalls = procSched.getPollCalls(); + long nullPollCalls = procSched.getNullPollCalls(); + + long procId = procExec.submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE); + for (int i = 0; i < 10; ++i) { + Thread.sleep(100); + assertEquals(pollCalls + 1, procSched.getPollCalls()); + assertEquals(nullPollCalls, procSched.getNullPollCalls()); + } + + master.setInitialized(true); + ProcedureTestingUtility.waitProcedure(procExec, procId); + + assertEquals(pollCalls + 2, procSched.getPollCalls()); + assertEquals(nullPollCalls, procSched.getNullPollCalls()); + } + + @Test + public void testServerCrashProcedureEvent() throws Exception { + TableName tableName = TableName.valueOf("testServerCrashProcedureEventTb"); + HMaster master = UTIL.getMiniHBaseCluster().getMaster(); + ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor(); + MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue(); + + while (!master.isServerCrashProcessingEnabled() || !master.isInitialized() || + 
master.getAssignmentManager().getRegionStates().isRegionsInTransition()) { + Thread.sleep(25); + } + + UTIL.createTable(tableName, HBaseTestingUtility.COLUMNS[0]); + try (Table t = UTIL.getConnection().getTable(tableName)) { + // Load the table with a bit of data so some logs to split and some edits in each region. + UTIL.loadTable(t, HBaseTestingUtility.COLUMNS[0]); + } + + master.setServerCrashProcessingEnabled(false); // fake it, set back later + + long pollCalls = procSched.getPollCalls(); + long nullPollCalls = procSched.getNullPollCalls(); + + // Kill a server. Master will notice but do nothing other than add it to list of dead servers. + HRegionServer hrs = getServerWithRegions(); + boolean carryingMeta = master.getAssignmentManager().isCarryingMeta(hrs.getServerName()); + UTIL.getHBaseCluster().killRegionServer(hrs.getServerName()); + hrs.join(); + + // Wait until the expiration of the server has arrived at the master. We won't process it + // by queuing a ServerCrashProcedure because we have disabled crash processing... but wait + // here so ServerManager gets notice and adds expired server to appropriate queues. + while (!master.getServerManager().isServerDead(hrs.getServerName())) Thread.sleep(10); + + // Do some of the master processing of dead servers so when SCP runs, it has expected 'state'. + master.getServerManager().moveFromOnelineToDeadServers(hrs.getServerName()); + + long procId = procExec.submitProcedure( + new ServerCrashProcedure(hrs.getServerName(), true, carryingMeta)); + + for (int i = 0; i < 10; ++i) { + Thread.sleep(100); + assertEquals(pollCalls + 1, procSched.getPollCalls()); + assertEquals(nullPollCalls, procSched.getNullPollCalls()); + } + + // Now, reenable processing else we can't get a lock on the ServerCrashProcedure. 
+ master.setServerCrashProcessingEnabled(true); + ProcedureTestingUtility.waitProcedure(procExec, procId); + + LOG.debug("server crash processing poll calls: " + procSched.getPollCalls()); + assertTrue(procSched.getPollCalls() >= (pollCalls + 2)); + assertEquals(nullPollCalls, procSched.getNullPollCalls()); + + UTIL.deleteTable(tableName); + } + + private HRegionServer getServerWithRegions() { + for (int i = 0; i < 3; ++i) { + HRegionServer hrs = UTIL.getHBaseCluster().getRegionServer(i); + if (hrs.getNumberOfOnlineRegions() > 0) { + return hrs; + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java deleted file mode 100644 index 7e6e356..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java +++ /dev/null @@ -1,484 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.master.procedure; - - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.testclassification.MasterTests; - -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -@Category({MasterTests.class, SmallTests.class}) -public class TestMasterProcedureQueue { - private static final Log LOG = LogFactory.getLog(TestMasterProcedureQueue.class); - - private MasterProcedureQueue queue; - private Configuration conf; - - @Before - public void setUp() throws IOException { - conf = HBaseConfiguration.create(); - queue = new MasterProcedureQueue(conf, new TableLockManager.NullTableLockManager()); - } - - @After - public void tearDown() throws IOException { - assertEquals(0, queue.size()); - } - - @Test - public void testConcurrentCreateDelete() throws Exception { - final MasterProcedureQueue procQueue = queue; - final TableName table = TableName.valueOf("testtb"); - final AtomicBoolean running = new 
AtomicBoolean(true); - final AtomicBoolean failure = new AtomicBoolean(false); - Thread createThread = new Thread() { - @Override - public void run() { - try { - while (running.get() && !failure.get()) { - if (procQueue.tryAcquireTableExclusiveLock(table, "create")) { - procQueue.releaseTableExclusiveLock(table); - } - } - } catch (Throwable e) { - LOG.error("create failed", e); - failure.set(true); - } - } - }; - - Thread deleteThread = new Thread() { - @Override - public void run() { - try { - while (running.get() && !failure.get()) { - if (procQueue.tryAcquireTableExclusiveLock(table, "delete")) { - procQueue.releaseTableExclusiveLock(table); - } - procQueue.markTableAsDeleted(table); - } - } catch (Throwable e) { - LOG.error("delete failed", e); - failure.set(true); - } - } - }; - - createThread.start(); - deleteThread.start(); - for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) { - Thread.sleep(100); - } - running.set(false); - createThread.join(); - deleteThread.join(); - assertEquals(false, failure.get()); - } - - /** - * Verify simple create/insert/fetch/delete of the table queue. 
- */ - @Test - public void testSimpleTableOpsQueues() throws Exception { - final int NUM_TABLES = 10; - final int NUM_ITEMS = 10; - - int count = 0; - for (int i = 1; i <= NUM_TABLES; ++i) { - TableName tableName = TableName.valueOf(String.format("test-%04d", i)); - // insert items - for (int j = 1; j <= NUM_ITEMS; ++j) { - queue.addBack(new TestTableProcedure(i * 1000 + j, tableName, - TableProcedureInterface.TableOperationType.EDIT)); - assertEquals(++count, queue.size()); - } - } - assertEquals(NUM_TABLES * NUM_ITEMS, queue.size()); - - for (int j = 1; j <= NUM_ITEMS; ++j) { - for (int i = 1; i <= NUM_TABLES; ++i) { - Long procId = queue.poll(); - assertEquals(--count, queue.size()); - assertEquals(i * 1000 + j, procId.longValue()); - } - } - assertEquals(0, queue.size()); - - for (int i = 1; i <= NUM_TABLES; ++i) { - TableName tableName = TableName.valueOf(String.format("test-%04d", i)); - // complete the table deletion - assertTrue(queue.markTableAsDeleted(tableName)); - } - } - - /** - * Check that the table queue is not deletable until every procedure - * in-progress is completed (this is a special case for write-locks). 
- */ - @Test - public void testCreateDeleteTableOperationsWithWriteLock() throws Exception { - TableName tableName = TableName.valueOf("testtb"); - - queue.addBack(new TestTableProcedure(1, tableName, - TableProcedureInterface.TableOperationType.EDIT)); - - // table can't be deleted because one item is in the queue - assertFalse(queue.markTableAsDeleted(tableName)); - - // fetch item and take a lock - assertEquals(1, queue.poll().longValue()); - // take the xlock - assertTrue(queue.tryAcquireTableExclusiveLock(tableName, "write")); - // table can't be deleted because we have the lock - assertEquals(0, queue.size()); - assertFalse(queue.markTableAsDeleted(tableName)); - // release the xlock - queue.releaseTableExclusiveLock(tableName); - // complete the table deletion - assertTrue(queue.markTableAsDeleted(tableName)); - } - - /** - * Check that the table queue is not deletable until every procedure - * in-progress is completed (this is a special case for read-locks). - */ - @Test - public void testCreateDeleteTableOperationsWithReadLock() throws Exception { - final TableName tableName = TableName.valueOf("testtb"); - final int nitems = 2; - - for (int i = 1; i <= nitems; ++i) { - queue.addBack(new TestTableProcedure(i, tableName, - TableProcedureInterface.TableOperationType.READ)); - } - - // table can't be deleted because one item is in the queue - assertFalse(queue.markTableAsDeleted(tableName)); - - for (int i = 1; i <= nitems; ++i) { - // fetch item and take a lock - assertEquals(i, queue.poll().longValue()); - // take the rlock - assertTrue(queue.tryAcquireTableSharedLock(tableName, "read " + i)); - // table can't be deleted because we have locks and/or items in the queue - assertFalse(queue.markTableAsDeleted(tableName)); - } - - for (int i = 1; i <= nitems; ++i) { - // table can't be deleted because we have locks - assertFalse(queue.markTableAsDeleted(tableName)); - // release the rlock - queue.releaseTableSharedLock(tableName); - } - - // there are no items 
and no lock in the queeu - assertEquals(0, queue.size()); - // complete the table deletion - assertTrue(queue.markTableAsDeleted(tableName)); - } - - /** - * Verify the correct logic of RWLocks on the queue - */ - @Test - public void testVerifyRwLocks() throws Exception { - TableName tableName = TableName.valueOf("testtb"); - queue.addBack(new TestTableProcedure(1, tableName, - TableProcedureInterface.TableOperationType.EDIT)); - queue.addBack(new TestTableProcedure(2, tableName, - TableProcedureInterface.TableOperationType.READ)); - queue.addBack(new TestTableProcedure(3, tableName, - TableProcedureInterface.TableOperationType.EDIT)); - queue.addBack(new TestTableProcedure(4, tableName, - TableProcedureInterface.TableOperationType.READ)); - queue.addBack(new TestTableProcedure(5, tableName, - TableProcedureInterface.TableOperationType.READ)); - - // Fetch the 1st item and take the write lock - Long procId = queue.poll(); - assertEquals(1, procId.longValue()); - assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); - - // Fetch the 2nd item and verify that the lock can't be acquired - assertEquals(null, queue.poll()); - - // Release the write lock and acquire the read lock - queue.releaseTableExclusiveLock(tableName); - - // Fetch the 2nd item and take the read lock - procId = queue.poll(); - assertEquals(2, procId.longValue()); - assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); - - // Fetch the 3rd item and verify that the lock can't be acquired - procId = queue.poll(); - assertEquals(3, procId.longValue()); - assertEquals(false, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); - - // release the rdlock of item 2 and take the wrlock for the 3d item - queue.releaseTableSharedLock(tableName); - assertEquals(true, queue.tryAcquireTableExclusiveLock(tableName, "write " + procId)); - - // Fetch 4th item and verify that the lock can't be acquired - assertEquals(null, queue.poll()); - - // 
Release the write lock and acquire the read lock - queue.releaseTableExclusiveLock(tableName); - - // Fetch the 4th item and take the read lock - procId = queue.poll(); - assertEquals(4, procId.longValue()); - assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); - - // Fetch the 4th item and take the read lock - procId = queue.poll(); - assertEquals(5, procId.longValue()); - assertEquals(true, queue.tryAcquireTableSharedLock(tableName, "read " + procId)); - - // Release 4th and 5th read-lock - queue.releaseTableSharedLock(tableName); - queue.releaseTableSharedLock(tableName); - - // remove table queue - assertEquals(0, queue.size()); - assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName)); - } - - /** - * Verify that "write" operations for a single table are serialized, - * but different tables can be executed in parallel. - */ - @Test(timeout=90000) - public void testConcurrentWriteOps() throws Exception { - final TestTableProcSet procSet = new TestTableProcSet(queue); - - final int NUM_ITEMS = 10; - final int NUM_TABLES = 4; - final AtomicInteger opsCount = new AtomicInteger(0); - for (int i = 0; i < NUM_TABLES; ++i) { - TableName tableName = TableName.valueOf(String.format("testtb-%04d", i)); - for (int j = 1; j < NUM_ITEMS; ++j) { - procSet.addBack(new TestTableProcedure(i * 100 + j, tableName, - TableProcedureInterface.TableOperationType.EDIT)); - opsCount.incrementAndGet(); - } - } - assertEquals(opsCount.get(), queue.size()); - - final Thread[] threads = new Thread[NUM_TABLES * 2]; - final HashSet<TableName> concurrentTables = new HashSet<TableName>(); - final ArrayList<String> failures = new ArrayList<String>(); - final AtomicInteger concurrentCount = new AtomicInteger(0); - for (int i = 0; i < threads.length; ++i) { - threads[i] = new Thread() { - @Override - public void run() { - while (opsCount.get() > 0) { - try { - TableProcedureInterface proc = procSet.acquire(); - if (proc == null) { - 
queue.signalAll(); - if (opsCount.get() > 0) { - continue; - } - break; - } - synchronized (concurrentTables) { - assertTrue("unexpected concurrency on " + proc.getTableName(), - concurrentTables.add(proc.getTableName())); - } - assertTrue(opsCount.decrementAndGet() >= 0); - try { - long procId = ((Procedure)proc).getProcId(); - TableName tableId = proc.getTableName(); - int concurrent = concurrentCount.incrementAndGet(); - assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES, - concurrent >= 1 && concurrent <= NUM_TABLES); - LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent); - Thread.sleep(2000); - concurrent = concurrentCount.decrementAndGet(); - LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent); - assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES); - } finally { - synchronized (concurrentTables) { - assertTrue(concurrentTables.remove(proc.getTableName())); - } - procSet.release(proc); - } - } catch (Throwable e) { - LOG.error("Failed " + e.getMessage(), e); - synchronized (failures) { - failures.add(e.getMessage()); - } - } finally { - queue.signalAll(); - } - } - } - }; - threads[i].start(); - } - for (int i = 0; i < threads.length; ++i) { - threads[i].join(); - } - assertTrue(failures.toString(), failures.isEmpty()); - assertEquals(0, opsCount.get()); - assertEquals(0, queue.size()); - - for (int i = 1; i <= NUM_TABLES; ++i) { - TableName table = TableName.valueOf(String.format("testtb-%04d", i)); - assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table)); - } - } - - public static class TestTableProcSet { - private final MasterProcedureQueue queue; - private Map<Long, TableProcedureInterface> procsMap = - new ConcurrentHashMap<Long, TableProcedureInterface>(); - - public TestTableProcSet(final MasterProcedureQueue queue) { - this.queue = queue; - } - - public void addBack(TableProcedureInterface tableProc) { - Procedure proc 
= (Procedure)tableProc; - procsMap.put(proc.getProcId(), tableProc); - queue.addBack(proc); - } - - public void addFront(TableProcedureInterface tableProc) { - Procedure proc = (Procedure)tableProc; - procsMap.put(proc.getProcId(), tableProc); - queue.addFront(proc); - } - - public TableProcedureInterface acquire() { - TableProcedureInterface proc = null; - boolean avail = false; - while (!avail) { - Long procId = queue.poll(); - proc = procId != null ? procsMap.remove(procId) : null; - if (proc == null) break; - switch (proc.getTableOperationType()) { - case CREATE: - case DELETE: - case EDIT: - avail = queue.tryAcquireTableExclusiveLock(proc.getTableName(), - "op="+ proc.getTableOperationType()); - break; - case READ: - avail = queue.tryAcquireTableSharedLock(proc.getTableName(), - "op="+ proc.getTableOperationType()); - break; - } - if (!avail) { - addFront(proc); - LOG.debug("yield procId=" + procId); - } - } - return proc; - } - - public void release(TableProcedureInterface proc) { - switch (proc.getTableOperationType()) { - case CREATE: - case DELETE: - case EDIT: - queue.releaseTableExclusiveLock(proc.getTableName()); - break; - case READ: - queue.releaseTableSharedLock(proc.getTableName()); - break; - } - } - } - - public static class TestTableProcedure extends Procedure<Void> - implements TableProcedureInterface { - private final TableOperationType opType; - private final TableName tableName; - - public TestTableProcedure() { - throw new UnsupportedOperationException("recovery should not be triggered here"); - } - - public TestTableProcedure(long procId, TableName tableName, TableOperationType opType) { - this.tableName = tableName; - this.opType = opType; - setProcId(procId); - } - - @Override - public TableName getTableName() { - return tableName; - } - - @Override - public TableOperationType getTableOperationType() { - return opType; - } - - @Override - protected Procedure[] execute(Void env) { - return null; - } - - @Override - protected void 
rollback(Void env) { - throw new UnsupportedOperationException(); - } - - @Override - protected boolean abort(Void env) { - throw new UnsupportedOperationException(); - } - - @Override - protected void serializeStateData(final OutputStream stream) throws IOException {} - - @Override - protected void deserializeStateData(final InputStream stream) throws IOException {} - } -}