This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new f8c6dc4685a IGNITE-21445 Optimize iterator for local IndexQuery (#11317)
f8c6dc4685a is described below
commit f8c6dc4685aa5f250cb64542c1e9c3cfa56c12bf
Author: oleg-vlsk <[email protected]>
AuthorDate: Mon Jun 17 17:48:17 2024 +1000
IGNITE-21445 Optimize iterator for local IndexQuery (#11317)
---
.../processors/cache/IgniteCacheProxyImpl.java | 12 ++++
.../cache/query/GridCacheQueryManager.java | 76 ++++++++++++++++++++++
.../ignite/cache/query/IndexQueryLocalTest.java | 15 ++++-
.../cache/query/IndexQueryPartitionTest.java | 6 +-
4 files changed, 106 insertions(+), 3 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index fe28eb70705..2d2533a29a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
@@ -557,6 +558,17 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
if (q.getLimit() > 0)
qry.limit(q.getLimit());
+ if (query.isLocal()) {
+ final GridCloseableIterator iter =
ctx.kernalContext().query().executeQuery(GridCacheQueryType.INDEX,
+ cacheName, ctx, new
IgniteOutClosureX<GridCloseableIterator>() {
+ @Override public GridCloseableIterator applyx() throws
IgniteCheckedException {
+ return
ctx.queries().indexQueryLocal((GridCacheQueryAdapter)qry);
+ }
+ }, true);
+
+ return new QueryCursorImpl(iter);
+ }
+
fut =
ctx.kernalContext().query().executeQuery(GridCacheQueryType.INDEX,
q.getValueType(), ctx,
new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
@Override public CacheQueryFuture<Map.Entry<K, V>>
applyx() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index b08fa52496c..ef6686253f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -127,6 +127,7 @@ import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.spi.IgniteSpiCloseableIterator;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
@@ -1445,6 +1446,81 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
}
+ /**
+ * Process local index query.
+ *
+ * @param qry Query.
+ * @return GridCloseableIterator.
+ */
+ @SuppressWarnings({"unchecked"})
+ public GridCloseableIterator indexQueryLocal(final GridCacheQueryAdapter
qry) throws IgniteCheckedException {
+ if (!enterBusy())
+ throw new IllegalStateException("Failed to process query request
(grid is stopping).");
+
+ try {
+ assert qry.type() == INDEX : "Wrong processing of query: " +
qry.type();
+
+ cctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+ if (cctx.localNode().isClient())
+ throw new IgniteException("Failed to execute local index query
on a client node.");
+
+ final Integer part = qry.partition();
+
+ int[] parts = null;
+
+ if (part != null) {
+ final GridDhtLocalPartition locPart =
cctx.dht().topology().localPartition(part);
+
+ if (locPart == null || locPart.state() != OWNING) {
+ throw new CacheInvalidStateException("Failed to execute
index query because required partition " +
+ "has not been found on local node [cacheName=" +
cctx.name() + ", part=" + part + "]");
+ }
+
+ parts = new int[] {part};
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Running local index query: " + qry);
+
+ if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+ cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
+ cctx.localNode(),
+ "Index query executed.",
+ EVT_CACHE_QUERY_EXECUTED,
+ CacheQueryType.INDEX.name(),
+ cctx.name(),
+ qry.queryClassName(),
+ null,
+ qry.scanFilter(),
+ null,
+ null,
+ securitySubjectId(cctx),
+
cctx.kernalContext().task().resolveTaskName(qry.taskHash())));
+ }
+
+ IndexQueryResult<K, V> idxQryRes = qryProc.queryIndex(cacheName,
qry.queryClassName(), qry.idxQryDesc(),
+ qry.scanFilter(), filter(qry, parts, parts != null),
qry.keepBinary(), qry.taskHash());
+
+ GridCloseableIterator<IgniteBiTuple<K, V>> iter = idxQryRes.iter();
+
+ return new GridCloseableIteratorAdapter() {
+ @Override protected Object onNext() throws
IgniteCheckedException {
+ IgniteBiTuple<K, V> entry = iter.nextX();
+
+ return new CacheEntryImpl<>(entry.getKey(),
entry.getValue());
+ }
+
+ @Override protected boolean onHasNext() throws
IgniteCheckedException {
+ return iter.hasNextX();
+ }
+ };
+ }
+ finally {
+ leaveBusy();
+ }
+ }
+
/**
* @param qryInfo Info.
* @param taskName Task name.
diff --git a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLocalTest.java b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLocalTest.java
index c270d69b2d6..f3780f25abd 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLocalTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLocalTest.java
@@ -112,7 +112,20 @@ public class IndexQueryLocalTest extends GridCommonAbstractTest {
.setCriteria(lt("id", CNT / 2));
GridTestUtils.assertThrows(null, () ->
cache.query(qry.setLocal(true)).getAll(),
- IgniteException.class, "Cluster group is empty");
+ IgniteException.class, "Failed to execute local index query on a
client node.");
+ }
+
+ /** Should fail as the local node is a client node and the value type
specified for query doesn't exist. */
+ @Test
+ public void testClientNodeNoValueType() throws Exception {
+ Ignite cln = startClientGrid(6);
+
+ IndexQuery qry = new IndexQuery("ValType");
+
+ IgniteCache cache = cln.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ GridTestUtils.assertThrows(null, () ->
cache.query(qry.setLocal(true)).getAll(),
+ IgniteException.class, "Failed to execute local index query on a
client node.");
}
/** */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java
index df91fea6e0d..73ea7f30532 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -178,8 +179,9 @@ public class IndexQueryPartitionTest extends GridCommonAbstractTest {
if (fail) {
GridTestUtils.assertThrows(null, () ->
grid().cache("CACHE").query(qry).getAll(),
- IgniteException.class,
- "Cluster group is empty.");
+ client ? IgniteException.class :
CacheInvalidStateException.class,
+ client ? "Failed to execute local index query on a client
node." :
+ "Failed to execute index query because required
partition has not been found on local node");
}
else
assertTrue(!grid().cache("CACHE").query(qry).getAll().isEmpty());