Repository: ignite Updated Branches: refs/heads/master d98cd3093 -> 8f697876a
IGNITE-2546 - Transformers for SCAN queries. Fixes #949. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/407071e4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/407071e4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/407071e4 Branch: refs/heads/master Commit: 407071e466d1a4fcec86571d4791ace8bb206f53 Parents: 0465874 Author: Eduard Shangareev <[email protected]> Authored: Mon Aug 29 17:32:31 2016 -0700 Committer: Valentin Kulichenko <[email protected]> Committed: Mon Aug 29 17:32:31 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 15 + .../processors/cache/IgniteCacheProxy.java | 112 +++- .../processors/cache/query/CacheQuery.java | 11 +- .../query/GridCacheDistributedQueryManager.java | 22 +- .../cache/query/GridCacheLocalQueryManager.java | 3 +- .../cache/query/GridCacheQueryAdapter.java | 69 ++- .../cache/query/GridCacheQueryBean.java | 8 +- .../cache/query/GridCacheQueryInfo.java | 8 +- .../cache/query/GridCacheQueryManager.java | 125 ++-- .../cache/query/GridCacheQueryRequest.java | 6 +- .../GridCacheQueryTransformerSelfTest.java | 570 +++++++++++++++++++ .../multijvm/IgniteCacheProcessProxy.java | 6 + .../IgniteCacheQuerySelfTestSuite.java | 2 + 13 files changed, 832 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index 40eedaf..2290fc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -56,6 +56,7 @@ import 
org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteAsyncSupported; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.mxbean.CacheMetricsMXBean; import org.apache.ignite.transactions.TransactionHeuristicException; @@ -295,6 +296,20 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS public <R> QueryCursor<R> query(Query<R> qry); /** + * Queries the cache transforming the entries on the server nodes. Can be used, for example, + * to avoid network overhead when the client requires only one field of a large entry. + * <p> + * Currently transformers are supported ONLY for {@link ScanQuery}. Passing any other + * subclass of {@link Query} interface to this method will end up with + * {@link UnsupportedOperationException}. + * + * @param qry Query. + * @param transformer Transformer. + * @return Cursor. + */ + public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer); + + /** * Allows for iteration over local cache entries. * * @param peekModes Peek modes. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 0d7bc6a..9b26c1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -82,6 +82,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.mxbean.CacheMetricsMXBean; import org.apache.ignite.plugin.security.SecurityPermission; @@ -466,50 +467,74 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } /** - * @param filter Filter. + * @param scanQry ScanQry. + * @param transformer Transformer * @param grp Optional cluster group. * @return Cursor. 
*/ @SuppressWarnings("unchecked") - private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable ClusterGroup grp) + private <T, R> QueryCursor<R> query( + final ScanQuery scanQry, + @Nullable final IgniteClosure<T, R> transformer, + @Nullable ClusterGroup grp) throws IgniteCheckedException { - final CacheQuery<Map.Entry<K, V>> qry; + + final CacheQuery<R> qry; boolean isKeepBinary = opCtx != null && opCtx.isKeepBinary(); - if (filter instanceof ScanQuery) { - IgniteBiPredicate<K, V> p = ((ScanQuery)filter).getFilter(); + IgniteBiPredicate<K, V> p = scanQry.getFilter(); - qry = ctx.queries().createScanQuery(p, ((ScanQuery)filter).getPartition(), isKeepBinary); + qry = ctx.queries().createScanQuery(p, transformer, scanQry.getPartition(), isKeepBinary); - if (grp != null) - qry.projection(grp); + if (grp != null) + qry.projection(grp); - final GridCloseableIterator<Entry<K, V>> iter = ctx.kernalContext().query().executeQuery(ctx, - new IgniteOutClosureX<GridCloseableIterator<Entry<K, V>>>() { - @Override public GridCloseableIterator<Entry<K, V>> applyx() throws IgniteCheckedException { - final GridCloseableIterator<Map.Entry> iter0 = qry.executeScanQuery(); + final GridCloseableIterator<R> iter = ctx.kernalContext().query().executeQuery(ctx, + new IgniteOutClosureX<GridCloseableIterator<R>>() { + @Override public GridCloseableIterator<R> applyx() throws IgniteCheckedException { + final GridCloseableIterator iter0 = qry.executeScanQuery(); - return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() { - @Override protected Cache.Entry<K, V> onNext() throws IgniteCheckedException { - Map.Entry<K, V> next = iter0.nextX(); + final boolean needToConvert = transformer == null; - return new CacheEntryImpl<>(next.getKey(), next.getValue()); - } + return new GridCloseableIteratorAdapter<R>() { + @Override protected R onNext() throws IgniteCheckedException { + Object next = iter0.nextX(); - @Override protected boolean onHasNext() throws IgniteCheckedException { 
- return iter0.hasNextX(); - } + if (needToConvert) { + Map.Entry<K, V> entry = (Map.Entry<K, V>)next; - @Override protected void onClose() throws IgniteCheckedException { - iter0.close(); + return (R) new CacheEntryImpl<>(entry.getKey(), entry.getValue()); } - }; - } - }, false); - return new QueryCursorImpl<>(iter); - } + return (R)next; + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + return iter0.hasNextX(); + } + + @Override protected void onClose() throws IgniteCheckedException { + iter0.close(); + } + }; + } + }, false); + + return new QueryCursorImpl<>(iter); + } + + /** + * @param filter Filter. + * @param grp Optional cluster group. + * @return Cursor. + */ + @SuppressWarnings("unchecked") + private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable ClusterGroup grp) + throws IgniteCheckedException { + final CacheQuery<Map.Entry<K, V>> qry; + + boolean isKeepBinary = opCtx != null && opCtx.isKeepBinary(); final CacheQueryFuture<Map.Entry<K, V>> fut; @@ -692,6 +717,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p); } + if (qry instanceof ScanQuery) + return query((ScanQuery)qry, null, projection(qry.isLocal())); + return (QueryCursor<R>)query(qry, projection(qry.isLocal())); } catch (Exception e) { @@ -705,6 +733,36 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V } } + /** {@inheritDoc} */ + @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) { + A.notNull(qry, "qry"); + A.notNull(transformer, "transformer"); + + if (!(qry instanceof ScanQuery)) + throw new UnsupportedOperationException("Transformers are supported only for SCAN queries."); + + GridCacheGateway<K, V> gate = this.gate; + + CacheOperationContext prev = onEnter(gate, opCtx); + + try { + ctx.checkSecurity(SecurityPermission.CACHE_READ); + + validate(qry); + + return 
query((ScanQuery<K, V>)qry, transformer, projection(qry.isLocal())); + } + catch (Exception e) { + if (e instanceof CacheException) + throw (CacheException)e; + + throw new CacheException(e); + } + finally { + onLeave(gate, prev); + } + } + /** * @return {@code true} If this is a replicated cache and we are on a data node. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java index 47c6e89..3fa041b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java @@ -273,15 +273,6 @@ public interface CacheQuery<T> { public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, @Nullable Object... args); /** - * Executes the query the same way as {@link #execute(Object...)} method but transforms result remotely. - * - * @param rmtTransform Remote transformer. - * @param args Optional arguments. - * @return Future for the query result. - */ - public <R> CacheQueryFuture<R> execute(IgniteClosure<T, R> rmtTransform, @Nullable Object... args); - - /** * Gets metrics for this query. * * @return Query metrics. @@ -296,5 +287,5 @@ public interface CacheQuery<T> { /** * @return Scan query iterator. 
*/ - public <R extends Map.Entry> GridCloseableIterator<R> executeScanQuery() throws IgniteCheckedException; + public GridCloseableIterator executeScanQuery() throws IgniteCheckedException; } http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 5f6cb8f..d34047e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -253,7 +253,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage */ @Nullable private GridCacheQueryInfo distributedQueryInfo(UUID sndId, GridCacheQueryRequest req) { IgniteReducer<Object, Object> rdc = req.reducer(); - IgniteClosure<Object, Object> trans = req.transformer(); + IgniteClosure<Object, Object> trans = (IgniteClosure<Object, Object>)req.transformer(); ClusterNode sndNode = cctx.node(sndId); @@ -578,16 +578,16 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage /** {@inheritDoc} */ @SuppressWarnings({"unchecked", "serial"}) - @Override public GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(final GridCacheQueryAdapter qry, + @Override public GridCloseableIterator scanQueryDistributed(final GridCacheQueryAdapter qry, Collection<ClusterNode> nodes) throws IgniteCheckedException { assert !cctx.isLocal() : cctx.name(); assert qry.type() == GridCacheQueryType.SCAN: qry; - GridCloseableIterator<Map.Entry<K, V>> locIter0 = null; + 
GridCloseableIterator locIter0 = null; for (ClusterNode node : nodes) { if (node.isLocal()) { - locIter0 = (GridCloseableIterator)scanQueryLocal(qry, false); + locIter0 = scanQueryLocal(qry, false); Collection<ClusterNode> rmtNodes = new ArrayList<>(nodes.size() - 1); @@ -603,21 +603,21 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage } } - final GridCloseableIterator<Map.Entry<K, V>> locIter = locIter0; + final GridCloseableIterator locIter = locIter0; - final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, null, null); + final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, qry.<K, V>transform(), null); - final CacheQueryFuture<Map.Entry<K, V>> fut = (CacheQueryFuture<Map.Entry<K, V>>)queryDistributed(bean, nodes); + final CacheQueryFuture fut = (CacheQueryFuture)queryDistributed(bean, nodes); - return new GridCloseableIteratorAdapter<Map.Entry<K, V>>() { + return new GridCloseableIteratorAdapter() { /** */ - private Map.Entry<K, V> cur; + private Object cur; - @Override protected Map.Entry<K, V> onNext() throws IgniteCheckedException { + @Override protected Object onNext() throws IgniteCheckedException { if (!onHasNext()) throw new NoSuchElementException(); - Map.Entry<K, V> e = cur; + Object e = cur; cur = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java index 183abde..147725b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.query; import java.util.Collection; import java.util.List; -import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; @@ -90,7 +89,7 @@ public class GridCacheLocalQueryManager<K, V> extends GridCacheQueryManager<K, V } /** {@inheritDoc} */ - @Override public GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(GridCacheQueryAdapter qry, + @Override public GridCloseableIterator scanQueryDistributed(GridCacheQueryAdapter qry, Collection<ClusterNode> nodes) throws IgniteCheckedException { assert cctx.isLocal() : cctx.name(); http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index 90e14f4..f65b733 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -87,6 +87,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** */ private final IgniteBiPredicate<Object, Object> filter; + /** Transformer. */ + private IgniteClosure<?, ?> transform; + /** Partition. */ private Integer part; @@ -126,6 +129,39 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** * @param cctx Context. * @param type Query type. + * @param filter Scan filter. + * @param part Partition. 
+ * @param keepBinary Keep binary flag. + */ + public GridCacheQueryAdapter(GridCacheContext<?, ?> cctx, + GridCacheQueryType type, + @Nullable IgniteBiPredicate<Object, Object> filter, + @Nullable IgniteClosure<Map.Entry, Object> transform, + @Nullable Integer part, + boolean keepBinary) { + assert cctx != null; + assert type != null; + assert part == null || part >= 0; + + this.cctx = cctx; + this.type = type; + this.filter = filter; + this.transform = transform; + this.part = part; + this.keepBinary = keepBinary; + + log = cctx.logger(getClass()); + + metrics = new GridCacheQueryMetricsAdapter(); + + this.incMeta = false; + this.clsName = null; + this.clause = null; + } + + /** + * @param cctx Context. + * @param type Query type. * @param clsName Class name. * @param clause Clause. * @param filter Scan filter. @@ -376,6 +412,14 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { } /** + * @return Transformer. + */ + @SuppressWarnings("unchecked") + @Nullable public <K, V> IgniteClosure<Map.Entry<K, V>, Object> transform() { + return (IgniteClosure<Map.Entry<K, V>, Object>) transform; + } + + /** * @return Partition. */ @Nullable public Integer partition() { @@ -402,17 +446,12 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** {@inheritDoc} */ @Override public CacheQueryFuture<T> execute(@Nullable Object... args) { - return execute(null, null, args); + return execute0(null, args); } /** {@inheritDoc} */ @Override public <R> CacheQueryFuture<R> execute(IgniteReducer<T, R> rmtReducer, @Nullable Object... args) { - return execute(rmtReducer, null, args); - } - - /** {@inheritDoc} */ - @Override public <R> CacheQueryFuture<R> execute(IgniteClosure<T, R> rmtTransform, @Nullable Object... args) { - return execute(null, rmtTransform, args); + return execute0(rmtReducer, args); } /** {@inheritDoc} */ @@ -427,13 +466,11 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** * @param rmtReducer Optional reducer. 
- * @param rmtTransform Optional transformer. * @param args Arguments. * @return Future. */ @SuppressWarnings({"IfMayBeConditional", "unchecked"}) - private <R> CacheQueryFuture<R> execute(@Nullable IgniteReducer<T, R> rmtReducer, - @Nullable IgniteClosure<T, R> rmtTransform, @Nullable Object... args) { + private <R> CacheQueryFuture<R> execute0(@Nullable IgniteReducer<T, R> rmtReducer, @Nullable Object... args) { assert type != SCAN : this; Collection<ClusterNode> nodes; @@ -455,7 +492,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { if (cctx.deploymentEnabled()) { try { - cctx.deploy().registerClasses(filter, rmtReducer, rmtTransform); + cctx.deploy().registerClasses(filter, rmtReducer); cctx.deploy().registerClasses(args); } catch (IgniteCheckedException e) { @@ -469,7 +506,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { taskHash = cctx.kernalContext().job().currentTaskNameHash(); final GridCacheQueryBean bean = new GridCacheQueryBean(this, (IgniteReducer<Object, Object>)rmtReducer, - (IgniteClosure<Object, Object>)rmtTransform, args); + null, args); final GridCacheQueryManager qryMgr = cctx.queries(); @@ -484,8 +521,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** {@inheritDoc} */ @SuppressWarnings({"IfMayBeConditional", "unchecked"}) - @Override public <R extends Map.Entry> GridCloseableIterator<R> executeScanQuery() throws IgniteCheckedException { - assert type == SCAN: "Wrong processing of qyery: " + type; + @Override public GridCloseableIterator executeScanQuery() throws IgniteCheckedException { + assert type == SCAN : "Wrong processing of qyery: " + type; Collection<ClusterNode> nodes = nodes(); @@ -508,7 +545,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { final GridCacheQueryManager qryMgr = cctx.queries(); if (part != null && !cctx.isLocal()) - return (GridCloseableIterator<R>)new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx); + return new 
ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx); else { boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId()); @@ -676,7 +713,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { try { GridCloseableIterator it = qryMgr.scanQueryLocal(qry, true); - tuple= new T2(it, null); + tuple = new T2(it, null); } catch (IgniteClientDisconnectedCheckedException e) { throw CU.convertToCacheException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java index 5a4d693..286ddc2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryBean.java @@ -33,7 +33,7 @@ public class GridCacheQueryBean { private final IgniteReducer<Object, Object> rdc; /** */ - private final IgniteClosure<Object, Object> trans; + private final IgniteClosure<?, ?> trans; /** */ private final Object[] args; @@ -45,7 +45,7 @@ public class GridCacheQueryBean { * @param args Optional arguments. */ public GridCacheQueryBean(GridCacheQueryAdapter<?> qry, @Nullable IgniteReducer<Object, Object> rdc, - @Nullable IgniteClosure<Object, Object> trans, @Nullable Object[] args) { + @Nullable IgniteClosure<?, ?> trans, @Nullable Object[] args) { assert qry != null; this.qry = qry; @@ -71,7 +71,7 @@ public class GridCacheQueryBean { /** * @return Transformer. 
*/ - @Nullable public IgniteClosure<Object, Object> transform() { + @Nullable public IgniteClosure<?, ?> transform() { return trans; @@ -88,4 +88,4 @@ public class GridCacheQueryBean { @Override public String toString() { return S.toString(GridCacheQueryBean.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java index 8d2e67d..0a108d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryInfo.java @@ -31,7 +31,7 @@ class GridCacheQueryInfo { private boolean loc; /** */ - private IgniteClosure<Object, Object> trans; + private IgniteClosure<?, ?> trans; /** */ private IgniteReducer<Object, Object> rdc; @@ -71,7 +71,7 @@ class GridCacheQueryInfo { */ GridCacheQueryInfo( boolean loc, - IgniteClosure<Object, Object> trans, + IgniteClosure<?, ?> trans, IgniteReducer<Object, Object> rdc, GridCacheQueryAdapter<?> qry, GridCacheLocalQueryFuture<?, ?, ?> locFut, @@ -117,7 +117,7 @@ class GridCacheQueryInfo { /** * @return Transformer. 
*/ - IgniteClosure<?, Object> transformer() { + IgniteClosure<?, ?> transformer() { return trans; } @@ -167,4 +167,4 @@ class GridCacheQueryInfo { @Override public String toString() { return S.toString(GridCacheQueryInfo.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 163bac5..454ce04 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -506,7 +506,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @return Iterator. * @throws IgniteCheckedException If failed. 
*/ - public abstract GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(GridCacheQueryAdapter qry, + public abstract GridCloseableIterator scanQueryDistributed(GridCacheQueryAdapter qry, Collection<ClusterNode> nodes) throws IgniteCheckedException; /** @@ -1067,13 +1067,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final GridDhtLocalPartition locPart0 = locPart; return new PeekValueExpiryAwareIterator(keyIter, plc, topVer, keyValFilter, qry.keepBinary(), locNode, true) { - @Override protected void onClose() { - super.onClose(); + @Override protected void onClose() { + super.onClose(); - if (locPart0 != null) - locPart0.release(); - } - }; + if (locPart0 != null) + locPart0.release(); + } + }; } /** @@ -1163,9 +1163,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte key = (K)cctx.unwrapBinaryIfNeeded(key, keepBinary); - if (filter != null || locNode) { + if (filter != null || locNode) val = (V)cctx.unwrapBinaryIfNeeded(val, keepBinary); - } if (filter != null && !filter.apply(key, val)) continue; @@ -1187,14 +1186,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param locNode Local node. * @return Final key-value iterator. */ - private GridIterator<IgniteBiTuple<K,V>> scanExpiryIterator( + private GridIterator<IgniteBiTuple<K, V>> scanExpiryIterator( final Iterator<Map.Entry<byte[], byte[]>> it, AffinityTopologyVersion topVer, @Nullable final IgniteBiPredicate<K, V> filter, ExpiryPolicy expPlc, final boolean keepBinary, boolean locNode) { - Iterator <K> keyIter = new Iterator<K>() { + Iterator<K> keyIter = new Iterator<K>() { /** {@inheritDoc} */ @Override public boolean hasNext() { return it.hasNext(); @@ -1267,10 +1266,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte try { // Preparing query closures. 
- IgniteClosure<Object, Object> trans = (IgniteClosure<Object, Object>)qryInfo.transformer(); IgniteReducer<Object, Object> rdc = (IgniteReducer<Object, Object>)qryInfo.reducer(); - injectResources(trans); injectResources(rdc); GridCacheQueryAdapter<?> qry = qryInfo.query(); @@ -1289,7 +1286,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte res = qryInfo.local() ? executeFieldsQuery(qry, qryInfo.arguments(), qryInfo.local(), qry.subjectId(), taskName, - recipient(qryInfo.senderId(), qryInfo.requestId())) : + recipient(qryInfo.senderId(), qryInfo.requestId())) : fieldsQueryResult(qryInfo, taskName); // If metadata needs to be returned to user and cleaned from internal fields - copy it. @@ -1460,10 +1457,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte try { // Preparing query closures. - IgniteClosure<Map.Entry<K, V>, Object> trans = - (IgniteClosure<Map.Entry<K, V>, Object>)qryInfo.transformer(); + IgniteClosure<Cache.Entry<K, V>, Object> trans = + (IgniteClosure<Cache.Entry<K, V>, Object>)qryInfo.transformer(); - IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer(); + IgniteReducer<Cache.Entry<K, V>, Object> rdc = (IgniteReducer<Cache.Entry<K, V>, Object>)qryInfo.reducer(); injectResources(trans); injectResources(rdc); @@ -1481,13 +1478,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte res = loc ? 
executeQuery(qry, qryInfo.arguments(), loc, qry.subjectId(), taskName, - recipient(qryInfo.senderId(), qryInfo.requestId())) : + recipient(qryInfo.senderId(), qryInfo.requestId())) : queryResult(qryInfo, taskName); iter = res.iterator(recipient(qryInfo.senderId(), qryInfo.requestId())); type = res.type(); - GridCacheAdapter<K, V> cache = cctx.cache(); + final GridCacheAdapter<K, V> cache = cctx.cache(); if (log.isDebugEnabled()) log.debug("Received index iterator [iterHasNext=" + iter.hasNext() + @@ -1518,7 +1515,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte break; } - K key = row.getKey(); + final K key = row.getKey(); // Filter backups for SCAN queries, if it isn't partition scan. // Other types are filtered in indexing manager. @@ -1561,8 +1558,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } if (readEvt) { - K key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary()); - V val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary()); + K key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary()); + V val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary()); switch (type) { case SQL: @@ -1630,27 +1627,33 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } } - Map.Entry<K, V> entry = F.t(key, val); + if (rdc != null || trans != null) { + Cache.Entry<K, V> entry; - // Unwrap entry for reducer or transformer only. - if (rdc != null || trans != null) - entry = (Map.Entry<K, V>)cctx.unwrapBinaryIfNeeded(entry, qry.keepBinary()); + if (qry.keepBinary()) + entry = cache.<K, V>keepBinary().getEntry(key); + else + entry = cache.<K, V>getEntry(key); - // Reduce. - if (rdc != null) { - if (!rdc.collect(entry) || !iter.hasNext()) { - onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null); + // Reduce. 
+ if (rdc != null) { + if (!rdc.collect(entry) || !iter.hasNext()) { + onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null); - pageSent = true; + pageSent = true; - break; + break; + } + else + continue; } - else - continue; + + data.add(trans != null ? trans.apply(entry) : + !loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val)); } + else + data.add(!loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val)); - data.add(trans != null ? trans.apply(entry) : - !loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val)); if (!loc) { if (++cnt == pageSize || !iter.hasNext()) { @@ -1720,7 +1723,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param updStatisticsIfNeeded Update statistics flag. */ @SuppressWarnings({"unchecked", "serial"}) - protected GridCloseableIterator<IgniteBiTuple<K, V>> scanQueryLocal(final GridCacheQueryAdapter qry, + protected GridCloseableIterator scanQueryLocal(final GridCacheQueryAdapter qry, final boolean updStatisticsIfNeeded) throws IgniteCheckedException { if (!enterBusy()) throw new IllegalStateException("Failed to process query request (grid is stopping)."); @@ -1769,8 +1772,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); - return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { - @Override protected IgniteBiTuple<K, V> onNext() throws IgniteCheckedException { + return new GridCloseableIteratorAdapter<Object>() { + @Override protected Object onNext() throws IgniteCheckedException { long start = statsEnabled ? 
System.nanoTime() : 0L; IgniteBiTuple<K, V> next = iter.nextX(); @@ -1803,7 +1806,20 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte null)); } - return next; + IgniteClosure transform = qry.transform(); + + if (transform == null) + return next; + + Cache.Entry<K, V> entry; + + if (qry.keepBinary()) + entry = cctx.cache().keepBinary().getEntry(next.getKey()); + else + entry = cctx.cache().getEntry(next.getKey()); + + + return transform.apply(entry); } @Override protected boolean onHasNext() throws IgniteCheckedException { @@ -1832,7 +1848,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @return Iterator. * @throws IgniteCheckedException In case of error. */ - private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException { + private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo, + String taskName) throws IgniteCheckedException { assert qryInfo != null; final UUID sndId = qryInfo.senderId(); @@ -2845,7 +2862,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (locNode) return new IgniteBiTuple<>(key, val); - else{ + else { if (key instanceof CacheObject) ((CacheObject)key).prepareMarshal(cctx.cacheObjectContext()); @@ -3256,16 +3273,28 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param keepBinary Keep binary flag. * @return Created query. */ - public CacheQuery<Map.Entry<K, V>> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter, + public <R> CacheQuery<R> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter, @Nullable Integer part, boolean keepBinary) { + return createScanQuery(filter, null, part, keepBinary); + } - return new GridCacheQueryAdapter<>(cctx, + /** + * Creates user's predicate based scan query. + * + * @param filter Scan filter. + * @param part Partition. + * @param keepBinary Keep binary flag. + * @return Created query. 
+ */ + public <T, R> CacheQuery<R> createScanQuery(@Nullable IgniteBiPredicate<K, V> filter, + @Nullable IgniteClosure<T, R> trans, + @Nullable Integer part, boolean keepBinary) { + + return new GridCacheQueryAdapter(cctx, SCAN, - null, - null, - (IgniteBiPredicate<Object, Object>)filter, + filter, + trans, part, - false, keepBinary); } http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index f50fba0..5610bef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -84,7 +84,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache /** */ @GridDirectTransient - private IgniteClosure<Object, Object> trans; + private IgniteClosure<?, ?> trans; /** */ private byte[] transBytes; @@ -233,7 +233,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache IgniteBiPredicate<Object, Object> keyValFilter, @Nullable Integer part, IgniteReducer<Object, Object> rdc, - IgniteClosure<Object, Object> trans, + IgniteClosure<?, ?> trans, int pageSize, boolean incBackups, Object[] args, @@ -422,7 +422,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache /** * @return Transformer. 
*/ - public IgniteClosure<Object, Object> transformer() { + public IgniteClosure<?, ?> transformer() { return trans; } http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java new file mode 100644 index 0000000..6b13e05 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryTransformerSelfTest.java @@ -0,0 +1,570 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.query; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import javax.cache.Cache; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.query.SpiQuery; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cache.query.TextQuery; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Test for scan query with transformer. 
+ */ +public class GridCacheQueryTransformerSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + cfg.setMarshaller(null); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + startGridsMultiThreaded(3); + + Ignition.setClientMode(true); + + startGrid(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testGetKeys() throws Exception { + IgniteCache<Integer, String> cache = grid().createCache("test-cache"); + + try { + for (int i = 0; i < 50; i++) + cache.put(i, "val" + i); + + IgniteClosure<Cache.Entry<Integer, String>, Integer> transformer = + new IgniteClosure<Cache.Entry<Integer, String>, Integer>() { + @Override public Integer apply(Cache.Entry<Integer, String> e) { + return e.getKey(); + } + }; + + List<Integer> keys = cache.query(new ScanQuery<Integer, String>(), transformer).getAll(); + + assertEquals(50, keys.size()); + + Collections.sort(keys); + + for (int i = 0; i < 50; i++) + assertEquals(i, keys.get(i).intValue()); + } + finally { + cache.destroy(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testGetKeysFiltered() throws Exception { + IgniteCache<Integer, String> cache = grid().createCache("test-cache"); + + try { + for (int i = 0; i < 50; i++) + cache.put(i, "val" + i); + + IgniteBiPredicate<Integer, String> filter = new IgniteBiPredicate<Integer, String>() { + @Override public boolean apply(Integer k, String v) { + return k % 10 == 0; + } + }; + + IgniteClosure<Cache.Entry<Integer, String>, Integer> transformer = + new IgniteClosure<Cache.Entry<Integer, String>, Integer>() { + @Override public Integer apply(Cache.Entry<Integer, String> e) { + return e.getKey(); + } + }; + + List<Integer> keys = cache.query(new ScanQuery<>(filter), transformer).getAll(); + + assertEquals(5, keys.size()); + + Collections.sort(keys); + + for (int i = 0; i < 5; i++) + assertEquals(i * 10, keys.get(i).intValue()); + } + finally { + cache.destroy(); + } + } + + /** + * @throws Exception If failed. + */ + public void testGetObjectField() throws Exception { + IgniteCache<Integer, Value> cache = grid().createCache("test-cache"); + + try { + for (int i = 0; i < 50; i++) + cache.put(i, new Value("str" + i, i * 100)); + + IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer = + new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() { + @Override public Integer apply(Cache.Entry<Integer, Value> e) { + return e.getValue().idx; + } + }; + + List<Integer> res = cache.query(new ScanQuery<Integer, Value>(), transformer).getAll(); + + assertEquals(50, res.size()); + + Collections.sort(res); + + for (int i = 0; i < 50; i++) + assertEquals(i * 100, res.get(i).intValue()); + } + finally { + cache.destroy(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testGetObjectFieldFiltered() throws Exception { + IgniteCache<Integer, Value> cache = grid().createCache("test-cache"); + + try { + for (int i = 0; i < 50; i++) + cache.put(i, new Value("str" + i, i * 100)); + + IgniteBiPredicate<Integer, Value> filter = new IgniteBiPredicate<Integer, Value>() { + @Override public boolean apply(Integer k, Value v) { + return v.idx % 1000 == 0; + } + }; + + IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer = + new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() { + @Override public Integer apply(Cache.Entry<Integer, Value> e) { + return e.getValue().idx; + } + }; + + List<Integer> res = cache.query(new ScanQuery<>(filter), transformer).getAll(); + + assertEquals(5, res.size()); + + Collections.sort(res); + + for (int i = 0; i < 5; i++) + assertEquals(i * 1000, res.get(i).intValue()); + } + finally { + cache.destroy(); + } + } + + /** + * @throws Exception If failed. + */ + public void testKeepBinary() throws Exception { + IgniteCache<Integer, Value> cache = grid().createCache("test-cache"); + + try { + for (int i = 0; i < 50; i++) + cache.put(i, new Value("str" + i, i * 100)); + + IgniteCache<Integer, BinaryObject> binaryCache = cache.withKeepBinary(); + + IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer = + new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() { + @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) { + return e.getValue().field("idx"); + } + }; + + List<Integer> res = binaryCache.query(new ScanQuery<Integer, BinaryObject>(), transformer).getAll(); + + assertEquals(50, res.size()); + + Collections.sort(res); + + for (int i = 0; i < 50; i++) + assertEquals(i * 100, res.get(i).intValue()); + } + finally { + cache.destroy(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testKeepBinaryFiltered() throws Exception { + IgniteCache<Integer, Value> cache = grid().createCache("test-cache"); + + try { + for (int i = 0; i < 50; i++) + cache.put(i, new Value("str" + i, i * 100)); + + IgniteCache<Integer, BinaryObject> binaryCache = cache.withKeepBinary(); + + IgniteBiPredicate<Integer, BinaryObject> filter = new IgniteBiPredicate<Integer, BinaryObject>() { + @Override public boolean apply(Integer k, BinaryObject v) { + return v.<Integer>field("idx") % 1000 == 0; + } + }; + + IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer = + new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() { + @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) { + return e.getValue().field("idx"); + } + }; + + List<Integer> res = binaryCache.query(new ScanQuery<>(filter), transformer).getAll(); + + assertEquals(5, res.size()); + + Collections.sort(res); + + for (int i = 0; i < 5; i++) + assertEquals(i * 1000, res.get(i).intValue()); + } + finally { + cache.destroy(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testLocal() throws Exception { + IgniteCache<Integer, Value> cache = grid().createCache("test-cache"); + + try { + for (int i = 0; i < 50; i++) + cache.put(i, new Value("str" + i, i * 100)); + + Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() { + @IgniteInstanceResource + private Ignite ignite; + + @Override public List<Integer> call() throws Exception { + IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer = + new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() { + @Override public Integer apply(Cache.Entry<Integer, Value> e) { + return e.getValue().idx; + } + }; + + return ignite.cache("test-cache").query(new ScanQuery<Integer, Value>().setLocal(true), + transformer).getAll(); + } + }); + + List<Integer> res = new ArrayList<>(F.flatCollections(lists)); + + assertEquals(50, res.size()); + + Collections.sort(res); + + for (int i = 0; i < 50; i++) + assertEquals(i * 100, res.get(i).intValue()); + } + finally { + cache.destroy(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testLocalFiltered() throws Exception { + IgniteCache<Integer, Value> cache = grid().createCache("test-cache"); + + try { + for (int i = 0; i < 50; i++) + cache.put(i, new Value("str" + i, i * 100)); + + Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() { + @IgniteInstanceResource + private Ignite ignite; + + @Override public List<Integer> call() throws Exception { + IgniteBiPredicate<Integer, Value> filter = new IgniteBiPredicate<Integer, Value>() { + @Override public boolean apply(Integer k, Value v) { + return v.idx % 1000 == 0; + } + }; + + IgniteClosure<Cache.Entry<Integer, Value>, Integer> transformer = + new IgniteClosure<Cache.Entry<Integer, Value>, Integer>() { + @Override public Integer apply(Cache.Entry<Integer, Value> e) { + return e.getValue().idx; + } + }; + + return ignite.cache("test-cache").query(new ScanQuery<>(filter).setLocal(true), + transformer).getAll(); + } + }); + + List<Integer> res = new ArrayList<>(F.flatCollections(lists)); + + assertEquals(5, res.size()); + + Collections.sort(res); + + for (int i = 0; i < 5; i++) + assertEquals(i * 1000, res.get(i).intValue()); + } + finally { + cache.destroy(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testLocalKeepBinary() throws Exception { + IgniteCache<Integer, Value> cache = grid().createCache("test-cache"); + + try { + for (int i = 0; i < 50; i++) + cache.put(i, new Value("str" + i, i * 100)); + + Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() { + @IgniteInstanceResource + private Ignite ignite; + + @Override public List<Integer> call() throws Exception { + IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer = + new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() { + @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) { + return e.getValue().field("idx"); + } + }; + + return ignite.cache("test-cache").withKeepBinary().query( + new ScanQuery<Integer, BinaryObject>().setLocal(true), transformer).getAll(); + } + }); + + List<Integer> res = new ArrayList<>(F.flatCollections(lists)); + + assertEquals(50, res.size()); + + Collections.sort(res); + + for (int i = 0; i < 50; i++) + assertEquals(i * 100, res.get(i).intValue()); + } + finally { + cache.destroy(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testLocalKeepBinaryFiltered() throws Exception { + IgniteCache<Integer, Value> cache = grid().createCache("test-cache"); + + try { + for (int i = 0; i < 50; i++) + cache.put(i, new Value("str" + i, i * 100)); + + Collection<List<Integer>> lists = grid().compute().broadcast(new IgniteCallable<List<Integer>>() { + @IgniteInstanceResource + private Ignite ignite; + + @Override public List<Integer> call() throws Exception { + IgniteBiPredicate<Integer, BinaryObject> filter = new IgniteBiPredicate<Integer, BinaryObject>() { + @Override public boolean apply(Integer k, BinaryObject v) { + return v.<Integer>field("idx") % 1000 == 0; + } + }; + + IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer> transformer = + new IgniteClosure<Cache.Entry<Integer, BinaryObject>, Integer>() { + @Override public Integer apply(Cache.Entry<Integer, BinaryObject> e) { + return e.getValue().field("idx"); + } + }; + + return ignite.cache("test-cache").withKeepBinary().query(new ScanQuery<>(filter).setLocal(true), + transformer).getAll(); + } + }); + + List<Integer> res = new ArrayList<>(F.flatCollections(lists)); + + assertEquals(5, res.size()); + + Collections.sort(res); + + for (int i = 0; i < 5; i++) + assertEquals(i * 1000, res.get(i).intValue()); + } + finally { + cache.destroy(); + } + } + + /** + * @throws Exception If failed. 
+ */ + public void testUnsupported() throws Exception { + final IgniteCache<Integer, Integer> cache = grid().createCache("test-cache"); + + final IgniteClosure<Cache.Entry<Integer, Integer>, Integer> transformer = + new IgniteClosure<Cache.Entry<Integer, Integer>, Integer>() { + @Override public Integer apply(Cache.Entry<Integer, Integer> e) { + return null; + } + }; + + try { + GridTestUtils.assertThrows( + log, + new Callable<Object>() { + @Override public Object call() throws Exception { + cache.query(new SqlQuery<Integer, Integer>(Integer.class, "clause"), transformer); + + return null; + } + }, + UnsupportedOperationException.class, + "Transformers are supported only for SCAN queries." + ); + + GridTestUtils.assertThrows( + log, + new Callable<Object>() { + @Override public Object call() throws Exception { + cache.query(new SqlFieldsQuery("clause"), new IgniteClosure<List<?>, Object>() { + @Override public Object apply(List<?> objects) { + return null; + } + }); + + return null; + } + }, + UnsupportedOperationException.class, + "Transformers are supported only for SCAN queries." + ); + + GridTestUtils.assertThrows( + log, + new Callable<Object>() { + @Override public Object call() throws Exception { + cache.query(new TextQuery<Integer, Integer>(Integer.class, "clause"), transformer); + + return null; + } + }, + UnsupportedOperationException.class, + "Transformers are supported only for SCAN queries." + ); + + GridTestUtils.assertThrows( + log, + new Callable<Object>() { + @Override public Object call() throws Exception { + cache.query(new SpiQuery<Integer, Integer>(), transformer); + + return null; + } + }, + UnsupportedOperationException.class, + "Transformers are supported only for SCAN queries." 
+ ); + + GridTestUtils.assertThrows( + log, + new Callable<Object>() { + @Override public Object call() throws Exception { + cache.query(new ContinuousQuery<Integer, Integer>(), transformer); + + return null; + } + }, + UnsupportedOperationException.class, + "Transformers are supported only for SCAN queries." + ); + } + finally { + cache.destroy(); + } + } + + /** + */ + private static class Value { + /** */ + @SuppressWarnings("unused") + private String str; + + /** */ + private int idx; + + /** + * @param str String. + * @param idx Integer. + */ + public Value(String str, int idx) { + this.str = str; + this.idx = idx; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java index 740b201..71dc964 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteCacheProcessProxy.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.mxbean.CacheMetricsMXBean; import org.apache.ignite.resources.IgniteInstanceResource; @@ -172,6 +173,11 @@ public class IgniteCacheProcessProxy<K, V> implements IgniteCache<K, V> { } /** {@inheritDoc} */ + @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> 
transformer) { + throw new UnsupportedOperationException("Method should be supported."); + } + + /** {@inheritDoc} */ @Override public Iterable<Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException { return compute.call(new LocalEntriesTask<K, V>(cacheName, isAsync, peekModes)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/407071e4/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 1b1908d..3652acd 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.Ignite import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheReplicatedQuerySelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQuerySelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryTransformerSelfTest; import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest; import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest; import org.apache.ignite.internal.processors.query.h2.sql.GridQueryParsingTest; @@ -115,6 +116,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(IgniteBinaryWrappedObjectFieldsQuerySelfTest.class); suite.addTestSuite(IgniteCacheQueryH2IndexingLeakTest.class); suite.addTestSuite(IgniteCacheQueryNoRebalanceSelfTest.class); + 
suite.addTestSuite(GridCacheQueryTransformerSelfTest.class); return suite; }
