Repository: ignite Updated Branches: refs/heads/master ae02a1d3c -> a6e28082b
IGNITE-6605: SQL: common backup filter. This closes #2836. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a6e28082 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a6e28082 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a6e28082 Branch: refs/heads/master Commit: a6e28082ba08f2ecefe6c7bef898b201126997b9 Parents: ae02a1d Author: devozerov <[email protected]> Authored: Fri Oct 13 14:12:44 2017 +0300 Committer: devozerov <[email protected]> Committed: Fri Oct 13 14:12:44 2017 +0300 ---------------------------------------------------------------------- .../cache/query/GridCacheQueryManager.java | 40 ++-------- .../spi/indexing/IndexingQueryCacheFilter.java | 72 ++++++++++++++++++ .../spi/indexing/IndexingQueryFilter.java | 12 +-- .../spi/indexing/IndexingQueryFilterImpl.java | 79 ++++++++++++++++++++ .../internal/processors/query/h2/H2Cursor.java | 25 +++---- .../processors/query/h2/IgniteH2Indexing.java | 61 +-------------- .../query/h2/database/H2PkHashIndex.java | 20 ++--- .../query/h2/database/H2TreeIndex.java | 4 +- .../query/h2/opt/GridH2IndexBase.java | 58 +++++++------- .../query/h2/opt/GridLuceneIndex.java | 25 ++----- 10 files changed, 211 insertions(+), 185 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 64e74fb..392b19f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -119,6 +119,7 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.spi.IgniteSpiCloseableIterator; import org.apache.ignite.spi.indexing.IndexingQueryFilter; +import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl; import org.apache.ignite.spi.indexing.IndexingSpi; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -130,7 +131,6 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST; -import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SCAN; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SPI; @@ -1985,39 +1985,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** - * @param <K> Key type. - * @param <V> Value type. - * @param includeBackups Include backups. - * @return Predicate. - */ - @SuppressWarnings("unchecked") - @Nullable public <K, V> IndexingQueryFilter backupsFilter(boolean includeBackups) { - if (includeBackups) - return null; - - return new IndexingQueryFilter() { - @Nullable @Override public IgniteBiPredicate<K, V> forCache(final String cacheName) { - final GridKernalContext ctx = cctx.kernalContext(); - - final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); - - if (cache.context().isReplicated() || cache.configuration().getBackups() == 0) - return null; - - return new IgniteBiPredicate<K, V>() { - @Override public boolean apply(K k, V v) { - return cache.context().affinity().primaryByKey(ctx.discovery().localNode(), k, NONE); - } - }; - } - - @Override public boolean isValueRequired() { - return false; - } - }; - } - - /** * @return Topology version for query requests. */ public AffinityTopologyVersion queryTopologyVersion() { @@ -2029,7 +1996,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @return Filter. */ private IndexingQueryFilter filter(GridCacheQueryAdapter<?> qry) { - return backupsFilter(qry.includeBackups()); + if (qry.includeBackups()) + return null; + + return new IndexingQueryFilterImpl(cctx.kernalContext(), AffinityTopologyVersion.NONE, null); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryCacheFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryCacheFilter.java b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryCacheFilter.java new file mode 100644 index 0000000..6257f47 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryCacheFilter.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.indexing; + +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; + +import java.util.Set; + +/** + * Indexing query filter for specific cache. + */ +public class IndexingQueryCacheFilter { + /** Affinity manager. */ + private final GridCacheAffinityManager aff; + + /** Partitions. */ + private final Set<Integer> parts; + + /** Topology version. */ + private final AffinityTopologyVersion topVer; + + /** Local node. */ + private final ClusterNode locNode; + + /** + * Constructor. + * + * @param aff Affinity. + * @param parts Partitions. + * @param topVer Topology version. + * @param locNode Local node. + */ + public IndexingQueryCacheFilter(GridCacheAffinityManager aff, Set<Integer> parts, + AffinityTopologyVersion topVer, ClusterNode locNode) { + this.aff = aff; + this.parts = parts; + this.topVer = topVer; + this.locNode = locNode; + } + + /** + * Apply filter. + * + * @param key Key. + * @return {@code True} if passed. + */ + public boolean apply(Object key) { + int part = aff.partition(key); + + if (parts == null) + return aff.primaryByPartition(locNode, part, topVer); + else + return parts.contains(part); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilter.java b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilter.java index 74d349a..b0d9693 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilter.java @@ -17,7 +17,6 @@ package org.apache.ignite.spi.indexing; -import org.apache.ignite.lang.IgniteBiPredicate; import org.jetbrains.annotations.Nullable; /** @@ -30,14 +29,5 @@ public interface IndexingQueryFilter { * @param cacheName Cache name. * @return Predicate or {@code null} if no filtering is needed. */ - @Nullable public <K, V> IgniteBiPredicate<K, V> forCache(String cacheName); - - /** - * Is the value required for filtering logic? - * If false then null instead of value will be passed - * to IgniteBiPredicate returned by {@link #forCache(String)} method. - * - * @return true if value is required for filtering, false otherwise. - */ - public boolean isValueRequired(); + @Nullable public IndexingQueryCacheFilter forCache(String cacheName); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilterImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilterImpl.java new file mode 100644 index 0000000..53dcbf6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilterImpl.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.indexing; + +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +import java.util.HashSet; + +/** + * Indexing query filter. + */ +public class IndexingQueryFilterImpl implements IndexingQueryFilter { + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Topology version. */ + private final AffinityTopologyVersion topVer; + + /** Partitions. */ + private final HashSet<Integer> parts; + + /** + * Constructor. + * + * @param ctx Kernal context. + * @param topVer Topology version. + * @param partsArr Partitions array. + */ + public IndexingQueryFilterImpl(GridKernalContext ctx, @Nullable AffinityTopologyVersion topVer, + @Nullable int[] partsArr) { + this.ctx = ctx; + + this.topVer = topVer != null ? topVer : AffinityTopologyVersion.NONE; + + if (F.isEmpty(partsArr)) + parts = null; + else { + parts = new HashSet<>(); + + for (int part : partsArr) + parts.add(part); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public IndexingQueryCacheFilter forCache(String cacheName) { + final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); + + // REPLICATED -> nothing to filter (explicit partitions are not supported). + if (cache.context().isReplicated()) + return null; + + // No backups and explicit partitions -> nothing to filter. + if (cache.configuration().getBackups() == 0 && parts == null) + return null; + + return new IndexingQueryCacheFilter(cache.context().affinity(), parts, topVer, + ctx.discovery().localNode()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Cursor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Cursor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Cursor.java index de3111d..e09108d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Cursor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2Cursor.java @@ -17,14 +17,15 @@ package org.apache.ignite.internal.processors.query.h2; -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.query.h2.opt.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.h2.index.*; -import org.h2.message.*; -import org.h2.result.*; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; +import org.apache.ignite.internal.util.lang.GridCursor; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; +import org.h2.index.Cursor; +import org.h2.message.DbException; +import org.h2.result.Row; +import org.h2.result.SearchRow; /** * Cursor. @@ -34,7 +35,7 @@ public class H2Cursor implements Cursor { private final GridCursor<GridH2Row> cursor; /** */ - private final IgniteBiPredicate<Object,Object> filter; + private final IndexingQueryCacheFilter filter; /** */ private final long time = U.currentTimeMillis(); @@ -43,7 +44,7 @@ public class H2Cursor implements Cursor { * @param cursor Cursor. * @param filter Filter. */ - public H2Cursor(GridCursor<GridH2Row> cursor, IgniteBiPredicate<Object, Object> filter) { + public H2Cursor(GridCursor<GridH2Row> cursor, IndexingQueryCacheFilter filter) { assert cursor != null; this.cursor = cursor; @@ -85,12 +86,10 @@ public class H2Cursor implements Cursor { return true; Object key = row.getValue(0).getObject(); - Object val = row.getValue(1).getObject(); assert key != null; - assert val != null; - if (filter.apply(key, val)) + if (filter.apply(key)) return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index fddd2e8..c172e65 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -65,8 +65,6 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.CacheObjectUtils; import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; -import org.apache.ignite.internal.processors.cache.GridCacheAdapter; -import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; @@ -130,7 +128,6 @@ import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; -import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteInClosure; @@ -139,6 +136,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.indexing.IndexingQueryFilter; +import org.apache.ignite.spi.indexing.IndexingQueryFilterImpl; import org.h2.api.ErrorCode; import org.h2.api.JavaObjectSerializer; import org.h2.command.Prepared; @@ -2375,62 +2373,7 @@ public class IgniteH2Indexing implements GridQueryIndexing { /** {@inheritDoc} */ @Override public IndexingQueryFilter backupFilter(@Nullable final AffinityTopologyVersion topVer, @Nullable final int[] parts) { - final AffinityTopologyVersion topVer0 = topVer != null ? topVer : AffinityTopologyVersion.NONE; - - return new IndexingQueryFilter() { - @Nullable @Override public <K, V> IgniteBiPredicate<K, V> forCache(String cacheName) { - final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName); - - if (cache.context().isReplicated()) - return null; - - final GridCacheAffinityManager aff = cache.context().affinity(); - - if (parts != null) { - if (parts.length < 64) { // Fast scan for small arrays. - return new IgniteBiPredicate<K, V>() { - @Override public boolean apply(K k, V v) { - int p = aff.partition(k); - - for (int p0 : parts) { - if (p0 == p) - return true; - - if (p0 > p) // Array is sorted. - return false; - } - - return false; - } - }; - } - - return new IgniteBiPredicate<K, V>() { - @Override public boolean apply(K k, V v) { - int p = aff.partition(k); - - return Arrays.binarySearch(parts, p) >= 0; - } - }; - } - - final ClusterNode locNode = ctx.discovery().localNode(); - - return new IgniteBiPredicate<K, V>() { - @Override public boolean apply(K k, V v) { - return aff.primaryByKey(locNode, k, topVer0); - } - }; - } - - @Override public boolean isValueRequired() { - return false; - } - - @Override public String toString() { - return "IndexingQueryFilter [ver=" + topVer + ']'; - } - }; + return new IndexingQueryFilterImpl(ctx, topVer, parts); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java index b32bfb8..6691485 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java @@ -31,8 +31,8 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.util.lang.GridCursor; -import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.spi.indexing.IndexingQueryFilter; +import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; import org.h2.engine.Session; import org.h2.index.Cursor; import org.h2.index.IndexType; @@ -85,7 +85,7 @@ public class H2PkHashIndex extends GridH2IndexBase { /** {@inheritDoc} */ @Override public Cursor find(Session ses, final SearchRow lower, final SearchRow upper) { IndexingQueryFilter f = threadLocalFilter(); - IgniteBiPredicate<Object, Object> p = null; + IndexingQueryCacheFilter p = null; if (f != null) { String cacheName = getTable().cacheName(); @@ -179,13 +179,13 @@ public class H2PkHashIndex extends GridH2IndexBase { final GridCursor<? extends CacheDataRow> cursor; /** */ - final IgniteBiPredicate<Object, Object> filter; + final IndexingQueryCacheFilter filter; /** * @param cursor Cursor. * @param filter Filter. */ - private H2Cursor(GridCursor<? extends CacheDataRow> cursor, IgniteBiPredicate<Object, Object> filter) { + private H2Cursor(GridCursor<? extends CacheDataRow> cursor, IndexingQueryCacheFilter filter) { assert cursor != null; this.cursor = cursor; @@ -222,17 +222,7 @@ public class H2PkHashIndex extends GridH2IndexBase { CacheDataRow dataRow = cursor.get(); - GridH2Row row = tbl.rowDescriptor().createRow(dataRow.key(), dataRow.partition(), dataRow.value(), dataRow.version(), 0); - - row.link(dataRow.link()); - - Object key = row.getValue(0).getObject(); - Object val = row.getValue(1).getObject(); - - assert key != null; - assert val != null; - - if (filter.apply(key, val)) + if (filter.apply(dataRow.key())) return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java index 5c3e1bd..1a3ea4a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java @@ -34,8 +34,8 @@ import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import org.apache.ignite.internal.util.IgniteTree; import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.spi.indexing.IndexingQueryFilter; +import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; import org.h2.engine.Session; import org.h2.index.Cursor; import org.h2.index.IndexType; @@ -166,7 +166,7 @@ public class H2TreeIndex extends GridH2IndexBase { @Override public Cursor find(Session ses, SearchRow lower, SearchRow upper) { try { IndexingQueryFilter f = threadLocalFilter(); - IgniteBiPredicate<Object, Object> p = null; + IndexingQueryCacheFilter p = null; if (f != null) { String cacheName = getTable().cacheName(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index 048192a..92b7d10 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -17,12 +17,6 @@ package org.apache.ignite.internal.processors.query.h2.opt; -import java.util.*; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import javax.cache.CacheException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; @@ -41,17 +35,18 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.IgniteTree; +import org.apache.ignite.internal.util.lang.GridCursor; import org.apache.ignite.internal.util.typedef.CIX2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.logger.NullLogger; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.indexing.IndexingQueryFilter; +import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; import org.h2.engine.Session; import org.h2.index.BaseIndex; import org.h2.index.Cursor; @@ -68,13 +63,29 @@ import org.h2.value.Value; import org.h2.value.ValueNull; import org.jetbrains.annotations.Nullable; +import javax.cache.CacheException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + import static java.util.Collections.emptyIterator; import static java.util.Collections.singletonList; import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.LOCAL_ONLY; import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF; -import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL; -import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.VAL_COL; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel; +import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP; import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE; import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR; @@ -1574,15 +1585,13 @@ public abstract class GridH2IndexBase extends BaseIndex { protected static class FilteringCursor implements GridCursor<GridH2Row> { /** */ private final GridCursor<GridH2Row> cursor; + /** */ - private final IgniteBiPredicate<Object, Object> fltr; + private final IndexingQueryCacheFilter fltr; /** */ private final long time; - /** Is value required for filtering predicate? */ - private final boolean isValRequired; - /** */ private GridH2Row next; @@ -1595,19 +1604,8 @@ public abstract class GridH2IndexBase extends BaseIndex { protected FilteringCursor(GridCursor<GridH2Row> cursor, long time, IndexingQueryFilter qryFilter, String cacheName) { this.cursor = cursor; - this.time = time; - - if (qryFilter != null) { - this.fltr = qryFilter.forCache(cacheName); - - this.isValRequired = qryFilter.isValueRequired(); - } - else { - this.fltr = null; - - this.isValRequired = false; - } + this.fltr = qryFilter != null ? qryFilter.forCache(cacheName) : null; } /** @@ -1623,12 +1621,8 @@ public abstract class GridH2IndexBase extends BaseIndex { return true; Object key = row.getValue(KEY_COL).getObject(); - Object val = isValRequired ? row.getValue(VAL_COL).getObject() : null; - - assert key != null; - assert !isValRequired || val != null; - return fltr.apply(key, val); + return fltr.apply(key); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a6e28082/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java index f8d3ef2..b5d2456 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java @@ -32,9 +32,9 @@ import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.spi.indexing.IndexingQueryFilter; +import org.apache.ignite.spi.indexing.IndexingQueryCacheFilter; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; @@ -290,7 +290,7 @@ public class GridLuceneIndex implements AutoCloseable { throw new IgniteCheckedException(e); } - IgniteBiPredicate<K, V> fltr = null; + IndexingQueryCacheFilter fltr = null; if (filters != null) fltr = filters.forCache(cacheName); @@ -321,7 +321,7 @@ public class GridLuceneIndex implements AutoCloseable { private final ScoreDoc[] docs; /** */ - private final IgniteBiPredicate<K, V> filters; + private final IndexingQueryCacheFilter filters; /** */ private int idx; @@ -341,7 +341,7 @@ public class GridLuceneIndex implements AutoCloseable { * @param filters Filters over result. * @throws IgniteCheckedException if failed. */ - private It(IndexReader reader, IndexSearcher searcher, ScoreDoc[] docs, IgniteBiPredicate<K, V> filters) + private It(IndexReader reader, IndexSearcher searcher, ScoreDoc[] docs, IndexingQueryCacheFilter filters) throws IgniteCheckedException { this.reader = reader; this.searcher = searcher; @@ -354,17 +354,6 @@ public class GridLuceneIndex implements AutoCloseable { } /** - * Filters key using predicates. - * - * @param key Key. - * @param val Value. - * @return {@code True} if key passes filter. - */ - private boolean filter(K key, V val) { - return filters == null || filters.apply(key, val); - } - - /** * @param bytes Bytes. * @param ldr Class loader. * @return Object. @@ -404,15 +393,15 @@ public class GridLuceneIndex implements AutoCloseable { K k = unmarshall(doc.getBinaryValue(KEY_FIELD_NAME).bytes, ldr); + if (filters != null && !filters.apply(k)) + continue; + V v = type.valueClass() == String.class ? (V)doc.get(VAL_STR_FIELD_NAME) : this.<V>unmarshall(doc.getBinaryValue(VAL_FIELD_NAME).bytes, ldr); assert v != null; - if (!filter(k, v)) - continue; - curr = new IgniteBiTuple<>(k, v); break;
