IO opts

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/67b94013
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/67b94013
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/67b94013

Branch: refs/heads/ignite-comm-balance
Commit: 67b9401300a0c12d274486bdd09822ccfbe9d2f5
Parents: 0516fce
Author: Yakov Zhdanov <[email protected]>
Authored: Fri Oct 21 12:28:09 2016 +0300
Committer: Yakov Zhdanov <[email protected]>
Committed: Fri Oct 21 12:28:09 2016 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  42 ++-
 .../managers/communication/GridIoMessage.java   |  21 ++
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |   9 +
 .../dht/atomic/GridNearAtomicUpdateRequest.java |   9 +
 .../util/SingleConsumerSpinCircularBuffer.java  | 370 +++++++++++++++++++
 .../ignite/internal/util/StripedExecutor.java   | 194 ++++++++++
 .../util/nio/GridNioRecoveryDescriptor.java     |  19 +-
 .../ignite/internal/util/nio/GridNioServer.java | 109 +++++-
 .../communication/tcp/TcpCommunicationSpi.java  |   3 +-
 9 files changed, 754 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index b465919..dffa875 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -40,7 +40,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -61,6 +60,7 @@ import 
org.apache.ignite.internal.processors.platform.message.PlatformMessageFil
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple3;
@@ -270,6 +270,28 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
 
         startSpi();
 
+//        striped = new 
StripedExecutor(Runtime.getRuntime().availableProcessors());
+//        responseExec = new StripedExecutor(1);
+
+//        Thread t = new Thread(new Runnable() {
+//            @Override public void run() {
+//                for (;;) {
+//                    try {
+//                        Thread.sleep(5000);
+//                    }
+//                    catch (InterruptedException e) {
+//                        e.printStackTrace();
+//                    }
+//
+//                    striped.dumpStats(log);
+//                }
+//            }
+//        });
+//
+//        t.setDaemon(true);
+//
+//        t.start();
+
         pubPool = ctx.getExecutorService();
         p2pPool = ctx.getPeerClassLoadingExecutorService();
         sysPool = ctx.getSystemExecutorService();
@@ -701,6 +723,12 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         if (log.isDebugEnabled())
             log.debug(stopInfo());
 
+        if (striped != null)
+            striped.stop();
+
+        if (responseExec != null)
+            responseExec.stop();
+
         Arrays.fill(ioPools, null);
     }
 
@@ -919,6 +947,9 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
         }
     }
 
+    StripedExecutor striped;
+    StripedExecutor responseExec;
+
     /**
      * @param nodeId Node ID.
      * @param msg Message.
@@ -957,6 +988,15 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Serializa
             }
         }
 
+//        if (msg.partition() != -1) {
+//            striped.execute(msg.partition(), c);
+//        }
+//        else if (msg.response()) {
+//            responseExec.execute(0, c);
+//        }
+//        else
+
+
         try {
             pool(plc).execute(c);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index b28ced2..038cd5f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -20,6 +20,10 @@ package org.apache.ignite.internal.managers.communication;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.GridDirectTransient;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -321,6 +325,23 @@ public class GridIoMessage implements Message {
         return 7;
     }
 
+    /**
+     * Gets the single partition the wrapped message is bound to, if any.
+     *
+     * @return Partition reported by the wrapped {@link GridNearAtomicUpdateRequest},
+     *      or {@code -1} for any other message type.
+     */
+    public int partition() {
+        if (!(msg instanceof GridNearAtomicUpdateRequest))
+            return -1;
+
+        return ((GridNearAtomicUpdateRequest)msg).partition();
+    }
+
+    /**
+     * @return {@code True} if the wrapped message is a near or DHT atomic update response.
+     */
+    public boolean response() {
+        if (msg instanceof GridNearAtomicUpdateResponse)
+            return true;
+
+        return msg instanceof GridDhtAtomicUpdateResponse;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridIoMessage.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 55f7560..f7ae0fd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -602,6 +602,15 @@ public class GridDhtAtomicUpdateRequest extends 
GridCacheMessage implements Grid
     }
 
     /**
+     * Get single partition for that message.
+     *
+     * @return Partition.
+     */
+    public int partition() {
+        // No partition information available.
+        if (partIds == null || partIds.isEmpty())
+            return -1;
+
+        return partIds.get(0);
+    }
+
+    /**
      * @param idx Index.
      * @return Conflict expire time.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index eb9be4d..ec54e9a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -679,6 +679,15 @@ public class GridNearAtomicUpdateRequest extends 
GridCacheMessage implements Gri
         return ctx.atomicMessageLogger();
     }
 
+    /**
+     * Gets the single partition for this message.
+     *
+     * @return First element of {@code partIds}, or {@code -1} if it is {@code null} or empty.
+     */
+    public int partition() {
+        if (partIds == null || partIds.isEmpty())
+            return -1;
+
+        return partIds.get(0);
+    }
+
     /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/util/SingleConsumerSpinCircularBuffer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/SingleConsumerSpinCircularBuffer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/SingleConsumerSpinCircularBuffer.java
