Repository: ignite Updated Branches: refs/heads/master b7f2a8f4a -> 6f94659b5
IGNITE-6809: Use MPSC queue in striped pool. This closes #2960. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6f94659b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6f94659b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6f94659b Branch: refs/heads/master Commit: 6f94659b53f056c176b3a3c6dd8f11a5db2ad74f Parents: b7f2a8f Author: Igor Seliverstov <[email protected]> Authored: Tue Nov 7 12:02:12 2017 +0300 Committer: devozerov <[email protected]> Committed: Tue Nov 7 12:02:12 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/util/MpscQueue.java | 240 +++++++++++++++ .../ignite/internal/util/StripedExecutor.java | 304 +++++++++---------- 2 files changed, 377 insertions(+), 167 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6f94659b/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java new file mode 100644 index 0000000..cc10124 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +/** + * MP-SC concurrent linked queue implementation based on Dmitry Vyukov's <a href="http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue"> + * Non-intrusive MPSC node-based queue</a>. + */ +@SuppressWarnings({"WeakerAccess", "PackageVisibleField", "unused"}) +public final class MpscQueue<E> extends Head<E> { + /** Padding. */ + long p00, p01, p02, p03, p04, p05, p06, p07; + long p10, p11, p12, p13, p14, p15, p16, p17; + + public MpscQueue() { + Node node = new Node(null); + + tail = node; + head = node; + } +} + +/** + * Head of {@link MpscQueue}. + */ +@SuppressWarnings({"WeakerAccess", "AtomicFieldUpdaterIssues", "ClassNameDiffersFromFileName"}) +abstract class Head<E> extends PaddingL1<E> { + /** Head field updater. */ + private static final AtomicReferenceFieldUpdater<Head, Node> updater = + AtomicReferenceFieldUpdater.newUpdater(Head.class, Node.class, "head"); + + /** Head. */ + protected volatile Node head; + + /** + * Poll element. + * + * @return Element. + */ + @SuppressWarnings("unchecked") + public E poll() { + Node node = peekNode(); + + if (node != null) { + Object val = node.value(); + + node.value(null); + + updater.lazySet(this, node); + + return (E)val; + } + else + return null; + } + + + /** + * @return queue size. + */ + public int size() { + Node node = peekNode(); + + int size = 0; + + for (;;) { + if (node == null || node.value() == null) + break; + + Node next = node.next(); + + if (node == next) + break; + + node = next; + + if (++size == Integer.MAX_VALUE) + break; + } + + return size; + } + + /** {@inheritDoc} */ + public String toString() { + Node node = peekNode(); + + StringBuilder sb = new StringBuilder().append('['); + + for (;;) { + if (node == null) + break; + + Object value = node.value(); + + if (value == null) + break; + + if(sb.length() > 1) + sb.append(',').append(' '); + + sb.append(value); + + Node next = node.next(); + + if (node == next) + break; + + node = next; + } + + return sb.append(']').toString(); + } + + /** + * @return The node after the head of the queue (the first element in the queue). + */ + private Node peekNode() { + Node head = this.head; + Node next = head.next(); + + if (next == null && head != tail) { + do { + next = head.next(); + } while (next == null); + } + return next; + } +} + +/** + * Padding. + */ +@SuppressWarnings({"PackageVisibleField", "ClassNameDiffersFromFileName", "unused"}) +abstract class PaddingL1<E> extends Tail<E> { + /** Padding. */ + long p00, p01, p02, p03, p04, p05, p06, p07; + long p10, p11, p12, p13, p14, p15, p16, p17; +} + +/** + * Tail of {@link MpscQueue}. + */ +@SuppressWarnings({"ClassNameDiffersFromFileName", "AtomicFieldUpdaterIssues"}) +abstract class Tail<E> extends PaddingL0 { + /** Tail field updater. */ + private static final AtomicReferenceFieldUpdater<Tail, Node> updater = + AtomicReferenceFieldUpdater.newUpdater(Tail.class, Node.class, "tail"); + + /** Tail. */ + protected volatile Node tail; + + /** + * Offer element. + * + * @param e Element. + */ + public void offer(final E e) { + if (e == null) + throw new IllegalArgumentException("Null are not allowed."); + + Node newTail = new Node(e); + + Node prevTail = updater.getAndSet(this, newTail); + + prevTail.next(newTail); + } +} + +/** + * Padding. + */ +@SuppressWarnings({"PackageVisibleField", "ClassNameDiffersFromFileName", "unused"}) +abstract class PaddingL0 { + /** Padding. */ + long p00, p01, p02, p03, p04, p05, p06, p07; + long p10, p11, p12, p13, p14, p15, p16, p17; +} + +@SuppressWarnings({"UnusedDeclaration", "ClassNameDiffersFromFileName"}) +final class Node { + /** Next field updater. */ + private static final AtomicReferenceFieldUpdater<Node, Node> updater = + AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next"); + + /** Value. */ + private Object val; + + /** Next node. */ + private volatile Node next; + + /** + * Constructor. + * + * @param val Value. + */ + Node(Object val) { + this.val = val; + } + + /** + * Set next node. + * + * @param next Next node. + */ + void next(Node next) { + updater.lazySet(this, next); + } + + /** Value. */ + Object value() { + return val; + } + + void value(Object val) { + this.val = val; + } + + /** Next node. */ + Node next() { + return next; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/6f94659b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java index 630d34c..1f9b08d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java @@ -22,34 +22,36 @@ import java.util.Collection; import java.util.Collections; import java.util.Deque; import java.util.List; -import java.util.Queue; -import java.util.concurrent.BlockingQueue; +import java.util.Random; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.LockSupport; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.NotNull; +import org.jsr166.ConcurrentLinkedDeque8; /** * Striped executor. */ public class StripedExecutor implements ExecutorService { + /** */ + private static final int SPIN_CNT = 2048; + /** Stripes. */ private final Stripe[] stripes; @@ -76,12 +78,11 @@ public class StripedExecutor implements ExecutorService { * @param log Logger. * @param stealTasks {@code True} to steal tasks. */ - public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log, boolean stealTasks) { + public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log, + boolean stealTasks) { A.ensure(cnt > 0, "cnt > 0"); - boolean success = false; - - stripes = new Stripe[cnt]; + stripes = stealTasks ? StealingStripe.create(igniteInstanceName, poolName, log, cnt) : new Stripe[cnt]; completedCntrs = new long[cnt]; @@ -89,17 +90,12 @@ public class StripedExecutor implements ExecutorService { this.log = log; + boolean success = false; + try { - for (int i = 0; i < cnt; i++) { - stripes[i] = stealTasks ? new StripeConcurrentQueue( - igniteInstanceName, - poolName, - i, - log, stripes) : new StripeConcurrentQueue( - igniteInstanceName, - poolName, - i, - log); + if(!stealTasks) { + for (int i = 0; i < cnt; i++) + stripes[i] = new MpscQueueBasedStripe(igniteInstanceName, poolName, i, log); } for (int i = 0; i < cnt; i++) @@ -409,7 +405,7 @@ public class StripedExecutor implements ExecutorService { /** * Stripe. */ - private static abstract class Stripe implements Runnable { + private abstract static class Stripe implements Runnable { /** */ private final String igniteInstanceName; @@ -440,7 +436,7 @@ public class StripedExecutor implements ExecutorService { * @param idx Stripe index. * @param log Logger. */ - public Stripe( + Stripe( String igniteInstanceName, String poolName, int idx, @@ -493,10 +489,8 @@ public class StripedExecutor implements ExecutorService { /** {@inheritDoc} */ @Override public void run() { while (!stopping) { - Runnable cmd; - try { - cmd = take(); + Runnable cmd = take(); if (cmd != null) { active = true; @@ -552,52 +546,79 @@ public class StripedExecutor implements ExecutorService { } } - /** - * Stripe. - */ - private static class StripeConcurrentQueue extends Stripe { + /** */ + private static final class StealingStripe extends Stripe { /** */ private static final int IGNITE_TASKS_STEALING_THRESHOLD = IgniteSystemProperties.getInteger( IgniteSystemProperties.IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD, 4); - /** Queue. */ - private final Queue<Runnable> queue; + /** */ + @GridToStringExclude + private final Deque<Runnable>[] queues; + + /** */ + @GridToStringExclude + private final IgniteRunnable unpark; /** */ @GridToStringExclude - private final Stripe[] others; + private Random rnd; /** */ - private volatile boolean parked; + private final Deque<Runnable> queue; - /** - * @param igniteInstanceName Ignite instance name. - * @param poolName Pool name. - * @param idx Stripe index. - * @param log Logger. - */ - StripeConcurrentQueue( - String igniteInstanceName, - String poolName, - int idx, - IgniteLogger log - ) { - this(igniteInstanceName, poolName, idx, log, null); + /** */ + private final AtomicBoolean parked = new AtomicBoolean(); + + /** */ + @SuppressWarnings("unchecked") + static Stripe[] create(String igniteInstanceName, String poolName, IgniteLogger log, final int poolSize) { + final StealingStripe[] stripes = new StealingStripe[poolSize]; + Deque<Runnable>[] queues = new Deque[poolSize]; + + IgniteRunnable unpark = new IgniteRunnable() { + @Override public void run() { + int init = ThreadLocalRandom.current().nextInt(poolSize); + + for (int cur = init;;) { + AtomicBoolean parked = stripes[cur].parked; + + if (parked.get() && parked.compareAndSet(true, false)) { + LockSupport.unpark(stripes[cur].thread); + + break; + } + + if ((cur = (cur + 1) % poolSize) == init) + break; + } + } + }; + + for (int i = 0; i < poolSize; i++) { + queues[i] = new ConcurrentLinkedDeque8<>(); + stripes[i] = new StealingStripe(i, igniteInstanceName, poolName, log, queues, unpark); + } + + return stripes; } /** + * @param idx Stripe index. * @param igniteInstanceName Ignite instance name. * @param poolName Pool name. - * @param idx Stripe index. * @param log Logger. + * @param queues Other queues to steal tasks from. + * @param unpark Unpark callback, unparks random parked stripe from the pool. */ - StripeConcurrentQueue( + private StealingStripe( + int idx, String igniteInstanceName, String poolName, - int idx, IgniteLogger log, - Stripe[] others + Deque<Runnable>[] queues, + IgniteRunnable unpark ) { super( igniteInstanceName, @@ -605,73 +626,62 @@ public class StripedExecutor implements ExecutorService { idx, log); - this.others = others; + this.queues = queues; + this.unpark = unpark; - this.queue = others == null ? new ConcurrentLinkedQueue<Runnable>() : new ConcurrentLinkedDeque<Runnable>(); + queue = queues[idx]; } /** {@inheritDoc} */ @Override Runnable take() throws InterruptedException { - Runnable r; - - for (int i = 0; i < 2048; i++) { - r = queue.poll(); + Runnable task; - if (r != null) - return r; + for (int i = 0; i < SPIN_CNT; i++) { + if ((task = queue.poll()) != null) + return task; } - parked = true; + for (;;) { + parked.set(true); - try { - for (;;) { - r = queue.poll(); + if ((task = queue.poll()) != null) { + parked.set(false); - if (r != null) - return r; + return task; + } - if(others != null) { - int len = others.length; - int init = ThreadLocalRandom.current().nextInt(len); - int cur = init; + int len = queues.length, init = random().nextInt(len); - while (true) { - if(cur != idx) { - Deque<Runnable> queue = (Deque<Runnable>) ((StripeConcurrentQueue) others[cur]).queue; + for (int cur = init;;) { + if(cur != idx) { + Deque<Runnable> queue = queues[cur]; - if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (r = queue.pollLast()) != null) - return r; - } + if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (task = queue.pollLast()) != null) { + parked.set(false); - if ((cur = (cur + 1) % len) == init) - break; + return task; } } - LockSupport.park(); - - if (Thread.interrupted()) - throw new InterruptedException(); + if ((cur = (cur + 1) % len) == init) + break; } - } - finally { - parked = false; + + LockSupport.park(); + + if (Thread.interrupted()) + throw new InterruptedException(); } } /** {@inheritDoc} */ - void execute(Runnable cmd) { + @Override void execute(Runnable cmd) { queue.add(cmd); - if (parked) + if (parked.get() && parked.compareAndSet(true, false)) LockSupport.unpark(thread); - - if(others != null && queueSize() > IGNITE_TASKS_STEALING_THRESHOLD) { - for (Stripe other : others) { - if(((StripeConcurrentQueue)other).parked) - LockSupport.unpark(other.thread); - } - } + else if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD) + unpark.run(); } /** {@inheritDoc} */ @@ -684,18 +694,24 @@ public class StripedExecutor implements ExecutorService { return queue.size(); } + /** */ + private Random random() { + return rnd == null ? rnd = ThreadLocalRandom.current() : rnd; + } + /** {@inheritDoc} */ @Override public String toString() { - return S.toString(StripeConcurrentQueue.class, this, super.toString()); + return S.toString(StealingStripe.class, this, super.toString()); } } - /** - * Stripe. - */ - private static class StripeConcurrentQueueNoPark extends Stripe { - /** Queue. */ - private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); + /** */ + private static final class MpscQueueBasedStripe extends Stripe { + /** */ + private final AtomicBoolean parked = new AtomicBoolean(); + + /** */ + private final MpscQueue<Runnable> queue = new MpscQueue<>(); /** * @param igniteInstanceName Ignite instance name. @@ -703,82 +719,41 @@ public class StripedExecutor implements ExecutorService { * @param idx Stripe index. * @param log Logger. */ - public StripeConcurrentQueueNoPark( - String igniteInstanceName, - String poolName, - int idx, - IgniteLogger log - ) { - super(igniteInstanceName, - poolName, - idx, - log); + private MpscQueueBasedStripe(String igniteInstanceName, String poolName, int idx, IgniteLogger log) { + super(igniteInstanceName, poolName, idx, log); } /** {@inheritDoc} */ - @Override Runnable take() { - for (;;) { - Runnable r = queue.poll(); - - if (r != null) - return r; - } - } + @Override void execute(Runnable cmd) { + queue.offer(cmd); - /** {@inheritDoc} */ - void execute(Runnable cmd) { - queue.add(cmd); + if (parked.get() && parked.compareAndSet(true, false)) + LockSupport.unpark(thread); } /** {@inheritDoc} */ - @Override int queueSize() { - return queue.size(); - } + @Override Runnable take() throws InterruptedException { + Runnable task; - /** {@inheritDoc} */ - @Override String queueToString() { - return String.valueOf(queue); - } + for (int i = 0; i < SPIN_CNT; i++) { + if ((task = queue.poll()) != null) + return task; + } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StripeConcurrentQueueNoPark.class, this, super.toString()); - } - } + for (;;) { + parked.set(true); - /** - * Stripe. - */ - private static class StripeConcurrentBlockingQueue extends Stripe { - /** Queue. */ - private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); + if ((task = queue.poll()) != null) { + parked.set(false); - /** - * @param igniteInstanceName Ignite instance name. - * @param poolName Pool name. - * @param idx Stripe index. - * @param log Logger. - */ - public StripeConcurrentBlockingQueue( - String igniteInstanceName, - String poolName, - int idx, - IgniteLogger log - ) { - super(igniteInstanceName, - poolName, - idx, - log); - } + return task; + } - /** {@inheritDoc} */ - @Override Runnable take() throws InterruptedException { - return queue.take(); - } + LockSupport.park(); - /** {@inheritDoc} */ - void execute(Runnable cmd) { - queue.add(cmd); + if (Thread.interrupted()) + throw new InterruptedException(); + } } /** {@inheritDoc} */ @@ -790,10 +765,5 @@ public class StripedExecutor implements ExecutorService { @Override String queueToString() { return String.valueOf(queue); } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StripeConcurrentBlockingQueue.class, this, super.toString()); - } } }
