Repository: ignite Updated Branches: refs/heads/ignite-2604 155af4982 -> 0eb91541d
IGNITE-2604 Fixed review notes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0eb91541 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0eb91541 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0eb91541 Branch: refs/heads/ignite-2604 Commit: 0eb91541d48638bbd2c85b044ea695c2fe3c45d8 Parents: 155af49 Author: nikolay_tikhonov <[email protected]> Authored: Mon Feb 15 17:39:01 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Mon Feb 15 17:39:01 2016 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousBatchAckTest.java | 364 ++++--------- ...heContinuousBatchForceServerModeAckTest.java | 80 +++ .../CacheContinuousCacheFilterBatchAckTest.java | 531 ------------------- 3 files changed, 180 insertions(+), 795 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0eb91541/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java index f683ffd..c69ccf2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java @@ -28,9 +28,11 @@ import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.plugin.extensions.communication.Message; @@ -48,6 +50,7 @@ import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME; /** * Continuous queries tests. @@ -57,16 +60,19 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); /** */ - private static final String CLIENT = "_client"; + protected static final String CLIENT = "_client"; /** */ - private static final String SERVER = "server"; + protected static final String SERVER = "server"; /** */ - private static final String SERVER2 = "server2"; + protected static final String SERVER2 = "server2"; /** */ - private static final AtomicBoolean fail = new AtomicBoolean(false); + protected static final AtomicBoolean fail = new AtomicBoolean(false); + + /** */ + protected static final AtomicBoolean filterOn = new AtomicBoolean(false); /** {@inheritDoc} */ @SuppressWarnings("unchecked") @@ -76,10 +82,12 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen if (gridName.endsWith(CLIENT)) { cfg.setClientMode(true); - cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(true)); + cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(true, false)); } + else if (gridName.endsWith(SERVER2)) + cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false, true)); else - cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false)); + cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false, false)); TcpDiscoverySpi disco = new TcpDiscoverySpi(); @@ -112,330 +120,143 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen super.beforeTest(); fail.set(false); + + filterOn.set(false); } /** * @throws Exception If failed. */ public void testPartition() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED)); - - qry = cache.query(q); + checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED, false)); + } - for (int i = 0; i < 10000; i++) - cache.put(i, i); + /** + * @throws Exception If failed. + */ + public void testPartitionWithFilter() throws Exception { + filterOn.set(true); - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } + checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED, true)); } /** * @throws Exception If failed. */ public void testPartitionNoBackups() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED)); - - qry = cache.query(q); - - for (int i = 0; i < 10000; i++) - cache.put(i, i); - - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } + checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED, false)); } /** * @throws Exception If failed. */ public void testPartitionTx() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED)); - - qry = cache.query(q); + checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED, false)); + } - for (int i = 0; i < 10000; i++) - cache.put(i, i); + /** + * @throws Exception If failed. + */ + public void testPartitionTxWithFilter() throws Exception { + filterOn.set(true); - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } + checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED, true)); } /** * @throws Exception If failed. */ public void testPartitionTxNoBackup() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED)); - - qry = cache.query(q); + checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED, false)); + } - for (int i = 0; i < 10000; i++) - cache.put(i, i); + /** + * @throws Exception If failed. + */ + public void testPartitionTxNoBackupWithFilter() throws Exception { + filterOn.set(true); - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } + checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED, true)); } /** * @throws Exception If failed. */ public void testPartitionOffheap() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_TIERED)); - - qry = cache.query(q); - - for (int i = 0; i < 10000; i++) - cache.put(i, i); + checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_TIERED, false)); + } - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { + /** + * @throws Exception If failed. + */ + public void testPartitionOffheapWithFilter() throws Exception { + filterOn.set(true); - if (qry != null) - qry.close(); - } + checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_TIERED, true)); } /** * @throws Exception If failed. */ public void testPartitionTxOffheap() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_TIERED)); - - qry = cache.query(q); - - for (int i = 0; i < 10000; i++) - cache.put(i, i); - - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } + checkBackupAcknowledgeMessage(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_TIERED, false)); } /** * @throws Exception If failed. */ public void testReplicated() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED)); - - qry = cache.query(q); - - for (int i = 0; i < 10000; i++) - cache.put(i, i); - - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } + checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED, false)); } /** * @throws Exception If failed. */ public void testReplicatedTx() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED)); - - qry = cache.query(q); + checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, TRANSACTIONAL, ONHEAP_TIERED, false)); + } - for (int i = 0; i < 10000; i++) - cache.put(i, i); + /** + * @throws Exception If failed. + */ + public void testReplicatedTxWithFilter() throws Exception { + filterOn.set(true); - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } + checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, TRANSACTIONAL, ONHEAP_TIERED, true)); } /** * @throws Exception If failed. */ public void testReplicatedOffheap() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, OFFHEAP_TIERED)); + checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, ATOMIC, OFFHEAP_TIERED, false)); + } - qry = cache.query(q); + /** + * @throws Exception If failed. + */ + public void testReplicatedTxOffheap() throws Exception { + checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, TRANSACTIONAL, OFFHEAP_TIERED, false)); + } - for (int i = 0; i < 10000; i++) - cache.put(i, i); + /** + * @throws Exception If failed. + */ + public void testReplicatedTxOffheapWithFilter() throws Exception { + filterOn.set(true); - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } + checkBackupAcknowledgeMessage(cacheConfiguration(REPLICATED, 1, TRANSACTIONAL, OFFHEAP_TIERED, true)); } /** + * @param ccfg Cache configuration. * @throws Exception If failed. */ - public void testReplicatedTxOffheap() throws Exception { + private void checkBackupAcknowledgeMessage(CacheConfiguration<Object, Object> ccfg) throws Exception { QueryCursor qry = null; + IgniteCache<Object, Object> cache = null; + try { ContinuousQuery q = new ContinuousQuery(); @@ -445,8 +266,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen } }); - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED)); + cache = grid(SERVER).getOrCreateCache(ccfg); qry = cache.query(q); @@ -457,11 +277,14 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen @Override public boolean apply() { return fail.get(); } - }, 2000L); + }, 1300L); } finally { if (qry != null) qry.close(); + + if (cache != null) + grid(SERVER).destroyCache(cache.getName()); } } @@ -471,13 +294,14 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen * @param backups Number of backups. * @param atomicityMode Cache atomicity mode. * @param memoryMode Cache memory mode. + * @param filter Filter enabled. * @return Cache configuration. */ private CacheConfiguration<Object, Object> cacheConfiguration( CacheMode cacheMode, int backups, CacheAtomicityMode atomicityMode, - CacheMemoryMode memoryMode) { + CacheMemoryMode memoryMode, boolean filter) { CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); ccfg.setAtomicityMode(atomicityMode); @@ -488,6 +312,13 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen if (cacheMode == PARTITIONED) ccfg.setBackups(backups); + if (filter) + ccfg.setNodeFilter(new P1<ClusterNode>() { + @Override public boolean apply(ClusterNode node) { + return !node.attributes().get(ATTR_GRID_NAME).equals(SERVER2); + } + }); + return ccfg; } @@ -498,16 +329,21 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen /** */ private boolean check; + /** */ + private boolean periodicCheck; + /** - * @param check Check inbound message. + * @param alwaysCheck Always check inbound message. + * @param periodicCheck Check when {@code filterOn} enabled. */ - public FailedTcpCommunicationSpi(boolean check) { - this.check = check; + public FailedTcpCommunicationSpi(boolean alwaysCheck, boolean periodicCheck) { + this.check = alwaysCheck; + this.periodicCheck = periodicCheck; } /** {@inheritDoc} */ @Override protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) { - if (check) { + if (check || (periodicCheck && filterOn.get())) { if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof CacheContinuousQueryBatchAck) fail.set(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/0eb91541/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchForceServerModeAckTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchForceServerModeAckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchForceServerModeAckTest.java new file mode 100644 index 0000000..f1794fa --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchForceServerModeAckTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; + +/** + * Continuous queries tests. + */ +public class CacheContinuousBatchForceServerModeAckTest extends CacheContinuousBatchAckTest implements Serializable { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (gridName.endsWith(CLIENT)) { + cfg.setClientMode(true); + + FailedTcpCommunicationSpi spi = new FailedTcpCommunicationSpi(true, false); + + cfg.setCommunicationSpi(spi); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setForceServerMode(true); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + } + else if (gridName.endsWith(SERVER2)) { + cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false, true)); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + } + else { + cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false, false)); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + } + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0eb91541/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousCacheFilterBatchAckTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousCacheFilterBatchAckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousCacheFilterBatchAckTest.java deleted file mode 100644 index fb094b9..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousCacheFilterBatchAckTest.java +++ /dev/null @@ -1,531 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import java.io.Serializable; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; -import javax.cache.event.CacheEntryListenerException; -import javax.cache.event.CacheEntryUpdatedListener; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheMemoryMode; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.query.ContinuousQuery; -import org.apache.ignite.cache.query.QueryCursor; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteNodeAttributes; -import org.apache.ignite.internal.managers.communication.GridIoMessage; -import org.apache.ignite.internal.util.typedef.P1; -import org.apache.ignite.internal.util.typedef.PA; -import org.apache.ignite.lang.IgniteRunnable; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; -import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheMode.REPLICATED; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.internal.IgniteNodeAttributes.*; - -/** - * Continuous queries tests. - */ -public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTest implements Serializable { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final String CLIENT = "_client"; - - /** */ - private static final String SERVER = "server"; - - /** */ - private static final String SERVER2 = "server2"; - - /** */ - private static final AtomicBoolean fail = new AtomicBoolean(false); - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - if (gridName.endsWith(CLIENT)) { - cfg.setClientMode(true); - - cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(true)); - } - else if (gridName.endsWith(SERVER2)) - cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(true)); - else - cfg.setCommunicationSpi(new FailedTcpCommunicationSpi(false)); - - TcpDiscoverySpi disco = new TcpDiscoverySpi(); - - disco.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(disco); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGrid(SERVER); - startGrid(SERVER2); - startGrid("1" + CLIENT); - startGrid("2" + CLIENT); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); - - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTest(); - - fail.set(false); - } - - /** - * @throws Exception If failed. - */ - public void testPartition() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED)); - - qry = cache.query(q); - - for (int i = 0; i < 10000; i++) - cache.put(i, i); - - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testPartitionNoBackups() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED)); - - qry = cache.query(q); - - for (int i = 0; i < 10000; i++) - cache.put(i, i); - - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testPartitionTx() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED)); - - qry = cache.query(q); - - for (int i = 0; i < 10000; i++) - cache.put(i, i); - - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testPartitionTxNoBackup() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED)); - - qry = cache.query(q); - - for (int i = 0; i < 10000; i++) - cache.put(i, i); - - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testPartitionOffheap() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_TIERED)); - - qry = cache.query(q); - - for (int i = 0; i < 10000; i++) - cache.put(i, i); - - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - - if (qry != null) - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testPartitionTxOffheap() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_TIERED)); - - qry = cache.query(q); - - for (int i = 0; i < 10000; i++) - cache.put(i, i); - - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testReplicated() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED)); - - qry = cache.query(q); - - for (int i = 0; i < 10000; i++) - cache.put(i, i); - - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testReplicatedTx() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED)); - - qry = cache.query(q); - - for (int i = 0; i < 10000; i++) - cache.put(i, i); - - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testReplicatedOffheap() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, OFFHEAP_TIERED)); - - qry = cache.query(q); - - for (int i = 0; i < 10000; i++) - cache.put(i, i); - - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } - } - - /** - * @throws Exception If failed. - */ - public void testReplicatedTxOffheap() throws Exception { - QueryCursor qry = null; - - try { - ContinuousQuery q = new ContinuousQuery(); - - q.setLocalListener(new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - // No-op. - } - }); - - IgniteCache<Object, Object> cache = - grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED)); - - qry = cache.query(q); - - for (int i = 0; i < 10000; i++) - cache.put(i, i); - - assert !GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return fail.get(); - } - }, 2000L); - } - finally { - if (qry != null) - qry.close(); - } - } - - /** - * - * @param cacheMode Cache mode. - * @param backups Number of backups. - * @param atomicityMode Cache atomicity mode. - * @param memoryMode Cache memory mode. - * @return Cache configuration. - */ - private CacheConfiguration<Object, Object> cacheConfiguration( - CacheMode cacheMode, - int backups, - CacheAtomicityMode atomicityMode, - CacheMemoryMode memoryMode) { - CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); - - ccfg.setAtomicityMode(atomicityMode); - ccfg.setCacheMode(cacheMode); - ccfg.setMemoryMode(memoryMode); - ccfg.setWriteSynchronizationMode(FULL_SYNC); - - ccfg.setNodeFilter(new P1<ClusterNode>() { - @Override public boolean apply(ClusterNode node) { - return !node.attributes().get(ATTR_GRID_NAME).equals(SERVER2); - } - }); - - if (cacheMode == PARTITIONED) - ccfg.setBackups(backups); - - return ccfg; - } - - /** - * - */ - protected static class FailedTcpCommunicationSpi extends TcpCommunicationSpi { - /** */ - private boolean check; - - /** - * @param check Check inbound message. - */ - public FailedTcpCommunicationSpi(boolean check) { - this.check = check; - } - - /** {@inheritDoc} */ - @Override protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) { - if (check) { - if (msg instanceof GridIoMessage && - ((GridIoMessage)msg).message() instanceof CacheContinuousQueryBatchAck) - fail.set(true); - } - - super.notifyListener(sndId, msg, msgC); - } - } -}
