GG-1346: Implemented striped spin busy lock.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aec0631e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aec0631e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aec0631e Branch: refs/heads/ignite-843 Commit: aec0631eee9ab0e570d995b53557405748f4d891 Parents: 9b06cf3 Author: vozerov-gridgain <[email protected]> Authored: Tue Sep 1 16:38:42 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Sep 1 16:38:42 2015 +0300 ---------------------------------------------------------------------- .../callback/PlatformCallbackGateway.java | 10 +- .../internal/util/GridStripedSpinBusyLock.java | 125 +++++++++++++++++++ 2 files changed, 130 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/aec0631e/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java index 93698c2..a348888 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.platform.callback; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.internal.util.GridSpinReadWriteLock; +import org.apache.ignite.internal.util.GridStripedSpinBusyLock; /** * Gateway to all platform-dependent callbacks. 
Implementers might extend this class and provide additional callbacks. @@ -30,7 +30,7 @@ public class PlatformCallbackGateway { protected final long envPtr; /** Lock. */ - private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock(); + private final GridStripedSpinBusyLock lock = new GridStripedSpinBusyLock(); /** * Native gateway. @@ -922,7 +922,7 @@ public class PlatformCallbackGateway { * Enter gateway. */ protected void enter() { - if (!lock.tryReadLock()) + if (!lock.enterBusy()) throw new IgniteException("Failed to execute native callback because grid is stopping."); } @@ -930,13 +930,13 @@ public class PlatformCallbackGateway { * Leave gateway. */ protected void leave() { - lock.readUnlock(); + lock.leaveBusy(); } /** * Block gateway. */ protected void block() { - lock.writeLock(); + lock.block(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/aec0631e/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java new file mode 100644 index 0000000..678d521 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// File: modules/core/src/main/java/org/apache/ignite/internal/util/GridStripedSpinBusyLock.java
// (package org.apache.ignite.internal.util)

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Striped spin busy lock. Aimed to provide efficient "read" lock semantics while still maintaining safety when
 * entering "busy" state.
 * <p>
 * The lock keeps an array of counters ("stripes"). Every thread is bound to one stripe through a thread-local
 * index, so concurrent {@link #enterBusy()} / {@link #leaveBusy()} calls from different threads usually update
 * different counters. {@link #block()} marks every stripe with a writer bit and waits until each stripe has no
 * active holders left; after that every {@link #enterBusy()} call returns {@code false}. This class offers no way
 * to clear the writer bit again.
 * <p>
 * Because the stripe is chosen per thread, {@link #leaveBusy()} must be called by the same thread that
 * successfully called {@link #enterBusy()}, exactly once per successful enter. An unbalanced or cross-thread
 * {@link #leaveBusy()} leaves stripe counters inconsistent, and {@link #block()} may then wait forever.
 */
public class GridStripedSpinBusyLock {
    /** Writer mask: bit set in a stripe once {@link #block()} has claimed it. */
    private static final int WRITER_MASK = 1 << 30;

    /** Default amount of stripes. */
    private static final int DFLT_STRIPE_CNT = Runtime.getRuntime().availableProcessors() * 4;

    /** Pause between two checks of a stripe while {@link #block()} waits for holders to leave, in milliseconds. */
    private static final long BLOCK_POLL_INTERVAL_MS = 10;

    /** Thread index: a non-negative random number assigned to a thread on first use and mapped onto a stripe. */
    private static final ThreadLocal<Integer> threadIdx = new ThreadLocal<Integer>() {
        @Override protected Integer initialValue() {
            return new Random().nextInt(Integer.MAX_VALUE);
        }
    };

    /**
     * States, one counter per stripe.
     * <p>
     * NOTE(review): each stripe is a separately allocated {@link AtomicInteger} and no explicit padding is applied.
     * The original comment stated that the values are located far from each other and therefore not subject to
     * false sharing; that depends on heap placement and should be confirmed.
     */
    private final AtomicInteger[] states;

    /**
     * Default constructor. Uses four stripes per available processor.
     */
    public GridStripedSpinBusyLock() {
        this(DFLT_STRIPE_CNT);
    }

    /**
     * Constructor.
     *
     * @param stripeCnt Amount of stripes, must be positive.
     * @throws IllegalArgumentException If {@code stripeCnt} is zero or negative.
     */
    public GridStripedSpinBusyLock(int stripeCnt) {
        // Fail fast: a zero-length array would otherwise surface later as a division by zero in state().
        if (stripeCnt <= 0)
            throw new IllegalArgumentException("Stripe count must be positive: " + stripeCnt);

        states = new AtomicInteger[stripeCnt];

        for (int i = 0; i < stripeCnt; i++)
            states[i] = new AtomicInteger();
    }

    /**
     * Enter busy state.
     *
     * @return {@code True} if entered busy state, {@code false} if the calling thread's stripe is already marked
     *      by {@link #block()}.
     */
    public boolean enterBusy() {
        AtomicInteger state = state();

        // Optimistically register as a holder, then check whether a writer has already claimed the stripe.
        if ((state.incrementAndGet() & WRITER_MASK) != 0) {
            // Roll back on the very same stripe so that block() can observe a drained counter.
            state.decrementAndGet();

            return false;
        }

        return true;
    }

    /**
     * Leave busy state. Must be called by the thread that successfully entered, once per successful
     * {@link #enterBusy()}.
     */
    public void leaveBusy() {
        state().decrementAndGet();
    }

    /**
     * Block. Sets the writer bit on every stripe, one after another, and for each stripe waits until all holders
     * have left. Calling this method again after it has completed returns without waiting as long as no holders
     * are registered.
     * <p>
     * Interruption does not abort the wait: an interrupt received while waiting is remembered and the thread's
     * interrupt flag is restored before the method returns.
     */
    public void block() {
        boolean interrupted = false;

        for (AtomicInteger state : states) {
            // 1. CAS-loop to set a writer bit while preserving the current holder count.
            int oldVal;

            do {
                oldVal = state.get();
            }
            while (!state.compareAndSet(oldVal, oldVal | WRITER_MASK));

            // 2. Wait until all readers are out, i.e. only the writer bit remains.
            while (state.get() != WRITER_MASK) {
                try {
                    Thread.sleep(BLOCK_POLL_INTERVAL_MS);
                }
                catch (InterruptedException ignored) {
                    // Keep waiting; the interrupt is re-asserted once every stripe is blocked.
                    interrupted = true;
                }
            }
        }

        if (interrupted)
            Thread.currentThread().interrupt();
    }

    /**
     * Gets state of thread's stripe.
     *
     * @return State.
     */
    private AtomicInteger state() {
        // The thread index is non-negative, so the remainder is a valid array index.
        return states[threadIdx.get() % states.length];
    }
}
