Repository: ignite Updated Branches: refs/heads/ignite-2004 126dc4b66 -> 7e68b9896
IGNITE-2004 Added async query example. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7e68b989 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7e68b989 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7e68b989 Branch: refs/heads/ignite-2004 Commit: 7e68b98960519274d2803c8bf325287fb93af5cd Parents: 126dc4b Author: nikolay_tikhonov <[email protected]> Authored: Fri Apr 15 14:28:00 2016 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Fri Apr 15 14:28:00 2016 +0300 ---------------------------------------------------------------------- .../CacheContinuousAsyncQueryExample.java | 137 +++++++++++++++++++ .../datagrid/CacheContinuousQueryExample.java | 14 +- 2 files changed, 148 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7e68b989/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java new file mode 100644 index 0000000..32286e3 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousAsyncQueryExample.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.datagrid; + +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.examples.ExampleNodeStartup; +import org.apache.ignite.lang.IgniteAsyncCallback; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.resources.IgniteInstanceResource; + +/** + * This examples demonstrates asynchronous continuous query API. + * <p> + * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + * <p> + * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will + * start node with {@code examples/config/example-ignite.xml} configuration. + */ +public class CacheContinuousAsyncQueryExample { + /** Cache name. */ + private static final String CACHE_NAME = CacheContinuousAsyncQueryExample.class.getSimpleName(); + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws Exception If example execution failed. + */ + public static void main(String[] args) throws Exception { + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Cache continuous query example started."); + + // Auto-close cache at the end of the example. + try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(CACHE_NAME)) { + int keyCnt = 20; + + // These entries will be queried by initial predicate. + for (int i = 0; i < keyCnt; i++) + cache.put(i, Integer.toString(i)); + + // Create new continuous query. + ContinuousQuery<Integer, String> qry = new ContinuousQuery<>(); + + qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() { + @Override public boolean apply(Integer key, String val) { + return key > 10; + } + })); + + // Callback that is called locally when update notifications are received. + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends String> e : evts) + System.out.println("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); + } + }); + + // This filter will be evaluated remotely on all nodes. + // Entry that pass this filter will be sent to the caller. + qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() { + @Override public CacheEntryEventFilter<Integer, String> create() { + return new CacheEntryFilter(); + } + }); + + // Execute query. + try (QueryCursor<Cache.Entry<Integer, String>> cur = cache.query(qry)) { + // Iterate through existing data. + for (Cache.Entry<Integer, String> e : cur) + System.out.println("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']'); + + // Add a few more keys and watch more query notifications. + for (int i = 0; i < keyCnt; i++) + cache.put(i, Integer.toString(i)); + + // Wait for a while while callback is notified about remaining puts. + Thread.sleep(2000); + } + + for (int i = 0; i < 10; i++) + System.out.println("Entry updated from filter [key=" + i + ", val=" + cache.get(i) + ']'); + } + finally { + // Distributed cache could be removed from cluster only by #destroyCache() call. + ignite.destroyCache(CACHE_NAME); + } + } + } + + /** + * Filter returns {@code true} for entries which have key bigger than 10. + */ + @IgniteAsyncCallback + private static class CacheEntryFilter implements CacheEntryEventFilter<Integer, String> { + /** Ignite instance. */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) + throws CacheEntryListenerException { + // This cache operation is safe because filter has Ignite AsyncCallback annotation. + if (e.getKey() < 10 && String.valueOf(e.getKey()).equals(e.getValue())) + ignite.cache(CACHE_NAME).put(e.getKey(), e.getValue() + "_less_than_10"); + + return e.getKey() > 10; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/7e68b989/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java index 59759af..6db968d 100644 --- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheContinuousQueryExample.java @@ -18,7 +18,10 @@ package org.apache.ignite.examples.datagrid; import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryEventFilter; import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; @@ -28,6 +31,7 @@ import org.apache.ignite.cache.query.ContinuousQuery; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.examples.ExampleNodeStartup; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteBiPredicate; /** @@ -81,9 +85,13 @@ public class CacheContinuousQueryExample { // This filter will be evaluated remotely on all nodes. // Entry that pass this filter will be sent to the caller. - qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer, String>() { - @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) { - return e.getKey() > 10; + qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() { + @Override public CacheEntryEventFilter<Integer, String> create() { + return new CacheEntryEventFilter<Integer, String>() { + @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) { + return e.getKey() > 10; + } + }; } });
