ignite-2921: ScanQueries over local partitions performance optimisation
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e8072f8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e8072f8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e8072f8 Branch: refs/heads/master Commit: 0e8072f8bc831ea08cb4194c60b5b2d0449e671a Parents: 9a9c35d Author: ashutak <[email protected]> Authored: Mon May 16 15:34:40 2016 +0300 Committer: ashutak <[email protected]> Committed: Mon May 16 15:34:40 2016 +0300 ---------------------------------------------------------------------- .../cache/CacheWeakQueryIteratorsHolder.java | 169 ++++++- .../processors/cache/GridCacheAdapter.java | 13 +- .../processors/cache/GridCacheSwapManager.java | 12 +- .../processors/cache/IgniteCacheProxy.java | 39 +- .../binary/CacheObjectBinaryProcessorImpl.java | 16 +- .../processors/cache/query/CacheQuery.java | 10 +- .../cache/query/CacheQueryFuture.java | 13 +- .../query/GridCacheDistributedQueryManager.java | 100 +++- .../cache/query/GridCacheLocalQueryManager.java | 34 +- .../cache/query/GridCacheQueryAdapter.java | 175 +++++-- .../cache/query/GridCacheQueryErrorFuture.java | 12 +- .../query/GridCacheQueryFutureAdapter.java | 10 +- .../cache/query/GridCacheQueryManager.java | 502 +++++++++++++----- .../datastructures/GridCacheSetImpl.java | 9 +- .../service/GridServiceProcessor.java | 49 +- ...achePartitionedPreloadLifecycleSelfTest.java | 102 +--- ...CacheReplicatedPreloadLifecycleSelfTest.java | 132 +---- .../ConfigVariationsTestSuiteBuilder.java | 4 +- ...IgniteCacheConfigVariationsAbstractTest.java | 2 +- .../IgniteConfigVariationsAbstractTest.java | 27 +- modules/indexing/pom.xml | 7 + .../CacheAbstractQueryMetricsSelfTest.java | 4 +- .../cache/IgniteCacheAbstractQuerySelfTest.java | 34 +- .../IgniteCacheConfigVariationsQueryTest.java | 505 +++++++++++++++++++ ...gniteCacheConfigVariationQueryTestSuite.java | 41 ++ 25 files changed, 1449 insertions(+), 572 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java index 4c48e74..2e03b53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheWeakQueryIteratorsHolder.java @@ -26,6 +26,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.typedef.internal.U; import org.jsr166.ConcurrentHashMap8; @@ -34,11 +35,10 @@ import org.jsr166.ConcurrentHashMap8; */ public class CacheWeakQueryIteratorsHolder<V> { /** Iterators weak references queue. */ - private final ReferenceQueue<WeakQueryFutureIterator> refQueue = new ReferenceQueue<>(); + private final ReferenceQueue refQueue = new ReferenceQueue(); /** Iterators futures. */ - private final Map<WeakReference<WeakQueryFutureIterator>,CacheQueryFuture<V>> futs = - new ConcurrentHashMap8<>(); + private final Map<WeakReference, AutoCloseable> refs = new ConcurrentHashMap8<>(); /** Logger. */ private final IgniteLogger log; @@ -56,10 +56,27 @@ public class CacheWeakQueryIteratorsHolder<V> { * @param <T> Type for the iterator. * @return Iterator over the cache. */ - public <T> WeakQueryFutureIterator iterator(CacheQueryFuture<V> fut, CacheIteratorConverter<T, V> convert) { + public <T> WeakReferenceCloseableIterator<T> iterator(final CacheQueryFuture<V> fut, + CacheIteratorConverter<T, V> convert) { WeakQueryFutureIterator it = new WeakQueryFutureIterator(fut, convert); - CacheQueryFuture<V> old = futs.put(it.weakReference(), fut); + AutoCloseable old = refs.put(it.weakReference(), fut); + + assert old == null; + + return it; + } + + /** + * @param iter Closeable iterator. + * @param <T> Type for the iterator. + * @return Iterator over the cache. + */ + public <T> WeakReferenceCloseableIterator<T> iterator(final GridCloseableIterator<V> iter, + CacheIteratorConverter<T, V> convert) { + WeakQueryCloseableIterator it = new WeakQueryCloseableIterator(iter, convert); + + AutoCloseable old = refs.put(it.weakReference(), iter); assert old == null; @@ -71,8 +88,8 @@ public class CacheWeakQueryIteratorsHolder<V> { * * @throws IgniteCheckedException If failed. */ - public void removeIterator(WeakQueryFutureIterator it) throws IgniteCheckedException { - futs.remove(it.weakReference()); + public void removeIterator(WeakReferenceCloseableIterator it) throws IgniteCheckedException { + refs.remove(it.weakReference()); it.close(); } @@ -81,17 +98,17 @@ public class CacheWeakQueryIteratorsHolder<V> { * Closes unreachable iterators. */ public void checkWeakQueue() { - for (Reference<? extends WeakQueryFutureIterator> itRef = refQueue.poll(); itRef != null; + for (Reference itRef = refQueue.poll(); itRef != null; itRef = refQueue.poll()) { try { - WeakReference<WeakQueryFutureIterator> weakRef = (WeakReference<WeakQueryFutureIterator>)itRef; + WeakReference weakRef = (WeakReference)itRef; - CacheQueryFuture<?> fut = futs.remove(weakRef); + AutoCloseable rsrc = refs.remove(weakRef); - if (fut != null) - fut.cancel(); + if (rsrc != null) + rsrc.close(); } - catch (IgniteCheckedException e) { + catch (Exception e) { U.error(log, "Failed to close iterator.", e); } } @@ -101,16 +118,16 @@ public class CacheWeakQueryIteratorsHolder<V> { * Cancel all cache queries. */ public void clearQueries(){ - for (CacheQueryFuture<?> fut : futs.values()) { + for (AutoCloseable rsrc : refs.values()) { try { - fut.cancel(); + rsrc.close(); } - catch (IgniteCheckedException e) { + catch (Exception e) { U.error(log, "Failed to close iterator.", e); } } - futs.clear(); + refs.clear(); } @@ -119,7 +136,8 @@ public class CacheWeakQueryIteratorsHolder<V> { * * @param <T> Type for iterator. */ - public class WeakQueryFutureIterator<T> extends GridCloseableIteratorAdapter<T> { + private class WeakQueryFutureIterator<T> extends GridCloseableIteratorAdapter<T> + implements WeakReferenceCloseableIterator<T> { /** */ private static final long serialVersionUID = 0L; @@ -204,10 +222,8 @@ public class CacheWeakQueryIteratorsHolder<V> { cur = null; } - /** - * @return Iterator weak reference. - */ - private WeakReference<WeakQueryFutureIterator<T>> weakReference() { + /** {@inheritDoc} */ + @Override public WeakReference<WeakQueryFutureIterator<T>> weakReference() { return weakRef; } @@ -217,7 +233,7 @@ public class CacheWeakQueryIteratorsHolder<V> { private void clearWeakReference() { weakRef.clear(); - futs.remove(weakRef); + refs.remove(weakRef); } /** @@ -233,4 +249,109 @@ public class CacheWeakQueryIteratorsHolder<V> { } } } -} \ No newline at end of file + + /** + * @param <T> Type. + */ + public class WeakQueryCloseableIterator<T> extends GridCloseableIteratorAdapter<T> + implements WeakReferenceCloseableIterator<T> { + /** */ + private static final long serialVersionUID = 0; + + /** */ + private final GridCloseableIterator<V> iter; + + /** */ + private final CacheIteratorConverter<T, V> convert; + + /** */ + private final WeakReference weakRef; + + /** */ + private T cur; + + /** + * @param iter Iterator. + * @param convert Converter. + */ + WeakQueryCloseableIterator(GridCloseableIterator<V> iter, CacheIteratorConverter<T, V> convert) { + this.iter = iter; + this.convert = convert; + + weakRef = new WeakReference(this, refQueue); + } + + + /** {@inheritDoc} */ + @Override protected T onNext() throws IgniteCheckedException { + V next; + + try { + next = iter.nextX(); + } + catch (NoSuchElementException e){ + clearWeakReference(); + + throw e; + } + + if (next == null) + clearWeakReference(); + + cur = next != null ? convert.convert(next) : null; + + return cur; + } + + /** {@inheritDoc} */ + @Override protected boolean onHasNext() throws IgniteCheckedException { + boolean hasNextX = iter.hasNextX(); + + if (!hasNextX) + clearWeakReference(); + + return hasNextX; + } + + /** {@inheritDoc} */ + @Override protected void onRemove() throws IgniteCheckedException { + if (cur == null) + throw new IllegalStateException(); + + convert.remove(cur); + + cur = null; + } + + /** {@inheritDoc} */ + @Override protected void onClose() throws IgniteCheckedException { + iter.close(); + + clearWeakReference(); + } + + /** + * Clears weak reference. + */ + private void clearWeakReference() { + weakRef.clear(); + + refs.remove(weakRef); + } + + /** {@inheritDoc} */ + @Override public WeakReference weakReference() { + return weakRef; + } + } + + /** + * + */ + public static interface WeakReferenceCloseableIterator<T> extends GridCloseableIterator<T> { + /** + * @return Iterator weak reference. + */ + public WeakReference weakReference(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index d81cbd2..a02db2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -86,7 +86,6 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; -import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; @@ -105,6 +104,7 @@ import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; @@ -2400,7 +2400,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V validateCacheKeys(keys); IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(keys) { - @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) { + @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx, + AffinityTopologyVersion readyTopVer) { Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { @Override public EntryProcessor apply(K k) { return entryProcessor; @@ -3961,7 +3962,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /** * @return Distributed ignite cache iterator. */ - public Iterator<Cache.Entry<K, V>> igniteIterator() { + public Iterator<Cache.Entry<K, V>> igniteIterator() throws IgniteCheckedException { GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx; final CacheOperationContext opCtx = ctx.operationContextPerCall(); @@ -3969,11 +3970,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (!ctx0.isSwapOrOffheapEnabled() && ctx0.kernalContext().discovery().size() == 1) return localIteratorHonorExpirePolicy(opCtx); - CacheQueryFuture<Map.Entry<K, V>> fut = ctx0.queries().createScanQuery(null, null, ctx.keepBinary()) + final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(null, null, ctx.keepBinary()) .keepAll(false) - .execute(); + .executeScanQuery(); - return ctx.itHolder().iterator(fut, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() { + return ctx.itHolder().iterator(iter, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() { @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) { return new CacheEntryImpl<>(e.getKey(), e.getValue()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index 7901105..98251dd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -1848,7 +1848,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @return Off-heap iterator. */ public <T> GridCloseableIterator<T> rawOffHeapIterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c, - Integer part, + @Nullable Integer part, boolean primary, boolean backup) { @@ -1859,8 +1859,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { checkIteratorQueue(); - if (primary && backup) - return offheap.iterator(spaceName, c); + if (primary && backup) { + if (part == null) + return offheap.iterator(spaceName, c); + else + return offheap.iterator(spaceName, c, part); + } AffinityTopologyVersion ver = cctx.affinity().affinityTopologyVersion(); @@ -1894,7 +1898,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (!offheapEnabled || (!primary && !backup)) return new GridEmptyCloseableIterator<>(); - if (primary && backup) + if (primary && backup && part == null) return new GridCloseableIteratorAdapter<Map.Entry<byte[], byte[]>>() { private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> it = offheap.iterator(spaceName); http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index 5b78271..fc046af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -21,7 +21,6 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Date; @@ -59,7 +58,6 @@ import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.cache.query.TextQuery; import org.apache.ignite.cluster.ClusterGroup; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.AsyncSupportAdapter; import org.apache.ignite.internal.GridKernalContext; @@ -72,6 +70,7 @@ import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; import org.apache.ignite.internal.util.GridEmptyIterator; import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.lang.IgniteOutClosureX; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -86,7 +85,6 @@ import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.mxbean.CacheMetricsMXBean; import org.apache.ignite.plugin.security.SecurityPermission; -import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.jetbrains.annotations.Nullable; /** @@ -468,7 +466,6 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V private QueryCursor<Cache.Entry<K,V>> query(final Query filter, @Nullable ClusterGroup grp) throws IgniteCheckedException { final CacheQuery<Map.Entry<K,V>> qry; - final CacheQueryFuture<Map.Entry<K,V>> fut; boolean isKeepBinary = opCtx != null && opCtx.isKeepBinary(); @@ -480,14 +477,35 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V if (grp != null) qry.projection(grp); - fut = ctx.kernalContext().query().executeQuery(ctx, - new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() { - @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() throws IgniteCheckedException { - return qry.execute(); + final GridCloseableIterator<Entry<K, V>> iter = ctx.kernalContext().query().executeQuery(ctx, + new IgniteOutClosureX<GridCloseableIterator<Entry<K,V>>>() { + @Override public GridCloseableIterator<Entry<K,V>> applyx() throws IgniteCheckedException { + final GridCloseableIterator<Map.Entry> iter0 = qry.executeScanQuery(); + + return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() { + @Override protected Cache.Entry<K, V> onNext() throws IgniteCheckedException { + Map.Entry<K, V> next = iter0.nextX(); + + return new CacheEntryImpl<>(next.getKey(), next.getValue()); + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + return iter0.hasNextX(); + } + + @Override protected void onClose() throws IgniteCheckedException { + iter0.close(); + } + }; } }, false); + + return new QueryCursorImpl<>(iter); } - else if (filter instanceof TextQuery) { + + final CacheQueryFuture<Map.Entry<K,V>> fut; + + if (filter instanceof TextQuery) { TextQuery p = (TextQuery)filter; qry = ctx.queries().createFullTextQuery(p.getType(), p.getText(), isKeepBinary); @@ -1810,6 +1828,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V try { return ctx.cache().igniteIterator(); } + catch (IgniteCheckedException e) { + throw cacheException(e); + } finally { onLeave(gate, prev); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index c043bc7..18751c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -79,11 +79,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.query.CacheQuery; -import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridMapEntry; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; @@ -295,16 +295,12 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm qry.projection(ctx.cluster().get().forNode(oldestSrvNode)); - try { - CacheQueryFuture<Map.Entry<BinaryMetadataKey, BinaryMetadata>> fut = qry.execute(); - - Map.Entry<BinaryMetadataKey, BinaryMetadata> next; - - while ((next = fut.next()) != null) { - assert next.getKey() != null : next; - assert next.getValue() != null : next; + try (GridCloseableIterator<Map.Entry<BinaryMetadataKey, BinaryMetadata>> entries = qry.executeScanQuery()) { + for (Map.Entry<BinaryMetadataKey, BinaryMetadata> e : entries) { + assert e.getKey() != null : e; + assert e.getValue() != null : e; - addClientCacheMetaData(next.getKey(), next.getValue()); + addClientCacheMetaData(e.getKey(), e.getValue()); } } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java index 5f9dc61..47c6e89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.query; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.affinity.AffinityKey; import org.apache.ignite.cache.query.Query; import org.apache.ignite.cache.query.QueryMetrics; @@ -24,6 +26,7 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.cache.query.annotations.QueryTextField; import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteReducer; import org.jetbrains.annotations.Nullable; @@ -289,4 +292,9 @@ public interface CacheQuery<T> { * Resets metrics for this query. */ public void resetMetrics(); -} \ No newline at end of file + + /** + * @return Scan query iterator. + */ + public <R extends Map.Entry> GridCloseableIterator<R> executeScanQuery() throws IgniteCheckedException; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryFuture.java index bb342b3..a0244d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryFuture.java @@ -26,16 +26,7 @@ import org.jetbrains.annotations.Nullable; * Cache query future returned by query execution. * Refer to {@link CacheQuery} documentation for more information. */ -public interface CacheQueryFuture<T> extends IgniteInternalFuture<Collection<T>> { - /** - * Returns number of elements that are already fetched and can - * be returned from {@link #next()} method without blocking. - * - * @return Number of fetched elements which are available immediately. - * @throws IgniteCheckedException In case of error. - */ - public int available() throws IgniteCheckedException; - +public interface CacheQueryFuture<T> extends IgniteInternalFuture<Collection<T>>, AutoCloseable { /** * Returns next element from result set. * <p> @@ -62,4 +53,4 @@ public interface CacheQueryFuture<T> extends IgniteInternalFuture<Collection<T>> * @throws IgniteCheckedException {@inheritDoc} */ @Override public boolean cancel() throws IgniteCheckedException; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index 353fbd3..5f6cb8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; @@ -35,6 +36,8 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedSet; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; @@ -55,7 +58,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE; /** - * Distributed query manager. + * Distributed query manager (for cache in REPLICATED / PARTITIONED cache mode). */ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManager<K, V> { /** */ @@ -512,29 +515,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage } /** {@inheritDoc} */ - @Override public CacheQueryFuture<?> queryLocal(GridCacheQueryBean qry) { - assert cctx.config().getCacheMode() != LOCAL; - - if (log.isDebugEnabled()) - log.debug("Executing query on local node: " + qry); - - GridCacheLocalQueryFuture<K, V, ?> fut = new GridCacheLocalQueryFuture<>(cctx, qry); - - try { - qry.query().validate(); - - fut.execute(); - } - catch (IgniteCheckedException e) { - fut.onDone(e); - } - - return fut; - } - - /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) { + @Override public CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, final Collection<ClusterNode> nodes) { assert cctx.config().getCacheMode() != LOCAL; if (log.isDebugEnabled()) @@ -550,7 +532,7 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage String clsName = qry.query().queryClassName(); - GridCacheQueryRequest req = new GridCacheQueryRequest( + final GridCacheQueryRequest req = new GridCacheQueryRequest( cctx.cacheId(), reqId, cctx.name(), @@ -595,6 +577,76 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage } /** {@inheritDoc} */ + @SuppressWarnings({"unchecked", "serial"}) + @Override public GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(final GridCacheQueryAdapter qry, + Collection<ClusterNode> nodes) throws IgniteCheckedException { + assert !cctx.isLocal() : cctx.name(); + assert qry.type() == GridCacheQueryType.SCAN: qry; + + GridCloseableIterator<Map.Entry<K, V>> locIter0 = null; + + for (ClusterNode node : nodes) { + if (node.isLocal()) { + locIter0 = (GridCloseableIterator)scanQueryLocal(qry, false); + + Collection<ClusterNode> rmtNodes = new ArrayList<>(nodes.size() - 1); + + for (ClusterNode n : nodes) { + // Equals by reference can be used here. + if (n != node) + rmtNodes.add(n); + } + + nodes = rmtNodes; + + break; + } + } + + final GridCloseableIterator<Map.Entry<K, V>> locIter = locIter0; + + final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, null, null); + + final CacheQueryFuture<Map.Entry<K, V>> fut = (CacheQueryFuture<Map.Entry<K, V>>)queryDistributed(bean, nodes); + + return new GridCloseableIteratorAdapter<Map.Entry<K, V>>() { + /** */ + private Map.Entry<K, V> cur; + + @Override protected Map.Entry<K, V> onNext() throws IgniteCheckedException { + if (!onHasNext()) + throw new NoSuchElementException(); + + Map.Entry<K, V> e = cur; + + cur = null; + + return e; + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + if (cur != null) + return true; + + if (locIter != null && locIter.hasNextX()) + cur = locIter.nextX(); + + return cur != null || (cur = fut.next()) != null; + } + + @Override protected void onClose() throws IgniteCheckedException { + super.onClose(); + + if (locIter != null) + locIter.close(); + + if (fut != null) + fut.cancel(); + } + }; + } + + /** {@inheritDoc} */ @Override public void loadPage(long id, GridCacheQueryAdapter<?> qry, Collection<ClusterNode> nodes, boolean all) { assert cctx.config().getCacheMode() != LOCAL; assert qry != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java index 4e72f97..183abde 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryManager.java @@ -19,16 +19,18 @@ package org.apache.ignite.internal.processors.cache.query; import java.util.Collection; import java.util.List; +import java.util.Map; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.cache.CacheMode.LOCAL; /** - * Local query manager. + * Local query manager (for cache in LOCAL cache mode). */ public class GridCacheLocalQueryManager<K, V> extends GridCacheQueryManager<K, V> { /** {@inheritDoc} */ @@ -80,32 +82,20 @@ public class GridCacheLocalQueryManager<K, V> extends GridCacheQueryManager<K, V } /** {@inheritDoc} */ - @Override public CacheQueryFuture<?> queryLocal(GridCacheQueryBean qry) { + @Override public CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) { assert cctx.config().getCacheMode() == LOCAL; - if (log.isDebugEnabled()) - log.debug("Executing query on local node: " + qry); - - GridCacheLocalQueryFuture<K, V, ?> fut = new GridCacheLocalQueryFuture<>(cctx, qry); - - try { - qry.query().validate(); - - fut.execute(); - } - catch (IgniteCheckedException e) { - fut.onDone(e); - } - - return fut; + throw new IgniteException("Distributed queries are not available for local cache " + + "(use 'CacheQuery.execute(grid.forLocal())' instead) [cacheName=" + cctx.name() + ']'); } /** {@inheritDoc} */ - @Override public CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) { - assert cctx.config().getCacheMode() == LOCAL; + @Override public GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(GridCacheQueryAdapter qry, + Collection<ClusterNode> nodes) throws IgniteCheckedException { + assert cctx.isLocal() : cctx.name(); - throw new IgniteException("Distributed queries are not available for local cache " + - "(use 'CacheQuery.execute(grid.forLocal())' instead) [cacheName=" + cctx.name() + ']'); + throw new IgniteException("Distributed scan query are not available for local cache " + + "(use 'CacheQuery.executeScanQuery(grid.forLocal())' instead) [cacheName=" + cctx.name() + ']'); } /** {@inheritDoc} */ @@ -142,4 +132,4 @@ public class GridCacheLocalQueryManager<K, V> extends GridCacheQueryManager<K, V throw new IgniteException("Distributed queries are not available for local cache " + "(use 'CacheQuery.execute(grid.forLocal())' instead) [cacheName=" + cctx.name() + ']'); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java index b948dc5..90e14f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java @@ -22,6 +22,8 @@ import java.util.Collections; import java.util.Deque; import java.util.HashSet; import java.util.LinkedList; +import java.util.Map; +import java.util.NoSuchElementException; import java.util.Queue; import java.util.Set; import java.util.UUID; @@ -40,9 +42,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException; import org.apache.ignite.internal.processors.query.GridQueryProcessor; -import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.util.GridEmptyCloseableIterator; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -429,6 +434,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { @SuppressWarnings({"IfMayBeConditional", "unchecked"}) private <R> CacheQueryFuture<R> execute(@Nullable IgniteReducer<T, R> rmtReducer, @Nullable IgniteClosure<T, R> rmtTransform, @Nullable Object... args) { + assert type != SCAN : this; + Collection<ClusterNode> nodes; try { @@ -440,7 +447,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { cctx.checkSecurity(SecurityPermission.CACHE_READ); - if (nodes.isEmpty() && (type != SCAN || part == null)) + if (nodes.isEmpty()) return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new ClusterGroupEmptyCheckedException()); if (log.isDebugEnabled()) @@ -471,12 +478,44 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { if (type == SQL_FIELDS || type == SPI) return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) : qryMgr.queryFieldsDistributed(bean, nodes)); - else if (type == SCAN && part != null && !cctx.isLocal()) - return new CacheQueryFallbackFuture<>(part, bean, qryMgr, cctx); else return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes)); } + /** {@inheritDoc} */ + @SuppressWarnings({"IfMayBeConditional", "unchecked"}) + @Override public <R extends Map.Entry> GridCloseableIterator<R> executeScanQuery() throws IgniteCheckedException { + assert type == SCAN: "Wrong processing of qyery: " + type; + + Collection<ClusterNode> nodes = nodes(); + + cctx.checkSecurity(SecurityPermission.CACHE_READ); + + if (nodes.isEmpty() && part == null) + return new GridEmptyCloseableIterator(); + + if (log.isDebugEnabled()) + log.debug("Executing query [query=" + this + ", nodes=" + nodes + ']'); + + if (cctx.deploymentEnabled()) + cctx.deploy().registerClasses(filter); + + if (subjId == null) + subjId = cctx.localNodeId(); + + taskHash = cctx.kernalContext().job().currentTaskNameHash(); + + final GridCacheQueryManager qryMgr = cctx.queries(); + + if (part != null && !cctx.isLocal()) + return (GridCloseableIterator<R>)new ScanQueryFallbackClosableIterator(part, this, qryMgr, cctx); + else { + boolean loc = nodes.size() == 1 && F.first(nodes).id().equals(cctx.localNodeId()); + + return loc ? qryMgr.scanQueryLocal(this, true) : qryMgr.scanQueryDistributed(this, nodes); + } + } + /** * @return Nodes to execute on. */ @@ -549,10 +588,12 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** * Wrapper for queries with fallback. */ - private static class CacheQueryFallbackFuture<R> extends GridFutureAdapter<Collection<R>> - implements CacheQueryFuture<R> { + private static class ScanQueryFallbackClosableIterator extends GridCloseableIteratorAdapter<Map.Entry> { + /** */ + private static final long serialVersionUID = 0L; + /** Query future. */ - private volatile GridCacheQueryFutureAdapter<?, ?, R> fut; + private volatile T2<GridCloseableIterator<Map.Entry>, GridCacheQueryFutureAdapter> tuple; /** Backups. */ private volatile Queue<ClusterNode> nodes; @@ -564,7 +605,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { private volatile int unreservedNodesRetryCnt = 5; /** Bean. */ - private final GridCacheQueryBean bean; + private final GridCacheQueryAdapter qry; /** Query manager. */ private final GridCacheQueryManager qryMgr; @@ -578,15 +619,18 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { /** Flag indicating that a first item has been returned to a user. */ private boolean firstItemReturned; + /** */ + private Map.Entry cur; + /** * @param part Partition. - * @param bean Bean. + * @param qry Query. * @param qryMgr Query manager. * @param cctx Cache context. */ - private CacheQueryFallbackFuture(int part, GridCacheQueryBean bean, + private ScanQueryFallbackClosableIterator(int part, GridCacheQueryAdapter qry, GridCacheQueryManager qryMgr, GridCacheContext cctx) { - this.bean = bean; + this.qry = qry; this.qryMgr = qryMgr; this.cctx = cctx; this.part = part; @@ -628,46 +672,82 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { private void init() { final ClusterNode node = nodes.poll(); - fut = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ? - qryMgr.queryLocal(bean) : - qryMgr.queryDistributed(bean, Collections.singleton(node))); - } + if (node.isLocal()) { + try { + GridCloseableIterator it = qryMgr.scanQueryLocal(qry, true); - /** {@inheritDoc} */ - @Override public int available() { - return fut.available(); - } + tuple= new T2(it, null); + } + catch (IgniteClientDisconnectedCheckedException e) { + throw CU.convertToCacheException(e); + } + catch (IgniteCheckedException e) { + retryIfPossible(e); + } + } + else { + final GridCacheQueryBean bean = new GridCacheQueryBean(qry, null, null, null); - /** {@inheritDoc} */ - @Override public boolean cancel() throws IgniteCheckedException { - return fut.cancel(); + GridCacheQueryFutureAdapter fut = + (GridCacheQueryFutureAdapter)qryMgr.queryDistributed(bean, Collections.singleton(node)); + + tuple = new T2(null, fut); + } } /** {@inheritDoc} */ - @Override public Collection<R> get() throws IgniteCheckedException { - assert false; + @Override protected Map.Entry onNext() throws IgniteCheckedException { + if (!onHasNext()) + throw new NoSuchElementException(); - return super.get(); + assert cur != null; + + Map.Entry e = cur; + + cur = null; + + return e; } /** {@inheritDoc} */ - @Override public R next() { - if (firstItemReturned) - return fut.next(); - + @Override protected boolean onHasNext() throws IgniteCheckedException { while (true) { - try { - fut.awaitFirstPage(); + if (cur != null) + return true; - firstItemReturned = true; + T2<GridCloseableIterator<Map.Entry>, GridCacheQueryFutureAdapter> t = tuple; - return fut.next(); - } - catch (IgniteClientDisconnectedCheckedException e) { - throw CU.convertToCacheException(e); + GridCloseableIterator<Map.Entry> iter = t.get1(); + + if (iter != null) { + boolean hasNext = iter.hasNext(); + + if (hasNext) + cur = iter.next(); + + return hasNext; } - catch (IgniteCheckedException e) { - retryIfPossible(e); + else { + GridCacheQueryFutureAdapter fut = t.get2(); + + assert fut != null; + + if (firstItemReturned) + return (cur = (Map.Entry)fut.next()) != null; + + try { + fut.awaitFirstPage(); + + firstItemReturned = true; + + return (cur = (Map.Entry)fut.next()) != null; + } + catch (IgniteClientDisconnectedCheckedException e) { + throw CU.convertToCacheException(e); + } + catch (IgniteCheckedException e) { + retryIfPossible(e); + } } } } @@ -679,8 +759,10 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { try { IgniteInternalFuture<?> retryFut; - if (e.hasCause(GridDhtUnreservedPartitionException.class)) { - AffinityTopologyVersion waitVer = ((GridDhtUnreservedPartitionException)e.getCause()).topologyVersion(); + GridDhtUnreservedPartitionException partErr = X.cause(e, GridDhtUnreservedPartitionException.class); + + if (partErr != null) { + AffinityTopologyVersion waitVer = partErr.topologyVersion(); assert waitVer != null; @@ -720,5 +802,18 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> { throw CU.convertToCacheException(ex); } } + + /** {@inheritDoc} */ + @Override protected void onClose() throws IgniteCheckedException { + super.onClose(); + + T2<GridCloseableIterator<Map.Entry>, GridCacheQueryFutureAdapter> t = tuple; + + if (t != null && t.get1() != null) + t.get1().close(); + + if (t != null && t.get2() != null) + t.get2().cancel(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java index fd8c4d8..ac14ae6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryErrorFuture.java @@ -36,14 +36,14 @@ public class GridCacheQueryErrorFuture<T> extends GridFinishedFuture<Collection< } /** {@inheritDoc} */ - @Override public int available() throws IgniteCheckedException { - return 0; - } - - /** {@inheritDoc} */ @Nullable @Override public T next() throws IgniteCheckedException { get(); return null; } -} \ No newline at end of file + + /** {@inheritDoc} */ + @Override public void close() throws Exception { + cancel(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java index e3e5d98..db519f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java @@ -161,11 +161,6 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda } /** {@inheritDoc} */ - @Override public int available() { - return cnt.get(); - } - - /** {@inheritDoc} */ @Override public R next() { try { R next = unmaskNull(internalIterator().next()); @@ -571,6 +566,11 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda } /** {@inheritDoc} */ + @Override public void close() throws Exception { + cancel(); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheQueryFutureAdapter.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 8bcf564..3028208 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -38,6 +38,7 @@ import java.util.Queue; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import javax.cache.Cache; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; @@ -466,7 +467,27 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param qry Query. * @return Query future. */ - public abstract CacheQueryFuture<?> queryLocal(GridCacheQueryBean qry); + @SuppressWarnings("unchecked") + public CacheQueryFuture<?> queryLocal(GridCacheQueryBean qry) { + assert qry.query().type() != GridCacheQueryType.SCAN : qry; + + if (log.isDebugEnabled()) + log.debug("Executing query on local node: " + qry); + + GridCacheLocalQueryFuture fut = new GridCacheLocalQueryFuture<>(cctx, qry); + + try { + qry.query().validate(); + + fut.execute(); + } + catch (IgniteCheckedException e) { + if (fut != null) + fut.onDone(e); + } + + return fut; + } /** * Executes distributed query. @@ -478,6 +499,17 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte public abstract CacheQueryFuture<?> queryDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes); /** + * Executes distributed SCAN query. + * + * @param qry Query. + * @param nodes Nodes. + * @return Iterator. + * @throws IgniteCheckedException If failed. + */ + public abstract GridCloseableIterator<Map.Entry<K, V>> scanQueryDistributed(GridCacheQueryAdapter qry, + Collection<ClusterNode> nodes) throws IgniteCheckedException; + + /** * Loads page. * * @param id Query ID. @@ -591,7 +623,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte taskName)); } - iter = scanIterator(qry); + iter = scanIterator(qry, false); break; @@ -800,18 +832,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * @param qry Query. + * @param locNode Local node. * @return Full-scan row iterator. * @throws IgniteCheckedException If failed to get iterator. */ @SuppressWarnings({"unchecked"}) - private GridCloseableIterator<IgniteBiTuple<K, V>> scanIterator(final GridCacheQueryAdapter<?> qry) + private GridCloseableIterator<IgniteBiTuple<K, V>> scanIterator(final GridCacheQueryAdapter<?> qry, boolean locNode) throws IgniteCheckedException { - IgniteInternalCache<K, V> prj0 = cctx.cache(); - - prj0 = prj0.keepBinary(); - - final IgniteInternalCache prj = prj0; - final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter(); try { @@ -823,56 +850,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte final boolean backups = qry.includeBackups() || cctx.isReplicated(); - Iterator<K> keyIter; - - GridDhtLocalPartition locPart = null; - - Integer part = qry.partition(); - - if (part == null || cctx.isLocal()) - keyIter = backups ? prj.keySetx().iterator() : prj.primaryKeySet().iterator(); - else if (part < 0 || part >= cctx.affinity().partitions()) - keyIter = new GridEmptyIterator<>(); - else { - final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht(); - - locPart = dht.topology().localPartition(part, topVer, false); - - // double check for owning state - if (locPart == null || locPart.state() != OWNING || !locPart.reserve() || locPart.state() != OWNING) - throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(), - "Partition can not be reserved"); - - final GridDhtLocalPartition locPart0 = locPart; - - keyIter = new Iterator<K>() { - private Iterator<KeyCacheObject> iter0 = locPart0.keySet().iterator(); - - @Override public boolean hasNext() { - return iter0.hasNext(); - } - - @Override public K next() { - return (K)iter0.next(); - } - - @Override public void remove() { - iter0.remove(); - } - }; - } - - final GridDhtLocalPartition locPart0 = locPart; - - final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt = - new PeekValueExpiryAwareIterator(keyIter, plc, topVer, keyValFilter, qry.keepBinary(), true) { - @Override protected void onClose() { - super.onClose(); - - if (locPart0 != null) - locPart0.release(); - } - }; + final GridIterator<IgniteBiTuple<K, V>> heapIt = onheapIterator(qry, + topVer, + keyValFilter, + backups, + plc, + locNode); final GridIterator<IgniteBiTuple<K, V>> it; @@ -882,10 +865,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte iters.add(heapIt); if (cctx.isOffHeapEnabled()) - iters.add(offheapIterator(qry, topVer, backups, plc)); + iters.add(offheapIterator(qry, topVer, backups, plc, locNode)); if (cctx.swap().swapEnabled()) - iters.add(swapIterator(qry, topVer, backups, plc)); + iters.add(swapIterator(qry, topVer, backups, plc, locNode)); it = new CompoundIterator<>(iters); } @@ -907,7 +890,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte @Override protected void onClose() throws IgniteCheckedException { try { - heapIt.close(); + if (heapIt instanceof IgniteSpiCloseableIterator) + ((IgniteSpiCloseableIterator)heapIt).close(); } finally { closeScanFilter(keyValFilter); @@ -915,8 +899,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } }; } - catch (IgniteCheckedException | RuntimeException e) - { + catch (IgniteCheckedException | RuntimeException e) { closeScanFilter(keyValFilter); throw e; @@ -935,7 +918,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * @param qry Query. + * @param topVer Topology version. * @param backups Include backups. + * @param expPlc Expiry policy. + * @param locNode Local node. * @return Swap iterator. * @throws IgniteCheckedException If failed. */ @@ -943,8 +929,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte GridCacheQueryAdapter<?> qry, AffinityTopologyVersion topVer, boolean backups, - ExpiryPolicy expPlc - ) throws IgniteCheckedException { + ExpiryPolicy expPlc, + boolean locNode) throws IgniteCheckedException { IgniteBiPredicate<K, V> filter = qry.scanFilter(); Integer part = qry.partition(); @@ -958,22 +944,146 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte topVer, filter, expPlc, - qry.keepBinary()); + qry.keepBinary(), locNode); - return scanIterator(it, filter, qry.keepBinary()); + return scanIterator(it, filter, qry.keepBinary(), locNode); } /** * @param qry Query. + * @param topVer Topology version. + * @param keyValFilter Filter. * @param backups Include backups. + * @param plc Expiry policy. + * @param locNode Local node. + * @return Offheap iterator. + * @throws GridDhtUnreservedPartitionException If failed to reserve partition. + */ + private GridIterator<IgniteBiTuple<K, V>> onheapIterator( + GridCacheQueryAdapter<?> qry, + AffinityTopologyVersion topVer, + final IgniteBiPredicate<K, V> keyValFilter, + boolean backups, + final ExpiryPolicy plc, + final boolean locNode) throws GridDhtUnreservedPartitionException { + Iterator<K> keyIter; + + GridDhtLocalPartition locPart = null; + + Integer part = qry.partition(); + + if (part == null || cctx.isLocal()) { + // Performance optimization. + if (locNode && plc == null && !cctx.isLocal()) { + GridDhtCacheAdapter<K, V> cache = cctx.isNear() ? cctx.near().dht() : cctx.dht(); + + final Iterator<Cache.Entry<K, V>> iter = cache.localEntriesIterator(true, backups); + + return new GridIteratorAdapter<IgniteBiTuple<K, V>>() { + /** */ + private IgniteBiTuple<K, V> next; + + { + advance(); + } + + @Override public boolean hasNextX() throws IgniteCheckedException { + return next != null; + } + + @Override public IgniteBiTuple<K, V> nextX() throws IgniteCheckedException { + if (next == null) + throw new NoSuchElementException(); + + IgniteBiTuple<K, V> next0 = next; + + advance(); + + return next0; + } + + @Override public void removeX() throws IgniteCheckedException { + // No-op. + } + + private void advance() { + IgniteBiTuple<K, V> next0 = null; + + while (iter.hasNext()) { + Cache.Entry<K, V> cacheEntry = iter.next(); + + if (keyValFilter != null && !keyValFilter.apply(cacheEntry.getKey(), cacheEntry.getValue())) + continue; + + next0 = new IgniteBiTuple<>(cacheEntry.getKey(), cacheEntry.getValue()); + + break; + } + + next = next0; + } + }; + } + + IgniteInternalCache<K, V> keepBinaryCache = cctx.cache().keepBinary(); + + keyIter = backups ? keepBinaryCache.keySetx().iterator() : keepBinaryCache.primaryKeySet().iterator(); + } + else if (part < 0 || part >= cctx.affinity().partitions()) + keyIter = new GridEmptyIterator<>(); + else { + final GridDhtCacheAdapter dht = cctx.isNear() ? cctx.near().dht() : cctx.dht(); + + locPart = dht.topology().localPartition(part, topVer, false); + + // Double check for owning state. + if (locPart == null || locPart.state() != OWNING || !locPart.reserve() || locPart.state() != OWNING) + throw new GridDhtUnreservedPartitionException(part, cctx.affinity().affinityTopologyVersion(), + "Partition can not be reserved."); + + final GridDhtLocalPartition locPart0 = locPart; + + keyIter = new Iterator<K>() { + private Iterator<KeyCacheObject> iter0 = locPart0.keySet().iterator(); + + @Override public boolean hasNext() { + return iter0.hasNext(); + } + + @Override public K next() { + return (K)iter0.next(); + } + + @Override public void remove() { + iter0.remove(); + } + }; + } + + final GridDhtLocalPartition locPart0 = locPart; + + return new PeekValueExpiryAwareIterator(keyIter, plc, topVer, keyValFilter, qry.keepBinary(), locNode, true) { + @Override protected void onClose() { + super.onClose(); + + if (locPart0 != null) + locPart0.release(); + } + }; + } + + /** + * @param qry Query. + * @param backups Include backups. + * @param locNode Local node. * @return Offheap iterator. */ private GridIterator<IgniteBiTuple<K, V>> offheapIterator( GridCacheQueryAdapter<?> qry, AffinityTopologyVersion topVer, boolean backups, - ExpiryPolicy expPlc - ) { + ExpiryPolicy expPlc, + boolean locNode) { IgniteBiPredicate<K, V> filter = qry.scanFilter(); if (expPlc != null) { @@ -982,18 +1092,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte topVer, filter, expPlc, - qry.keepBinary()); + qry.keepBinary(), locNode); } if (cctx.offheapTiered() && filter != null) { - OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepBinary()); + OffheapIteratorClosure c = new OffheapIteratorClosure(filter, qry.keepBinary(), locNode); return cctx.swap().rawOffHeapIterator(c, qry.partition(), true, backups); } else { Iterator<Map.Entry<byte[], byte[]>> it = cctx.swap().rawOffHeapIterator(qry.partition(), true, backups); - return scanIterator(it, filter, qry.keepBinary()); + return scanIterator(it, filter, qry.keepBinary(), locNode); } } @@ -1001,12 +1111,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param it Lazy swap or offheap iterator. * @param filter Scan filter. * @param keepBinary Keep binary flag. + * @param locNode Local node. * @return Iterator. */ private GridIteratorAdapter<IgniteBiTuple<K, V>> scanIterator( @Nullable final Iterator<Map.Entry<byte[], byte[]>> it, @Nullable final IgniteBiPredicate<K, V> filter, - final boolean keepBinary) { + final boolean keepBinary, + final boolean locNode) { if (it == null) return new GridEmptyCloseableIterator<>(); @@ -1042,15 +1154,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte while (it.hasNext()) { final LazySwapEntry e = new LazySwapEntry(it.next()); - if (filter != null) { - K key = (K)cctx.unwrapBinaryIfNeeded(e.key(), keepBinary); - V val = (V)cctx.unwrapBinaryIfNeeded(e.value(), keepBinary); + K key = e.key(); + V val = e.value(); - if (!filter.apply(key, val)) - continue; + if (filter != null || locNode) { + key = (K)cctx.unwrapBinaryIfNeeded(key, keepBinary); + val = (V)cctx.unwrapBinaryIfNeeded(val, keepBinary); } - next = new IgniteBiTuple<>(e.key(), e.value()); + if (filter != null && !filter.apply(key, val)) + continue; + + next = new IgniteBiTuple<>(key, val); break; } @@ -1064,6 +1179,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param filter Filter. * @param expPlc Expiry policy. * @param keepBinary Keep binary flag. + * @param locNode Local node. * @return Final key-value iterator. */ private GridIterator<IgniteBiTuple<K,V>> scanExpiryIterator( @@ -1071,8 +1187,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte AffinityTopologyVersion topVer, @Nullable final IgniteBiPredicate<K, V> filter, ExpiryPolicy expPlc, - final boolean keepBinary - ) { + final boolean keepBinary, + boolean locNode) { Iterator <K> keyIter = new Iterator<K>() { /** {@inheritDoc} */ @Override public boolean hasNext() { @@ -1097,7 +1213,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } }; - return new PeekValueExpiryAwareIterator(keyIter, expPlc, topVer, filter, keepBinary, false); + return new PeekValueExpiryAwareIterator(keyIter, expPlc, topVer, filter, keepBinary, locNode, false); } /** @@ -1318,6 +1434,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte @SuppressWarnings("unchecked") protected void runQuery(GridCacheQueryInfo qryInfo) { assert qryInfo != null; + assert qryInfo.query().type() != SCAN || !qryInfo.local() : qryInfo; if (!enterBusy()) { if (cctx.localNodeId().equals(qryInfo.senderId())) @@ -1439,6 +1556,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } if (readEvt) { + K key0 = (K)cctx.unwrapBinaryIfNeeded(key, qry.keepBinary()); + V val0 = (V)cctx.unwrapBinaryIfNeeded(val, qry.keepBinary()); + switch (type) { case SQL: cctx.gridEvents().record(new CacheQueryReadEvent<>( @@ -1454,8 +1574,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte qryInfo.arguments(), qry.subjectId(), taskName, - key, - val, + key0, + val0, null, null)); @@ -1475,8 +1595,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte null, qry.subjectId(), taskName, - key, - val, + key0, + val0, null, null)); @@ -1496,8 +1616,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte null, qry.subjectId(), taskName, - key, - val, + key0, + val0, null, null)); @@ -1589,12 +1709,127 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** + * Process local scan query. + * + * @param qry Query. + * @param updStatisticsIfNeeded Update statistics flag. + */ + @SuppressWarnings({"unchecked", "serial"}) + protected GridCloseableIterator<IgniteBiTuple<K, V>> scanQueryLocal(final GridCacheQueryAdapter qry, + final boolean updStatisticsIfNeeded) throws IgniteCheckedException { + if (!enterBusy()) + throw new IllegalStateException("Failed to process query request (grid is stopping)."); + + final boolean statsEnabled = cctx.config().isStatisticsEnabled(); + + boolean needUpdStatistics = updStatisticsIfNeeded && statsEnabled; + + long startTime = U.currentTimeMillis(); + + try { + assert qry.type() == SCAN; + + if (log.isDebugEnabled()) + log.debug("Running local SCAN query: " + qry); + + final String taskName = cctx.kernalContext().task().resolveTaskName(qry.taskHash()); + final IgniteBiPredicate filter = qry.scanFilter(); + final String namex = cctx.namex(); + final ClusterNode locNode = cctx.localNode(); + final UUID subjId = qry.subjectId(); + + if (cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_EXECUTED)) { + cctx.gridEvents().record(new CacheQueryExecutedEvent<>( + locNode, + "Scan query executed.", + EVT_CACHE_QUERY_EXECUTED, + CacheQueryType.SCAN.name(), + namex, + null, + null, + filter, + null, + null, + subjId, + taskName)); + } + + final GridCloseableIterator<IgniteBiTuple<K, V>> iter = scanIterator(qry, true); + + if (updStatisticsIfNeeded) { + needUpdStatistics = false; + + cctx.queries().onCompleted(U.currentTimeMillis() - startTime, false); + } + + final boolean readEvt = cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ); + + return new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() { + @Override protected IgniteBiTuple<K, V> onNext() throws IgniteCheckedException { + long start = statsEnabled ? System.nanoTime() : 0L; + + IgniteBiTuple<K, V> next = iter.nextX(); + + if (statsEnabled) { + CacheMetricsImpl metrics = cctx.cache().metrics0(); + + metrics.onRead(true); + + metrics.addGetTimeNanos(System.nanoTime() - start); + } + + if (readEvt) { + cctx.gridEvents().record(new CacheQueryReadEvent<>( + cctx.localNode(), + "Scan query entry read.", + EVT_CACHE_QUERY_OBJECT_READ, + CacheQueryType.SCAN.name(), + namex, + null, + null, + filter, + null, + null, + subjId, + taskName, + next.getKey(), + next.getValue(), + null, + null)); + } + + return next; + } + + @Override protected boolean onHasNext() throws IgniteCheckedException { + return iter.hasNextX(); + } + + @Override protected void onClose() throws IgniteCheckedException { + iter.close(); + } + }; + } + catch (Exception e) { + if (needUpdStatistics) + cctx.queries().onCompleted(U.currentTimeMillis() - startTime, true); + + throw e; + } + finally { + leaveBusy(); + } + } + + /** * @param qryInfo Info. * @param taskName Task name. * @return Iterator. * @throws IgniteCheckedException In case of error. */ - private QueryResult<K, V> queryResult(GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException { + private QueryResult<K, V> queryResult(final GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException { + assert qryInfo != null; + final UUID sndId = qryInfo.senderId(); assert sndId != null; @@ -1602,8 +1837,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte Map<Long, GridFutureAdapter<QueryResult<K, V>>> futs = qryIters.get(sndId); if (futs == null) { - futs = new LinkedHashMap<Long, GridFutureAdapter<QueryResult<K, V>>>( - 16, 0.75f, true) { + futs = new LinkedHashMap<Long, GridFutureAdapter<QueryResult<K, V>>>(16, 0.75f, true) { @Override protected boolean removeEldestEntry(Map.Entry<Long, GridFutureAdapter<QueryResult<K, V>>> e) { boolean rmv = size() > maxIterCnt; @@ -1626,22 +1860,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte futs = old; } - return queryResult(futs, qryInfo, taskName); - } - - /** - * @param futs Futures map. - * @param qryInfo Info. - * @return Iterator. - * @throws IgniteCheckedException In case of error. - */ - @SuppressWarnings({ - "SynchronizationOnLocalVariableOrMethodParameter", - "NonPrivateFieldAccessedInSynchronizedContext"}) - private QueryResult<K, V> queryResult(Map<Long, GridFutureAdapter<QueryResult<K, V>>> futs, - GridCacheQueryInfo qryInfo, String taskName) throws IgniteCheckedException { assert futs != null; - assert qryInfo != null; GridFutureAdapter<QueryResult<K, V>> fut; @@ -2584,17 +2803,23 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** */ private boolean keepBinary; + /** */ + private boolean locNode; + /** * @param filter Filter. * @param keepBinary Keep binary flag. + * @param locNode Local node. */ private OffheapIteratorClosure( @Nullable IgniteBiPredicate<K, V> filter, - boolean keepBinary) { + boolean keepBinary, + boolean locNode) { assert filter != null; this.filter = filter; this.keepBinary = keepBinary; + this.locNode = locNode; } /** {@inheritDoc} */ @@ -2609,15 +2834,19 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (!filter.apply(key, val)) return null; - if (key instanceof CacheObject) - ((CacheObject)key).prepareMarshal(cctx.cacheObjectContext()); + if (locNode) + return new IgniteBiTuple<>(key, val); + else{ + if (key instanceof CacheObject) + ((CacheObject)key).prepareMarshal(cctx.cacheObjectContext()); - val = (V)cctx.unwrapTemporary(e.value()); + val = (V)cctx.unwrapTemporary(e.value()); - if (val instanceof CacheObject) - ((CacheObject)val).prepareMarshal(cctx.cacheObjectContext()); + if (val instanceof CacheObject) + ((CacheObject)val).prepareMarshal(cctx.cacheObjectContext()); - return new IgniteBiTuple<>(e.key(), val); + return new IgniteBiTuple<>(key, val); + } } } @@ -3098,6 +3327,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** */ private final IgniteBiPredicate<K, V> keyValFilter; + /** */ + private boolean locNode; + /** Heap only flag. */ private boolean heapOnly; @@ -3119,6 +3351,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param topVer Topology version. * @param keyValFilter Key-value filter. * @param keepBinary Keep binary flag from the query. + * @param locNode Local node. + * @param heapOnly Heap only. */ private PeekValueExpiryAwareIterator( Iterator<K> keyIt, @@ -3126,12 +3360,14 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte AffinityTopologyVersion topVer, IgniteBiPredicate<K, V> keyValFilter, boolean keepBinary, + boolean locNode, boolean heapOnly ) { this.keyIt = keyIt; this.plc = plc; this.topVer = topVer; this.keyValFilter = keyValFilter; + this.locNode = locNode; this.heapOnly = heapOnly; dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht()); @@ -3195,11 +3431,27 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } if (val != null) { + boolean keepBinary0 = !locNode || keepBinary; + next0 = F.t( - (K)cctx.unwrapBinaryIfNeeded(key, true), - (V)cctx.unwrapBinaryIfNeeded(val, true)); + (K)cctx.unwrapBinaryIfNeeded(key, keepBinary0), + (V)cctx.unwrapBinaryIfNeeded(val, keepBinary0)); + + boolean passPred = true; - if (checkPredicate(next0)) + if (keyValFilter != null) { + Object key0 = next0.getKey(); + Object val0 = next0.getValue(); + + if (keepBinary0 && !keepBinary) { + key0 = (K)cctx.unwrapBinaryIfNeeded(key0, keepBinary); + val0 = (V)cctx.unwrapBinaryIfNeeded(val0, keepBinary); + } + + passPred = keyValFilter.apply((K)key0, (V)val0); + } + + if (passPred) break; else next0 = null; @@ -3246,21 +3498,5 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } } } - - /** - * Check key-value predicate. - * - * @param e Entry to check. - * @return Filter evaluation result. - */ - private boolean checkPredicate(Map.Entry<K, V> e) { - if (keyValFilter != null) { - Map.Entry<K, V> e0 = (Map.Entry<K, V>)cctx.unwrapBinaryIfNeeded(e, keepBinary); - - return keyValFilter.apply(e0.getKey(), e0.getValue()); - } - - return true; - } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java index 5b74992..70232af 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java @@ -30,8 +30,11 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; - -import org.apache.ignite.*; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSet; import org.apache.ignite.cache.affinity.AffinityKeyMapped; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -417,7 +420,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite CacheQueryFuture<Map.Entry<T, ?>> fut = qry.execute(); - CacheWeakQueryIteratorsHolder.WeakQueryFutureIterator it = + CacheWeakQueryIteratorsHolder.WeakReferenceCloseableIterator it = ctx.itHolder().iterator(fut, new CacheIteratorConverter<T, Map.Entry<T, ?>>() { @Override protected T convert(Map.Entry<T, ?> e) { return e.getKey();
