IGNITE-425: Implementation of ContinuousQueryWithTransformer

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a83f3038
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a83f3038
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a83f3038

Branch: refs/heads/master
Commit: a83f3038a9424a5eb826cf35da8d22496a551b50
Parents: c917327
Author: Nikolay Izhikov <nizhi...@apache.org>
Authored: Thu Feb 8 14:51:32 2018 +0300
Committer: Nikolay Izhikov <nizhi...@apache.org>
Committed: Thu Feb 8 14:51:32 2018 +0300

----------------------------------------------------------------------
 .../cache/query/AbstractContinuousQuery.java    | 202 ++++++++
 .../ignite/cache/query/ContinuousQuery.java     | 172 +------
 .../query/ContinuousQueryWithTransformer.java   | 192 +++++++
 .../processors/cache/IgniteCacheProxyImpl.java  |  82 ++-
 .../continuous/CacheContinuousQueryEntry.java   |   9 +-
 .../continuous/CacheContinuousQueryHandler.java | 149 +++++-
 .../CacheContinuousQueryHandlerV2.java          |   9 +-
 .../CacheContinuousQueryHandlerV3.java          | 185 +++++++
 .../continuous/CacheContinuousQueryManager.java |  44 +-
 ...acheContinuousQueryRandomOperationsTest.java | 156 ++++--
 ...ContinuousWithTransformerClientSelfTest.java |  40 ++
 ...heContinuousWithTransformerFailoverTest.java | 309 +++++++++++
 ...eContinuousWithTransformerLocalSelfTest.java |  29 ++
 ...nuousWithTransformerPartitionedSelfTest.java |  29 ++
 ...uousWithTransformerRandomOperationsTest.java |  31 ++
 ...inuousWithTransformerReplicatedSelfTest.java | 511 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite3.java         |  13 +
 17 files changed, 1919 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/cache/query/AbstractContinuousQuery.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/AbstractContinuousQuery.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/query/AbstractContinuousQuery.java
