Repository: ignite Updated Branches: refs/heads/master e9f1a7e0b -> 4e86660dc
Revert "IGNITE-6809: Use MPSC queue in striped pool. This closes #2960." This reverts commit 6f94659b53f056c176b3a3c6dd8f11a5db2ad74f. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4e86660d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4e86660d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4e86660d Branch: refs/heads/master Commit: 4e86660dc95e32797cf8a4eb2fa2d38e09934643 Parents: e9f1a7e Author: devozerov <[email protected]> Authored: Tue Feb 27 12:40:43 2018 +0300 Committer: devozerov <[email protected]> Committed: Tue Feb 27 12:40:43 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/util/MpscQueue.java | 240 --------------- .../ignite/internal/util/StripedExecutor.java | 304 ++++++++++--------- 2 files changed, 167 insertions(+), 377 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4e86660d/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java deleted file mode 100644 index cc10124..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java +++ /dev/null @@ -1,240 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.util; - -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -/** - * MP-SC concurrent linked queue implementation based on Dmitry Vyukov's <a href="http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue"> - * Non-intrusive MPSC node-based queue</a>. - */ -@SuppressWarnings({"WeakerAccess", "PackageVisibleField", "unused"}) -public final class MpscQueue<E> extends Head<E> { - /** Padding. */ - long p00, p01, p02, p03, p04, p05, p06, p07; - long p10, p11, p12, p13, p14, p15, p16, p17; - - public MpscQueue() { - Node node = new Node(null); - - tail = node; - head = node; - } -} - -/** - * Head of {@link MpscQueue}. - */ -@SuppressWarnings({"WeakerAccess", "AtomicFieldUpdaterIssues", "ClassNameDiffersFromFileName"}) -abstract class Head<E> extends PaddingL1<E> { - /** Head field updater. */ - private static final AtomicReferenceFieldUpdater<Head, Node> updater = - AtomicReferenceFieldUpdater.newUpdater(Head.class, Node.class, "head"); - - /** Head. */ - protected volatile Node head; - - /** - * Poll element. - * - * @return Element. - */ - @SuppressWarnings("unchecked") - public E poll() { - Node node = peekNode(); - - if (node != null) { - Object val = node.value(); - - node.value(null); - - updater.lazySet(this, node); - - return (E)val; - } - else - return null; - } - - - /** - * @return queue size. - */ - public int size() { - Node node = peekNode(); - - int size = 0; - - for (;;) { - if (node == null || node.value() == null) - break; - - Node next = node.next(); - - if (node == next) - break; - - node = next; - - if (++size == Integer.MAX_VALUE) - break; - } - - return size; - } - - /** {@inheritDoc} */ - public String toString() { - Node node = peekNode(); - - StringBuilder sb = new StringBuilder().append('['); - - for (;;) { - if (node == null) - break; - - Object value = node.value(); - - if (value == null) - break; - - if(sb.length() > 1) - sb.append(',').append(' '); - - sb.append(value); - - Node next = node.next(); - - if (node == next) - break; - - node = next; - } - - return sb.append(']').toString(); - } - - /** - * @return The node after the head of the queue (the first element in the queue). - */ - private Node peekNode() { - Node head = this.head; - Node next = head.next(); - - if (next == null && head != tail) { - do { - next = head.next(); - } while (next == null); - } - return next; - } -} - -/** - * Padding. - */ -@SuppressWarnings({"PackageVisibleField", "ClassNameDiffersFromFileName", "unused"}) -abstract class PaddingL1<E> extends Tail<E> { - /** Padding. */ - long p00, p01, p02, p03, p04, p05, p06, p07; - long p10, p11, p12, p13, p14, p15, p16, p17; -} - -/** - * Tail of {@link MpscQueue}. - */ -@SuppressWarnings({"ClassNameDiffersFromFileName", "AtomicFieldUpdaterIssues"}) -abstract class Tail<E> extends PaddingL0 { - /** Tail field updater. */ - private static final AtomicReferenceFieldUpdater<Tail, Node> updater = - AtomicReferenceFieldUpdater.newUpdater(Tail.class, Node.class, "tail"); - - /** Tail. */ - protected volatile Node tail; - - /** - * Offer element. - * - * @param e Element. - */ - public void offer(final E e) { - if (e == null) - throw new IllegalArgumentException("Null are not allowed."); - - Node newTail = new Node(e); - - Node prevTail = updater.getAndSet(this, newTail); - - prevTail.next(newTail); - } -} - -/** - * Padding. - */ -@SuppressWarnings({"PackageVisibleField", "ClassNameDiffersFromFileName", "unused"}) -abstract class PaddingL0 { - /** Padding. */ - long p00, p01, p02, p03, p04, p05, p06, p07; - long p10, p11, p12, p13, p14, p15, p16, p17; -} - -@SuppressWarnings({"UnusedDeclaration", "ClassNameDiffersFromFileName"}) -final class Node { - /** Next field updater. */ - private static final AtomicReferenceFieldUpdater<Node, Node> updater = - AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next"); - - /** Value. */ - private Object val; - - /** Next node. */ - private volatile Node next; - - /** - * Constructor. - * - * @param val Value. - */ - Node(Object val) { - this.val = val; - } - - /** - * Set next node. - * - * @param next Next node. - */ - void next(Node next) { - updater.lazySet(this, next); - } - - /** Value. */ - Object value() { - return val; - } - - void value(Object val) { - this.val = val; - } - - /** Next node. */ - Node next() { - return next; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/4e86660d/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java index 1f9b08d..630d34c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java @@ -22,36 +22,34 @@ import java.util.Collection; import java.util.Collections; import java.util.Deque; import java.util.List; -import java.util.Random; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.LockSupport; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.NotNull; -import org.jsr166.ConcurrentLinkedDeque8; /** * Striped executor. */ public class StripedExecutor implements ExecutorService { - /** */ - private static final int SPIN_CNT = 2048; - /** Stripes. */ private final Stripe[] stripes; @@ -78,11 +76,12 @@ public class StripedExecutor implements ExecutorService { * @param log Logger. * @param stealTasks {@code True} to steal tasks. */ - public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log, - boolean stealTasks) { + public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log, boolean stealTasks) { A.ensure(cnt > 0, "cnt > 0"); - stripes = stealTasks ? StealingStripe.create(igniteInstanceName, poolName, log, cnt) : new Stripe[cnt]; + boolean success = false; + + stripes = new Stripe[cnt]; completedCntrs = new long[cnt]; @@ -90,12 +89,17 @@ public class StripedExecutor implements ExecutorService { this.log = log; - boolean success = false; - try { - if(!stealTasks) { - for (int i = 0; i < cnt; i++) - stripes[i] = new MpscQueueBasedStripe(igniteInstanceName, poolName, i, log); + for (int i = 0; i < cnt; i++) { + stripes[i] = stealTasks ? new StripeConcurrentQueue( + igniteInstanceName, + poolName, + i, + log, stripes) : new StripeConcurrentQueue( + igniteInstanceName, + poolName, + i, + log); } for (int i = 0; i < cnt; i++) @@ -405,7 +409,7 @@ public class StripedExecutor implements ExecutorService { /** * Stripe. */ - private abstract static class Stripe implements Runnable { + private static abstract class Stripe implements Runnable { /** */ private final String igniteInstanceName; @@ -436,7 +440,7 @@ public class StripedExecutor implements ExecutorService { * @param idx Stripe index. * @param log Logger. */ - Stripe( + public Stripe( String igniteInstanceName, String poolName, int idx, @@ -489,8 +493,10 @@ public class StripedExecutor implements ExecutorService { /** {@inheritDoc} */ @Override public void run() { while (!stopping) { + Runnable cmd; + try { - Runnable cmd = take(); + cmd = take(); if (cmd != null) { active = true; @@ -546,79 +552,52 @@ public class StripedExecutor implements ExecutorService { } } - /** */ - private static final class StealingStripe extends Stripe { + /** + * Stripe. + */ + private static class StripeConcurrentQueue extends Stripe { /** */ private static final int IGNITE_TASKS_STEALING_THRESHOLD = IgniteSystemProperties.getInteger( IgniteSystemProperties.IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD, 4); - /** */ - @GridToStringExclude - private final Deque<Runnable>[] queues; - - /** */ - @GridToStringExclude - private final IgniteRunnable unpark; + /** Queue. */ + private final Queue<Runnable> queue; /** */ @GridToStringExclude - private Random rnd; - - /** */ - private final Deque<Runnable> queue; + private final Stripe[] others; /** */ - private final AtomicBoolean parked = new AtomicBoolean(); - - /** */ - @SuppressWarnings("unchecked") - static Stripe[] create(String igniteInstanceName, String poolName, IgniteLogger log, final int poolSize) { - final StealingStripe[] stripes = new StealingStripe[poolSize]; - Deque<Runnable>[] queues = new Deque[poolSize]; - - IgniteRunnable unpark = new IgniteRunnable() { - @Override public void run() { - int init = ThreadLocalRandom.current().nextInt(poolSize); - - for (int cur = init;;) { - AtomicBoolean parked = stripes[cur].parked; - - if (parked.get() && parked.compareAndSet(true, false)) { - LockSupport.unpark(stripes[cur].thread); - - break; - } - - if ((cur = (cur + 1) % poolSize) == init) - break; - } - } - }; - - for (int i = 0; i < poolSize; i++) { - queues[i] = new ConcurrentLinkedDeque8<>(); - stripes[i] = new StealingStripe(i, igniteInstanceName, poolName, log, queues, unpark); - } + private volatile boolean parked; - return stripes; + /** + * @param igniteInstanceName Ignite instance name. + * @param poolName Pool name. + * @param idx Stripe index. + * @param log Logger. + */ + StripeConcurrentQueue( + String igniteInstanceName, + String poolName, + int idx, + IgniteLogger log + ) { + this(igniteInstanceName, poolName, idx, log, null); } /** - * @param idx Stripe index. * @param igniteInstanceName Ignite instance name. * @param poolName Pool name. + * @param idx Stripe index. * @param log Logger. - * @param queues Other queues to steal tasks from. - * @param unpark Unpark callback, unparks random parked stripe from the pool. */ - private StealingStripe( - int idx, + StripeConcurrentQueue( String igniteInstanceName, String poolName, + int idx, IgniteLogger log, - Deque<Runnable>[] queues, - IgniteRunnable unpark + Stripe[] others ) { super( igniteInstanceName, @@ -626,62 +605,73 @@ public class StripedExecutor implements ExecutorService { idx, log); - this.queues = queues; - this.unpark = unpark; + this.others = others; - queue = queues[idx]; + this.queue = others == null ? new ConcurrentLinkedQueue<Runnable>() : new ConcurrentLinkedDeque<Runnable>(); } /** {@inheritDoc} */ @Override Runnable take() throws InterruptedException { - Runnable task; + Runnable r; + + for (int i = 0; i < 2048; i++) { + r = queue.poll(); - for (int i = 0; i < SPIN_CNT; i++) { - if ((task = queue.poll()) != null) - return task; + if (r != null) + return r; } - for (;;) { - parked.set(true); + parked = true; - if ((task = queue.poll()) != null) { - parked.set(false); + try { + for (;;) { + r = queue.poll(); - return task; - } + if (r != null) + return r; - int len = queues.length, init = random().nextInt(len); + if(others != null) { + int len = others.length; + int init = ThreadLocalRandom.current().nextInt(len); + int cur = init; - for (int cur = init;;) { - if(cur != idx) { - Deque<Runnable> queue = queues[cur]; + while (true) { + if(cur != idx) { + Deque<Runnable> queue = (Deque<Runnable>) ((StripeConcurrentQueue) others[cur]).queue; - if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (task = queue.pollLast()) != null) { - parked.set(false); + if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (r = queue.pollLast()) != null) + return r; + } - return task; + if ((cur = (cur + 1) % len) == init) + break; } } - if ((cur = (cur + 1) % len) == init) - break; - } - - LockSupport.park(); + LockSupport.park(); - if (Thread.interrupted()) - throw new InterruptedException(); + if (Thread.interrupted()) + throw new InterruptedException(); + } + } + finally { + parked = false; } } /** {@inheritDoc} */ - @Override void execute(Runnable cmd) { + void execute(Runnable cmd) { queue.add(cmd); - if (parked.get() && parked.compareAndSet(true, false)) + if (parked) LockSupport.unpark(thread); - else if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD) - unpark.run(); + + if(others != null && queueSize() > IGNITE_TASKS_STEALING_THRESHOLD) { + for (Stripe other : others) { + if(((StripeConcurrentQueue)other).parked) + LockSupport.unpark(other.thread); + } + } } /** {@inheritDoc} */ @@ -694,24 +684,18 @@ public class StripedExecutor implements ExecutorService { return queue.size(); } - /** */ - private Random random() { - return rnd == null ? rnd = ThreadLocalRandom.current() : rnd; - } - /** {@inheritDoc} */ @Override public String toString() { - return S.toString(StealingStripe.class, this, super.toString()); + return S.toString(StripeConcurrentQueue.class, this, super.toString()); } } - /** */ - private static final class MpscQueueBasedStripe extends Stripe { - /** */ - private final AtomicBoolean parked = new AtomicBoolean(); - - /** */ - private final MpscQueue<Runnable> queue = new MpscQueue<>(); + /** + * Stripe. + */ + private static class StripeConcurrentQueueNoPark extends Stripe { + /** Queue. */ + private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); /** * @param igniteInstanceName Ignite instance name. @@ -719,41 +703,82 @@ public class StripedExecutor implements ExecutorService { * @param idx Stripe index. * @param log Logger. */ - private MpscQueueBasedStripe(String igniteInstanceName, String poolName, int idx, IgniteLogger log) { - super(igniteInstanceName, poolName, idx, log); + public StripeConcurrentQueueNoPark( + String igniteInstanceName, + String poolName, + int idx, + IgniteLogger log + ) { + super(igniteInstanceName, + poolName, + idx, + log); } /** {@inheritDoc} */ - @Override void execute(Runnable cmd) { - queue.offer(cmd); + @Override Runnable take() { + for (;;) { + Runnable r = queue.poll(); - if (parked.get() && parked.compareAndSet(true, false)) - LockSupport.unpark(thread); + if (r != null) + return r; + } } /** {@inheritDoc} */ - @Override Runnable take() throws InterruptedException { - Runnable task; + void execute(Runnable cmd) { + queue.add(cmd); + } - for (int i = 0; i < SPIN_CNT; i++) { - if ((task = queue.poll()) != null) - return task; - } + /** {@inheritDoc} */ + @Override int queueSize() { + return queue.size(); + } - for (;;) { - parked.set(true); + /** {@inheritDoc} */ + @Override String queueToString() { + return String.valueOf(queue); + } - if ((task = queue.poll()) != null) { - parked.set(false); + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StripeConcurrentQueueNoPark.class, this, super.toString()); + } + } - return task; - } + /** + * Stripe. + */ + private static class StripeConcurrentBlockingQueue extends Stripe { + /** Queue. */ + private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); - LockSupport.park(); + /** + * @param igniteInstanceName Ignite instance name. + * @param poolName Pool name. + * @param idx Stripe index. + * @param log Logger. + */ + public StripeConcurrentBlockingQueue( + String igniteInstanceName, + String poolName, + int idx, + IgniteLogger log + ) { + super(igniteInstanceName, + poolName, + idx, + log); + } - if (Thread.interrupted()) - throw new InterruptedException(); - } + /** {@inheritDoc} */ + @Override Runnable take() throws InterruptedException { + return queue.take(); + } + + /** {@inheritDoc} */ + void execute(Runnable cmd) { + queue.add(cmd); } /** {@inheritDoc} */ @@ -765,5 +790,10 @@ public class StripedExecutor implements ExecutorService { @Override String queueToString() { return String.valueOf(queue); } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StripeConcurrentBlockingQueue.class, this, super.toString()); + } } }
