Nikolay,

We already have the following method for queries with a transformer. It
currently throws an exception for ContinuousQuery.

<T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer)

Would it be possible to use it instead of creating a new API?
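
To illustrate the shape of that call, here is a rough sketch in plain Java.
It is a toy in-memory model, not Ignite code: `TransformerSketch`, `Person`
and `namesOfPositiveKeys` are made-up names, and `Predicate`/`Function` only
stand in for the remote filter and `IgniteClosure`. It assumes the transformer
is applied to every entry that passes the filter, so only the transformed
values would need to be sent to the node that initiated the query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

/** Toy in-memory model only: none of these types are Ignite classes. */
class TransformerSketch {
    /** Hypothetical value type with one small field and one large payload. */
    static final class Person {
        final String name;
        final byte[] photo;

        Person(String name, byte[] photo) {
            this.name = name;
            this.photo = photo;
        }
    }

    /**
     * Mimics the shape of query(Query<T>, IgniteClosure<T, R>): updates that
     * pass the filter are transformed where they are produced, so only
     * R values are collected for the caller.
     */
    static <T, R> List<R> query(Iterable<T> updates, Predicate<T> filter, Function<T, R> transformer) {
        List<R> res = new ArrayList<>();

        for (T upd : updates) {
            if (filter.test(upd))
                res.add(transformer.apply(upd));
        }

        return res;
    }

    /** Example: keep only the names of updated entries with positive keys. */
    static List<String> namesOfPositiveKeys(List<Map.Entry<Integer, Person>> updates) {
        return query(updates, e -> e.getKey() > 0, e -> e.getValue().name);
    }
}
```

In this model the large `photo` field never reaches the caller, which is the
traffic saving the ticket asks for. How updates would actually be delivered
for a ContinuousQuery (cursor vs. local listener) is left open here.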

-Val

On Wed, Jul 26, 2017 at 5:26 AM, Николай Ижиков <[email protected]>
wrote:

> Hello, Igniters.
>
> I'm working on the IGNITE-425 [1] issue.
> I made a couple of changes in my branch [2], so I want to confirm those
> changes with the community before moving forward:
>
> Text of the issue:
>
> ```
> Currently if updated entry passes the filter, it is sent to node initiated
> the query entirely.
> It would be good to provide user with the ability to transform entry and,
> for example,
> select only fields that are important. This may bring huge economy to
> traffic and lower GC pressure as well.
> ```
>
> 1. I created a new class, ContinuousQueryWithTransformer, which extends Query.
>
> Reasons to create an entirely new class rather than extending ContinuousQuery:
>
>     a. ContinuousQuery is final, so users can't extend it. I don't want to
> change that.
>     b. ContinuousQuery contains some deprecated methods (setRemoteFilter), so
> with a new class we can get rid of them.
>     c. Such a public API design prevents, at compile time, using the existing
> localEventListener together with the new transformedEventListener.
>
> ```
>     public final class ContinuousQueryWithTransformer<K, V, T>
>         extends Query<Cache.Entry<K, V>> {
>         public ContinuousQueryWithTransformer<K, V, T> setRemoteFilterFactory(
>             Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) { /**/ }
>
>         public ContinuousQueryWithTransformer<K, V, T> setRemoteTransformerFactory(
>             Factory<? extends IgniteBiClosure<K, V, T>> factory) { /**/ }
>
>         public ContinuousQueryWithTransformer<K, V, T> setLocalTransformedEventListener(
>             TransformedEventListener<T> locTransEvtLsnr) { /**/ }
>
>         public interface TransformedEventListener<T> {
>             void onUpdated(Iterable<? extends T> events)
>                 throws CacheEntryListenerException;
>         }
>     }
> ```
>
> 2. I want to edit all tests from the package
> `core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/`
> to ensure my implementation fully supports the existing tests.
> I want each test to work for both the regular ContinuousQuery and
> ContinuousQueryWithTransformer:
>
> Existing test:
>
> ```
>         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
>
>         qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
>             @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
>                 for (CacheEntryEvent evt : evts) {
>                     if ((Integer)evt.getValue() >= 0)
>                         evtCnt.incrementAndGet();
>                 }
>             }
>         });
>
> ```
>
> To be:
>
> ```
>         Query qry = createContinuousQuery();
>
>         setLocalListener(qry, new CI1<T2<Object, Object>>() {
>             @Override public void apply(T2<Object, Object> e) {
>                 if ((Integer)e.getValue() >= 0)
>                     evtCnt.incrementAndGet();
>             }
>         });
> ```
>
> Base class to support setLocalListener:
>
> ```
>     protected <K, V> void setLocalListener(Query q, CI1<T2<K, V>> lsnrClsr) {
>         if (isContinuousWithTransformer()) {
>             ((ContinuousQueryWithTransformer)q)
>                 .setLocalTransformedEventListener(new TransformedEventListenerImpl<>(lsnrClsr));
>         }
>         else
>             ((ContinuousQuery)q).setLocalListener(new CacheInvokeListener<>(lsnrClsr));
>     }
>
>     /** Adapts the closure to the regular ContinuousQuery listener. */
>     protected static class CacheInvokeListener<K, V> implements CacheEntryUpdatedListener<K, V> {
>         private final CI1<T2<K, V>> clsr;
>
>         CacheInvokeListener(CI1<T2<K, V>> clsr) {
>             this.clsr = clsr;
>         }
>
>         @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> events)
>             throws CacheEntryListenerException {
>             for (CacheEntryEvent<? extends K, ? extends V> e : events)
>                 clsr.apply(new T2<K, V>(e.getKey(), e.getValue()));
>         }
>     }
>
>     /** Adapts the same closure to the transformed-event listener. */
>     protected static class TransformedEventListenerImpl<K, V>
>         implements TransformedEventListener<T2<K, V>> {
>         private final CI1<T2<K, V>> clsr;
>
>         TransformedEventListenerImpl(CI1<T2<K, V>> clsr) {
>             this.clsr = clsr;
>         }
>
>         @Override public void onUpdated(Iterable<? extends T2<K, V>> evts)
>             throws CacheEntryListenerException {
>             for (T2<K, V> e : evts)
>                 clsr.apply(e);
>         }
>     }
> ```
>
> Thoughts?
>
> [1] https://issues.apache.org/jira/browse/IGNITE-425
> [2] https://github.com/nizhikov/ignite/pull/9/files
>
> --
> Nikolay Izhikov
> [email protected]
>
