IO opts
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/67b94013 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/67b94013 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/67b94013 Branch: refs/heads/ignite-comm-balance Commit: 67b9401300a0c12d274486bdd09822ccfbe9d2f5 Parents: 0516fce Author: Yakov Zhdanov <[email protected]> Authored: Fri Oct 21 12:28:09 2016 +0300 Committer: Yakov Zhdanov <[email protected]> Committed: Fri Oct 21 12:28:09 2016 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 42 ++- .../managers/communication/GridIoMessage.java | 21 ++ .../dht/atomic/GridDhtAtomicUpdateRequest.java | 9 + .../dht/atomic/GridNearAtomicUpdateRequest.java | 9 + .../util/SingleConsumerSpinCircularBuffer.java | 370 +++++++++++++++++++ .../ignite/internal/util/StripedExecutor.java | 194 ++++++++++ .../util/nio/GridNioRecoveryDescriptor.java | 19 +- .../ignite/internal/util/nio/GridNioServer.java | 109 +++++- .../communication/tcp/TcpCommunicationSpi.java | 3 +- 9 files changed, 754 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index b465919..dffa875 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -40,7 +40,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; @@ -61,6 +60,7 @@ import org.apache.ignite.internal.processors.platform.message.PlatformMessageFil import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; import org.apache.ignite.internal.util.GridSpinReadWriteLock; +import org.apache.ignite.internal.util.StripedExecutor; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridTuple3; @@ -270,6 +270,28 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa startSpi(); +// striped = new StripedExecutor(Runtime.getRuntime().availableProcessors()); +// responseExec = new StripedExecutor(1); + +// Thread t = new Thread(new Runnable() { +// @Override public void run() { +// for (;;) { +// try { +// Thread.sleep(5000); +// } +// catch (InterruptedException e) { +// e.printStackTrace(); +// } +// +// striped.dumpStats(log); +// } +// } +// }); +// +// t.setDaemon(true); +// +// t.start(); + pubPool = ctx.getExecutorService(); p2pPool = ctx.getPeerClassLoadingExecutorService(); sysPool = ctx.getSystemExecutorService(); @@ -701,6 +723,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa if (log.isDebugEnabled()) log.debug(stopInfo()); + if (striped != null) + striped.stop(); + + if (responseExec != null) + responseExec.stop(); + Arrays.fill(ioPools, null); } @@ -919,6 +947,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } } + StripedExecutor striped; + StripedExecutor responseExec; + /** * @param nodeId Node ID. * @param msg Message. @@ -957,6 +988,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } } +// if (msg.partition() != -1) { +// striped.execute(msg.partition(), c); +// } +// else if (msg.response()) { +// responseExec.execute(0, c); +// } +// else + + try { pool(plc).execute(c); } http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java index b28ced2..038cd5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java @@ -20,6 +20,10 @@ package org.apache.ignite.internal.managers.communication; import java.io.Externalizable; import java.nio.ByteBuffer; import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; @@ -321,6 +325,23 @@ public class GridIoMessage implements Message { return 7; } + /** + * Get single partition for this message (if applicable). + * + * @return Partition. + */ + public int partition() { + if (msg instanceof GridNearAtomicUpdateRequest) + return ((GridNearAtomicUpdateRequest)msg).partition(); + else + return -1; + } + + public boolean response() { + return msg instanceof GridNearAtomicUpdateResponse || + msg instanceof GridDhtAtomicUpdateResponse; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridIoMessage.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 55f7560..f7ae0fd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -602,6 +602,15 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } /** + * Get single partition for that message. + * + * @return Partition. + */ + public int partition() { + return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1; + } + + /** * @param idx Index. * @return Conflict expire time. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java index eb9be4d..ec54e9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java @@ -679,6 +679,15 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri return ctx.atomicMessageLogger(); } + /** + * Get single partition for that message. + * + * @return Partition. + */ + public int partition() { + return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/util/SingleConsumerSpinCircularBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/SingleConsumerSpinCircularBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/SingleConsumerSpinCircularBuffer.java new file mode 100644 index 0000000..81f913f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/SingleConsumerSpinCircularBuffer.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.locks.LockSupport; +import org.jsr166.LongAdder8; +import sun.java2d.pipe.SpanIterator; + +/** + * This class implements a circular buffer for efficient data exchange. + */ +public class SingleConsumerSpinCircularBuffer<T> { + private static final int PARK_FREQ = 3; + private static final int SPINS_CNT = 32; + + private static final AtomicLongFieldUpdater<SingleConsumerSpinCircularBuffer> writePosUpd = + AtomicLongFieldUpdater.newUpdater(SingleConsumerSpinCircularBuffer.class, "writePos"); + + private volatile long readPos; + + private long p01, p02, p03, p04, p05, p06, p07; + + private volatile long writePos; + + private long p11, p12, p13, p14, p15, p16, p17; + + /** */ + private final long sizeMask; + + /** */ + private final Item<T>[] arr; + + private volatile Thread consumer; + + private volatile boolean parked; + + /** + * @param size Size. + */ + public SingleConsumerSpinCircularBuffer( + int size + ) { + sizeMask = size - 1; + + arr = (Item<T>[])new Item[size]; + + // Fill the array. + for (int i = 0; i < arr.length; i++) + arr[i] = new Item<>(-(arr.length + i)); + + readPos = writePos = arr.length * 2; + } + + /** + * @return + */ + public T poll() { + return poll0(false); + } + + private T poll0(boolean take) { + if (consumer == null) + consumer = Thread.currentThread(); + + long readPos0 = readPos; + + if (readPos0 == writePos) { + if (take) { + parked = true; + + try { + for (; readPos0 == writePos; ) { +// System.out.println("parked " + consumer.getId() + "readPos=" + readPos + ", writePos=" + writePos); + + LockSupport.park(); + } + } + finally { + parked = false; + } + } + else + return null; + } + + Item<T> item = arr[(int)(readPos0 & sizeMask)]; + + readPos = readPos0 + 1; + + T item0 = item.item(readPos0, SPINS_CNT); + + if (item0 == null) { + parked = true; + + try { + for (item0 = item.item(readPos0, 1); item0 == null; item0 = item.item(readPos0, 1)) { + LockSupport.park(); + +// System.out.println("readPos=" + readPos + ", writePos=" + writePos + ", item=" + item); + } + } + finally { + parked = false; + } + + assert item != null; + } + + return item0; + } + + public T take() throws InterruptedException { + return poll0(true); + } + + public int size() { + return (int)(writePos - readPos); + } + + /** + * @param t + * @return + */ + public int put(T t) { + long writePos0; + + for (;;) { + writePos0 = writePos; + + if (writePosUpd.compareAndSet(this, writePos0, writePos0 + 1)) + break; + } + + Item<T> item = arr[(int)(writePos0 & sizeMask)]; + + item.update(writePos0, arr.length, t); + + if (parked) { +// System.out.println("unpark " + consumer.getId() + "readPos=" + readPos + ", writePos=" + writePos + " " + item); + + LockSupport.unpark(consumer); + } + + return (int)(writePos0 + 1 - readPos); + } + + public static void main_(String[] args) { + final ConcurrentLinkedDeque<Long> b = new ConcurrentLinkedDeque<>(); + + final LongAdder8 cnt = new LongAdder8(); + + new Thread( + new Runnable() { + @Override public void run() { + for (;;) { + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + + System.out.println("TPS: " + cnt.sumThenReset()); + } + } + } + ).start(); + + final Semaphore sem = new Semaphore(8192); + + new Thread( + new Runnable() { + @Override public void run() { + for (;;) { + Long poll = b.poll(); + + if (poll != null) { + cnt.increment(); + + sem.release(); + } + } + } + } + ).start(); + + for (int i = 0; i < 4; i++) { + new Thread( + new Runnable() { + @Override public void run() { + for (long i = 0; ; i++) { + sem.acquireUninterruptibly(); + b.add(i); + } + } + } + ).start(); + } + } + public static void main(String[] args) throws InterruptedException { + final SingleConsumerSpinCircularBuffer<Long> b = new SingleConsumerSpinCircularBuffer<>( + 1024); + +// b.put(1L); +// b.put(2L); +// b.put(3L); +// +// System.out.println(b.take()); +// System.out.println(b.poll()); +// System.out.println(b.poll()); + + final LongAdder8 cnt = new LongAdder8(); + + new Thread( + new Runnable() { + @Override public void run() { + for (;;) { + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + e.printStackTrace(); + } + + System.out.println("TPS: " + cnt.sumThenReset()); + } + } + } + ).start(); + + final CyclicBarrier bar = new CyclicBarrier(2); + + new Thread( + new Runnable() { + @Override public void run() { + for (;;) { + Long poll = null; + + try { + poll = b.take(); + + //bar.await(); + } + catch (Exception e) { + e.printStackTrace(); + } + + if (poll != null) + cnt.increment(); + + } + } + } + ).start(); + + for (int i = 0; i < 4; i++) { + new Thread( + new Runnable() { + @Override public void run() { + for (long i = 0; ; i++) { + b.put(i); + +// LockSupport.parkNanos(1L); + +// try { +// bar.await(); +// } +// catch (InterruptedException | BrokenBarrierException e) { +// e.printStackTrace(); +// } + } + } + } + ).start(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return SingleConsumerSpinCircularBuffer.class.toString(); + } + + /** + * + */ + private static class Item<V> { + /** */ + private V item; + + /** */ + private volatile long idx; + + private long p01, p02, p03, p04, p05, p06; + private int i01; + + /** + * + */ + Item(long idx) { + this.idx = idx; + } + + /** + * @return Item. + */ + V item(long readPos, int spins) { + for (int i = 0; i < spins; i++) { + if (idx == readPos) { + V item1 = this.item; + + idx = -readPos; + + return item1; + } + } + + return null; + } + + /** + * @param writePos Index. + * @param newItem Item. + */ + void update(long writePos, long diff, V newItem) { + int i = 0; + + for (;;) { + if (idx == -(writePos - diff)) + break; + + i++; + + if ((i & 3) == 0) + LockSupport.parkNanos(1L); + } + + if (i > 100) + System.out.println("Spins [i=" + i + ", writePos=" + writePos + ']'); + + item = newItem; + + idx = writePos; + } + + /** {@inheritDoc} */ + @Override public synchronized String toString() { + return "Item [idx=" + idx + ']'; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java new file mode 100644 index 0000000..fe62725 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Striped executor. + */ +public class StripedExecutor { + /** Count. */ + private final int cnt; + + /** Stripes. */ + private final Stripe[] stripes; + + private volatile boolean inited; + + /** + * Constructor. + * + * @param cnt Count. + */ + public StripedExecutor(int cnt) { + this.cnt = cnt; + + stripes = new Stripe[cnt]; + + for (int i = 0; i < cnt; i++) { + Stripe stripe = new Stripe(); + + stripes[i] = stripe; + + stripe.start(i); + } + + inited = true; + } + + /** + * Execute command. + * + * @param idx Index. + * @param cmd Command. + */ + public void execute(int idx, Runnable cmd) { + stripes[idx % cnt].execute(cmd); + } + + /** + * Stop executor. + */ + public void stop() { + for (; !inited; ) + ; + + for (Stripe stripe : stripes) + stripe.signalStop(); + + for (Stripe stripe : stripes) + stripe.awaitStop(); + } + + public void dumpStats(IgniteLogger log) { + StringBuilder sb = new StringBuilder("Stats "); + + for (int i = 0; i < stripes.length; i++) { + sb.append(i).append("=").append(stripes[i].cnt).append("; "); + + stripes[i].cnt = 0; + } + + if (log.isInfoEnabled()) + log.info(sb.toString()); + } + + /** + * Stripe. + */ + private static class Stripe implements Runnable { + /** Queue. */ +// private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); + private final SingleConsumerSpinCircularBuffer<Runnable> queue = new SingleConsumerSpinCircularBuffer<>(256); + + /** Stopping flag. */ + private volatile boolean stopping; + + /** Thread executing the loop. */ + private Thread thread; + + private volatile long cnt; + + /** + * Start the stripe. + */ + void start(int idx) { + thread = new Thread(this); + + thread.setName("stripe-" + idx); + thread.setDaemon(true); + + thread.start(); + } + + /** + * Stop the stripe. + */ + void signalStop() { + stopping = true; + + thread.interrupt(); + } + + /** + * Await thread stop. + */ + void awaitStop() { + try { + thread.join(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedException(e); + } + } + + /** {@inheritDoc} */ + @Override public void run() { + while (!stopping) { + Runnable cmd; + + try { + cmd = queue.take(); + } + catch (InterruptedException e) { + stopping = true; + + Thread.currentThread().interrupt(); + + return; + } + + if (cmd != null) + execute0(cmd); + + cnt++; + } + } + + /** + * Internal execution routine. + * + * @param cmd Command. + */ + private void execute0(Runnable cmd) { + try { + cmd.run(); + } + catch (Exception e) { + U.warn(null, "Unexpected exception in stripe loop.", e); + } + } + + /** + * Execute the command. + * + * @param cmd Command. + */ + void execute(Runnable cmd) { +// queue.add(cmd); + queue.put(cmd); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index 1ecf5b0..867237d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -78,12 +78,21 @@ public class GridNioRecoveryDescriptor { /** Number of descriptor reservations (for info purposes). */ private int reserveCnt; + /** Ack send threshold (from configuration). */ + private final int ackSndThreshold; + /** * @param queueLimit Maximum size of unacknowledged messages queue. * @param node Node. * @param log Logger. + * @param ackSndThreshold Ack send threshold. */ - public GridNioRecoveryDescriptor(int queueLimit, ClusterNode node, IgniteLogger log) { + public GridNioRecoveryDescriptor( + int queueLimit, + ClusterNode node, + IgniteLogger log, + int ackSndThreshold + ) { assert !node.isLocal() : node; assert queueLimit > 0; @@ -92,6 +101,14 @@ public class GridNioRecoveryDescriptor { this.queueLimit = queueLimit; this.node = node; this.log = log; + this.ackSndThreshold = ackSndThreshold; + } + + /** + * @return Ack send threshold. + */ + public int ackSendThreshold() { + return ackSndThreshold; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 852118a..acdba5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -45,6 +45,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.LockSupport; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -87,6 +88,9 @@ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.NIO_OPER * */ public class GridNioServer<T> { + /** */ + private static final boolean DISABLE_PARK = IgniteSystemProperties.getBoolean("DISABLE_PARK", false); + /** Default session write timeout. */ public static final int DFLT_SES_WRITE_TIMEOUT = 5000; @@ -117,6 +121,8 @@ public class GridNioServer<T> { * */ static { + System.out.println(">>> Disable park: " + DISABLE_PARK); + // This is a workaround for JDK bug (NPE in Selector.open()). // http://bugs.sun.com/view_bug.do?bug_id=6427854 try { @@ -408,7 +414,7 @@ public class GridNioServer<T> { NioOperationFuture<Boolean> fut = new NioOperationFuture<>(impl, NioOperation.CLOSE); - clientWorkers.get(impl.selectorIndex()).offer(fut); + clientWorkers.get(impl.selectorIndex()).offer(fut, impl.selectorIndex()); return fut; } @@ -496,7 +502,7 @@ public class GridNioServer<T> { } } else if (!ses.procWrite.get() && ses.procWrite.compareAndSet(false, true)) - clientWorkers.get(ses.selectorIndex()).offer((SessionChangeRequest)req); + clientWorkers.get(ses.selectorIndex()).offer((SessionChangeRequest)req, ses.selectorIndex()); if (msgQueueLsnr != null) msgQueueLsnr.apply(ses, msgCnt); @@ -572,7 +578,7 @@ public class GridNioServer<T> { ses0.resend(futs); // Wake up worker. - clientWorkers.get(ses0.selectorIndex()).offer(((SessionChangeRequest)fut0)); + clientWorkers.get(ses0.selectorIndex()).offer(((SessionChangeRequest)fut0), ses0.selectorIndex()); } } @@ -600,7 +606,7 @@ public class GridNioServer<T> { NioOperationFuture<?> fut = new NioOperationFuture<Void>(impl, op); - clientWorkers.get(impl.selectorIndex()).offer(fut); + clientWorkers.get(impl.selectorIndex()).offer(fut, impl.selectorIndex()); return fut; } @@ -610,7 +616,7 @@ public class GridNioServer<T> { */ public void dumpStats() { for (int i = 0; i < clientWorkers.size(); i++) - clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, NioOperation.DUMP_STATS)); + clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, NioOperation.DUMP_STATS), i); } /** @@ -757,7 +763,7 @@ public class GridNioServer<T> { else balanceIdx = 0; - clientWorkers.get(balanceIdx).offer(req); + clientWorkers.get(balanceIdx).offer(req, balanceIdx); } /** {@inheritDoc} */ @@ -879,9 +885,7 @@ public class GridNioServer<T> { if (req == null) { if (ses.procWrite.get()) { - boolean set = ses.procWrite.compareAndSet(true, false); - - assert set; + ses.procWrite.set(false); if (ses.writeQueue().isEmpty()) key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); @@ -967,6 +971,16 @@ public class GridNioServer<T> { return; } + int pendingAcks0 = -1; + + GridNioRecoveryDescriptor desc = null; + + if (!DISABLE_PARK && writer) { + desc = ((GridSelectorNioSessionImpl)key.attachment()).outRecoveryDescriptor(); + + pendingAcks0 = desc.messagesRequests().size() % desc.ackSendThreshold(); + } + ReadableByteChannel sockCh = (ReadableByteChannel)key.channel(); final GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); @@ -1017,6 +1031,12 @@ public class GridNioServer<T> { catch (IgniteCheckedException e) { close(ses, e); } + + if (!DISABLE_PARK && pendingAcks0 != -1) { + assert desc != null; + + pendingAcks -= pendingAcks0 - (desc.messagesRequests().size() % desc.ackSendThreshold()); + } } /** @@ -1026,10 +1046,26 @@ public class GridNioServer<T> { * @throws IOException If write failed. */ @Override protected void processWrite(SelectionKey key) throws IOException { + int pendingAcks0 = -1; + + GridNioRecoveryDescriptor desc = null; + + if (!DISABLE_PARK && writer) { + desc = ((GridSelectorNioSessionImpl)key.attachment()).outRecoveryDescriptor(); + + pendingAcks0 = desc.messagesRequests().size() % desc.ackSendThreshold(); + } + if (sslFilter != null) processWriteSsl(key); else processWrite0(key); + + if (!DISABLE_PARK && pendingAcks0 != -1) { + assert desc != null; + + pendingAcks += (desc.messagesRequests().size() % desc.ackSendThreshold()) - pendingAcks0; + } } /** @@ -1096,9 +1132,7 @@ public class GridNioServer<T> { if (req == null && buf.position() == 0) { if (ses.procWrite.get()) { - boolean set = ses.procWrite.compareAndSet(true, false); - - assert set; + ses.procWrite.set(false); if (ses.writeQueue().isEmpty()) key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); @@ -1294,12 +1328,17 @@ public class GridNioServer<T> { if (req == null && buf.position() == 0) { if (ses.procWrite.get()) { - boolean set = ses.procWrite.compareAndSet(true, false); + ses.procWrite.set(false); - assert set; + if (ses.writeQueue().isEmpty()) { + if ((key.interestOps() & SelectionKey.OP_WRITE) != 0) { + key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); - if (ses.writeQueue().isEmpty()) - key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); + writeSesCnt--; + + assert writeSesCnt >= 0 : writeSesCnt; + } + } else ses.procWrite.set(true); } @@ -1402,9 +1441,21 @@ public class GridNioServer<T> { /** Worker index. */ private final int idx; + /** Writer worker flag. */ + final boolean writer; + /** {@code True} if calls 'selector.select'. */ private volatile boolean select; + /** {@code True} if calls 'LockSupport.park'. */ + private volatile boolean park; + + /** Number of sessions which require writes. */ + protected int writeSesCnt; + + /** */ + protected int pendingAcks; + /** * @param idx Index of this worker in server's array. * @param gridName Grid name. @@ -1419,6 +1470,8 @@ public class GridNioServer<T> { createSelector(); this.idx = idx; + + writer = idx % 2 == 1; } /** {@inheritDoc} */ @@ -1501,12 +1554,15 @@ public class GridNioServer<T> { * Adds socket channel to the registration queue and wakes up reading thread. * * @param req Change request. + * @param workerIdx Worker thread index. */ - private void offer(SessionChangeRequest req) { + private void offer(SessionChangeRequest req, int workerIdx) { changeReqs.offer(req); if (select) selector.wakeup(); + else if (park) + LockSupport.unpark(clientThreads[workerIdx]); } /** @@ -1603,6 +1659,18 @@ public class GridNioServer<T> { } } + if (!DISABLE_PARK && writer && writeSesCnt == 0 && pendingAcks == 0) { + park = true; + + try { + if (changeReqs.isEmpty()) + LockSupport.parkNanos(1_000_000_000); + } + finally { + park = false; + } + } + int res = 0; for (int i = 0; i < SELECTOR_SPINS && res == 0; i++) @@ -1616,7 +1684,7 @@ public class GridNioServer<T> { processSelectedKeysOptimized(selectedKeys.flip()); } - if (!changeReqs.isEmpty()) + if (!changeReqs.isEmpty() || (!DISABLE_PARK && writer)) continue; select = true; @@ -1682,9 +1750,12 @@ public class GridNioServer<T> { SelectionKey key = ses.key(); if (key.isValid()) { - if ((key.interestOps() & SelectionKey.OP_WRITE) == 0) + if ((key.interestOps() & SelectionKey.OP_WRITE) == 0) { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); + writeSesCnt++; + } + // Update timestamp to protected against false write timeout. ses.bytesSent(0); } http://git-wip-us.apache.org/repos/asf/ignite/blob/67b94013/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index f448cbd..1f66d20 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -3261,7 +3261,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : (maxSize * 5); GridNioRecoveryDescriptor old = - recoveryDescs.putIfAbsent(key, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log)); + recoveryDescs.putIfAbsent(key, recovery = new GridNioRecoveryDescriptor(queueLimit, node, log, + ackSndThreshold)); if (old != null) recovery = old;
