IGNITE-2604: Review fixes (continuous query batch-ack node selection, test cleanups).
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/155af498 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/155af498 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/155af498 Branch: refs/heads/ignite-2604 Commit: 155af4982d9f36344c5c87026ef9206646a0b412 Parents: 16dfc19 Author: sboikov <[email protected]> Authored: Mon Feb 15 16:35:13 2016 +0300 Committer: sboikov <[email protected]> Committed: Mon Feb 15 16:35:13 2016 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryHandler.java | 4 +- .../continuous/CacheContinuousQueryManager.java | 2 +- .../continuous/CacheContinuousBatchAckTest.java | 81 +++++++++--------- .../CacheContinuousCacheFilterBatchAckTest.java | 86 ++++++++++---------- 4 files changed, 88 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/155af498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 686bc18..498f37d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -1034,10 +1034,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler Collection<ClusterNode> nodes = new HashSet<>(); for (AffinityTopologyVersion topVer : t.get2()) - 
nodes.addAll(ctx.discovery().remoteCacheNodes(cctx.name(), topVer)); + nodes.addAll(ctx.discovery().cacheAffinityNodes(cctx.name(), topVer)); for (ClusterNode node : nodes) { - if (!node.isClient()) { + if (!node.isLocal()) { try { cctx.io().send(node, msg, GridIoPolicy.SYSTEM_POOL); } http://git-wip-us.apache.org/repos/asf/ignite/blob/155af498/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 2c45f40..968fc23 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -117,7 +117,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { // Append cache name to the topic. topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? 
"" : "_" + cctx.name()); - if (!CU.clientNode(cctx.localNode())) { + if (cctx.affinityNode()) { cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class, new CI2<UUID, CacheContinuousQueryBatchAck>() { @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) { http://git-wip-us.apache.org/repos/asf/ignite/blob/155af498/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java index 85b082f..f683ffd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousBatchAckTest.java @@ -118,7 +118,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen * @throws Exception If failed. 
*/ public void testPartition() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -132,7 +132,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -144,8 +144,8 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -153,7 +153,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen * @throws Exception If failed. */ public void testPartitionNoBackups() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -167,7 +167,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -179,8 +179,8 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -188,7 +188,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen * @throws Exception If failed. 
*/ public void testPartitionTx() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -202,7 +202,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -214,8 +214,8 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -223,7 +223,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen * @throws Exception If failed. */ public void testPartitionTxNoBackup() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -237,7 +237,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -249,8 +249,8 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -258,7 +258,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen * @throws Exception If failed. 
*/ public void testPartitionOffheap() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -272,7 +272,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -285,8 +285,8 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -294,7 +294,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen * @throws Exception If failed. */ public void testPartitionTxOffheap() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -308,7 +308,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -320,8 +320,8 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -329,7 +329,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen * @throws Exception If failed. 
*/ public void testReplicated() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -343,7 +343,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -355,8 +355,8 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -364,7 +364,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen * @throws Exception If failed. */ public void testReplicatedTx() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -378,7 +378,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -390,8 +390,8 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -399,7 +399,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen * @throws Exception If failed. 
*/ public void testReplicatedOffheap() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -413,7 +413,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, OFFHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -425,8 +425,8 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -434,7 +434,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen * @throws Exception If failed. */ public void testReplicatedTxOffheap() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -448,7 +448,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -460,8 +460,8 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -505,6 +505,7 @@ public class CacheContinuousBatchAckTest extends GridCommonAbstractTest implemen this.check = check; } + /** {@inheritDoc} */ @Override protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) { if (check) { if (msg instanceof GridIoMessage && http://git-wip-us.apache.org/repos/asf/ignite/blob/155af498/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousCacheFilterBatchAckTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousCacheFilterBatchAckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousCacheFilterBatchAckTest.java index c83fde4..fb094b9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousCacheFilterBatchAckTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousCacheFilterBatchAckTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.io.Serializable; -import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.event.CacheEntryListenerException; @@ -32,6 +31,7 @@ import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.PA; @@ -51,6 +51,7 @@ import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.internal.IgniteNodeAttributes.*; /** * Continuous queries tests. @@ -123,7 +124,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe * @throws Exception If failed. 
*/ public void testPartition() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -137,7 +138,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, ATOMIC, ONHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -149,8 +150,8 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -158,7 +159,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe * @throws Exception If failed. */ public void testPartitionNoBackups() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -172,7 +173,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -184,8 +185,8 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -193,7 +194,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe * @throws Exception If failed. 
*/ public void testPartitionTx() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -207,7 +208,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, ONHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -219,8 +220,8 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -228,7 +229,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe * @throws Exception If failed. */ public void testPartitionTxNoBackup() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -242,7 +243,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, ONHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -254,8 +255,8 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -263,7 +264,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe * @throws Exception If failed. 
*/ public void testPartitionOffheap() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -277,7 +278,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, ATOMIC, OFFHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -290,8 +291,8 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -299,7 +300,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe * @throws Exception If failed. */ public void testPartitionTxOffheap() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -313,7 +314,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL, OFFHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -325,8 +326,8 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -334,7 +335,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe * @throws Exception If failed. 
*/ public void testReplicated() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -348,7 +349,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -360,8 +361,8 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -369,7 +370,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe * @throws Exception If failed. */ public void testReplicatedTx() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -383,7 +384,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -395,8 +396,8 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -404,7 +405,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe * @throws Exception If failed. 
*/ public void testReplicatedOffheap() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -418,7 +419,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, OFFHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -430,8 +431,8 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -439,7 +440,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe * @throws Exception If failed. */ public void testReplicatedTxOffheap() throws Exception { - QueryCursor query = null; + QueryCursor qry = null; try { ContinuousQuery q = new ContinuousQuery(); @@ -453,7 +454,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe IgniteCache<Object, Object> cache = grid(SERVER).getOrCreateCache(cacheConfiguration(REPLICATED, 1, ATOMIC, ONHEAP_TIERED)); - query = cache.query(q); + qry = cache.query(q); for (int i = 0; i < 10000; i++) cache.put(i, i); @@ -465,8 +466,8 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe }, 2000L); } finally { - if (query != null) - query.close(); + if (qry != null) + qry.close(); } } @@ -492,7 +493,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe ccfg.setNodeFilter(new P1<ClusterNode>() { @Override public boolean apply(ClusterNode node) { - return !node.attributes().get("org.apache.ignite.ignite.name").equals(SERVER2); + return !node.attributes().get(ATTR_GRID_NAME).equals(SERVER2); } }); @@ -516,6 +517,7 @@ public class CacheContinuousCacheFilterBatchAckTest extends GridCommonAbstractTe this.check = check; } + /** {@inheritDoc} 
*/ @Override protected void notifyListener(UUID sndId, Message msg, IgniteRunnable msgC) { if (check) { if (msg instanceof GridIoMessage &&
