Repository: hbase Updated Branches: refs/heads/branch-2.0 5c3579883 -> e87e41115
HBASE-20939 There will be race when we call suspendIfNotReady and then throw ProcedureSuspendedException Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e87e4111 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e87e4111 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e87e4111 Branch: refs/heads/branch-2.0 Commit: e87e411150f1e1192d2fa1d64e0ac9c091e84d2f Parents: 5c35798 Author: zhangduo <zhang...@apache.org> Authored: Fri Jul 27 17:26:40 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri Jul 27 21:30:38 2018 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/util/IdLock.java | 137 ++++++++++++++++++ .../hbase/procedure2/ProcedureExecutor.java | 20 +-- .../procedure2/ProcedureSuspendedException.java | 6 +- .../org/apache/hadoop/hbase/util/IdLock.java | 138 ------------------- 4 files changed, 152 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e87e4111/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java new file mode 100644 index 0000000..269bf83 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IdLock.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + +/** + * Allows multiple concurrent clients to lock on a numeric id with a minimal + * memory overhead. The intended usage is as follows: + * + * <pre> + * IdLock.Entry lockEntry = idLock.getLockEntry(id); + * try { + * // User code. + * } finally { + * idLock.releaseLockEntry(lockEntry); + * }</pre> + */ +@InterfaceAudience.Private +public class IdLock { + + /** An entry returned to the client as a lock object */ + public static final class Entry { + private final long id; + private int numWaiters; + private boolean locked = true; + + private Entry(long id) { + this.id = id; + } + + @Override + public String toString() { + return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked=" + + locked; + } + } + + private ConcurrentMap<Long, Entry> map = new ConcurrentHashMap<>(); + + /** + * Blocks until the lock corresponding to the given id is acquired. + * + * @param id an arbitrary number to lock on + * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release + * the lock + * @throws IOException if interrupted + */ + public Entry getLockEntry(long id) throws IOException { + Entry entry = new Entry(id); + Entry existing; + while ((existing = map.putIfAbsent(entry.id, entry)) != null) { + synchronized (existing) { + if (existing.locked) { + ++existing.numWaiters; // Add ourselves to waiters. + while (existing.locked) { + try { + existing.wait(); + } catch (InterruptedException e) { + --existing.numWaiters; // Remove ourselves from waiters. + throw new InterruptedIOException( + "Interrupted waiting to acquire sparse lock"); + } + } + + --existing.numWaiters; // Remove ourselves from waiters. + existing.locked = true; + return existing; + } + // If the entry is not locked, it might already be deleted from the + // map, so we cannot return it. We need to get our entry into the map + // or get someone else's locked entry. + } + } + return entry; + } + + /** + * Must be called in a finally block to decrease the internal counter and + * remove the monitor object for the given id if the caller is the last + * client. + * + * @param entry the return value of {@link #getLockEntry(long)} + */ + public void releaseLockEntry(Entry entry) { + synchronized (entry) { + entry.locked = false; + if (entry.numWaiters > 0) { + entry.notify(); + } else { + map.remove(entry.id); + } + } + } + + /** For testing */ + void assertMapEmpty() { + assert map.isEmpty(); + } + + @VisibleForTesting + public void waitForWaiters(long id, int numWaiters) throws InterruptedException { + for (Entry entry;;) { + entry = map.get(id); + if (entry != null) { + synchronized (entry) { + if (entry.numWaiters >= numWaiters) { + return; + } + } + } + Thread.sleep(100); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e87e4111/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index ef1ce59..16df77e 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.IdLock; import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.util.Threads; import org.apache.yetus.audience.InterfaceAudience; @@ -308,6 +309,14 @@ public class ProcedureExecutor<TEnvironment> { private final boolean checkOwnerSet; + // To prevent concurrent execution of the same procedure. + // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the + // procedure is woken up before we finish the suspend which causes the same procedures to be + // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also + // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent + // execution of the same procedure. + private final IdLock procExecutionLock = new IdLock(); + public ProcedureExecutor(final Configuration conf, final TEnvironment environment, final ProcedureStore store) { this(conf, environment, store, new SimpleProcedureScheduler()); @@ -1452,14 +1461,7 @@ public class ProcedureExecutor<TEnvironment> { // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException. // The exception is caught below and then we hurry to the exit without disturbing state. The // idea is that the processing of this procedure will be unsuspended later by an external event - // such the report of a region open. TODO: Currently, its possible for two worker threads - // to be working on the same procedure concurrently (locking in procedures is NOT about - // concurrency but about tying an entity to a procedure; i.e. a region to a particular - // procedure instance). This can make for issues if both threads are changing state. - // See env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent()); - // in RegionTransitionProcedure#reportTransition for example of Procedure putting - // itself back on the scheduler making it possible for two threads running against - // the one Procedure. Might be ok if they are both doing different, idempotent sections. + // such the report of a region open. boolean suspended = false; // Whether to 're-' -execute; run through the loop again. @@ -1756,12 +1758,14 @@ public class ProcedureExecutor<TEnvironment> { LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(), runningCount, activeCount); executionStartTime.set(EnvironmentEdgeManager.currentTime()); + IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); try { executeProcedure(proc); } catch (AssertionError e) { LOG.info("ASSERT pid=" + proc.getProcId(), e); throw e; } finally { + procExecutionLock.releaseLockEntry(lockEntry); activeCount = activeExecutorCount.decrementAndGet(); runningCount = store.setRunningProcedureCount(activeCount); LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), http://git-wip-us.apache.org/repos/asf/hbase/blob/e87e4111/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSuspendedException.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSuspendedException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSuspendedException.java index 5090fb1..9f52121 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSuspendedException.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureSuspendedException.java @@ -15,15 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.procedure2; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; @InterfaceAudience.Private -@InterfaceStability.Stable public class ProcedureSuspendedException extends ProcedureException { + + private static final long serialVersionUID = -8328419627678496269L; + /** default constructor */ public ProcedureSuspendedException() { super(); http://git-wip-us.apache.org/repos/asf/hbase/blob/e87e4111/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java deleted file mode 100644 index eba9acd..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdLock.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.util; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.yetus.audience.InterfaceAudience; - -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - -/** - * Allows multiple concurrent clients to lock on a numeric id with a minimal - * memory overhead. The intended usage is as follows: - * - * <pre> - * IdLock.Entry lockEntry = idLock.getLockEntry(id); - * try { - * // User code. - * } finally { - * idLock.releaseLockEntry(lockEntry); - * }</pre> - */ -@InterfaceAudience.Private -public class IdLock { - - /** An entry returned to the client as a lock object */ - public static class Entry { - private final long id; - private int numWaiters; - private boolean locked = true; - - private Entry(long id) { - this.id = id; - } - - @Override - public String toString() { - return "id=" + id + ", numWaiter=" + numWaiters + ", isLocked=" - + locked; - } - } - - private ConcurrentMap<Long, Entry> map = new ConcurrentHashMap<>(); - - /** - * Blocks until the lock corresponding to the given id is acquired. - * - * @param id an arbitrary number to lock on - * @return an "entry" to pass to {@link #releaseLockEntry(Entry)} to release - * the lock - * @throws IOException if interrupted - */ - public Entry getLockEntry(long id) throws IOException { - Entry entry = new Entry(id); - Entry existing; - while ((existing = map.putIfAbsent(entry.id, entry)) != null) { - synchronized (existing) { - if (existing.locked) { - ++existing.numWaiters; // Add ourselves to waiters. - while (existing.locked) { - try { - existing.wait(); - } catch (InterruptedException e) { - --existing.numWaiters; // Remove ourselves from waiters. - throw new InterruptedIOException( - "Interrupted waiting to acquire sparse lock"); - } - } - - --existing.numWaiters; // Remove ourselves from waiters. - existing.locked = true; - return existing; - } - // If the entry is not locked, it might already be deleted from the - // map, so we cannot return it. We need to get our entry into the map - // or get someone else's locked entry. - } - } - return entry; - } - - /** - * Must be called in a finally block to decrease the internal counter and - * remove the monitor object for the given id if the caller is the last - * client. - * - * @param entry the return value of {@link #getLockEntry(long)} - */ - public void releaseLockEntry(Entry entry) { - synchronized (entry) { - entry.locked = false; - if (entry.numWaiters > 0) { - entry.notify(); - } else { - map.remove(entry.id); - } - } - } - - /** For testing */ - void assertMapEmpty() { - assert map.isEmpty(); - } - - @VisibleForTesting - public void waitForWaiters(long id, int numWaiters) throws InterruptedException { - for (Entry entry;;) { - entry = map.get(id); - if (entry != null) { - synchronized (entry) { - if (entry.numWaiters >= numWaiters) { - return; - } - } - } - Thread.sleep(100); - } - } -}