Repository: ignite
Updated Branches:
  refs/heads/master ae02a1d3c -> a6e28082b


IGNITE-6605: SQL: common backup filter. This closes #2836.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a6e28082
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a6e28082
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a6e28082

Branch: refs/heads/master
Commit: a6e28082ba08f2ecefe6c7bef898b201126997b9
Parents: ae02a1d
Author: devozerov <[email protected]>
Authored: Fri Oct 13 14:12:44 2017 +0300
Committer: devozerov <[email protected]>
Committed: Fri Oct 13 14:12:44 2017 +0300

----------------------------------------------------------------------
 .../cache/query/GridCacheQueryManager.java      | 40 ++--------
 .../spi/indexing/IndexingQueryCacheFilter.java  | 72 ++++++++++++++++++
 .../spi/indexing/IndexingQueryFilter.java       | 12 +--
 .../spi/indexing/IndexingQueryFilterImpl.java   | 79 ++++++++++++++++++++
 .../internal/processors/query/h2/H2Cursor.java  | 25 +++----
 .../processors/query/h2/IgniteH2Indexing.java   | 61 +--------------
 .../query/h2/database/H2PkHashIndex.java        | 20 ++---
 .../query/h2/database/H2TreeIndex.java          |  4 +-
 .../query/h2/opt/GridH2IndexBase.java           | 58 +++++++-------
 .../query/h2/opt/GridLuceneIndex.java           | 25 ++-----
 10 files changed, 211 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 64e74fb..392b19f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -119,6 +119,7 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.IgniteSpiCloseableIterator;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
 import org.apache.ignite.spi.indexing.IndexingSpi;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
@@ -130,7 +131,6 @@ import static 
org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST;
-import static 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
 import static 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SCAN;
 import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SPI;
