Repository: ignite Updated Branches: refs/heads/ignite-2384 df56be914 -> 9e39e500c
IGNITE-2384 Fixed review notes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9e39e500 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9e39e500 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9e39e500 Branch: refs/heads/ignite-2384 Commit: 9e39e500c5cbdbdb046abade8c838057589b8e66 Parents: df56be9 Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Mon Jan 18 16:27:29 2016 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Mon Jan 18 16:27:29 2016 +0300 ---------------------------------------------------------------------- ...ContinuousQueryFailoverAbstractSelfTest.java | 80 ++++++++++++++++++++ .../CacheContinuousQueryLostPartitionTest.java | 77 ++++++++++++++++--- ...CacheContinuousQueryLostPartitionTxTest.java | 36 --------- .../IgniteBinaryCacheQueryTestSuite.java | 2 - 4 files changed, 145 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9e39e500/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 283da80..5de3d0f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -41,6 +41,9 @@ import javax.cache.CacheException; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.expiry.TouchedExpiryPolicy; import javax.cache.processor.EntryProcessorException; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; @@ -90,11 +93,13 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** * @@ -1278,6 +1283,81 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * @throws Exception If failed. */ + public void testBackupQueueEvict() throws Exception { + startGridsMultiThreaded(2); + + client = true; + + Ignite qryClient = startGrid(2); + + CacheEventListener1 lsnr = new CacheEventListener1(false); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + QueryCursor<?> cur = qryClient.cache(null).query(qry); + + final Collection<Object> backupQueue = backupQueue(ignite(0)); + + assertEquals(0, backupQueue.size()); + + long ttl = 100; + + final ExpiryPolicy expiry = new TouchedExpiryPolicy(new Duration(MILLISECONDS, ttl)); + + final IgniteCache<Object, Object> cache0 = ignite(2).cache(null).withExpiryPolicy(expiry); + + final List<Integer> keys = primaryKeys(ignite(1).cache(null), BACKUP_ACK_THRESHOLD); + + CountDownLatch latch = new CountDownLatch(keys.size()); + + lsnr.latch = latch; + + for (Integer key : keys) { + log.info("Put: " + key); + + cache0.put(key, key); + } + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return backupQueue.isEmpty(); + } + }, 2000); + + assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD); + + boolean wait = waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return cache0.localPeek(keys.get(0)) == null; + } + }, ttl + 1000); + + assertTrue("Entry evicted.", wait); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return backupQueue.isEmpty(); + } + }, 2000); + + assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD); + + if (backupQueue.size() != 0) { + for (Object o : backupQueue) { + CacheContinuousQueryEntry e = (CacheContinuousQueryEntry)o; + + assertNotSame("Evicted entry added to backup queue.", -1L, e.updateCounter()); + } + } + + cur.close(); + } + + /** + * @throws Exception If failed. + */ public void testBackupQueueCleanupServerQuery() throws Exception { Ignite qryClient = startGridsMultiThreaded(2); http://git-wip-us.apache.org/repos/asf/ignite/blob/9e39e500/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java index 632a7a3..30613a4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java @@ -37,6 +37,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static javax.cache.configuration.FactoryBuilder.factoryOf; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; @@ -51,6 +52,9 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes /** Cache name. */ public static final String CACHE_NAME = "test_cache"; + /** Cache name. */ + public static final String TX_CACHE_NAME = "tx_test_cache"; + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -66,12 +70,41 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes /** * @throws Exception If failed. */ - public void testEvent() throws Exception { - IgniteCache<Integer, String> cache1 = grid(0).getOrCreateCache(CACHE_NAME); + public void testTxEvent() throws Exception { + testEvent(TX_CACHE_NAME, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicEvent() throws Exception { + testEvent(CACHE_NAME, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxClientEvent() throws Exception { + testEvent(TX_CACHE_NAME, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicClientEvent() throws Exception { + testEvent(CACHE_NAME, true); + } + + /** + * @param cacheName Cache name. + * @throws Exception If failed. + */ + public void testEvent(String cacheName, boolean client) throws Exception { + IgniteCache<Integer, String> cache1 = grid(0).getOrCreateCache(cacheName); final AllEventListener<Integer, String> lsnr1 = registerCacheListener(cache1); - IgniteCache<Integer, String> cache2 = grid(1).getOrCreateCache(CACHE_NAME); + IgniteCache<Integer, String> cache2 = grid(1).getOrCreateCache(cacheName); Integer key = primaryKey(cache1); @@ -79,7 +112,17 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes // Note the issue is only reproducible if the second registration is done right // here, after the first put() above. - final AllEventListener<Integer, String> lsnr2 = registerCacheListener(cache2); + AllEventListener<Integer, String> lsnr20; + + if (client) { + IgniteCache<Integer, String> clnCache = startGrid(3).getOrCreateCache(cacheName); + + lsnr20 = registerCacheListener(clnCache); + } + else + lsnr20 = registerCacheListener(cache2); + + final AllEventListener<Integer, String> lsnr2 = lsnr20; assert GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { @@ -127,36 +170,46 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes * @param cache Cache. * @return Event listener. */ - private AllEventListener<Integer, String> registerCacheListener( - IgniteCache<Integer, String> cache) { + private AllEventListener<Integer, String> registerCacheListener(IgniteCache<Integer, String> cache) { AllEventListener<Integer, String> lsnr = new AllEventListener<>(); + cache.registerCacheEntryListener( new MutableCacheEntryListenerConfiguration<>(factoryOf(lsnr), null, true, false)); + return lsnr; } /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration() throws Exception { - IgniteConfiguration cfg = super.getConfiguration(); + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(name); TcpDiscoverySpi spi = new TcpDiscoverySpi(); spi.setIpFinder(ipFinder); cfg.setDiscoverySpi(spi); - cfg.setCacheConfiguration(cache()); + cfg.setCacheConfiguration(cache(TX_CACHE_NAME), cache(CACHE_NAME)); + + if (name.endsWith("3")) + cfg.setClientMode(true); return cfg; } /** + * @param cacheName Cache name. * @return Cache configuration. */ - protected CacheConfiguration<Integer, String> cache() { - CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>(CACHE_NAME); + protected CacheConfiguration<Integer, String> cache(String cacheName) { + CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>(cacheName); cfg.setCacheMode(PARTITIONED); - cfg.setAtomicityMode(ATOMIC); + + if (cacheName.equals(CACHE_NAME)) + cfg.setAtomicityMode(ATOMIC); + else + cfg.setAtomicityMode(TRANSACTIONAL); + cfg.setRebalanceMode(SYNC); cfg.setWriteSynchronizationMode(PRIMARY_SYNC); cfg.setBackups(0); http://git-wip-us.apache.org/repos/asf/ignite/blob/9e39e500/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTxTest.java deleted file mode 100644 index bd72dc2..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTxTest.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import org.apache.ignite.configuration.CacheConfiguration; - -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; - -/** - * - */ -public class CacheContinuousQueryLostPartitionTxTest extends CacheContinuousQueryLostPartitionTest { - /** {@inheritDoc} */ - @Override protected CacheConfiguration<Integer, String> cache() { - CacheConfiguration<Integer, String> ccfg = super.cache(); - - ccfg.setAtomicityMode(TRANSACTIONAL); - - return ccfg; - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/9e39e500/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java index d101493..3bab1f9 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java @@ -76,7 +76,6 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartitionTest; -import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartitionTxTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest; @@ -203,7 +202,6 @@ public class IgniteBinaryCacheQueryTestSuite extends TestSuite { suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class); suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class); suite.addTestSuite(CacheContinuousQueryLostPartitionTest.class); - suite.addTestSuite(CacheContinuousQueryLostPartitionTxTest.class); // Reduce fields queries. suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);