This is an automated email from the ASF dual-hosted git repository.
ipavlukhin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new cee9615 IGNITE-12189 Implement correct limit for TextQuery - Fixes
#6917.
cee9615 is described below
commit cee961560f410653009915b61d821dab84ea490a
Author: Yuriy Shuliha <[email protected]>
AuthorDate: Mon Nov 11 15:51:33 2019 +0300
IGNITE-12189 Implement correct limit for TextQuery - Fixes #6917.
Signed-off-by: ipavlukhin <[email protected]>
---
.../org/apache/ignite/cache/query/TextQuery.java | 50 +++++
.../processors/cache/IgniteCacheProxyImpl.java | 22 +-
.../processors/cache/query/CacheQuery.java | 8 +
.../query/GridCacheDistributedQueryManager.java | 3 +
.../cache/query/GridCacheQueryAdapter.java | 20 ++
.../cache/query/GridCacheQueryFutureAdapter.java | 41 +++-
.../cache/query/GridCacheQueryManager.java | 24 ++-
.../cache/query/GridCacheQueryRequest.java | 41 +++-
.../processors/platform/cache/PlatformCache.java | 5 +
.../processors/query/GridQueryIndexing.java | 3 +-
.../processors/query/GridQueryProcessor.java | 5 +-
...ridCacheFullTextQueryMultithreadedSelfTest.java | 3 +-
.../processors/query/DummyQueryIndexing.java | 3 +-
.../processors/query/h2/IgniteH2Indexing.java | 4 +-
.../processors/query/h2/opt/GridLuceneIndex.java | 5 +-
.../cache/GridCacheFullTextQuerySelfTest.java | 224 ++++++++++++++++-----
16 files changed, 361 insertions(+), 100 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/cache/query/TextQuery.java
b/modules/core/src/main/java/org/apache/ignite/cache/query/TextQuery.java
index fc49736..114a2e2 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/TextQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/TextQuery.java
@@ -68,6 +68,9 @@ public final class TextQuery<K, V> extends
Query<Cache.Entry<K, V>> {
/** SQL clause. */
private String txt;
+ /** Limit */
+ private int limit;
+
/**
* Constructs query for the given search string.
*
@@ -84,6 +87,19 @@ public final class TextQuery<K, V> extends
Query<Cache.Entry<K, V>> {
*
* @param type Type.
* @param txt Search string.
+ * @param limit Limits response records count. If 0 or less, considered to
be no limit.
+ */
+ public TextQuery(String type, String txt, int limit) {
+ setType(type);
+ setText(txt);
+ setLimit(limit);
+ }
+
+ /**
+ * Constructs query for the given search string.
+ *
+ * @param type Type.
+ * @param txt Search string.
*/
public TextQuery(Class<?> type, String txt) {
setType(type);
@@ -91,6 +107,19 @@ public final class TextQuery<K, V> extends
Query<Cache.Entry<K, V>> {
}
/**
+ * Constructs query for the given search string.
+ *
+ * @param type Type.
+ * @param txt Search string.
+ * @param limit Limits response records count. If 0 or less, considered to
be no limit.
+ */
+ public TextQuery(Class<?> type, String txt, int limit) {
+ setType(type);
+ setText(txt);
+ setLimit(limit);
+ }
+
+ /**
* Gets type for query.
*
* @return Type.
@@ -144,6 +173,27 @@ public final class TextQuery<K, V> extends
Query<Cache.Entry<K, V>> {
return this;
}
+ /**
+ * Gets limit to response records count.
+ *
+ * @return Limit value.
+ */
+ public int getLimit() {
+ return limit;
+ }
+
+ /**
+ * Sets limit to response records count.
+ *
+ * @param limit If 0 or less, considered to be no limit.
+ * @return {@code this} For chaining.
+ */
+ public TextQuery<K, V> setLimit(int limit) {
+ this.limit = limit;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public TextQuery<K, V> setPageSize(int pageSize) {
return (TextQuery<K, V>)super.setPageSize(pageSize);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 4e415e3..b75b08e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -520,13 +520,13 @@ public class IgniteCacheProxyImpl<K, V> extends
AsyncSupportAdapter<IgniteCache<
}
/**
- * @param filter Filter.
+ * @param query Query.
* @param grp Optional cluster group.
* @return Cursor.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
- private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable
ClusterGroup grp)
+ private QueryCursor<Cache.Entry<K, V>> query(final Query query, @Nullable
ClusterGroup grp)
throws IgniteCheckedException {
GridCacheContext<K, V> ctx = getContextSafe();
@@ -538,40 +538,40 @@ public class IgniteCacheProxyImpl<K, V> extends
AsyncSupportAdapter<IgniteCache<
final CacheQueryFuture fut;
- if (filter instanceof TextQuery) {
- TextQuery p = (TextQuery)filter;
+ if (query instanceof TextQuery) {
+ TextQuery q = (TextQuery)query;
- qry = ctx.queries().createFullTextQuery(p.getType(), p.getText(),
isKeepBinary);
+ qry = ctx.queries().createFullTextQuery(q.getType(), q.getText(),
q.getLimit(), isKeepBinary);
if (grp != null)
qry.projection(grp);
- fut =
ctx.kernalContext().query().executeQuery(GridCacheQueryType.TEXT, p.getText(),
ctx,
+ fut =
ctx.kernalContext().query().executeQuery(GridCacheQueryType.TEXT, q.getText(),
ctx,
new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
@Override public CacheQueryFuture<Map.Entry<K, V>>
applyx() {
return qry.execute();
}
}, false);
}
- else if (filter instanceof SpiQuery) {
+ else if (query instanceof SpiQuery) {
qry = ctx.queries().createSpiQuery(isKeepBinary);
if (grp != null)
qry.projection(grp);
- fut =
ctx.kernalContext().query().executeQuery(GridCacheQueryType.SPI,
filter.getClass().getSimpleName(),
+ fut =
ctx.kernalContext().query().executeQuery(GridCacheQueryType.SPI,
query.getClass().getSimpleName(),
ctx, new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K,
V>>>() {
@Override public CacheQueryFuture<Map.Entry<K, V>>
applyx() {
- return qry.execute(((SpiQuery)filter).getArgs());
+ return qry.execute(((SpiQuery)query).getArgs());
}
}, false);
}
else {
- if (filter instanceof SqlFieldsQuery)
+ if (query instanceof SqlFieldsQuery)
throw new CacheException("Use methods 'queryFields' and
'localQueryFields' for " +
SqlFieldsQuery.class.getSimpleName() + ".");
- throw new CacheException("Unsupported query type: " + filter);
+ throw new CacheException("Unsupported query type: " + query);
}
return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K,
V>>() {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
index 65e46cf..2466978 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQuery.java
@@ -205,6 +205,14 @@ public interface CacheQuery<T> {
public CacheQuery<T> timeout(long timeout);
/**
+ * Sets limit of returned records. {@code 0} means there is no limit
+ *
+ * @param limit Records limit.
+ * @return {@code this} query instance for chaining.
+ */
+ public CacheQuery<T> limit(int limit);
+
+ /**
* Sets whether or not to include backup entries into query result. This
flag
* is {@code false} by default.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 1e10d60..9df76d2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -277,6 +277,7 @@ public class GridCacheDistributedQueryManager<K, V> extends
GridCacheQueryManage
req.partition() == -1 ? null : req.partition(),
req.className(),
req.clause(),
+ req.limit(),
req.includeMetaData(),
req.keepBinary(),
req.subjectId(),
@@ -545,6 +546,7 @@ public class GridCacheDistributedQueryManager<K, V> extends
GridCacheQueryManage
qry.query().type(),
false,
qry.query().clause(),
+ qry.query().limit(),
clsName,
qry.query().scanFilter(),
qry.query().partition(),
@@ -746,6 +748,7 @@ public class GridCacheDistributedQueryManager<K, V> extends
GridCacheQueryManage
qry.query().type(),
true,
qry.query().clause(),
+ qry.query().limit(),
null,
null,
null,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 24f7959..bb3afc9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -98,6 +98,9 @@ public class GridCacheQueryAdapter<T> implements
CacheQuery<T> {
/** */
private final IgniteBiPredicate<Object, Object> filter;
+ /** Limits returned records quantity. */
+ private int limit;
+
/** Transformer. */
private IgniteClosure<?, ?> transform;
@@ -231,6 +234,7 @@ public class GridCacheQueryAdapter<T> implements
CacheQuery<T> {
* @param part Partition.
* @param clsName Class name.
* @param clause Clause.
+ * @param limit Response limit. Set to 0 for no limits.
* @param incMeta Include metadata flag.
* @param keepBinary Keep binary flag.
* @param subjId Security subject ID.
@@ -251,6 +255,7 @@ public class GridCacheQueryAdapter<T> implements
CacheQuery<T> {
@Nullable Integer part,
@Nullable String clsName,
String clause,
+ int limit,
boolean incMeta,
boolean keepBinary,
UUID subjId,
@@ -270,6 +275,7 @@ public class GridCacheQueryAdapter<T> implements
CacheQuery<T> {
this.part = part;
this.clsName = clsName;
this.clause = clause;
+ this.limit = limit;
this.incMeta = incMeta;
this.keepBinary = keepBinary;
this.subjId = subjId;
@@ -390,6 +396,20 @@ public class GridCacheQueryAdapter<T> implements
CacheQuery<T> {
}
/**
+ * @return Response limit. Returns 0 for no limits.
+ **/
+ public int limit() {
+ return limit;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheQuery<T> limit(int limit) {
+ this.limit = limit;
+
+ return this;
+ }
+
+ /**
* @return Timeout.
*/
public long timeout() {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index e316da5..f037ec2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -50,7 +50,6 @@ import org.jetbrains.annotations.Nullable;
* Query future adapter.
*
* @param <R> Result type.
- *
*/
public abstract class GridCacheQueryFutureAdapter<K, V, R> extends
GridFutureAdapter<Collection<R>>
implements CacheQueryFuture<R>, GridTimeoutObject {
@@ -69,6 +68,12 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R>
extends GridFutureAda
/** */
protected final GridCacheQueryBean qry;
+ /** */
+ private int capacity;
+
+ /** */
+ private boolean limitDisabled;
+
/** Set of received keys used to deduplicate query result set. */
private final Collection<K> keys;
@@ -93,9 +98,7 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R>
extends GridFutureAda
/** */
protected boolean loc;
- /**
- *
- */
+ /** */
protected GridCacheQueryFutureAdapter() {
qry = null;
keys = null;
@@ -117,6 +120,8 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R>
extends GridFutureAda
startTime = U.currentTimeMillis();
long timeout = qry.query().timeout();
+ capacity = query().query().limit();
+ limitDisabled = capacity <= 0;
if (timeout > 0) {
endTime = startTime + timeout;
@@ -327,15 +332,31 @@ public abstract class GridCacheQueryFutureAdapter<K, V,
R> extends GridFutureAda
protected void enqueue(Collection<?> col) {
assert Thread.holdsLock(this);
- queue.add((Collection<R>)col);
+ if (limitDisabled) {
+ queue.add((Collection<R>)col);
- cnt.addAndGet(col.size());
+ cnt.addAndGet(col.size());
+ }
+ else {
+ if (capacity >= col.size()) {
+ queue.add((Collection<R>)col);
+ capacity -= col.size();
+
+ cnt.addAndGet(col.size());
+ }
+ else if (capacity > 0) {
+ queue.add(new ArrayList<>((Collection<R>)col).subList(0,
capacity));
+ capacity = 0;
+
+ cnt.addAndGet(capacity);
+ }
+ }
}
/**
* @param col Query data collection.
- * @return If dedup flag is {@code true} deduplicated collection
(considering keys),
- * otherwise passed in collection without any modifications.
+ * @return If dedup flag is {@code true} deduplicated collection
(considering keys), otherwise passed in collection
+ * without any modifications.
*/
private Collection<?> dedupIfRequired(Collection<?> col) {
if (!qry.query().enableDedup())
@@ -568,7 +589,9 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R>
extends GridFutureAda
return S.toString(GridCacheQueryFutureAdapter.class, this);
}
- /** */
+ /**
+ *
+ */
public void printMemoryStats() {
X.println(">>> Query future memory statistics.");
X.println(">>> queueSize: " + queue.size());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index a6cbe6e..e21ebce 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -613,7 +613,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
taskName));
}
- iter = qryProc.queryText(cacheName, qry.clause(),
qry.queryClassName(), filter(qry));
+ iter = qryProc.queryText(cacheName, qry.clause(),
qry.queryClassName(), filter(qry), qry.limit());
break;
@@ -790,7 +790,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
assert !cctx.mvccEnabled() || qry.mvccSnapshot() != null;
final IgniteBiPredicate<K, V> keyValFilter = qry.scanFilter();
- final InternalScanFilter<K,V> intFilter = keyValFilter != null ? new
InternalScanFilter<>(keyValFilter) : null;
+ final InternalScanFilter<K, V> intFilter = keyValFilter != null ? new
InternalScanFilter<>(keyValFilter) : null;
try {
injectResources(keyValFilter);
@@ -1142,7 +1142,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
// Need to call it after gathering start time because
// actual row extracting may happen inside this method.
- if(!iter.hasNext())
+ if (!iter.hasNext())
break;
Object row0 = iter.next();
@@ -1158,7 +1158,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
// Scan iterator may return already transformed entry
data.add(row0);
else {
- IgniteBiTuple<K, V> row = (IgniteBiTuple<K, V>) row0;
+ IgniteBiTuple<K, V> row = (IgniteBiTuple<K, V>)row0;
final K key = row.getKey();
@@ -1360,7 +1360,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
final String namex = cctx.name();
final InternalScanFilter<K, V> intFilter = qry.scanFilter() != null ?
- new InternalScanFilter<>(qry.scanFilter()) : null;
+ new InternalScanFilter<>(qry.scanFilter()) : null;
try {
assert qry.type() == SCAN;
@@ -2501,6 +2501,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
}
/**
+ *
*/
public void init() {
assert next == null;
@@ -2747,15 +2748,16 @@ public abstract class GridCacheQueryManager<K, V>
extends GridCacheManagerAdapte
*
* @param clsName Query class name.
* @param search Search clause.
+ * @param limit Limits response records count. If 0 or less, considered to
be no limit.
* @param keepBinary Keep binary flag.
* @return Created query.
*/
public CacheQuery<Map.Entry<K, V>> createFullTextQuery(String clsName,
- String search, boolean keepBinary) {
+ String search, int limit, boolean keepBinary) {
A.notNull("clsName", clsName);
A.notNull("search", search);
- return new GridCacheQueryAdapter<>(cctx,
+ return new GridCacheQueryAdapter<Map.Entry<K, V>>(cctx,
TEXT,
clsName,
search,
@@ -2763,7 +2765,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
null,
false,
keepBinary,
- null);
+ null).limit(limit);
}
/**
@@ -2916,7 +2918,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
readEvt = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ)
&&
cctx.gridEvents().hasListener(EVT_CACHE_QUERY_OBJECT_READ);
- if (readEvt){
+ if (readEvt) {
taskName =
cctx.kernalContext().task().resolveTaskName(qry.taskHash());
subjId = qry.subjectId();
}
@@ -3091,7 +3093,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
}
}
else
- next0 = !locNode ? new T2<>(key0, val0):
+ next0 = !locNode ? new T2<>(key0, val0) :
new CacheQueryEntry<>(key0, val0);
break;
@@ -3125,7 +3127,7 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
}
/** {@inheritDoc} */
- @Override public boolean apply(K k, V v){
+ @Override public boolean apply(K k, V v) {
try {
return scanFilter == null || scanFilter.apply(k, v);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 7b513d1..851fb2d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -81,6 +81,9 @@ public class GridCacheQueryRequest extends GridCacheIdMessage
implements GridCac
private String clause;
/** */
+ private int limit;
+
+ /** */
private String clsName;
/** */
@@ -234,6 +237,7 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
* @param type Query type.
* @param fields {@code true} if query returns fields.
* @param clause Query clause.
+ * @param limit Response limit. Set to 0 for no limits.
* @param clsName Query class name.
* @param keyValFilter Key-value filter.
* @param part Partition.
@@ -257,6 +261,7 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
GridCacheQueryType type,
boolean fields,
String clause,
+ int limit,
String clsName,
IgniteBiPredicate<Object, Object> keyValFilter,
@Nullable Integer part,
@@ -284,6 +289,7 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
this.type = type;
this.fields = fields;
this.clause = clause;
+ this.limit = limit;
this.clsName = clsName;
this.keyValFilter = keyValFilter;
this.part = part == null ? -1 : part;
@@ -433,6 +439,13 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
}
/**
+ * @return Query limit.
+ */
+ public int limit() {
+ return limit;
+ }
+
+ /**
* @return Query clause.
*/
public String clause() {
@@ -676,24 +689,30 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
writer.incrementState();
case 22:
- if (!writer.writeInt("taskHash", taskHash))
+ if (!writer.writeInt("limit", limit))
return false;
writer.incrementState();
case 23:
- if (!writer.writeAffinityTopologyVersion("topVer", topVer))
+ if (!writer.writeInt("taskHash", taskHash))
return false;
writer.incrementState();
case 24:
- if (!writer.writeByteArray("transBytes", transBytes))
+ if (!writer.writeAffinityTopologyVersion("topVer", topVer))
return false;
writer.incrementState();
case 25:
+ if (!writer.writeByteArray("transBytes", transBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 26:
if (!writer.writeByte("type", type != null ?
(byte)type.ordinal() : -1))
return false;
@@ -860,7 +879,7 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
reader.incrementState();
case 22:
- taskHash = reader.readInt("taskHash");
+ limit = reader.readInt("limit");
if (!reader.isLastRead())
return false;
@@ -868,7 +887,7 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
reader.incrementState();
case 23:
- topVer = reader.readAffinityTopologyVersion("topVer");
+ taskHash = reader.readInt("taskHash");
if (!reader.isLastRead())
return false;
@@ -876,7 +895,7 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
reader.incrementState();
case 24:
- transBytes = reader.readByteArray("transBytes");
+ topVer = reader.readAffinityTopologyVersion("topVer");
if (!reader.isLastRead())
return false;
@@ -884,6 +903,14 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
reader.incrementState();
case 25:
+ transBytes = reader.readByteArray("transBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 26:
byte typeOrd;
typeOrd = reader.readByte("type");
@@ -907,7 +934,7 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 26;
+ return 27;
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index fbab71b..ef2944a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -1422,6 +1422,11 @@ public class PlatformCache extends
PlatformAbstractTarget {
String typ = reader.readString();
final int pageSize = reader.readInt();
+ //TODO: IGNITE-12266, uncomment when limit parameter is added to
Platforms
+ //
+ // final int limit = reader.readInt();
+ // return new TextQuery(typ, txt,
limit).setPageSize(pageSize).setLocal(loc);
+
return new TextQuery(typ, txt).setPageSize(pageSize).setLocal(loc);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index bc62eff..c0f19e0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -135,10 +135,11 @@ public interface GridQueryIndexing {
* @param qry Text query.
* @param typeName Type name.
* @param filter Cache name and key filter. @return Queried rows.
+ * @param limit Limits response records count. If 0 or less, the limit
considered to be Integer.MAX_VALUE, that is virtually no limit.
* @throws IgniteCheckedException If failed.
*/
public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>>
queryLocalText(String schemaName, String cacheName,
- String qry, String typeName, IndexingQueryFilter filter) throws
IgniteCheckedException;
+ String qry, String typeName, IndexingQueryFilter filter, int limit)
throws IgniteCheckedException;
/**
* Create new index locally.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 5632664..a2a51bd 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2821,11 +2821,12 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
* @param filters Key and value filters.
* @param <K> Key type.
* @param <V> Value type.
+ * @param limit Limits response records count. If 0 or less, the limit
considered to be Integer.MAX_VALUE, that is virtually no limit.
* @return Key/value rows.
* @throws IgniteCheckedException If failed.
*/
public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryText(final
String cacheName, final String clause,
- final String resType, final IndexingQueryFilter filters) throws
IgniteCheckedException {
+ final String resType, final IndexingQueryFilter filters, int limit)
throws IgniteCheckedException {
checkEnabled();
if (!busyLock.enterBusy())
@@ -2840,7 +2841,7 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
String typeName = typeName(cacheName, resType);
String schemaName = idx.schema(cacheName);
- return idx.queryLocalText(schemaName, cacheName,
clause, typeName, filters);
+ return idx.queryLocalText(schemaName, cacheName,
clause, typeName, filters, limit);
}
}, true);
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java
index 5ce5df7..e4ded48 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQueryMultithreadedSelfTest.java
@@ -76,6 +76,7 @@ public class GridCacheFullTextQueryMultithreadedSelfTest
extends GridCacheAbstra
final int keyCnt = 5000;
final int logFreq = 50;
final String txt = "Value";
+ final int limit = 0;
final GridCacheAdapter<Integer, H2TextValue> c =
((IgniteKernal)grid(0)).internalCache(DEFAULT_CACHE_NAME);
@@ -94,7 +95,7 @@ public class GridCacheFullTextQueryMultithreadedSelfTest
extends GridCacheAbstra
// Create query.
final CacheQuery<Map.Entry<Integer, H2TextValue>> qry =
c.context().queries().createFullTextQuery(
- H2TextValue.class.getName(), txt, false);
+ H2TextValue.class.getName(), txt, limit, false);
qry.enableDedup(false);
qry.includeBackups(false);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
index ff60501..57d6e93 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
@@ -109,7 +109,8 @@ public class DummyQueryIndexing implements
GridQueryIndexing {
String cacheName,
String qry,
String typeName,
- IndexingQueryFilter filter
+ IndexingQueryFilter filter,
+ int limit
) throws IgniteCheckedException {
return null;
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index f5d60c0..0e3a0e8 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -480,14 +480,14 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
/** {@inheritDoc} */
@Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>>
queryLocalText(String schemaName,
- String cacheName, String qry, String typeName, IndexingQueryFilter
filters) throws IgniteCheckedException {
+ String cacheName, String qry, String typeName, IndexingQueryFilter
filters, int limit) throws IgniteCheckedException {
H2TableDescriptor tbl = schemaMgr.tableForType(schemaName, cacheName,
typeName);
if (tbl != null && tbl.luceneIndex() != null) {
Long qryId = runningQueryManager().register(qry, TEXT, schemaName,
true, null);
try {
- return tbl.luceneIndex().query(qry.toUpperCase(), filters);
+ return tbl.luceneIndex().query(qry.toUpperCase(), filters,
limit);
}
finally {
runningQueryManager().unregister(qryId, false);
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
index be55e13..4693d23 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
@@ -237,11 +237,12 @@ public class GridLuceneIndex implements AutoCloseable {
*
* @param qry Query.
* @param filters Filters over result.
+ * @param limit Limits response records count. If 0 or less, the limit
considered to be Integer.MAX_VALUE, that is virtually no limit.
* @return Query result.
* @throws IgniteCheckedException If failed.
*/
public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(String qry,
- IndexingQueryFilter filters) throws IgniteCheckedException {
+ IndexingQueryFilter filters, int limit) throws IgniteCheckedException {
IndexReader reader;
try {
@@ -280,7 +281,7 @@ public class GridLuceneIndex implements AutoCloseable {
.add(filter, BooleanClause.Occur.FILTER)
.build();
- docs = searcher.search(query, Integer.MAX_VALUE);
+ docs = searcher.search(query, limit > 0 ? limit :
Integer.MAX_VALUE);
}
catch (Exception e) {
U.closeQuiet(reader);
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQuerySelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQuerySelfTest.java
index 82cb2f2..7df7911 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQuerySelfTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheFullTextQuerySelfTest.java
@@ -32,7 +32,6 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.TextQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
@@ -43,7 +42,9 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
import org.junit.Test;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -59,6 +60,32 @@ public class GridCacheFullTextQuerySelfTest extends
GridCommonAbstractTest {
/** Cache name */
private static final String PERSON_CACHE = "Person";
+ /** Limitation to query response size */
+ private static final int QUERY_LIMIT = 5;
+
+ /** Concurrent threads number */
+ private static final int N_THREADS = 20;
+
+ /**
+ * Container for expected values and all available entries
+ */
+ private static final class TestPair {
+ /** */
+ public final Set<Integer> expected;
+
+ /** */
+ public final List<Cache.Entry<Integer, ?>> all = new ArrayList<>();
+
+ /**
+ * Constructor
+ *
+ * @param exp expected values set.
+ * */
+ public TestPair(Set<Integer> exp) {
+ this.expected = new HashSet<>(exp);
+ }
+ }
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -93,7 +120,15 @@ public class GridCacheFullTextQuerySelfTest extends
GridCommonAbstractTest {
*/
@Test
public void testTextQueryWithField() throws Exception {
- checkTextQuery("name:1*", false, false);
+ checkTextQuery("name:1*", 0, false, false);
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ @Test
+ public void testTextQueryWithFieldLimited() throws Exception {
+ checkTextQuery("name:1*", QUERY_LIMIT, false, false);
}
/**
@@ -116,6 +151,14 @@ public class GridCacheFullTextQuerySelfTest extends
GridCommonAbstractTest {
* @throws Exception In case of error.
*/
@Test
+ public void testLocalTextQueryLimited() throws Exception {
+ checkTextQuery(null, QUERY_LIMIT, true, false);
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ @Test
public void testTextQueryWithKeepBinary() throws Exception {
checkTextQuery(false, true);
}
@@ -124,8 +167,61 @@ public class GridCacheFullTextQuerySelfTest extends
GridCommonAbstractTest {
* @throws Exception In case of error.
*/
@Test
+ public void testTextQueryWithKeepBinaryLimited() throws Exception {
+ checkTextQuery(null, QUERY_LIMIT, false, true);
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ @Test
public void testTextQuery() throws Exception {
- checkTextQuery(false, true);
+ checkTextQuery(false, false);
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ @Test
+ public void testTextQueryLimited() throws Exception {
+ checkTextQuery(null, QUERY_LIMIT, false, false);
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ @Test
+ public void testTextQueryLimitedConcurrent() throws Exception {
+ final IgniteEx ignite = grid(0);
+
+ final String clause = "1*";
+
+ // 1. Populate cache with data, calculating expected count in parallel.
+ Set<Integer> exp = populateCache(ignite, false, MAX_ITEM_COUNT,
(IgnitePredicate<Integer>)x -> String.valueOf(x).startsWith("1"));
+
+ GridTestUtils.runMultiThreaded(textQueryTask(ignite, clause, exp),
N_THREADS, "text-query-test");
+
+ clearCache(ignite);
+ }
+
+ /**
+ * Creates Runnable for TextQuery
+ *
+ * @param ignite Ignite insance.
+ * @param clause Query clause.
+ * @param exp Expected results for validation.
+ * @return TextQuery and validation wrapped into Runnable functional
interface.
+ */
+ @NotNull private Runnable textQueryTask(IgniteEx ignite, String clause,
Set<Integer> exp) {
+ return () -> {
+ try {
+ TextQuery qry = new TextQuery<>(Person.class, clause);
+ validateQueryResults(ignite, qry, exp, false);
+ }
+ catch (Exception e) {
+ fail(e.getMessage());
+ }
+ };
}
/**
@@ -133,15 +229,16 @@ public class GridCacheFullTextQuerySelfTest extends
GridCommonAbstractTest {
* @param keepBinary keep binary flag.
*/
private void checkTextQuery(boolean loc, boolean keepBinary) throws
Exception {
- checkTextQuery(null, loc, keepBinary);
+ checkTextQuery(null, 0, loc, keepBinary);
}
/**
* @param clause Query clause.
+ * @param limit limits response size
* @param loc local query flag.
* @param keepBinary keep binary flag.
*/
- private void checkTextQuery(String clause, boolean loc, boolean
keepBinary) throws Exception {
+ private void checkTextQuery(String clause, int limit, boolean loc, boolean
keepBinary) throws Exception {
final IgniteEx ignite = grid(0);
if (F.isEmpty(clause))
@@ -156,7 +253,7 @@ public class GridCacheFullTextQuerySelfTest extends
GridCommonAbstractTest {
});
// 2. Validate results.
- TextQuery qry = new TextQuery<>(Person.class, clause).setLocal(loc);
+ TextQuery qry = new TextQuery<>(Person.class,
clause).setLocal(loc).setLimit(limit);
validateQueryResults(ignite, qry, exp, keepBinary);
@@ -212,7 +309,7 @@ public class GridCacheFullTextQuerySelfTest extends
GridCommonAbstractTest {
*
* @throws IgniteCheckedException if failed.
*/
- private static void validateQueryResults(IgniteEx ignite, Query qry,
Set<Integer> exp,
+ private static void validateQueryResults(IgniteEx ignite, TextQuery qry,
Set<Integer> exp,
boolean keepBinary) throws IgniteCheckedException {
IgniteCache<Integer, Person> cache = ignite.cache(PERSON_CACHE);
@@ -220,78 +317,99 @@ public class GridCacheFullTextQuerySelfTest extends
GridCommonAbstractTest {
IgniteCache<Integer, BinaryObject> cache0 = cache.withKeepBinary();
try (QueryCursor<Cache.Entry<Integer, BinaryObject>> cursor =
cache0.query(qry)) {
- Set<Integer> exp0 = new HashSet<>(exp);
-
- List<Cache.Entry<Integer, ?>> all = new ArrayList<>();
-
- for (Cache.Entry<Integer, BinaryObject> entry :
cursor.getAll()) {
- all.add(entry);
-
- assertEquals(entry.getKey().toString(),
entry.getValue().field("name"));
- assertEquals(entry.getKey(),
entry.getValue().field("age"));
+ TestPair testPair = processExpectedWithBinary(exp, cursor);
- exp0.remove(entry.getKey());
- }
-
- checkForMissedKeys(ignite, exp0, all);
+ assertResult(ignite, qry, testPair);
}
try (QueryCursor<Cache.Entry<Integer, BinaryObject>> cursor =
cache0.query(qry)) {
- Set<Integer> exp0 = new HashSet<>(exp);
-
- List<Cache.Entry<Integer, ?>> all = new ArrayList<>();
-
- for (Cache.Entry<Integer, BinaryObject> entry :
cursor.getAll()) {
- all.add(entry);
-
- assertEquals(entry.getKey().toString(),
entry.getValue().field("name"));
-
- assertEquals(entry.getKey(),
entry.getValue().field("age"));
- exp0.remove(entry.getKey());
- }
+ TestPair testPair = processExpectedWithBinary(exp, cursor);
- checkForMissedKeys(ignite, exp0, all);
+ assertResult(ignite, qry, testPair);
}
}
else {
try (QueryCursor<Cache.Entry<Integer, Person>> cursor =
cache.query(qry)) {
- Set<Integer> exp0 = new HashSet<>(exp);
- List<Cache.Entry<Integer, ?>> all = new ArrayList<>();
+ TestPair testPair = processExpected(exp, cursor);
- for (Cache.Entry<Integer, Person> entry : cursor.getAll()) {
- all.add(entry);
+ assertResult(ignite, qry, testPair);
- assertEquals(entry.getKey().toString(),
entry.getValue().name);
+ }
- assertEquals(entry.getKey(),
Integer.valueOf(entry.getValue().age));
+ try (QueryCursor<Cache.Entry<Integer, Person>> cursor =
cache.query(qry)) {
- exp0.remove(entry.getKey());
- }
+ TestPair testPair = processExpected(exp, cursor);
- checkForMissedKeys(ignite, exp0, all);
+ assertResult(ignite, qry, testPair);
}
+ }
+ }
- try (QueryCursor<Cache.Entry<Integer, Person>> cursor =
cache.query(qry)) {
- Set<Integer> exp0 = new HashSet<>(exp);
+ /**
+ * Checks query for missed keys or if limit is set - for limitation
correctness.
+ *
+ * @param ignite Ignite context.
+ * @param qry Initial text query.
+ * @param testPair pair containing expected and all entries.
+ * @throws IgniteCheckedException if key check failed.
+ */
+ private static void assertResult(IgniteEx ignite, TextQuery qry,
+ TestPair testPair) throws IgniteCheckedException {
+ if (qry.getLimit() > 0)
+ assertTrue(testPair.all.size() <= QUERY_LIMIT);
+ else
+ checkForMissedKeys(ignite, testPair.expected, testPair.all);
+ }
+
+ /**
+ * Checks cursor with binary entries for correct keys and values.
+ * Removes valid entries from expected list copy.
+ *
+ * @param cursor Query cursor with response
+ * @param exp List of expected values.
+ * @return Altered expected values list.
+ */
+ @NotNull private static GridCacheFullTextQuerySelfTest.TestPair
processExpectedWithBinary(Set<Integer> exp,
+ QueryCursor<Cache.Entry<Integer, BinaryObject>> cursor) {
+ TestPair testPair = new TestPair(exp);
- List<Cache.Entry<Integer, ?>> all = new ArrayList<>();
+ for (Cache.Entry<Integer, BinaryObject> entry : cursor.getAll()) {
+ testPair.all.add(entry);
- for (Cache.Entry<Integer, Person> entry : cursor.getAll()) {
- all.add(entry);
+ assertEquals(entry.getKey().toString(),
entry.getValue().field("name"));
- assertEquals(entry.getKey().toString(),
entry.getValue().name);
+ assertEquals(entry.getKey(), entry.getValue().field("age"));
- assertEquals(entry.getKey().intValue(),
entry.getValue().age);
+ testPair.expected.remove(entry.getKey());
+ }
+ return testPair;
+ }
- exp0.remove(entry.getKey());
- }
+ /**
+ * Checks cursor entries for correct keys and values.
+ * Removes valid entries from expected list copy.
+ *
+ * @param cursor Query cursor with response
+ * @param exp List of expected values.
+ * @return Altered expected values list.
+ */
+ @NotNull private static GridCacheFullTextQuerySelfTest.TestPair
processExpected(Set<Integer> exp,
+ QueryCursor<Cache.Entry<Integer, Person>> cursor) {
+ TestPair testPair = new TestPair(exp);
- checkForMissedKeys(ignite, exp0, all);
- }
+ for (Cache.Entry<Integer, Person> entry : cursor.getAll()) {
+ testPair.all.add(entry);
+
+ assertEquals(entry.getKey().toString(), entry.getValue().name);
+
+ assertEquals(entry.getKey().intValue(), entry.getValue().age);
+
+ testPair.expected.remove(entry.getKey());
}
+ return testPair;
}
/**