This is an automated email from the ASF dual-hosted git repository.

ipavlukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new cee9615  IGNITE-12189 Implement correct limit for TextQuery - Fixes 
#6917.
cee9615 is described below

commit cee961560f410653009915b61d821dab84ea490a
Author: Yuriy Shuliha <[email protected]>
AuthorDate: Mon Nov 11 15:51:33 2019 +0300

    IGNITE-12189 Implement correct limit for TextQuery - Fixes #6917.
    
    Signed-off-by: ipavlukhin <[email protected]>
---
 .../org/apache/ignite/cache/query/TextQuery.java   |  50 +++++
 .../processors/cache/IgniteCacheProxyImpl.java     |  22 +-
 .../processors/cache/query/CacheQuery.java         |   8 +
 .../query/GridCacheDistributedQueryManager.java    |   3 +
 .../cache/query/GridCacheQueryAdapter.java         |  20 ++
 .../cache/query/GridCacheQueryFutureAdapter.java   |  41 +++-
 .../cache/query/GridCacheQueryManager.java         |  24 ++-
 .../cache/query/GridCacheQueryRequest.java         |  41 +++-
 .../processors/platform/cache/PlatformCache.java   |   5 +
 .../processors/query/GridQueryIndexing.java        |   3 +-
 .../processors/query/GridQueryProcessor.java       |   5 +-
 ...ridCacheFullTextQueryMultithreadedSelfTest.java |   3 +-
 .../processors/query/DummyQueryIndexing.java       |   3 +-
 .../processors/query/h2/IgniteH2Indexing.java      |   4 +-
 .../processors/query/h2/opt/GridLuceneIndex.java   |   5 +-
 .../cache/GridCacheFullTextQuerySelfTest.java      | 224 ++++++++++++++++-----
 16 files changed, 361 insertions(+), 100 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/cache/query/TextQuery.java 
