IGNITE-2546 - Added transformers to SCAN queries

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/07e7df99
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/07e7df99
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/07e7df99

Branch: refs/heads/ignite-2546
Commit: 07e7df9931a950b05165b5011a1b39c250020dfa
Parents: 5730c06
Author: Valentin Kulichenko <[email protected]>
Authored: Wed Mar 23 15:30:09 2016 -0700
Committer: Valentin Kulichenko <[email protected]>
Committed: Wed Mar 23 15:30:09 2016 -0700

----------------------------------------------------------------------
 .../cache/query/GridCacheQueryManager.java      | 31 +++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/07e7df99/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 786052a..de9d6da 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1341,8 +1341,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                 // Preparing query closures.
                 IgniteClosure<Cache.Entry<K, V>, Object> trans =
                     (IgniteClosure<Cache.Entry<K, V>, 
Object>)qryInfo.transformer();
+                IgniteReducer<Map.Entry<K, V>, Object> rdc = 
(IgniteReducer<Map.Entry<K, V>, Object>)qryInfo.reducer();
 
                 injectResources(trans);
+                injectResources(rdc);
 
                 GridCacheQueryAdapter<?> qry = qryInfo.query();
 
@@ -1503,15 +1505,26 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                         }
                     }
 
-                    // Unwrap entry for transformer only.
-                    if (trans != null) {
+                    // Unwrap entry for transformer or reducer only.
+                    if (trans != null || rdc != null) {
                         key = (K)cctx.unwrapBinaryIfNeeded(key, 
qry.keepBinary());
                         val = (V)cctx.unwrapBinaryIfNeeded(val, 
qry.keepBinary());
+                    }
+
+                    if (rdc != null) {
+                        if (!rdc.collect(F.t(key, val)) || !iter.hasNext()) {
+                            onPageReady(loc, qryInfo, 
Collections.singletonList(rdc.reduce()), true, null);
 
-                        data.add(trans.apply(new CacheEntryImpl<>(key, val)));
+                            pageSent = true;
+
+                            break;
+                        }
+                        else
+                            continue;
                     }
-                    else
-                        data.add(!loc ? new GridCacheQueryResponseEntry<>(key, 
val) : F.t(key, val));
+
+                    data.add(trans != null ? trans.apply(new 
CacheEntryImpl<>(key, val)) :
+                        !loc ? new GridCacheQueryResponseEntry<>(key, val) : 
F.t(key, val));
 
                     if (!loc) {
                         if (++cnt == pageSize || !iter.hasNext()) {
@@ -1535,8 +1548,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     }
                 }
 
-                if (!pageSent)
-                    onPageReady(loc, qryInfo, data, true, null);
+                if (!pageSent) {
+                    if (rdc == null)
+                        onPageReady(loc, qryInfo, data, true, null);
+                    else
+                        onPageReady(loc, qryInfo, 
Collections.singletonList(rdc.reduce()), true, null);
+                }
             }
             catch (Throwable e) {
                 if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))

Reply via email to