Repository: ignite Updated Branches: refs/heads/ignite-2004 3f2c2e65d -> 94e301d8c
http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java index 8fe088a..ec6ed4a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java @@ -266,7 +266,7 @@ public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheC Map<Integer, Long> partCntr, IgniteCache<Object, Object> cache) throws Exception { - Object key = new QueryTestKey(rnd.nextInt(KEYS)); + Object key = rnd.nextInt(KEYS); Object newVal = value(rnd); Object oldVal = expData.get(key); @@ -280,7 +280,7 @@ public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheC tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd)); try { - // log.info("Random operation [key=" + key + ", op=" + op + ']'); + log.info("Random operation [key=" + key + ", op=" + op + ']'); switch (op) { case 0: { @@ -491,7 +491,8 @@ public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheC default: fail("Op:" + op); } - } finally { + } + finally { if (tx != null) tx.close(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java new file mode 100644 index 0000000..c42533e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.lang.IgniteAsyncCallback; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTest { + /** */ + public static final int LISTENER_CNT = 20; + + /** */ + public static final int KEYS = 10; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 5; + + /** */ + public static final int ITERATION_CNT = 100; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi(); + storeSpi.setExpireCount(1000); + + cfg.setEventStorageSpi(storeSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES - 1); + + client = true; + + startGrid(NODES - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOnheapTwoBackup() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.ATOMIC, + CacheMemoryMode.ONHEAP_TIERED); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTwoBackup() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.ATOMIC, + CacheMemoryMode.OFFHEAP_TIERED); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapValuesTwoBackup() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.ATOMIC, + CacheMemoryMode.OFFHEAP_VALUES); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicared() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC, + CacheMemoryMode.ONHEAP_TIERED); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicaredOffheap() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, CacheAtomicityMode.ATOMIC, + CacheMemoryMode.OFFHEAP_TIERED); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOnheapWithoutBackup() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, CacheAtomicityMode.ATOMIC, + CacheMemoryMode.ONHEAP_TIERED); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapTwoBackup() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, CacheAtomicityMode.TRANSACTIONAL, + CacheMemoryMode.ONHEAP_TIERED); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheap() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, CacheAtomicityMode.TRANSACTIONAL, + CacheMemoryMode.ONHEAP_TIERED); + + doOrderingTest(ccfg, false); + } + + /** + * @param ccfg Cache configuration. + * @param async Async filter. + * @throws Exception If failed. + */ + protected void doOrderingTest( + final CacheConfiguration ccfg, + final boolean async) + throws Exception { + ignite(0).createCache(ccfg); + + List<QueryCursor<?>> qries = new ArrayList<>(); + + try { + List<BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>>> rcvdEvts = + new ArrayList<>(LISTENER_CNT); + + final AtomicInteger qryCntr = new AtomicInteger(0); + + final int threadCnt = LISTENER_CNT; + + for (int i = 0; i < LISTENER_CNT; i++) { + BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue = + new ArrayBlockingQueue<>(ITERATION_CNT * threadCnt); + + ContinuousQuery qry = new ContinuousQuery(); + + qry.setLocalListener(new TestCacheEventListener(queue, qryCntr)); + + rcvdEvts.add(queue); + + IgniteCache<Object, Object> cache = + grid(ThreadLocalRandom.current().nextInt(NODES)).cache(ccfg.getName()); + + QueryCursor qryCursor = cache.query(qry); + + qries.add(qryCursor); + } + + IgniteInternalFuture<Long> f = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < ITERATION_CNT; i++) { + IgniteCache<QueryTestKey, QueryTestValue> cache = + grid(rnd.nextInt(NODES)).cache(ccfg.getName()); + + QueryTestKey key = new QueryTestKey(rnd.nextInt(KEYS)); + + boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == + CacheAtomicityMode.TRANSACTIONAL && rnd.nextBoolean(); + + Transaction tx = null; + + if (startTx) + tx = cache.unwrap(Ignite.class).transactions().txStart(); + + try { + if ((cache.get(key) == null) || rnd.nextBoolean()) { + cache.invoke(key, new CacheEntryProcessor<QueryTestKey, QueryTestValue, Object>() { + @Override public Object process( + MutableEntry<QueryTestKey, QueryTestValue> entry, + Object... arguments) + throws EntryProcessorException { + if (entry.exists()) + entry.setValue(new QueryTestValue(entry.getValue().val1 + 1)); + else + entry.setValue(new QueryTestValue(0)); + + return null; + } + }); + } + else { + QueryTestValue val; + QueryTestValue newVal; + + do { + val = cache.get(key); + + newVal = val == null ? + new QueryTestValue(0) : new QueryTestValue(val.val1 + 1); + } + while (!cache.replace(key, val, newVal)); + } + } + finally { + if (tx != null) + tx.commit(); + } + } + } + }, threadCnt, "put-thread"); + + f.get(15, TimeUnit.SECONDS); + + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return qryCntr.get() >= ITERATION_CNT * threadCnt * LISTENER_CNT; + } + }, 1000L); + + for (BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue : rcvdEvts) + checkEvents(queue, ITERATION_CNT * threadCnt); + } + finally { + for (QueryCursor<?> qry : qries) + qry.close(); + + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param queue Event queue. + * @throws Exception If failed. + */ + private void checkEvents(BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue, int expCnt) + throws Exception { + CacheEntryEvent<QueryTestKey, QueryTestValue> evt; + int cnt = 0; + Map<QueryTestKey, Integer> vals = new HashMap<>(); + + while ((evt = queue.poll(100, TimeUnit.MILLISECONDS)) != null) { + assertNotNull(evt); + assertNotNull(evt.getKey()); + + Integer preVal = vals.get(evt.getKey()); + + if (preVal == null) + assertEquals(new QueryTestValue(0), evt.getValue()); + else { + if (!new QueryTestValue(preVal + 1).equals(evt.getValue())) + assertEquals(new QueryTestValue(preVal + 1), evt.getValue()); + } + + vals.put(evt.getKey(), evt.getValue().val1); + + ++cnt; + } + + assertEquals(expCnt, cnt); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TimeUnit.MINUTES.toMillis(8); + } + + /** + * + */ + @IgniteAsyncCallback + private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter { + /** + * @param clsr Closure. + */ + public CacheTestRemoteFilterAsync( + IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr) { + super(clsr); + } + } + + /** + * + */ + private static class CacheTestRemoteFilter implements + CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr; + + /** + * @param clsr Closure. + */ + public CacheTestRemoteFilter(IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> clsr) { + this.clsr = clsr; + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e) + throws CacheEntryListenerException { + clsr.apply(ignite, e); + + return true; + } + } + + /** + * + */ + @IgniteAsyncCallback + private static class TestCacheEventListenerAsync extends TestCacheEventListener { + /** + * @param queue Queue. + * @param cntr Received events counter. + */ + public TestCacheEventListenerAsync(BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue, + AtomicInteger cntr) { + super(queue, cntr); + } + } + + /** + * + */ + private static class TestCacheEventListener implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue> { + /** */ + private final BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue; + + /** */ + private final AtomicInteger cntr; + + /** + * @param queue Queue. + * @param cntr Received events counter. + */ + public TestCacheEventListener(BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue, + AtomicInteger cntr) { + this.queue = queue; + this.cntr = cntr; + } + + /** {@inheritDoc} */ + @Override public synchronized void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> events) + throws CacheEntryListenerException { + for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events) { + queue.add((CacheEntryEvent<QueryTestKey, QueryTestValue>)e); + + cntr.incrementAndGet(); + } + } + } + + /** + * @param cacheMode Cache mode. + * @param backups Number of backups. + * @param atomicityMode Cache atomicity mode. + * @param memoryMode Cache memory mode. + * @return Cache configuration. + */ + protected CacheConfiguration<Object, Object> cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + backups); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setMemoryMode(memoryMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } + + /** + * + */ + public static class QueryTestKey implements Serializable, Comparable { + /** */ + private final Integer key; + + /** + * @param key Key. + */ + public QueryTestKey(Integer key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestKey that = (QueryTestKey)o; + + return key.equals(that.key); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestKey.class, this); + } + + /** {@inheritDoc} */ + @Override public int compareTo(Object o) { + return key - ((QueryTestKey)o).key; + } + } + + /** + * + */ + public static class QueryTestValue implements Serializable { + /** */ + @GridToStringInclude + protected final Integer val1; + + /** */ + @GridToStringInclude + protected final String val2; + + /** + * @param val Value. + */ + public QueryTestValue(Integer val) { + this.val1 = val; + this.val2 = String.valueOf(val); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestValue that = (QueryTestValue) o; + + return val1.equals(that.val1) && val2.equals(that.val2); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = val1.hashCode(); + + res = 31 * res + val2.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestValue.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index dbe282e..fc101d4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.cache.Cache; import javax.cache.configuration.Factory; import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.integration.CacheWriterException; import org.apache.ignite.Ignite; @@ -311,6 +312,13 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } }); + qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer, Integer>() { + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> event) + throws CacheEntryListenerException { + return true; + } + }); + try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) { cache.put(1, 1); cache.put(2, 2); http://git-wip-us.apache.org/repos/asf/ignite/blob/94e301d8/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java index fa4e642..ab48ae2 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java @@ -46,4 +46,4 @@ public class IgniteCacheQuerySelfTestSuite4 extends TestSuite { return suite; } -} +} \ No newline at end of file
