Repository: ignite Updated Branches: refs/heads/ignite-2004 d760cd27c -> 5d15b1e79
ignite-2004 Review Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5d15b1e7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5d15b1e7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5d15b1e7 Branch: refs/heads/ignite-2004 Commit: 5d15b1e7955f96407158578a88e61279d18bc973 Parents: d760cd2 Author: sboikov <[email protected]> Authored: Tue Apr 12 15:39:19 2016 +0300 Committer: sboikov <[email protected]> Committed: Tue Apr 12 15:39:19 2016 +0300 ---------------------------------------------------------------------- .../cache/GridCacheUpdateAtomicResult.java | 4 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 + .../dht/atomic/GridDhtAtomicUpdateRequest.java | 3 - .../continuous/CacheContinuousQueryHandler.java | 16 ++-- .../apache/ignite/lang/IgniteAsyncCallback.java | 1 + ...eContinuousQueryAsyncFilterListenerTest.java | 8 +- ...ryFactoryAsyncFilterRandomOperationTest.java | 10 +-- ...usQueryFactoryFilterRandomOperationTest.java | 11 +-- ...ontinuousQueryOperationFromCallbackTest.java | 80 ++++++++++---------- .../IgniteCacheQuerySelfTestSuite4.java | 2 +- 10 files changed, 64 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5d15b1e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java index 10d37d0..2355b7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.processors.cache; -import java.util.List; import javax.cache.processor.EntryProcessor; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; @@ -87,8 +86,7 @@ public class GridCacheUpdateAtomicResult { @Nullable GridCacheVersion rmvVer, @Nullable GridCacheVersionConflictContext<?, ?> conflictRes, boolean sndToDht, - long updateCntr - ) { + long updateCntr) { this.success = success; this.oldVal = oldVal; this.newVal = newVal; http://git-wip-us.apache.org/repos/asf/ignite/blob/5d15b1e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index b0fc948..8e91272 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -343,6 +343,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> * @param clsr Continuous query closure. */ public void addContinuousQueryClosure(CI1<Boolean> clsr){ + assert !isDone() : this; + if (cntQryClsrs == null) cntQryClsrs = new ArrayList<>(10); http://git-wip-us.apache.org/repos/asf/ignite/blob/5d15b1e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 9d65fa9..0232f22 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -22,14 +22,12 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.UUID; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -37,7 +35,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; http://git-wip-us.apache.org/repos/asf/ignite/blob/5d15b1e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 79d19c7..cb9e503 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -392,7 +392,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (primary || skipPrimaryCheck) { if (fut == null) onEntryUpdate(evt, notify, loc, recordIgniteEvt); - else + else { fut.addContinuousQueryClosure(new CI1<Boolean>() { @Override public void apply(Boolean suc) { if (!suc) @@ -401,6 +401,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler onEntryUpdate(evt, notify, loc, recordIgniteEvt); } }); + } } } } @@ -1395,17 +1396,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler boolean notify = filter(evt, primary); if (primary()) { - if (fut != null) { - if (waitFuture()) - onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); - else { - evt.entry().markFiltered(); + if (fut != null && !waitFuture()) + evt.entry().markFiltered(); - onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); - } - } - else - onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); + onEntryUpdate(evt, notify, nodeId.equals(ctx.localNodeId()), recordIgniteEvt); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5d15b1e7/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java index 9c6584b..04de586 100644 --- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java @@ -38,4 +38,5 @@ import org.apache.ignite.configuration.IgniteConfiguration; @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) public @interface IgniteAsyncCallback { + // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/5d15b1e7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java index bb30b39..5f7232a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java @@ -476,12 +476,12 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr /** * @param ccfg Cache configuration. * @param asyncFilter Async filter. - * @param asyncListener Async listener. + * @param asyncLsnr Async listener. * @throws Exception If failed. */ private void testNonDeadLockInFilter(CacheConfiguration ccfg, final boolean asyncFilter, - final boolean asyncListener) throws Exception { + final boolean asyncLsnr) throws Exception { ignite(0).createCache(ccfg); ThreadLocalRandom rnd = ThreadLocalRandom.current(); @@ -562,7 +562,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e) { - if (asyncListener) { + if (asyncLsnr) { assertFalse("Failed: " + Thread.currentThread().getName(), Thread.currentThread().getName().contains("sys-")); @@ -586,7 +586,7 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr else conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(fltrClsr))); - if (asyncListener) + if (asyncLsnr) conQry.setLocalListener(new CacheInvokeListenerAsync(lsnrClsr)); else conQry.setLocalListener(new CacheInvokeListener(lsnrClsr)); http://git-wip-us.apache.org/repos/asf/ignite/blob/5d15b1e7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java index a556abd..37035d4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java @@ -25,7 +25,6 @@ import javax.cache.configuration.Factory; import javax.cache.configuration.FactoryBuilder; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryEventFilter; -import javax.cache.event.CacheEntryListenerException; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.lang.IgniteAsyncCallback; import org.jetbrains.annotations.NotNull; @@ -53,15 +52,14 @@ public class CacheContinuousQueryFactoryAsyncFilterRandomOperationTest } /** {@inheritDoc} */ - @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event) - throws CacheEntryListenerException { + @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> evt) { assertTrue("Failed. Current thread name: " + Thread.currentThread().getName(), Thread.currentThread().getName().contains("contQry-")); assertFalse("Failed. Current thread name: " + Thread.currentThread().getName(), Thread.currentThread().getName().contains("sys-")); - return isAccepted(event.getValue()); + return isAccepted(evt.getValue()); } /** {@inheritDoc} */ @@ -75,6 +73,7 @@ public class CacheContinuousQueryFactoryAsyncFilterRandomOperationTest } /** + * @param val Value. * @return {@code True} if value is even. */ public static boolean isAccepted(QueryTestValue val) { @@ -109,8 +108,7 @@ public class CacheContinuousQueryFactoryAsyncFilterRandomOperationTest } /** {@inheritDoc} */ - @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event) - throws CacheEntryListenerException { + @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> evt) { assertTrue("Failed. Current thread name: " + Thread.currentThread().getName(), Thread.currentThread().getName().contains("contQry-")); http://git-wip-us.apache.org/repos/asf/ignite/blob/5d15b1e7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java index 9800b56..dbaafe1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java @@ -612,9 +612,8 @@ public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheC } /** {@inheritDoc} */ - @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event) - throws CacheEntryListenerException { - return isAccepted(event.getValue()); + @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> evt) { + return isAccepted(evt.getValue()); } /** {@inheritDoc} */ @@ -628,6 +627,7 @@ public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheC } /** + * @param val Value. * @return {@code True} if value is even. */ public static boolean isAccepted(QueryTestValue val) { @@ -645,9 +645,9 @@ public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheC } /** {@inheritDoc} */ - @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> event) + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) throws CacheEntryListenerException { - return isAccepted(event.getValue()); + return isAccepted(evt.getValue()); } /** @@ -662,6 +662,7 @@ public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheC * */ protected static class FilterFactory implements Factory<NonSerializableFilter> { + /** {@inheritDoc} */ @Override public NonSerializableFilter create() { return new NonSerializableFilter(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5d15b1e7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java index 058789e..41b30be 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java @@ -59,6 +59,8 @@ import org.eclipse.jetty.util.ConcurrentHashSet; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; @@ -86,7 +88,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs private boolean client; /** */ - private static AtomicInteger filterCallbackCntr = new AtomicInteger(0); + private static AtomicInteger filterCbCntr = new AtomicInteger(0); /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { @@ -126,14 +128,14 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs @Override protected void beforeTest() throws Exception { super.beforeTest(); - filterCallbackCntr.set(0); + filterCbCntr.set(0); } /** * @throws Exception If failed. */ public void testAtomicTwoBackups() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.ATOMIC); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC); doTest(ccfg, true); } @@ -142,7 +144,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testAtomicReplicatedFilter() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC); doTest(ccfg, false); } @@ -151,7 +153,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testAtomicTwoBackupsFilter() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.ATOMIC); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC); doTest(ccfg, false); } @@ -160,7 +162,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testAtomicWithoutBackupsFilter() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, CacheAtomicityMode.ATOMIC); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC); doTest(ccfg, false); } @@ -169,7 +171,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testTxTwoBackupsFilter() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.TRANSACTIONAL); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL); doTest(ccfg, false); } @@ -178,7 +180,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testTxReplicatedFilter() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.TRANSACTIONAL); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, TRANSACTIONAL); doTest(ccfg, false); } @@ -187,7 +189,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testAtomicWithoutBackup() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, CacheAtomicityMode.ATOMIC); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC); doTest(ccfg, true); } @@ -196,7 +198,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testTxTwoBackup() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.TRANSACTIONAL); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL); doTest(ccfg, true); } @@ -205,7 +207,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testTxReplicated() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, CacheAtomicityMode.TRANSACTIONAL); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL); doTest(ccfg, true); } @@ -219,8 +221,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs List<QueryCursor<?>> qries = new ArrayList<>(); - if (!fromLsnr) - assertEquals(0, filterCallbackCntr.get()); + assertEquals(0, filterCbCntr.get()); try { List<Set<T2<QueryTestKey, QueryTestValue>>> rcvdEvts = new ArrayList<>(NODES); @@ -228,27 +229,27 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs final AtomicInteger qryCntr = new AtomicInteger(0); - final AtomicInteger callbackCntr = new AtomicInteger(0); + final AtomicInteger cbCntr = new AtomicInteger(0); final int threadCnt = 10; for (int idx = 0; idx < NODES; idx++) { Set<T2<QueryTestKey, QueryTestValue>> evts = new ConcurrentHashSet<>(); - Set<T2<QueryTestKey, QueryTestValue>> evtsFromCallback = new ConcurrentHashSet<>(); + Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb = new ConcurrentHashSet<>(); IgniteCache<Object, Object> cache = grid(idx).getOrCreateCache(ccfg.getName()); ContinuousQuery qry = new ContinuousQuery(); - qry.setLocalListener(new TestCacheAsyncEventListener(evts, evtsFromCallback, - fromLsnr ? cache : null, qryCntr, callbackCntr)); + qry.setLocalListener(new TestCacheAsyncEventListener(evts, evtsFromCb, + fromLsnr ? cache : null, qryCntr, cbCntr)); if (!fromLsnr) qry.setRemoteFilterFactory( FactoryBuilder.factoryOf(new CacheTestRemoteFilterAsync(ccfg.getName()))); rcvdEvts.add(evts); - evtsFromCallbacks.add(evtsFromCallback); + evtsFromCallbacks.add(evtsFromCb); QueryCursor qryCursor = cache.query(qry); @@ -266,7 +267,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs QueryTestKey key = new QueryTestKey(rnd.nextInt(KEYS)); boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == - CacheAtomicityMode.TRANSACTIONAL && rnd.nextBoolean(); + TRANSACTIONAL && rnd.nextBoolean(); Transaction tx = null; @@ -274,9 +275,8 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs tx = cache.unwrap(Ignite.class).transactions().txStart(); try { - if ((cache.get(key) == null) || rnd.nextBoolean()) { + if ((cache.get(key) == null) || rnd.nextBoolean()) cache.invoke(key, new IncrementTestEntryProcessor()); - } else { QueryTestValue val; QueryTestValue newVal; @@ -312,14 +312,14 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs if (fromLsnr) { final int expCnt = qryCntr.get() * NODES * KEYS_FROM_CALLBACK; - assertTrue("Failed to wait events [exp=" + expCnt + ", act=" + callbackCntr.get() + "]", + assertTrue("Failed to wait events [exp=" + expCnt + ", act=" + cbCntr.get() + "]", GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { - return callbackCntr.get() >= expCnt; + return cbCntr.get() >= expCnt; } }, TimeUnit.SECONDS.toMillis(60))); - assertEquals(expCnt, callbackCntr.get()); + assertEquals(expCnt, cbCntr.get()); for (Set<T2<QueryTestKey, QueryTestValue>> set : evtsFromCallbacks) checkEvents(set, qryCntr.get() * KEYS_FROM_CALLBACK, grid(0).cache(ccfg.getName()), true); @@ -330,11 +330,11 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { - return filterCallbackCntr.get() >= expInvkCnt; + return filterCbCntr.get() >= expInvkCnt; } }, TimeUnit.SECONDS.toMillis(20)); - assertEquals(expInvkCnt, filterCallbackCntr.get()); + assertEquals(expInvkCnt, filterCbCntr.get()); for (Set<T2<QueryTestKey, QueryTestValue>> set : evtsFromCallbacks) checkEvents(set, expInvkCnt * KEYS_FROM_CALLBACK, grid(0).cache(ccfg.getName()), true); @@ -355,15 +355,15 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ private void checkEvents(final Set<T2<QueryTestKey, QueryTestValue>> set, final int expCnt, IgniteCache cache, - boolean callback) throws Exception { + boolean cb) throws Exception { assertTrue(GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { return set.size() >= expCnt; } }, 10000L)); - int startKey = callback ? KEYS : 0; - int endKey = callback ? KEYS + KEYS_FROM_CALLBACK : KEYS; + int startKey = cb ? KEYS : 0; + int endKey = cb ? KEYS + KEYS_FROM_CALLBACK : KEYS; for (int i = startKey; i < endKey; i++) { QueryTestKey key = new QueryTestKey(i); @@ -433,7 +433,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor()); } - filterCallbackCntr.incrementAndGet(); + filterCbCntr.incrementAndGet(); } return true; @@ -453,31 +453,31 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs private final AtomicInteger cntr; /** */ - private final AtomicInteger callbackCntr; + private final AtomicInteger cbCntr; /** */ - private final Set<T2<QueryTestKey, QueryTestValue>> evtsFromCallback; + private final Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb; /** */ private IgniteCache<QueryTestKey, QueryTestValue> cache; /** * @param rcvsEvts Set for received events. - * @param evtsFromCallback Set for received events. + * @param evtsFromCb Set for received events. * @param cache Ignite cache. * @param cntr Received events counter. - * @param callbackCntr Received events counter from callbacks. + * @param cbCntr Received events counter from callbacks. */ public TestCacheAsyncEventListener(Set<T2<QueryTestKey, QueryTestValue>> rcvsEvts, - Set<T2<QueryTestKey, QueryTestValue>> evtsFromCallback, + Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb, @Nullable IgniteCache cache, AtomicInteger cntr, - AtomicInteger callbackCntr) { + AtomicInteger cbCntr) { this.rcvsEvts = rcvsEvts; - this.evtsFromCallback = evtsFromCallback; + this.evtsFromCb = evtsFromCb; this.cache = cache; this.cntr = cntr; - this.callbackCntr = callbackCntr; + this.cbCntr = cbCntr; } /** {@inheritDoc} */ @@ -505,9 +505,9 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs } } else { - evtsFromCallback.add(new T2<>(e.getKey(), e.getValue())); + evtsFromCb.add(new T2<>(e.getKey(), e.getValue())); - callbackCntr.incrementAndGet(); + cbCntr.incrementAndGet(); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5d15b1e7/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java index ab48ae2..fa4e642 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java @@ -46,4 +46,4 @@ public class IgniteCacheQuerySelfTestSuite4 extends TestSuite { return suite; } -} \ No newline at end of file +}
