HBASE-14837 Procedure v2 - Procedure Queue Improvement
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/18a48af2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/18a48af2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/18a48af2

Branch: refs/heads/hbase-12439
Commit: 18a48af2424a9a45d24c08014d4948e3274513a1
Parents: dc57996
Author: Matteo Bertozzi <matteo.berto...@cloudera.com>
Authored: Thu Jan 14 08:29:10 2016 -0800
Committer: Matteo Bertozzi <matteo.berto...@cloudera.com>
Committed: Thu Jan 14 09:24:42 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ProcedureInfo.java  |    4 +-
 .../hbase/procedure2/ProcedureExecutor.java     |    3 +-
 .../procedure2/ProcedureFairRunQueues.java      |  174 ---
 .../hbase/procedure2/ProcedureRunnableSet.java  |    4 +-
 .../procedure2/ProcedureSimpleRunQueue.java     |    8 +-
 .../procedure2/TestProcedureFairRunQueues.java  |  155 ---
 .../org/apache/hadoop/hbase/master/HMaster.java |   39 +-
 .../procedure/AddColumnFamilyProcedure.java     |    7 +-
 .../procedure/CreateNamespaceProcedure.java     |    4 +-
 .../master/procedure/CreateTableProcedure.java  |    2 +-
 .../procedure/DeleteColumnFamilyProcedure.java  |    7 +-
 .../master/procedure/DeleteTableProcedure.java  |    2 +-
 .../master/procedure/DisableTableProcedure.java |    7 +-
 .../master/procedure/EnableTableProcedure.java  |    7 +-
 .../master/procedure/MasterProcedureEnv.java    |   34 +-
 .../master/procedure/MasterProcedureQueue.java  |  578 --------
 .../procedure/MasterProcedureScheduler.java     | 1241 ++++++++++++++++++
 .../procedure/ModifyColumnFamilyProcedure.java  |    7 +-
 .../master/procedure/ModifyTableProcedure.java  |    7 +-
 .../master/procedure/ServerCrashProcedure.java  |   11 +-
 .../procedure/ServerProcedureInterface.java     |   14 +-
 .../procedure/TruncateTableProcedure.java       |    2 +-
 .../apache/hadoop/hbase/master/TestMaster.java  |    8 +-
 .../hbase/master/TestMasterNoCluster.java       |    2 +-
 .../procedure/TestMasterProcedureEvents.java    |  179 +++
 .../procedure/TestMasterProcedureQueue.java     |  484 -------
 .../procedure/TestMasterProcedureScheduler.java |  489 +++++++
 27 files changed, 2020 insertions(+), 1459 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
index b7ea47e..fca2eac 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/ProcedureInfo.java
@@ -224,10 +224,10 @@ public class ProcedureInfo implements Cloneable {
       procProto.getOwner(),
       procProto.getState(),
       procProto.hasParentId() ? procProto.getParentId() : -1,
-      procProto.getState() == ProcedureState.ROLLEDBACK ? procProto.getException() : null,
+      procProto.hasException() ? procProto.getException() : null,
       procProto.getLastUpdate(),
       procProto.getStartTime(),
-      procProto.getState() == ProcedureState.FINISHED ? procProto.getResult().toByteArray() : null);
+      procProto.hasResult() ?
procProto.getResult().toByteArray() : null); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 11073c6..74d28d7 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -785,8 +785,7 @@ public class ProcedureExecutor<TEnvironment> { */ private void execLoop() { while (isRunning()) { - Long procId = runnables.poll(); - Procedure proc = procId != null ? procedures.get(procId) : null; + Procedure proc = runnables.poll(); if (proc == null) continue; try { http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java deleted file mode 100644 index 242ae86..0000000 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2; - -import java.util.Map; - -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.ConcurrentSkipListMap; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -/** - * This class is a container of queues that allows to select a queue - * in a round robin fashion, considering priority of the queue. - * - * the quantum is just how many poll() will return the same object. - * e.g. if quantum is 1 and you have A and B as object you'll get: A B A B - * e.g. if quantum is 2 and you have A and B as object you'll get: A A B B A A B B - * then the object priority is just a priority * quantum - * - * Example: - * - three queues (A, B, C) with priorities (1, 1, 2) - * - The first poll() will return A - * - The second poll() will return B - * - The third and forth poll() will return C - * - and so on again and again. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class ProcedureFairRunQueues<TKey, TQueue extends ProcedureFairRunQueues.FairObject> { - private ConcurrentSkipListMap<TKey, TQueue> objMap = - new ConcurrentSkipListMap<TKey, TQueue>(); - - private final ReentrantLock lock = new ReentrantLock(); - private final int quantum; - - private Map.Entry<TKey, TQueue> current = null; - private int currentQuantum = 0; - - public interface FairObject { - boolean isAvailable(); - int getPriority(); - } - - /** - * @param quantum how many poll() will return the same object. - */ - public ProcedureFairRunQueues(final int quantum) { - this.quantum = quantum; - } - - public TQueue get(final TKey key) { - return objMap.get(key); - } - - public TQueue add(final TKey key, final TQueue queue) { - TQueue oldq = objMap.putIfAbsent(key, queue); - return oldq != null ? oldq : queue; - } - - public TQueue remove(final TKey key) { - TQueue queue = objMap.get(key); - if (queue != null) { - lock.lock(); - try { - queue = objMap.remove(key); - if (current != null && queue == current.getValue()) { - currentQuantum = 0; - current = null; - } - } finally { - lock.unlock(); - } - } - return queue; - } - - public void clear() { - lock.lock(); - try { - currentQuantum = 0; - current = null; - objMap.clear(); - } finally { - lock.unlock(); - } - } - - /** - * @return the next available item if present - */ - public TQueue poll() { - lock.lock(); - try { - TQueue queue; - if (currentQuantum == 0) { - if (nextObject() == null) { - // nothing here - return null; - } - - queue = current.getValue(); - currentQuantum = calculateQuantum(queue) - 1; - } else { - currentQuantum--; - queue = current.getValue(); - } - - if (!queue.isAvailable()) { - Map.Entry<TKey, TQueue> last = current; - // Try the next one - do { - if (nextObject() == null) - return null; - } while (current.getValue() != last.getValue() && !current.getValue().isAvailable()); - - queue = current.getValue(); - currentQuantum = calculateQuantum(queue) - 1; - } - - return queue; - } finally { - lock.unlock(); - } - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append('{'); - for (Map.Entry<TKey, TQueue> entry: objMap.entrySet()) { - builder.append(entry.getKey()); - builder.append(':'); - builder.append(entry.getValue()); - } - builder.append('}'); - return builder.toString(); - } - - private Map.Entry<TKey, TQueue> nextObject() { - Map.Entry<TKey, TQueue> next = null; - - // If we have already a key, try the next one - if (current != null) { - next = objMap.higherEntry(current.getKey()); - } - - // if there is no higher key, go back to the first - current = (next != null) ? 
next : objMap.firstEntry(); - return current; - } - - private int calculateQuantum(final TQueue fairObject) { - // TODO - return Math.max(1, fairObject.getPriority() * quantum); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java index 2d7ba39..65df692 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureRunnableSet.java @@ -55,9 +55,9 @@ public interface ProcedureRunnableSet { /** * Fetch one Procedure from the queue - * @return the Procedure ID to execute, or null if nothing present. + * @return the Procedure to execute, or null if nothing present. */ - Long poll(); + Procedure poll(); /** * In case the class is blocking on poll() waiting for items to be added, http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java index 7b17fb2..d23680d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSimpleRunQueue.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Evolving public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { - private final Deque<Long> runnables = new ArrayDeque<Long>(); + private final Deque<Procedure> runnables = new ArrayDeque<Procedure>(); private final ReentrantLock lock = new ReentrantLock(); private final Condition waitCond = lock.newCondition(); @@ -40,7 +40,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { public void addFront(final Procedure proc) { lock.lock(); try { - runnables.addFirst(proc.getProcId()); + runnables.addFirst(proc); waitCond.signal(); } finally { lock.unlock(); @@ -51,7 +51,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { public void addBack(final Procedure proc) { lock.lock(); try { - runnables.addLast(proc.getProcId()); + runnables.addLast(proc); waitCond.signal(); } finally { lock.unlock(); @@ -65,7 +65,7 @@ public class ProcedureSimpleRunQueue implements ProcedureRunnableSet { @Override @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") - public Long poll() { + public Procedure poll() { lock.lock(); try { if (runnables.isEmpty()) { http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java deleted file mode 
100644 index e36a295..0000000 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureFairRunQueues.java +++ /dev/null @@ -1,155 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2; - -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.testclassification.MasterTests; - -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import static org.junit.Assert.assertEquals; - -@Category({MasterTests.class, SmallTests.class}) -public class TestProcedureFairRunQueues { - private static class TestRunQueue implements ProcedureFairRunQueues.FairObject { - private final int priority; - private final String name; - - private boolean available = true; - - public TestRunQueue(String name, int priority) { - this.name = name; - this.priority = priority; - } - - @Override - public String toString() { - return name; - } - - private void setAvailable(boolean available) { - this.available = available; - } - - @Override - public boolean isAvailable() { - return available; - } - - @Override - public int getPriority() { - return priority; - } - } - - @Test - public void testEmptyFairQueues() throws Exception { - ProcedureFairRunQueues<String, TestRunQueue> fairq - = new ProcedureFairRunQueues<String, TestRunQueue>(1); - for (int i = 0; i < 3; ++i) { - assertEquals(null, fairq.poll()); - } - } - - @Test - public void testFairQueues() throws Exception { - ProcedureFairRunQueues<String, TestRunQueue> fairq - = new ProcedureFairRunQueues<String, TestRunQueue>(1); - TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1)); - TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1)); - TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2)); - - for (int i = 0; i < 3; ++i) { - assertEquals(a, fairq.poll()); - assertEquals(b, fairq.poll()); - assertEquals(m, fairq.poll()); - assertEquals(m, fairq.poll()); - } - } - - @Test - public void testFairQueuesNotAvailable() throws Exception { - ProcedureFairRunQueues<String, TestRunQueue> fairq - = new ProcedureFairRunQueues<String, TestRunQueue>(1); - TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1)); - TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1)); - TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2)); - - // m is not available - m.setAvailable(false); - for (int i = 0; i < 3; ++i) { - assertEquals(a, fairq.poll()); - assertEquals(b, fairq.poll()); - } - - // m is available - m.setAvailable(true); - for (int i = 0; i < 3; ++i) { - assertEquals(m, fairq.poll()); - assertEquals(m, fairq.poll()); - assertEquals(a, fairq.poll()); - assertEquals(b, fairq.poll()); - } - - // b is not available - b.setAvailable(false); - for (int i = 
0; i < 3; ++i) { - assertEquals(m, fairq.poll()); - assertEquals(m, fairq.poll()); - assertEquals(a, fairq.poll()); - } - - assertEquals(m, fairq.poll()); - m.setAvailable(false); - // m should be fetched next, but is no longer available - assertEquals(a, fairq.poll()); - assertEquals(a, fairq.poll()); - b.setAvailable(true); - for (int i = 0; i < 3; ++i) { - assertEquals(b, fairq.poll()); - assertEquals(a, fairq.poll()); - } - } - - @Test - public void testFairQueuesDelete() throws Exception { - ProcedureFairRunQueues<String, TestRunQueue> fairq - = new ProcedureFairRunQueues<String, TestRunQueue>(1); - TestRunQueue a = fairq.add("A", new TestRunQueue("A", 1)); - TestRunQueue b = fairq.add("B", new TestRunQueue("B", 1)); - TestRunQueue m = fairq.add("M", new TestRunQueue("M", 2)); - - // Fetch A and then remove it - assertEquals(a, fairq.poll()); - assertEquals(a, fairq.remove("A")); - - // Fetch B and then remove it - assertEquals(b, fairq.poll()); - assertEquals(b, fairq.remove("B")); - - // Fetch M and then remove it - assertEquals(m, fairq.poll()); - assertEquals(m, fairq.remove("M")); - - // nothing left - assertEquals(null, fairq.poll()); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 8c34b91..9f5e7e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -104,6 +104,7 @@ import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; @@ -277,14 +278,15 @@ public class HMaster extends HRegionServer implements MasterServices { // flag set after we complete initialization once active, // it is not private since it's used in unit tests - volatile boolean initialized = false; + private final ProcedureEvent initialized = new ProcedureEvent("master initialized"); // flag set after master services are started, // initialization may have not completed yet. volatile boolean serviceStarted = false; // flag set after we complete assignMeta. - private volatile boolean serverCrashProcessingEnabled = false; + private final ProcedureEvent serverCrashProcessingEnabled = + new ProcedureEvent("server crash processing"); LoadBalancer balancer; private RegionNormalizer normalizer; @@ -781,8 +783,10 @@ public class HMaster extends HRegionServer implements MasterServices { status.markComplete("Initialization successful"); LOG.info("Master has completed initialization"); configurationManager.registerObserver(this.balancer); + // Set master as 'initialized'. 
- initialized = true; + setInitialized(true); + // assign the meta replicas Set<ServerName> EMPTY_SET = new HashSet<ServerName>(); int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM, @@ -976,8 +980,8 @@ public class HMaster extends HRegionServer implements MasterServices { // servers. This is required so that if meta is assigning to a server which dies after // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be // stuck here waiting forever if waitForMeta is specified. - if (!serverCrashProcessingEnabled) { - serverCrashProcessingEnabled = true; + if (!isServerCrashProcessingEnabled()) { + setServerCrashProcessingEnabled(true); this.serverManager.processQueuedDeadServers(); } @@ -1207,7 +1211,7 @@ public class HMaster extends HRegionServer implements MasterServices { public boolean balance(boolean force) throws IOException { // if master not initialized, don't run balancer. - if (!this.initialized) { + if (!isInitialized()) { LOG.debug("Master has not been initialized, don't run balancer."); return false; } @@ -1308,7 +1312,7 @@ public class HMaster extends HRegionServer implements MasterServices { * is globally disabled) */ public boolean normalizeRegions() throws IOException { - if (!this.initialized) { + if (!isInitialized()) { LOG.debug("Master has not been initialized, don't run region normalizer."); return false; } @@ -1615,7 +1619,7 @@ public class HMaster extends HRegionServer implements MasterServices { } } - private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd) + private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd) throws IOException { // FIFO compaction has some requirements // Actually FCP ignores periodic major compactions @@ -1672,7 +1676,7 @@ public class HMaster extends HRegionServer implements MasterServices { } } } - + // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled. 
private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey, String message, Exception cause) throws IOException { @@ -2300,6 +2304,15 @@ public class HMaster extends HRegionServer implements MasterServices { */ @Override public boolean isInitialized() { + return initialized.isReady(); + } + + @VisibleForTesting + public void setInitialized(boolean isInitialized) { + procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized); + } + + public ProcedureEvent getInitializedEvent() { return initialized; } @@ -2310,12 +2323,16 @@ public class HMaster extends HRegionServer implements MasterServices { */ @Override public boolean isServerCrashProcessingEnabled() { - return this.serverCrashProcessingEnabled; + return serverCrashProcessingEnabled.isReady(); } @VisibleForTesting public void setServerCrashProcessingEnabled(final boolean b) { - this.serverCrashProcessingEnabled = b; + procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b); + } + + public ProcedureEvent getServerCrashProcessingEnabledEvent() { + return serverCrashProcessingEnabled; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java index 58da1d1..b57540b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AddColumnFamilyProcedure.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -185,10 +184,8 @@ public class AddColumnFamilyProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - tableName, - EventType.C_M_ADD_FAMILY.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "add family"); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java index f934737..87b411e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateNamespaceProcedure.java @@ -205,7 +205,9 @@ public class CreateNamespaceProcedure return true; } - return false; + if (env.waitInitialized(this)) { + return false; + } } return 
getTableNamespaceManager(env).acquireExclusiveLock(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java index 7b48f3b..d786bb3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java @@ -266,7 +266,7 @@ public class CreateTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized() && !getTableName().isSystemTable()) { + if (!getTableName().isSystemTable() && env.waitInitialized(this)) { return false; } return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "create table"); http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java index 5781ae6..7e135f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteColumnFamilyProcedure.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -202,10 +201,8 @@ public class DeleteColumnFamilyProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - tableName, - EventType.C_M_DELETE_FAMILY.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "delete family"); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java index baef112..0c43c57 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DeleteTableProcedure.java @@ -200,7 +200,7 @@ public class DeleteTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; + if (env.waitInitialized(this)) 
return false; return env.getProcedureQueue().tryAcquireTableExclusiveLock(getTableName(), "delete table"); } http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java index 716897f..fcc1b7b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.constraint.ConstraintException; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.BulkAssigner; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; @@ -215,10 +214,8 @@ public class DisableTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - tableName, - EventType.C_M_DISABLE_TABLE.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "disable table"); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java index bc1fc0f..d24d94b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/EnableTableProcedure.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.TableState; -import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.BulkAssigner; import org.apache.hadoop.hbase.master.GeneralBulkAssigner; @@ -235,10 +234,8 @@ public class EnableTableProcedure @Override protected boolean acquireLock(final MasterProcedureEnv env) { - if (!env.isInitialized()) return false; - return env.getProcedureQueue().tryAcquireTableExclusiveLock( - tableName, - EventType.C_M_ENABLE_TABLE.toString()); + if (env.waitInitialized(this)) return false; + return env.getProcedureQueue().tryAcquireTableExclusiveLock(tableName, "enable table"); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 6700b63..090b8cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.security.User; @@ -85,12 +87,12 @@ public class MasterProcedureEnv { } } - private final MasterProcedureQueue procQueue; + private final MasterProcedureScheduler procSched; private final MasterServices master; public MasterProcedureEnv(final MasterServices master) { this.master = master; - this.procQueue = new MasterProcedureQueue(master.getConfiguration(), + this.procSched = new MasterProcedureScheduler(master.getConfiguration(), master.getTableLockManager()); } @@ -114,8 +116,8 @@ public class MasterProcedureEnv { return master.getMasterCoprocessorHost(); } - public MasterProcedureQueue getProcedureQueue() { - return procQueue; + public MasterProcedureScheduler getProcedureQueue() { + return procSched; } public boolean isRunning() { @@ -125,4 +127,28 @@ public class MasterProcedureEnv { public boolean isInitialized() { return master.isInitialized(); } + + public boolean waitInitialized(Procedure proc) { + return procSched.waitEvent(((HMaster)master).getInitializedEvent(), proc); + } + + public boolean waitServerCrashProcessingEnabled(Procedure proc) { + return procSched.waitEvent(((HMaster)master).getServerCrashProcessingEnabledEvent(), proc); + } + + public void wake(ProcedureEvent event) { + procSched.wake(event); + } + + public void suspend(ProcedureEvent event) { + procSched.suspend(event); + } + + public void setEventReady(ProcedureEvent event, boolean isReady) { + if (isReady) { + procSched.wake(event); + } else { + procSched.suspend(event); + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/18a48af2/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java deleted file mode 100644 index c4c7747..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java +++ /dev/null @@ -1,578 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.master.procedure; - -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.Deque; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableExistsException; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureFairRunQueues; -import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet; -import org.apache.hadoop.hbase.master.TableLockManager; -import org.apache.hadoop.hbase.master.TableLockManager.TableLock; -import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; - -/** - * ProcedureRunnableSet for the Master Procedures. - * This RunnableSet tries to provide to the ProcedureExecutor procedures - * that can be executed without having to wait on a lock. - * Most of the master operations can be executed concurrently, if they - * are operating on different tables (e.g. two create table can be performed - * at the same, time assuming table A and table B) or against two different servers; say - * two servers that crashed at about the same time. - * - * <p>Each procedure should implement an interface providing information for this queue. - * for example table related procedures should implement TableProcedureInterface. - * each procedure will be pushed in its own queue, and based on the operation type - * we may take smarter decision. e.g. we can abort all the operations preceding - * a delete table, or similar. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class MasterProcedureQueue implements ProcedureRunnableSet { - private static final Log LOG = LogFactory.getLog(MasterProcedureQueue.class); - - // Two queues to ensure that server procedures run ahead of table precedures always. - private final ProcedureFairRunQueues<TableName, RunQueue> tableFairQ; - /** - * Rely on basic fair q. ServerCrashProcedure will yield if meta is not assigned. This way, the - * server that was carrying meta should rise to the top of the queue (this is how it used to - * work when we had handlers and ServerShutdownHandler ran). TODO: special handling of servers - * that were carrying system tables on crash; do I need to have these servers have priority? 
- * - * <p>Apart from the special-casing of meta and system tables, fairq is what we want - */ - private final ProcedureFairRunQueues<ServerName, RunQueue> serverFairQ; - - private final ReentrantLock lock = new ReentrantLock(); - private final Condition waitCond = lock.newCondition(); - private final TableLockManager lockManager; - - private final int metaTablePriority; - private final int userTablePriority; - private final int sysTablePriority; - private static final int DEFAULT_SERVER_PRIORITY = 1; - - /** - * Keeps count across server and table queues. - */ - private int queueSize; - - public MasterProcedureQueue(final Configuration conf, final TableLockManager lockManager) { - this.tableFairQ = new ProcedureFairRunQueues<TableName, RunQueue>(1); - this.serverFairQ = new ProcedureFairRunQueues<ServerName, RunQueue>(1); - this.lockManager = lockManager; - - // TODO: should this be part of the HTD? - metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3); - sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2); - userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1); - } - - @Override - public void addFront(final Procedure proc) { - lock.lock(); - try { - getRunQueueOrCreate(proc).addFront(proc); - queueSize++; - waitCond.signal(); - } finally { - lock.unlock(); - } - } - - @Override - public void addBack(final Procedure proc) { - lock.lock(); - try { - getRunQueueOrCreate(proc).addBack(proc); - queueSize++; - waitCond.signal(); - } finally { - lock.unlock(); - } - } - - @Override - public void yield(final Procedure proc) { - addBack(proc); - } - - @Override - @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") - public Long poll() { - Long pollResult = null; - lock.lock(); - try { - if (queueSize == 0) { - waitCond.await(); - if (queueSize == 0) { - return null; - } - } - // For now, let server handling have precedence over table handling; presumption is that it - // is more important handling crashed servers than it is running the - // enabling/disabling tables, etc. 
- pollResult = doPoll(serverFairQ.poll()); - if (pollResult == null) { - pollResult = doPoll(tableFairQ.poll()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } finally { - lock.unlock(); - } - return pollResult; - } - - private Long doPoll(final RunQueue rq) { - if (rq == null || !rq.isAvailable()) return null; - this.queueSize--; - return rq.poll(); - } - - @Override - public void signalAll() { - lock.lock(); - try { - waitCond.signalAll(); - } finally { - lock.unlock(); - } - } - - @Override - public void clear() { - lock.lock(); - try { - serverFairQ.clear(); - tableFairQ.clear(); - queueSize = 0; - } finally { - lock.unlock(); - } - } - - @Override - public int size() { - lock.lock(); - try { - return queueSize; - } finally { - lock.unlock(); - } - } - - @Override - public String toString() { - lock.lock(); - try { - return "MasterProcedureQueue size=" + queueSize + ": tableFairQ: " + tableFairQ + - ", serverFairQ: " + serverFairQ; - } finally { - lock.unlock(); - } - } - - @Override - public void completionCleanup(Procedure proc) { - if (proc instanceof TableProcedureInterface) { - TableProcedureInterface iProcTable = (TableProcedureInterface)proc; - boolean tableDeleted; - if (proc.hasException()) { - IOException procEx = proc.getException().unwrapRemoteException(); - if (iProcTable.getTableOperationType() == TableOperationType.CREATE) { - // create failed because the table already exist - tableDeleted = !(procEx instanceof TableExistsException); - } else { - // the operation failed because the table does not exist - tableDeleted = (procEx instanceof TableNotFoundException); - } - } else { - // the table was deleted - tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE); - } - if (tableDeleted) { - markTableAsDeleted(iProcTable.getTableName()); - } - } - // No cleanup for ServerProcedureInterface types, yet. - } - - private RunQueue getRunQueueOrCreate(final Procedure proc) { - if (proc instanceof TableProcedureInterface) { - final TableName table = ((TableProcedureInterface)proc).getTableName(); - return getRunQueueOrCreate(table); - } - if (proc instanceof ServerProcedureInterface) { - return getRunQueueOrCreate((ServerProcedureInterface)proc); - } - // TODO: at the moment we only have Table and Server procedures - // if you are implementing a non-table/non-server procedure, you have two options: create - // a group for all the non-table/non-server procedures or try to find a key for your - // non-table/non-server procedures and implement something similar to the TableRunQueue. 
- throw new UnsupportedOperationException("RQs for non-table procedures are not implemented yet"); - } - - private TableRunQueue getRunQueueOrCreate(final TableName table) { - final TableRunQueue queue = getRunQueue(table); - if (queue != null) return queue; - return (TableRunQueue)tableFairQ.add(table, createTableRunQueue(table)); - } - - private ServerRunQueue getRunQueueOrCreate(final ServerProcedureInterface spi) { - final ServerRunQueue queue = getRunQueue(spi.getServerName()); - if (queue != null) return queue; - return (ServerRunQueue)serverFairQ.add(spi.getServerName(), createServerRunQueue(spi)); - } - - private TableRunQueue createTableRunQueue(final TableName table) { - int priority = userTablePriority; - if (table.equals(TableName.META_TABLE_NAME)) { - priority = metaTablePriority; - } else if (table.isSystemTable()) { - priority = sysTablePriority; - } - return new TableRunQueue(priority); - } - - private ServerRunQueue createServerRunQueue(final ServerProcedureInterface spi) { - return new ServerRunQueue(DEFAULT_SERVER_PRIORITY); - } - - private TableRunQueue getRunQueue(final TableName table) { - return (TableRunQueue)tableFairQ.get(table); - } - - private ServerRunQueue getRunQueue(final ServerName sn) { - return (ServerRunQueue)serverFairQ.get(sn); - } - - /** - * Try to acquire the write lock on the specified table. - * other operations in the table-queue will be executed after the lock is released. - * @param table Table to lock - * @param purpose Human readable reason for locking the table - * @return true if we were able to acquire the lock on the table, otherwise false. - */ - public boolean tryAcquireTableExclusiveLock(final TableName table, final String purpose) { - return getRunQueueOrCreate(table).tryExclusiveLock(lockManager, table, purpose); - } - - /** - * Release the write lock taken with tryAcquireTableWrite() - * @param table the name of the table that has the write lock - */ - public void releaseTableExclusiveLock(final TableName table) { - getRunQueue(table).releaseExclusiveLock(lockManager, table); - } - - /** - * Try to acquire the read lock on the specified table. - * other read operations in the table-queue may be executed concurrently, - * otherwise they have to wait until all the read-locks are released. - * @param table Table to lock - * @param purpose Human readable reason for locking the table - * @return true if we were able to acquire the lock on the table, otherwise false. - */ - public boolean tryAcquireTableSharedLock(final TableName table, final String purpose) { - return getRunQueueOrCreate(table).trySharedLock(lockManager, table, purpose); - } - - /** - * Release the read lock taken with tryAcquireTableRead() - * @param table the name of the table that has the read lock - */ - public void releaseTableSharedLock(final TableName table) { - getRunQueue(table).releaseSharedLock(lockManager, table); - } - - /** - * Try to acquire the write lock on the specified server. - * @see #releaseServerExclusiveLock(ServerProcedureInterface) - * @param spi Server to lock - * @return true if we were able to acquire the lock on the server, otherwise false. 
- */ - public boolean tryAcquireServerExclusiveLock(final ServerProcedureInterface spi) { - return getRunQueueOrCreate(spi).tryExclusiveLock(); - } - - /** - * Release the write lock - * @see #tryAcquireServerExclusiveLock(ServerProcedureInterface) - * @param spi the server that has the write lock - */ - public void releaseServerExclusiveLock(final ServerProcedureInterface spi) { - getRunQueue(spi.getServerName()).releaseExclusiveLock(); - } - - /** - * Try to acquire the read lock on the specified server. - * @see #releaseServerSharedLock(ServerProcedureInterface) - * @param spi Server to lock - * @return true if we were able to acquire the lock on the server, otherwise false. - */ - public boolean tryAcquireServerSharedLock(final ServerProcedureInterface spi) { - return getRunQueueOrCreate(spi).trySharedLock(); - } - - /** - * Release the read lock taken - * @see #tryAcquireServerSharedLock(ServerProcedureInterface) - * @param spi the server that has the read lock - */ - public void releaseServerSharedLock(final ServerProcedureInterface spi) { - getRunQueue(spi.getServerName()).releaseSharedLock(); - } - - /** - * Tries to remove the queue and the table-lock of the specified table. - * If there are new operations pending (e.g. a new create), - * the remove will not be performed. - * @param table the name of the table that should be marked as deleted - * @return true if deletion succeeded, false otherwise meaning that there are - * other new operations pending for that table (e.g. a new create). - */ - protected boolean markTableAsDeleted(final TableName table) { - TableRunQueue queue = getRunQueue(table); - if (queue != null) { - lock.lock(); - try { - if (queue.isEmpty() && queue.acquireDeleteLock()) { - tableFairQ.remove(table); - - // Remove the table lock - try { - lockManager.tableDeleted(table); - } catch (IOException e) { - LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical - } - } else { - // TODO: If there are no create, we can drop all the other ops - return false; - } - } finally { - lock.unlock(); - } - } - return true; - } - - private interface RunQueue extends ProcedureFairRunQueues.FairObject { - void addFront(Procedure proc); - void addBack(Procedure proc); - Long poll(); - boolean acquireDeleteLock(); - } - - /** - * Base abstract class for RunQueue implementations. - * Be careful honoring synchronizations in subclasses. In here we protect access but if you are - * acting on a state found in here, be sure dependent code keeps synchronization. - * Implements basic in-memory read/write locking mechanism to prevent procedure steps being run - * in parallel. - */ - private static abstract class AbstractRunQueue implements RunQueue { - // All modification of runnables happens with #lock held. 
- private final Deque<Long> runnables = new ArrayDeque<Long>(); - private final int priority; - private boolean exclusiveLock = false; - private int sharedLock = 0; - - public AbstractRunQueue(int priority) { - this.priority = priority; - } - - boolean isEmpty() { - return this.runnables.isEmpty(); - } - - @Override - public boolean isAvailable() { - synchronized (this) { - return !exclusiveLock && !runnables.isEmpty(); - } - } - - @Override - public int getPriority() { - return this.priority; - } - - @Override - public void addFront(Procedure proc) { - this.runnables.addFirst(proc.getProcId()); - } - - @Override - public void addBack(Procedure proc) { - this.runnables.addLast(proc.getProcId()); - } - - @Override - public Long poll() { - return this.runnables.poll(); - } - - @Override - public synchronized boolean acquireDeleteLock() { - return tryExclusiveLock(); - } - - public synchronized boolean isLocked() { - return isExclusiveLock() || sharedLock > 0; - } - - public synchronized boolean isExclusiveLock() { - return this.exclusiveLock; - } - - public synchronized boolean trySharedLock() { - if (isExclusiveLock()) return false; - sharedLock++; - return true; - } - - public synchronized void releaseSharedLock() { - sharedLock--; - } - - /** - * @return True if only one instance of a shared lock outstanding. - */ - synchronized boolean isSingleSharedLock() { - return sharedLock == 1; - } - - public synchronized boolean tryExclusiveLock() { - if (isLocked()) return false; - exclusiveLock = true; - return true; - } - - public synchronized void releaseExclusiveLock() { - exclusiveLock = false; - } - - @Override - public String toString() { - return this.runnables.toString(); - } - } - - /** - * Run Queue for Server procedures. - */ - private static class ServerRunQueue extends AbstractRunQueue { - public ServerRunQueue(int priority) { - super(priority); - } - } - - /** - * Run Queue for a Table. It contains a read-write lock that is used by the - * MasterProcedureQueue to decide if we should fetch an item from this queue - * or skip to another one which will be able to run without waiting for locks. - */ - private static class TableRunQueue extends AbstractRunQueue { - private TableLock tableLock = null; - - public TableRunQueue(int priority) { - super(priority); - } - - // TODO: Improve run-queue push with TableProcedureInterface.getType() - // we can take smart decisions based on the type of the operation (e.g. 
create/delete) - @Override - public void addBack(final Procedure proc) { - super.addBack(proc); - } - - public synchronized boolean trySharedLock(final TableLockManager lockManager, - final TableName tableName, final String purpose) { - if (isExclusiveLock()) return false; - - // Take zk-read-lock - tableLock = lockManager.readLock(tableName, purpose); - try { - tableLock.acquire(); - } catch (IOException e) { - LOG.error("failed acquire read lock on " + tableName, e); - tableLock = null; - return false; - } - trySharedLock(); - return true; - } - - public synchronized void releaseSharedLock(final TableLockManager lockManager, - final TableName tableName) { - releaseTableLock(lockManager, isSingleSharedLock()); - releaseSharedLock(); - } - - public synchronized boolean tryExclusiveLock(final TableLockManager lockManager, - final TableName tableName, final String purpose) { - if (isLocked()) return false; - // Take zk-write-lock - tableLock = lockManager.writeLock(tableName, purpose); - try { - tableLock.acquire(); - } catch (IOException e) { - LOG.error("failed acquire write lock on " + tableName, e); - tableLock = null; - return false; - } - tryExclusiveLock(); - return true; - } - - public synchronized void releaseExclusiveLock(final TableLockManager lockManager, - final TableName tableName) { - releaseTableLock(lockManager, true); - releaseExclusiveLock(); - } - - private void releaseTableLock(final TableLockManager lockManager, boolean reset) { - for (int i = 0; i < 3; ++i) { - try { - tableLock.release(); - if (reset) { - tableLock = null; - } - break; - } catch (IOException e) { - LOG.warn("Could not release the table write-lock", e); - } - } - } - } -}
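----------------------------------------------------------------------

A note on the pattern this commit introduces: procedures no longer re-check a plain boolean such as HMaster.initialized on every acquireLock() attempt. Instead, acquireLock() calls env.waitInitialized(this); if the underlying ProcedureEvent is not ready, the procedure is parked on that event inside the MasterProcedureScheduler and is pushed back onto the run queue when HMaster.setInitialized(true) wakes the event. What follows is only a minimal, self-contained sketch of that suspend/wake idea: ToyEvent and ToyScheduler are illustrative names, not HBase classes, and the real MasterProcedureScheduler.ProcedureEvent carries more state and locking than shown here.

import java.util.ArrayDeque;
import java.util.Deque;

public class ToyScheduler {
  /** Toy stand-in for MasterProcedureScheduler.ProcedureEvent (hypothetical, JDK only). */
  public static final class ToyEvent {
    private final String description;
    private boolean ready;
    private final Deque<Runnable> suspended = new ArrayDeque<>();

    public ToyEvent(String description, boolean ready) {
      this.description = description;
      this.ready = ready;
    }

    @Override
    public String toString() {
      return "event[" + description + ", ready=" + ready + "]";
    }
  }

  // Procedures that are runnable right now (stand-in for the scheduler's run queues).
  private final Deque<Runnable> runnables = new ArrayDeque<>();

  /** Loosely mirrors waitEvent(): returns true if the caller must stop because the event is not ready. */
  public synchronized boolean waitEvent(ToyEvent event, Runnable proc) {
    if (event.ready) {
      return false;                // event already signaled, keep executing
    }
    event.suspended.addLast(proc); // park the procedure on the event
    return true;
  }

  /** Loosely mirrors wake(): mark the event ready and re-queue everything parked on it. */
  public synchronized void wake(ToyEvent event) {
    event.ready = true;
    while (!event.suspended.isEmpty()) {
      runnables.addLast(event.suspended.pollFirst());
    }
  }

  /** Loosely mirrors suspend(): mark the event not ready so future waiters get parked. */
  public synchronized void suspend(ToyEvent event) {
    event.ready = false;
  }

  /** Loosely mirrors ProcedureRunnableSet.poll(): hand the next runnable procedure to the executor. */
  public synchronized Runnable poll() {
    return runnables.pollFirst();
  }
}

Read in this light, the new "if (env.waitInitialized(this)) return false;" in the table procedures above means "park me until the master is initialized", and setEventReady(event, true) in MasterProcedureEnv is simply wake(event), while setEventReady(event, false) is suspend(event).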