IGNITE-2546 - Added transformers to SCAN queries
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/07e7df99 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/07e7df99 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/07e7df99 Branch: refs/heads/ignite-2546 Commit: 07e7df9931a950b05165b5011a1b39c250020dfa Parents: 5730c06 Author: Valentin Kulichenko <[email protected]> Authored: Wed Mar 23 15:30:09 2016 -0700 Committer: Valentin Kulichenko <[email protected]> Committed: Wed Mar 23 15:30:09 2016 -0700 ---------------------------------------------------------------------- .../cache/query/GridCacheQueryManager.java | 31 +++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/07e7df99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 786052a..de9d6da 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1341,8 +1341,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte // Preparing query closures. IgniteClosure<Cache.Entry<K, V>, Object> trans = (IgniteClosure<Cache.Entry<K, V>, Object>)qryInfo.transformer(); + IgniteReducer<Map.Entry<K, V>, Object> rdc = (IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer(); injectResources(trans); + injectResources(rdc); GridCacheQueryAdapter<?> qry = qryInfo.query(); @@ -1503,15 +1505,26 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } } - // Unwrap entry for transformer only. - if (trans != null) { + // Unwrap entry for transformer or reducer only. + if (trans != null || rdc != null) { key = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary()); val = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary()); + } + + if (rdc != null) { + if (!rdc.collect(F.t(key, val)) || !iter.hasNext()) { + onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null); - data.add(trans.apply(new CacheEntryImpl<>(key, val))); + pageSent = true; + + break; + } + else + continue; } - else - data.add(!loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val)); + + data.add(trans != null ? trans.apply(new CacheEntryImpl<>(key, val)) : + !loc ? new GridCacheQueryResponseEntry<>(key, val) : F.t(key, val)); if (!loc) { if (++cnt == pageSize || !iter.hasNext()) { @@ -1535,8 +1548,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } } - if (!pageSent) - onPageReady(loc, qryInfo, data, true, null); + if (!pageSent) { + if (rdc == null) + onPageReady(loc, qryInfo, data, true, null); + else + onPageReady(loc, qryInfo, Collections.singletonList(rdc.reduce()), true, null); + } } catch (Throwable e) { if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))
