http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java new file mode 100644 index 0000000..dbaafe1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterRandomOperationTest.java @@ -0,0 +1,725 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import javax.cache.configuration.CacheEntryListenerConfiguration; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryCreatedListener; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryExpiredListener; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryRemovedListener; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.CacheQueryEntryEvent; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.jetbrains.annotations.NotNull; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static 
java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterRandomOperationTest.NonSerializableFilter.isAccepted; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT; +import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * + */ +public class CacheContinuousQueryFactoryFilterRandomOperationTest extends CacheContinuousQueryRandomOperationsTest { + /** */ + private static final int NODES = 5; + + /** */ + private static final int KEYS = 50; + + /** */ + private static final int VALS = 10; + + /** */ + public static final int ITERATION_CNT = 40; + + /** + * @throws Exception If failed. 
+ */ + public void testInternalQuery() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, + 1, + ATOMIC, + ONHEAP_TIERED, + false); + + final IgniteCache<Object, Object> cache = grid(0).createCache(ccfg); + + UUID uuid = null; + + try { + for (int i = 0; i < 10; i++) + cache.put(i, i); + + final CountDownLatch latch = new CountDownLatch(5); + + CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() { + @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { + for (Object evt : iterable) { + latch.countDown(); + + log.info("Received event: " + evt); + } + } + }; + + uuid = grid(0).context().cache().cache(cache.getName()).context().continuousQueries() + .executeInternalQuery(lsnr, new SerializableFilter(), false, true, true); + + for (int i = 10; i < 20; i++) + cache.put(i, i); + + assertTrue(latch.await(3, SECONDS)); + } + finally { + if (uuid != null) + grid(0).context().cache().cache(cache.getName()).context().continuousQueries() + .cancelInternalQuery(uuid); + + grid(0).destroyCache(ccfg.getName()); + } + } + + /** {@inheritDoc} */ + @Override protected void doTestContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy) + throws Exception { + ignite(0).createCache(ccfg); + + try { + long seed = System.currentTimeMillis(); + + Random rnd = new Random(seed); + + log.info("Random seed: " + seed); + + List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>(); + + Collection<QueryCursor<?>> curs = new ArrayList<>(); + + Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs = new ArrayList<>(); + + if (deploy == CLIENT) + evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean())); + else if (deploy == SERVER) + evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs, + rnd.nextBoolean())); + else { + boolean isSync = rnd.nextBoolean(); + + for (int i = 0; i < 
NODES - 1; i++) + evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync)); + } + + ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>(); + + Map<Integer, Long> partCntr = new ConcurrentHashMap<>(); + + try { + for (int i = 0; i < ITERATION_CNT; i++) { + if (i % 10 == 0) + log.info("Iteration: " + i); + + for (int idx = 0; idx < NODES; idx++) + randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName())); + } + } + finally { + for (QueryCursor<?> cur : curs) + cur.close(); + + for (T2<Integer, MutableCacheEntryListenerConfiguration> e : lsnrCfgs) + grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2()); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param cacheName Cache name. + * @param nodeIdx Node index. + * @param curs Cursors. + * @param lsnrCfgs Listener configurations. + * @return Event queue + */ + private BlockingQueue<CacheEntryEvent<?, ?>> registerListener(String cacheName, + int nodeIdx, + Collection<QueryCursor<?>> curs, + Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs, + boolean sync) { + final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); + + if (ThreadLocalRandom.current().nextBoolean()) { + MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg = + new MutableCacheEntryListenerConfiguration<>( + FactoryBuilder.factoryOf(new LocalNonSerialiseListener() { + @Override protected void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? 
extends QueryTestValue>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) + evtsQueue.add(evt); + } + }), + createFilterFactory(), + true, + sync + ); + + grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg); + + lsnrCfgs.add(new T2<Integer, MutableCacheEntryListenerConfiguration>(nodeIdx, lsnrCfg)); + } + else { + ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> evts) throws CacheEntryListenerException { + for (CacheEntryEvent<?, ?> evt : evts) + evtsQueue.add(evt); + } + }); + + qry.setRemoteFilterFactory(createFilterFactory()); + + QueryCursor<?> cur = grid(nodeIdx).cache(cacheName).query(qry); + + curs.add(cur); + } + + return evtsQueue; + } + + /** + * @return Filter factory. + */ + @NotNull protected Factory<? extends CacheEntryEventFilter<QueryTestKey, QueryTestValue>> createFilterFactory() { + return new FilterFactory(); + } + + /** + * @param rnd Random generator. + * @param evtsQueues Events queue. + * @param expData Expected cache data. + * @param partCntr Partition counter. + * @param cache Cache. + * @throws Exception If failed. 
+ */ + private void randomUpdate( + Random rnd, + List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, + ConcurrentMap<Object, Object> expData, + Map<Integer, Long> partCntr, + IgniteCache<Object, Object> cache) + throws Exception { + Object key = new QueryTestKey(rnd.nextInt(KEYS)); + Object newVal = value(rnd); + Object oldVal = expData.get(key); + + int op = rnd.nextInt(11); + + Ignite ignite = cache.unwrap(Ignite.class); + + Transaction tx = null; + + if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean()) + tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd)); + + try { + // log.info("Random operation [key=" + key + ", op=" + op + ']'); + + switch (op) { + case 0: { + cache.put(key, newVal); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + + break; + } + + case 1: { + cache.getAndPut(key, newVal); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + + break; + } + + case 2: { + cache.remove(key); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); + + expData.remove(key); + + break; + } + + case 3: { + cache.getAndRemove(key); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); + + expData.remove(key); + + break; + } + + case 4: { + cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean())); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, 
newVal, oldVal); + + expData.put(key, newVal); + + break; + } + + case 5: { + cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean())); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); + + expData.remove(key); + + break; + } + + case 6: { + cache.putIfAbsent(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal == null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueues); + + break; + } + + case 7: { + cache.getAndPutIfAbsent(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal == null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueues); + + break; + } + + case 8: { + cache.replace(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal != null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueues); + + break; + } + + case 9: { + cache.getAndReplace(key, newVal); + + if (tx != null) + tx.commit(); + + if (oldVal != null) { + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueues); + + break; + } + + case 10: { + if (oldVal != null) { + Object replaceVal = value(rnd); + + boolean success = replaceVal.equals(oldVal); + + if (success) { + cache.replace(key, replaceVal, newVal); + + if (tx != null) + tx.commit(); + + updatePartitionCounter(cache, key, partCntr); + + waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, 
oldVal); + + expData.put(key, newVal); + } + else { + cache.replace(key, replaceVal, newVal); + + if (tx != null) + tx.commit(); + + checkNoEvent(evtsQueues); + } + } + else { + cache.replace(key, value(rnd), newVal); + + if (tx != null) + tx.commit(); + + checkNoEvent(evtsQueues); + } + + break; + } + + default: + fail("Op:" + op); + } + } + finally { + if (tx != null) + tx.close(); + } + } + + /** + * @param rnd {@link Random}. + * @return {@link TransactionIsolation}. + */ + private TransactionIsolation txRandomIsolation(Random rnd) { + int val = rnd.nextInt(3); + + if (val == 0) + return READ_COMMITTED; + else if (val == 1) + return REPEATABLE_READ; + else + return SERIALIZABLE; + } + + /** + * @param rnd {@link Random}. + * @return {@link TransactionConcurrency}. + */ + private TransactionConcurrency txRandomConcurrency(Random rnd) { + return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC; + } + + /** + * @param cache Cache. + * @param key Key + * @param cntrs Partition counters. + */ + private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) { + Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName()); + + int part = aff.partition(key); + + Long partCntr = cntrs.get(part); + + if (partCntr == null) + partCntr = 0L; + + cntrs.put(part, ++partCntr); + } + + /** + * @param rnd Random generator. + * @return Cache value. + */ + private static Object value(Random rnd) { + return new QueryTestValue(rnd.nextInt(VALS)); + } + + /** + * @param evtsQueues Event queue. + * @param partCntrs Partition counters. + * @param aff Affinity function. + * @param key Key. + * @param val Value. + * @param oldVal Old value. + * @throws Exception If failed. 
+ */ + private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, + Map<Integer, Long> partCntrs, + Affinity<Object> aff, + Object key, + Object val, + Object oldVal) + throws Exception { + if ((val == null && oldVal == null + || (val != null && !isAccepted((QueryTestValue)val)))) { + checkNoEvent(evtsQueues); + + return; + } + + for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { + CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS); + + assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt); + assertEquals(key, evt.getKey()); + assertEquals(val, evt.getValue()); + assertEquals(oldVal, evt.getOldValue()); + + long cntr = partCntrs.get(aff.partition(key)); + CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class); + + assertNotNull(cntr); + assertNotNull(qryEntryEvt); + + assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter()); + } + } + + /** + * @param evtsQueues Event queue. + * @throws Exception If failed. + */ + private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception { + for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { + CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS); + + assertNull(evt); + } + } + + /** + * + */ + protected static class NonSerializableFilter + implements CacheEntryEventSerializableFilter<CacheContinuousQueryRandomOperationsTest.QueryTestKey, + CacheContinuousQueryRandomOperationsTest.QueryTestValue>, Externalizable { + /** */ + public NonSerializableFilter() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? 
extends QueryTestValue> evt) { + return isAccepted(evt.getValue()); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + fail("Entry filter should not be marshaled."); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + fail("Entry filter should not be marshaled."); + } + + /** + * @param val Value. + * @return {@code True} if value is even. + */ + public static boolean isAccepted(QueryTestValue val) { + return val == null || val.val1 % 2 == 0; + } + } + + /** + * + */ + protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Integer, Integer> { + /** */ + public SerializableFilter() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) + throws CacheEntryListenerException { + return isAccepted(evt.getValue()); + } + + /** + * @return {@code True} if value is even. + */ + public static boolean isAccepted(Integer val) { + return val == null || val % 2 == 0; + } + } + + /** + * + */ + protected static class FilterFactory implements Factory<NonSerializableFilter> { + /** {@inheritDoc} */ + @Override public NonSerializableFilter create() { + return new NonSerializableFilter(); + } + } + + /** + * + */ + public abstract class LocalNonSerialiseListener implements + CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>, + CacheEntryCreatedListener<QueryTestKey, QueryTestValue>, + CacheEntryExpiredListener<QueryTestKey, QueryTestValue>, + CacheEntryRemovedListener<QueryTestKey, QueryTestValue>, + Externalizable { + /** */ + public LocalNonSerialiseListener() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? 
extends QueryTestValue>> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** {@inheritDoc} */ + @Override public void onExpired(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** {@inheritDoc} */ + @Override public void onRemoved(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> evts) throws CacheEntryListenerException { + onEvents(evts); + } + + /** + * @param evts Events. + */ + protected abstract void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> evts); + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + throw new UnsupportedOperationException("Failed. Listener should not be marshaled."); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled."); + } + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java deleted file mode 100644 index 55340d5..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java +++ /dev/null @@ -1,714 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadLocalRandom; -import javax.cache.configuration.CacheEntryListenerConfiguration; -import javax.cache.configuration.Factory; -import javax.cache.configuration.FactoryBuilder; -import javax.cache.configuration.MutableCacheEntryListenerConfiguration; -import javax.cache.event.CacheEntryCreatedListener; -import javax.cache.event.CacheEntryEvent; -import javax.cache.event.CacheEntryExpiredListener; -import javax.cache.event.CacheEntryListenerException; -import javax.cache.event.CacheEntryRemovedListener; -import javax.cache.event.CacheEntryUpdatedListener; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CacheEntryEventSerializableFilter; -import org.apache.ignite.cache.affinity.Affinity; -import org.apache.ignite.cache.query.CacheQueryEntryEvent; -import org.apache.ignite.cache.query.ContinuousQuery; -import org.apache.ignite.cache.query.QueryCursor; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.transactions.Transaction; -import org.apache.ignite.transactions.TransactionConcurrency; -import org.apache.ignite.transactions.TransactionIsolation; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; -import static 
org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; -import static org.apache.ignite.cache.CacheMode.REPLICATED; -import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.NonSerializableFilter.isAccepted; -import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT; -import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER; -import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; -import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; - -/** - * - */ -public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryRandomOperationsTest { - /** */ - private static final int NODES = 5; - - /** */ - private static final int KEYS = 50; - - /** */ - private static final int VALS = 10; - - /** */ - public static final int ITERATION_CNT = 40; - - /** - * @throws Exception If failed. 
- */ - public void testInternalQuery() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, - 1, - ATOMIC, - ONHEAP_TIERED, - false); - - final IgniteCache<Object, Object> cache = grid(0).createCache(ccfg); - - UUID uuid = null; - - try { - for (int i = 0; i < 10; i++) - cache.put(i, i); - - final CountDownLatch latch = new CountDownLatch(5); - - CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - for (Object evt : iterable) { - latch.countDown(); - - log.info("Received event: " + evt); - } - } - }; - - uuid = grid(0).context().cache().cache(cache.getName()).context().continuousQueries() - .executeInternalQuery(lsnr, new SerializableFilter(), false, true, true); - - for (int i = 10; i < 20; i++) - cache.put(i, i); - - assertTrue(latch.await(3, SECONDS)); - } - finally { - if (uuid != null) - grid(0).context().cache().cache(cache.getName()).context().continuousQueries() - .cancelInternalQuery(uuid); - - grid(0).destroyCache(ccfg.getName()); - } - } - - /** {@inheritDoc} */ - @Override protected void doTestContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy) - throws Exception { - ignite(0).createCache(ccfg); - - try { - long seed = System.currentTimeMillis(); - - Random rnd = new Random(seed); - - log.info("Random seed: " + seed); - - List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>(); - - Collection<QueryCursor<?>> curs = new ArrayList<>(); - - Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs = new ArrayList<>(); - - if (deploy == CLIENT) - evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean())); - else if (deploy == SERVER) - evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs, - rnd.nextBoolean())); - else { - boolean isSync = rnd.nextBoolean(); - - for (int i = 0; i < 
NODES - 1; i++) - evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync)); - } - - ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>(); - - Map<Integer, Long> partCntr = new ConcurrentHashMap<>(); - - try { - for (int i = 0; i < ITERATION_CNT; i++) { - if (i % 10 == 0) - log.info("Iteration: " + i); - - for (int idx = 0; idx < NODES; idx++) - randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName())); - } - } - finally { - for (QueryCursor<?> cur : curs) - cur.close(); - - for (T2<Integer, MutableCacheEntryListenerConfiguration> e : lsnrCfgs) - grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2()); - } - } - finally { - ignite(0).destroyCache(ccfg.getName()); - } - } - - /** - * @param cacheName Cache name. - * @param nodeIdx Node index. - * @param curs Cursors. - * @param lsnrCfgs Listener configurations. - * @return Event queue - */ - private BlockingQueue<CacheEntryEvent<?, ?>> registerListener(String cacheName, - int nodeIdx, - Collection<QueryCursor<?>> curs, - Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs, - boolean sync) { - final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); - - if (ThreadLocalRandom.current().nextBoolean()) { - MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg = - new MutableCacheEntryListenerConfiguration<>( - FactoryBuilder.factoryOf(new LocalNonSerialiseListener() { - @Override protected void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? 
extends QueryTestValue>> evts) { - for (CacheEntryEvent<?, ?> evt : evts) - evtsQueue.add(evt); - } - }), - new FilterFactory(), - true, - sync - ); - - grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg); - - lsnrCfgs.add(new T2<Integer, MutableCacheEntryListenerConfiguration>(nodeIdx, lsnrCfg)); - } - else { - ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>(); - - qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> evts) throws CacheEntryListenerException { - for (CacheEntryEvent<?, ?> evt : evts) - evtsQueue.add(evt); - } - }); - - qry.setRemoteFilterFactory(new FilterFactory()); - - QueryCursor<?> cur = grid(nodeIdx).cache(cacheName).query(qry); - - curs.add(cur); - } - - return evtsQueue; - } - - /** - * @param rnd Random generator. - * @param evtsQueues Events queue. - * @param expData Expected cache data. - * @param partCntr Partition counter. - * @param cache Cache. - * @throws Exception If failed. 
- */ - private void randomUpdate( - Random rnd, - List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, - ConcurrentMap<Object, Object> expData, - Map<Integer, Long> partCntr, - IgniteCache<Object, Object> cache) - throws Exception { - Object key = new QueryTestKey(rnd.nextInt(KEYS)); - Object newVal = value(rnd); - Object oldVal = expData.get(key); - - int op = rnd.nextInt(11); - - Ignite ignite = cache.unwrap(Ignite.class); - - Transaction tx = null; - - if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean()) - tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd)); - - try { - // log.info("Random operation [key=" + key + ", op=" + op + ']'); - - switch (op) { - case 0: { - cache.put(key, newVal); - - if (tx != null) - tx.commit(); - - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); - - expData.put(key, newVal); - - break; - } - - case 1: { - cache.getAndPut(key, newVal); - - if (tx != null) - tx.commit(); - - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); - - expData.put(key, newVal); - - break; - } - - case 2: { - cache.remove(key); - - if (tx != null) - tx.commit(); - - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); - - expData.remove(key); - - break; - } - - case 3: { - cache.getAndRemove(key); - - if (tx != null) - tx.commit(); - - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); - - expData.remove(key); - - break; - } - - case 4: { - cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean())); - - if (tx != null) - tx.commit(); - - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, 
newVal, oldVal); - - expData.put(key, newVal); - - break; - } - - case 5: { - cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean())); - - if (tx != null) - tx.commit(); - - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); - - expData.remove(key); - - break; - } - - case 6: { - cache.putIfAbsent(key, newVal); - - if (tx != null) - tx.commit(); - - if (oldVal == null) { - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null); - - expData.put(key, newVal); - } - else - checkNoEvent(evtsQueues); - - break; - } - - case 7: { - cache.getAndPutIfAbsent(key, newVal); - - if (tx != null) - tx.commit(); - - if (oldVal == null) { - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null); - - expData.put(key, newVal); - } - else - checkNoEvent(evtsQueues); - - break; - } - - case 8: { - cache.replace(key, newVal); - - if (tx != null) - tx.commit(); - - if (oldVal != null) { - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); - - expData.put(key, newVal); - } - else - checkNoEvent(evtsQueues); - - break; - } - - case 9: { - cache.getAndReplace(key, newVal); - - if (tx != null) - tx.commit(); - - if (oldVal != null) { - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); - - expData.put(key, newVal); - } - else - checkNoEvent(evtsQueues); - - break; - } - - case 10: { - if (oldVal != null) { - Object replaceVal = value(rnd); - - boolean success = replaceVal.equals(oldVal); - - if (success) { - cache.replace(key, replaceVal, newVal); - - if (tx != null) - tx.commit(); - - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, 
oldVal); - - expData.put(key, newVal); - } - else { - cache.replace(key, replaceVal, newVal); - - if (tx != null) - tx.commit(); - - checkNoEvent(evtsQueues); - } - } - else { - cache.replace(key, value(rnd), newVal); - - if (tx != null) - tx.commit(); - - checkNoEvent(evtsQueues); - } - - break; - } - - default: - fail("Op:" + op); - } - } finally { - if (tx != null) - tx.close(); - } - } - - /** - * @param rnd {@link Random}. - * @return {@link TransactionIsolation}. - */ - private TransactionIsolation txRandomIsolation(Random rnd) { - int val = rnd.nextInt(3); - - if (val == 0) - return READ_COMMITTED; - else if (val == 1) - return REPEATABLE_READ; - else - return SERIALIZABLE; - } - - /** - * @param rnd {@link Random}. - * @return {@link TransactionConcurrency}. - */ - private TransactionConcurrency txRandomConcurrency(Random rnd) { - return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC; - } - - /** - * @param cache Cache. - * @param key Key - * @param cntrs Partition counters. - */ - private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) { - Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName()); - - int part = aff.partition(key); - - Long partCntr = cntrs.get(part); - - if (partCntr == null) - partCntr = 0L; - - cntrs.put(part, ++partCntr); - } - - /** - * @param rnd Random generator. - * @return Cache value. - */ - private static Object value(Random rnd) { - return new QueryTestValue(rnd.nextInt(VALS)); - } - - /** - * @param evtsQueues Event queue. - * @param partCntrs Partition counters. - * @param aff Affinity function. - * @param key Key. - * @param val Value. - * @param oldVal Old value. - * @throws Exception If failed. 
- */ - private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, - Map<Integer, Long> partCntrs, - Affinity<Object> aff, - Object key, - Object val, - Object oldVal) - throws Exception { - if ((val == null && oldVal == null - || (val != null && !isAccepted((QueryTestValue)val)))) { - checkNoEvent(evtsQueues); - - return; - } - - for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { - CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS); - - assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt); - assertEquals(key, evt.getKey()); - assertEquals(val, evt.getValue()); - assertEquals(oldVal, evt.getOldValue()); - - long cntr = partCntrs.get(aff.partition(key)); - CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class); - - assertNotNull(cntr); - assertNotNull(qryEntryEvt); - - assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter()); - } - } - - /** - * @param evtsQueues Event queue. - * @throws Exception If failed. - */ - private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception { - for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { - CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS); - - assertNull(evt); - } - } - - /** - * - */ - protected static class NonSerializableFilter - implements CacheEntryEventSerializableFilter<CacheContinuousQueryRandomOperationsTest.QueryTestKey, - CacheContinuousQueryRandomOperationsTest.QueryTestValue>, Externalizable { - /** */ - public NonSerializableFilter() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? 
extends QueryTestValue> event) - throws CacheEntryListenerException { - return isAccepted(event.getValue()); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - fail("Entry filter should not be marshaled."); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - fail("Entry filter should not be marshaled."); - } - - /** - * @return {@code True} if value is even. - */ - public static boolean isAccepted(QueryTestValue val) { - return val == null || val.val1 % 2 == 0; - } - } - - /** - * - */ - protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Integer, Integer>{ - /** */ - public SerializableFilter() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> event) - throws CacheEntryListenerException { - return isAccepted(event.getValue()); - } - - /** - * @return {@code True} if value is even. - */ - public static boolean isAccepted(Integer val) { - return val == null || val % 2 == 0; - } - } - - /** - * - */ - protected static class FilterFactory implements Factory<NonSerializableFilter> { - @Override public NonSerializableFilter create() { - return new NonSerializableFilter(); - } - } - - /** - * - */ - public abstract class LocalNonSerialiseListener implements - CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>, - CacheEntryCreatedListener<QueryTestKey, QueryTestValue>, - CacheEntryExpiredListener<QueryTestKey, QueryTestValue>, - CacheEntryRemovedListener<QueryTestKey, QueryTestValue>, - Externalizable { - /** */ - public LocalNonSerialiseListener() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? 
extends QueryTestValue>> evts) throws CacheEntryListenerException { - onEvents(evts); - } - - /** {@inheritDoc} */ - @Override public void onExpired(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> evts) throws CacheEntryListenerException { - onEvents(evts); - } - - /** {@inheritDoc} */ - @Override public void onRemoved(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> evts) throws CacheEntryListenerException { - onEvents(evts); - } - - /** {@inheritDoc} */ - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> evts) throws CacheEntryListenerException { - onEvents(evts); - } - - /** - * @param evts Events. - */ - protected abstract void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> evts); - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - throw new UnsupportedOperationException("Failed. Listener should not be marshaled."); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - throw new UnsupportedOperationException("Failed. 
Listener should not be unmarshaled."); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 4226537..083367c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -81,6 +81,7 @@ import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.PAX; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.T3; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.plugin.extensions.communication.Message; @@ -99,7 +100,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; -import static org.apache.ignite.cache.CacheMemoryMode.*; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; @@ -167,6 +168,13 @@ public 
abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC } /** + * @return Async callback flag. + */ + protected boolean asyncCallback() { + return false; + } + + /** * @return Near cache configuration. */ protected NearCacheConfiguration nearCacheConfiguration() { @@ -476,7 +484,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC for (int j = 0; j < 50; ++j) { ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); - final CacheEventListener3 lsnr = new CacheEventListener3(); + final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() + : new CacheEventListener3(); qry.setLocalListener(lsnr); @@ -560,7 +569,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); - final CacheEventListener3 lsnr = new CacheEventListener3(); + final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() : new CacheEventListener3(); qry.setLocalListener(lsnr); @@ -721,7 +730,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); - final CacheEventListener3 lsnr = new CacheEventListener3(); + final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() : new CacheEventListener3(); qry.setLocalListener(lsnr); @@ -841,7 +850,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC Affinity<Object> aff = qryClient.affinity(null); - CacheEventListener1 lsnr = new CacheEventListener1(false); + CacheEventListener1 lsnr = asyncCallback() ? 
new CacheEventAsyncListener1(false) + : new CacheEventListener1(false); ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); @@ -1545,7 +1555,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC qry.setLocalListener(lsnr); - qry.setRemoteFilter(new CacheEventFilter()); + qry.setRemoteFilter(asyncCallback() ? new CacheEventAsyncFilter() : new CacheEventFilter()); QueryCursor<?> cur = qryClnCache.query(qry); @@ -1639,7 +1649,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC newQry.setLocalListener(dinLsnr); - newQry.setRemoteFilter(new CacheEventFilter()); + newQry.setRemoteFilter(asyncCallback() ? new CacheEventAsyncFilter() : new CacheEventFilter()); dinQry = qryClnCache.query(newQry); @@ -1786,7 +1796,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC final IgniteCache<Object, Object> qryClnCache = qryCln.cache(null); - final CacheEventListener2 lsnr = new CacheEventListener2(); + final CacheEventListener2 lsnr = asyncCallback() ? new CacheEventAsyncListener2() : new CacheEventListener2(); ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); @@ -2144,6 +2154,19 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * */ + @IgniteAsyncCallback + private static class CacheEventAsyncListener1 extends CacheEventListener1 { + /** + * @param saveAll Save all events flag. + */ + CacheEventAsyncListener1(boolean saveAll) { + super(saveAll); + } + } + + /** + * + */ private static class CacheEventListener1 implements CacheEntryUpdatedListener<Object, Object> { /** */ private volatile CountDownLatch latch; @@ -2208,6 +2231,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * */ + @IgniteAsyncCallback + private static class CacheEventAsyncListener2 extends CacheEventListener2 { + // No-op. 
+ } + + /** + * + */ private static class CacheEventListener2 implements CacheEntryUpdatedListener<Object, Object> { /** */ @LoggerResource @@ -2275,6 +2306,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * */ + @IgniteAsyncCallback + public static class CacheEventAsyncListener3 extends CacheEventListener3 { + // No-op. + } + + /** + * + */ public static class CacheEventListener3 implements CacheEntryUpdatedListener<Object, Object>, CacheEntryEventSerializableFilter<Object, Object> { /** Keys. */ @@ -2303,6 +2342,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * */ + @IgniteAsyncCallback + private static class CacheEventAsyncFilter extends CacheEventFilter { + // No-op. + } + + /** + * + */ public static class CacheEventFilter implements CacheEntryEventSerializableFilter<Object, Object> { /** {@inheritDoc} */ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException { http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java index 025dd80..b469a86 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java @@ -60,6 +60,12 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes super.beforeTest(); startGridsMultiThreaded(2); + + 
assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return grid(0).cluster().nodes().size() == 2; + } + }, 10000L); } /** {@inheritDoc} */ @@ -140,6 +146,14 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes // node2 now becomes the primary for the key. stopGrid(0); + final int prevSize = grid(1).cluster().nodes().size(); + + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return prevSize - 1 == grid(1).cluster().nodes().size(); + } + }, 5000L); + cache2.put(key, "2"); // Sanity check. http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java new file mode 100644 index 0000000..0d027a9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java @@ -0,0 +1,627 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.query.continuous;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteAsyncCallback;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;