new file mode 100644
index 0000000..2a615ee
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/query/AbstractContinuousQuery.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.EventType;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+
+/**
+ * Base class for continuous query.
+ *
+ * @see ContinuousQuery
+ * @see ContinuousQueryWithTransformer
+ */
+public abstract class AbstractContinuousQuery<K, V> extends 
Query<Cache.Entry<K, V>> {
+    /**
+     * Default page size. Size of {@code 1} means that all entries
+     * will be sent to master node immediately (buffering is disabled).
+     */
+    public static final int DFLT_PAGE_SIZE = 1;
+
+    /** Maximum default time interval after which buffer will be flushed (if 
buffering is enabled). */
+    public static final long DFLT_TIME_INTERVAL = 0;
+
+    /**
+     * Default value for automatic unsubscription flag. Remote filters
+     * will be unregistered by default if master node leaves topology.
+     */
+    public static final boolean DFLT_AUTO_UNSUBSCRIBE = true;
+
+    /** Initial query. */
+    private Query<Cache.Entry<K, V>> initQry;
+
+    /** Remote filter factory. */
+    private Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory;
+
+    /** Time interval. */
+    private long timeInterval = DFLT_TIME_INTERVAL;
+
+    /** Automatic unsubscription flag. */
+    private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE;
+
+    /** Whether to notify about {@link EventType#EXPIRED} events. */
+    private boolean includeExpired;
+
+    /**
+     * Sets initial query.
+     * <p>
+     * This query will be executed before continuous listener is registered
+     * which allows to iterate through entries which already existed at the
+     * time continuous query is executed.
+     *
+     * @param initQry Initial query.
+     * @return {@code this} for chaining.
+     */
+    public AbstractContinuousQuery<K, V> setInitialQuery(Query<Cache.Entry<K, 
V>> initQry) {
+        this.initQry = initQry;
+
+        return this;
+    }
+
+    /**
+     * Gets initial query.
+     *
+     * @return Initial query.
+     */
+    public Query<Cache.Entry<K, V>> getInitialQuery() {
+        return initQry;
+    }
+
+    /**
+     * Sets optional key-value filter factory. This factory produces filter is 
called before entry is
+     * sent to the master node.
+     * <p>
+     * <b>WARNING:</b> all operations that involve any kind of JVM-local or 
distributed locking
+     * (e.g., synchronization or transactional cache operations), should be 
executed asynchronously
+     * without blocking the thread that called the filter. Otherwise, you can 
get deadlocks.
+     * <p>
+     * If remote filter are annotated with {@link IgniteAsyncCallback} then it 
is executed in async callback
+     * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) that 
allow to perform a cache operations.
+     *
+     * @param rmtFilterFactory Key-value filter factory.
+     * @return {@code this} for chaining.
+     * @see IgniteAsyncCallback
+     * @see IgniteConfiguration#getAsyncCallbackPoolSize()
+     */
+    public AbstractContinuousQuery<K, V> setRemoteFilterFactory(
+        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) {
+        this.rmtFilterFactory = rmtFilterFactory;
+
+        return this;
+    }
+
+    /**
+     * Gets remote filter.
+     *
+     * @return Remote filter.
+     */
+    public Factory<? extends CacheEntryEventFilter<K, V>> 
getRemoteFilterFactory() {
+        return rmtFilterFactory;
+    }
+
+    /**
+     * Sets time interval.
+     * <p>
+     * When a cache update happens, entry is first put into a buffer. Entries 
from buffer will
+     * be sent to the master node only if the buffer is full (its size can be 
provided via {@link #setPageSize(int)}
+     * method) or time provided via this method is exceeded.
+     * <p>
+     * Default time interval is {@code 0} which means that
+     * time check is disabled and entries will be sent only when buffer is 
full.
+     *
+     * @param timeInterval Time interval.
+     * @return {@code this} for chaining.
+     */
+    public AbstractContinuousQuery<K, V> setTimeInterval(long timeInterval) {
+        if (timeInterval < 0)
+            throw new IllegalArgumentException("Time interval can't be 
negative.");
+
+        this.timeInterval = timeInterval;
+
+        return this;
+    }
+
+    /**
+     * Gets time interval.
+     *
+     * @return Time interval.
+     */
+    public long getTimeInterval() {
+        return timeInterval;
+    }
+
+    /**
+     * Sets automatic unsubscribe flag.
+     * <p>
+     * This flag indicates that query filters on remote nodes should be
+     * automatically unregistered if master node (node that initiated the 
query) leaves topology. If this flag is
+     * {@code false}, filters will be unregistered only when the query is 
cancelled from master node, and won't ever be
+     * unregistered if master node leaves grid.
+     * <p>
+     * Default value for this flag is {@code true}.
+     *
+     * @param autoUnsubscribe Automatic unsubscription flag.
+     * @return {@code this} for chaining.
+     */
+    public AbstractContinuousQuery<K, V> setAutoUnsubscribe(boolean 
autoUnsubscribe) {
+        this.autoUnsubscribe = autoUnsubscribe;
+
+        return this;
+    }
+
+    /**
+     * Gets automatic unsubscription flag value.
+     *
+     * @return Automatic unsubscription flag.
+     */
+    public boolean isAutoUnsubscribe() {
+        return autoUnsubscribe;
+    }
+
+    /**
+     * Sets the flag value defining whether to notify about {@link 
EventType#EXPIRED} events.
+     * If {@code true}, then the remote listener will get notifications about 
entries
+     * expired in cache. Otherwise, only {@link EventType#CREATED}, {@link 
EventType#UPDATED}
+     * and {@link EventType#REMOVED} events will be fired in the remote 
listener.
+     * <p>
+     * This flag is {@code false} by default, so {@link EventType#EXPIRED} 
events are disabled.
+     *
+     * @param includeExpired Whether to notify about {@link EventType#EXPIRED} 
events.
+     */
+    public void setIncludeExpired(boolean includeExpired) {
+        this.includeExpired = includeExpired;
+    }
+
+    /**
+     * Gets the flag value defining whether to notify about {@link 
EventType#EXPIRED} events.
+     *
+     * @return Whether to notify about {@link EventType#EXPIRED} events.
+     */
+    public boolean isIncludeExpired() {
+        return includeExpired;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java 
b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
index 49d471e..9a8fbca 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQuery.java
@@ -21,9 +21,9 @@ import javax.cache.Cache;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
-import javax.cache.event.EventType;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import 
org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.lang.IgniteAsyncCallback;
 
@@ -103,49 +103,20 @@ import org.apache.ignite.lang.IgniteAsyncCallback;
  * is executed in async callback pool (see {@link 
IgniteConfiguration#getAsyncCallbackPoolSize()})
  * and notification order is kept the same as update order for given cache key.
  *
+ * @see ContinuousQueryWithTransformer
  * @see IgniteAsyncCallback
  * @see IgniteConfiguration#getAsyncCallbackPoolSize()
  */
-public final class ContinuousQuery<K, V> extends Query<Cache.Entry<K, V>> {
+public final class ContinuousQuery<K, V> extends AbstractContinuousQuery<K, V> 
{
     /** */
     private static final long serialVersionUID = 0L;
 
-    /**
-     * Default page size. Size of {@code 1} means that all entries
-     * will be sent to master node immediately (buffering is disabled).
-     */
-    public static final int DFLT_PAGE_SIZE = 1;
-
-    /** Maximum default time interval after which buffer will be flushed (if 
buffering is enabled). */
-    public static final long DFLT_TIME_INTERVAL = 0;
-
-    /**
-     * Default value for automatic unsubscription flag. Remote filters
-     * will be unregistered by default if master node leaves topology.
-     */
-    public static final boolean DFLT_AUTO_UNSUBSCRIBE = true;
-
-    /** Initial query. */
-    private Query<Cache.Entry<K, V>> initQry;
-
     /** Local listener. */
     private CacheEntryUpdatedListener<K, V> locLsnr;
 
     /** Remote filter. */
     private CacheEntryEventSerializableFilter<K, V> rmtFilter;
 
-    /** Remote filter factory. */
-    private Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory;
-
-    /** Time interval. */
-    private long timeInterval = DFLT_TIME_INTERVAL;
-
-    /** Automatic unsubscription flag. */
-    private boolean autoUnsubscribe = DFLT_AUTO_UNSUBSCRIBE;
-
-    /** Whether to notify about {@link EventType#EXPIRED} events. */
-    private boolean includeExpired;
-
     /**
      * Creates new continuous query.
      */
@@ -153,29 +124,9 @@ public final class ContinuousQuery<K, V> extends 
Query<Cache.Entry<K, V>> {
         setPageSize(DFLT_PAGE_SIZE);
     }
 
-    /**
-     * Sets initial query.
-     * <p>
-     * This query will be executed before continuous listener is registered
-     * which allows to iterate through entries which already existed at the
-     * time continuous query is executed.
-     *
-     * @param initQry Initial query.
-     * @return {@code this} for chaining.
-     */
+    /** {@inheritDoc} */
     public ContinuousQuery<K, V> setInitialQuery(Query<Cache.Entry<K, V>> 
initQry) {
-        this.initQry = initQry;
-
-        return this;
-    }
-
-    /**
-     * Gets initial query.
-     *
-     * @return Initial query.
-     */
-    public Query<Cache.Entry<K, V>> getInitialQuery() {
-        return initQry;
+        return (ContinuousQuery<K, V>)super.setInitialQuery(initQry);
     }
 
     /**
@@ -197,6 +148,7 @@ public final class ContinuousQuery<K, V> extends 
Query<Cache.Entry<K, V>> {
      * @return {@code this} for chaining.
      * @see IgniteAsyncCallback
      * @see IgniteConfiguration#getAsyncCallbackPoolSize()
+     * @see ContinuousQueryWithTransformer#setLocalListener(EventListener)
      */
     public ContinuousQuery<K, V> setLocalListener(CacheEntryUpdatedListener<K, 
V> locLsnr) {
         this.locLsnr = locLsnr;
@@ -246,118 +198,14 @@ public final class ContinuousQuery<K, V> extends 
Query<Cache.Entry<K, V>> {
         return rmtFilter;
     }
 
-    /**
-     * Sets optional key-value filter factory. This factory produces filter is 
called before entry is
-     * sent to the master node.
-     * <p>
-     * <b>WARNING:</b> all operations that involve any kind of JVM-local or 
distributed locking
-     * (e.g., synchronization or transactional cache operations), should be 
executed asynchronously
-     * without blocking the thread that called the filter. Otherwise, you can 
get deadlocks.
-     * <p>
-     * If remote filter are annotated with {@link IgniteAsyncCallback} then it 
is executed in async callback
-     * pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) that 
allow to perform a cache operations.
-     *
-     * @param rmtFilterFactory Key-value filter factory.
-     * @return {@code this} for chaining.
-     * @see IgniteAsyncCallback
-     * @see IgniteConfiguration#getAsyncCallbackPoolSize()
-     */
-    public ContinuousQuery<K, V> setRemoteFilterFactory(
-        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) {
-        this.rmtFilterFactory = rmtFilterFactory;
-
-        return this;
-    }
-
-    /**
-     * Gets remote filter.
-     *
-     * @return Remote filter.
-     */
-    public Factory<? extends CacheEntryEventFilter<K, V>> 
getRemoteFilterFactory() {
-        return rmtFilterFactory;
-    }
-
-    /**
-     * Sets time interval.
-     * <p>
-     * When a cache update happens, entry is first put into a buffer. Entries 
from buffer will
-     * be sent to the master node only if the buffer is full (its size can be 
provided via {@link #setPageSize(int)}
-     * method) or time provided via this method is exceeded.
-     * <p>
-     * Default time interval is {@code 0} which means that
-     * time check is disabled and entries will be sent only when buffer is 
full.
-     *
-     * @param timeInterval Time interval.
-     * @return {@code this} for chaining.
-     */
+    /** {@inheritDoc} */
     public ContinuousQuery<K, V> setTimeInterval(long timeInterval) {
-        if (timeInterval < 0)
-            throw new IllegalArgumentException("Time interval can't be 
negative.");
-
-        this.timeInterval = timeInterval;
-
-        return this;
+        return (ContinuousQuery<K, V>)super.setTimeInterval(timeInterval);
     }
 
-    /**
-     * Gets time interval.
-     *
-     * @return Time interval.
-     */
-    public long getTimeInterval() {
-        return timeInterval;
-    }
-
-    /**
-     * Sets automatic unsubscribe flag.
-     * <p>
-     * This flag indicates that query filters on remote nodes should be
-     * automatically unregistered if master node (node that initiated the 
query) leaves topology. If this flag is
-     * {@code false}, filters will be unregistered only when the query is 
cancelled from master node, and won't ever be
-     * unregistered if master node leaves grid.
-     * <p>
-     * Default value for this flag is {@code true}.
-     *
-     * @param autoUnsubscribe Automatic unsubscription flag.
-     * @return {@code this} for chaining.
-     */
+    /** {@inheritDoc} */
     public ContinuousQuery<K, V> setAutoUnsubscribe(boolean autoUnsubscribe) {
-        this.autoUnsubscribe = autoUnsubscribe;
-
-        return this;
-    }
-
-    /**
-     * Gets automatic unsubscription flag value.
-     *
-     * @return Automatic unsubscription flag.
-     */
-    public boolean isAutoUnsubscribe() {
-        return autoUnsubscribe;
-    }
-
-    /**
-     * Sets the flag value defining whether to notify about {@link 
EventType#EXPIRED} events.
-     * If {@code true}, then the remote listener will get notifications about 
entries
-     * expired in cache. Otherwise, only {@link EventType#CREATED}, {@link 
EventType#UPDATED}
-     * and {@link EventType#REMOVED} events will be fired in the remote 
listener.
-     * <p>
-     * This flag is {@code false} by default, so {@link EventType#EXPIRED} 
events are disabled.
-     *
-     * @param includeExpired Whether to notify about {@link EventType#EXPIRED} 
events.
-     */
-    public void setIncludeExpired(boolean includeExpired) {
-        this.includeExpired = includeExpired;
-    }
-
-    /**
-     * Gets the flag value defining whether to notify about {@link 
EventType#EXPIRED} events.
-     *
-     * @return Whether to notify about {@link EventType#EXPIRED} events.
-     */
-    public boolean isIncludeExpired() {
-        return includeExpired;
+        return (ContinuousQuery<K, 
V>)super.setAutoUnsubscribe(autoUnsubscribe);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQueryWithTransformer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQueryWithTransformer.java
 
b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQueryWithTransformer.java
new file mode 100644
index 0000000..122410f
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/cache/query/ContinuousQueryWithTransformer.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.lang.IgniteClosure;
+
+/**
+ * API for configuring continuous cache queries with transformer.
+ * <p>
+ * Continuous queries allow to register a remote filter and a local listener
+ * for cache updates. If an update event passes the filter, it will be 
transformed with transformer and sent to
+ * the node that executed the query and local listener will be notified.
+ * <p>
+ * Additionally, you can execute initial query to get currently existing data.
+ * Query can be of any type (SQL, TEXT or SCAN) and can be set via {@link 
#setInitialQuery(Query)}
+ * method.
+ * <p>
+ * Query can be executed either on all nodes in topology using {@link 
IgniteCache#query(Query)}
+ * method, or only on the local node, if {@link Query#setLocal(boolean)} 
parameter is set to {@code true}.
+ * Note that in case query is distributed and a new node joins, it will get 
the remote
+ * filter for the query during discovery process before it actually joins 
topology,
+ * so no updates will be missed.
+ * This will execute query on all nodes that have cache you are working with 
and
+ * listener will start to receive notifications for cache updates.
+ * <p>
+ * To stop receiving updates call {@link QueryCursor#close()} method.
+ * Note that this works even if you didn't provide initial query. Cursor will
+ * be empty in this case, but it will still unregister listeners when {@link 
QueryCursor#close()}
+ * is called.
+ * <p>
+ * {@link IgniteAsyncCallback} annotation is supported for {@link 
CacheEntryEventFilter}
+ * (see {@link #setRemoteFilterFactory(Factory)}) and {@link 
CacheEntryUpdatedListener}
+ * (see {@link #setRemoteTransformerFactory(Factory)}) and {@link 
CacheEntryUpdatedListener}
+ * (see {@link #setLocalListener(EventListener)} and {@link EventListener}).
+ * If filter and/or listener are annotated with {@link IgniteAsyncCallback} 
then annotated callback
+ * is executed in async callback pool (see {@link 
IgniteConfiguration#getAsyncCallbackPoolSize()})
+ * and notification order is kept the same as update order for given cache key.
+ *
+ * @see ContinuousQuery
+ * @see IgniteAsyncCallback
+ * @see IgniteConfiguration#getAsyncCallbackPoolSize()
+ */
+public final class ContinuousQueryWithTransformer<K, V, T> extends 
AbstractContinuousQuery<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Remote transformer factory. */
+    private Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? 
extends V>, T>> rmtTransFactory;
+
+    /** Local listener of transformed event */
+    private EventListener<T> locLsnr;
+
+    /**
+     * Creates new continuous query with transformer.
+     */
+    public ContinuousQueryWithTransformer() {
+        setPageSize(DFLT_PAGE_SIZE);
+    }
+
+    /** {@inheritDoc} */
+    public ContinuousQueryWithTransformer<K, V, T> 
setInitialQuery(Query<Cache.Entry<K, V>> initQry) {
+        return (ContinuousQueryWithTransformer<K, V, 
T>)super.setInitialQuery(initQry);
+    }
+
+    /** {@inheritDoc} */
+    public ContinuousQueryWithTransformer<K, V, T> setRemoteFilterFactory(
+        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) {
+        return (ContinuousQueryWithTransformer<K, V, 
T>)super.setRemoteFilterFactory(rmtFilterFactory);
+    }
+
+    /**
+     * Sets transformer factory. This factory produces transformer is called 
after and only if entry passes the filter.
+     * <p>
+     * <b>WARNING:</b> all operations that involve any kind of JVM-local or 
distributed locking
+     * (e.g., synchronization or transactional cache operations), should be 
executed asynchronously
+     * without blocking the thread that called the filter. Otherwise, you can 
get deadlocks.
+     * <p>
+     *
+     * @param factory Remote transformer factory.
+     * @return {@code this} for chaining.
+     */
+    public ContinuousQueryWithTransformer<K, V, T> setRemoteTransformerFactory(
+        Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends 
V>, T>> factory) {
+        this.rmtTransFactory = factory;
+
+        return this;
+    }
+
+    /**
+     * Gets remote transformer factory
+     *
+     * @return Remote Transformer Factory
+     */
+    public Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? 
extends V>, T>> getRemoteTransformerFactory() {
+        return rmtTransFactory;
+    }
+
+    /**
+     * Sets local callback. This callback is called only in local node when 
new updates are received.
+     * <p>
+     * The callback predicate accepts results of transformed by {@link 
#getRemoteFilterFactory()} events
+     * <p>
+     * <b>WARNING:</b> all operations that involve any kind of JVM-local or 
distributed locking (e.g.,
+     * synchronization or transactional cache operations), should be executed 
asynchronously without
+     * blocking the thread that called the callback. Otherwise, you can get 
deadlocks.
+     * <p>
+     * If local listener are annotated with {@link IgniteAsyncCallback} then 
it is executed in async callback pool
+     * (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) that allow 
to perform a cache operations.
+     *
+     * @param locLsnr Local callback.
+     * @return {@code this} for chaining.
+     *
+     * @see IgniteAsyncCallback
+     * @see IgniteConfiguration#getAsyncCallbackPoolSize()
+     * @see ContinuousQuery#setLocalListener(CacheEntryUpdatedListener)
+     */
+    public ContinuousQueryWithTransformer<K, V, T> 
setLocalListener(EventListener<T> locLsnr) {
+        this.locLsnr = locLsnr;
+
+        return this;
+    }
+
+    /**
+     * Gets local transformed event listener
+     *
+     * @return local transformed event listener
+     */
+    public EventListener<T> getLocalListener() {
+        return locLsnr;
+    }
+
+    /** {@inheritDoc} */
+    public ContinuousQueryWithTransformer<K, V, T> setTimeInterval(long 
timeInterval) {
+        return (ContinuousQueryWithTransformer<K, V, 
T>)super.setTimeInterval(timeInterval);
+    }
+
+    /** {@inheritDoc} */
+    public ContinuousQueryWithTransformer<K, V, T> setAutoUnsubscribe(boolean 
autoUnsubscribe) {
+        return (ContinuousQueryWithTransformer<K, V, 
T>)super.setAutoUnsubscribe(autoUnsubscribe);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ContinuousQueryWithTransformer<K, V, T> setPageSize(int 
pageSize) {
+        return (ContinuousQueryWithTransformer<K, V, 
T>)super.setPageSize(pageSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ContinuousQueryWithTransformer<K, V, T> setLocal(boolean 
loc) {
+        return (ContinuousQueryWithTransformer<K, V, T>)super.setLocal(loc);
+    }
+
+    /**
+     * Interface for local listener of {@link ContinuousQueryWithTransformer} 
to implement.
+     * Invoked if an cache entry is updated, created or if a batch call is 
made,
+     * after the entries are updated and transformed.
+     *
+     * @param <T> type of data produced by transformer {@link 
ContinuousQueryWithTransformer#getRemoteTransformerFactory()}.
+     * @see ContinuousQueryWithTransformer
+     * @see ContinuousQueryWithTransformer#setLocalListener(EventListener)
+     */
+    public interface EventListener<T> {
+        /**
+         * Called after one or more entries have been updated.
+         *
+         * @param events The entries just updated that transformed with remote 
transformer of {@link ContinuousQueryWithTransformer}.
+         */
+        void onUpdated(Iterable<? extends T> events);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 7f71c74..a834022 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -36,6 +36,8 @@ import javax.cache.Cache;
 import javax.cache.CacheException;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Configuration;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.integration.CompletionListener;
 import javax.cache.processor.EntryProcessor;
@@ -46,11 +48,15 @@ import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheManager;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.AbstractContinuousQuery;
 import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
+import 
org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.cache.query.QueryCursor;
@@ -62,6 +68,7 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.cache.query.TextQuery;
 import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.AsyncSupportAdapter;
 import org.apache.ignite.internal.IgniteEx;
@@ -92,6 +99,7 @@ import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.mxbean.CacheMetricsMXBean;
 import org.apache.ignite.plugin.security.SecurityPermission;
 import org.jetbrains.annotations.NotNull;
@@ -106,6 +114,12 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
     /** */
     private static final long serialVersionUID = 0L;
 
+    /**
+     * Ignite version that introduce {@link ContinuousQueryWithTransformer} 
feature.
+     */
+    private static final IgniteProductVersion CONT_QRY_WITH_TRANSFORMER_SINCE =
+        IgniteProductVersion.fromString("2.5.0");
+
     /** Context. */
     private volatile GridCacheContext<K, V> ctx;
 
@@ -498,22 +512,66 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
      * @return Initial iteration cursor.
      */
     @SuppressWarnings("unchecked")
-    private QueryCursor<Cache.Entry<K, V>> queryContinuous(ContinuousQuery 
qry, boolean loc, boolean keepBinary) {
-        if (qry.getInitialQuery() instanceof ContinuousQuery)
+    private QueryCursor<Cache.Entry<K, V>> 
queryContinuous(AbstractContinuousQuery qry, boolean loc, boolean keepBinary) {
+        assert qry instanceof ContinuousQuery || qry instanceof 
ContinuousQueryWithTransformer;
+
+        if (qry.getInitialQuery() instanceof ContinuousQuery ||
+            qry.getInitialQuery() instanceof ContinuousQueryWithTransformer) {
             throw new IgniteException("Initial predicate for continuous query 
can't be an instance of another " +
                 "continuous query. Use SCAN or SQL query for initial 
iteration.");
+        }
+
+        CacheEntryUpdatedListener locLsnr = null;
+
+        EventListener locTransLsnr = null;
+
+        CacheEntryEventSerializableFilter rmtFilter = null;
+
+        Factory<? extends IgniteClosure> rmtTransFactory = null;
+
+        if (qry instanceof ContinuousQuery) {
+            ContinuousQuery<K, V> qry0 = (ContinuousQuery<K, V>)qry;
+
+            if (qry0.getLocalListener() == null)
+                throw new IgniteException("Mandatory local listener is not set 
for the query: " + qry);
+
+            if (qry0.getRemoteFilter() != null && 
qry0.getRemoteFilterFactory() != null)
+                throw new IgniteException("Should be used either RemoterFilter 
or RemoteFilterFactory.");
 
-        if (qry.getLocalListener() == null)
-            throw new IgniteException("Mandatory local listener is not set for 
the query: " + qry);
+            locLsnr = qry0.getLocalListener();
 
-        if (qry.getRemoteFilter() != null && qry.getRemoteFilterFactory() != 
null)
-            throw new IgniteException("Should be used either RemoterFilter or 
RemoteFilterFactory.");
+            rmtFilter = qry0.getRemoteFilter();
+        }
+        else {
+            ContinuousQueryWithTransformer<K, V, ?> qry0 = 
(ContinuousQueryWithTransformer<K, V, ?>)qry;
+
+            if (qry0.getLocalListener() == null)
+                throw new IgniteException("Mandatory local transformed event 
listener is not set for the query: " + qry);
+
+            if (qry0.getRemoteTransformerFactory() == null)
+                throw new IgniteException("Mandatory RemoteTransformerFactory 
is not set for the query: " + qry);
+
+            Collection<ClusterNode> nodes = context().grid().cluster().nodes();
+
+            for (ClusterNode node : nodes) {
+                if (node.version().compareTo(CONT_QRY_WITH_TRANSFORMER_SINCE) 
< 0) {
+                    throw new IgniteException("Can't start 
ContinuousQueryWithTransformer, " +
+                        "because some nodes in cluster doesn't support this 
feature: " + node);
+                }
+            }
+
+            locTransLsnr = qry0.getLocalListener();
+
+            rmtTransFactory = qry0.getRemoteTransformerFactory();
+        }
 
         try {
             final UUID routineId = ctx.continuousQueries().executeQuery(
-                qry.getLocalListener(),
-                qry.getRemoteFilter(),
+                locLsnr,
+                locTransLsnr,
+                rmtFilter,
                 qry.getRemoteFilterFactory(),
+                rmtTransFactory,
                 qry.getPageSize(),
                 qry.getTimeInterval(),
                 qry.isAutoUnsubscribe(),
@@ -596,8 +654,8 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
 
             boolean keepBinary = opCtxCall != null && opCtxCall.isKeepBinary();
 
-            if (qry instanceof ContinuousQuery)
-                return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, 
V>)qry, qry.isLocal(), keepBinary);
+            if (qry instanceof ContinuousQuery || qry instanceof 
ContinuousQueryWithTransformer)
+                return 
(QueryCursor<R>)queryContinuous((AbstractContinuousQuery)qry, qry.isLocal(), 
keepBinary);
 
             if (qry instanceof SqlQuery)
                 return 
(QueryCursor<R>)ctx.kernalContext().query().querySql(ctx, (SqlQuery)qry, 
keepBinary);
@@ -688,8 +746,8 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
      */
     private void validate(Query qry) {
         if (!QueryUtils.isEnabled(ctx.config()) && !(qry instanceof ScanQuery) 
&&
-            !(qry instanceof ContinuousQuery) && !(qry instanceof SpiQuery) && 
!(qry instanceof SqlQuery) &&
-            !(qry instanceof SqlFieldsQuery))
+            !(qry instanceof ContinuousQuery) && !(qry instanceof 
ContinuousQueryWithTransformer) &&
+            !(qry instanceof SpiQuery) && !(qry instanceof SqlQuery) && !(qry 
instanceof SqlFieldsQuery))
             throw new CacheException("Indexing is disabled for cache: " + 
ctx.cache().name() +
                     ". Use setIndexedTypes or setTypeMetadata methods on 
CacheConfiguration to enable.");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 7e3f0b5..88005d0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -28,7 +28,6 @@ import 
org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -282,9 +281,8 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
      * @throws IgniteCheckedException In case of error.
      */
     void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException {
-        assert key != null;
-
-        key.prepareMarshal(cctx.cacheObjectContext());
+        if (key != null)
+            key.prepareMarshal(cctx.cacheObjectContext());
 
         if (newVal != null)
             newVal.prepareMarshal(cctx.cacheObjectContext());
@@ -300,7 +298,8 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
      */
     void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws 
IgniteCheckedException {
         if (!isFiltered()) {
-            key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+            if (key != null)
+                key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
 
             if (newVal != null)
                 newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 59b2a68..f0cd7ca 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -24,6 +24,7 @@ import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -39,6 +40,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import 
org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.events.CacheQueryReadEvent;
@@ -69,6 +71,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteAsyncCallback;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -91,6 +94,19 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
     static final int LSNR_MAX_BUF_SIZE =
         
IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_LISTENER_MAX_BUFFER_SIZE",
 10_000);
 
+    /**
+     * Transformer implementation for processing received remote events.
+     * They are already transformed so we simply return transformed value for 
event.
+     */
+    private transient IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, 
?> returnValTrans =
+        new IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, Object>() 
{
+            @Override public Object apply(CacheEntryEvent<? extends K, ? 
extends V> evt) {
+                assert evt.getKey() == null;
+
+                return evt.getValue();
+            }
+        };
+
     /** Cache name. */
     private String cacheName;
 
@@ -161,7 +177,7 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
     private transient boolean ignoreClsNotFound;
 
     /** */
-    private transient boolean asyncCb;
+    transient boolean asyncCb;
 
     /** */
     private transient UUID nodeId;
@@ -196,14 +212,13 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
     public CacheContinuousQueryHandler(
         String cacheName,
         Object topic,
-        CacheEntryUpdatedListener<K, V> locLsnr,
-        CacheEntryEventSerializableFilter<K, V> rmtFilter,
+        @Nullable CacheEntryUpdatedListener<K, V> locLsnr,
+        @Nullable CacheEntryEventSerializableFilter<K, V> rmtFilter,
         boolean oldValRequired,
         boolean sync,
         boolean ignoreExpired,
         boolean ignoreClsNotFound) {
         assert topic != null;
-        assert locLsnr != null;
 
         this.cacheName = cacheName;
         this.topic = topic;
@@ -505,14 +520,14 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
                         if (asyncCb) {
                             ctx.asyncCallbackPool().execute(new Runnable() {
                                 @Override public void run() {
-                                    locLsnr.onUpdated(evts);
+                                    notifyLocalListener(evts, 
getTransformer());
                                 }
                             }, part);
                         }
                         else
                             skipCtx.addProcessClosure(new Runnable() {
                                 @Override public void run() {
-                                    locLsnr.onUpdated(evts);
+                                    notifyLocalListener(evts, 
getTransformer());
                                 }
                             });
                     }
@@ -584,6 +599,20 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
     }
 
     /**
+     * @return Cache entry event transformer.
+     */
+    @Nullable protected IgniteClosure<CacheEntryEvent<? extends K, ? extends 
V>, ?> getTransformer() {
+        return null;
+    }
+
+    /**
+     * @return Local listener of transformed events.
+     */
+    @Nullable protected EventListener<?> localTransformedEventListener() {
+        return null;
+    }
+
+    /**
      * @param cctx Context.
      * @param nodeId ID of the node that started routine.
      * @param entry Entry.
@@ -752,8 +781,7 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
             }
         }
 
-        if (!entries0.isEmpty())
-            locLsnr.onUpdated(entries0);
+        notifyLocalListener(entries0, returnValTrans);
     }
 
     /**
@@ -825,26 +853,31 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
             if (cctx == null)
                 return;
 
-            final CacheContinuousQueryEntry entry = evt.entry();
+            CacheContinuousQueryEntry entry = evt.entry();
+
+            IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans 
= getTransformer();
 
             if (loc) {
                 if (!locCache) {
                     Collection<CacheEntryEvent<? extends K, ? extends V>> evts 
= handleEvent(ctx, entry);
 
-                    if (!evts.isEmpty())
-                        locLsnr.onUpdated(evts);
+                    notifyLocalListener(evts, trans);
 
                     if (!internal && !skipPrimaryCheck)
                         sendBackupAcknowledge(ackBuf.onAcknowledged(entry), 
routineId, ctx);
                 }
                 else {
                     if (!entry.isFiltered())
-                        locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? 
extends V>>asList(evt));
+                        notifyLocalListener(F.<CacheEntryEvent<? extends K, ? 
extends V>>asList(evt), trans);
                 }
             }
             else {
-                if (!entry.isFiltered())
+                if (!entry.isFiltered()) {
+                    if (trans != null)
+                        entry = transformToEntry(trans, evt);
+
                     prepareEntry(cctx, nodeId, entry);
+                }
 
                 Object entryOrList = handleEntry(cctx, entry);
 
@@ -889,6 +922,28 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
     }
 
     /**
+     * Notifies local listener.
+     *
+     * @param evts Events.
+     * @param trans Transformer
+     */
+    private void notifyLocalListener(Collection<CacheEntryEvent<? extends K, ? 
extends V>> evts,
+        @Nullable IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> 
trans) {
+        EventListener locTransLsnr = localTransformedEventListener();
+
+        assert (locLsnr != null && locTransLsnr == null) || (locLsnr == null 
&& locTransLsnr != null);
+
+        if (F.isEmpty(evts))
+            return;
+
+        if (locLsnr != null)
+            locLsnr.onUpdated(evts);
+
+        if (locTransLsnr != null)
+            locTransLsnr.onUpdated(transform(trans, evts));
+    }
+
+    /**
      * @return Task name.
      */
     private String taskName() {
@@ -1258,4 +1313,72 @@ public class CacheContinuousQueryHandler<K, V> 
implements GridContinuousHandler
         }
     }
 
+    /**
+     * @param trans Transformer.
+     * @param evts Source events.
+     * @return Collection of transformed values.
+     */
+    private Iterable transform(final IgniteClosure<CacheEntryEvent<? extends 
K, ? extends V>, ?> trans,
+        Collection<CacheEntryEvent<? extends K, ? extends V>> evts) {
+        final Iterator<CacheEntryEvent<? extends K, ? extends V>> iter = 
evts.iterator();
+
+        return new Iterable() {
+            @NotNull @Override public Iterator iterator() {
+                return new Iterator() {
+                    @Override public boolean hasNext() {
+                        return iter.hasNext();
+                    }
+
+                    @Override public Object next() {
+                        return transform(trans, iter.next());
+                    }
+                };
+            }
+        };
+    }
+
+    /**
+     * Transform event data with {@link #getTransformer()} if exists.
+     *
+     * @param trans Transformer.
+     * @param evt Event to transform.
+     * @return Entry contains only transformed data if transformer exists. 
Unchanged event if transformer is not set.
+     * @see #getTransformer()
+     */
+    private CacheContinuousQueryEntry 
transformToEntry(IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> 
trans,
+        CacheContinuousQueryEvent<? extends K, ? extends V> evt) {
+        Object transVal = transform(trans, evt);
+
+        return new CacheContinuousQueryEntry(evt.entry().cacheId(),
+            evt.entry().eventType(),
+            null,
+            transVal == null ? null : 
cacheContext(ctx).toCacheObject(transVal),
+            null,
+            evt.entry().isKeepBinary(),
+            evt.entry().partition(),
+            evt.entry().updateCounter(),
+            evt.entry().topologyVersion(),
+            evt.entry().flags());
+    }
+
+    /**
+     * @param trans Transformer.
+     * @param evt Event.
+     * @return Transformed value.
+     */
+    private Object transform(IgniteClosure<CacheEntryEvent<? extends K, ? 
extends V>, ?> trans,
+        CacheEntryEvent<? extends K, ? extends V> evt) {
+        assert trans != null;
+
+        Object transVal = null;
+
+        try {
+            transVal = trans.apply(evt);
+        }
+        catch (Exception e) {
+            U.error(log, e);
+        }
+
+        return transVal;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
index e48d22e..86c1ae1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV2.java
@@ -41,7 +41,7 @@ public class CacheContinuousQueryHandlerV2<K, V> extends 
CacheContinuousQueryHan
     private static final long serialVersionUID = 0L;
 
     /** Remote filter factory. */
-    private Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
+    Factory<? extends CacheEntryEventFilter> rmtFilterFactory;
 
     /** Deployable object for filter factory. */
     private CacheContinuousQueryDeployableObject rmtFilterFactoryDep;
@@ -74,8 +74,8 @@ public class CacheContinuousQueryHandlerV2<K, V> extends 
CacheContinuousQueryHan
     public CacheContinuousQueryHandlerV2(
         String cacheName,
         Object topic,
-        CacheEntryUpdatedListener<K, V> locLsnr,
-        Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory,
+        @Nullable CacheEntryUpdatedListener<K, V> locLsnr,
+        @Nullable Factory<? extends CacheEntryEventFilter<K, V>> 
rmtFilterFactory,
         boolean oldValRequired,
         boolean sync,
         boolean ignoreExpired,
@@ -89,9 +89,6 @@ public class CacheContinuousQueryHandlerV2<K, V> extends 
CacheContinuousQueryHan
             sync,
             ignoreExpired,
             ignoreClsNotFound);
-
-        assert rmtFilterFactory != null;
-
         this.rmtFilterFactory = rmtFilterFactory;
 
         if (types != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
new file mode 100644
index 0000000..14d5605
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.UUID;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryEventFilter;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
+import 
org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.lang.IgniteClosure;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Continuous query handler V3 version.
+ * Contains {@link Factory} for remote transformer and {@link EventListener}.
+ *
+ * @see ContinuousQueryWithTransformer
+ */
+public class CacheContinuousQueryHandlerV3<K, V> extends 
CacheContinuousQueryHandlerV2<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Remote transformer. */
+    private Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? 
extends V>, ?>> rmtTransFactory;
+
+    /** Deployable object for transformer. */
+    private CacheContinuousQueryDeployableObject rmtTransFactoryDep;
+
+    /** Remote transformer. */
+    private transient IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, 
?> rmtTrans;
+
+    /** Local listener for transformed events. */
+    private transient EventListener<?> locTransLsnr;
+
+    /**
+     * Empty constructor.
+     */
+    public CacheContinuousQueryHandlerV3() {
+        super();
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param topic Topic.
+     * @param locTransLsnr Local listener of transformed events
+     * @param rmtFilterFactory Remote filter factory.
+     * @param rmtTransFactory Remote transformer factory.
+     * @param oldValRequired OldValRequired flag.
+     * @param sync Sync flag.
+     * @param ignoreExpired IgnoreExpired flag.
+     * @param ignoreClsNotFound IgnoreClassNotFoundException flag.
+     */
+    public CacheContinuousQueryHandlerV3(
+        String cacheName,
+        Object topic,
+        EventListener<?> locTransLsnr,
+        @Nullable Factory<? extends CacheEntryEventFilter<K, V>> 
rmtFilterFactory,
+        Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends 
V>, ?>> rmtTransFactory,
+        boolean oldValRequired,
+        boolean sync,
+        boolean ignoreExpired,
+        boolean ignoreClsNotFound) {
+        super(
+            cacheName,
+            topic,
+            null,
+            rmtFilterFactory,
+            oldValRequired,
+            sync,
+            ignoreExpired,
+            ignoreClsNotFound,
+            null);
+
+        assert locTransLsnr != null;
+        assert rmtTransFactory != null;
+
+        this.locTransLsnr = locTransLsnr;
+        this.rmtTransFactory = rmtTransFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteClosure<CacheEntryEvent<? extends K, ? extends 
V>, ?> getTransformer() {
+        if (rmtTrans == null && rmtTransFactory != null)
+            rmtTrans = rmtTransFactory.create();
+
+        return rmtTrans;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected EventListener<?> localTransformedEventListener() {
+        return locTransLsnr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheEntryEventFilter<K, V> getEventFilter() {
+        if (rmtFilterFactory == null)
+            return null;
+
+        return super.getEventFilter();
+    }
+
+    /** {@inheritDoc} */
+    @Override public RegisterStatus register(UUID nodeId, UUID routineId,
+        GridKernalContext ctx) throws IgniteCheckedException {
+        final IgniteClosure trans = getTransformer();
+
+        if (trans != null)
+            ctx.resource().injectGeneric(trans);
+
+        if (locTransLsnr != null) {
+            ctx.resource().injectGeneric(locTransLsnr);
+
+            asyncCb = U.hasAnnotation(locTransLsnr, IgniteAsyncCallback.class);
+        }
+
+        return super.register(nodeId, routineId, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void p2pMarshal(GridKernalContext ctx) throws 
IgniteCheckedException {
+        super.p2pMarshal(ctx);
+
+        if (rmtTransFactory != null && !U.isGrid(rmtTransFactory.getClass()))
+            rmtTransFactoryDep = new 
CacheContinuousQueryDeployableObject(rmtTransFactory, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) 
throws IgniteCheckedException {
+        super.p2pUnmarshal(nodeId, ctx);
+
+        if (rmtTransFactoryDep != null)
+            rmtTransFactory = rmtTransFactoryDep.unmarshal(nodeId, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+
+        boolean b = rmtTransFactoryDep != null;
+
+        out.writeBoolean(b);
+
+        if (b)
+            out.writeObject(rmtTransFactoryDep);
+        else
+            out.writeObject(rmtTransFactory);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+        super.readExternal(in);
+
+        boolean b = in.readBoolean();
+
+        if (b)
+            rmtTransFactoryDep = 
(CacheContinuousQueryDeployableObject)in.readObject();
+        else
+            rmtTransFactory = (Factory<? extends 
IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?>>)in.readObject();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 628111b..1e131ef 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -49,6 +49,7 @@ import 
org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.query.CacheQueryEntryEvent;
 import org.apache.ignite.cache.query.ContinuousQuery;
+import 
org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -57,8 +58,8 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI2;
@@ -66,6 +67,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteAsyncCallback;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteOutClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.security.SecurityPermission;
@@ -434,6 +436,7 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
     /**
      * @param locLsnr Local listener.
      * @param rmtFilter Remote filter.
+     * @param rmtFilterFactory Remote filter factory
      * @param bufSize Buffer size.
      * @param timeInterval Time interval.
      * @param autoUnsubscribe Auto unsubscribe flag.
@@ -441,9 +444,11 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
      * @return Continuous routine ID.
      * @throws IgniteCheckedException In case of error.
      */
-    public UUID executeQuery(final CacheEntryUpdatedListener locLsnr,
+    public UUID executeQuery(@Nullable final CacheEntryUpdatedListener locLsnr,
+        @Nullable final EventListener locTransLsnr,
         @Nullable final CacheEntryEventSerializableFilter rmtFilter,
         @Nullable final Factory<? extends CacheEntryEventFilter> 
rmtFilterFactory,
+        @Nullable final Factory<? extends IgniteClosure> rmtTransFactory,
         int bufSize,
         long timeInterval,
         boolean autoUnsubscribe,
@@ -453,12 +458,30 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
     {
         IgniteOutClosure<CacheContinuousQueryHandler> clsr;
 
-        if (rmtFilterFactory != null)
+        if (rmtTransFactory != null) {
+            clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
+                @Override public CacheContinuousQueryHandler apply() {
+                    assert locTransLsnr != null;
+
+                    return new CacheContinuousQueryHandlerV3(
+                        cctx.name(),
+                        TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), 
seq.getAndIncrement()),
+                        locTransLsnr,
+                        rmtFilterFactory,
+                        rmtTransFactory,
+                        true,
+                        false,
+                        !includeExpired,
+                        false);
+                }
+            };
+        }
+        else if (rmtFilterFactory != null) {
             clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
                 @Override public CacheContinuousQueryHandler apply() {
-                    CacheContinuousQueryHandler hnd;
+                    assert locLsnr != null;
 
-                    hnd = new CacheContinuousQueryHandlerV2(
+                    return new CacheContinuousQueryHandlerV2(
                         cctx.name(),
                         TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), 
seq.getAndIncrement()),
                         locLsnr,
@@ -468,13 +491,15 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
                         !includeExpired,
                         false,
                         null);
-
-                    return hnd;
                 }
             };
-        else
+        }
+        else {
             clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
                 @Override public CacheContinuousQueryHandler apply() {
+                    assert locLsnr != null;
+                    assert locTransLsnr == null;
+
                     return new CacheContinuousQueryHandler(
                         cctx.name(),
                         TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), 
seq.getAndIncrement()),
@@ -486,6 +511,7 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
                         false);
                 }
             };
+        }
 
         return executeQuery0(
             locLsnr,
@@ -676,6 +702,8 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
         }
 
         if (notifyExisting) {
+            assert locLsnr != null : "Local listener can't be null if 
notification for existing entries are enabled";
+
             final Iterator<CacheDataRow> it = 
cctx.offheap().cacheIterator(cctx.cacheId(),
                 true,
                 true,

http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
index 142ff35..9a815de 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import javax.cache.Cache;
 import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryListenerException;
@@ -51,8 +52,11 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.AbstractContinuousQuery;
 import org.apache.ignite.cache.query.CacheQueryEntryEvent;
 import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
+import 
org.apache.ignite.cache.query.ContinuousQueryWithTransformer.EventListener;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreAdapter;
@@ -62,6 +66,7 @@ import 
org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -464,7 +469,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
         IgniteCache<QueryTestKey, QueryTestValue> cache = 
grid(getClientIndex()).createCache(ccfg);
 
         try {
-            ContinuousQuery<QueryTestKey, QueryTestValue> qry = new 
ContinuousQuery<>();
+            AbstractContinuousQuery<QueryTestKey, QueryTestValue> qry = 
createQuery();
 
             final List<CacheEntryEvent<? extends QueryTestKey, ? extends 
QueryTestValue>> evts =
                 new CopyOnWriteArrayList<>();
@@ -472,13 +477,20 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             if (noOpFilterFactory() != null)
                 qry.setRemoteFilterFactory(noOpFilterFactory());
 
-            qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, 
QueryTestValue>() {
-                @Override public void onUpdated(Iterable<CacheEntryEvent<? 
extends QueryTestKey,
-                    ? extends QueryTestValue>> events) throws 
CacheEntryListenerException {
-                    for (CacheEntryEvent<? extends QueryTestKey, ? extends 
QueryTestValue> e : events)
-                        evts.add(e);
-                }
-            });
+            if (qry instanceof ContinuousQuery) {
+                ((ContinuousQuery<QueryTestKey, 
QueryTestValue>)qry).setLocalListener(new 
CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
+                    @Override public void onUpdated(Iterable<CacheEntryEvent<? 
extends QueryTestKey,
+                        ? extends QueryTestValue>> events) throws 
CacheEntryListenerException {
+                        for (CacheEntryEvent<? extends QueryTestKey, ? extends 
QueryTestValue> e : events)
+                            evts.add(e);
+                    }
+                });
+            }
+            else if (qry instanceof ContinuousQueryWithTransformer)
+                initQueryWithTransformer(
+                    (ContinuousQueryWithTransformer<QueryTestKey, 
QueryTestValue, CacheEntryEvent>)qry, evts);
+            else
+                fail("Unknown query type");
 
             QueryTestKey key = new QueryTestKey(1);
 
@@ -595,7 +607,7 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
         IgniteCache<QueryTestKey, QueryTestValue> cache = 
grid(getClientIndex()).createCache(ccfg);
 
         try {
-            ContinuousQuery<QueryTestKey, QueryTestValue> qry = new 
ContinuousQuery<>();
+            AbstractContinuousQuery<QueryTestKey, QueryTestValue> qry = 
createQuery();
 
             final List<CacheEntryEvent<? extends QueryTestKey, ? extends 
QueryTestValue>> evts =
                 new CopyOnWriteArrayList<>();
@@ -603,13 +615,20 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             if (noOpFilterFactory() != null)
                 qry.setRemoteFilterFactory(noOpFilterFactory());
 
-            qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, 
QueryTestValue>() {
-                @Override public void onUpdated(Iterable<CacheEntryEvent<? 
extends QueryTestKey,
-                    ? extends QueryTestValue>> events) throws 
CacheEntryListenerException {
-                    for (CacheEntryEvent<? extends QueryTestKey, ? extends 
QueryTestValue> e : events)
-                        evts.add(e);
-                }
-            });
+            if (qry instanceof ContinuousQuery) {
+                ((ContinuousQuery<QueryTestKey, 
QueryTestValue>)qry).setLocalListener(new 
CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() {
+                    @Override public void onUpdated(Iterable<CacheEntryEvent<? 
extends QueryTestKey,
+                        ? extends QueryTestValue>> events) throws 
CacheEntryListenerException {
+                        for (CacheEntryEvent<? extends QueryTestKey, ? extends 
QueryTestValue> e : events)
+                            evts.add(e);
+                    }
+                });
+            }
+            else if (qry instanceof ContinuousQueryWithTransformer)
+                initQueryWithTransformer(
+                    (ContinuousQueryWithTransformer<QueryTestKey, 
QueryTestValue, CacheEntryEvent>)qry, evts);
+            else
+                fail("Unknown query type");
 
             Map<QueryTestKey, QueryTestValue> map = new TreeMap<>();
 
@@ -834,16 +853,23 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             Collection<QueryCursor<?>> curs = new ArrayList<>();
 
             if (deploy == CLIENT) {
-                ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+                AbstractContinuousQuery<Object, Object> qry = createQuery();
 
                 final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new 
ArrayBlockingQueue<>(50_000);
 
-                qry.setLocalListener(new CacheEntryUpdatedListener<Object, 
Object>() {
-                    @Override public void 
onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
-                        for (CacheEntryEvent<?, ?> evt : evts)
-                            evtsQueue.add(evt);
-                    }
-                });
+                if (qry instanceof ContinuousQuery) {
+                    ((ContinuousQuery<Object, 
Object>)qry).setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                        @Override public void 
onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                            for (CacheEntryEvent<?, ?> evt : evts)
+                                evtsQueue.add(evt);
+                        }
+                    });
+                }
+                else if (qry instanceof ContinuousQueryWithTransformer)
+                    initQueryWithTransformer(
+                        (ContinuousQueryWithTransformer<Object, Object, 
CacheEntryEvent>)qry, evtsQueue);
+                else
+                    fail("Unknown query type");
 
                 evtsQueues.add(evtsQueue);
 
@@ -852,16 +878,23 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
                 curs.add(cur);
             }
             else if (deploy == SERVER) {
-                ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+                AbstractContinuousQuery<Object, Object> qry = createQuery();
 
                 final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new 
ArrayBlockingQueue<>(50_000);
 
-                qry.setLocalListener(new CacheEntryUpdatedListener<Object, 
Object>() {
-                    @Override public void 
onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
-                        for (CacheEntryEvent<?, ?> evt : evts)
-                            evtsQueue.add(evt);
-                    }
-                });
+                if (qry instanceof ContinuousQuery) {
+                    ((ContinuousQuery<Object, 
Object>)qry).setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                        @Override public void 
onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                            for (CacheEntryEvent<?, ?> evt : evts)
+                                evtsQueue.add(evt);
+                        }
+                    });
+                }
+                else if (qry instanceof ContinuousQueryWithTransformer)
+                    initQueryWithTransformer(
+                        (ContinuousQueryWithTransformer<Object, Object, 
CacheEntryEvent>)qry, evtsQueue);
+                else
+                    fail("Unknown query type");
 
                 evtsQueues.add(evtsQueue);
 
@@ -871,16 +904,23 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
             }
             else {
                 for (int i = 0; i <= getServerNodeCount(); i++) {
-                    ContinuousQuery<Object, Object> qry = new 
ContinuousQuery<>();
+                    AbstractContinuousQuery<Object, Object> qry = 
createQuery();
 
                     final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new 
ArrayBlockingQueue<>(50_000);
 
-                    qry.setLocalListener(new CacheEntryUpdatedListener<Object, 
Object>() {
-                        @Override public void 
onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
-                            for (CacheEntryEvent<?, ?> evt : evts)
-                                evtsQueue.add(evt);
-                        }
-                    });
+                    if (qry instanceof ContinuousQuery) {
+                        ((ContinuousQuery<Object, 
Object>)qry).setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                            @Override public void 
onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                                for (CacheEntryEvent<?, ?> evt : evts)
+                                    evtsQueue.add(evt);
+                            }
+                        });
+                    }
+                    else if (qry instanceof ContinuousQueryWithTransformer)
+                        initQueryWithTransformer(
+                            (ContinuousQueryWithTransformer<Object, Object, 
CacheEntryEvent>)qry, evtsQueue);
+                    else
+                        fail("Unknown query type");
 
                     evtsQueues.add(evtsQueue);
 
@@ -1417,6 +1457,15 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
     }
 
     /**
+     * @param <K> Key type.
+     * @param <V> Value type.
+     * @return New instance of continuous query.
+     */
+    protected <K, V> AbstractContinuousQuery<K, V> createQuery() {
+        return new ContinuousQuery<>();
+    }
+
+    /**
      *
      */
     private static class TestStoreFactory implements 
Factory<CacheStore<Object, Object>> {
@@ -1586,4 +1635,37 @@ public class CacheContinuousQueryRandomOperationsTest 
extends GridCommonAbstract
     protected enum ContinuousDeploy {
         CLIENT, SERVER, ALL
     }
+
+    /**
+     * Initialize continuous query with transformer.
+     * Query will accumulate all events in accumulator.
+     *
+     * @param qry Continuous query.
+     * @param acc Accumulator for events.
+     * @param <K> Key type.
+     * @param <V> Value type.
+     */
+    private <K, V> void initQueryWithTransformer(
+        ContinuousQueryWithTransformer<K, V, CacheEntryEvent> qry,
+        Collection<CacheEntryEvent<? extends K, ? extends V>> acc) {
+
+        IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, 
CacheEntryEvent> transformer =
+            new IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, 
CacheEntryEvent>() {
+                @Override public CacheEntryEvent apply(CacheEntryEvent<? 
extends K, ? extends V> event) {
+                    return event;
+                }
+            };
+
+        ContinuousQueryWithTransformer<K, V, CacheEntryEvent> qry0 =
+            (ContinuousQueryWithTransformer<K, V, CacheEntryEvent>)qry;
+
+        
qry0.setRemoteTransformerFactory(FactoryBuilder.factoryOf(transformer));
+
+        qry0.setLocalListener(new EventListener<CacheEntryEvent>() {
+            @Override public void onUpdated(Iterable<? extends 
CacheEntryEvent> events) {
+                for (CacheEntryEvent e : events)
+                    acc.add(e);
+            }
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a83f3038/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerClientSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerClientSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerClientSelfTest.java
new file mode 100644
index 0000000..8d8b4c1
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousWithTransformerClientSelfTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.Ignite;
+
+/**
+ */
+public class CacheContinuousWithTransformerClientSelfTest extends 
CacheContinuousWithTransformerReplicatedSelfTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        client = true;
+
+        startGrid("client");
+
+        client = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Ignite gridToRunQuery() throws Exception {
+        return grid("client");
+    }
+}

Reply via email to