IGNITE-143 - Continuous queries refactoring
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f8f0699d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f8f0699d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f8f0699d Branch: refs/heads/ignite-143 Commit: f8f0699d1dc484a24f41f651377fed729652304e Parents: 0751bcf Author: Valentin Kulichenko <vkuliche...@gridgain.com> Authored: Wed Feb 11 10:36:02 2015 -0800 Committer: Valentin Kulichenko <vkuliche...@gridgain.com> Committed: Wed Feb 11 10:36:02 2015 -0800 ---------------------------------------------------------------------- .../datagrid/CacheContinuousQueryExample.java | 53 +- .../cache/query/CacheContinuousQuery.java | 284 ------ .../cache/query/CacheContinuousQueryEntry.java | 49 - .../ignite/cache/query/ContinuousQuery.java | 103 +- .../org/apache/ignite/cache/query/Query.java | 9 + .../ignite/events/CacheQueryExecutedEvent.java | 9 +- .../ignite/events/CacheQueryReadEvent.java | 9 +- .../processors/cache/CacheEntryEvent.java | 75 -- .../processors/cache/GridCacheContext.java | 6 +- .../processors/cache/GridCacheProcessor.java | 11 +- .../processors/cache/IgniteCacheProxy.java | 112 +++ .../CacheDataStructuresManager.java | 44 +- .../processors/cache/query/CacheQueries.java | 10 - .../cache/query/GridCacheQueriesImpl.java | 5 - .../cache/query/GridCacheQueriesProxy.java | 12 - .../continuous/CacheContinuousQueryEntry.java | 256 +++++ .../continuous/CacheContinuousQueryEvent.java | 81 ++ .../CacheContinuousQueryFilterEx.java | 31 + .../continuous/CacheContinuousQueryHandler.java | 484 +++++++++ .../CacheContinuousQueryListener.java | 41 + .../continuous/CacheContinuousQueryManager.java | 619 ++++++++++++ .../GridCacheContinuousQueryAdapter.java | 318 ------ .../GridCacheContinuousQueryEntry.java | 766 -------------- .../GridCacheContinuousQueryFilterEx.java | 33 - .../GridCacheContinuousQueryHandler.java | 571 ----------- .../GridCacheContinuousQueryListener.java | 41 - .../GridCacheContinuousQueryManager.java | 784 --------------- .../service/GridServiceProcessor.java | 70 +- .../optimized/optimized-classnames.properties | 4 +- ...ridCacheContinuousQueryAbstractSelfTest.java | 997 ++++--------------- ...dCacheContinuousQueryReplicatedSelfTest.java | 95 +- .../GridContinuousOperationsLoadTest.java | 54 +- .../loadtests/hashmap/GridCacheTestContext.java | 2 +- .../hadoop/jobtracker/GridHadoopJobTracker.java | 49 +- 34 files changed, 2120 insertions(+), 3967 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java index ce05988..26fd2d2 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java @@ -18,11 +18,10 @@ package org.apache.ignite.examples.datagrid; import org.apache.ignite.*; -import org.apache.ignite.cache.*; import org.apache.ignite.cache.query.*; -import org.apache.ignite.lang.*; -import java.util.*; +import javax.cache.*; +import javax.cache.event.*; /** * This examples demonstrates continuous query API. @@ -48,46 +47,40 @@ public class CacheContinuousQueryExample { System.out.println(); System.out.println(">>> Cache continuous query example started."); - GridCache<Integer, String> cache = ignite.cache(CACHE_NAME); + IgniteCache<Integer, String> cache = ignite.jcache(CACHE_NAME); // Clean up caches on all nodes before run. - cache.clear(0); + cache.clear(); int keyCnt = 20; for (int i = 0; i < keyCnt; i++) - cache.putx(i, Integer.toString(i)); + cache.put(i, Integer.toString(i)); // Create new continuous query. - try (CacheContinuousQuery<Integer, String> qry = cache.queries().createContinuousQuery()) { - // Callback that is called locally when update notifications are received. - qry.localCallback( - new IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<Integer, String>>>() { - @Override public boolean apply( - UUID nodeId, - Collection<CacheContinuousQueryEntry<Integer, String>> entries - ) { - for (CacheContinuousQueryEntry<Integer, String> e : entries) - System.out.println("Queried entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); + ContinuousQuery<Integer, String> qry = Query.continuous(); - return true; // Return true to continue listening. - } - }); + // Callback that is called locally when update notifications are received. + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends String> e : evts) + System.out.println("Queried entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); + } + }); - // This filter will be evaluated remotely on all nodes - // Entry that pass this filter will be sent to the caller. - qry.remoteFilter(new IgnitePredicate<CacheContinuousQueryEntry<Integer, String>>() { - @Override public boolean apply(CacheContinuousQueryEntry<Integer, String> e) { - return e.getKey() > 15; - } - }); - - // Execute query. - qry.execute(); + // This filter will be evaluated remotely on all nodes. + // Entry that pass this filter will be sent to the caller. + qry.setRemoteFilter(new CacheEntryEventFilter<Integer, String>() { + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) { + return e.getKey() > 15; + } + }); + // Execute query. + try (QueryCursor<Cache.Entry<Integer, String>> ignored = cache.query(qry)) { // Add a few more keys and watch more query notifications. for (int i = keyCnt; i < keyCnt + 5; i++) - cache.putx(i, Integer.toString(i)); + cache.put(i, Integer.toString(i)); // Wait for a while while callback is notified about remaining puts. Thread.sleep(2000); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java deleted file mode 100644 index ff4d38a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQuery.java +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.query; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.lang.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -/** - * API for configuring and executing continuous cache queries. - * <p> - * Continuous queries are executed as follows: - * <ol> - * <li> - * Query is sent to requested grid nodes. Note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL} - * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches query will be always executed - * locally. - * </li> - * <li> - * Each node iterates through existing cache data and registers listeners that will - * notify about further updates. - * <li> - * Each key-value pair is passed through optional filter and if this filter returns - * true, key-value pair is sent to the master node (the one that executed query). - * If filter is not provided, all pairs are sent. - * </li> - * <li> - * When master node receives key-value pairs, it notifies the local callback. - * </li> - * </ol> - * <h2 class="header">NOTE</h2> - * Under some concurrent circumstances callback may get several notifications - * for one cache update. This should be taken into account when implementing callback. - * <h1 class="header">Query usage</h1> - * As an example, suppose we have cache with {@code 'Person'} objects and we need - * to query all persons with salary above 1000. - * <p> - * Here is the {@code Person} class: - * <pre name="code" class="java"> - * public class Person { - * // Name. - * private String name; - * - * // Salary. - * private double salary; - * - * ... - * } - * </pre> - * <p> - * You can create and execute continuous query like so: - * <pre name="code" class="java"> - * // Create new continuous query. - * qry = cache.createContinuousQuery(); - * - * // Callback that is called locally when update notifications are received. - * // It simply prints out information about all created persons. - * qry.callback(new GridPredicate2<UUID, Collection<Map.Entry<UUID, Person>>>() { - * @Override public boolean apply(UUID uuid, Collection<Map.Entry<UUID, Person>> entries) { - * for (Map.Entry<UUID, Person> e : entries) { - * Person p = e.getValue(); - * - * X.println(">>>"); - * X.println(">>> " + p.getFirstName() + " " + p.getLastName() + - * "'s salary is " + p.getSalary()); - * X.println(">>>"); - * } - * - * return true; - * } - * }); - * - * // This query will return persons with salary above 1000. - * qry.filter(new GridPredicate2<UUID, Person>() { - * @Override public boolean apply(UUID uuid, Person person) { - * return person.getSalary() > 1000; - * } - * }); - * - * // Execute query. - * qry.execute(); - * </pre> - * This will execute query on all nodes that have cache you are working with and notify callback - * with both data that already exists in cache and further updates. - * <p> - * To stop receiving updates call {@link #close()} method: - * <pre name="code" class="java"> - * qry.cancel(); - * </pre> - * Note that one query instance can be executed only once. After it's cancelled, it's non-operational. - * If you need to repeat execution, use {@link org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()} method to create - * new query. - */ -public interface CacheContinuousQuery<K, V> extends AutoCloseable { - /** - * Default buffer size. Size of {@code 1} means that all entries - * will be sent to master node immediately (buffering is disabled). - */ - public static final int DFLT_BUF_SIZE = 1; - - /** Maximum default time interval after which buffer will be flushed (if buffering is enabled). */ - public static final long DFLT_TIME_INTERVAL = 0; - - /** - * Default value for automatic unsubscription flag. Remote filters - * will be unregistered by default if master node leaves topology. - */ - public static final boolean DFLT_AUTO_UNSUBSCRIBE = true; - - /** - * Sets local callback. This callback is called only - * in local node when new updates are received. - * <p> - * The callback predicate accepts ID of the node from where updates - * are received and collection of received entries. Note that - * for removed entries value will be {@code null}. - * <p> - * If the predicate returns {@code false}, query execution will - * be cancelled. - * <p> - * <b>WARNING:</b> all operations that involve any kind of JVM-local - * or distributed locking (e.g., synchronization or transactional - * cache operations), should be executed asynchronously without - * blocking the thread that called the callback. Otherwise, you - * can get deadlocks. - * - * @param locCb Local callback. - */ - public void localCallback(IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> locCb); - - /** - * Gets local callback. See {@link #localCallback(IgniteBiPredicate)} for more information. - * - * @return Local callback. - */ - @Nullable public IgniteBiPredicate<UUID, Collection<CacheContinuousQueryEntry<K, V>>> localCallback(); - - /** - * Sets optional key-value filter. This filter is called before - * entry is sent to the master node. - * <p> - * <b>WARNING:</b> all operations that involve any kind of JVM-local - * or distributed locking (e.g., synchronization or transactional - * cache operations), should be executed asynchronously without - * blocking the thread that called the filter. Otherwise, you - * can get deadlocks. - * - * @param filter Key-value filter. - */ - public void remoteFilter(@Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> filter); - - /** - * Gets key-value filter. See {@link #remoteFilter(IgnitePredicate)} for more information. - * - * @return Key-value filter. - */ - @Nullable public IgnitePredicate<CacheContinuousQueryEntry<K, V>> remoteFilter(); - - /** - * Sets buffer size. - * <p> - * When a cache update happens, entry is first put into a buffer. - * Entries from buffer will be sent to the master node only if - * the buffer is full or time provided via {@link #timeInterval(long)} - * method is exceeded. - * <p> - * Default buffer size is {@code 1} which means that entries will - * be sent immediately (buffering is disabled). - * - * @param bufSize Buffer size. - */ - public void bufferSize(int bufSize); - - /** - * Gets buffer size. See {@link #bufferSize(int)} for more information. - * - * @return Buffer size. - */ - public int bufferSize(); - - /** - * Sets time interval. - * <p> - * When a cache update happens, entry is first put into a buffer. - * Entries from buffer will be sent to the master node only if - * the buffer is full (its size can be provided via {@link #bufferSize(int)} - * method) or time provided via this method is exceeded. - * <p> - * Default time interval is {@code 0} which means that time check is - * disabled and entries will be sent only when buffer is full. - * - * @param timeInterval Time interval. - */ - public void timeInterval(long timeInterval); - - /** - * Gets time interval. See {@link #timeInterval(long)} for more information. - * - * @return Gets time interval. - */ - public long timeInterval(); - - /** - * Sets automatic unsubscribe flag. - * <p> - * This flag indicates that query filters on remote nodes should be automatically - * unregistered if master node (node that initiated the query) leaves topology. - * If this flag is {@code false}, filters will be unregistered only when - * the query is cancelled from master node, and won't ever be unregistered if - * master node leaves grid. - * <p> - * Default value for this flag is {@code true}. - * - * @param autoUnsubscribe Automatic unsubscription flag. - */ - public void autoUnsubscribe(boolean autoUnsubscribe); - - /** - * Gets automatic unsubscribe flag. See {@link #autoUnsubscribe(boolean)} - * for more information. - * - * @return Automatic unsubscribe flag. - */ - public boolean isAutoUnsubscribe(); - - /** - * Starts continuous query execution on the whole grid. - * <p> - * Note that if grid contains nodes without appropriate cache, - * these nodes will be filtered out. - * <p> - * Also note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL} - * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches - * query will be always executed locally. - * - * @throws IgniteCheckedException In case of error. - */ - public void execute() throws IgniteCheckedException; - - /** - * Starts continuous query execution on provided set of nodes. - * <p> - * Note that if provided projection contains nodes without - * appropriate cache, these nodes will be filtered out. - * <p> - * Also note that for {@link org.apache.ignite.cache.CacheMode#LOCAL LOCAL} - * and {@link org.apache.ignite.cache.CacheMode#REPLICATED REPLICATED} caches - * query will be always executed locally. - * - * @param prj Grid projection. - * @throws IgniteCheckedException In case of error. - */ - public void execute(@Nullable ClusterGroup prj) throws IgniteCheckedException; - - /** - * Stops continuous query execution. - * <p> - * Note that one query instance can be executed only once. - * After it's cancelled, it's non-operational. - * If you need to repeat execution, use {@link org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()} - * method to create new query. - * - * @throws IgniteCheckedException In case of error. - */ - @Override public void close() throws IgniteCheckedException; -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java deleted file mode 100644 index 90d3602..0000000 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheContinuousQueryEntry.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.query; - -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; - -/** - * Entry used for continuous query notifications. - */ -public interface CacheContinuousQueryEntry<K, V> extends Map.Entry<K, V>, Serializable { - /** - * Gets entry key. - * - * @return Entry key. - */ - @Override public K getKey(); - - /** - * Gets entry new value. New value may be null, if entry is being removed. - * - * @return Entry new value. - */ - @Override @Nullable public V getValue(); - - /** - * Gets entry old value. Old value may be null if entry is being inserted (not updated). - * - * @return Gets entry old value. - */ - @Nullable public V getOldValue(); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java index b02c65f..8d79101 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java @@ -17,7 +17,7 @@ package org.apache.ignite.cache.query; -import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.internal.*; import javax.cache.event.*; @@ -106,7 +106,7 @@ import javax.cache.event.*; * If you need to repeat execution, use {@link org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()} method to create * new query. */ -public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> implements AutoCloseable { +public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> { /** */ private static final long serialVersionUID = 0L; @@ -125,6 +125,26 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> imp */ public static final boolean DFLT_AUTO_UNSUBSCRIBE = true; + /** Local listener. */ + private CacheEntryUpdatedListener<K, V> locLsnr; + + /** Remote filter. */ + private CacheEntryEventFilter<K, V> rmtFilter; + + /** Buffer size. */ + private int bufSize = DFLT_BUF_SIZE; + + /** Time interval. */ + private long timeInterval = DFLT_TIME_INTERVAL; + + /** Automatic unsubscription flag. */ + private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE; + + /** + * TODO + * + * @param filter TODO + */ public void setInitialPredicate(Query filter) { // TODO: implement. } @@ -143,7 +163,16 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> imp * @param locLsnr Local callback. */ public void setLocalListener(CacheEntryUpdatedListener<K, V> locLsnr) { - // TODO: implement. + this.locLsnr = locLsnr; + } + + /** + * Gets local listener. + * + * @return Local listener. + */ + public CacheEntryUpdatedListener<K, V> getLocalListener() { + return locLsnr; } /** @@ -153,56 +182,86 @@ public final class ContinuousQuery<K, V> extends Query<ContinuousQuery<K,V>> imp * (e.g., synchronization or transactional cache operations), should be executed asynchronously * without blocking the thread that called the filter. Otherwise, you can get deadlocks. * - * @param filter Key-value filter. + * @param rmtFilter Key-value filter. */ - public void setRemoteFilter(CacheEntryEventFilter<K, V> filter) { - // TODO: implement. + public void setRemoteFilter(CacheEntryEventFilter<K, V> rmtFilter) { + this.rmtFilter = rmtFilter; + } + + /** + * Gets remote filter. + * + * @return Remote filter. + */ + public CacheEntryEventFilter<K, V> getRemoteFilter() { + return rmtFilter; } /** * Sets buffer size. <p> When a cache update happens, entry is first put into a buffer. Entries from buffer will be - * sent to the master node only if the buffer is full or time provided via {@link #timeInterval(long)} method is + * sent to the master node only if the buffer is full or time provided via {@link #setTimeInterval(long)} method is * exceeded. <p> Default buffer size is {@code 1} which means that entries will be sent immediately (buffering is * disabled). * * @param bufSize Buffer size. */ - public void bufferSize(int bufSize) { - // TODO: implement. + public void setBufferSize(int bufSize) { + A.ensure(bufSize > 0, "bufSize > 0"); + + this.bufSize = bufSize; + } + + /** + * Gets buffer size. + * + * @return Buffer size. + */ + public int getBufferSize() { + return bufSize; } /** * Sets time interval. <p> When a cache update happens, entry is first put into a buffer. Entries from buffer will - * be sent to the master node only if the buffer is full (its size can be provided via {@link #bufferSize(int)} + * be sent to the master node only if the buffer is full (its size can be provided via {@link #setBufferSize(int)} * method) or time provided via this method is exceeded. <p> Default time interval is {@code 0} which means that * time check is disabled and entries will be sent only when buffer is full. * * @param timeInterval Time interval. */ - public void timeInterval(long timeInterval) { - // TODO: implement. + public void setTimeInterval(long timeInterval) { + A.ensure(timeInterval >= 0, "timeInterval >= 0"); + + this.timeInterval = timeInterval; + } + + /** + * Gets time interval. + * + * @return Time interval. + */ + public long getTimeInterval() { + return timeInterval; } /** * Sets automatic unsubscribe flag. <p> This flag indicates that query filters on remote nodes should be - * automatically unregistered if master node (node that initiated the query) leaves topology. If this flag is {@code - * false}, filters will be unregistered only when the query is cancelled from master node, and won't ever be + * automatically unregistered if master node (node that initiated the query) leaves topology. If this flag is + * {@code false}, filters will be unregistered only when the query is cancelled from master node, and won't ever be * unregistered if master node leaves grid. <p> Default value for this flag is {@code true}. * * @param autoUnsubscribe Automatic unsubscription flag. */ - public void autoUnsubscribe(boolean autoUnsubscribe) { - // TODO: implement. + public void setAutoUnsubscribe(boolean autoUnsubscribe) { + this.autoUnsubscribe = autoUnsubscribe; } /** - * Stops continuous query execution. <p> Note that one query instance can be executed only once. After it's - * cancelled, it's non-operational. If you need to repeat execution, use {@link - * org.apache.ignite.internal.processors.cache.query.CacheQueries#createContinuousQuery()} method to create new query. + * Gets automatic unsubscription flag value. * - * @throws IgniteCheckedException In case of error. + * @return Automatic unsubscription flag. */ - @Override public void close() throws IgniteCheckedException { - // TODO: implement. + public boolean isAutoUnsubscribe() { + return autoUnsubscribe; } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java index 744d8d2..c24d704 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/query/Query.java @@ -106,6 +106,15 @@ public abstract class Query<T extends Query> implements Serializable { } /** + * Factory method for continuous queries. + * + * @return Continuous query. + */ + public static <K, V> ContinuousQuery<K, V> continuous() { + return new ContinuousQuery<>(); + } + + /** * Gets optional page size, if {@code 0}, then {@link CacheQueryConfiguration#getPageSize()} is used. * * @return Optional page size. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java index 2733d64..a7563a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryExecutedEvent.java @@ -17,15 +17,14 @@ package org.apache.ignite.events; -import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.cache.query.*; -import org.apache.ignite.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; +import javax.cache.event.*; import java.util.*; /** @@ -85,7 +84,7 @@ public class CacheQueryExecutedEvent<K, V> extends EventAdapter { /** Continuous query filter. */ @GridToStringInclude - private final IgnitePredicate<CacheContinuousQueryEntry<K, V>> contQryFilter; + private final CacheEntryEventFilter<K, V> contQryFilter; /** Query arguments. */ @GridToStringInclude @@ -118,7 +117,7 @@ public class CacheQueryExecutedEvent<K, V> extends EventAdapter { @Nullable String clsName, @Nullable String clause, @Nullable IgniteBiPredicate<K, V> scanQryFilter, - @Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> contQryFilter, + @Nullable CacheEntryEventFilter<K, V> contQryFilter, @Nullable Object[] args, @Nullable UUID subjId, @Nullable String taskName) { @@ -195,7 +194,7 @@ public class CacheQueryExecutedEvent<K, V> extends EventAdapter { * * @return Continuous query filter. */ - @Nullable public IgnitePredicate<CacheContinuousQueryEntry<K, V>> continuousQueryFilter() { + @Nullable public CacheEntryEventFilter<K, V> continuousQueryFilter() { return contQryFilter; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java index 322feff..1959976 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java +++ b/modules/core/src/main/java/org/apache/ignite/events/CacheQueryReadEvent.java @@ -17,15 +17,14 @@ package org.apache.ignite.events; -import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.processors.cache.query.*; -import org.apache.ignite.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; +import javax.cache.event.*; import java.util.*; /** @@ -85,7 +84,7 @@ public class CacheQueryReadEvent<K, V> extends EventAdapter { /** Continuous query filter. */ @GridToStringInclude - private final IgnitePredicate<CacheContinuousQueryEntry<K, V>> contQryFilter; + private final CacheEntryEventFilter<K, V> contQryFilter; /** Query arguments. */ @GridToStringInclude @@ -136,7 +135,7 @@ public class CacheQueryReadEvent<K, V> extends EventAdapter { @Nullable String clsName, @Nullable String clause, @Nullable IgniteBiPredicate<K, V> scanQryFilter, - @Nullable IgnitePredicate<CacheContinuousQueryEntry<K, V>> contQryFilter, + @Nullable CacheEntryEventFilter<K, V> contQryFilter, @Nullable Object[] args, @Nullable UUID subjId, @Nullable String taskName, @@ -221,7 +220,7 @@ public class CacheQueryReadEvent<K, V> extends EventAdapter { * * @return Continuous query filter. */ - @Nullable public IgnitePredicate<CacheContinuousQueryEntry<K, V>> continuousQueryFilter() { + @Nullable public CacheEntryEventFilter<K, V> continuousQueryFilter() { return contQryFilter; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java deleted file mode 100644 index 1ff4be8..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryEvent.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; - -import javax.cache.event.*; - -/** - * Implementation of {@link javax.cache.event.CacheEntryEvent}. - */ -public class CacheEntryEvent<K, V> extends javax.cache.event.CacheEntryEvent<K, V> { - /** */ - private final CacheContinuousQueryEntry<K, V> e; - - /** - * @param src Cache. - * @param type Event type. - * @param e Ignite event. - */ - public CacheEntryEvent(IgniteCache src, EventType type, CacheContinuousQueryEntry<K, V> e) { - super(src, type); - - this.e = e; - } - - /** {@inheritDoc} */ - @Override public V getOldValue() { - return e.getOldValue(); - } - - /** {@inheritDoc} */ - @Override public boolean isOldValueAvailable() { - return e.getOldValue() != null; - } - - /** {@inheritDoc} */ - @Override public K getKey() { - return e.getKey(); - } - - /** {@inheritDoc} */ - @Override public V getValue() { - return e.getValue(); - } - - /** {@inheritDoc} */ - @Override public <T> T unwrap(Class<T> cls) { - throw new IllegalArgumentException(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return "CacheEntryEvent [evtType=" + getEventType() + - ", key=" + getKey() + - ", val=" + getValue() + - ", oldVal=" + getOldValue() + ']'; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 8fe80e3..b40d089 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -110,7 +110,7 @@ public class GridCacheContext<K, V> implements Externalizable { private GridCacheQueryManager<K, V> qryMgr; /** Continuous query manager. */ - private GridCacheContinuousQueryManager<K, V> contQryMgr; + private CacheContinuousQueryManager<K, V> contQryMgr; /** Swap manager. */ private GridCacheSwapManager<K, V> swapMgr; @@ -235,7 +235,7 @@ public class GridCacheContext<K, V> implements Externalizable { GridCacheStoreManager<K, V> storeMgr, GridCacheEvictionManager<K, V> evictMgr, GridCacheQueryManager<K, V> qryMgr, - GridCacheContinuousQueryManager<K, V> contQryMgr, + CacheContinuousQueryManager<K, V> contQryMgr, GridCacheAffinityManager<K, V> affMgr, CacheDataStructuresManager<K, V> dataStructuresMgr, GridCacheTtlManager<K, V> ttlMgr, @@ -854,7 +854,7 @@ public class GridCacheContext<K, V> implements Externalizable { /** * @return Continuous query manager, {@code null} if disabled. */ - public GridCacheContinuousQueryManager<K, V> continuousQueries() { + public CacheContinuousQueryManager<K, V> continuousQueries() { return contQryMgr; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 53427d5..d85bf05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -627,7 +627,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheSwapManager swapMgr = new GridCacheSwapManager(cfg.getCacheMode() == LOCAL || !GridCacheUtils.isNearEnabled(cfg)); GridCacheEvictionManager evictMgr = new GridCacheEvictionManager(); GridCacheQueryManager qryMgr = queryManager(cfg); - GridCacheContinuousQueryManager contQryMgr = new GridCacheContinuousQueryManager(); + CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager(); CacheDataStructuresManager dataStructuresMgr = new CacheDataStructuresManager(); GridCacheTtlManager ttlMgr = new GridCacheTtlManager(); GridCacheDrManager drMgr = ctx.createComponent(GridCacheDrManager.class); @@ -763,7 +763,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * 2. GridCacheIoManager * 3. GridCacheDeploymentManager * 4. GridCacheQueryManager (note, that we start it for DHT cache though). - * 5. GridCacheContinuousQueryManager (note, that we start it for DHT cache though). + * 5. CacheContinuousQueryManager (note, that we start it for DHT cache though). * 6. GridCacheDgcManager * 7. GridCacheTtlManager. * =============================================== @@ -1595,6 +1595,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { } /** + * @return Utility cache. + */ + public <K, V> IgniteCache<K, V> utilityJCache() { + return jcache(CU.UTILITY_CACHE_NAME); + } + + /** * Gets utility cache for atomic data structures. * * @return Utility cache for atomic data structures. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 20d40e6..ebb8a60 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -24,6 +24,8 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.cache.query.continuous.*; +import org.apache.ignite.internal.processors.continuous.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.tostring.*; @@ -31,6 +33,7 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.apache.ignite.mxbean.*; +import org.apache.ignite.plugin.security.*; import org.jetbrains.annotations.*; import javax.cache.*; @@ -42,6 +45,8 @@ import java.io.*; import java.util.*; import java.util.concurrent.locks.*; +import static org.apache.ignite.cache.CacheDistributionMode.*; + /** * Cache proxy. */ @@ -70,6 +75,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V /** Projection. */ private GridCacheProjectionImpl<K, V> prj; + /** Logger. */ + private IgniteLogger log; + /** * Empty constructor required for {@link Externalizable}. */ @@ -97,6 +105,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V this.prj = prj; gate = ctx.gate(); + + log = ctx.logger(getClass()); } /** @@ -323,6 +333,102 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** + * Executes continuous query. + * + * @param qry Query. + * @param grp Cluster group. + * @return Initial iteration cursor. + */ + private QueryCursor<Entry<K,V>> queryContinuous(ContinuousQuery<K, V> qry, @Nullable ClusterGroup grp) { + if (qry.getLocalListener() == null) + throw new IllegalStateException("Mandatory local listener is not set for the query: " + qry); + + ctx.checkSecurity(GridSecurityPermission.CACHE_READ); + + IgniteEx grid = ctx.kernalContext().grid(); + + if (grp == null) + grp = grid; + + grp = grp.forCacheNodes(ctx.name()); + + Collection<ClusterNode> nodes = grp.nodes(); + + if (nodes.isEmpty()) + throw new ClusterTopologyException("Failed to execute continuous query (empty cluster group is " + + "provided): " + qry); + + boolean skipPrimaryCheck = false; + + switch (ctx.config().getCacheMode()) { + case LOCAL: + if (!nodes.contains(ctx.localNode())) + throw new ClusterTopologyException("Continuous query for LOCAL cache can be executed " + + "only locally (provided projection contains remote nodes only): " + qry); + else if (nodes.size() > 1) + U.warn(log, "Continuous query for LOCAL cache will be executed locally (provided projection is " + + "ignored): " + this); + + grp = grp.forNode(ctx.localNode()); + + break; + + case REPLICATED: + if (nodes.size() == 1 && F.first(nodes).equals(ctx.localNode())) { + CacheDistributionMode distributionMode = ctx.config().getDistributionMode(); + + if (distributionMode == PARTITIONED_ONLY || distributionMode == NEAR_PARTITIONED) + skipPrimaryCheck = true; + } + + break; + } + + int taskNameHash = ctx.kernalContext().security().enabled() ? + ctx.kernalContext().job().currentTaskNameHash() : 0; + + GridContinuousHandler hnd = new CacheContinuousQueryHandler<>( + ctx.name(), + ctx.continuousQueries().topic(), + qry.getLocalListener(), + qry.getRemoteFilter(), + false, + false, + false, + true, + skipPrimaryCheck, + taskNameHash, + false); + + try { + final UUID routineId = ctx.kernalContext().continuous().startRoutine(hnd, qry.getBufferSize(), + qry.getTimeInterval(), qry.isAutoUnsubscribe(), grp.predicate()).get(); + + return new QueryCursor<Entry<K, V>>() { + @Override public Iterator<Entry<K, V>> iterator() { + return new GridEmptyIterator<>(); + } + + @Override public List<Entry<K, V>> getAll() { + return Collections.emptyList(); + } + + @Override public void close() { + try { + ctx.kernalContext().continuous().stopRoutine(routineId).get(); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + }; + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** * @param local Enforce local. * @return Local node cluster group. */ @@ -331,6 +437,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public QueryCursor<Entry<K,V>> query(Query qry) { A.notNull(qry, "qry"); @@ -345,6 +452,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return ctx.kernalContext().query().queryTwoStep(ctx.name(), p.getType(), p.getSql(), p.getArgs()); } + else if (qry instanceof ContinuousQuery) + return queryContinuous((ContinuousQuery<K, V>)qry, projection(false)); return query(qry, projection(false)); } @@ -401,6 +510,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public QueryCursor<Entry<K,V>> localQuery(Query qry) { A.notNull(qry, "qry"); @@ -409,6 +519,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V try { if (qry instanceof SqlQuery) return doLocalQuery((SqlQuery)qry); + else if (qry instanceof ContinuousQuery) + return queryContinuous((ContinuousQuery<K, V>)qry, projection(true)); return query(qry, projection(true)); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index 5f68a7a..5ee2b31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -23,7 +23,6 @@ import org.apache.ignite.cache.query.*; import org.apache.ignite.cluster.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.processors.cache.*; -import org.apache.ignite.internal.processors.cache.query.continuous.*; import org.apache.ignite.internal.processors.datastructures.*; import org.apache.ignite.internal.processors.task.*; import org.apache.ignite.internal.util.*; @@ -33,6 +32,9 @@ import org.apache.ignite.resources.*; import org.jdk8.backport.*; import org.jetbrains.annotations.*; +import javax.cache.*; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -59,7 +61,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, private CacheProjection<GridCacheQueueHeaderKey, GridCacheQueueHeader> queueHdrView; /** Query notifying about queue update. */ - private GridCacheContinuousQueryAdapter queueQry; + private QueryCursor<Cache.Entry<Object, Object>> queueQryCur; /** Queue query creation guard. */ private final AtomicBoolean queueQryGuard = new AtomicBoolean(); @@ -98,14 +100,8 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, @Override protected void onKernalStop0(boolean cancel) { busyLock.block(); - if (queueQry != null) { - try { - queueQry.close(); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to cancel queue header query.", e); - } - } + if (queueQryCur != null) + queueQryCur.close(); for (GridCacheQueueProxy q : queuesMap.values()) q.delegate().onKernalStop(); @@ -188,17 +184,15 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, return null; if (queueQryGuard.compareAndSet(false, true)) { - queueQry = (GridCacheContinuousQueryAdapter)cctx.cache().queries().createContinuousQuery(); + ContinuousQuery<Object, Object> qry = Query.continuous(); - queueQry.remoteFilter(new QueueHeaderPredicate()); - - queueQry.localCallback(new IgniteBiPredicate<UUID, Collection<GridCacheContinuousQueryEntry>>() { - @Override public boolean apply(UUID id, Collection<GridCacheContinuousQueryEntry> entries) { + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { if (!busyLock.enterBusy()) - return false; + return; try { - for (GridCacheContinuousQueryEntry e : entries) { + for (CacheEntryEvent<?, ?> e : evts) { GridCacheQueueHeaderKey key = (GridCacheQueueHeaderKey)e.getKey(); GridCacheQueueHeader hdr = (GridCacheQueueHeader)e.getValue(); @@ -220,8 +214,6 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, } } } - - return true; } finally { busyLock.leaveBusy(); @@ -229,11 +221,11 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, } }); - queueQry.execute(cctx.isLocal() || cctx.isReplicated() ? cctx.grid().forLocal() : null, - true, - false, - false, - true); + qry.setRemoteFilter(new QueueHeaderPredicate()); + + IgniteCache<Object, Object> jCache = cctx.kernalContext().cache().utilityJCache(); + + queueQryCur = cctx.isLocal() || cctx.isReplicated() ? jCache.localQuery(qry) : jCache.query(qry); } GridCacheQueueProxy queue = queuesMap.get(hdr.id()); @@ -544,7 +536,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, /** * Predicate for queue continuous query. */ - private static class QueueHeaderPredicate implements IgnitePredicate<CacheContinuousQueryEntry>, Externalizable { + private static class QueueHeaderPredicate implements CacheEntryEventFilter<Object, Object>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -556,7 +548,7 @@ public class CacheDataStructuresManager<K, V> extends GridCacheManagerAdapter<K, } /** {@inheritDoc} */ - @Override public boolean apply(CacheContinuousQueryEntry e) { + @Override public boolean evaluate(CacheEntryEvent<?, ?> e) { return e.getKey() instanceof GridCacheQueueHeaderKey; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java index 4dad74c..3dcb82a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueries.java @@ -93,16 +93,6 @@ public interface CacheQueries<K, V> { public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter); /** - * Creates new continuous query. - * <p> - * For more information refer to {@link org.apache.ignite.cache.query.CacheContinuousQuery} documentation. - * - * @return Created continuous query. - * @see org.apache.ignite.cache.query.CacheContinuousQuery - */ - public CacheContinuousQuery<K, V> createContinuousQuery(); - - /** * Forces this cache to rebuild all search indexes of given value type. Sometimes indexes * may hold references to objects that have already been removed from cache. Although * not affecting query results, these objects may consume extra memory. Rebuilding http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java index 797990f..52e8d96 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesImpl.java @@ -177,11 +177,6 @@ public class GridCacheQueriesImpl<K, V> implements GridCacheQueriesEx<K, V>, Ext } /** {@inheritDoc} */ - @Override public CacheContinuousQuery<K, V> createContinuousQuery() { - return ctx.continuousQueries().createQuery(prj == null ? null : prj.predicate()); - } - - /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> rebuildIndexes(Class<?> cls) { A.notNull(cls, "cls"); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java index 509a7b2..5e19cd5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueriesProxy.java @@ -151,18 +151,6 @@ public class GridCacheQueriesProxy<K, V> implements GridCacheQueriesEx<K, V>, Ex } /** {@inheritDoc} */ - @Override public CacheContinuousQuery<K, V> createContinuousQuery() { - GridCacheProjectionImpl<K, V> prev = gate.enter(prj); - - try { - return delegate.createContinuousQuery(); - } - finally { - gate.leave(prev); - } - } - - /** {@inheritDoc} */ @Override public <R> CacheQuery<R> createSpiQuery() { GridCacheProjectionImpl<K, V> prev = gate.enter(prj); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java new file mode 100644 index 0000000..92f0f0a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.marshaller.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +import static org.apache.ignite.internal.processors.cache.GridCacheValueBytes.*; + +/** + * Continuous query event. + */ +class CacheContinuousQueryEntry<K, V> implements GridCacheDeployable, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Key. */ + @GridToStringInclude + private K key; + + /** New value. */ + @GridToStringInclude + private V newVal; + + /** Old value. */ + @GridToStringInclude + private V oldVal; + + /** Serialized key. */ + @GridToStringExclude + private byte[] keyBytes; + + /** Serialized value. */ + @GridToStringExclude + private GridCacheValueBytes newValBytes; + + /** Serialized value. */ + @GridToStringExclude + private GridCacheValueBytes oldValBytes; + + /** Cache name. */ + private String cacheName; + + /** Deployment info. */ + @GridToStringExclude + private GridDeploymentInfo depInfo; + + public CacheContinuousQueryEntry() { + // No-op. + } + + CacheContinuousQueryEntry(K key, @Nullable V newVal, @Nullable GridCacheValueBytes newValBytes, @Nullable V oldVal, + @Nullable GridCacheValueBytes oldValBytes) { + + this.key = key; + this.newVal = newVal; + this.newValBytes = newValBytes; + this.oldVal = oldVal; + this.oldValBytes = oldValBytes; + } + + /** + * @param cacheName Cache name. + */ + void cacheName(String cacheName) { + this.cacheName = cacheName; + } + + /** + * @return cache name. + */ + String cacheName() { + return cacheName; + } + + /** + * Unmarshals value from bytes if needed. + * + * @param marsh Marshaller. + * @param ldr Class loader. + * @throws IgniteCheckedException In case of error. + */ + void initValue(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { + assert marsh != null; + + if (newVal == null && newValBytes != null && !newValBytes.isNull()) + newVal = newValBytes.isPlain() ? (V)newValBytes.get() : marsh.<V>unmarshal(newValBytes.get(), ldr); + } + + /** + * @param marsh Marshaller. + * @throws IgniteCheckedException In case of error. + */ + void p2pMarshal(Marshaller marsh) throws IgniteCheckedException { + assert marsh != null; + + assert key != null; + + keyBytes = marsh.marshal(key); + + if (newValBytes == null || newValBytes.isNull()) + newValBytes = newVal != null ? + newVal instanceof byte[] ? plain(newVal) : marshaled(marsh.marshal(newVal)) : null; + + if (oldValBytes == null || oldValBytes.isNull()) + oldValBytes = oldVal != null ? + oldVal instanceof byte[] ? plain(oldVal) : marshaled(marsh.marshal(oldVal)) : null; + } + + /** + * @param marsh Marshaller. + * @param ldr Class loader. + * @throws IgniteCheckedException In case of error. + */ + void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException { + assert marsh != null; + + assert key == null : "Key should be null: " + key; + assert newVal == null : "New value should be null: " + newVal; + assert oldVal == null : "Old value should be null: " + oldVal; + assert keyBytes != null; + + key = marsh.unmarshal(keyBytes, ldr); + + if (newValBytes != null && !newValBytes.isNull()) + newVal = newValBytes.isPlain() ? (V)newValBytes.get() : marsh.<V>unmarshal(newValBytes.get(), ldr); + + if (oldValBytes != null && !oldValBytes.isNull()) + oldVal = oldValBytes.isPlain() ? (V)oldValBytes.get() : marsh.<V>unmarshal(oldValBytes.get(), ldr); + } + + /** + * @return Key. + */ + K key() { + return key; + } + + /** + * @return New value. + */ + V value() { + return newVal; + } + + /** + * @return Old value. + */ + V oldValue() { + return oldVal; + } + + /** + * Nullifies old value. + */ + void nullifyOldValue() { + oldVal = null; + oldValBytes = null; + } + + /** {@inheritDoc} */ + @Override public void prepare(GridDeploymentInfo depInfo) { + this.depInfo = depInfo; + } + + /** {@inheritDoc} */ + @Override public GridDeploymentInfo deployInfo() { + return depInfo; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + boolean b = keyBytes != null; + + out.writeBoolean(b); + + if (b) { + U.writeByteArray(out, keyBytes); + + if (newValBytes != null && !newValBytes.isNull()) { + out.writeBoolean(true); + out.writeBoolean(newValBytes.isPlain()); + U.writeByteArray(out, newValBytes.get()); + } + else + out.writeBoolean(false); + + if (oldValBytes != null && !oldValBytes.isNull()) { + out.writeBoolean(true); + out.writeBoolean(oldValBytes.isPlain()); + U.writeByteArray(out, oldValBytes.get()); + } + else + out.writeBoolean(false); + + U.writeString(out, cacheName); + out.writeObject(depInfo); + } + else { + out.writeObject(key); + out.writeObject(newVal); + out.writeObject(oldVal); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + boolean b = in.readBoolean(); + + if (b) { + keyBytes = U.readByteArray(in); + + if (in.readBoolean()) + newValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in)); + + if (in.readBoolean()) + oldValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in)); + + cacheName = U.readString(in); + depInfo = (GridDeploymentInfo)in.readObject(); + } + else { + key = (K)in.readObject(); + newVal = (V)in.readObject(); + oldVal = (V)in.readObject(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheContinuousQueryEntry.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java new file mode 100644 index 0000000..b284ef2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import javax.cache.*; +import javax.cache.event.*; + +/** + * Continuous query event. + */ +class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> { + /** Entry. */ + private final CacheContinuousQueryEntry<K, V> e; + + /** + * @param source Source cache. + * @param eventType Event type. + * @param e Entry. + */ + CacheContinuousQueryEvent(Cache source, EventType eventType, CacheContinuousQueryEntry<K, V> e) { + super(source, eventType); + + assert e != null; + + this.e = e; + } + + /** + * @return Entry. + */ + CacheContinuousQueryEntry<K, V> entry() { + return e; + } + + /** {@inheritDoc} */ + @Override public K getKey() { + return e.key(); + } + + /** {@inheritDoc} */ + @Override public V getValue() { + return e.value(); + } + + /** {@inheritDoc} */ + @Override public V getOldValue() { + return e.oldValue(); + } + + /** {@inheritDoc} */ + @Override public boolean isOldValueAvailable() { + return e.oldValue() != null; + } + + /** {@inheritDoc} */ + @Override public <T> T unwrap(Class<T> clazz) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheContinuousQueryEvent.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java new file mode 100644 index 0000000..897f481 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilterEx.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + + +import javax.cache.event.*; + +/** + * Extended continuous query filter. + */ +public interface CacheContinuousQueryFilterEx<K, V> extends CacheEntryEventFilter<K, V> { + /** + * Callback for query unregister event. + */ + public void onQueryUnregister(); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f8f0699d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java new file mode 100644 index 0000000..5c03488 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.events.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.deployment.*; +import org.apache.ignite.internal.processors.cache.*; +import org.apache.ignite.internal.processors.cache.query.*; +import org.apache.ignite.internal.processors.continuous.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.*; +import java.io.*; +import java.util.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * Continuous query handler. + */ +public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { + /** */ + private static final long serialVersionUID = 0L; + + /** Cache name. */ + private String cacheName; + + /** Topic for ordered messages. */ + private Object topic; + + /** Local callback. */ + private CacheEntryUpdatedListener<K, V> locLsnr; + + /** Filter. */ + private CacheEntryEventFilter<K, V> rmtFilter; + + /** Deployable object for filter. */ + private DeployableObject filterDep; + + /** Internal flag. */ + private boolean internal; + + /** Entry listener flag. */ + private boolean entryLsnr; + + /** Synchronous listener flag. */ + private boolean sync; + + /** {@code True} if old value is required. */ + private boolean oldVal; + + /** Task name hash code. */ + private int taskHash; + + /** Keep portable flag. */ + private boolean keepPortable; + + /** Whether to skip primary check for REPLICATED cache. */ + private transient boolean skipPrimaryCheck; + + /** + * Required by {@link Externalizable}. + */ + public CacheContinuousQueryHandler() { + // No-op. + } + + /** + * Constructor. + * + * @param cacheName Cache name. + * @param topic Topic for ordered messages. + * @param locLsnr Local listener. + * @param rmtFilter Remote filter. + * @param internal If {@code true} then query is notified about internal entries updates. + * @param entryLsnr {@code True} if query created for {@link CacheEntryListener}. + * @param sync {@code True} if query created for synchronous {@link CacheEntryListener}. + * @param oldVal {@code True} if old value is required. + * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache. + * @param taskHash Task name hash code. + */ + public CacheContinuousQueryHandler(@Nullable String cacheName, Object topic, + CacheEntryUpdatedListener<K, V> locLsnr, CacheEntryEventFilter<K, V> rmtFilter, boolean internal, + boolean entryLsnr, boolean sync, boolean oldVal, boolean skipPrimaryCheck, int taskHash, boolean keepPortable) { + assert topic != null; + assert locLsnr != null; + assert !sync || entryLsnr; + + this.cacheName = cacheName; + this.topic = topic; + this.locLsnr = locLsnr; + this.rmtFilter = rmtFilter; + this.internal = internal; + this.entryLsnr = entryLsnr; + this.sync = sync; + this.oldVal = oldVal; + this.taskHash = taskHash; + this.keepPortable = keepPortable; + this.skipPrimaryCheck = skipPrimaryCheck; + } + + /** {@inheritDoc} */ + @Override public boolean isForEvents() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isForMessaging() { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean isForQuery() { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) + throws IgniteCheckedException { + assert nodeId != null; + assert routineId != null; + assert ctx != null; + + if (locLsnr != null) + ctx.resource().injectGeneric(locLsnr); + + if (rmtFilter != null) + ctx.resource().injectGeneric(rmtFilter); + + final boolean loc = nodeId.equals(ctx.localNodeId()); + + CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() { + @Override public void onExecution() { + if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + ctx.event().record(new CacheQueryExecutedEvent<>( + ctx.discovery().localNode(), + "Continuous query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.CONTINUOUS, + cacheName, + null, + null, + null, + rmtFilter, + null, + nodeId, + taskName() + )); + } + } + + @Override public void onEntryUpdate(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt) { + GridCacheContext<K, V> cctx = cacheContext(ctx); + + if (cctx.isReplicated() && !skipPrimaryCheck && !primary) + return; + + boolean notify = true; + + if (rmtFilter != null) { + CacheFlag[] f = cctx.forceLocalRead(); + + try { + notify = rmtFilter.evaluate(evt); + } + finally { + cctx.forceFlags(f); + } + } + + if (notify) { + if (!oldVal) + evt.entry().nullifyOldValue(); + + if (loc) + locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); + else { + try { + ClusterNode node = ctx.discovery().node(nodeId); + + if (ctx.config().isPeerClassLoadingEnabled() && node != null && + U.hasCache(node, cacheName)) { + evt.entry().p2pMarshal(ctx.config().getMarshaller()); + + evt.entry().cacheName(cacheName); + + GridCacheDeploymentManager depMgr = + ctx.cache().internalCache(cacheName).context().deploy(); + + depMgr.prepare(evt.entry()); + } + + ctx.continuous().addNotification(nodeId, routineId, evt, topic, sync); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex); + } + } + + if (!entryLsnr && recordIgniteEvt) { + ctx.event().record(new CacheQueryReadEvent<>( + ctx.discovery().localNode(), + "Continuous query executed.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.CONTINUOUS, + cacheName, + null, + null, + null, + rmtFilter, + null, + nodeId, + taskName(), + evt.getKey(), + evt.getValue(), + evt.getOldValue(), + null + )); + } + } + } + + @Override public void onUnregister() { + if (rmtFilter != null && rmtFilter instanceof CacheContinuousQueryFilterEx) + ((CacheContinuousQueryFilterEx)rmtFilter).onQueryUnregister(); + } + + @Nullable private String taskName() { + return ctx.security().enabled() ? ctx.task().resolveTaskName(taskHash) : null; + } + }; + + return manager(ctx).registerListener(routineId, lsnr, internal, entryLsnr); + } + + /** {@inheritDoc} */ + @Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void unregister(UUID routineId, GridKernalContext ctx) { + assert routineId != null; + assert ctx != null; + + manager(ctx).unregisterListener(internal, routineId); + } + + /** + * @param ctx Kernal context. + * @return Continuous query manager. + */ + private CacheContinuousQueryManager<K, V> manager(GridKernalContext ctx) { + return cacheContext(ctx).continuousQueries(); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void notifyCallback(UUID nodeId, UUID routineId, Collection<?> objs, GridKernalContext ctx) { + assert nodeId != null; + assert routineId != null; + assert objs != null; + assert ctx != null; + + Collection<CacheEntryEvent<? extends K, ? extends V>> evts = + (Collection<CacheEntryEvent<? extends K, ? extends V>>)objs; + + if (ctx.config().isPeerClassLoadingEnabled()) { + for (CacheEntryEvent<? extends K, ? extends V> evt : evts) { + assert evt instanceof CacheContinuousQueryEvent; + + CacheContinuousQueryEntry<? extends K, ? extends V> e = ((CacheContinuousQueryEvent)evt).entry(); + + GridCacheAdapter cache = ctx.cache().internalCache(e.cacheName()); + + ClassLoader ldr = null; + + if (cache != null) { + GridCacheDeploymentManager depMgr = cache.context().deploy(); + + GridDeploymentInfo depInfo = e.deployInfo(); + + if (depInfo != null) { + depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(), + depInfo.participants(), depInfo.localDeploymentOwner()); + } + + ldr = depMgr.globalLoader(); + } + else { + U.warn(ctx.log(getClass()), "Received cache event for cache that is not configured locally " + + "when peer class loading is enabled: " + e.cacheName() + ". Will try to unmarshal " + + "with default class loader."); + } + + try { + e.p2pUnmarshal(ctx.config().getMarshaller(), ldr); + } + catch (IgniteCheckedException ex) { + U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex); + } + } + } + + locLsnr.onUpdated(evts); + } + + /** {@inheritDoc} */ + @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException { + assert ctx != null; + assert ctx.config().isPeerClassLoadingEnabled(); + + if (rmtFilter != null && !U.isGrid(rmtFilter.getClass())) + filterDep = new DeployableObject(rmtFilter, ctx); + } + + /** {@inheritDoc} */ + @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { + assert nodeId != null; + assert ctx != null; + assert ctx.config().isPeerClassLoadingEnabled(); + + if (filterDep != null) + rmtFilter = filterDep.unmarshal(nodeId, ctx); + } + + /** {@inheritDoc} */ + @Nullable @Override public Object orderedTopic() { + return topic; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeString(out, cacheName); + out.writeObject(topic); + + boolean b = filterDep != null; + + out.writeBoolean(b); + + if (b) + out.writeObject(filterDep); + else + out.writeObject(rmtFilter); + + out.writeBoolean(internal); + out.writeBoolean(entryLsnr); + out.writeBoolean(sync); + out.writeBoolean(oldVal); + out.writeInt(taskHash); + out.writeBoolean(keepPortable); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + cacheName = U.readString(in); + topic = in.readObject(); + + boolean b = in.readBoolean(); + + if (b) + filterDep = (DeployableObject)in.readObject(); + else + rmtFilter = (CacheEntryEventFilter<K, V>)in.readObject(); + + internal = in.readBoolean(); + entryLsnr = in.readBoolean(); + sync = in.readBoolean(); + oldVal = in.readBoolean(); + taskHash = in.readInt(); + keepPortable = in.readBoolean(); + } + + /** + * @param ctx Kernal context. + * @return Cache context. + */ + private GridCacheContext<K, V> cacheContext(GridKernalContext ctx) { + assert ctx != null; + + return ctx.cache().<K, V>internalCache(cacheName).context(); + } + + /** + * Deployable object. + */ + private static class DeployableObject implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Serialized object. */ + private byte[] bytes; + + /** Deployment class name. */ + private String clsName; + + /** Deployment info. */ + private GridDeploymentInfo depInfo; + + /** + * Required by {@link Externalizable}. + */ + public DeployableObject() { + // No-op. + } + + /** + * @param obj Object. + * @param ctx Kernal context. + * @throws IgniteCheckedException In case of error. + */ + private DeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException { + assert obj != null; + assert ctx != null; + + Class cls = U.detectClass(obj); + + clsName = cls.getName(); + + GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); + + if (dep == null) + throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj); + + depInfo = new GridDeploymentInfoBean(dep); + + bytes = ctx.config().getMarshaller().marshal(obj); + } + + /** + * @param nodeId Node ID. + * @param ctx Kernal context. + * @return Deserialized object. + * @throws IgniteCheckedException In case of error. + */ + <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException { + assert ctx != null; + + GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName, + depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null); + + if (dep == null) + throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName); + + return ctx.config().getMarshaller().unmarshal(bytes, dep.classLoader()); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeByteArray(out, bytes); + U.writeString(out, clsName); + out.writeObject(depInfo); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + bytes = U.readByteArray(in); + clsName = U.readString(in); + depInfo = (GridDeploymentInfo)in.readObject(); + } + } +}