http://git-wip-us.apache.org/repos/asf/ignite/blob/7c150fee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java index 62ed66f..cdf4ffd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java @@ -28,11 +28,14 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import javax.cache.Cache; import javax.cache.configuration.Factory; import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.integration.CacheLoaderException; import javax.cache.integration.CacheWriterException; @@ -40,7 +43,9 @@ import javax.cache.processor.EntryProcessor; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; import 
org.apache.ignite.cache.affinity.Affinity; @@ -55,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; @@ -132,6 +138,51 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract /** * @throws Exception If failed. */ + public void testFilterAndFactoryProvided() throws Exception { /* Sets both a remote filter factory and a remote filter on one continuous query, then asserts that starting the query throws. */ + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + ONHEAP_TIERED, + false); + + final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg); + + try { + final ContinuousQuery qry = new ContinuousQuery(); + + qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter>() { + @Override public CacheEntryEventFilter create() { + return null; + } + }); + + qry.setRemoteFilter(new CacheEntryEventSerializableFilter() { + @Override public boolean evaluate(CacheEntryEvent event) throws CacheEntryListenerException { + return false; + } + }); + + qry.setLocalListener(new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { + // No-op. + } + }); + + GridTestUtils.assertThrows(log, new Callable<Object>() { /* cache.query(qry) must throw IgniteException; the null last argument presumably means the message text is not checked - confirm in GridTestUtils. */ + @Override public Object call() throws Exception { + return cache.query(qry); + } + }, IgniteException.class, null); + + } + finally { + cache.destroy(); /* Destroy the cache whether or not the assertion passed. */ + } + } + + /** + * @throws Exception If failed. 
+ */ public void testAtomicClient() throws Exception { CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 1, @@ -576,7 +627,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract * @param deploy The place where continuous query will be started. * @throws Exception If failed. */ - private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy) + protected void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy) throws Exception { ignite(0).createCache(ccfg); @@ -1124,7 +1175,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract * @param store If {@code true} configures dummy cache store. * @return Cache configuration. */ - private CacheConfiguration<Object, Object> cacheConfiguration( + protected CacheConfiguration<Object, Object> cacheConfiguration( CacheMode cacheMode, int backups, CacheAtomicityMode atomicityMode, @@ -1176,7 +1227,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract /** * */ - static class QueryTestKey implements Serializable, Comparable { + public static class QueryTestKey implements Serializable, Comparable { /** */ private final Integer key; @@ -1219,12 +1270,12 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract /** * */ - static class QueryTestValue implements Serializable { + public static class QueryTestValue implements Serializable { /** */ - private final Integer val1; + protected final Integer val1; /** */ - private final String val2; + protected final String val2; /** * @param val Value.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7c150fee/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java ---------------------------------------------------------------------- diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java new file mode 100644 index 0000000..359dd58 --- /dev/null +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.p2p; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryListenerException; + +/** + * Event filter for deployment: {@code evaluate} accepts an event when its value is {@code null} or odd, and rejects even values. + */ +public class CacheDeploymentEntryEventFilter implements CacheEntryEventFilter<Integer, Integer> { + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? 
extends Integer> evt) + throws CacheEntryListenerException { + return evt.getValue() == null || evt.getValue() % 2 != 0; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7c150fee/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java ---------------------------------------------------------------------- diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java new file mode 100644 index 0000000..0d6eceb --- /dev/null +++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentEntryEventFilterFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.tests.p2p; + +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEventFilter; + +/** + * Event filter factory for deployment. 
+ */ +public class CacheDeploymentEntryEventFilterFactory implements Factory<CacheEntryEventFilter<Integer, Integer>> { + /** {@inheritDoc} Returns a new {@link CacheDeploymentEntryEventFilter} on every call. */ + @Override public CacheEntryEventFilter<Integer, Integer> create() { + return new CacheDeploymentEntryEventFilter(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7c150fee/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 3cd4579..5a54415 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -70,12 +70,20 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest; import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedAtomicTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedTxTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterReplicatedAtomicTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterReplicatedTxTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest; +import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxOffheapTieredTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryOperationP2PTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicOffheapTieredTest; @@ -209,6 +217,10 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.class); suite.addTestSuite(CacheContinuousQueryFailoverTxOffheapTieredTest.class); suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class); + suite.addTestSuite(CacheContinuousQueryFactoryFilterTest.class); + suite.addTestSuite(CacheContinuousQueryOperationP2PTest.class); + suite.addTestSuite(CacheContinuousBatchAckTest.class); + suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class); // Reduce fields queries. suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);
