IGNITE-2466 - Use current NIO back pressure mechanism to limit received messages. Mark them process only when backups acknowledged.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/220db882 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/220db882 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/220db882 Branch: refs/heads/ignite-4932 Commit: 220db882b466c03eadd148b3a19a0bf70d82d4a6 Parents: a039260 Author: dkarachentsev <[email protected]> Authored: Mon Apr 10 10:28:15 2017 +0300 Committer: dkarachentsev <[email protected]> Committed: Mon Apr 10 10:28:15 2017 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 12 +- .../dht/atomic/GridDhtAtomicCache.java | 23 ++- .../util/nio/GridNioBackPressureControl.java | 39 ++++- .../util/nio/GridNioMessageTracker.java | 7 + .../CacheAtomicPrimarySyncBackPressureTest.java | 151 +++++++++++++++++++ .../testsuites/IgniteCacheTestSuite4.java | 3 + 6 files changed, 220 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index bb3add4..dbd5db6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -738,7 +738,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa Runnable c = new Runnable() { @Override public void run() { try { - threadProcessingMessage(true); + threadProcessingMessage(true, msgC); GridMessageListener lsnr = listenerGet0(msg.topic()); @@ -752,7 +752,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa invokeListener(msg.policy(), lsnr, nodeId, obj); } finally { - threadProcessingMessage(false); + threadProcessingMessage(false, null); msgC.run(); } @@ -787,12 +787,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa Runnable c = new Runnable() { @Override public void run() { try { - threadProcessingMessage(true); + threadProcessingMessage(true, msgC); processRegularMessage0(msg, nodeId); } finally { - threadProcessingMessage(false); + threadProcessingMessage(false, null); msgC.run(); } @@ -1148,12 +1148,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa Runnable c = new Runnable() { @Override public void run() { try { - threadProcessingMessage(true); + threadProcessingMessage(true, msgC); unwindMessageSet(msgSet0, lsnr); } finally { - threadProcessingMessage(false); + threadProcessingMessage(false, null); } } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 463fc57..047be87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -84,6 +84,8 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.nio.GridNioBackPressureControl; +import org.apache.ignite.internal.util.nio.GridNioMessageTracker; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; @@ -92,14 +94,15 @@ import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; +import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; @@ -110,6 +113,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_AC import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; @@ -1904,8 +1908,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (req.writeSynchronizationMode() != FULL_ASYNC) req.cleanup(!node.isLocal()); - if (dhtFut != null) + if (dhtFut != null) { + if (req.writeSynchronizationMode() == PRIMARY_SYNC && !dhtFut.isDone()) { + final IgniteRunnable tracker = GridNioBackPressureControl.threadTracker(); + + if (tracker != null && tracker instanceof GridNioMessageTracker) { + ((GridNioMessageTracker)tracker).onMessageReceived(); + + dhtFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() { + @Override public void apply(IgniteInternalFuture<Void> fut) { + ((GridNioMessageTracker)tracker).onMessageProcessed(); + } + }); + } + } + ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut); + } } else // Should remap all keys. http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java index 96a1ab3..37d985f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioBackPressureControl.java @@ -17,14 +17,17 @@ package org.apache.ignite.internal.util.nio; +import org.apache.ignite.lang.IgniteRunnable; +import org.jetbrains.annotations.Nullable; + /** * Utility class that allows to ignore back-pressure control for threads that are processing messages. */ public class GridNioBackPressureControl { /** Thread local flag indicating that thread is processing message. */ - private static ThreadLocal<Boolean> threadProcMsg = new ThreadLocal<Boolean>() { - @Override protected Boolean initialValue() { - return Boolean.FALSE; + private static ThreadLocal<Holder> threadProcMsg = new ThreadLocal<Holder>() { + @Override protected Holder initialValue() { + return new Holder(); } }; @@ -32,13 +35,35 @@ public class GridNioBackPressureControl { * @return Flag indicating whether current thread is processing message. */ public static boolean threadProcessingMessage() { - return threadProcMsg.get(); + return threadProcMsg.get().procMsg; } /** * @param processing Flag indicating whether current thread is processing message. + * @param tracker Thread local back pressure tracker of messages, associated with one connection. + */ + public static void threadProcessingMessage(boolean processing, @Nullable IgniteRunnable tracker) { + Holder holder = threadProcMsg.get(); + + holder.procMsg = processing; + holder.tracker = tracker; + } + + /** + * @return Thread local back pressure tracker of messages, associated with one connection. */ - public static void threadProcessingMessage(boolean processing) { - threadProcMsg.set(processing); + @Nullable public static IgniteRunnable threadTracker() { + return threadProcMsg.get().tracker; + } + + /** + * + */ + private static class Holder { + /** Process message. */ + private boolean procMsg; + + /** Tracker. */ + private IgniteRunnable tracker; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java index e02c7ca..f05ee0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageTracker.java @@ -93,6 +93,13 @@ public class GridNioMessageTracker implements IgniteRunnable { } /** + * + */ + public void onMessageProcessed() { + run(); + } + + /** */ public void onMessageReceived() { int cnt = msgCnt.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java new file mode 100644 index 0000000..49e3e5c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAtomicPrimarySyncBackPressureTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Checks that back-pressure control restricts uncontrolled growing + * of backup message queue. This means, if queue too big - any reads + * will be stopped until received acks from backup nodes. + */ +public class CacheAtomicPrimarySyncBackPressureTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration ccfg = new CacheConfiguration("cache"); + + ccfg.setBackups(1); + + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC); + ccfg.setMemoryMode(CacheMemoryMode.ONHEAP_TIERED); + ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); + + TestCommunicationSpi spi = new TestCommunicationSpi(); + + spi.setMessageQueueLimit(100); + + cfg.setCommunicationSpi(spi); + cfg.setClientMode(gridName.contains("client")); + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testClientPut() throws Exception { + Ignite srv1 = startGrid("server1"); + Ignite srv2 = startGrid("server2"); + + final Ignite client = startGrid("client"); + + checkBackPressure(client, srv1, srv2); + } + + /** + * @throws Exception If failed. + */ + public void testServerPut() throws Exception { + Ignite srv1 = startGrid("server1"); + Ignite srv2 = startGrid("server2"); + + final Ignite client = startGrid("server3"); + + checkBackPressure(client, srv1, srv2); + } + + /** + * @param client Producer node. + * @throws InterruptedException If failed. + */ + private void checkBackPressure(Ignite client, final Ignite srv1, final Ignite srv2) throws Exception { + final IgniteCache<Integer, String> cache = client.cache("cache"); + + awaitPartitionMapExchange(); + + for (int i = 0; i < 10000; i++) { + cache.put(i, String.valueOf(i)); + + if (i % 100 == 0) { + int size1 = futuresNum(srv1); + int size2 = futuresNum(srv2); + + assert size1 < 150 : size1; + assert size2 < 150 : size2; + } + } + } + + /** + * @param ignite Ignite. + * @return Size of the backup queue. + */ + private int futuresNum(Ignite ignite) { + return ((IgniteKernal)ignite).context().cache().context().mvcc().atomicFutures().size(); + } + + /** + * Delays backup update acks. + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, + IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { + if (((GridIoMessage)msg).message() instanceof GridDhtAtomicDeferredUpdateResponse) + sleep(100); + + super.sendMessage(node, msg, ackC); + } + } + + /** + * @param millis Millis. + */ + private static void sleep(long millis) { + try { + Thread.sleep(millis); + } + catch (InterruptedException e) { + throw new IgniteSpiException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/220db882/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index 5a09a1c..9fcf31a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -94,6 +94,7 @@ import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransaction import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest; import org.apache.ignite.internal.processors.cache.MarshallerCacheJobRunNodeRestartTest; import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest; +import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest; import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest; import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest; @@ -334,6 +335,8 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(IgniteCacheNearOnlyTxTest.class); + suite.addTestSuite(CacheAtomicPrimarySyncBackPressureTest.class); + return suite; } } \ No newline at end of file