new file mode 100644
index 0000000..81f913f
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/SingleConsumerSpinCircularBuffer.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
+import org.jsr166.LongAdder8;
+import sun.java2d.pipe.SpanIterator;
+
+/**
+ * This class implements a circular buffer for efficient data exchange.
+ */
+public class SingleConsumerSpinCircularBuffer<T> {
+    private static final int PARK_FREQ = 3;
+    private static final int SPINS_CNT = 32;
+
+    private static final 
AtomicLongFieldUpdater<SingleConsumerSpinCircularBuffer> writePosUpd =
+        
AtomicLongFieldUpdater.newUpdater(SingleConsumerSpinCircularBuffer.class, 
"writePos");
+
+    private volatile long readPos;
+
+    private long p01, p02, p03, p04, p05, p06, p07;
+
+    private volatile long writePos;
+
+    private long p11, p12, p13, p14, p15, p16, p17;
+
+    /** */
+    private final long sizeMask;
+
+    /** */
+    private final Item<T>[] arr;
+
+    private volatile Thread consumer;
+
+    private volatile boolean parked;
+
+    /**
+     * Creates a buffer with the given number of slots.
+     *
+     * @param size Number of slots. Must be a positive power of two because slot indexes are
+     *      computed as {@code position & (size - 1)}.
+     * @throws IllegalArgumentException If {@code size} is not a positive power of two.
+     */
+    @SuppressWarnings("unchecked")
+    public SingleConsumerSpinCircularBuffer(
+        int size
+    ) {
+        if (size <= 0 || (size & (size - 1)) != 0)
+            throw new IllegalArgumentException("Size must be a positive power of two: " + size);
+
+        sizeMask = size - 1;
+
+        arr = (Item<T>[])new Item[size];
+
+        // Slot i is first written at position 2 * size + i. Item.update() waits until the slot index
+        // equals -(position - size), hence the initial index -(size + i).
+        for (int i = 0; i < arr.length; i++)
+            arr[i] = new Item<>(-(arr.length + i));
+
+        readPos = writePos = arr.length * 2;
+    }
+
+    /**
+     * Retrieves and removes the next element if a producer has already claimed a slot for it.
+     * <p>
+     * Reads are not coordinated between threads (the read position is advanced with a plain
+     * volatile write), so this is intended for the single consumer thread only.
+     *
+     * @return Next element or {@code null} if read and write positions are equal.
+     */
+    public T poll() {
+        registerConsumer();
+
+        return readPos == writePos ? null : poll0();
+    }
+
+    /**
+     * Retrieves and removes the next element, parking the calling thread while the buffer is empty.
+     * <p>
+     * Reads are not coordinated between threads, so this is intended for the single consumer
+     * thread only.
+     *
+     * @return Next element.
+     * @throws InterruptedException If the calling thread is interrupted while the buffer is empty.
+     */
+    public T take() throws InterruptedException {
+        registerConsumer();
+
+        long readPos0 = readPos;
+
+        if (readPos0 == writePos) {
+            // Raise the flag before re-checking the write position: either put() observes the flag
+            // and unparks this thread, or this thread observes the advanced write position.
+            parked = true;
+
+            try {
+                while (readPos0 == writePos) {
+                    // park() returns immediately for an interrupted thread, so without this check
+                    // an interrupted consumer would spin here forever instead of stopping.
+                    if (Thread.interrupted())
+                        throw new InterruptedException();
+
+                    LockSupport.park();
+                }
+            }
+            finally {
+                parked = false;
+            }
+        }
+
+        return poll0();
+    }
+
+    /**
+     * Remembers the calling thread as the consumer to be unparked by producers.
+     */
+    private void registerConsumer() {
+        if (consumer == null)
+            consumer = Thread.currentThread();
+    }
+
+    /**
+     * Consumes the slot at the current read position. Callers must have checked that the write
+     * position is ahead of the read position, i.e. a producer has claimed this slot. The element
+     * itself may not be published yet, in which case this method spins and then parks until it is.
+     *
+     * @return Element stored in the slot.
+     */
+    private T poll0() {
+        long readPos0 = readPos;
+
+        Item<T> item = arr[(int)(readPos0 & sizeMask)];
+
+        readPos = readPos0 + 1;
+
+        T item0 = item.item(readPos0, SPINS_CNT);
+
+        if (item0 == null) {
+            boolean interrupted = false;
+
+            parked = true;
+
+            try {
+                for (item0 = item.item(readPos0, 1); item0 == null; item0 = item.item(readPos0, 1)) {
+                    LockSupport.park();
+
+                    // The slot is already claimed, so the element cannot be abandoned here. Clear the
+                    // interrupt flag to keep park() effective and restore it once the element arrives.
+                    if (Thread.interrupted())
+                        interrupted = true;
+                }
+            }
+            finally {
+                parked = false;
+
+                if (interrupted)
+                    Thread.currentThread().interrupt();
+            }
+
+            assert item0 != null;
+        }
+
+        return item0;
+    }
+
+    /**
+     * @return Difference between the current write and read positions, truncated to {@code int}.
+     *      Slots claimed by producers but not yet published are included.
+     */
+    public int size() {
+        long diff = writePos - readPos;
+
+        return (int)diff;
+    }
+
+    /**
+     * Adds an element to the buffer. Claims the next write position, waits inside
+     * {@code Item.update()} until the target slot has been released by the consumer,
+     * publishes the element and unparks the consumer if it is flagged as parked.
+     *
+     * @param t Element to add.
+     * @return Claimed write position plus one, minus the read position observed on return.
+     */
+    public int put(T t) {
+        // Atomically claim the next write position.
+        long claimed = writePosUpd.getAndIncrement(this);
+
+        arr[(int)(claimed & sizeMask)].update(claimed, arr.length, t);
+
+        if (parked)
+            LockSupport.unpark(consumer);
+
+        return (int)(claimed + 1 - readPos);
+    }
+
+    /**
+     * Throughput driver that exercises a {@link ConcurrentLinkedDeque} instead of this buffer:
+     * four producer threads add numbers, one consumer thread polls them and a reporter thread
+     * prints the number of consumed elements once per second. Never terminates.
+     *
+     * @param args Command line arguments (not used).
+     */
+    public static void main_(String[] args) {
+        final ConcurrentLinkedDeque<Long> b = new ConcurrentLinkedDeque<>();
+
+        final LongAdder8 cnt = new LongAdder8();
+
+        // Reporter: prints and resets the consumed-elements counter every second.
+        new Thread(
+            new Runnable() {
+                @Override public void run() {
+                    for (;;) {
+                        try {
+                            Thread.sleep(1000);
+                        }
+                        catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+
+                        System.out.println("TPS: " + cnt.sumThenReset());
+                    }
+                }
+            }
+        ).start();
+
+        // Limits the number of elements that are added but not yet consumed to 8192.
+        final Semaphore sem = new Semaphore(8192);
+
+        // Consumer: busy-polls the deque and returns a permit for every consumed element.
+        new Thread(
+            new Runnable() {
+                @Override public void run() {
+                    for (;;) {
+                        Long poll = b.poll();
+
+                        if (poll != null) {
+                            cnt.increment();
+
+                            sem.release();
+                        }
+                    }
+                }
+            }
+        ).start();
+
+        // Producers: each acquires a permit before adding the next number.
+        for (int i = 0; i < 4; i++) {
+            new Thread(
+                new Runnable() {
+                    @Override public void run() {
+                        for (long i = 0; ; i++) {
+                            sem.acquireUninterruptibly();
+                            b.add(i);
+                        }
+                    }
+                }
+            ).start();
+        }
+    }
+    /**
+     * Throughput driver for this buffer: four producer threads put numbers into a 1024-slot
+     * buffer, one consumer thread takes them and a reporter thread prints the number of
+     * consumed elements once per second. Never terminates.
+     *
+     * @param args Command line arguments (not used).
+     * @throws InterruptedException Declared, but nothing in the body throws it.
+     */
+    public static void main(String[] args) throws InterruptedException {
+        final SingleConsumerSpinCircularBuffer<Long> b = new 
SingleConsumerSpinCircularBuffer<>(
+            1024);
+
+//        b.put(1L);
+//        b.put(2L);
+//        b.put(3L);
+//
+//        System.out.println(b.take());
+//        System.out.println(b.poll());
+//        System.out.println(b.poll());
+
+        final LongAdder8 cnt = new LongAdder8();
+
+        // Reporter: prints and resets the consumed-elements counter every second.
+        new Thread(
+            new Runnable() {
+                @Override public void run() {
+                    for (;;) {
+                        try {
+                            Thread.sleep(1000);
+                        }
+                        catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+
+                        System.out.println("TPS: " + cnt.sumThenReset());
+                    }
+                }
+            }
+        ).start();
+
+        // Only referenced by the commented-out await() calls below.
+        final CyclicBarrier bar = new CyclicBarrier(2);
+
+        // Consumer: takes elements in a loop and counts every non-null result.
+        new Thread(
+            new Runnable() {
+                @Override public void run() {
+                    for (;;) {
+                        Long poll = null;
+
+                        try {
+                            poll = b.take();
+
+                            //bar.await();
+                        }
+                        catch (Exception e) {
+                            e.printStackTrace();
+                        }
+
+                        if (poll != null)
+                            cnt.increment();
+
+                    }
+                }
+            }
+        ).start();
+
+        // Producers: put consecutive numbers without any throttling.
+        for (int i = 0; i < 4; i++) {
+            new Thread(
+                new Runnable() {
+                    @Override public void run() {
+                        for (long i = 0; ; i++) {
+                            b.put(i);
+
+//                            LockSupport.parkNanos(1L);
+
+//                            try {
+//                                bar.await();
+//                            }
+//                            catch (InterruptedException | 
BrokenBarrierException e) {
+//                                e.printStackTrace();
+//                            }
+                        }
+                    }
+                }
+            ).start();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        // Only the class is reported; buffer state is not included.
+        Class<?> cls = SingleConsumerSpinCircularBuffer.class;
+
+        return cls.toString();
+    }
+
+    /**
+     * Buffer slot. The slot state is encoded in {@code idx}: it equals a position while the element
+     * written at that position is available to the consumer, and the negated position once that
+     * element has been consumed, at which point the slot may be reused one lap later.
+     */
+    private static class Item<V> {
+        /** Element; written before {@code idx} is published, cleared before the slot is released. */
+        private V item;
+
+        /** Slot state, see class description. */
+        private volatile long idx;
+
+        /** Unused. NOTE(review): presumably padding against false sharing - confirm. */
+        private long p01, p02, p03, p04, p05, p06;
+
+        /** Unused. NOTE(review): presumably padding as well - confirm. */
+        private int i01;
+
+        /**
+         * @param idx Initial slot state.
+         */
+        Item(long idx) {
+            this.idx = idx;
+        }
+
+        /**
+         * Tries to consume the element written at the given position.
+         *
+         * @param readPos Position the consumer is reading.
+         * @param spins Number of attempts to make before giving up.
+         * @return Element, or {@code null} if it was not published within the given attempts.
+         */
+        V item(long readPos, int spins) {
+            for (int i = 0; i < spins; i++) {
+                if (idx == readPos) {
+                    V item1 = this.item;
+
+                    // Drop the reference so a consumed element is not kept reachable until the slot
+                    // is overwritten. The volatile write below releases the slot only after this.
+                    this.item = null;
+
+                    idx = -readPos;
+
+                    return item1;
+                }
+            }
+
+            return null;
+        }
+
+        /**
+         * Waits until the element of the previous lap has been consumed, then publishes a new one.
+         *
+         * @param writePos Position claimed by the producer.
+         * @param diff Buffer size, i.e. the distance to the position of the previous lap.
+         * @param newItem Element to publish.
+         */
+        void update(long writePos, long diff, V newItem) {
+            int i = 0;
+
+            for (;;) {
+                if (idx == -(writePos - diff))
+                    break;
+
+                i++;
+
+                // Back off briefly on every fourth unsuccessful check.
+                if ((i & 3) == 0)
+                    LockSupport.parkNanos(1L);
+            }
+
+            if (i > 100)
+                System.out.println("Spins [i=" + i + ", writePos=" + writePos + ']');
+
+            item = newItem;
+
+            // Volatile write publishes the element stored above to the consumer.
+            idx = writePos;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized String toString() {
+            return "Item [idx=" + idx + ']';
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
new file mode 100644
index 0000000..fe62725
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Striped executor: a fixed set of stripes, each with its own queue and its own daemon thread.
+ * Commands submitted with indexes that map to the same stripe are run by the same thread.
+ */
+public class StripedExecutor {
+    /** Count. */
+    private final int cnt;
+
+    /** Stripes. */
+    private final Stripe[] stripes;
+
+    /** Set once all stripes are created and started. */
+    private volatile boolean inited;
+
+    /**
+     * Constructor. Creates and starts all stripes.
+     *
+     * @param cnt Count.
+     * @throws IllegalArgumentException If {@code cnt} is not positive.
+     */
+    public StripedExecutor(int cnt) {
+        if (cnt <= 0)
+            throw new IllegalArgumentException("Stripes count must be positive: " + cnt);
+
+        this.cnt = cnt;
+
+        stripes = new Stripe[cnt];
+
+        for (int i = 0; i < cnt; i++) {
+            Stripe stripe = new Stripe();
+
+            stripes[i] = stripe;
+
+            stripe.start(i);
+        }
+
+        inited = true;
+    }
+
+    /**
+     * Execute command on the stripe selected by the given index.
+     *
+     * @param idx Index; any value is accepted and mapped onto the available stripes.
+     * @param cmd Command.
+     * @throws NullPointerException If {@code cmd} is {@code null}.
+     */
+    public void execute(int idx, Runnable cmd) {
+        // The stripe queue cannot tell a null element from a slot that is not published yet,
+        // so a null command would block the stripe thread forever.
+        if (cmd == null)
+            throw new NullPointerException("cmd");
+
+        // Remainder lies in (-cnt, cnt), so Math.abs() cannot overflow here.
+        stripes[Math.abs(idx % cnt)].execute(cmd);
+    }
+
+    /**
+     * Stop executor: signals all stripes to stop and then waits for their threads to finish.
+     */
+    public void stop() {
+        // Wait for the constructor to finish starting the stripes.
+        for (; !inited; )
+            ;
+
+        for (Stripe stripe : stripes)
+            stripe.signalStop();
+
+        for (Stripe stripe : stripes)
+            stripe.awaitStop();
+    }
+
+    /**
+     * Logs per-stripe loop counters at info level and resets them. The reset is not atomic with
+     * the increments done by stripe threads, so the numbers are approximate.
+     *
+     * @param log Logger.
+     */
+    public void dumpStats(IgniteLogger log) {
+        StringBuilder sb = new StringBuilder("Stats ");
+
+        for (int i = 0; i < stripes.length; i++) {
+            sb.append(i).append("=").append(stripes[i].cnt).append("; ");
+
+            stripes[i].cnt = 0;
+        }
+
+        if (log.isInfoEnabled())
+            log.info(sb.toString());
+    }
+
+    /**
+     * Stripe: a queue of commands drained by a dedicated thread.
+     */
+    private static class Stripe implements Runnable {
+        /** Queue. */
+        private final SingleConsumerSpinCircularBuffer<Runnable> queue = new SingleConsumerSpinCircularBuffer<>(256);
+
+        /** Stopping flag. */
+        private volatile boolean stopping;
+
+        /** Thread executing the loop. */
+        private Thread thread;
+
+        /** Number of loop iterations since the last statistics dump. */
+        private volatile long cnt;
+
+        /**
+         * Start the stripe.
+         *
+         * @param idx Stripe index, used in the thread name.
+         */
+        void start(int idx) {
+            thread = new Thread(this);
+
+            thread.setName("stripe-" + idx);
+            thread.setDaemon(true);
+
+            thread.start();
+        }
+
+        /**
+         * Stop the stripe: raises the stopping flag and interrupts the stripe thread.
+         */
+        void signalStop() {
+            stopping = true;
+
+            thread.interrupt();
+        }
+
+        /**
+         * Await thread stop.
+         *
+         * @throws IgniteInterruptedException If the calling thread is interrupted while waiting;
+         *      the interrupt flag is restored before throwing.
+         */
+        void awaitStop() {
+            try {
+                thread.join();
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                throw new IgniteInterruptedException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            while (!stopping) {
+                Runnable cmd;
+
+                try {
+                    cmd = queue.take();
+                }
+                catch (InterruptedException e) {
+                    // Interruption is treated as a stop request.
+                    stopping = true;
+
+                    Thread.currentThread().interrupt();
+
+                    return;
+                }
+
+                if (cmd != null)
+                    execute0(cmd);
+
+                cnt++;
+            }
+        }
+
+        /**
+         * Internal execution routine. Exceptions thrown by the command are logged and do not
+         * terminate the stripe loop.
+         *
+         * @param cmd Command.
+         */
+        private void execute0(Runnable cmd) {
+            try {
+                cmd.run();
+            }
+            catch (Exception e) {
+                U.warn(null, "Unexpected exception in stripe loop.", e);
+            }
+        }
+
+        /**
+         * Execute the command: enqueues it for the stripe thread.
+         *
+         * @param cmd Command.
+         */
+        void execute(Runnable cmd) {
+            queue.put(cmd);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 1ecf5b0..867237d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -78,12 +78,21 @@ public class GridNioRecoveryDescriptor {
     /** Number of descriptor reservations (for info purposes). */
     private int reserveCnt;
 
+    /** Ack send threshold (from configuration). */
+    private final int ackSndThreshold;
+
     /**
      * @param queueLimit Maximum size of unacknowledged messages queue.
      * @param node Node.
      * @param log Logger.
+     * @param ackSndThreshold Ack send threshold.
      */
-    public GridNioRecoveryDescriptor(int queueLimit, ClusterNode node, 
IgniteLogger log) {
+    public GridNioRecoveryDescriptor(
+        int queueLimit,
+        ClusterNode node,
+        IgniteLogger log,
+        int ackSndThreshold
+    ) {
         assert !node.isLocal() : node;
         assert queueLimit > 0;
 
@@ -92,6 +101,14 @@ public class GridNioRecoveryDescriptor {
         this.queueLimit = queueLimit;
         this.node = node;
         this.log = log;
+        this.ackSndThreshold = ackSndThreshold;
+    }
+
+    /**
+     * @return Ack send threshold this descriptor was constructed with.
+     */
+    public int ackSendThreshold() {
+        return ackSndThreshold;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 852118a..acdba5e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -45,6 +45,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.LockSupport;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -87,6 +88,9 @@ import static 
org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.NIO_OPER
  *
  */
 public class GridNioServer<T> {
+    /** */
+    private static final boolean DISABLE_PARK = 
IgniteSystemProperties.getBoolean("DISABLE_PARK", false);
+
     /** Default session write timeout. */
     public static final int DFLT_SES_WRITE_TIMEOUT = 5000;
 
@@ -117,6 +121,8 @@ public class GridNioServer<T> {
      *
      */
     static {
+        System.out.println(">>> Disable park: " + DISABLE_PARK);
+
         // This is a workaround for JDK bug (NPE in Selector.open()).
         // http://bugs.sun.com/view_bug.do?bug_id=6427854
         try {
@@ -408,7 +414,7 @@ public class GridNioServer<T> {
 
         NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, 
NioOperation.CLOSE);
 
-        clientWorkers.get(impl.selectorIndex()).offer(fut);
+        clientWorkers.get(impl.selectorIndex()).offer(fut, 
impl.selectorIndex());
 
         return fut;
     }
@@ -496,7 +502,7 @@ public class GridNioServer<T> {
             }
         }
         else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, 
true))
-            
clientWorkers.get(ses.selectorIndex()).offer((SessionChangeRequest)req);
+            
clientWorkers.get(ses.selectorIndex()).offer((SessionChangeRequest)req, 
ses.selectorIndex());
 
         if (msgQueueLsnr != null)
             msgQueueLsnr.apply(ses, msgCnt);
@@ -572,7 +578,7 @@ public class GridNioServer<T> {
             ses0.resend(futs);
 
             // Wake up worker.
-            
clientWorkers.get(ses0.selectorIndex()).offer(((SessionChangeRequest)fut0));
+            
clientWorkers.get(ses0.selectorIndex()).offer(((SessionChangeRequest)fut0), 
ses0.selectorIndex());
         }
     }
 
@@ -600,7 +606,7 @@ public class GridNioServer<T> {
 
         NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, op);
 
-        clientWorkers.get(impl.selectorIndex()).offer(fut);
+        clientWorkers.get(impl.selectorIndex()).offer(fut, 
impl.selectorIndex());
 
         return fut;
     }
@@ -610,7 +616,7 @@ public class GridNioServer<T> {
      */
     public void dumpStats() {
         for (int i = 0; i < clientWorkers.size(); i++)
-            clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, 
NioOperation.DUMP_STATS));
+            clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, 
NioOperation.DUMP_STATS), i);
     }
 
     /**
@@ -757,7 +763,7 @@ public class GridNioServer<T> {
         else
             balanceIdx = 0;
 
-        clientWorkers.get(balanceIdx).offer(req);
+        clientWorkers.get(balanceIdx).offer(req, balanceIdx);
     }
 
     /** {@inheritDoc} */
@@ -879,9 +885,7 @@ public class GridNioServer<T> {
 
                     if (req == null) {
                         if (ses.procWrite.get()) {
-                            boolean set = ses.procWrite.compareAndSet(true, 
false);
-
-                            assert set;
+                            ses.procWrite.set(false);
 
                             if (ses.writeQueue().isEmpty())
                                 key.interestOps(key.interestOps() & 
(~SelectionKey.OP_WRITE));
@@ -967,6 +971,16 @@ public class GridNioServer<T> {
                 return;
             }
 
+            int pendingAcks0 = -1;
+
+            GridNioRecoveryDescriptor desc = null;
+
+            if (!DISABLE_PARK && writer) {
+                desc = 
((GridSelectorNioSessionImpl)key.attachment()).outRecoveryDescriptor();
+
+                pendingAcks0 = desc.messagesRequests().size() % 
desc.ackSendThreshold();
+            }
+
             ReadableByteChannel sockCh = (ReadableByteChannel)key.channel();
 
             final GridSelectorNioSessionImpl ses = 
(GridSelectorNioSessionImpl)key.attachment();
@@ -1017,6 +1031,12 @@ public class GridNioServer<T> {
             catch (IgniteCheckedException e) {
                 close(ses, e);
             }
+
+            if (!DISABLE_PARK && pendingAcks0 != -1) {
+                assert desc != null;
+
+                pendingAcks -= pendingAcks0 - (desc.messagesRequests().size() 
% desc.ackSendThreshold());
+            }
         }
 
         /**
@@ -1026,10 +1046,26 @@ public class GridNioServer<T> {
          * @throws IOException If write failed.
          */
         @Override protected void processWrite(SelectionKey key) throws 
IOException {
+            int pendingAcks0 = -1;
+
+            GridNioRecoveryDescriptor desc = null;
+
+            if (!DISABLE_PARK && writer) {
+                desc = 
((GridSelectorNioSessionImpl)key.attachment()).outRecoveryDescriptor();
+
+                pendingAcks0 = desc.messagesRequests().size() % 
desc.ackSendThreshold();
+            }
+
             if (sslFilter != null)
                 processWriteSsl(key);
             else
                 processWrite0(key);
+
+            if (!DISABLE_PARK && pendingAcks0 != -1) {
+                assert desc != null;
+
+                pendingAcks += (desc.messagesRequests().size() % 
desc.ackSendThreshold()) - pendingAcks0;
+            }
         }
 
         /**
@@ -1096,9 +1132,7 @@ public class GridNioServer<T> {
 
                             if (req == null && buf.position() == 0) {
                                 if (ses.procWrite.get()) {
-                                    boolean set = 
ses.procWrite.compareAndSet(true, false);
-
-                                    assert set;
+                                    ses.procWrite.set(false);
 
                                     if (ses.writeQueue().isEmpty())
                                         key.interestOps(key.interestOps() & 
(~SelectionKey.OP_WRITE));
@@ -1294,12 +1328,17 @@ public class GridNioServer<T> {
 
                     if (req == null && buf.position() == 0) {
                         if (ses.procWrite.get()) {
-                            boolean set = ses.procWrite.compareAndSet(true, 
false);
+                            ses.procWrite.set(false);
 
-                            assert set;
+                            if (ses.writeQueue().isEmpty()) {
+                                if ((key.interestOps() & 
SelectionKey.OP_WRITE) != 0) {
+                                    key.interestOps(key.interestOps() & 
(~SelectionKey.OP_WRITE));
 
-                            if (ses.writeQueue().isEmpty())
-                                key.interestOps(key.interestOps() & 
(~SelectionKey.OP_WRITE));
+                                    writeSesCnt--;
+
+                                    assert writeSesCnt >= 0 : writeSesCnt;
+                                }
+                            }
                             else
                                 ses.procWrite.set(true);
                         }
@@ -1402,9 +1441,21 @@ public class GridNioServer<T> {
         /** Worker index. */
         private final int idx;
 
+        /** Writer worker flag. */
+        final boolean writer;
+
         /** {@code True} if calls 'selector.select'. */
         private volatile boolean select;
 
+        /** {@code True} if calls 'LockSupport.park'. */
+        private volatile boolean park;
+
+        /** Number of sessions which require writes. */
+        protected int writeSesCnt;
+
+        /** */
+        protected int pendingAcks;
+
         /**
          * @param idx Index of this worker in server's array.
          * @param gridName Grid name.
@@ -1419,6 +1470,8 @@ public class GridNioServer<T> {
             createSelector();
 
             this.idx = idx;
+
+            writer = idx % 2 == 1;
         }
 
         /** {@inheritDoc} */
@@ -1501,12 +1554,15 @@ public class GridNioServer<T> {
          * Adds socket channel to the registration queue and wakes up reading 
thread.
          *
          * @param req Change request.
+         * @param workerIdx Worker thread index.
          */
-        private void offer(SessionChangeRequest req) {
+        private void offer(SessionChangeRequest req, int workerIdx) {
             changeReqs.offer(req);
 
             if (select)
                 selector.wakeup();
+            else if (park)
+                LockSupport.unpark(clientThreads[workerIdx]);
         }
 
         /**
@@ -1603,6 +1659,18 @@ public class GridNioServer<T> {
                         }
                     }
 
+                    if (!DISABLE_PARK && writer && writeSesCnt == 0 && 
pendingAcks == 0) {
+                        park = true;
+
+                        try {
+                            if (changeReqs.isEmpty())
+                                LockSupport.parkNanos(1_000_000_000);
+                        }
+                        finally {
+                            park = false;
+                        }
+                    }
+
                     int res = 0;
 
                     for (int i = 0; i < SELECTOR_SPINS && res == 0; i++)
@@ -1616,7 +1684,7 @@ public class GridNioServer<T> {
                             processSelectedKeysOptimized(selectedKeys.flip());
                     }
 
-                    if (!changeReqs.isEmpty())
+                    if (!changeReqs.isEmpty() || (!DISABLE_PARK && writer))
                         continue;
 
                     select = true;
@@ -1682,9 +1750,12 @@ public class GridNioServer<T> {
             SelectionKey key = ses.key();
 
             if (key.isValid()) {
-                if ((key.interestOps() & SelectionKey.OP_WRITE) == 0)
+                if ((key.interestOps() & SelectionKey.OP_WRITE) == 0) {
                     key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
 
+                    writeSesCnt++;
+                }
+
                 // Update timestamp to protect against false write timeout.
                 ses.bytesSent(0);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index f448cbd..1f66d20 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -3261,7 +3261,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : 
(maxSize * 5);
 
             GridNioRecoveryDescriptor old =
-                recoveryDescs.putIfAbsent(key, recovery = new 
GridNioRecoveryDescriptor(queueLimit, node, log));
+                recoveryDescs.putIfAbsent(key, recovery = new 
GridNioRecoveryDescriptor(queueLimit, node, log,
+                    ackSndThreshold));
 
             if (old != null)
                 recovery = old;

Reply via email to