Repository: ignite
Updated Branches:
  refs/heads/master e9f1a7e0b -> 4e86660dc


Revert "IGNITE-6809: Use MPSC queue in striped pool. This closes #2960."

This reverts commit 6f94659b53f056c176b3a3c6dd8f11a5db2ad74f.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4e86660d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4e86660d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4e86660d

Branch: refs/heads/master
Commit: 4e86660dc95e32797cf8a4eb2fa2d38e09934643
Parents: e9f1a7e
Author: devozerov <[email protected]>
Authored: Tue Feb 27 12:40:43 2018 +0300
Committer: devozerov <[email protected]>
Committed: Tue Feb 27 12:40:43 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/util/MpscQueue.java  | 240 ---------------
 .../ignite/internal/util/StripedExecutor.java   | 304 ++++++++++---------
 2 files changed, 167 insertions(+), 377 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4e86660d/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java b/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java
deleted file mode 100644
index cc10124..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/MpscQueue.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util;
-
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-/**
- * MP-SC concurrent linked queue implementation based on Dmitry Vyukov's <a href="http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue">
- * Non-intrusive MPSC node-based queue</a>.
- */
-@SuppressWarnings({"WeakerAccess", "PackageVisibleField", "unused"})
-public final class MpscQueue<E> extends Head<E> {
-    /** Padding. */
-    long p00, p01, p02, p03, p04, p05, p06, p07;
-    long p10, p11, p12, p13, p14, p15, p16, p17;
-
-    public MpscQueue() {
-        Node node = new Node(null);
-
-        tail = node;
-        head = node;
-    }
-}
-
-/**
- * Head of {@link MpscQueue}.
- */
-@SuppressWarnings({"WeakerAccess", "AtomicFieldUpdaterIssues", "ClassNameDiffersFromFileName"})
-abstract class Head<E> extends PaddingL1<E> {
-    /** Head field updater. */
-    private static final AtomicReferenceFieldUpdater<Head, Node> updater =
-        AtomicReferenceFieldUpdater.newUpdater(Head.class, Node.class, "head");
-
-    /** Head. */
-    protected volatile Node head;
-
-    /**
-     * Poll element.
-     *
-     * @return Element.
-     */
-    @SuppressWarnings("unchecked")
-    public E poll() {
-        Node node = peekNode();
-
-        if (node != null) {
-            Object val = node.value();
-
-            node.value(null);
-
-            updater.lazySet(this, node);
-
-            return (E)val;
-        }
-        else
-            return null;
-    }
-
-
-    /**
-     * @return queue size.
-     */
-    public int size() {
-        Node node = peekNode();
-
-        int size = 0;
-
-        for (;;) {
-            if (node == null || node.value() == null)
-                break;
-
-            Node next = node.next();
-
-            if (node == next)
-                break;
-
-            node = next;
-
-            if (++size == Integer.MAX_VALUE)
-                break;
-        }
-
-        return size;
-    }
-
-    /** {@inheritDoc} */
-    public String toString() {
-        Node node = peekNode();
-
-        StringBuilder sb = new StringBuilder().append('[');
-
-        for (;;) {
-            if (node == null)
-                break;
-
-            Object value = node.value();
-
-            if (value == null)
-                break;
-
-            if(sb.length() > 1)
-                sb.append(',').append(' ');
-
-            sb.append(value);
-
-            Node next = node.next();
-
-            if (node == next)
-                break;
-
-            node = next;
-        }
-
-        return sb.append(']').toString();
-    }
-
-    /**
-     * @return The node after the head of the queue (the first element in the queue).
-     */
-    private Node peekNode() {
-        Node head = this.head;
-        Node next = head.next();
-
-        if (next == null && head != tail) {
-            do {
-                next = head.next();
-            } while (next == null);
-        }
-        return next;
-    }
-}
-
-/**
- * Padding.
- */
-@SuppressWarnings({"PackageVisibleField", "ClassNameDiffersFromFileName", "unused"})
-abstract class PaddingL1<E> extends Tail<E> {
-    /** Padding. */
-    long p00, p01, p02, p03, p04, p05, p06, p07;
-    long p10, p11, p12, p13, p14, p15, p16, p17;
-}
-
-/**
- * Tail of {@link MpscQueue}.
- */
-@SuppressWarnings({"ClassNameDiffersFromFileName", "AtomicFieldUpdaterIssues"})
-abstract class Tail<E> extends PaddingL0 {
-    /** Tail field updater. */
-    private static final AtomicReferenceFieldUpdater<Tail, Node> updater =
-        AtomicReferenceFieldUpdater.newUpdater(Tail.class, Node.class, "tail");
-
-    /** Tail. */
-    protected volatile Node tail;
-
-    /**
-     * Offer element.
-     *
-     * @param e Element.
-     */
-    public void offer(final E e) {
-        if (e == null)
-            throw new IllegalArgumentException("Null are not allowed.");
-
-        Node newTail = new Node(e);
-
-        Node prevTail = updater.getAndSet(this, newTail);
-
-        prevTail.next(newTail);
-    }
-}
-
-/**
- * Padding.
- */
-@SuppressWarnings({"PackageVisibleField", "ClassNameDiffersFromFileName", "unused"})
-abstract class PaddingL0 {
-    /** Padding. */
-    long p00, p01, p02, p03, p04, p05, p06, p07;
-    long p10, p11, p12, p13, p14, p15, p16, p17;
-}
-
-@SuppressWarnings({"UnusedDeclaration", "ClassNameDiffersFromFileName"})
-final class Node {
-    /** Next field updater. */
-    private static final AtomicReferenceFieldUpdater<Node, Node> updater =
-        AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
-
-    /** Value. */
-    private Object val;
-
-    /** Next node. */
-    private volatile Node next;
-
-    /**
-     * Constructor.
-     *
-     * @param val Value.
-     */
-    Node(Object val) {
-        this.val = val;
-    }
-
-    /**
-     * Set next node.
-     *
-     * @param next Next node.
-     */
-    void next(Node next) {
-        updater.lazySet(this, next);
-    }
-
-    /** Value. */
-    Object value() {
-        return val;
-    }
-
-    void value(Object val) {
-        this.val = val;
-    }
-
-    /** Next node. */
-    Node next() {
-        return next;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4e86660d/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 1f9b08d..630d34c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -22,36 +22,34 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
-import java.util.Random;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.NotNull;
-import org.jsr166.ConcurrentLinkedDeque8;
 
 /**
  * Striped executor.
  */
 public class StripedExecutor implements ExecutorService {
-    /** */
-    private static final int SPIN_CNT = 2048;
-
     /** Stripes. */
     private final Stripe[] stripes;
 
@@ -78,11 +76,12 @@ public class StripedExecutor implements ExecutorService {
      * @param log Logger.
      * @param stealTasks {@code True} to steal tasks.
      */
-    public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log,
-        boolean stealTasks) {
+    public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log, boolean stealTasks) {
         A.ensure(cnt > 0, "cnt > 0");
 
-        stripes = stealTasks ? StealingStripe.create(igniteInstanceName, poolName, log, cnt) : new Stripe[cnt];
+        boolean success = false;
+
+        stripes = new Stripe[cnt];
 
         completedCntrs = new long[cnt];
 
@@ -90,12 +89,17 @@ public class StripedExecutor implements ExecutorService {
 
         this.log = log;
 
-        boolean success = false;
-
         try {
-            if(!stealTasks) {
-                for (int i = 0; i < cnt; i++)
-                    stripes[i] = new MpscQueueBasedStripe(igniteInstanceName, poolName, i, log);
+            for (int i = 0; i < cnt; i++) {
+                stripes[i] = stealTasks ? new StripeConcurrentQueue(
+                    igniteInstanceName,
+                    poolName,
+                    i,
+                    log, stripes) : new StripeConcurrentQueue(
+                        igniteInstanceName,
+                        poolName,
+                        i,
+                        log);
             }
 
             for (int i = 0; i < cnt; i++)
@@ -405,7 +409,7 @@ public class StripedExecutor implements ExecutorService {
     /**
      * Stripe.
      */
-    private abstract static class Stripe implements Runnable {
+    private static abstract class Stripe implements Runnable {
         /** */
         private final String igniteInstanceName;
 
@@ -436,7 +440,7 @@ public class StripedExecutor implements ExecutorService {
          * @param idx Stripe index.
          * @param log Logger.
          */
-        Stripe(
+        public Stripe(
             String igniteInstanceName,
             String poolName,
             int idx,
@@ -489,8 +493,10 @@ public class StripedExecutor implements ExecutorService {
         /** {@inheritDoc} */
         @Override public void run() {
             while (!stopping) {
+                Runnable cmd;
+
                 try {
-                    Runnable cmd = take();
+                    cmd = take();
 
                     if (cmd != null) {
                         active = true;
@@ -546,79 +552,52 @@ public class StripedExecutor implements ExecutorService {
         }
     }
 
-    /** */
-    private static final class StealingStripe extends Stripe {
+    /**
+     * Stripe.
+     */
+    private static class StripeConcurrentQueue extends Stripe {
         /** */
         private static final int IGNITE_TASKS_STEALING_THRESHOLD =
             IgniteSystemProperties.getInteger(
                 IgniteSystemProperties.IGNITE_DATA_STREAMING_EXECUTOR_SERVICE_TASKS_STEALING_THRESHOLD, 4);
 
-        /** */
-        @GridToStringExclude
-        private final Deque<Runnable>[] queues;
-
-        /** */
-        @GridToStringExclude
-        private final IgniteRunnable unpark;
+        /** Queue. */
+        private final Queue<Runnable> queue;
 
         /** */
         @GridToStringExclude
-        private Random rnd;
-
-        /** */
-        private final Deque<Runnable> queue;
+        private final Stripe[] others;
 
         /** */
-        private final AtomicBoolean parked = new AtomicBoolean();
-
-        /** */
-        @SuppressWarnings("unchecked")
-        static Stripe[] create(String igniteInstanceName, String poolName, IgniteLogger log, final int poolSize) {
-            final StealingStripe[] stripes = new StealingStripe[poolSize];
-            Deque<Runnable>[] queues = new Deque[poolSize];
-
-            IgniteRunnable unpark = new IgniteRunnable() {
-                @Override public void run() {
-                    int init = ThreadLocalRandom.current().nextInt(poolSize);
-
-                    for (int cur = init;;) {
-                        AtomicBoolean parked = stripes[cur].parked;
-
-                        if (parked.get() && parked.compareAndSet(true, false)) {
-                            LockSupport.unpark(stripes[cur].thread);
-
-                            break;
-                        }
-
-                        if ((cur = (cur + 1) % poolSize) == init)
-                            break;
-                    }
-                }
-            };
-
-            for (int i = 0; i < poolSize; i++) {
-                queues[i] = new ConcurrentLinkedDeque8<>();
-                stripes[i] = new StealingStripe(i, igniteInstanceName, poolName, log, queues, unpark);
-            }
+        private volatile boolean parked;
 
-            return stripes;
+        /**
+         * @param igniteInstanceName Ignite instance name.
+         * @param poolName Pool name.
+         * @param idx Stripe index.
+         * @param log Logger.
+         */
+        StripeConcurrentQueue(
+            String igniteInstanceName,
+            String poolName,
+            int idx,
+            IgniteLogger log
+        ) {
+            this(igniteInstanceName, poolName, idx, log, null);
         }
 
         /**
-         * @param idx Stripe index.
          * @param igniteInstanceName Ignite instance name.
          * @param poolName Pool name.
+         * @param idx Stripe index.
          * @param log Logger.
-         * @param queues Other queues to steal tasks from.
-         * @param unpark Unpark callback, unparks random parked stripe from the pool.
          */
-        private StealingStripe(
-            int idx,
+        StripeConcurrentQueue(
             String igniteInstanceName,
             String poolName,
+            int idx,
             IgniteLogger log,
-            Deque<Runnable>[] queues,
-            IgniteRunnable unpark
+            Stripe[] others
         ) {
             super(
                 igniteInstanceName,
@@ -626,62 +605,73 @@ public class StripedExecutor implements ExecutorService {
                 idx,
                 log);
 
-            this.queues = queues;
-            this.unpark = unpark;
+            this.others = others;
 
-            queue = queues[idx];
+            this.queue = others == null ? new ConcurrentLinkedQueue<Runnable>() : new ConcurrentLinkedDeque<Runnable>();
         }
 
         /** {@inheritDoc} */
         @Override Runnable take() throws InterruptedException {
-            Runnable task;
+            Runnable r;
+
+            for (int i = 0; i < 2048; i++) {
+                r = queue.poll();
 
-            for (int i = 0; i < SPIN_CNT; i++) {
-                if ((task = queue.poll()) != null)
-                    return task;
+                if (r != null)
+                    return r;
             }
 
-            for (;;) {
-                parked.set(true);
+            parked = true;
 
-                if ((task = queue.poll()) != null) {
-                    parked.set(false);
+            try {
+                for (;;) {
+                    r = queue.poll();
 
-                    return task;
-                }
+                    if (r != null)
+                        return r;
 
-                int len = queues.length, init = random().nextInt(len);
+                    if(others != null) {
+                        int len = others.length;
+                        int init = ThreadLocalRandom.current().nextInt(len);
+                        int cur = init;
 
-                for (int cur = init;;) {
-                    if(cur != idx) {
-                        Deque<Runnable> queue = queues[cur];
+                        while (true) {
+                            if(cur != idx) {
+                                Deque<Runnable> queue = (Deque<Runnable>) ((StripeConcurrentQueue) others[cur]).queue;
 
-                        if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (task = queue.pollLast()) != null) {
-                            parked.set(false);
+                                if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD && (r = queue.pollLast()) != null)
+                                    return r;
+                            }
 
-                            return task;
+                            if ((cur = (cur + 1) % len) == init)
+                                break;
                         }
                     }
 
-                    if ((cur = (cur + 1) % len) == init)
-                        break;
-                }
-
-                LockSupport.park();
+                    LockSupport.park();
 
-                if (Thread.interrupted())
-                    throw new InterruptedException();
+                    if (Thread.interrupted())
+                        throw new InterruptedException();
+                }
+            }
+            finally {
+                parked = false;
             }
         }
 
         /** {@inheritDoc} */
-        @Override void execute(Runnable cmd) {
+        void execute(Runnable cmd) {
             queue.add(cmd);
 
-            if (parked.get() && parked.compareAndSet(true, false))
+            if (parked)
                 LockSupport.unpark(thread);
-            else if(queue.size() > IGNITE_TASKS_STEALING_THRESHOLD)
-                unpark.run();
+
+            if(others != null && queueSize() > IGNITE_TASKS_STEALING_THRESHOLD) {
+                for (Stripe other : others) {
+                    if(((StripeConcurrentQueue)other).parked)
+                        LockSupport.unpark(other.thread);
+                }
+            }
         }
 
         /** {@inheritDoc} */
@@ -694,24 +684,18 @@ public class StripedExecutor implements ExecutorService {
             return queue.size();
         }
 
-        /** */
-        private Random random() {
-            return rnd == null ? rnd = ThreadLocalRandom.current() : rnd;
-        }
-
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(StealingStripe.class, this, super.toString());
+            return S.toString(StripeConcurrentQueue.class, this, super.toString());
         }
     }
 
-    /** */
-    private static final class MpscQueueBasedStripe extends Stripe {
-        /** */
-        private final AtomicBoolean parked = new AtomicBoolean();
-
-        /** */
-        private final MpscQueue<Runnable> queue = new MpscQueue<>();
+    /**
+     * Stripe.
+     */
+    private static class StripeConcurrentQueueNoPark extends Stripe {
+        /** Queue. */
+        private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
 
         /**
          * @param igniteInstanceName Ignite instance name.
@@ -719,41 +703,82 @@ public class StripedExecutor implements ExecutorService {
          * @param idx Stripe index.
          * @param log Logger.
          */
-        private MpscQueueBasedStripe(String igniteInstanceName, String poolName, int idx, IgniteLogger log) {
-            super(igniteInstanceName, poolName, idx, log);
+        public StripeConcurrentQueueNoPark(
+            String igniteInstanceName,
+            String poolName,
+            int idx,
+            IgniteLogger log
+        ) {
+            super(igniteInstanceName,
+                poolName,
+                idx,
+                log);
         }
 
         /** {@inheritDoc} */
-        @Override void execute(Runnable cmd) {
-            queue.offer(cmd);
+        @Override Runnable take() {
+            for (;;) {
+                Runnable r = queue.poll();
 
-            if (parked.get() && parked.compareAndSet(true, false))
-                LockSupport.unpark(thread);
+                if (r != null)
+                    return r;
+            }
         }
 
         /** {@inheritDoc} */
-        @Override Runnable take() throws InterruptedException {
-            Runnable task;
+        void execute(Runnable cmd) {
+            queue.add(cmd);
+        }
 
-            for (int i = 0; i < SPIN_CNT; i++) {
-                if ((task = queue.poll()) != null)
-                    return task;
-            }
+        /** {@inheritDoc} */
+        @Override int queueSize() {
+            return queue.size();
+        }
 
-            for (;;) {
-                parked.set(true);
+        /** {@inheritDoc} */
+        @Override String queueToString() {
+            return String.valueOf(queue);
+        }
 
-                if ((task = queue.poll()) != null) {
-                    parked.set(false);
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StripeConcurrentQueueNoPark.class, this, super.toString());
+        }
+    }
 
-                    return task;
-                }
+    /**
+     * Stripe.
+     */
+    private static class StripeConcurrentBlockingQueue extends Stripe {
+        /** Queue. */
+        private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
 
-                LockSupport.park();
+        /**
+         * @param igniteInstanceName Ignite instance name.
+         * @param poolName Pool name.
+         * @param idx Stripe index.
+         * @param log Logger.
+         */
+        public StripeConcurrentBlockingQueue(
+            String igniteInstanceName,
+            String poolName,
+            int idx,
+            IgniteLogger log
+        ) {
+            super(igniteInstanceName,
+                poolName,
+                idx,
+                log);
+        }
 
-                if (Thread.interrupted())
-                    throw new InterruptedException();
-            }
+        /** {@inheritDoc} */
+        @Override Runnable take() throws InterruptedException {
+            return queue.take();
+        }
+
+        /** {@inheritDoc} */
+        void execute(Runnable cmd) {
+            queue.add(cmd);
         }
 
         /** {@inheritDoc} */
@@ -765,5 +790,10 @@ public class StripedExecutor implements ExecutorService {
         @Override String queueToString() {
             return String.valueOf(queue);
         }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(StripeConcurrentBlockingQueue.class, this, super.toString());
+        }
     }
 }

Reply via email to