@@ -1985,39 +1985,6 @@ public abstract class GridCacheQueryManager<K, V> 
extends GridCacheManagerAdapte
     }
 
     /**
-     * @param <K> Key type.
-     * @param <V> Value type.
-     * @param includeBackups Include backups.
-     * @return Predicate.
-     */
-    @SuppressWarnings("unchecked")
-    @Nullable public <K, V> IndexingQueryFilter backupsFilter(boolean 
includeBackups) {
-        if (includeBackups)
-            return null;
-
-        return new IndexingQueryFilter() {
-            @Nullable @Override public IgniteBiPredicate<K, V> forCache(final 
String cacheName) {
-                final GridKernalContext ctx = cctx.kernalContext();
-
-                final GridCacheAdapter<Object, Object> cache = 
ctx.cache().internalCache(cacheName);
-
-                if (cache.context().isReplicated() || 
cache.configuration().getBackups() == 0)
-                    return null;
-
-                return new IgniteBiPredicate<K, V>() {
-                    @Override public boolean apply(K k, V v) {
-                        return 
cache.context().affinity().primaryByKey(ctx.discovery().localNode(), k, NONE);
-                    }
-                };
-            }
-
-            @Override public boolean isValueRequired() {
-                return false;
-            }
-        };
-    }
-
-    /**
      * @return Topology version for query requests.
      */
     public AffinityTopologyVersion queryTopologyVersion() {
@@ -2029,7 +1996,10 @@ public abstract class GridCacheQueryManager<K, V> 
extends GridCacheManagerAdapte
      * @return Filter.
      */
     private IndexingQueryFilter filter(GridCacheQueryAdapter<?> qry) {
-        return backupsFilter(qry.includeBackups());
+        if (qry.includeBackups())
+            return null;
+
+        return new IndexingQueryFilterImpl(cctx.kernalContext(), 
AffinityTopologyVersion.NONE, null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryCacheFilter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryCacheFilter.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryCacheFilter.java
new file mode 100644
index 0000000..6257f47
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryCacheFilter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.indexing;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+
+import java.util.Set;
+
+/**
+ * Indexing query filter for specific cache.
+ */
+public class IndexingQueryCacheFilter {
+    /** Affinity manager. */
+    private final GridCacheAffinityManager aff;
+
+    /** Partitions. */
+    private final Set<Integer> parts;
+
+    /** Topology version. */
+    private final AffinityTopologyVersion topVer;
+
+    /** Local node. */
+    private final ClusterNode locNode;
+
+    /**
+     * Constructor.
+     *
+     * @param aff Affinity.
+     * @param parts Partitions.
+     * @param topVer Topology version.
+     * @param locNode Local node.
+     */
+    public IndexingQueryCacheFilter(GridCacheAffinityManager aff, Set<Integer> 
parts,
+        AffinityTopologyVersion topVer, ClusterNode locNode) {
+        this.aff = aff;
+        this.parts = parts;
+        this.topVer = topVer;
+        this.locNode = locNode;
+    }
+
+    /**
+     * Apply filter.
+     *
+     * @param key Key.
+     * @return {@code True} if passed.
+     */
+    public boolean apply(Object key) {
+        int part = aff.partition(key);
+
+        if (parts == null)
+            return aff.primaryByPartition(locNode, part, topVer);
+        else
+            return parts.contains(part);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilter.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilter.java
index 74d349a..b0d9693 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilter.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.spi.indexing;
 
-import org.apache.ignite.lang.IgniteBiPredicate;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -30,14 +29,5 @@ public interface IndexingQueryFilter {
      * @param cacheName Cache name.
      * @return Predicate or {@code null} if no filtering is needed.
      */
-    @Nullable public <K, V> IgniteBiPredicate<K, V> forCache(String cacheName);
-
-    /**
-     * Is the value required for filtering logic?
-     * If false then null instead of value will be passed
-     * to IgniteBiPredicate returned by {@link #forCache(String)} method.
-     *
-     * @return true if value is required for filtering, false otherwise.
-     */
-    public boolean isValueRequired();
+    @Nullable public IndexingQueryCacheFilter forCache(String cacheName);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilterImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilterImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilterImpl.java
new file mode 100644
index 0000000..53dcbf6
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilterImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.indexing;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.HashSet;
+
+/**
+ * Indexing query filter.
+ */
+public class IndexingQueryFilterImpl implements IndexingQueryFilter {
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Topology version. */
+    private final AffinityTopologyVersion topVer;
+
+    /** Partitions. */
+    private final HashSet<Integer> parts;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     * @param topVer Topology version.
+     * @param partsArr Partitions array.
+     */
+    public IndexingQueryFilterImpl(GridKernalContext ctx, @Nullable 
AffinityTopologyVersion topVer,
+        @Nullable int[] partsArr) {
+        this.ctx = ctx;
+
+        this.topVer = topVer != null ? topVer : AffinityTopologyVersion.NONE;
+
+        if (F.isEmpty(partsArr))
+            parts = null;
+        else {
+            parts = new HashSet<>();
+
+            for (int part : partsArr)
+                parts.add(part);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IndexingQueryCacheFilter forCache(String 
cacheName) {
+        final GridCacheAdapter<Object, Object> cache = 
ctx.cache().internalCache(cacheName);
+
+        // REPLICATED -> nothing to filter (explicit partitions are not 
supported).
+        if (cache.context().isReplicated())
+            return null;
+
+        // No backups and no explicit partitions -> nothing to filter.
+        if (cache.configuration().getBackups() == 0 && parts == null)
+            return null;
+
+        return new IndexingQueryCacheFilter(cache.context().affinity(), parts, 
topVer,
+            ctx.discovery().localNode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Cursor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Cursor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Cursor.java
index de3111d..e09108d 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Cursor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Cursor.java
@@ -17,14 +17,15 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.query.h2.opt.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.h2.index.*;
-import org.h2.message.*;
-import org.h2.result.*;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
+import org.h2.index.Cursor;
+import org.h2.message.DbException;
+import org.h2.result.Row;
+import org.h2.result.SearchRow;
 
 /**
  * Cursor.
@@ -34,7 +35,7 @@ public class H2Cursor implements Cursor {
     private final GridCursor<GridH2Row> cursor;
 
     /** */
-    private final IgniteBiPredicate<Object,Object> filter;
+    private final IndexingQueryCacheFilter filter;
 
     /** */
     private final long time = U.currentTimeMillis();
@@ -43,7 +44,7 @@ public class H2Cursor implements Cursor {
      * @param cursor Cursor.
      * @param filter Filter.
      */
-    public H2Cursor(GridCursor<GridH2Row> cursor, IgniteBiPredicate<Object, 
Object> filter) {
+    public H2Cursor(GridCursor<GridH2Row> cursor, IndexingQueryCacheFilter 
filter) {
         assert cursor != null;
 
         this.cursor = cursor;
@@ -85,12 +86,10 @@ public class H2Cursor implements Cursor {
                     return true;
 
                 Object key = row.getValue(0).getObject();
-                Object val = row.getValue(1).getObject();
 
                 assert key != null;
-                assert val != null;
 
-                if (filter.apply(key, val))
+                if (filter.apply(key))
                     return true;
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index fddd2e8..c172e65 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -65,8 +65,6 @@ import 
org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
 import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -130,7 +128,6 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
-import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -139,6 +136,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl;
 import org.h2.api.ErrorCode;
 import org.h2.api.JavaObjectSerializer;
 import org.h2.command.Prepared;
@@ -2375,62 +2373,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     /** {@inheritDoc} */
     @Override public IndexingQueryFilter backupFilter(@Nullable final 
AffinityTopologyVersion topVer,
         @Nullable final int[] parts) {
-        final AffinityTopologyVersion topVer0 = topVer != null ? topVer : 
AffinityTopologyVersion.NONE;
-
-        return new IndexingQueryFilter() {
-            @Nullable @Override public <K, V> IgniteBiPredicate<K, V> 
forCache(String cacheName) {
-                final GridCacheAdapter<Object, Object> cache = 
ctx.cache().internalCache(cacheName);
-
-                if (cache.context().isReplicated())
-                    return null;
-
-                final GridCacheAffinityManager aff = 
cache.context().affinity();
-
-                if (parts != null) {
-                    if (parts.length < 64) { // Fast scan for small arrays.
-                        return new IgniteBiPredicate<K, V>() {
-                            @Override public boolean apply(K k, V v) {
-                                int p = aff.partition(k);
-
-                                for (int p0 : parts) {
-                                    if (p0 == p)
-                                        return true;
-
-                                    if (p0 > p) // Array is sorted.
-                                        return false;
-                                }
-
-                                return false;
-                            }
-                        };
-                    }
-
-                    return new IgniteBiPredicate<K, V>() {
-                        @Override public boolean apply(K k, V v) {
-                            int p = aff.partition(k);
-
-                            return Arrays.binarySearch(parts, p) >= 0;
-                        }
-                    };
-                }
-
-                final ClusterNode locNode = ctx.discovery().localNode();
-
-                return new IgniteBiPredicate<K, V>() {
-                    @Override public boolean apply(K k, V v) {
-                        return aff.primaryByKey(locNode, k, topVer0);
-                    }
-                };
-            }
-
-            @Override public boolean isValueRequired() {
-                return false;
-            }
-
-            @Override public String toString() {
-                return "IndexingQueryFilter [ver=" + topVer + ']';
-            }
-        };
+        return new IndexingQueryFilterImpl(ctx, topVer, parts);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
index b32bfb8..6691485 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
@@ -31,8 +31,8 @@ import 
org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.util.lang.GridCursor;
-import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
 import org.h2.engine.Session;
 import org.h2.index.Cursor;
 import org.h2.index.IndexType;
@@ -85,7 +85,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
     /** {@inheritDoc} */
     @Override public Cursor find(Session ses, final SearchRow lower, final 
SearchRow upper) {
         IndexingQueryFilter f = threadLocalFilter();
-        IgniteBiPredicate<Object, Object> p = null;
+        IndexingQueryCacheFilter p = null;
 
         if (f != null) {
             String cacheName = getTable().cacheName();
@@ -179,13 +179,13 @@ public class H2PkHashIndex extends GridH2IndexBase {
         final GridCursor<? extends CacheDataRow> cursor;
 
         /** */
-        final IgniteBiPredicate<Object, Object> filter;
+        final IndexingQueryCacheFilter filter;
 
         /**
          * @param cursor Cursor.
          * @param filter Filter.
          */
-        private H2Cursor(GridCursor<? extends CacheDataRow> cursor, 
IgniteBiPredicate<Object, Object> filter) {
+        private H2Cursor(GridCursor<? extends CacheDataRow> cursor, 
IndexingQueryCacheFilter filter) {
             assert cursor != null;
 
             this.cursor = cursor;
@@ -222,17 +222,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
 
                     CacheDataRow dataRow = cursor.get();
 
-                    GridH2Row row = 
tbl.rowDescriptor().createRow(dataRow.key(), dataRow.partition(), 
dataRow.value(), dataRow.version(), 0);
-
-                    row.link(dataRow.link());
-
-                    Object key = row.getValue(0).getObject();
-                    Object val = row.getValue(1).getObject();
-
-                    assert key != null;
-                    assert val != null;
-
-                    if (filter.apply(key, val))
+                    if (filter.apply(dataRow.key()))
                         return true;
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index 5c3e1bd..1a3ea4a 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -34,8 +34,8 @@ import 
org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
 import org.h2.engine.Session;
 import org.h2.index.Cursor;
 import org.h2.index.IndexType;
@@ -166,7 +166,7 @@ public class H2TreeIndex extends GridH2IndexBase {
     @Override public Cursor find(Session ses, SearchRow lower, SearchRow 
upper) {
         try {
             IndexingQueryFilter f = threadLocalFilter();
-            IgniteBiPredicate<Object, Object> p = null;
+            IndexingQueryCacheFilter p = null;
 
             if (f != null) {
                 String cacheName = getTable().cacheName();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 048192a..92b7d10 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -17,12 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
@@ -41,17 +35,18 @@ import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange
 import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
 import 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteTree;
+import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.CIX2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
 import org.h2.engine.Session;
 import org.h2.index.BaseIndex;
 import org.h2.index.Cursor;
@@ -68,13 +63,29 @@ import org.h2.value.Value;
 import org.h2.value.ValueNull;
 import org.jetbrains.annotations.Nullable;
 
+import javax.cache.CacheException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
 import static java.util.Collections.emptyIterator;
 import static java.util.Collections.singletonList;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.LOCAL_ONLY;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL;
-import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.VAL_COL;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel;
+import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 import static 
org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
 import static 
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
@@ -1574,15 +1585,13 @@ public abstract class GridH2IndexBase extends BaseIndex 
{
     protected static class FilteringCursor implements GridCursor<GridH2Row> {
         /** */
         private final GridCursor<GridH2Row> cursor;
+
         /** */
-        private final IgniteBiPredicate<Object, Object> fltr;
+        private final IndexingQueryCacheFilter fltr;
 
         /** */
         private final long time;
 
-        /** Is value required for filtering predicate? */
-        private final boolean isValRequired;
-
         /** */
         private GridH2Row next;
 
@@ -1595,19 +1604,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
         protected FilteringCursor(GridCursor<GridH2Row> cursor, long time, 
IndexingQueryFilter qryFilter,
             String cacheName) {
             this.cursor = cursor;
-
             this.time = time;
-
-            if (qryFilter != null) {
-                this.fltr = qryFilter.forCache(cacheName);
-
-                this.isValRequired = qryFilter.isValueRequired();
-            }
-            else {
-                this.fltr = null;
-
-                this.isValRequired = false;
-            }
+            this.fltr = qryFilter != null ? qryFilter.forCache(cacheName) : 
null;
         }
 
         /**
@@ -1623,12 +1621,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
                 return true;
 
             Object key = row.getValue(KEY_COL).getObject();
-            Object val = isValRequired ? row.getValue(VAL_COL).getObject() : 
null;
-
-            assert key != null;
-            assert !isValRequired || val != null;
 
-            return fltr.apply(key, val);
+            return fltr.apply(key);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
index f8d3ef2..b5d2456 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
@@ -32,9 +32,9 @@ import 
org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
@@ -290,7 +290,7 @@ public class GridLuceneIndex implements AutoCloseable {
             throw new IgniteCheckedException(e);
         }
 
-        IgniteBiPredicate<K, V> fltr = null;
+        IndexingQueryCacheFilter fltr = null;
 
         if (filters != null)
             fltr = filters.forCache(cacheName);
@@ -321,7 +321,7 @@ public class GridLuceneIndex implements AutoCloseable {
         private final ScoreDoc[] docs;
 
         /** */
-        private final IgniteBiPredicate<K, V> filters;
+        private final IndexingQueryCacheFilter filters;
 
         /** */
         private int idx;
@@ -341,7 +341,7 @@ public class GridLuceneIndex implements AutoCloseable {
          * @param filters Filters over result.
          * @throws IgniteCheckedException if failed.
          */
-        private It(IndexReader reader, IndexSearcher searcher, ScoreDoc[] 
docs, IgniteBiPredicate<K, V> filters)
+        private It(IndexReader reader, IndexSearcher searcher, ScoreDoc[] 
docs, IndexingQueryCacheFilter filters)
             throws IgniteCheckedException {
             this.reader = reader;
             this.searcher = searcher;
@@ -354,17 +354,6 @@ public class GridLuceneIndex implements AutoCloseable {
         }
 
         /**
-         * Filters key using predicates.
-         *
-         * @param key Key.
-         * @param val Value.
-         * @return {@code True} if key passes filter.
-         */
-        private boolean filter(K key, V val) {
-            return filters == null || filters.apply(key, val);
-        }
-
-        /**
          * @param bytes Bytes.
          * @param ldr Class loader.
          * @return Object.
@@ -404,15 +393,15 @@ public class GridLuceneIndex implements AutoCloseable {
 
                 K k = unmarshall(doc.getBinaryValue(KEY_FIELD_NAME).bytes, 
ldr);
 
+                if (filters != null && !filters.apply(k))
+                    continue;
+
                 V v = type.valueClass() == String.class ?
                     (V)doc.get(VAL_STR_FIELD_NAME) :
                     
this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME).bytes, ldr);
 
                 assert v != null;
 
-                if (!filter(k, v))
-                    continue;
-
                 curr = new IgniteBiTuple<>(k, v);
 
                 break;

Reply via email to