Repository: ignite Updated Branches: refs/heads/ignite-1.4 28213a311 -> d223a707d
ignite-1366 Start cache processor before query processor. Initialize topology version for GridCacheQueryRequest so that messages are not missed before the message handler is registered. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/15f3edb5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/15f3edb5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/15f3edb5 Branch: refs/heads/ignite-1.4 Commit: 15f3edb546c9f08ed46e3baa51f41250d57b1d98 Parents: f1f6be8 Author: sboikov <[email protected]> Authored: Fri Sep 4 10:30:18 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Sep 4 10:30:18 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 2 +- .../dht/GridPartitionedGetFuture.java | 14 +- .../distributed/near/GridNearGetFuture.java | 13 ++ .../query/GridCacheDistributedQueryFuture.java | 5 +- .../query/GridCacheDistributedQueryManager.java | 9 +- .../cache/query/GridCacheQueryManager.java | 177 ++++++------------- .../cache/query/GridCacheQueryRequest.java | 59 ++++++- .../IgniteCacheNodeJoinAbstractTest.java | 42 +++++ 8 files changed, 185 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index ad4940a..7deede7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -876,8 +876,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { startProcessor(new 
GridAffinityProcessor(ctx)); startProcessor(createComponent(GridSegmentationProcessor.class, ctx)); startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx)); - startProcessor(new GridQueryProcessor(ctx)); startProcessor(new GridCacheProcessor(ctx)); + startProcessor(new GridQueryProcessor(ctx)); startProcessor(new GridTaskSessionProcessor(ctx)); startProcessor(new GridJobProcessor(ctx)); startProcessor(new GridTaskProcessor(ctx)); http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 2f0de86..3ddf6d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -770,6 +770,18 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M if (log.isDebugEnabled()) log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']'); + if (!canRemap) { + map(F.view(keys.keySet(), new P1<KeyCacheObject>() { + @Override public boolean apply(KeyCacheObject key) { + return invalidParts.contains(cctx.affinity().partition(key)); + } + }), F.t(node, keys), topVer); + + onDone(createResultMap(res.entries())); + + return; + } + // Need to wait for next topology version to remap. 
IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion()); @@ -779,7 +791,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M AffinityTopologyVersion topVer = new AffinityTopologyVersion(fut.get()); // This will append new futures to compound list. - map(F.view(keys.keySet(), new P1<KeyCacheObject>() { + map(F.view(keys.keySet(), new P1<KeyCacheObject>() { @Override public boolean apply(KeyCacheObject key) { return invalidParts.contains(cctx.affinity().partition(key)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index 9d2113e..a7875f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -904,6 +904,19 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma if (log.isDebugEnabled()) log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']'); + if (!canRemap) { + map(F.view(keys.keySet(), new P1<KeyCacheObject>() { + @Override public boolean apply(KeyCacheObject key) { + return invalidParts.contains(cctx.affinity().partition(key)); + } + }), F.t(node, keys), topVer); + + // It is critical to call onDone after adding futures to compound list. + onDone(loadEntries(node.id(), keys.keySet(), res.entries(), savedVers, topVer)); + + return; + } + // Need to wait for next topology version to remap. 
IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java index 32a4599..1d547c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java @@ -98,7 +98,10 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu subgrid.clear(); } - final GridCacheQueryRequest req = new GridCacheQueryRequest(cctx.cacheId(), reqId, fields()); + final GridCacheQueryRequest req = new GridCacheQueryRequest(cctx.cacheId(), + reqId, + fields(), + qryMgr.queryTopologyVersion()); // Process cancel query directly (without sending) for local node, cctx.closures().callLocalSafe(new Callable<Object>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java index d1fdfcf..4422952 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java @@ -566,7 +566,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage false, qry.query().keepPortable(), qry.query().subjectId(), - qry.query().taskHash()); + qry.query().taskHash(), + queryTopologyVersion()); addQueryFuture(req.id(), fut); @@ -610,7 +611,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage all, qry.keepPortable(), qry.subjectId(), - qry.taskHash()); + qry.taskHash(), + queryTopologyVersion()); sendRequest(fut, req, nodes); } @@ -675,7 +677,8 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage qry.query().includeMetadata(), qry.query().keepPortable(), qry.query().subjectId(), - qry.query().taskHash()); + qry.query().taskHash(), + queryTopologyVersion()); addQueryFuture(req.id(), fut); http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index b3f8720..2041464 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -168,6 +168,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** */ private boolean enabled; + /** */ + private AffinityTopologyVersion qryTopVer; + /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { qryProc = cctx.kernalContext().query(); @@ -182,12 +185,12 @@ public abstract class 
GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (futs != null) { for (Map.Entry<Long, GridFutureAdapter<QueryResult<K, V>>> entry : futs.entrySet()) { - final Object recipient = recipient(nodeId, entry.getKey()); + final Object rcpt = recipient(nodeId, entry.getKey()); entry.getValue().listen(new CIX1<IgniteInternalFuture<QueryResult<K, V>>>() { @Override public void applyx(IgniteInternalFuture<QueryResult<K, V>> f) throws IgniteCheckedException { - f.get().closeIfNotShared(recipient); + f.get().closeIfNotShared(rcpt); } }); } @@ -197,12 +200,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (fieldsFuts != null) { for (Map.Entry<Long, GridFutureAdapter<FieldsResult>> entry : fieldsFuts.entrySet()) { - final Object recipient = recipient(nodeId, entry.getKey()); + final Object rcpt = recipient(nodeId, entry.getKey()); entry.getValue().listen(new CIX1<IgniteInternalFuture<FieldsResult>>() { @Override public void applyx(IgniteInternalFuture<FieldsResult> f) throws IgniteCheckedException { - f.get().closeIfNotShared(recipient); + f.get().closeIfNotShared(rcpt); } }); } @@ -213,6 +216,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte cctx.events().addListener(lsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); enabled = GridQueryProcessor.isEnabled(cctx.config()); + + qryTopVer = cctx.startTopologyVersion(); + + if (qryTopVer == null) + qryTopVer = new AffinityTopologyVersion(cctx.localNode().order(), 0); } /** @@ -281,16 +289,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * Rebuilds all search indexes of given value type. * - * @param valType Value type. - * @return Future that will be completed when rebuilding of all indexes is finished. - */ - public IgniteInternalFuture<?> rebuildIndexes(Class<?> valType) { - return rebuildIndexes(valType.getName()); - } - - /** - * Rebuilds all search indexes of given value type. 
- * * @param typeName Value type name. * @return Future that will be completed when rebuilding of all indexes is finished. */ @@ -307,23 +305,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** - * Rebuilds all search indexes of all types. - * - * @return Future that will be completed when rebuilding of all indexes is finished. - */ - public IgniteInternalFuture<?> rebuildAllIndexes() { - if (!enterBusy()) - throw new IllegalStateException("Failed to rebuild indexes (grid is stopping)."); - - try { - return qryProc.rebuildAllIndexes(); - } - finally { - leaveBusy(); - } - } - - /** * Marks this request as canceled. * * @param reqId Request id. @@ -531,12 +512,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param loc Local query or not. * @param subjId Security subject ID. * @param taskName Task name. - * @param recipient ID of the recipient. + * @param rcpt ID of the recipient. * @return Collection of found keys. * @throws IgniteCheckedException In case of error. 
*/ + @SuppressWarnings("unchecked") private QueryResult<K, V> executeQuery(GridCacheQueryAdapter<?> qry, - @Nullable Object[] args, boolean loc, @Nullable UUID subjId, @Nullable String taskName, Object recipient) + @Nullable Object[] args, boolean loc, @Nullable UUID subjId, @Nullable String taskName, Object rcpt) throws IgniteCheckedException { if (qry.type() == null) { assert !loc; @@ -555,16 +537,16 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte res = (QueryResult<K, V>)qryResCache.get(resKey); - if (res != null && res.addRecipient(recipient)) + if (res != null && res.addRecipient(rcpt)) return res; - res = new QueryResult<>(qry.type(), recipient); + res = new QueryResult<>(qry.type(), rcpt); if (qryResCache.putIfAbsent(resKey, res) != null) resKey = null; } else - res = new QueryResult<>(qry.type(), recipient); + res = new QueryResult<>(qry.type(), rcpt); GridCloseableIterator<IgniteBiTuple<K, V>> iter; @@ -667,12 +649,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte * @param loc Local query or not. * @param subjId Security subject ID. * @param taskName Task name. - * @param recipient ID of the recipient. + * @param rcpt ID of the recipient. * @return Collection of found keys. * @throws IgniteCheckedException In case of error. */ private FieldsResult executeFieldsQuery(GridCacheQueryAdapter<?> qry, @Nullable Object[] args, - boolean loc, @Nullable UUID subjId, @Nullable String taskName, Object recipient) throws IgniteCheckedException { + boolean loc, @Nullable UUID subjId, @Nullable String taskName, Object rcpt) throws IgniteCheckedException { assert qry != null; FieldsResult res; @@ -709,10 +691,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte res = (FieldsResult)qryResCache.get(resKey); - if (res != null && res.addRecipient(recipient)) + if (res != null && res.addRecipient(rcpt)) return res; // Cached result found. 
- res = new FieldsResult(recipient); + res = new FieldsResult(rcpt); if (qryResCache.putIfAbsent(resKey, res) != null) resKey = null; // Failed to cache result. @@ -736,7 +718,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte taskName)); } - res = new FieldsResult(recipient); + res = new FieldsResult(rcpt); } try { @@ -1191,7 +1173,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte // If metadata needs to be returned to user and cleaned from internal fields - copy it. List<GridQueryFieldMetadata> meta = qryInfo.includeMetaData() ? - (res.metaData() != null ? new ArrayList<GridQueryFieldMetadata>(res.metaData()) : null) : + (res.metaData() != null ? new ArrayList<>(res.metaData()) : null) : res.metaData(); if (!qryInfo.includeMetaData()) @@ -1996,6 +1978,13 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** + * @return Topology version for query requests. + */ + public AffinityTopologyVersion queryTopologyVersion() { + return qryTopVer; + } + + /** * @param qry Query. * @return Filter. */ @@ -2347,10 +2336,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * @param type Query type. - * @param recipient ID of the recipient. + * @param rcpt ID of the recipient. */ - private QueryResult(GridCacheQueryType type, Object recipient) { - super(recipient); + private QueryResult(GridCacheQueryType type, Object rcpt) { + super(rcpt); this.type = type; } @@ -2374,10 +2363,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte private List<GridQueryFieldMetadata> meta; /** - * @param recipient ID of the recipient. + * @param rcpt ID of the recipient. 
*/ - FieldsResult(Object recipient) { - super(recipient); + FieldsResult(Object rcpt) { + super(rcpt); } /** @@ -2674,39 +2663,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** - * - */ - private class GridCacheScanSwapEntry implements Cache.Entry<K, V> { - /** */ - private final AbstractLazySwapEntry e; - - /** - * @param e Entry. - */ - private GridCacheScanSwapEntry(AbstractLazySwapEntry e) { - this.e = e; - } - - /** {@inheritDoc} */ - @Nullable @Override public V getValue() { - return e.value(); - } - - /** {@inheritDoc} */ - @Override public K getKey() { - return e.key(); - } - - /** {@inheritDoc} */ - @Override public <T> T unwrap(Class<T> clazz) { - if (clazz.isAssignableFrom(getClass())) - return clazz.cast(this); - - throw new IllegalArgumentException(); - } - } - - /** * Cached result. */ private abstract static class CachedResult<R> extends GridFutureAdapter<IgniteSpiCloseableIterator<R>> { @@ -2720,10 +2676,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte private final Map<Object, QueueIterator> recipients = new GridLeanMap<>(1); /** - * @param recipient ID of the recipient. + * @param rcpt ID of the recipient. */ - protected CachedResult(Object recipient) { - boolean res = addRecipient(recipient); + protected CachedResult(Object rcpt) { + boolean res = addRecipient(rcpt); assert res; } @@ -2731,17 +2687,17 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** * Close if this result does not have any other recipients. * - * @param recipient ID of the recipient. + * @param rcpt ID of the recipient. * @throws IgniteCheckedException If failed. 
*/ - public void closeIfNotShared(Object recipient) throws IgniteCheckedException { + public void closeIfNotShared(Object rcpt) throws IgniteCheckedException { assert isDone(); synchronized (recipients) { if (recipients.isEmpty()) return; - recipients.remove(recipient); + recipients.remove(rcpt); if (recipients.isEmpty()) get().close(); @@ -2749,17 +2705,17 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** - * @param recipient ID of the recipient. + * @param rcpt ID of the recipient. * @return {@code true} If the recipient successfully added. */ - public boolean addRecipient(Object recipient) { + public boolean addRecipient(Object rcpt) { synchronized (recipients) { if (isDone()) return false; - assert !recipients.containsKey(recipient) : recipient + " -> " + recipients; + assert !recipients.containsKey(rcpt) : rcpt + " -> " + recipients; - recipients.put(recipient, new QueueIterator(recipient)); + recipients.put(rcpt, new QueueIterator(rcpt)); } return true; @@ -2798,18 +2754,18 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte } /** - * @param recipient ID of the recipient. + * @param rcpt ID of the recipient. * @throws IgniteCheckedException If failed. */ - public IgniteSpiCloseableIterator<R> iterator(Object recipient) throws IgniteCheckedException { - assert recipient != null; + public IgniteSpiCloseableIterator<R> iterator(Object rcpt) throws IgniteCheckedException { + assert rcpt != null; IgniteSpiCloseableIterator<R> it = get(); assert it != null; synchronized (recipients) { - return queue == null ? it : recipients.get(recipient); + return queue == null ? 
it : recipients.get(rcpt); } } @@ -2825,7 +2781,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte private static final int NEXT_SIZE = 64; /** */ - private final Object recipient; + private final Object rcpt; /** */ private int pos; @@ -2834,10 +2790,10 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte private Queue<R> next; /** - * @param recipient ID of the recipient. + * @param rcpt ID of the recipient. */ - private QueueIterator(Object recipient) { - this.recipient = recipient; + private QueueIterator(Object rcpt) { + this.rcpt = rcpt; } /** @@ -2850,7 +2806,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException { - closeIfNotShared(recipient); + closeIfNotShared(rcpt); } /** {@inheritDoc} */ @@ -3101,25 +3057,4 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte false, keepPortable); } - - /** - * Creates SQL fields query which will include results metadata if needed. - * - * @param qry SQL query. - * @param incMeta Whether to include results metadata. - * @param keepPortable Keep portable flag. - * @return Created query. 
- */ - public CacheQuery<List<?>> createSqlFieldsQuery(String qry, boolean incMeta, boolean keepPortable) { - assert qry != null; - - return new GridCacheQueryAdapter<>(cctx, - SQL_FIELDS, - null, - qry, - null, - null, - incMeta, - keepPortable); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java index c21ac66..f7ef76f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheMessage; @@ -121,6 +122,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache /** Partition. */ private int part; + /** */ + private AffinityTopologyVersion topVer; + /** * Required by {@link Externalizable} */ @@ -129,13 +133,21 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache } /** + * Creates cancel query request. + * + * @param cacheId Cache ID. * @param id Request to cancel. * @param fields Fields query flag. + * @param topVer Topology version. 
*/ - public GridCacheQueryRequest(int cacheId, long id, boolean fields) { + public GridCacheQueryRequest(int cacheId, + long id, + boolean fields, + AffinityTopologyVersion topVer) { this.cacheId = cacheId; this.id = id; this.fields = fields; + this.topVer = topVer; cancel = true; } @@ -151,6 +163,9 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache * @param fields Fields query flag. * @param all Whether to load all pages. * @param keepPortable Whether to keep portables. + * @param subjId Subject ID. + * @param taskHash Task name hash code. + * @param topVer Topology version. */ public GridCacheQueryRequest( int cacheId, @@ -162,7 +177,8 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache boolean all, boolean keepPortable, UUID subjId, - int taskHash + int taskHash, + AffinityTopologyVersion topVer ) { this.cacheId = cacheId; this.id = id; @@ -174,6 +190,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache this.keepPortable = keepPortable; this.subjId = subjId; this.taskHash = taskHash; + this.topVer = topVer; } /** @@ -192,6 +209,10 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache * @param incBackups {@code true} if need to include backups. * @param args Query arguments. * @param incMeta Include meta data or not. + * @param keepPortable Keep portable flag. + * @param subjId Subject ID. + * @param taskHash Task name hash code. + * @param topVer Topology version. 
*/ public GridCacheQueryRequest( int cacheId, @@ -211,7 +232,8 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache boolean incMeta, boolean keepPortable, UUID subjId, - int taskHash + int taskHash, + AffinityTopologyVersion topVer ) { assert type != null || fields; assert clause != null || (type == SCAN || type == SET || type == SPI); @@ -235,10 +257,15 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache this.keepPortable = keepPortable; this.subjId = subjId; this.taskHash = taskHash; + this.topVer = topVer; } - /** {@inheritDoc} - * @param ctx*/ + /** {@inheritDoc} */ + @Override public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); @@ -554,12 +581,18 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache writer.incrementState(); case 20: - if (!writer.writeByteArray("transBytes", transBytes)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 21: + if (!writer.writeByteArray("transBytes", transBytes)) + return false; + + writer.incrementState(); + + case 22: if (!writer.writeByte("type", type != null ? 
(byte)type.ordinal() : -1)) return false; @@ -718,7 +751,7 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 20: - transBytes = reader.readByteArray("transBytes"); + topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) return false; @@ -726,6 +759,14 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache reader.incrementState(); case 21: + transBytes = reader.readByteArray("transBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 22: byte typeOrd; typeOrd = reader.readByte("type"); @@ -749,11 +790,11 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 22; + return 23; } /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheQueryRequest.class, this, super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/15f3edb5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java index 58aa571..2e7f2ea 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNodeJoinAbstractTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheMode; +import 
org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest; @@ -106,4 +107,45 @@ public abstract class IgniteCacheNodeJoinAbstractTest extends IgniteCacheAbstrac stopGrid(1); } } + + /** + * @throws Exception If failed. + */ + public void testScanQuery() throws Exception { + final IgniteCache<Integer, Integer> cache = jcache(0); + + for (int i = 0; i < 5; i++) { + log.info("Iteration: " + i); + + final IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + startGrid(1); + + return null; + } + }); + + final AtomicBoolean stop = new AtomicBoolean(); + + GridTestUtils.runMultiThreaded(new Callable<Void>() { + @Override public Void call() throws Exception { + ScanQuery qry = new ScanQuery(); + + while (!stop.get() && !fut.isDone()) + cache.query(qry).getAll(); + + return null; + } + }, 10, "test-qry"); + + try { + fut.get(60_000); + } + finally { + stop.set(true); + } + + stopGrid(1); + } + } } \ No newline at end of file