/**
 * Runs concurrent cache updates while continuous query callbacks (a local listener or a remote filter, both
 * annotated with {@link IgniteAsyncCallback}) perform further cache operations from inside the callback, and
 * then checks the events that were delivered.
 */
public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbstractTest {
    /** Number of distinct keys updated by the test threads: keys {@code [0, KEYS)}. */
    public static final int KEYS = 10;

    /** Number of keys updated from callbacks: keys {@code [KEYS, KEYS + KEYS_FROM_CALLBACK)}. */
    public static final int KEYS_FROM_CALLBACK = 20;

    /** IP finder shared by all started nodes. */
    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);

    /** Total number of nodes; the last one is started in client mode. */
    private static final int NODES = 5;

    /** Number of update iterations executed by each test thread. */
    public static final int ITERATION_CNT = 20;

    /** System thread pool size set on every node. */
    public static final int SYSTEM_POOL_SIZE = 10;

    /** Client mode flag applied to the next started node. */
    private boolean client;

    /** Number of remote filter evaluations that performed cache updates from the filter. */
    private static final AtomicInteger filterCbCntr = new AtomicInteger(0);

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        cfg.setSystemThreadPoolSize(SYSTEM_POOL_SIZE);

        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);

        cfg.setClientMode(client);

        MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi();
        storeSpi.setExpireCount(100);

        cfg.setEventStorageSpi(storeSpi);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();

        // All nodes but the last are servers.
        startGridsMultiThreaded(NODES - 1);

        client = true;

        startGrid(NODES - 1);
    }

    /** {@inheritDoc} */
    @Override protected void afterTestsStopped() throws Exception {
        stopAllGrids();

        super.afterTestsStopped();
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        super.beforeTest();

        filterCbCntr.set(0);
    }

    /**
     * Atomic partitioned cache, two backups, FULL_SYNC; cache is updated from the local listener.
     *
     * @throws Exception If failed.
     */
    public void testAtomicTwoBackups() throws Exception {
        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, FULL_SYNC);

        doTest(ccfg, true);
    }

    /**
     * Transactional partitioned cache, two backups, FULL_SYNC; cache is updated from the remote filter.
     *
     * @throws Exception If failed.
     */
    public void testTxTwoBackupsFilter() throws Exception {
        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, FULL_SYNC);

        doTest(ccfg, false);
    }

    /**
     * Transactional partitioned cache, two backups, PRIMARY_SYNC; cache is updated from the remote filter.
     *
     * @throws Exception If failed.
     */
    public void testTxTwoBackupsFilterPrimary() throws Exception {
        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, PRIMARY_SYNC);

        doTest(ccfg, false);
    }

    /**
     * Transactional replicated cache, FULL_SYNC; cache is updated from the remote filter.
     *
     * @throws Exception If failed.
     */
    public void testTxReplicatedFilter() throws Exception {
        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, TRANSACTIONAL, FULL_SYNC);

        doTest(ccfg, false);
    }

    /**
     * Transactional partitioned cache, two backups, FULL_SYNC; cache is updated from the local listener.
     *
     * @throws Exception If failed.
     */
    public void testTxTwoBackup() throws Exception {
        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, FULL_SYNC);

        doTest(ccfg, true);
    }

    /**
     * Transactional replicated cache, FULL_SYNC; cache is updated from the local listener.
     *
     * @throws Exception If failed.
     */
    public void testTxReplicated() throws Exception {
        // The backups argument only ends up in the cache name: it is not applied to non-partitioned caches.
        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, FULL_SYNC);

        doTest(ccfg, true);
    }

    /**
     * Transactional replicated cache, PRIMARY_SYNC; cache is updated from the local listener.
     *
     * @throws Exception If failed.
     */
    public void testTxReplicatedPrimary() throws Exception {
        // The backups argument only ends up in the cache name: it is not applied to non-partitioned caches.
        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, PRIMARY_SYNC);

        doTest(ccfg, true);
    }

    /**
     * Creates the cache, registers one continuous query per node, runs concurrent updates of keys
     * {@code [0, KEYS)} and verifies both the directly produced events and the events produced by the updates
     * made from callbacks. The cache is destroyed at the end.
     *
     * @param ccfg Cache configuration.
     * @param fromLsnr If {@code true} the cache is updated from the local listener, otherwise a remote filter
     *      that updates the cache is registered.
     * @throws Exception If failed.
     */
    protected void doTest(final CacheConfiguration ccfg, boolean fromLsnr) throws Exception {
        ignite(0).createCache(ccfg);

        List<QueryCursor<?>> qries = new ArrayList<>();

        assertEquals(0, filterCbCntr.get());

        try {
            List<Set<T2<QueryTestKey, QueryTestValue>>> rcvdEvts = new ArrayList<>(NODES);
            List<Set<T2<QueryTestKey, QueryTestValue>>> evtsFromCallbacks = new ArrayList<>(NODES);

            final AtomicInteger qryCntr = new AtomicInteger(0);

            final AtomicInteger cbCntr = new AtomicInteger(0);

            final int threadCnt = SYSTEM_POOL_SIZE * 2;

            for (int idx = 0; idx < NODES; idx++) {
                Set<T2<QueryTestKey, QueryTestValue>> evts = Collections.
                    newSetFromMap(new ConcurrentHashMap<T2<QueryTestKey, QueryTestValue>, Boolean>());
                Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb = Collections.
                    newSetFromMap(new ConcurrentHashMap<T2<QueryTestKey, QueryTestValue>, Boolean>());

                IgniteCache<Object, Object> cache = grid(idx).getOrCreateCache(ccfg.getName());

                ContinuousQuery qry = new ContinuousQuery();

                // The listener gets a cache only when it is the one expected to update it.
                qry.setLocalListener(new TestCacheAsyncEventListener(evts, evtsFromCb,
                    fromLsnr ? cache : null, qryCntr, cbCntr));

                if (!fromLsnr)
                    qry.setRemoteFilterFactory(
                        FactoryBuilder.factoryOf(new CacheTestRemoteFilterAsync(ccfg.getName())));

                rcvdEvts.add(evts);
                evtsFromCallbacks.add(evtsFromCb);

                QueryCursor qryCursor = cache.query(qry);

                qries.add(qryCursor);
            }

            IgniteInternalFuture<Long> f = GridTestUtils.runMultiThreadedAsync(new Runnable() {
                @Override public void run() {
                    ThreadLocalRandom rnd = ThreadLocalRandom.current();

                    for (int i = 0; i < ITERATION_CNT; i++) {
                        IgniteCache<QueryTestKey, QueryTestValue> cache =
                            grid(rnd.nextInt(NODES)).cache(ccfg.getName());

                        QueryTestKey key = new QueryTestKey(rnd.nextInt(KEYS));

                        boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() ==
                            TRANSACTIONAL && rnd.nextBoolean();

                        Transaction tx = null;

                        if (startTx)
                            tx = cache.unwrap(Ignite.class).transactions().txStart();

                        try {
                            if ((cache.get(key) == null) || rnd.nextBoolean())
                                cache.invoke(key, new IncrementTestEntryProcessor());
                            else {
                                QueryTestValue val;
                                QueryTestValue newVal;

                                // Retry until the increment is applied on top of the value that was read.
                                do {
                                    val = cache.get(key);

                                    newVal = val == null ?
                                        new QueryTestValue(0) : new QueryTestValue(val.val1 + 1);
                                }
                                while (!cache.replace(key, val, newVal));
                            }

                            // Commit only when the update succeeded.
                            if (tx != null)
                                tx.commit();
                        }
                        finally {
                            // Always release the transaction; an uncommitted one must not be committed here.
                            if (tx != null)
                                tx.close();
                        }
                    }
                }
            }, threadCnt, "put-thread");

            f.get(30, TimeUnit.SECONDS);

            // Not a bare 'assert': the wait itself must run even when JVM assertions are disabled.
            assertTrue("Failed to wait for events from update threads.", GridTestUtils.waitForCondition(new PA() {
                @Override public boolean apply() {
                    return qryCntr.get() >= ITERATION_CNT * threadCnt * NODES;
                }
            }, TimeUnit.MINUTES.toMillis(2)));

            for (Set<T2<QueryTestKey, QueryTestValue>> set : rcvdEvts)
                checkEvents(set, ITERATION_CNT * threadCnt, grid(0).cache(ccfg.getName()), false);

            if (fromLsnr) {
                final int expCnt = qryCntr.get() * NODES * KEYS_FROM_CALLBACK;

                boolean res = GridTestUtils.waitForCondition(new PA() {
                    @Override public boolean apply() {
                        return cbCntr.get() >= expCnt;
                    }
                }, TimeUnit.SECONDS.toMillis(60));

                assertTrue("Failed to wait events [exp=" + expCnt + ", act=" + cbCntr.get() + "]", res);

                assertEquals(expCnt, cbCntr.get());

                for (Set<T2<QueryTestKey, QueryTestValue>> set : evtsFromCallbacks)
                    checkEvents(set, qryCntr.get() * KEYS_FROM_CALLBACK, grid(0).cache(ccfg.getName()), true);
            }
            else {
                // NOTE(review): presumably the filter runs once per query on every node owning the key
                // (primary + backups, or all server nodes for a replicated cache) - confirm.
                final int expInvkCnt = ITERATION_CNT * threadCnt *
                    (ccfg.getCacheMode() != REPLICATED ? (ccfg.getBackups() + 1) : NODES - 1) * NODES;

                // Result is not checked here: the exact count is asserted right below.
                GridTestUtils.waitForCondition(new PA() {
                    @Override public boolean apply() {
                        return filterCbCntr.get() >= expInvkCnt;
                    }
                }, TimeUnit.SECONDS.toMillis(60));

                assertEquals(expInvkCnt, filterCbCntr.get());

                for (Set<T2<QueryTestKey, QueryTestValue>> set : evtsFromCallbacks)
                    checkEvents(set, expInvkCnt * KEYS_FROM_CALLBACK, grid(0).cache(ccfg.getName()), true);
            }
        }
        finally {
            for (QueryCursor<?> qry : qries)
                qry.close();

            ignite(0).destroyCache(ccfg.getName());
        }
    }

    /**
     * Waits until at least {@code expCnt} events are collected, then checks that for every key of the selected
     * range the set holds exactly the values from {@code 0} up to the value currently stored in the cache.
     *
     * @param set Received events; emptied by this method.
     * @param expCnt Expected count.
     * @param cache Cache.
     * @param cb If {@code true} the callback key range is checked, otherwise the range used by test threads.
     * @throws Exception If failed.
     */
    private void checkEvents(final Set<T2<QueryTestKey, QueryTestValue>> set, final int expCnt, IgniteCache cache,
        boolean cb) throws Exception {
        boolean waited = GridTestUtils.waitForCondition(new PA() {
            @Override public boolean apply() {
                return set.size() >= expCnt;
            }
        }, 10000L);

        // The message is built after the wait so that it reports the final size.
        assertTrue("Expected size: " + expCnt + ", actual: " + set.size(), waited);

        int startKey = cb ? KEYS : 0;
        int endKey = cb ? KEYS + KEYS_FROM_CALLBACK : KEYS;

        for (int i = startKey; i < endKey; i++) {
            QueryTestKey key = new QueryTestKey(i);

            QueryTestValue maxVal = (QueryTestValue)cache.get(key);

            assertNotNull("No value in cache for key: " + key, maxVal);

            for (int val = 0; val <= maxVal.val1; val++)
                assertTrue(set.remove(new T2<>(key, new QueryTestValue(val))));
        }

        assertTrue(set.isEmpty());
    }

    /**
     * Increments every callback key, either with a single {@code invokeAll} or with one {@code invoke} per key;
     * the variant is chosen at random.
     *
     * @param cache Cache to update.
     */
    private static void updateCallbackKeys(IgniteCache<QueryTestKey, QueryTestValue> cache) {
        if (ThreadLocalRandom.current().nextBoolean()) {
            Set<QueryTestKey> keys = new LinkedHashSet<>();

            for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++)
                keys.add(new QueryTestKey(key));

            cache.invokeAll(keys, new IncrementTestEntryProcessor());
        }
        else {
            for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++)
                cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor());
        }
    }

    /**
     * Entry processor that stores {@code 0} for a missing entry and the current value plus one otherwise.
     */
    private static class IncrementTestEntryProcessor implements
        CacheEntryProcessor<QueryTestKey, QueryTestValue, Object> {
        /** {@inheritDoc} */
        @Override public Object process(MutableEntry<QueryTestKey, QueryTestValue> entry, Object... arguments)
            throws EntryProcessorException {
            if (entry.exists())
                entry.setValue(new QueryTestValue(entry.getValue().val1 + 1));
            else
                entry.setValue(new QueryTestValue(0));

            return null;
        }
    }

    /**
     * Remote filter that accepts every event and, for events on keys below {@link #KEYS}, increments all
     * callback keys from inside the filter.
     */
    @IgniteAsyncCallback
    private static class CacheTestRemoteFilterAsync implements
        CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> {
        /** Injected Ignite instance. */
        @IgniteInstanceResource
        private Ignite ignite;

        /** Name of the cache updated from the filter. */
        private String cacheName;

        /**
         * @param cacheName Cache name.
         */
        public CacheTestRemoteFilterAsync(String cacheName) {
            this.cacheName = cacheName;
        }

        /** {@inheritDoc} */
        @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e)
            throws CacheEntryListenerException {
            // Events for callback keys are passed through without producing further updates.
            if (e.getKey().compareTo(new QueryTestKey(KEYS)) < 0) {
                IgniteCache<QueryTestKey, QueryTestValue> cache = ignite.cache(cacheName);

                updateCallbackKeys(cache);

                filterCbCntr.incrementAndGet();
            }

            return true;
        }
    }

    /**
     * Local listener that records events for keys below {@link #KEYS} and events for callback keys in separate
     * sets and, when a cache was provided, increments all callback keys on every event of the first kind.
     */
    @IgniteAsyncCallback
    private static class TestCacheAsyncEventListener
        implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue> {
        /** Events for keys updated by test threads. */
        private final Set<T2<QueryTestKey, QueryTestValue>> rcvsEvts;

        /** Counter of events for keys updated by test threads. */
        private final AtomicInteger cntr;

        /** Counter of events for keys updated from callbacks. */
        private final AtomicInteger cbCntr;

        /** Events for keys updated from callbacks. */
        private final Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb;

        /** Cache to update from the listener, or {@code null} if the listener must not update anything. */
        private IgniteCache<QueryTestKey, QueryTestValue> cache;

        /**
         * @param rcvsEvts Set for received events.
         * @param evtsFromCb Set for received events produced by updates from callbacks.
         * @param cache Ignite cache.
         * @param cntr Received events counter.
         * @param cbCntr Received events counter from callbacks.
         */
        public TestCacheAsyncEventListener(Set<T2<QueryTestKey, QueryTestValue>> rcvsEvts,
            Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb,
            @Nullable IgniteCache cache,
            AtomicInteger cntr,
            AtomicInteger cbCntr) {
            this.rcvsEvts = rcvsEvts;
            this.evtsFromCb = evtsFromCb;
            this.cache = cache;
            this.cntr = cntr;
            this.cbCntr = cbCntr;
        }

        /** {@inheritDoc} */
        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts)
            throws CacheEntryListenerException {
            for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : evts) {
                if (e.getKey().compareTo(new QueryTestKey(KEYS)) < 0) {
                    rcvsEvts.add(new T2<>(e.getKey(), e.getValue()));

                    cntr.incrementAndGet();

                    if (cache != null)
                        updateCallbackKeys(cache);
                }
                else {
                    evtsFromCb.add(new T2<>(e.getKey(), e.getValue()));

                    cbCntr.incrementAndGet();
                }
            }
        }
    }

    /**
     * @param cacheMode Cache mode.
     * @param backups Number of backups; applied only to partitioned caches.
     * @param atomicityMode Cache atomicity mode.
     * @param writeMode Write sync mode.
     * @return Cache configuration.
     */
    protected CacheConfiguration<Object, Object> cacheConfiguration(
        CacheMode cacheMode,
        int backups,
        CacheAtomicityMode atomicityMode,
        CacheWriteSynchronizationMode writeMode) {
        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();

        ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + writeMode + "-" + backups);
        ccfg.setAtomicityMode(atomicityMode);
        ccfg.setCacheMode(cacheMode);
        ccfg.setWriteSynchronizationMode(writeMode);
        ccfg.setAtomicWriteOrderMode(PRIMARY);

        if (cacheMode == PARTITIONED)
            ccfg.setBackups(backups);

        return ccfg;
    }

    /**
     * Test key wrapping an integer.
     */
    public static class QueryTestKey implements Serializable, Comparable {
        /** Wrapped key value. */
        private final Integer key;

        /**
         * @param key Key.
         */
        public QueryTestKey(Integer key) {
            this.key = key;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (o == null || getClass() != o.getClass())
                return false;

            QueryTestKey that = (QueryTestKey)o;

            return key.equals(that.key);
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            return key.hashCode();
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(QueryTestKey.class, this);
        }

        /** {@inheritDoc} */
        @Override public int compareTo(Object o) {
            // Not a subtraction: the difference of two ints can overflow and flip the sign.
            return key.compareTo(((QueryTestKey)o).key);
        }
    }

    /**
     * Test value holding an integer and its string form.
     */
    public static class QueryTestValue implements Serializable {
        /** Integer value. */
        @GridToStringInclude
        protected final Integer val1;

        /** String form of {@link #val1}. */
        @GridToStringInclude
        protected final String val2;

        /**
         * @param val Value.
         */
        public QueryTestValue(Integer val) {
            this.val1 = val;
            this.val2 = String.valueOf(val);
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (o == null || getClass() != o.getClass())
                return false;

            QueryTestValue that = (QueryTestValue) o;

            return val1.equals(that.val1) && val2.equals(that.val2);
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            int res = val1.hashCode();

            res = 31 * res + val2.hashCode();

            return res;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(QueryTestValue.class, this);
        }
    }
}
