Repository: ignite Updated Branches: refs/heads/ignite-2004 cd90d65d7 -> 2f1856bd7
IGNITE-2004 Fixed review notes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2f1856bd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2f1856bd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2f1856bd Branch: refs/heads/ignite-2004 Commit: 2f1856bd70c02ff0640eda08a6beb6e4618c0cf2 Parents: cd90d65 Author: nikolay_tikhonov <[email protected]> Authored: Tue Apr 12 13:14:06 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Tue Apr 12 13:14:06 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 1 - ...eContinuousQueryAsyncFilterListenerTest.java | 188 +----- .../CacheContinuousQueryDeadlockTest.java | 523 --------------- ...ontinuousQueryOperationFromCallbackTest.java | 630 +++++++++++++++++++ .../CacheContinuousQueryOrderingEventTest.java | 14 - .../IgniteCacheQuerySelfTestSuite3.java | 6 +- 6 files changed, 634 insertions(+), 728 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2f1856bd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index f64803a..122b7b5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -34,7 +34,6 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.eviction.EvictableEntry; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; http://git-wip-us.apache.org/repos/asf/ignite/blob/2f1856bd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java index 6780c18..bb30b39 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.query.continuous; import java.io.Serializable; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -37,10 +36,6 @@ import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; -import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.configuration.CacheConfiguration; @@ -48,13 +43,13 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; -import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; @@ -479,187 +474,6 @@ public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstr } /** - * @throws Exception If failed. - */ - public void testDeadLockInListenerAtomic() throws Exception { - testDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED)); - } - - /** - * @param ccfg Cache configuration. - * @throws Exception If failed. - */ - private void testDeadLockInListener(CacheConfiguration ccfg) throws Exception { - ignite(0).createCache(ccfg); - - final IgniteCache cache = grid(0).cache(ccfg.getName()); - - final QueryTestKey key = affinityKey(cache); - - final QueryTestValue val0 = new QueryTestValue(1); - final QueryTestValue newVal = new QueryTestValue(2); - - ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); - - final CountDownLatch latch = new CountDownLatch(1); - - IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr = - new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { - @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue> e) { - IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName()); - - QueryTestValue val = e.getValue(); - - if (val == null || !val.equals(val0)) - return; - - Transaction tx = null; - - try { - if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL) - tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); - - assertEquals(val, val0); - - latch.countDown(); - - cache0.put(key, newVal); - - if (tx != null) - tx.commit(); - } - catch (Exception exp) { - log.error("Failed: ", exp); - - throw new IgniteException(exp); - } - finally { - if (tx != null) - tx.close(); - } - } - }; - - conQry.setLocalListener(new CacheInvokeListener(lsnrClsr)); - - try (QueryCursor qry = cache.query(conQry)) { - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - IgniteInternalFuture<Void> f = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - cache.put(key, val0); - - return null; - } - }); - - f.get(1, SECONDS); - - return null; - } - }, IgniteFutureTimeoutCheckedException.class, null); - - assertTrue("Failed. Deadlock early than put happened.", U.await(latch, 3, SECONDS)); - } - } - - /** - * @throws Exception If failed. - */ - public void testDeadLockInFilterAtomic() throws Exception { - testDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED)); - } - - /** - * @param ccfg Cache configuration. - * @throws Exception If failed. - */ - private void testDeadLockInFilter(CacheConfiguration ccfg) throws Exception { - ignite(0).createCache(ccfg); - - final IgniteCache cache = grid(0).cache(ccfg.getName()); - - final QueryTestKey key = affinityKey(cache); - - final QueryTestValue val0 = new QueryTestValue(1); - final QueryTestValue newVal = new QueryTestValue(2); - - ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); - - final CountDownLatch latch = new CountDownLatch(1); - - IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr = - new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { - @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue> e) { - IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName()); - - QueryTestValue val = e.getValue(); - - if (val == null || !val.equals(val0)) - return; - - Transaction tx = null; - - try { - if (cache0.getConfiguration(CacheConfiguration.class) - .getAtomicityMode() == TRANSACTIONAL) - tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); - - assertEquals(val, val0); - - latch.countDown(); - - cache0.put(key, newVal); - - if (tx != null) - tx.commit(); - } - catch (Exception exp) { - log.error("Failed: ", exp); - - throw new IgniteException(exp); - } - finally { - if (tx != null) - tx.close(); - } - } - }; - - conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(fltrClsr))); - - conQry.setLocalListener(new CacheInvokeListener( - new CI2<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { - @Override public void apply(Ignite ignite, - CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event) { - // No-op. - } - })); - - try (QueryCursor qry = cache.query(conQry)) { - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - IgniteInternalFuture<Void> f = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - cache.put(key, val0); - - return null; - } - }); - - f.get(1, SECONDS); - - return null; - } - }, IgniteFutureTimeoutCheckedException.class, null); - - assertTrue("Failed. Deadlock early than put happened.", U.await(latch, 3, SECONDS)); - } - } - - /** * @param ccfg Cache configuration. * @param asyncFilter Async filter. * @param asyncListener Async listener. http://git-wip-us.apache.org/repos/asf/ignite/blob/2f1856bd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeadlockTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeadlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeadlockTest.java deleted file mode 100644 index 59d5382..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeadlockTest.java +++ /dev/null @@ -1,523 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import java.io.Serializable; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import javax.cache.configuration.FactoryBuilder; -import javax.cache.event.CacheEntryEvent; -import javax.cache.event.CacheEntryListenerException; -import javax.cache.event.CacheEntryUpdatedListener; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteException; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheEntryEventSerializableFilter; -import org.apache.ignite.cache.CacheMemoryMode; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.affinity.Affinity; -import org.apache.ignite.cache.query.ContinuousQuery; -import org.apache.ignite.cache.query.QueryCursor; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiInClosure; -import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import org.apache.ignite.transactions.Transaction; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; -import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; -import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES; -import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheMode.REPLICATED; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; - -/** - * - */ -public class CacheContinuousQueryDeadlockTest extends GridCommonAbstractTest { - /** */ - private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final int NODES = 5; - - /** */ - private boolean client; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); - - cfg.setClientMode(client); - - MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi(); - storeSpi.setExpireCount(1000); - - cfg.setEventStorageSpi(storeSpi); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - super.beforeTestsStarted(); - - startGridsMultiThreaded(NODES - 1); - - client = true; - - startGrid(NODES - 1); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - - super.afterTestsStopped(); - } - - /** - * @throws Exception If failed. - */ - public void testDeadLockInListenerAtomic() throws Exception { - testDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED)); - } - - /** - * @throws Exception If failed. - */ - public void testDeadLockInListenerAtomicWithOffheap() throws Exception { - testDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED)); - } - - /** - * @throws Exception If failed. - */ - public void testDeadLockInListenerAtomicWithOffheapValues() throws Exception { - testDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_VALUES)); - } - - /** - * @throws Exception If failed. - */ - public void testDeadLockInListenerReplicatedAtomic() throws Exception { - testDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED)); - } - - /** - * @param ccfg Cache configuration. - * @throws Exception If failed. - */ - private void testDeadLockInListener(CacheConfiguration ccfg) throws Exception { - ignite(0).createCache(ccfg); - - final IgniteCache cache = grid(0).cache(ccfg.getName()); - - final QueryTestKey key = affinityKey(cache); - - final QueryTestValue val0 = new QueryTestValue(1); - final QueryTestValue newVal = new QueryTestValue(2); - - ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); - - final CountDownLatch latch = new CountDownLatch(1); - - IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr = - new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { - @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue> e) { - IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName()); - - QueryTestValue val = e.getValue(); - - if (val == null || !val.equals(val0)) - return; - - Transaction tx = null; - - try { - if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL) - tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); - - assertEquals(val, val0); - - latch.countDown(); - - cache0.put(key, newVal); - - if (tx != null) - tx.commit(); - } - catch (Exception exp) { - log.error("Failed: ", exp); - - throw new IgniteException(exp); - } - } - }; - - conQry.setLocalListener(new CacheInvokeListener(lsnrClsr)); - - try (QueryCursor qry = cache.query(conQry)) { - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - IgniteInternalFuture<Void> f = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - cache.put(key, val0); - - return null; - } - }); - - f.get(1, SECONDS); - - return null; - } - }, IgniteFutureTimeoutCheckedException.class, null); - - assertTrue("Failed. Deadlock early than put happened.", U.await(latch, 3, SECONDS)); - } - } - - /** - * @throws Exception If failed. - */ - public void testDeadLockInFilterAtomic() throws Exception { - testDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED)); - } - - /** - * @throws Exception If failed. - */ - public void testDeadLockInFilterAtomicOffheapValues() throws Exception { - testDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_VALUES)); - } - - /** - * @throws Exception If failed. - */ - public void testDeadLockInFilterReplicated() throws Exception { - testDeadLockInFilter(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED)); - } - - /** - * @param ccfg Cache configuration. - * @throws Exception If failed. - */ - private void testDeadLockInFilter(CacheConfiguration ccfg) throws Exception { - ignite(0).createCache(ccfg); - - final IgniteCache cache = grid(0).cache(ccfg.getName()); - - final QueryTestKey key = affinityKey(cache); - - final QueryTestValue val0 = new QueryTestValue(1); - final QueryTestValue newVal = new QueryTestValue(2); - - ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); - - final CountDownLatch latch = new CountDownLatch(1); - - IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr = - new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { - @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue> e) { - IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName()); - - QueryTestValue val = e.getValue(); - - if (val == null || !val.equals(val0)) - return; - - Transaction tx = null; - - try { - if (cache0.getConfiguration(CacheConfiguration.class) - .getAtomicityMode() == TRANSACTIONAL) - tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); - - assertEquals(val, val0); - - latch.countDown(); - - cache0.put(key, newVal); - - if (tx != null) - tx.commit(); - } - catch (Exception exp) { - log.error("Failed: ", exp); - - throw new IgniteException(exp); - } - finally { - if (tx != null) - tx.close(); - } - } - }; - - conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(new CacheTestRemoteFilter(fltrClsr))); - - conQry.setLocalListener(new CacheInvokeListener( - new CI2<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { - @Override public void apply(Ignite ignite, - CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event) { - // No-op. - } - })); - - try (QueryCursor qry = cache.query(conQry)) { - GridTestUtils.assertThrows(log, new Callable<Object>() { - @Override public Object call() throws Exception { - IgniteInternalFuture<Void> f = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - cache.put(key, val0); - - return null; - } - }); - - f.get(1, SECONDS); - - return null; - } - }, IgniteFutureTimeoutCheckedException.class, null); - - assertTrue("Failed. Deadlock early than put happened.", U.await(latch, 3, SECONDS)); - } - } - - /** - * @param cache Ignite cache. - * @return Key. - */ - private QueryTestKey affinityKey(IgniteCache cache) { - Affinity aff = affinity(cache); - - for (int i = 0; i < 10_000; i++) { - QueryTestKey key = new QueryTestKey(i); - - if (aff.isPrimary(localNode(cache), key)) - return key; - } - - throw new IgniteException("Failed to found primary key."); - } - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return TimeUnit.SECONDS.toMillis(30); - } - - /** - * - */ - private static class CacheTestRemoteFilter implements - CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> { - /** */ - @IgniteInstanceResource - private Ignite ignite; - - /** */ - private IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr; - - /** - * @param clsr Closure. - */ - public CacheTestRemoteFilter(IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> clsr) { - this.clsr = clsr; - } - - /** {@inheritDoc} */ - @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e) - throws CacheEntryListenerException { - clsr.apply(ignite, e); - - return true; - } - } - - /** - * - */ - private static class CacheInvokeListener implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue> { - @IgniteInstanceResource - private Ignite ignite; - - /** */ - private IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr; - - /** - * @param clsr Closure. - */ - public CacheInvokeListener(IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> clsr) { - this.clsr = clsr; - } - - /** {@inheritDoc} */ - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> events) - throws CacheEntryListenerException { - for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events) - clsr.apply(ignite, e); - } - } - - /** - * @param cacheMode Cache mode. - * @param backups Number of backups. - * @param atomicityMode Cache atomicity mode. - * @param memoryMode Cache memory mode. - * @return Cache configuration. - */ - protected CacheConfiguration<Object, Object> cacheConfiguration( - CacheMode cacheMode, - int backups, - CacheAtomicityMode atomicityMode, - CacheMemoryMode memoryMode) { - CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); - - ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + backups); - ccfg.setAtomicityMode(atomicityMode); - ccfg.setCacheMode(cacheMode); - ccfg.setMemoryMode(memoryMode); - ccfg.setWriteSynchronizationMode(FULL_SYNC); - ccfg.setAtomicWriteOrderMode(PRIMARY); - - if (cacheMode == PARTITIONED) - ccfg.setBackups(backups); - - return ccfg; - } - - /** - * - */ - public static class QueryTestKey implements Serializable, Comparable { - /** */ - private final Integer key; - - /** - * @param key Key. - */ - public QueryTestKey(Integer key) { - this.key = key; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - QueryTestKey that = (QueryTestKey)o; - - return key.equals(that.key); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return key.hashCode(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(QueryTestKey.class, this); - } - - /** {@inheritDoc} */ - @Override public int compareTo(Object o) { - return key - ((QueryTestKey)o).key; - } - } - - /** - * - */ - public static class QueryTestValue implements Serializable { - /** */ - @GridToStringInclude - protected final Integer val1; - - /** */ - @GridToStringInclude - protected final String val2; - - /** - * @param val Value. - */ - public QueryTestValue(Integer val) { - this.val1 = val; - this.val2 = String.valueOf(val); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - QueryTestValue that = (QueryTestValue)o; - - return val1.equals(that.val1) && val2.equals(that.val2); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = val1.hashCode(); - - res = 31 * res + val2.hashCode(); - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(QueryTestValue.class, this); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f1856bd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java new file mode 100644 index 0000000..058789e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java @@ -0,0 +1,630 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteAsyncCallback; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; + +/** + * + */ +public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbstractTest { + /** */ + public static final int KEYS = 10; + + /** */ + public static final int KEYS_FROM_CALLBACK = 20; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 5; + + /** */ + public static final int ITERATION_CNT = 20; + + /** */ + private boolean client; + + /** */ + private static AtomicInteger filterCallbackCntr = new AtomicInteger(0); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi(); + storeSpi.setExpireCount(100); + + cfg.setEventStorageSpi(storeSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES - 1); + + client = true; + + startGrid(NODES - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + filterCallbackCntr.set(0); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicTwoBackups() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.ATOMIC); + + doTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedFilter() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC); + + doTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicTwoBackupsFilter() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.ATOMIC); + + doTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicWithoutBackupsFilter() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, CacheAtomicityMode.ATOMIC); + + doTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxTwoBackupsFilter() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.TRANSACTIONAL); + + doTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxReplicatedFilter() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.TRANSACTIONAL); + + doTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicWithoutBackup() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, CacheAtomicityMode.ATOMIC); + + doTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxTwoBackup() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.TRANSACTIONAL); + + doTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxReplicated() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, CacheAtomicityMode.TRANSACTIONAL); + + doTest(ccfg, true); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. + */ + protected void doTest(final CacheConfiguration ccfg, boolean fromLsnr) throws Exception { + ignite(0).createCache(ccfg); + + List<QueryCursor<?>> qries = new ArrayList<>(); + + if (!fromLsnr) + assertEquals(0, filterCallbackCntr.get()); + + try { + List<Set<T2<QueryTestKey, QueryTestValue>>> rcvdEvts = new ArrayList<>(NODES); + List<Set<T2<QueryTestKey, QueryTestValue>>> evtsFromCallbacks = new ArrayList<>(NODES); + + final AtomicInteger qryCntr = new AtomicInteger(0); + + final AtomicInteger callbackCntr = new AtomicInteger(0); + + final int threadCnt = 10; + + for (int idx = 0; idx < NODES; idx++) { + Set<T2<QueryTestKey, QueryTestValue>> evts = new ConcurrentHashSet<>(); + Set<T2<QueryTestKey, QueryTestValue>> evtsFromCallback = new ConcurrentHashSet<>(); + + IgniteCache<Object, Object> cache = grid(idx).getOrCreateCache(ccfg.getName()); + + ContinuousQuery qry = new ContinuousQuery(); + + qry.setLocalListener(new TestCacheAsyncEventListener(evts, evtsFromCallback, + fromLsnr ? cache : null, qryCntr, callbackCntr)); + + if (!fromLsnr) + qry.setRemoteFilterFactory( + FactoryBuilder.factoryOf(new CacheTestRemoteFilterAsync(ccfg.getName()))); + + rcvdEvts.add(evts); + evtsFromCallbacks.add(evtsFromCallback); + + QueryCursor qryCursor = cache.query(qry); + + qries.add(qryCursor); + } + + IgniteInternalFuture<Long> f = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < ITERATION_CNT; i++) { + IgniteCache<QueryTestKey, QueryTestValue> cache = + grid(rnd.nextInt(NODES)).cache(ccfg.getName()); + + QueryTestKey key = new QueryTestKey(rnd.nextInt(KEYS)); + + boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == + CacheAtomicityMode.TRANSACTIONAL && rnd.nextBoolean(); + + Transaction tx = null; + + if (startTx) + tx = cache.unwrap(Ignite.class).transactions().txStart(); + + try { + if ((cache.get(key) == null) || rnd.nextBoolean()) { + cache.invoke(key, new IncrementTestEntryProcessor()); + } + else { + QueryTestValue val; + QueryTestValue newVal; + + do { + val = cache.get(key); + + newVal = val == null ? + new QueryTestValue(0) : new QueryTestValue(val.val1 + 1); + } + while (!cache.replace(key, val, newVal)); + } + } + finally { + if (tx != null) + tx.commit(); + } + } + } + }, threadCnt, "put-thread"); + + f.get(30, TimeUnit.SECONDS); + + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return qryCntr.get() >= ITERATION_CNT * threadCnt * NODES; + } + }, TimeUnit.MINUTES.toMillis(2)); + + for (Set<T2<QueryTestKey, QueryTestValue>> set : rcvdEvts) + checkEvents(set, ITERATION_CNT * threadCnt, grid(0).cache(ccfg.getName()), false); + + if (fromLsnr) { + final int expCnt = qryCntr.get() * NODES * KEYS_FROM_CALLBACK; + + assertTrue("Failed to wait events [exp=" + expCnt + ", act=" + callbackCntr.get() + "]", + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return callbackCntr.get() >= expCnt; + } + }, TimeUnit.SECONDS.toMillis(60))); + + assertEquals(expCnt, callbackCntr.get()); + + for (Set<T2<QueryTestKey, QueryTestValue>> set : evtsFromCallbacks) + checkEvents(set, qryCntr.get() * KEYS_FROM_CALLBACK, grid(0).cache(ccfg.getName()), true); + } + else { + final int expInvkCnt = ITERATION_CNT * threadCnt * + (ccfg.getCacheMode() != REPLICATED ? (ccfg.getBackups() + 1) : NODES - 1) * NODES; + + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return filterCallbackCntr.get() >= expInvkCnt; + } + }, TimeUnit.SECONDS.toMillis(20)); + + assertEquals(expInvkCnt, filterCallbackCntr.get()); + + for (Set<T2<QueryTestKey, QueryTestValue>> set : evtsFromCallbacks) + checkEvents(set, expInvkCnt * KEYS_FROM_CALLBACK, grid(0).cache(ccfg.getName()), true); + } + } + finally { + for (QueryCursor<?> qry : qries) + qry.close(); + + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param expCnt Expected count. + * @param cache Cache. + * @param set Received events. + * @throws Exception If failed. + */ + private void checkEvents(final Set<T2<QueryTestKey, QueryTestValue>> set, final int expCnt, IgniteCache cache, + boolean callback) throws Exception { + assertTrue(GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return set.size() >= expCnt; + } + }, 10000L)); + + int startKey = callback ? KEYS : 0; + int endKey = callback ? KEYS + KEYS_FROM_CALLBACK : KEYS; + + for (int i = startKey; i < endKey; i++) { + QueryTestKey key = new QueryTestKey(i); + + QueryTestValue maxVal = (QueryTestValue)cache.get(key); + + for (int val = 0; val <= maxVal.val1; val++) + assertTrue(set.remove(new T2<>(key, new QueryTestValue(val)))); + } + + assertTrue(set.isEmpty()); + } + + /** + * + */ + private static class IncrementTestEntryProcessor implements + CacheEntryProcessor<QueryTestKey, QueryTestValue, Object> { + /** {@inheritDoc} */ + @Override public Object process(MutableEntry<QueryTestKey, QueryTestValue> entry, Object... arguments) + throws EntryProcessorException { + if (entry.exists()) + entry.setValue(new QueryTestValue(entry.getValue().val1 + 1)); + else + entry.setValue(new QueryTestValue(0)); + + return null; + } + } + + /** + * + */ + @IgniteAsyncCallback + private static class CacheTestRemoteFilterAsync implements + CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private String cacheName; + + /** + * @param cacheName Cache name. + */ + public CacheTestRemoteFilterAsync(String cacheName) { + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e) + throws CacheEntryListenerException { + if (e.getKey().compareTo(new QueryTestKey(KEYS)) < 0) { + IgniteCache<QueryTestKey, QueryTestValue> cache = ignite.cache(cacheName); + + if (ThreadLocalRandom.current().nextBoolean()) { + Set<QueryTestKey> keys = new LinkedHashSet<>(); + + for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) + keys.add(new QueryTestKey(key)); + + cache.invokeAll(keys, new IncrementTestEntryProcessor()); + } + else { + for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) + cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor()); + } + + filterCallbackCntr.incrementAndGet(); + } + + return true; + } + } + + /** + * + */ + @IgniteAsyncCallback + private static class TestCacheAsyncEventListener + implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue> { + /** */ + private final Set<T2<QueryTestKey, QueryTestValue>> rcvsEvts; + + /** */ + private final AtomicInteger cntr; + + /** */ + private final AtomicInteger callbackCntr; + + /** */ + private final Set<T2<QueryTestKey, QueryTestValue>> evtsFromCallback; + + /** */ + private IgniteCache<QueryTestKey, QueryTestValue> cache; + + /** + * @param rcvsEvts Set for received events. + * @param evtsFromCallback Set for received events. + * @param cache Ignite cache. + * @param cntr Received events counter. + * @param callbackCntr Received events counter from callbacks. + */ + public TestCacheAsyncEventListener(Set<T2<QueryTestKey, QueryTestValue>> rcvsEvts, + Set<T2<QueryTestKey, QueryTestValue>> evtsFromCallback, + @Nullable IgniteCache cache, + AtomicInteger cntr, + AtomicInteger callbackCntr) { + this.rcvsEvts = rcvsEvts; + this.evtsFromCallback = evtsFromCallback; + this.cache = cache; + this.cntr = cntr; + this.callbackCntr = callbackCntr; + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts) + throws CacheEntryListenerException { + for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : evts) { + if (e.getKey().compareTo(new QueryTestKey(KEYS)) < 0) { + rcvsEvts.add(new T2<>(e.getKey(), e.getValue())); + + cntr.incrementAndGet(); + + if (cache != null) { + if (ThreadLocalRandom.current().nextBoolean()) { + Set<QueryTestKey> keys = new LinkedHashSet<>(); + + for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) + keys.add(new QueryTestKey(key)); + + cache.invokeAll(keys, new IncrementTestEntryProcessor()); + } + else { + for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) + cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor()); + } + } + } + else { + evtsFromCallback.add(new T2<>(e.getKey(), e.getValue())); + + callbackCntr.incrementAndGet(); + } + } + } + } + + /** + * @param cacheMode Cache mode. + * @param backups Number of backups. + * @param atomicityMode Cache atomicity mode. + * @return Cache configuration. + */ + protected CacheConfiguration<Object, Object> cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + cacheMode + "-" + backups); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setWriteSynchronizationMode(PRIMARY_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } + + /** + * + */ + public static class QueryTestKey implements Serializable, Comparable { + /** */ + private final Integer key; + + /** + * @param key Key. + */ + public QueryTestKey(Integer key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestKey that = (QueryTestKey)o; + + return key.equals(that.key); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestKey.class, this); + } + + /** {@inheritDoc} */ + @Override public int compareTo(Object o) { + return key - ((QueryTestKey)o).key; + } + } + + /** + * + */ + public static class QueryTestValue implements Serializable { + /** */ + @GridToStringInclude + protected final Integer val1; + + /** */ + @GridToStringInclude + protected final String val2; + + /** + * @param val Value. + */ + public QueryTestValue(Integer val) { + this.val1 = val; + this.val2 = String.valueOf(val); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestValue that = (QueryTestValue) o; + + return val1.equals(that.val1) && val2.equals(that.val2); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = val1.hashCode(); + + res = 31 * res + val2.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestValue.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f1856bd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java index 8a7eb86..0827dfa 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java @@ -525,24 +525,10 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> events) throws CacheEntryListenerException { - Integer prevVal = null; - for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events) { - if (prevVal == null) - prevVal = e.getValue().val1; - queue.add((CacheEntryEvent<QueryTestKey, QueryTestValue>)e); cntr.incrementAndGet(); - - if (prevVal > e.getValue().val1) { - int z = 0; - - ++z; - } - else - prevVal = e.getValue().val1; - } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2f1856bd/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java index 0b43613..78cd62f 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite3.java @@ -21,10 +21,10 @@ import junit.framework.TestSuite; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFilterListenerTest; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryDeadlockTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryExecuteInPrimaryTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryAsyncFilterRandomOperationTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryExecuteInPrimaryTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationFromCallbackTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOrderingEventTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest; @@ -95,8 +95,8 @@ public class IgniteCacheQuerySelfTestSuite3 extends TestSuite { suite.addTestSuite(CacheContinuousQueryAsyncFilterListenerTest.class); suite.addTestSuite(CacheContinuousQueryFactoryFilterRandomOperationTest.class); suite.addTestSuite(CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.class); - suite.addTestSuite(CacheContinuousQueryDeadlockTest.class); suite.addTestSuite(CacheContinuousQueryOrderingEventTest.class); + suite.addTestSuite(CacheContinuousQueryOperationFromCallbackTest.class); suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class); suite.addTestSuite(CacheContinuousBatchAckTest.class); suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);
