This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a225705fc5f IGNITE-16619 IndexQuery should support limit (#10767)
a225705fc5f is described below
commit a225705fc5f487c2170dc584913f769f1b80f34b
Author: yurinaryshkin <[email protected]>
AuthorDate: Thu Jun 15 13:05:20 2023 +0300
IGNITE-16619 IndexQuery should support limit (#10767)
---
.../org/apache/ignite/cache/query/IndexQuery.java | 26 +++
.../client/thin/ProtocolBitmaskFeature.java | 5 +-
.../internal/client/thin/TcpClientCache.java | 7 +
.../processors/cache/IgniteCacheProxyImpl.java | 3 +
.../platform/client/ClientBitmaskFeature.java | 5 +-
.../platform/client/ClientMessageParser.java | 2 +-
.../client/cache/ClientCacheIndexQueryRequest.java | 15 +-
.../ignite/cache/query/IndexQueryLimitTest.java | 233 +++++++++++++++++++++
.../ignite/cache/query/IndexQueryTestSuite.java | 3 +-
.../cache/query/ThinClientIndexQueryTest.java | 65 ++++++
10 files changed, 359 insertions(+), 5 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java
b/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java
index 0e7f2a1abc9..8c61ccc1936 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/IndexQuery.java
@@ -50,6 +50,9 @@ public final class IndexQuery<K, V> extends
Query<Cache.Entry<K, V>> {
/** Index name. */
private final @Nullable String idxName;
+ /** Limit on the number of records in the response. */
+ private int limit;
+
/** Index query criteria. */
private @Nullable List<IndexQueryCriterion> criteria;
@@ -152,6 +155,29 @@ public final class IndexQuery<K, V> extends
Query<Cache.Entry<K, V>> {
return idxName;
}
+ /**
+ * Gets the limit on the number of records in the response.
+ *
+ * @return Limit value.
+ */
+ public int getLimit() {
+ return limit;
+ }
+
+ /**
+ * Sets the limit on the number of records in the response.
+ *
+ * @param limit Positive limit to set.
+ * @return {@code this} for chaining.
+ */
+ public IndexQuery<K, V> setLimit(int limit) {
+ A.ensure(limit > 0, "Limit must be positive.");
+
+ this.limit = limit;
+
+ return this;
+ }
+
/**
* Sets remote cache entries filter.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
index 3a586ec4952..751f66dbc27 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
@@ -71,7 +71,10 @@ public enum ProtocolBitmaskFeature {
ALL_AFFINITY_MAPPINGS(13),
/** IndexQuery. */
- INDEX_QUERY(14);
+ INDEX_QUERY(14),
+
+ /** IndexQuery limit. */
+ INDEX_QUERY_LIMIT(15);
/** */
private static final EnumSet<ProtocolBitmaskFeature>
ALL_FEATURES_AS_ENUM_SET =
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index 5edf586652b..130447a2309 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -1016,6 +1016,13 @@ public class TcpClientCache<K, V> implements
ClientCache<K, V> {
w.writeBoolean(qry.isLocal());
w.writeInt(qry.getPartition() == null ? -1 :
qry.getPartition());
+ if
(!payloadCh.clientChannel().protocolCtx().isFeatureSupported(ProtocolBitmaskFeature.INDEX_QUERY_LIMIT))
{
+ if (qry.getLimit() > 0)
+ throw new
ClientFeatureNotSupportedByServerException(ProtocolBitmaskFeature.INDEX_QUERY_LIMIT);
+ }
+ else
+ w.writeInt(qry.getLimit());
+
w.writeString(qry.getValueType());
w.writeString(qry.getIndexName());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 274f476042d..8d9f23f9a00 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -558,6 +558,9 @@ public class IgniteCacheProxyImpl<K, V> extends
AsyncSupportAdapter<IgniteCache<
if (grp != null)
qry.projection(grp);
+ if (q.getLimit() > 0)
+ qry.limit(q.getLimit());
+
fut =
ctx.kernalContext().query().executeQuery(GridCacheQueryType.INDEX,
q.getValueType(), ctx,
new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
@Override public CacheQueryFuture<Map.Entry<K, V>>
applyx() {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
index ae09009be03..e858bdbb8db 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
@@ -69,7 +69,10 @@ public enum ClientBitmaskFeature implements
ThinProtocolFeature {
ALL_AFFINITY_MAPPINGS(13),
/** IndexQuery. */
- INDEX_QUERY(14);
+ INDEX_QUERY(14),
+
+ /** IndexQuery limit. */
+ INDEX_QUERY_LIMIT(15);
/** */
private static final EnumSet<ClientBitmaskFeature>
ALL_FEATURES_AS_ENUM_SET =
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index 1ac13755489..dbd62bc0e28 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -585,7 +585,7 @@ public class ClientMessageParser implements
ClientListenerMessageParser {
return new ClientCacheQueryContinuousRequest(reader);
case OP_QUERY_INDEX:
- return new ClientCacheIndexQueryRequest(reader);
+ return new ClientCacheIndexQueryRequest(reader, protocolCtx);
case OP_TX_START:
return new ClientTxStartRequest(reader);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
index 51d72478fbf..8c282658521 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
@@ -29,7 +29,9 @@ import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.cache.query.InIndexQueryCriterion;
import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion;
+import
org.apache.ignite.internal.processors.platform.client.ClientBitmaskFeature;
import
org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import
org.apache.ignite.internal.processors.platform.client.ClientProtocolContext;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
import static org.apache.ignite.internal.binary.GridBinaryMarshaller.ARR_LIST;
@@ -47,8 +49,12 @@ public class ClientCacheIndexQueryRequest extends
ClientCacheQueryRequest {
/**
* @param reader Reader.
+ * @param protocolCtx Protocol context used to check whether the index query limit feature is supported.
*/
- public ClientCacheIndexQueryRequest(BinaryRawReaderEx reader) {
+ public ClientCacheIndexQueryRequest(
+ BinaryRawReaderEx reader,
+ ClientProtocolContext protocolCtx
+ ) {
super(reader);
pageSize = reader.readInt();
@@ -57,6 +63,10 @@ public class ClientCacheIndexQueryRequest extends
ClientCacheQueryRequest {
int part = reader.readInt();
+ int limit = 0;
+ if
(protocolCtx.isFeatureSupported(ClientBitmaskFeature.INDEX_QUERY_LIMIT))
+ limit = reader.readInt();
+
String valType = reader.readString();
String idxName = reader.readString();
@@ -89,6 +99,9 @@ public class ClientCacheIndexQueryRequest extends
ClientCacheQueryRequest {
if (filterObj != null)
qry.setFilter(((BinaryObject)filterObj).deserialize());
+
+ if (limit > 0)
+ qry.setLimit(limit);
}
/** */
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLimitTest.java
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLimitTest.java
new file mode 100644
index 00000000000..5c69e523356
--- /dev/null
+++
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLimitTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt;
+
+/** */
+public class IndexQueryLimitTest extends GridCommonAbstractTest {
+ /** */
+ private static final String CACHE = "TEST_CACHE";
+
+ /** */
+ private static final String IDX = "PERSON_ID_IDX";
+
+ /** */
+ private static final int CNT = 10_000;
+
+ /** */
+ private Ignite crd;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ crd = startGrids(4);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<Long,
Person>()
+ .setName(CACHE)
+ .setIndexedTypes(Long.class, Person.class)
+ .setAtomicityMode(TRANSACTIONAL)
+ .setCacheMode(REPLICATED);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** */
+ @Test
+ public void testRangeQueriesWithoutDuplicates() throws Exception {
+ checkRangeQueries(1);
+ }
+
+ /** */
+ @Test
+ public void testRangeQueriesWithDuplicates() throws Exception {
+ checkRangeQueries(10);
+ }
+
+ /** */
+ @Test
+ public void testSetLimit() {
+ GridTestUtils.assertThrows(log, () -> new IndexQuery<>(Person.class,
IDX).setLimit(0),
+ IllegalArgumentException.class, "Limit must be positive.");
+
+ int limit = 1 + new Random().nextInt(1000);
+
+ GridTestUtils.assertThrows(log, () -> new IndexQuery<>(Person.class,
IDX).setLimit(0 - limit),
+ IllegalArgumentException.class, "Limit must be positive.");
+
+ IndexQuery<Long, Person> qry = new IndexQuery<>(Person.class, IDX);
+
+ qry.setLimit(limit);
+
+ assertEquals(limit, qry.getLimit());
+ }
+
+ /** */
+ private void checkRangeQueries(int duplicates) throws Exception {
+ // Add data
+ insertData(duplicates);
+
+ // All
+ checkLimit(null, 0, CNT, duplicates);
+
+ int pivot = new Random().nextInt(CNT);
+
+ // Lt.
+ checkLimit(lt("id", pivot), 0, pivot, duplicates);
+ }
+
+ /** */
+ private void checkLimit(IndexQueryCriterion criterion, int left, int
right, int duplicates) throws Exception {
+ int rows = right - left;
+ int limit = new Random().nextInt(rows) + 1;
+
+ // limit <= rows
+ checkLimit(criterion, limit, left, left + limit, duplicates);
+
+ // limit >= rows
+ if (rows > 1) {
+ limit = new Random().nextInt(CNT + 2 - rows) + rows;
+
+ checkLimit(criterion, limit, left, right, duplicates);
+ }
+ }
+
+ /** */
+ private void checkLimit(IndexQueryCriterion criterion, int limit, int
left, int right, int duplicates) throws Exception {
+ IndexQuery<Long, Person> qry = new IndexQuery<>(Person.class, IDX);
+
+ if (criterion != null)
+ qry.setCriteria(criterion);
+
+ qry.setLimit(limit);
+
+ QueryCursor<Cache.Entry<Long, Person>> cursor =
crd.cache(CACHE).query(qry);
+
+ int expSize = (right - left) * duplicates;
+
+ if (limit > 0 && limit < expSize)
+ expSize = limit;
+
+ Set<Long> expKeys = new HashSet<>(expSize);
+ List<Integer> expOrderedValues = new LinkedList<>();
+
+ loop: for (int i = left; i != right; i++) {
+ for (int j = 0; j < duplicates; j++) {
+ expOrderedValues.add(i);
+
+ expKeys.add((long)CNT * j + i);
+ if (expOrderedValues.size() >= limit)
+ break loop;
+ }
+ }
+
+ AtomicInteger actSize = new AtomicInteger();
+ ((QueryCursorEx<Cache.Entry<Long, Person>>)cursor).getAll(entry -> {
+ assertEquals(expOrderedValues.remove(0),
(Integer)entry.getValue().id);
+
+ assertTrue(expKeys.remove(entry.getKey()));
+
+ int persId = entry.getKey().intValue() % CNT;
+
+ assertEquals(new Person(persId), entry.getValue());
+
+ actSize.incrementAndGet();
+ });
+
+ assertEquals(expSize, actSize.get());
+
+ assertTrue(expKeys.isEmpty());
+ }
+
+ /** */
+ private void insertData(int duplicates) {
+ try (IgniteDataStreamer<Long, Person> streamer =
crd.dataStreamer(CACHE)) {
+ for (int persId = 0; persId < CNT; persId++) {
+ // Create duplicates of data.
+ for (int i = 0; i < duplicates; i++)
+ streamer.addData((long)CNT * i + persId, new
Person(persId));
+ }
+ }
+ }
+
+ /** */
+ private static class Person {
+ /** */
+ @QuerySqlField(index = true)
+ final int id;
+
+ /** */
+ Person(int id) {
+ this.id = id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "Person[id=" + id + "]";
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ Person person = (Person)o;
+
+ return Objects.equals(id, person.id);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return id;
+ }
+ }
+}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
index 3d408c06d2f..8bb75faddad 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryTestSuite.java
@@ -46,7 +46,8 @@ import org.junit.runners.Suite;
ThinClientIndexQueryTest.class,
RepeatedFieldIndexQueryTest.class,
IndexQueryInCriterionTest.class,
- IndexQueryInCriterionDescTest.class
+ IndexQueryInCriterionDescTest.class,
+ IndexQueryLimitTest.class
})
public class IndexQueryTestSuite {
}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/ThinClientIndexQueryTest.java
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/ThinClientIndexQueryTest.java
index 4c4a8ce0951..08cc8b76477 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/cache/query/ThinClientIndexQueryTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/cache/query/ThinClientIndexQueryTest.java
@@ -17,7 +17,9 @@
package org.apache.ignite.cache.query;
+import java.lang.reflect.Field;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import java.util.function.Consumer;
@@ -31,11 +33,13 @@ import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature;
import
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -344,6 +348,26 @@ public class ThinClientIndexQueryTest extends
GridCommonAbstractTest {
.setCriteria(crit);
assertClientQuery(cache, left, right, idxQry);
+
+ if (left < right) {
+ Random r = new Random();
+
+ int limit = 1 + r.nextInt(right - left);
+
+ idxQry = new IndexQuery<Integer, Person>(Person.class, idxName)
+ .setCriteria(crit)
+ .setLimit(limit);
+
+ assertClientQuery(cache, left, left + limit, idxQry);
+
+ limit = right - left + r.nextInt(right - left);
+
+ idxQry = new IndexQuery<Integer, Person>(Person.class, idxName)
+ .setCriteria(crit)
+ .setLimit(limit);
+
+ assertClientQuery(cache, left, right, idxQry);
+ }
}
/** */
@@ -369,6 +393,47 @@ public class ThinClientIndexQueryTest extends
GridCommonAbstractTest {
}
}
+ /** */
+ @Test
+ public void testIndexQueryLimitOnOlderProtocolVersion() throws Exception {
+ // Exclude INDEX_QUERY_LIMIT from protocol.
+ Class<?> clazz =
Class.forName("org.apache.ignite.internal.client.thin.ProtocolBitmaskFeature");
+
+ Field field = clazz.getDeclaredField("ALL_FEATURES_AS_ENUM_SET");
+
+ field.setAccessible(true);
+
+ EnumSet<ProtocolBitmaskFeature> allFeaturesEnumSet =
(EnumSet<ProtocolBitmaskFeature>)field.get(null);
+
+ allFeaturesEnumSet.remove(ProtocolBitmaskFeature.INDEX_QUERY_LIMIT);
+
+ try {
+ withClientCache((cache) -> {
+ // No limit.
+ IndexQuery<Integer, Person> idxQry = new
IndexQuery<>(Person.class, IDX_FLD1);
+
+ assertClientQuery(cache, NULLS_CNT, CNT, idxQry);
+
+ // With limit.
+ IndexQuery<Integer, Person> idxQryWithLImit = new
IndexQuery<Integer, Person>(Person.class, IDX_FLD1)
+ .setLimit(10);
+
+ GridTestUtils.assertThrowsAnyCause(
+ log,
+ () -> {
+ cache.query(idxQryWithLImit).getAll();
+ return null;
+ },
+ ClientFeatureNotSupportedByServerException.class,
+ "Feature INDEX_QUERY_LIMIT is not supported by the
server");
+ });
+ }
+ finally {
+ // Revert the features set.
+ allFeaturesEnumSet.add(ProtocolBitmaskFeature.INDEX_QUERY_LIMIT);
+ }
+ }
+
/** */
private void withClientCache(Consumer<ClientCache<Integer, Person>>
consumer) {
ClientConfiguration clnCfg = new ClientConfiguration()