http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index c01f636..9efc456 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -54,11 +54,13 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.security.SecurityPermission; @@ -166,23 +168,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param key Entry key. * @param partId Partition id. * @param updCntr Updated counter. - * @param topVer Topology version. - */ - public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs, - KeyCacheObject key, - int partId, - long updCntr, - AffinityTopologyVersion topVer) { - skipUpdateEvent(lsnrs, key, partId, updCntr, true, topVer); - } - - /** - * @param lsnrs Listeners to notify. - * @param key Entry key. - * @param partId Partition id. - * @param updCntr Updated counter. - * @param topVer Topology version. * @param primary Primary. + * @param topVer Topology version. */ public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs, KeyCacheObject key, @@ -241,6 +228,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param primary {@code True} if called on primary node. * @param preload Whether update happened during preloading. * @param updateCntr Update counter. + * @param fut Dht atomic future. * @param topVer Topology version. * @throws IgniteCheckedException In case of error. */ @@ -253,7 +241,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean primary, boolean preload, long updateCntr, - AffinityTopologyVersion topVer) throws IgniteCheckedException { + @Nullable GridDhtAtomicUpdateFuture fut, + AffinityTopologyVersion topVer + ) throws IgniteCheckedException { Map<UUID, CacheContinuousQueryListener> lsnrCol = updateListeners(internal, preload); if (lsnrCol != null) { @@ -267,6 +257,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { primary, preload, updateCntr, + fut, topVer); } } @@ -282,6 +273,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param preload Whether update happened during preloading. * @param updateCntr Update counter. * @param topVer Topology version. + * @param fut Dht atomic future. * @throws IgniteCheckedException In case of error. */ public void onEntryUpdated( @@ -294,6 +286,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean primary, boolean preload, long updateCntr, + @Nullable GridDhtAtomicUpdateFuture fut, AffinityTopologyVersion topVer) throws IgniteCheckedException { @@ -347,7 +340,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - lsnr.onEntryUpdated(evt, primary, recordIgniteEvt); + lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, fut); } } @@ -401,7 +394,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); - lsnr.onEntryUpdated(evt, primary, recordIgniteEvt); + lsnr.onEntryUpdated(evt, primary, recordIgniteEvt, null); } } } @@ -511,7 +504,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { return executeQuery0( locLsnr, new IgniteClosure<Boolean, CacheContinuousQueryHandler>() { - @Override public CacheContinuousQueryHandler apply(Boolean aBoolean) { + @Override public CacheContinuousQueryHandler apply(Boolean v2) { return new CacheContinuousQueryHandler( cctx.name(), TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), @@ -800,6 +793,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** + * */ private class JCacheQuery { /** */ @@ -931,9 +925,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { /** * */ - private static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> { + static class JCacheQueryLocalListener<K, V> implements CacheEntryUpdatedListener<K, V> { /** */ - private final CacheEntryListener<K, V> impl; + final CacheEntryListener<K, V> impl; /** */ private final IgniteLogger log; @@ -957,28 +951,28 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { try { switch (evt.getEventType()) { case CREATED: - assert impl instanceof CacheEntryCreatedListener; + assert impl instanceof CacheEntryCreatedListener : evt; ((CacheEntryCreatedListener<K, V>)impl).onCreated(singleton(evt)); break; case UPDATED: - assert impl instanceof CacheEntryUpdatedListener; + assert impl instanceof CacheEntryUpdatedListener : evt; ((CacheEntryUpdatedListener<K, V>)impl).onUpdated(singleton(evt)); break; case REMOVED: - assert impl instanceof CacheEntryRemovedListener; + assert impl instanceof CacheEntryRemovedListener : evt; ((CacheEntryRemovedListener<K, V>)impl).onRemoved(singleton(evt)); break; case EXPIRED: - assert impl instanceof CacheEntryExpiredListener; + assert impl instanceof CacheEntryExpiredListener : evt; ((CacheEntryExpiredListener<K, V>)impl).onExpired(singleton(evt)); @@ -1009,6 +1003,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { return evts; } + + /** + * @return {@code True} if listener should be executed in non-system thread. + */ + protected boolean async() { + return U.hasAnnotation(impl, IgniteAsyncCallback.class); + } } /** @@ -1019,7 +1020,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { private static final long serialVersionUID = 0L; /** */ - private CacheEntryEventFilter impl; + protected CacheEntryEventFilter impl; /** */ private byte types; @@ -1072,6 +1073,13 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** + * @return {@code True} if filter should be executed in non-system thread. + */ + protected boolean async() { + return U.hasAnnotation(impl, IgniteAsyncCallback.class); + } + + /** * @param evtType Type. * @return Flag value. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java new file mode 100644 index 0000000..1e04ce6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncCallback.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.lang; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryListener; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * If callback has this annotation then it will be executing in another thread. + * <p> + * Currently this annotation is supported for: + * <ol> + * <li>{@link ContinuousQuery} - {@link CacheEntryUpdatedListener} and {@link CacheEntryEventFilter}.</li> + * </ol> + * <p> + * For example, if {@link CacheEntryEventFilter filter} or {@link CacheEntryListener} + * has the annotation then callbacks will be executing to asyncCallback thread pool. It allows to use cache API + * in a callbacks. This thread pool can be configured by {@link IgniteConfiguration#setAsyncCallbackPoolSize(int)}. + * <h1 class="header">Example</h1> + * As an example, suppose we have cache with {@code 'Person'} objects and we need + * to query all persons with salary above then 1000. Also remote filter will update some entries. + * <p> + * Here is the {@code Person} class: + * <pre name="code" class="java"> + * public class Person { + * // Name. + * private String name; + * + * // Salary. + * private double salary; + * + * ... + * } + * </pre> + * <p> + * Here is the {@code ExampleCacheEntryFilter} class: + * <pre name="code" class="java"> + * @IgniteAsyncCallback + * public class ExampleCacheEntryFilter implements CacheEntryEventFilter<Integer, Person> { + * @IgniteInstanceResource + * private Ignite ignite; + * + * // Continuous listener will be notified for persons with salary above 1000. + * // Filter increases salary for some person on 100. Without @IgniteAsyncCallback annotation + * // this operation is not safe. + * public boolean evaluate(CacheEntryEvent<? extends K, ? extends V> evt) throws CacheEntryListenerException { + * Person p = evt.getValue(); + * + * if (p.getSalary() > 1000) + * return true; + * + * ignite.cache("Person").put(evt.getKey(), new Person(p.getName(), p.getSalary() + 100)); + * + * return false; + * } + * } + * </pre> + * <p> + * Query with asynchronous callback execute as usually: + * <pre name="code" class="java"> + * // Create new continuous query. + * ContinuousQuery<Long, Person> qry = new ContinuousQuery<>(); + * + * // Callback that is called locally when update notifications are received. + * // It simply prints out information about all created persons. + * qry.setLocalListener((evts) -> { + * for (CacheEntryEvent<? extends Long, ? extends Person> e : evts) { + * Person p = e.getValue(); + * + * System.out.println(p.getFirstName() + " " + p.getLastName() + "'s salary is " + p.getSalary()); + * } + * }); + * + * // Sets remote filter. + * qry.setRemoteFilterFactory(() -> new ExampleCacheEntryFilter()); + * + * // Execute query. + * QueryCursor<Cache.Entry<Long, Person>> cur = cache.query(qry); + * </pre> + * + * @see IgniteConfiguration#getAsyncCallbackPoolSize + * @see ContinuousQuery#getRemoteFilterFactory() + * @see ContinuousQuery#getLocalListener() + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface IgniteAsyncCallback { + // No-op. +} http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java index 35882b9..9f7c381 100644 --- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java +++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java @@ -17,62 +17,63 @@ package org.apache.ignite.thread; +import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.NotNull; /** * An {@link ExecutorService} that executes submitted tasks using pooled grid threads. */ public class IgniteStripedThreadPoolExecutor implements ExecutorService { /** */ - public static final int DFLT_SEG_POOL_SIZE = 8; - - /** */ - public static final int DFLT_CONCUR_LVL = 16; - - /** */ private final ExecutorService[] execs; - /** */ - private final int segShift; - - /** */ - private final int segMask; - /** + * Create striped thread pool. * + * @param concurrentLvl Concurrency level. + * @param gridName Node name. + * @param threadNamePrefix Thread name prefix. */ - public IgniteStripedThreadPoolExecutor() { - execs = new ExecutorService[DFLT_CONCUR_LVL]; - - ThreadFactory factory = new IgniteThreadFactory(null); - - for (int i = 0; i < DFLT_CONCUR_LVL; i++) - execs[i] = Executors.newFixedThreadPool(DFLT_SEG_POOL_SIZE, factory); + public IgniteStripedThreadPoolExecutor(int concurrentLvl, String gridName, String threadNamePrefix) { + execs = new ExecutorService[concurrentLvl]; - // Find power-of-two sizes best matching arguments - int sshift = 0; - int ssize = 1; + ThreadFactory factory = new IgniteThreadFactory(gridName, threadNamePrefix); - while (ssize < DFLT_CONCUR_LVL) { - ++sshift; - - ssize <<= 1; - } + for (int i = 0; i < concurrentLvl; i++) + execs[i] = Executors.newSingleThreadExecutor(factory); + } - segShift = 32 - sshift; - segMask = ssize - 1; + /** + * Executes the given command at some time in the future. The command with the same {@code index} + * will be executed in the same thread. + * + * @param task the runnable task + * @param idx Striped index. + * @throws RejectedExecutionException if this task cannot be + * accepted for execution. + * @throws NullPointerException If command is null + */ + public void execute(Runnable task, int idx) { + execs[threadId(idx)].execute(task); + } + /** + * @param idx Index. + * @return Stripped thread ID. + */ + public int threadId(int idx) { + return idx < execs.length ? idx : idx % execs.length; } /** {@inheritDoc} */ @@ -83,7 +84,10 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { /** {@inheritDoc} */ @Override public List<Runnable> shutdownNow() { - List<Runnable> res = new LinkedList<>(); + if (execs.length == 0) + return Collections.emptyList(); + + List<Runnable> res = new ArrayList<>(execs.length); for (ExecutorService exec : execs) { for (Runnable r : exec.shutdownNow()) @@ -124,105 +128,45 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService { } /** {@inheritDoc} */ - @Override public <T> Future<T> submit(Callable<T> task) { - return execForTask(task).submit(task); + @NotNull @Override public <T> Future<T> submit(Callable<T> task) { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public <T> Future<T> submit(Runnable task, T result) { - return execForTask(task).submit(task, result); + @NotNull @Override public <T> Future<T> submit(Runnable task, T res) { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public Future<?> submit(Runnable task) { - return execForTask(task).submit(task); + @NotNull @Override public Future<?> submit(Runnable task) { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) - throws InterruptedException { - List<Future<T>> futs = new LinkedList<>(); - - for (Callable<T> task : tasks) - futs.add(execForTask(task).submit(task)); - - boolean done = false; - - try { - for (Future<T> fut : futs) { - try { - fut.get(); - } - catch (ExecutionException | InterruptedException ignored) { - // No-op. - } - } - - done = true; - - return futs; - } - finally { - if (!done) { - for (Future<T> fut : futs) - fut.cancel(true); - } - } + @NotNull @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, - TimeUnit unit) throws InterruptedException { - throw new RuntimeException("Not implemented."); + @NotNull @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, + long timeout, + TimeUnit unit) { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, - ExecutionException { - throw new RuntimeException("Not implemented."); + @NotNull @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ - @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - throw new RuntimeException("Not implemented."); + @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) { + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override public void execute(Runnable cmd) { - execForTask(cmd).execute(cmd); - } - - /** - * Applies a supplemental hash function to a given hashCode, which - * defends against poor quality hash functions. This is critical - * because ConcurrentHashMap uses power-of-two length hash tables, - * that otherwise encounter collisions for hashCodes that do not - * differ in lower or upper bits. - * - * @param h Hash code. - * @return Enhanced hash code. - */ - private int hash(int h) { - // Spread bits to regularize both segment and index locations, - // using variant of single-word Wang/Jenkins hash. - h += (h << 15) ^ 0xffffcd7d; - h ^= (h >>> 10); - h += (h << 3); - h ^= (h >>> 6); - h += (h << 2) + (h << 14); - return h ^ (h >>> 16); - } - - /** - * @param cmd Command. - * @return Service. - */ - private <T> ExecutorService execForTask(T cmd) { - assert cmd != null; - - //return execs[ThreadLocalRandom8.current().nextInt(DFLT_CONCUR_LVL)]; - return execs[(hash(cmd.hashCode()) >>> segShift) & segMask]; + throw new UnsupportedOperationException(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 6e404b4..a1153cd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -26,6 +26,7 @@ import javax.cache.processor.EntryProcessorResult; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.eviction.EvictableEntry; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -540,7 +541,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr UUID subjId, String taskName, @Nullable CacheObject prevVal, - @Nullable Long updateCntr) throws IgniteCheckedException, + @Nullable Long updateCntr, + @Nullable GridDhtAtomicUpdateFuture fut) throws IgniteCheckedException, GridCacheEntryRemovedException { assert false; http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java new file mode 100644 index 0000000..62fd984 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicWriteOrderMode; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; + +/** + * + */ +public class CacheContinuousQueryAsyncFailoverAtomicPrimaryWriteOrderSelfTest + extends CacheContinuousQueryFailoverAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicWriteOrderMode writeOrderMode() { + return PRIMARY; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected boolean asyncCallback() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java new file mode 100644 index 0000000..4460498 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheMode; + +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * + */ +public class CacheContinuousQueryAsyncFailoverTxReplicatedSelfTest extends CacheContinuousQueryFailoverTxSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected boolean asyncCallback() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java new file mode 100644 index 0000000..8f0bd0e --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFailoverTxSelfTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; + +/** + * + */ +public class CacheContinuousQueryAsyncFailoverTxSelfTest extends CacheContinuousQueryFailoverAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected boolean asyncCallback() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java new file mode 100644 index 0000000..0605bc8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAsyncFilterListenerTest.java @@ -0,0 +1,986 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryCreatedListener; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteAsyncCallback; +import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class CacheContinuousQueryAsyncFilterListenerTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 5; + + /** */ + public static final int ITERATION_CNT = 100; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi(); + storeSpi.setExpireCount(1000); + + cfg.setEventStorageSpi(storeSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES - 1); + + client = true; + + startGrid(NODES - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /// + /// ASYNC FILTER AND LISTENER. TEST LISTENER. + /// + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerTx() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerTxJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerTxOffHeap() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerTxOffHeapJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerTxOffHeapValues() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerAtomic() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerAtomicJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerReplicatedAtomic() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerReplicatedAtomicJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerReplicatedAtomicOffHeapValues() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerAtomicOffHeap() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerAtomicOffHeapValues() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerAtomicWithoutBackup() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerAtomicWithoutBackupJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListener() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerReplicated() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInListenerReplicatedJCacheApi() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, true); + } + + /// + /// ASYNC FILTER AND LISTENER. TEST FILTER. + /// + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTx() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxJCacheApi() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxOffHeap() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxOffHeapJCacheApi() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxOffHeapValues() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomic() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicJCacheApi() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterReplicatedAtomic() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicOffHeap() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicOffHeapJCacheApi() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, true); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicOffHeapValues() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicWithoutBackup() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilter() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterReplicated() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterReplicatedJCacheApi() throws Exception { + testNonDeadLockInFilter(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), true, true, false); + } + + /// + /// ASYNC LISTENER. TEST LISTENER. + /// + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxOffHeapSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterTxOffHeapValuesSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, OFFHEAP_VALUES), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, ONHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterReplicatedAtomicSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, ATOMIC, ONHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicOffHeapSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicOffHeapValuesSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, ATOMIC, OFFHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterAtomicWithoutBackupSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 0, ATOMIC, ONHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true, false); + } + + /** + * @throws Exception If failed. + */ + public void testNonDeadLockInFilterReplicatedSyncFilter() throws Exception { + testNonDeadLockInListener(cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, ONHEAP_TIERED), false, true, false); + } + + /** + * @param ccfg Cache configuration. + * @param asyncFltr Async filter. + * @param asyncLsnr Async listener. + * @param jcacheApi Use JCache api for registration entry update listener. + * @throws Exception If failed. + */ + private void testNonDeadLockInListener(CacheConfiguration ccfg, + final boolean asyncFltr, + boolean asyncLsnr, + boolean jcacheApi) throws Exception { + ignite(0).createCache(ccfg); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + try { + for (int i = 0; i < ITERATION_CNT; i++) { + log.info("Start iteration: " + i); + + int nodeIdx = i % NODES; + + final IgniteCache cache = grid(nodeIdx).cache(ccfg.getName()); + + final QueryTestKey key = NODES - 1 != nodeIdx ? affinityKey(cache) : new QueryTestKey(1); + + final QueryTestValue val0 = new QueryTestValue(1); + final QueryTestValue newVal = new QueryTestValue(2); + + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1); + + IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr = + new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { + @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue> e) { + if (asyncFltr) { + assertFalse("Failed: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("sys-")); + + assertTrue("Failed: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("callback-")); + } + } + }; + + IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr = + new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { + @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue> e) { + IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName()); + + QueryTestValue val = e.getValue(); + + if (val == null) + return; + else if (val.equals(newVal)) { + evtFromLsnrLatch.countDown(); + + return; + } + else if (!val.equals(val0)) + return; + + Transaction tx = null; + + try { + if (cache0.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL) + tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + + assertEquals(val, val0); + + cache0.put(key, newVal); + + if (tx != null) + tx.commit(); + + latch.countDown(); + } + catch (Exception exp) { + log.error("Failed: ", exp); + + throw new IgniteException(exp); + } + finally { + if (tx != null) + tx.close(); + } + } + }; + + QueryCursor qry = null; + MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg = null; + + CacheInvokeListener locLsnr = asyncLsnr ? new CacheInvokeListenerAsync(lsnrClsr) + : new CacheInvokeListener(lsnrClsr); + + CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> rmtFltr = asyncFltr ? + new CacheTestRemoteFilterAsync(fltrClsr) : new CacheTestRemoteFilter(fltrClsr); + + if (jcacheApi) { + lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + FactoryBuilder.factoryOf(locLsnr), + FactoryBuilder.factoryOf(rmtFltr), + true, + false + ); + + cache.registerCacheEntryListener(lsnrCfg); + } + else { + ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); + + conQry.setLocalListener(locLsnr); + + conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFltr)); + + qry = cache.query(conQry); + } + + try { + if (rnd.nextBoolean()) + cache.put(key, val0); + else { + cache.invoke(key, new CacheEntryProcessor() { + @Override public Object process(MutableEntry entry, Object... arguments) + throws EntryProcessorException { + entry.setValue(val0); + + return null; + } + }); + } + + assertTrue("Failed to waiting event.", U.await(latch, 3, SECONDS)); + + assertEquals(cache.get(key), new QueryTestValue(2)); + + assertTrue("Failed to waiting event from listener.", U.await(latch, 3, SECONDS)); + } + finally { + if (qry != null) + qry.close(); + + if (lsnrCfg != null) + cache.deregisterCacheEntryListener(lsnrCfg); + } + + log.info("Iteration finished: " + i); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param ccfg Cache configuration. + * @param asyncFilter Async filter. + * @param asyncLsnr Async listener. + * @param jcacheApi Use JCache api for start update listener. + * @throws Exception If failed. + */ + private void testNonDeadLockInFilter(CacheConfiguration ccfg, + final boolean asyncFilter, + final boolean asyncLsnr, + boolean jcacheApi) throws Exception { + ignite(0).createCache(ccfg); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + try { + for (int i = 0; i < ITERATION_CNT; i++) { + log.info("Start iteration: " + i); + + int nodeIdx = i % NODES; + + final IgniteCache cache = grid(nodeIdx).cache(ccfg.getName()); + + final QueryTestKey key = NODES - 1 != nodeIdx ? affinityKey(cache) : new QueryTestKey(1); + + final QueryTestValue val0 = new QueryTestValue(1); + final QueryTestValue newVal = new QueryTestValue(2); + + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch evtFromLsnrLatch = new CountDownLatch(1); + + IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> fltrClsr = + new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { + @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue> e) { + if (asyncFilter) { + assertFalse("Failed: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("sys-")); + + assertTrue("Failed: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("callback-")); + } + + IgniteCache<Object, Object> cache0 = ignite.cache(cache.getName()); + + QueryTestValue val = e.getValue(); + + if (val == null) + return; + else if (val.equals(newVal)) { + evtFromLsnrLatch.countDown(); + + return; + } + else if (!val.equals(val0)) + return; + + Transaction tx = null; + + try { + if (cache0.getConfiguration(CacheConfiguration.class) + .getAtomicityMode() == TRANSACTIONAL) + tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ); + + assertEquals(val, val0); + + cache0.put(key, newVal); + + if (tx != null) + tx.commit(); + + latch.countDown(); + } + catch (Exception exp) { + log.error("Failed: ", exp); + + throw new IgniteException(exp); + } + finally { + if (tx != null) + tx.close(); + } + } + }; + + IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> lsnrClsr = + new IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>>() { + @Override public void apply(Ignite ignite, CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue> e) { + if (asyncLsnr) { + assertFalse("Failed: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("sys-")); + + assertTrue("Failed: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("callback-")); + } + + QueryTestValue val = e.getValue(); + + if (val == null || !val.equals(new QueryTestValue(1))) + return; + + assertEquals(val, val0); + + latch.countDown(); + } + }; + + + QueryCursor qry = null; + MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg = null; + + CacheInvokeListener locLsnr = asyncLsnr ? new CacheInvokeListenerAsync(lsnrClsr) + : new CacheInvokeListener(lsnrClsr); + + CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> rmtFltr = asyncFilter ? + new CacheTestRemoteFilterAsync(fltrClsr) : new CacheTestRemoteFilter(fltrClsr); + + if (jcacheApi) { + lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + FactoryBuilder.factoryOf(locLsnr), + FactoryBuilder.factoryOf(rmtFltr), + true, + false + ); + + cache.registerCacheEntryListener(lsnrCfg); + } + else { + ContinuousQuery<QueryTestKey, QueryTestValue> conQry = new ContinuousQuery<>(); + + conQry.setLocalListener(locLsnr); + + conQry.setRemoteFilterFactory(FactoryBuilder.factoryOf(rmtFltr)); + + qry = cache.query(conQry); + } + + try { + if (rnd.nextBoolean()) + cache.put(key, val0); + else + cache.invoke(key, new CacheEntryProcessor() { + @Override public Object process(MutableEntry entry, Object... arguments) + throws EntryProcessorException { + entry.setValue(val0); + + return null; + } + }); + + assert U.await(latch, 3, SECONDS) : "Failed to waiting event."; + + assertEquals(cache.get(key), new QueryTestValue(2)); + + assertTrue("Failed to waiting event from filter.", U.await(latch, 3, SECONDS)); + } + finally { + if (qry != null) + qry.close(); + + if (lsnrCfg != null) + cache.deregisterCacheEntryListener(lsnrCfg); + } + + log.info("Iteration finished: " + i); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param cache Ignite cache. + * @return Key. + */ + private QueryTestKey affinityKey(IgniteCache cache) { + Affinity aff = affinity(cache); + + for (int i = 0; i < 10_000; i++) { + QueryTestKey key = new QueryTestKey(i); + + if (aff.isPrimary(localNode(cache), key)) + return key; + } + + throw new IgniteException("Failed to found primary key."); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TimeUnit.SECONDS.toMillis(15); + } + + /** + * + */ + @IgniteAsyncCallback + private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter { + /** + * @param clsr Closure. + */ + public CacheTestRemoteFilterAsync( + IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr) { + super(clsr); + } + } + + /** + * + */ + private static class CacheTestRemoteFilter implements + CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr; + + /** + * @param clsr Closure. + */ + public CacheTestRemoteFilter(IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> clsr) { + this.clsr = clsr; + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e) + throws CacheEntryListenerException { + clsr.apply(ignite, e); + + return true; + } + } + + /** + * + */ + @IgniteAsyncCallback + private static class CacheInvokeListenerAsync extends CacheInvokeListener { + /** + * @param clsr Closure. + */ + public CacheInvokeListenerAsync( + IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr) { + super(clsr); + } + } + + /** + * + */ + private static class CacheInvokeListener implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>, + CacheEntryCreatedListener<QueryTestKey, QueryTestValue>, Serializable { + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> clsr; + + /** + * @param clsr Closure. + */ + public CacheInvokeListener(IgniteBiInClosure<Ignite, CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> clsr) { + this.clsr = clsr; + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> events) + throws CacheEntryListenerException { + for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events) + clsr.apply(ignite, e); + } + + /** {@inheritDoc} */ + @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> events) throws CacheEntryListenerException { + for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : events) + clsr.apply(ignite, e); + } + } + + /** + * @param cacheMode Cache mode. + * @param backups Number of backups. + * @param atomicityMode Cache atomicity mode. + * @param memoryMode Cache memory mode. + * @return Cache configuration. + */ + protected CacheConfiguration<Object, Object> cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + backups); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setMemoryMode(memoryMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } + + /** + * + */ + public static class QueryTestKey implements Serializable, Comparable { + /** */ + private final Integer key; + + /** + * @param key Key. + */ + public QueryTestKey(Integer key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestKey that = (QueryTestKey)o; + + return key.equals(that.key); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestKey.class, this); + } + + /** {@inheritDoc} */ + @Override public int compareTo(Object o) { + return key - ((QueryTestKey)o).key; + } + } + + /** + * + */ + public static class QueryTestValue implements Serializable { + /** */ + @GridToStringInclude + protected final Integer val1; + + /** */ + @GridToStringInclude + protected final String val2; + + /** + * @param val Value. + */ + public QueryTestValue(Integer val) { + this.val1 = val; + this.val2 = String.valueOf(val); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestValue that = (QueryTestValue)o; + + return val1.equals(that.val1) && val2.equals(that.val2); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = val1.hashCode(); + + res = 31 * res + val2.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestValue.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/395f4738/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java new file mode 100644 index 0000000..928cfda --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryAsyncFilterRandomOperationTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.lang.IgniteAsyncCallback; +import org.jetbrains.annotations.NotNull; + +/** + * + */ +public class CacheContinuousQueryFactoryAsyncFilterRandomOperationTest + extends CacheContinuousQueryFactoryFilterRandomOperationTest { + /** {@inheritDoc} */ + @NotNull @Override protected Factory<? extends CacheEntryEventFilter<QueryTestKey, QueryTestValue>> + createFilterFactory() { + return new AsyncFilterFactory(); + } + + /** + * + */ + @IgniteAsyncCallback + protected static class NonSerializableAsyncFilter implements + CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable { + /** */ + public NonSerializableAsyncFilter() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> evt) { + assertTrue("Failed. Current thread name: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("callback-")); + + assertFalse("Failed. Current thread name: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("sys-")); + + return isAccepted(evt.getValue()); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + fail("Entry filter should not be marshaled."); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + fail("Entry filter should not be marshaled."); + } + + /** + * @param val Value. + * @return {@code True} if value is even. + */ + public static boolean isAccepted(QueryTestValue val) { + return val == null || val.val1 % 2 == 0; + } + } + + /** + * + */ + protected static class AsyncFilterFactory implements Factory<NonSerializableAsyncFilter> { + /** {@inheritDoc} */ + @Override public NonSerializableAsyncFilter create() { + return new NonSerializableAsyncFilter(); + } + } + + /** {@inheritDoc} */ + @Override protected Factory<? extends CacheEntryEventFilter<QueryTestKey, QueryTestValue>> noOpFilterFactory() { + return FactoryBuilder.factoryOf(NoopAsyncFilter.class); + } + + /** + * + */ + @IgniteAsyncCallback + protected static class NoopAsyncFilter implements + CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue>, Externalizable { + /** */ + public NoopAsyncFilter() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> evt) { + assertTrue("Failed. Current thread name: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("callback-")); + + assertFalse("Failed. Current thread name: " + Thread.currentThread().getName(), + Thread.currentThread().getName().contains("sys-")); + + return true; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. + } + } +}