b/modules/core/src/main/java/org/apache/ignite/cache/query/TextQuery.java
index fc49736..114a2e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/TextQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/TextQuery.java
@@ -68,6 +68,9 @@ public final class TextQuery<K, V> extends 
Query<Cache.Entry<K, V>> {
     /** SQL clause. */
     private String txt;
 
+    /** Maximum number of records to return. If 0 or less, considered to be no limit. */
+    private int limit;
+
     /**
      * Constructs query for the given search string.
      *
@@ -84,6 +87,19 @@ public final class TextQuery<K, V> extends 
Query<Cache.Entry<K, V>> {
      *
      * @param type Type.
      * @param txt Search string.
+     * @param limit Limits response records count. If 0 or less, considered to 
be no limit.
+     */
+    public TextQuery(String type, String txt, int limit) {
+        setType(type);
+        setText(txt);
+        setLimit(limit);
+    }
+
+    /**
+     * Constructs query for the given search string.
+     *
+     * @param type Type.
+     * @param txt Search string.
      */
     public TextQuery(Class<?> type, String txt) {
         setType(type);
@@ -91,6 +107,19 @@ public final class TextQuery<K, V> extends 
Query<Cache.Entry<K, V>> {
     }
 
     /**
+     * Constructs query for the given search string.
+     *
+     * @param type Type.
+     * @param txt Search string.
+     * @param limit Limits response records count. If 0 or less, considered to 
be no limit.
+     */
+    public TextQuery(Class<?> type, String txt, int limit) {
+        setType(type);
+        setText(txt);
+        setLimit(limit);
+    }
+
+    /**
      * Gets type for query.
      *
      * @return Type.
@@ -144,6 +173,27 @@ public final class TextQuery<K, V> extends 
Query<Cache.Entry<K, V>> {
         return this;
     }
 
+    /**
+     * Gets limit to response records count.
+     *
+     * @return Limit value.
+     */
+    public int getLimit() {
+        return limit;
+    }
+
+    /**
+     * Sets limit to response records count.
+     *
+     * @param limit If 0 or less, considered to be no limit.
+     * @return {@code this} For chaining.
+     */
+    public TextQuery<K, V> setLimit(int limit) {
+        this.limit = limit;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public TextQuery<K, V> setPageSize(int pageSize) {
         return (TextQuery<K, V>)super.setPageSize(pageSize);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 4e415e3..b75b08e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -520,13 +520,13 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
     }
 
     /**
-     * @param filter Filter.
+     * @param query Query.
      * @param grp Optional cluster group.
      * @return Cursor.
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
-    private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable 
ClusterGroup grp)
+    private QueryCursor<Cache.Entry<K, V>> query(final Query query, @Nullable 
ClusterGroup grp)
         throws IgniteCheckedException {
         GridCacheContext<K, V> ctx = getContextSafe();
 
@@ -538,40 +538,40 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
 
         final CacheQueryFuture fut;
 
-        if (filter instanceof TextQuery) {
-            TextQuery p = (TextQuery)filter;
+        if (query instanceof TextQuery) {
+            TextQuery q = (TextQuery)query;
 
-            qry = ctx.queries().createFullTextQuery(p.getType(), p.getText(), 
isKeepBinary);
+            qry = ctx.queries().createFullTextQuery(q.getType(), q.getText(), 
q.getLimit(), isKeepBinary);
 
             if (grp != null)
                 qry.projection(grp);
 
-            fut = 
ctx.kernalContext().query().executeQuery(GridCacheQueryType.TEXT, p.getText(), 
ctx,
+            fut = 
ctx.kernalContext().query().executeQuery(GridCacheQueryType.TEXT, q.getText(), 
ctx,
                 new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
                     @Override public CacheQueryFuture<Map.Entry<K, V>> 
applyx() {
                         return qry.execute();
                     }
                 }, false);
         }
-        else if (filter instanceof SpiQuery) {
+        else if (query instanceof SpiQuery) {
             qry = ctx.queries().createSpiQuery(isKeepBinary);
 
             if (grp != null)
                 qry.projection(grp);
 
-            fut = 
ctx.kernalContext().query().executeQuery(GridCacheQueryType.SPI, 
filter.getClass().getSimpleName(),
+            fut = 
ctx.kernalContext().query().executeQuery(GridCacheQueryType.SPI, 
query.getClass().getSimpleName(),
                 ctx, new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, 
V>>>() {
                     @Override public CacheQueryFuture<Map.Entry<K, V>> 
applyx() {
-                        return qry.execute(((SpiQuery)filter).getArgs());
+                        return qry.execute(((SpiQuery)query).getArgs());
                     }
                 }, false);
         }
         else {
-            if (filter instanceof SqlFieldsQuery)
+            if (query instanceof SqlFieldsQuery)
                 throw new CacheException("Use methods 'queryFields' and 
'localQueryFields' for " +
                     SqlFieldsQuery.class.getSimpleName() + ".");
 
-            throw new CacheException("Unsupported query type: " + filter);
+            throw new CacheException("Unsupported query type: " + query);
         }
 
         return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K, 
V>>() {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index 65e46cf..2466978 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -205,6 +205,14 @@ public interface CacheQuery<T> {
     public CacheQuery<T> timeout(long timeout);
 
     /**
+     * Sets limit of returned records. {@code 0} or less means there is no limit.
+     *
+     * @param limit Records limit.
+     * @return {@code this} query instance for chaining.
+     */
+    public CacheQuery<T> limit(int limit);
+
+    /**
      * Sets whether or not to include backup entries into query result. This 
flag
      * is {@code false} by default.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 1e10d60..9df76d2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -277,6 +277,7 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
                 req.partition() == -1 ? null : req.partition(),
                 req.className(),
                 req.clause(),
+                req.limit(),
                 req.includeMetaData(),
                 req.keepBinary(),
                 req.subjectId(),
@@ -545,6 +546,7 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
                 qry.query().type(),
                 false,
                 qry.query().clause(),
+                qry.query().limit(),
                 clsName,
                 qry.query().scanFilter(),
                 qry.query().partition(),
@@ -746,6 +748,7 @@ public class GridCacheDistributedQueryManager<K, V> extends 
GridCacheQueryManage
                 qry.query().type(),
                 true,
                 qry.query().clause(),
+                qry.query().limit(),
                 null,
                 null,
                 null,
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 24f7959..bb3afc9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -98,6 +98,9 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
     /** */
     private final IgniteBiPredicate<Object, Object> filter;
 
+    /** Limits returned records quantity. */
+    private int limit;
+
     /** Transformer. */
     private IgniteClosure<?, ?> transform;
 
@@ -231,6 +234,7 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
      * @param part Partition.
      * @param clsName Class name.
      * @param clause Clause.
+     * @param limit Response limit. Set to 0 for no limits.
      * @param incMeta Include metadata flag.
      * @param keepBinary Keep binary flag.
      * @param subjId Security subject ID.
@@ -251,6 +255,7 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
         @Nullable Integer part,
         @Nullable String clsName,
         String clause,
+        int limit,
         boolean incMeta,
         boolean keepBinary,
         UUID subjId,
@@ -270,6 +275,7 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
         this.part = part;
         this.clsName = clsName;
         this.clause = clause;
+        this.limit = limit;
         this.incMeta = incMeta;
         this.keepBinary = keepBinary;
         this.subjId = subjId;
@@ -390,6 +396,20 @@ public class GridCacheQueryAdapter<T> implements 
CacheQuery<T> {
     }
 
     /**
+     * @return Response limit. Returns 0 for no limits.
+     **/
+    public int limit() {
+        return limit;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheQuery<T> limit(int limit) {
+        this.limit = limit;
+
+        return this;
+    }
+
+    /**
      * @return Timeout.
      */
     public long timeout() {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index e316da5..f037ec2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -50,7 +50,6 @@ import org.jetbrains.annotations.Nullable;
  * Query future adapter.
  *
  * @param <R> Result type.
- *
  */
 public abstract class GridCacheQueryFutureAdapter<K, V, R> extends 
GridFutureAdapter<Collection<R>>
     implements CacheQueryFuture<R>, GridTimeoutObject {
@@ -69,6 +68,12 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> 
extends GridFutureAda
     /** */
     protected final GridCacheQueryBean qry;
 
+    /** Remaining number of records that may still be enqueued; used only when the limit is enabled. */
+    private int capacity;
+
+    /** {@code true} if the query limit is 0 or less, i.e. results are not limited. */
+    private boolean limitDisabled;
+
     /** Set of received keys used to deduplicate query result set. */
     private final Collection<K> keys;
 
@@ -93,9 +98,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> 
extends GridFutureAda
     /** */
     protected boolean loc;
 
-    /**
-     *
-     */
+    /** */
     protected GridCacheQueryFutureAdapter() {
         qry = null;
         keys = null;
@@ -117,6 +120,8 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> 
extends GridFutureAda
         startTime = U.currentTimeMillis();
 
         long timeout = qry.query().timeout();
+        capacity = query().query().limit();
+        limitDisabled = capacity <= 0;
 
         if (timeout > 0) {
             endTime = startTime + timeout;
@@ -327,15 +332,31 @@ public abstract class GridCacheQueryFutureAdapter<K, V, 
R> extends GridFutureAda
     protected void enqueue(Collection<?> col) {
         assert Thread.holdsLock(this);
 
-        queue.add((Collection<R>)col);
+        if (limitDisabled) {
+            queue.add((Collection<R>)col);
 
-        cnt.addAndGet(col.size());
+            cnt.addAndGet(col.size());
+        }
+        else {
+            if (capacity >= col.size()) {
+                queue.add((Collection<R>)col);
+                capacity -= col.size();
+
+                cnt.addAndGet(col.size());
+            }
+            else if (capacity > 0) {
+                queue.add(new ArrayList<>((Collection<R>)col).subList(0, 
capacity));
+                // Count the truncated records before zeroing the remaining capacity.
+                cnt.addAndGet(capacity);
+                capacity = 0;
+            }
+        }
     }
 
     /**
      * @param col Query data collection.
-     * @return If dedup flag is {@code true} deduplicated collection 
(considering keys),
-     *      otherwise passed in collection without any modifications.
+     * @return If dedup flag is {@code true} deduplicated collection 
(considering keys), otherwise passed in collection
+     * without any modifications.
      */
     private Collection<?> dedupIfRequired(Collection<?> col) {
         if (!qry.query().enableDedup())
@@ -568,7 +589,9 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> 
extends GridFutureAda
         return S.toString(GridCacheQueryFutureAdapter.class, this);
     }
 
-    /** */
+    /**
+     *
+     */
     public void printMemoryStats() {
         X.println(">>> Query future memory statistics.");
         X.println(">>>  queueSize: " + queue.size());
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index a6cbe6e..e21ebce 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -613,7 +613,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
                             taskName));
                     }
 
-                    iter = qryProc.queryText(cacheName, qry.clause(), 
qry.queryClassName(), filter(qry));
+                    iter = qryProc.queryText(cacheName, qry.clause(), 
qry.queryClassName(), filter(qry), qry.limit());
 
                     break;
 
@@ -790,7 +790,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
         assert !cctx.mvccEnabled() || qry.mvccSnapshot() != null;
 
         final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
-        final InternalScanFilter<K,V> intFilter = keyValFilter != null ? new 
InternalScanFilter<>(keyValFilter) : null;
+        final InternalScanFilter<K, V> intFilter = keyValFilter != null ? new 
InternalScanFilter<>(keyValFilter) : null;
 
         try {
             injectResources(keyValFilter);
@@ -1142,7 +1142,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
 
                     // Need to call it after gathering start time because
                     // actual row extracting may happen inside this method.
-                    if(!iter.hasNext())
+                    if (!iter.hasNext())
                         break;
 
                     Object row0 = iter.next();
@@ -1158,7 +1158,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
                         // Scan iterator may return already transformed entry
                         data.add(row0);
                     else {
-                        IgniteBiTuple<K, V> row = (IgniteBiTuple<K, V>) row0;
+                        IgniteBiTuple<K, V> row = (IgniteBiTuple<K, V>)row0;
 
                         final K key = row.getKey();
 
@@ -1360,7 +1360,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
         final String namex = cctx.name();
 
         final InternalScanFilter<K, V> intFilter = qry.scanFilter() != null ?
-                new InternalScanFilter<>(qry.scanFilter()) : null;
+            new InternalScanFilter<>(qry.scanFilter()) : null;
 
         try {
             assert qry.type() == SCAN;
@@ -2501,6 +2501,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
             }
 
             /**
+             *
              */
             public void init() {
                 assert next == null;
@@ -2747,15 +2748,16 @@ public abstract class GridCacheQueryManager<K, V> 
extends GridCacheManagerAdapte
      *
      * @param clsName Query class name.
      * @param search Search clause.
+     * @param limit Limits response records count. If 0 or less, considered to 
be no limit.
      * @param keepBinary Keep binary flag.
      * @return Created query.
      */
     public CacheQuery<Map.Entry<K, V>> createFullTextQuery(String clsName,
-        String search, boolean keepBinary) {
+        String search, int limit, boolean keepBinary) {
         A.notNull("clsName", clsName);
         A.notNull("search", search);
 
-        return new GridCacheQueryAdapter<>(cctx,
+        return new GridCacheQueryAdapter<Map.Entry<K, V>>(cctx,
             TEXT,
             clsName,
             search,
@@ -2763,7 +2765,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
             null,
             false,
             keepBinary,
-            null);
+            null).limit(limit);
     }
 
     /**
@@ -2916,7 +2918,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
             readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ) 
&&
                 cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ);
 
-            if (readEvt){
+            if (readEvt) {
                 taskName = 
cctx.kernalContext().task().resolveTaskName(qry.taskHash());
                 subjId = qry.subjectId();
             }
@@ -3091,7 +3093,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
                             }
                         }
                         else
-                            next0 = !locNode ? new T2<>(key0, val0):
+                            next0 = !locNode ? new T2<>(key0, val0) :
                                 new CacheQueryEntry<>(key0, val0);
 
                         break;
@@ -3125,7 +3127,7 @@ public abstract class GridCacheQueryManager<K, V> extends 
GridCacheManagerAdapte
         }
 
         /** {@inheritDoc} */
-        @Override public boolean apply(K k, V v){
+        @Override public boolean apply(K k, V v) {
             try {
                 return scanFilter == null || scanFilter.apply(k, v);
             }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 7b513d1..851fb2d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -81,6 +81,9 @@ public class GridCacheQueryRequest extends GridCacheIdMessage 
implements GridCac
     private String clause;
 
     /** */
+    private int limit;
+
+    /** */
     private String clsName;
 
     /** */
@@ -234,6 +237,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
      * @param type Query type.
      * @param fields {@code true} if query returns fields.
      * @param clause Query clause.
+     * @param limit Response limit. Set to 0 for no limits.
      * @param clsName Query class name.
      * @param keyValFilter Key-value filter.
      * @param part Partition.
@@ -257,6 +261,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
         GridCacheQueryType type,
         boolean fields,
         String clause,
+        int limit,
         String clsName,
         IgniteBiPredicate<Object, Object> keyValFilter,
         @Nullable Integer part,
@@ -284,6 +289,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
         this.type = type;
         this.fields = fields;
         this.clause = clause;
+        this.limit = limit;
         this.clsName = clsName;
         this.keyValFilter = keyValFilter;
         this.part = part == null ? -1 : part;
@@ -433,6 +439,13 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
     }
 
     /**
+     * @return Query limit.
+     */
+    public int limit() {
+        return limit;
+    }
+
+    /**
      * @return Query clause.
      */
     public String clause() {
@@ -676,24 +689,30 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeInt("taskHash", taskHash))
+                if (!writer.writeInt("limit", limit))
                     return false;
 
                 writer.incrementState();
 
             case 23:
-                if (!writer.writeAffinityTopologyVersion("topVer", topVer))
+                if (!writer.writeInt("taskHash", taskHash))
                     return false;
 
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeByteArray("transBytes", transBytes))
+                if (!writer.writeAffinityTopologyVersion("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 25:
+                if (!writer.writeByteArray("transBytes", transBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 26:
                 if (!writer.writeByte("type", type != null ? 
(byte)type.ordinal() : -1))
                     return false;
 
@@ -860,7 +879,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 22:
-                taskHash = reader.readInt("taskHash");
+                limit = reader.readInt("limit");
 
                 if (!reader.isLastRead())
                     return false;
@@ -868,7 +887,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 23:
-                topVer = reader.readAffinityTopologyVersion("topVer");
+                taskHash = reader.readInt("taskHash");
 
                 if (!reader.isLastRead())
                     return false;
@@ -876,7 +895,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 24:
-                transBytes = reader.readByteArray("transBytes");
+                topVer = reader.readAffinityTopologyVersion("topVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -884,6 +903,14 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 25:
+                transBytes = reader.readByteArray("transBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 26:
                 byte typeOrd;
 
                 typeOrd = reader.readByte("type");
@@ -907,7 +934,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 26;
+        return 27;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index fbab71b..ef2944a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -1422,6 +1422,11 @@ public class PlatformCache extends 
PlatformAbstractTarget {
         String typ = reader.readString();
         final int pageSize = reader.readInt();
 
+        //TODO: IGNITE-12266, uncomment when limit parameter is added to 
Platforms
+        //
+        // final int limit = reader.readInt();
+        // return new TextQuery(typ, txt, 
limit).setPageSize(pageSize).setLocal(loc);
+
         return new TextQuery(typ, txt).setPageSize(pageSize).setLocal(loc);
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index bc62eff..c0f19e0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -135,10 +135,11 @@ public interface GridQueryIndexing {
      * @param qry Text query.
      * @param typeName Type name.
      * @param filter Cache name and key filter.    @return Queried rows.
+     * @param limit Limits response records count. If 0 or less, the limit 
is considered to be Integer.MAX_VALUE, that is virtually no limit.
      * @throws IgniteCheckedException If failed.
      */
     public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> 
queryLocalText(String schemaName, String cacheName,
-        String qry, String typeName, IndexingQueryFilter filter) throws 
IgniteCheckedException;
+        String qry, String typeName, IndexingQueryFilter filter, int limit) 
throws IgniteCheckedException;
 
     /**
      * Create new index locally.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 5632664..a2a51bd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2821,11 +2821,12 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
      * @param filters Key and value filters.
      * @param <K> Key type.
      * @param <V> Value type.
+     * @param limit Limits response records count. If 0 or less, the limit 
is considered to be Integer.MAX_VALUE, that is virtually no limit.
      * @return Key/value rows.
      * @throws IgniteCheckedException If failed.
      */
     public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryText(final 
String cacheName, final String clause,
-        final String resType, final IndexingQueryFilter filters) throws 
IgniteCheckedException {
+        final String resType, final IndexingQueryFilter filters, int limit) 
throws IgniteCheckedException {
         checkEnabled();
 
         if (!busyLock.enterBusy())
@@ -2840,7 +2841,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
                         String typeName = typeName(cacheName, resType);
                         String schemaName = idx.schema(cacheName);
 
-                        return idx.queryLocalText(schemaName, cacheName, 
clause, typeName, filters);
+                        return idx.queryLocalText(schemaName, cacheName, 
clause, typeName, filters, limit);
                     }
                 }, true);
         }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java
index 5ce5df7..e4ded48 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java
@@ -76,6 +76,7 @@ public class GridCacheFullTextQueryMultithreadedSelfTest 
extends GridCacheAbstra
         final int keyCnt = 5000;
         final int logFreq = 50;
         final String txt = "Value";
+        final int limit = 0;
 
         final GridCacheAdapter<Integer, H2TextValue> c = 
((IgniteKernal)grid(0)).internalCache(DEFAULT_CACHE_NAME);
 
@@ -94,7 +95,7 @@ public class GridCacheFullTextQueryMultithreadedSelfTest 
extends GridCacheAbstra
 
         // Create query.
         final CacheQuery<Map.Entry<Integer, H2TextValue>> qry = 
c.context().queries().createFullTextQuery(
-            H2TextValue.class.getName(), txt, false);
+            H2TextValue.class.getName(), txt, limit, false);
 
         qry.enableDedup(false);
         qry.includeBackups(false);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
index ff60501..57d6e93 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
@@ -109,7 +109,8 @@ public class DummyQueryIndexing implements 
GridQueryIndexing {
         String cacheName,
         String qry,
         String typeName,
-        IndexingQueryFilter filter
+        IndexingQueryFilter filter,
+        int limit
     ) throws IgniteCheckedException {
         return null;
     }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index f5d60c0..0e3a0e8 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -480,14 +480,14 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
 
     /** {@inheritDoc} */
     @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> 
queryLocalText(String schemaName,
-        String cacheName, String qry, String typeName, IndexingQueryFilter 
filters) throws IgniteCheckedException {
+        String cacheName, String qry, String typeName, IndexingQueryFilter 
filters, int limit) throws IgniteCheckedException {
         H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName, 
typeName);
 
         if (tbl != null && tbl.luceneIndex() != null) {
             Long qryId = runningQueryManager().register(qry, TEXT, schemaName, 
true, null);
 
             try {
-                return tbl.luceneIndex().query(qry.toUpperCase(), filters);
+                return tbl.luceneIndex().query(qry.toUpperCase(), filters, 
limit);
             }
             finally {
                 runningQueryManager().unregister(qryId, false);
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
index be55e13..4693d23 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
@@ -237,11 +237,12 @@ public class GridLuceneIndex implements AutoCloseable {
      *
      * @param qry Query.
      * @param filters Filters over result.
+     * @param limit Limits response records count. If 0 or less, the limit 
is considered to be Integer.MAX_VALUE, that is virtually no limit.
      * @return Query result.
      * @throws IgniteCheckedException If failed.
      */
     public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(String qry,
-        IndexingQueryFilter filters) throws IgniteCheckedException {
+        IndexingQueryFilter filters, int limit) throws IgniteCheckedException {
         IndexReader reader;
 
         try {
@@ -280,7 +281,7 @@ public class GridLuceneIndex implements AutoCloseable {
                 .add(filter, BooleanClause.Occur.FILTER)
                 .build();
 
-            docs = searcher.search(query, Integer.MAX_VALUE);
+            docs = searcher.search(query, limit > 0 ? limit : 
Integer.MAX_VALUE);
         }
         catch (Exception e) {
             U.closeQuiet(reader);
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQuerySelfTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQuerySelfTest.java
index 82cb2f2..7df7911 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQuerySelfTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQuerySelfTest.java
@@ -32,7 +32,6 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cache.query.Query;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.TextQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
@@ -43,7 +42,9 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
 import org.junit.Test;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -59,6 +60,32 @@ public class GridCacheFullTextQuerySelfTest extends 
GridCommonAbstractTest {
     /** Cache name */
     private static final String PERSON_CACHE = "Person";
 
+    /** Limitation to query response size */
+    private static final int QUERY_LIMIT = 5;
+
+    /** Concurrent threads number */
+    private static final int N_THREADS = 20;
+
+    /**
+     * Container for expected values and all available entries
+     */
+    private static final class TestPair {
+        /** */
+        public final Set<Integer> expected;
+
+        /** */
+        public final List<Cache.Entry<Integer, ?>> all = new ArrayList<>();
+
+        /**
+         * Constructor
+         *
+         * @param exp expected values set.
+         * */
+        public TestPair(Set<Integer> exp) {
+            this.expected = new HashSet<>(exp);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -93,7 +120,15 @@ public class GridCacheFullTextQuerySelfTest extends 
GridCommonAbstractTest {
      */
     @Test
     public void testTextQueryWithField() throws Exception {
-        checkTextQuery("name:1*", false, false);
+        checkTextQuery("name:1*", 0, false, false);
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    @Test
+    public void testTextQueryWithFieldLimited() throws Exception {
+        checkTextQuery("name:1*", QUERY_LIMIT, false, false);
     }
 
     /**
@@ -116,6 +151,14 @@ public class GridCacheFullTextQuerySelfTest extends 
GridCommonAbstractTest {
      * @throws Exception In case of error.
      */
     @Test
+    public void testLocalTextQueryLimited() throws Exception {
+        checkTextQuery(null, QUERY_LIMIT, true, false);
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    @Test
     public void testTextQueryWithKeepBinary() throws Exception {
         checkTextQuery(false, true);
     }
@@ -124,8 +167,61 @@ public class GridCacheFullTextQuerySelfTest extends 
GridCommonAbstractTest {
      * @throws Exception In case of error.
      */
     @Test
+    public void testTextQueryWithKeepBinaryLimited() throws Exception {
+        checkTextQuery(null, QUERY_LIMIT, false, true);
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    @Test
     public void testTextQuery() throws Exception {
-        checkTextQuery(false, true);
+        checkTextQuery(false, false);
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    @Test
+    public void testTextQueryLimited() throws Exception {
+        checkTextQuery(null, QUERY_LIMIT, false, false);
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    @Test
+    public void testTextQueryLimitedConcurrent() throws Exception {
+        final IgniteEx ignite = grid(0);
+
+        final String clause = "1*";
+
+        // 1. Populate cache with data, calculating expected count in parallel.
+        Set<Integer> exp = populateCache(ignite, false, MAX_ITEM_COUNT, 
(IgnitePredicate<Integer>)x -> String.valueOf(x).startsWith("1"));
+
+        GridTestUtils.runMultiThreaded(textQueryTask(ignite, clause, exp), 
N_THREADS, "text-query-test");
+
+        clearCache(ignite);
+    }
+
+    /**
+     * Creates Runnable for TextQuery
+     *
+     * @param ignite Ignite instance.
+     * @param clause Query clause.
+     * @param exp Expected results for validation.
+     * @return TextQuery and validation wrapped into Runnable functional 
interface.
+     */
+    @NotNull private Runnable textQueryTask(IgniteEx ignite, String clause, 
Set<Integer> exp) {
+        return () -> {
+            try {
+                TextQuery qry = new TextQuery<>(Person.class, clause);
+                validateQueryResults(ignite, qry, exp, false);
+            }
+            catch (Exception e) {
+                fail(e.getMessage());
+            }
+        };
     }
 
     /**
@@ -133,15 +229,16 @@ public class GridCacheFullTextQuerySelfTest extends 
GridCommonAbstractTest {
      * @param keepBinary keep binary flag.
      */
     private void checkTextQuery(boolean loc, boolean keepBinary) throws 
Exception {
-        checkTextQuery(null, loc, keepBinary);
+        checkTextQuery(null, 0, loc, keepBinary);
     }
 
     /**
      * @param clause Query clause.
+     * @param limit limits response size
      * @param loc local query flag.
      * @param keepBinary keep binary flag.
      */
-    private void checkTextQuery(String clause, boolean loc, boolean 
keepBinary) throws Exception {
+    private void checkTextQuery(String clause, int limit, boolean loc, boolean 
keepBinary) throws Exception {
         final IgniteEx ignite = grid(0);
 
         if (F.isEmpty(clause))
@@ -156,7 +253,7 @@ public class GridCacheFullTextQuerySelfTest extends 
GridCommonAbstractTest {
         });
 
         // 2. Validate results.
-        TextQuery qry = new TextQuery<>(Person.class, clause).setLocal(loc);
+        TextQuery qry = new TextQuery<>(Person.class, 
clause).setLocal(loc).setLimit(limit);
 
         validateQueryResults(ignite, qry, exp, keepBinary);
 
@@ -212,7 +309,7 @@ public class GridCacheFullTextQuerySelfTest extends 
GridCommonAbstractTest {
      *
      * @throws IgniteCheckedException if failed.
      */
-    private static void validateQueryResults(IgniteEx ignite, Query qry, 
Set<Integer> exp,
+    private static void validateQueryResults(IgniteEx ignite, TextQuery qry, 
Set<Integer> exp,
         boolean keepBinary) throws IgniteCheckedException {
         IgniteCache<Integer, Person> cache = ignite.cache(PERSON_CACHE);
 
@@ -220,78 +317,99 @@ public class GridCacheFullTextQuerySelfTest extends 
GridCommonAbstractTest {
             IgniteCache<Integer, BinaryObject> cache0 = cache.withKeepBinary();
 
             try (QueryCursor<Cache.Entry<Integer, BinaryObject>> cursor = 
cache0.query(qry)) {
-                Set<Integer> exp0 = new HashSet<>(exp);
-
-                List<Cache.Entry<Integer, ?>> all = new ArrayList<>();
-
-                for (Cache.Entry<Integer, BinaryObject> entry : 
cursor.getAll()) {
-                    all.add(entry);
-
-                    assertEquals(entry.getKey().toString(), 
entry.getValue().field("name"));
 
-                    assertEquals(entry.getKey(), 
entry.getValue().field("age"));
+                TestPair testPair = processExpectedWithBinary(exp, cursor);
 
-                    exp0.remove(entry.getKey());
-                }
-
-                checkForMissedKeys(ignite, exp0, all);
+                assertResult(ignite, qry, testPair);
             }
 
             try (QueryCursor<Cache.Entry<Integer, BinaryObject>> cursor = 
cache0.query(qry)) {
-                Set<Integer> exp0 = new HashSet<>(exp);
-
-                List<Cache.Entry<Integer, ?>> all = new ArrayList<>();
-
-                for (Cache.Entry<Integer, BinaryObject> entry : 
cursor.getAll()) {
-                    all.add(entry);
-
-                    assertEquals(entry.getKey().toString(), 
entry.getValue().field("name"));
-
-                    assertEquals(entry.getKey(), 
entry.getValue().field("age"));
 
-                    exp0.remove(entry.getKey());
-                }
+                TestPair testPair = processExpectedWithBinary(exp, cursor);
 
-                checkForMissedKeys(ignite, exp0, all);
+                assertResult(ignite, qry, testPair);
             }
         }
         else {
             try (QueryCursor<Cache.Entry<Integer, Person>> cursor = 
cache.query(qry)) {
-                Set<Integer> exp0 = new HashSet<>(exp);
 
-                List<Cache.Entry<Integer, ?>> all = new ArrayList<>();
+                TestPair testPair = processExpected(exp, cursor);
 
-                for (Cache.Entry<Integer, Person> entry : cursor.getAll()) {
-                    all.add(entry);
+                assertResult(ignite, qry, testPair);
 
-                    assertEquals(entry.getKey().toString(), 
entry.getValue().name);
+            }
 
-                    assertEquals(entry.getKey(), 
Integer.valueOf(entry.getValue().age));
+            try (QueryCursor<Cache.Entry<Integer, Person>> cursor = 
cache.query(qry)) {
 
-                    exp0.remove(entry.getKey());
-                }
+                TestPair testPair = processExpected(exp, cursor);
 
-                checkForMissedKeys(ignite, exp0, all);
+                assertResult(ignite, qry, testPair);
             }
+        }
+    }
 
-            try (QueryCursor<Cache.Entry<Integer, Person>> cursor = 
cache.query(qry)) {
-                Set<Integer> exp0 = new HashSet<>(exp);
+    /**
+     * Checks query for missed keys or, if limit is set, that the result 
size does not exceed QUERY_LIMIT.
+     *
+     * @param ignite Ignite context.
+     * @param qry Initial text query.
+     * @param testPair pair containing expected and all entries.
+     * @throws IgniteCheckedException if key check failed.
+     */
+    private static void assertResult(IgniteEx ignite, TextQuery qry,
+        TestPair testPair) throws IgniteCheckedException {
+        if (qry.getLimit() > 0)
+            assertTrue(testPair.all.size() <= QUERY_LIMIT);
+        else
+            checkForMissedKeys(ignite, testPair.expected, testPair.all);
+    }
+
+    /**
+     * Checks cursor with binary entries for correct keys and values.
+     * Removes the key of each validated entry from a copy of the expected set.
+     *
+     * @param cursor Query cursor with response.
+     * @param exp Set of expected keys.
+     * @return Pair of the remaining expected keys and all returned entries.
+     */
+    @NotNull private static GridCacheFullTextQuerySelfTest.TestPair 
processExpectedWithBinary(Set<Integer> exp,
+        QueryCursor<Cache.Entry<Integer, BinaryObject>> cursor) {
+        TestPair testPair = new TestPair(exp);
 
-                List<Cache.Entry<Integer, ?>> all = new ArrayList<>();
+        for (Cache.Entry<Integer, BinaryObject> entry : cursor.getAll()) {
+            testPair.all.add(entry);
 
-                for (Cache.Entry<Integer, Person> entry : cursor.getAll()) {
-                    all.add(entry);
+            assertEquals(entry.getKey().toString(), 
entry.getValue().field("name"));
 
-                    assertEquals(entry.getKey().toString(), 
entry.getValue().name);
+            assertEquals(entry.getKey(), entry.getValue().field("age"));
 
-                    assertEquals(entry.getKey().intValue(), 
entry.getValue().age);
+            testPair.expected.remove(entry.getKey());
+        }
+        return testPair;
+    }
 
-                    exp0.remove(entry.getKey());
-                }
+    /**
+     * Checks cursor entries for correct keys and values.
+     * Removes the key of each validated entry from a copy of the expected set.
+     *
+     * @param cursor Query cursor with response.
+     * @param exp Set of expected keys.
+     * @return Pair of the remaining expected keys and all returned entries.
+     */
+    @NotNull private static GridCacheFullTextQuerySelfTest.TestPair 
processExpected(Set<Integer> exp,
+        QueryCursor<Cache.Entry<Integer, Person>> cursor) {
+        TestPair testPair = new TestPair(exp);
 
-                checkForMissedKeys(ignite, exp0, all);
-            }
+        for (Cache.Entry<Integer, Person> entry : cursor.getAll()) {
+            testPair.all.add(entry);
+
+            assertEquals(entry.getKey().toString(), entry.getValue().name);
+
+            assertEquals(entry.getKey().intValue(), entry.getValue().age);
+
+            testPair.expected.remove(entry.getKey());
         }
+        return testPair;
     }
 
     /**

Reply via email to