Repository: ignite Updated Branches: refs/heads/ignite-2004 07d62cea1 -> 39fad6505
IGNITE-2004 Fixed review notes. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/39fad650 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/39fad650 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/39fad650 Branch: refs/heads/ignite-2004 Commit: 39fad650555907fc5c1f44d4d5bcd4682b8deab7 Parents: 07d62ce Author: nikolay_tikhonov <[email protected]> Authored: Wed Apr 13 13:57:12 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Wed Apr 13 13:57:12 2016 +0300 ---------------------------------------------------------------------- .../continuous/CacheContinuousQueryHandler.java | 39 ++++++-- .../thread/IgniteStripedThreadPoolExecutor.java | 4 + ...FailoverAtomicPrimaryWriteOrderSelfTest.java | 49 ++++++++++ ...sQueryAsyncFailoverTxReplicatedSelfTest.java | 37 ++++++++ ...eContinuousQueryAsyncFailoverTxSelfTest.java | 44 +++++++++ ...ContinuousQueryFailoverAbstractSelfTest.java | 63 +++++++++++-- ...ontinuousQueryOperationFromCallbackTest.java | 78 +++++++++++---- .../CacheContinuousQueryOrderingEventTest.java | 99 ++++++++++++++++---- .../IgniteCacheQuerySelfTestSuite4.java | 7 ++ 9 files changed, 370 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index ed39a17..dc6d20e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -579,18 +579,43 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler assert objs != null; assert ctx != null; - final Collection<CacheContinuousQueryEntry> ents = (Collection<CacheContinuousQueryEntry>)objs; + final Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs; - if (!ents.isEmpty()) { + if (!entries.isEmpty()) { if (asyncCallback) { - ctx.asyncCallbackPool().execute(new Runnable() { - @Override public void run() { - notifyCallback0(nodeId, ctx, ents); + if (entries.size() != 1) { + Map<Integer, Collection<CacheContinuousQueryEntry>> entriesByPart = new HashMap<>(); + + for (CacheContinuousQueryEntry e : entries) { + Collection<CacheContinuousQueryEntry> ents = entriesByPart.get(e.partition()); + + if (ents == null) { + ents = new ArrayList<>(entries.size()); + + entriesByPart.put(e.partition(), ents); + } + + ents.add(e); + } + + for (final Map.Entry<Integer, Collection<CacheContinuousQueryEntry>> e : entriesByPart.entrySet()) { + ctx.asyncCallbackPool().execute(new Runnable() { + @Override public void run() { + notifyCallback0(nodeId, ctx, e.getValue()); + } + }, e.getKey()); } - }); + } + else { + ctx.asyncCallbackPool().execute(new Runnable() { + @Override public void run() { + notifyCallback0(nodeId, ctx, entries); + } + }, entries.iterator().next().partition()); + } } else - notifyCallback0(nodeId, ctx, ents); + notifyCallback0(nodeId, ctx, entries); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java index 44ea823..0dc5588 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java @@ -21,6 +21,8 @@ import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -29,7 +31,9 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jsr166.ThreadLocalRandom8; /** * An {@link ExecutorService} that executes submitted tasks using pooled grid threads. http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java new file mode 100644 index 0000000..be7a115 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicWriteOrderMode; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; + +/** + * + */ +public class CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicWriteOrderMode writeOrderMode() { + return PRIMARY; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected boolean asyncCallback() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java new file mode 100644 index 0000000..4460498 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheMode; + +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * + */ +public class CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest extends CacheContinuousQueryFailoverTxSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected boolean asyncCallback() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java new file mode 100644 index 0000000..8f0bd0e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * + */ +public class CacheContinuousQueryAsyncFailoverTxSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected boolean asyncCallback() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 4226537..083367c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -81,6 +81,7 @@ import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.PAX; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.T3; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.plugin.extensions.communication.Message; @@ -99,7 +100,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; -import static org.apache.ignite.cache.CacheMemoryMode.*; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; @@ -167,6 +168,13 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC } /** + * @return Async callback flag. + */ + protected boolean asyncCallback() { + return false; + } + + /** * @return Near cache configuration. */ protected NearCacheConfiguration nearCacheConfiguration() { @@ -476,7 +484,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC for (int j = 0; j < 50; ++j) { ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); - final CacheEventListener3 lsnr = new CacheEventListener3(); + final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() + : new CacheEventListener3(); qry.setLocalListener(lsnr); @@ -560,7 +569,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); - final CacheEventListener3 lsnr = new CacheEventListener3(); + final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() : new CacheEventListener3(); qry.setLocalListener(lsnr); @@ -721,7 +730,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); - final CacheEventListener3 lsnr = new CacheEventListener3(); + final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() : new CacheEventListener3(); qry.setLocalListener(lsnr); @@ -841,7 +850,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC Affinity<Object> aff = qryClient.affinity(null); - CacheEventListener1 lsnr = new CacheEventListener1(false); + CacheEventListener1 lsnr = asyncCallback() ? new CacheEventAsyncListener1(false) + : new CacheEventListener1(false); ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); @@ -1545,7 +1555,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC qry.setLocalListener(lsnr); - qry.setRemoteFilter(new CacheEventFilter()); + qry.setRemoteFilter(asyncCallback() ? new CacheEventAsyncFilter() : new CacheEventFilter()); QueryCursor<?> cur = qryClnCache.query(qry); @@ -1639,7 +1649,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC newQry.setLocalListener(dinLsnr); - newQry.setRemoteFilter(new CacheEventFilter()); + newQry.setRemoteFilter(asyncCallback() ? new CacheEventAsyncFilter() : new CacheEventFilter()); dinQry = qryClnCache.query(newQry); @@ -1786,7 +1796,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC final IgniteCache<Object, Object> qryClnCache = qryCln.cache(null); - final CacheEventListener2 lsnr = new CacheEventListener2(); + final CacheEventListener2 lsnr = asyncCallback() ? new CacheEventAsyncListener2() : new CacheEventListener2(); ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); @@ -2144,6 +2154,19 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * */ + @IgniteAsyncCallback + private static class CacheEventAsyncListener1 extends CacheEventListener1 { + /** + * @param saveAll Save all events flag. + */ + CacheEventAsyncListener1(boolean saveAll) { + super(saveAll); + } + } + + /** + * + */ private static class CacheEventListener1 implements CacheEntryUpdatedListener<Object, Object> { /** */ private volatile CountDownLatch latch; @@ -2208,6 +2231,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * */ + @IgniteAsyncCallback + private static class CacheEventAsyncListener2 extends CacheEventListener2 { + // No-op. + } + + /** + * + */ private static class CacheEventListener2 implements CacheEntryUpdatedListener<Object, Object> { /** */ @LoggerResource @@ -2275,6 +2306,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * */ + @IgniteAsyncCallback + public static class CacheEventAsyncListener3 extends CacheEventListener3 { + // No-op. + } + + /** + * + */ public static class CacheEventListener3 implements CacheEntryUpdatedListener<Object, Object>, CacheEntryEventSerializableFilter<Object, Object> { /** Keys. */ @@ -2303,6 +2342,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * */ + @IgniteAsyncCallback + private static class CacheEventAsyncFilter extends CacheEventFilter { + // No-op. + } + + /** + * + */ public static class CacheEventFilter implements CacheEntryEventSerializableFilter<Object, Object> { /** {@inheritDoc} */ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException { http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java index d301036..2ead28a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java @@ -39,6 +39,7 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.configuration.CacheConfiguration; @@ -64,6 +65,7 @@ import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; /** @@ -136,7 +138,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testAtomicTwoBackups() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, FULL_SYNC); doTest(ccfg, true); } @@ -145,7 +147,16 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testAtomicReplicatedFilter() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, FULL_SYNC); + + doTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedFilterPrimary() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, PRIMARY_SYNC); doTest(ccfg, false); } @@ -154,7 +165,16 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testAtomicTwoBackupsFilter() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, FULL_SYNC); + + doTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicTwoBackupsFilterPrimary() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, PRIMARY_SYNC); doTest(ccfg, false); } @@ -163,7 +183,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testAtomicWithoutBackupsFilter() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC, FULL_SYNC); doTest(ccfg, false); } @@ -172,7 +192,16 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testTxTwoBackupsFilter() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, FULL_SYNC); + + doTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxTwoBackupsFilterPrimary() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, PRIMARY_SYNC); doTest(ccfg, false); } @@ -181,7 +210,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testTxReplicatedFilter() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, TRANSACTIONAL); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, TRANSACTIONAL, FULL_SYNC); doTest(ccfg, false); } @@ -190,7 +219,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testTxTwoBackup() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, FULL_SYNC); doTest(ccfg, true); } @@ -199,7 +228,16 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @throws Exception If failed. */ public void testTxReplicated() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL); + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, FULL_SYNC); + + doTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxReplicatedPrimary() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, PRIMARY_SYNC); doTest(ccfg, true); } @@ -223,7 +261,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs final AtomicInteger cbCntr = new AtomicInteger(0); - final int threadCnt = 10; + final int threadCnt = IgniteConfiguration.DFLT_SYSTEM_CORE_THREAD_CNT * 2; for (int idx = 0; idx < NODES; idx++) { Set<T2<QueryTestKey, QueryTestValue>> evts = Collections. @@ -306,12 +344,14 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs if (fromLsnr) { final int expCnt = qryCntr.get() * NODES * KEYS_FROM_CALLBACK; + boolean condition = GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return cbCntr.get() >= expCnt; + } + }, TimeUnit.SECONDS.toMillis(60)); + assertTrue("Failed to wait events [exp=" + expCnt + ", act=" + cbCntr.get() + "]", - GridTestUtils.waitForCondition(new PA() { - @Override public boolean apply() { - return cbCntr.get() >= expCnt; - } - }, TimeUnit.SECONDS.toMillis(60))); + condition); assertEquals(expCnt, cbCntr.get()); @@ -326,7 +366,7 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs @Override public boolean apply() { return filterCbCntr.get() >= expInvkCnt; } - }, TimeUnit.SECONDS.toMillis(20)); + }, TimeUnit.SECONDS.toMillis(60)); assertEquals(expInvkCnt, filterCbCntr.get()); @@ -511,18 +551,20 @@ public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbs * @param cacheMode Cache mode. * @param backups Number of backups. * @param atomicityMode Cache atomicity mode. + * @param writeMode Write sync mode. * @return Cache configuration. */ protected CacheConfiguration<Object, Object> cacheConfiguration( CacheMode cacheMode, int backups, - CacheAtomicityMode atomicityMode) { + CacheAtomicityMode atomicityMode, + CacheWriteSynchronizationMode writeMode) { CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); - ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + cacheMode + "-" + backups); + ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + writeMode + "-" + backups); ccfg.setAtomicityMode(atomicityMode); ccfg.setCacheMode(cacheMode); - ccfg.setWriteSynchronizationMode(PRIMARY_SYNC); + ccfg.setWriteSynchronizationMode(writeMode); ccfg.setAtomicWriteOrderMode(PRIMARY); if (cacheMode == PARTITIONED) http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java index 7efca53..c7d0b6b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java @@ -40,6 +40,7 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.configuration.CacheConfiguration; @@ -66,6 +67,7 @@ import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES; import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; /** @@ -139,7 +141,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes */ public void testAtomicOnheapTwoBackup() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, - ONHEAP_TIERED); + ONHEAP_TIERED, PRIMARY_SYNC); doOrderingTest(ccfg, false); } @@ -149,7 +151,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes */ public void testAtomicOffheapTwoBackup() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, - OFFHEAP_TIERED); + OFFHEAP_TIERED, PRIMARY_SYNC); doOrderingTest(ccfg, false); } @@ -159,7 +161,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes */ public void testAtomicOffheapValuesTwoBackup() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, - OFFHEAP_VALUES); + OFFHEAP_VALUES, PRIMARY_SYNC); doOrderingTest(ccfg, false); } @@ -169,7 +171,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes */ public void testAtomicReplicatedOffheap() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, - OFFHEAP_TIERED); + OFFHEAP_TIERED, PRIMARY_SYNC); doOrderingTest(ccfg, false); } @@ -179,7 +181,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes */ public void testTxOnheapTwoBackup() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, - ONHEAP_TIERED); + ONHEAP_TIERED, FULL_SYNC); doOrderingTest(ccfg, false); } @@ -189,7 +191,17 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes */ public void testTxOnheapWithoutBackup() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, - ONHEAP_TIERED); + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapWithoutBackupFullSync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, + ONHEAP_TIERED, FULL_SYNC); doOrderingTest(ccfg, false); } @@ -201,7 +213,17 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes */ public void testAtomicOnheapTwoBackupAsync() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, - ONHEAP_TIERED); + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOnheapTwoBackupAsyncFullSync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + ONHEAP_TIERED, FULL_SYNC); doOrderingTest(ccfg, true); } @@ -211,7 +233,17 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes */ public void testAtomicOffheapTwoBackupAsync() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, - OFFHEAP_TIERED); + OFFHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTwoBackupAsyncFullSync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + OFFHEAP_TIERED, FULL_SYNC); doOrderingTest(ccfg, true); } @@ -221,7 +253,17 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes */ public void testAtomicOffheapValuesTwoBackupAsync() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, - OFFHEAP_VALUES); + OFFHEAP_VALUES, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapValuesTwoBackupAsyncFullSync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + OFFHEAP_VALUES, FULL_SYNC); doOrderingTest(ccfg, true); } @@ -231,7 +273,17 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes */ public void testAtomicReplicatedAsync() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, - ONHEAP_TIERED); + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedAsyncFullSync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, + ONHEAP_TIERED, FULL_SYNC); doOrderingTest(ccfg, true); } @@ -241,7 +293,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes */ public void testAtomicReplicatedOffheapAsync() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, - OFFHEAP_TIERED); + OFFHEAP_TIERED, PRIMARY_SYNC); doOrderingTest(ccfg, true); } @@ -251,7 +303,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes */ public void testAtomicOnheapWithoutBackupAsync() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC, - ONHEAP_TIERED); + ONHEAP_TIERED, PRIMARY_SYNC); doOrderingTest(ccfg, true); } @@ -261,7 +313,7 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes */ public void testTxOnheapTwoBackupAsync() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, - ONHEAP_TIERED); + ONHEAP_TIERED, PRIMARY_SYNC); doOrderingTest(ccfg, true); } @@ -271,7 +323,17 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes */ public void testTxOnheapAsync() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, - ONHEAP_TIERED); + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapAsyncFullSync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, + ONHEAP_TIERED, FULL_SYNC); doOrderingTest(ccfg, true); } @@ -540,20 +602,23 @@ public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTes * @param backups Number of backups. * @param atomicityMode Cache atomicity mode. * @param memoryMode Cache memory mode. + * @param writeMode Cache write mode. * @return Cache configuration. */ protected CacheConfiguration<Object, Object> cacheConfiguration( CacheMode cacheMode, int backups, CacheAtomicityMode atomicityMode, - CacheMemoryMode memoryMode) { + CacheMemoryMode memoryMode, + CacheWriteSynchronizationMode writeMode) { CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); - ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + backups); + ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + memoryMode + "-" + + backups); ccfg.setAtomicityMode(atomicityMode); ccfg.setCacheMode(cacheMode); ccfg.setMemoryMode(memoryMode); - ccfg.setWriteSynchronizationMode(PRIMARY_SYNC); + ccfg.setWriteSynchronizationMode(writeMode); ccfg.setAtomicWriteOrderMode(PRIMARY); if (cacheMode == PARTITIONED) http://git-wip-us.apache.org/repos/asf/ignite/blob/39fad650/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java index fa4e642..c4fcdac 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite4.java @@ -18,6 +18,9 @@ package org.apache.ignite.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryAsyncFailoverTxSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest; @@ -44,6 +47,10 @@ public class IgniteCacheQuerySelfTestSuite4 extends TestSuite { suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class); suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class); + suite.addTestSuite(CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.class); + suite.addTestSuite(CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.class); + suite.addTestSuite(CacheContinuousQueryAsyncFailoverTxSelfTest.class); + return suite; } }
