This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new f8c6dc4685a IGNITE-21445 Optimize iterator for local IndexQuery (#11317)
f8c6dc4685a is described below

commit f8c6dc4685aa5f250cb64542c1e9c3cfa56c12bf
Author: oleg-vlsk <[email protected]>
AuthorDate: Mon Jun 17 17:48:17 2024 +1000

    IGNITE-21445 Optimize iterator for local IndexQuery (#11317)
---
 .../processors/cache/IgniteCacheProxyImpl.java     | 12 ++++
 .../cache/query/GridCacheQueryManager.java         | 76 ++++++++++++++++++++++
 .../ignite/cache/query/IndexQueryLocalTest.java    | 15 ++++-
 .../cache/query/IndexQueryPartitionTest.java       |  6 +-
 4 files changed, 106 insertions(+), 3 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index fe28eb70705..2d2533a29a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.query.CacheQuery;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
@@ -557,6 +558,17 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
             if (q.getLimit() > 0)
                 qry.limit(q.getLimit());
 
+            if (query.isLocal()) {
+                final GridCloseableIterator iter = 
ctx.kernalContext().query().executeQuery(GridCacheQueryType.INDEX,
+                    cacheName, ctx, new 
IgniteOutClosureX<GridCloseableIterator>() {
+                        @Override public GridCloseableIterator applyx() throws 
IgniteCheckedException {
+                            return 
ctx.queries().indexQueryLocal((GridCacheQueryAdapter)qry);
+                        }
+                    }, true);
+
+                return new QueryCursorImpl(iter);
+            }
+
             fut = 
ctx.kernalContext().query().executeQuery(GridCacheQueryType.INDEX, 
q.getValueType(), ctx,
                 new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
                     @Override public CacheQueryFuture<Map.Entry<K, V>> 
applyx() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index b08fa52496c..ef6686253f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -127,6 +127,7 @@ import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteReducer;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.security.SecurityPermission;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.spi.IgniteSpiCloseableIterator;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
@@ -1445,6 +1446,81 @@ public abstract class GridCacheQueryManager<K, V> 
extends GridCacheManagerAdapte
         }
     }
 
+    /**
+     * Processes an {@code INDEX} query on the local node and returns an iterator over the matching entries.
+     * <p>
+     * Before the index is queried the method enters the busy section of this manager, asserts the query type,
+     * checks the {@code CACHE_READ} permission, rejects client nodes and, if the query is bound to a partition,
+     * verifies that this partition is present locally in {@code OWNING} state. A {@code CacheQueryExecutedEvent}
+     * is recorded when {@code EVT_CACHE_QUERY_EXECUTED} is recordable. Every {@code IgniteBiTuple} returned by
+     * the index iterator is wrapped into a {@code CacheEntryImpl} by the returned adapter.
+     *
+     * @param qry Query; its type must be {@code INDEX} (checked by an assertion only).
+     * @return Iterator over {@code CacheEntryImpl} instances built from the index query result.
+     * @throws IgniteCheckedException Declared by the signature; NOTE(review): presumably raised by the index
+     *      lookup or the security check, confirm against the callees.
+     * @throws IllegalStateException If the busy section cannot be entered (grid is stopping).
+     * @throws IgniteException If the local node is a client node.
+     * @throws CacheInvalidStateException If the requested partition is missing locally or is not {@code OWNING}.
+     */
+    @SuppressWarnings({"unchecked"})
+    public GridCloseableIterator indexQueryLocal(final GridCacheQueryAdapter 
qry) throws IgniteCheckedException {
+        if (!enterBusy())
+            throw new IllegalStateException("Failed to process query request 
(grid is stopping).");
+
+        try {
+            assert qry.type() == INDEX : "Wrong processing of query: " + 
qry.type();
+
+            cctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+            if (cctx.localNode().isClient())
+                throw new IgniteException("Failed to execute local index query 
on a client node.");
+
+            final Integer part = qry.partition();
+
+            int[] parts = null;
+
+            if (part != null) {
+                final GridDhtLocalPartition locPart = 
cctx.dht().topology().localPartition(part);
+
+                if (locPart == null || locPart.state() != OWNING) {
+                    throw new CacheInvalidStateException("Failed to execute 
index query because required partition " +
+                        "has not been found on local node [cacheName=" + 
cctx.name() + ", part=" + part + "]");
+                }
+
+                parts = new int[] {part}; // Only the requested partition is handed to filter(...) below.
+            }
+
+            if (log.isDebugEnabled())
+                log.debug("Running local index query: " + qry);
+
+            if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+                cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
+                    cctx.localNode(),
+                    "Index query executed.",
+                    EVT_CACHE_QUERY_EXECUTED,
+                    CacheQueryType.INDEX.name(),
+                    cctx.name(),
+                    qry.queryClassName(),
+                    null,
+                    qry.scanFilter(),
+                    null,
+                    null,
+                    securitySubjectId(cctx),
+                    
cctx.kernalContext().task().resolveTaskName(qry.taskHash())));
+            }
+
+            IndexQueryResult<K, V> idxQryRes = qryProc.queryIndex(cacheName, 
qry.queryClassName(), qry.idxQryDesc(),
+                qry.scanFilter(), filter(qry, parts, parts != null), 
qry.keepBinary(), qry.taskHash());
+
+            GridCloseableIterator<IgniteBiTuple<K, V>> iter = idxQryRes.iter();
+
+            return new GridCloseableIteratorAdapter() { // NOTE(review): onClose() is not overridden here; confirm that closing this adapter also closes the underlying iter.
+                @Override protected Object onNext() throws 
IgniteCheckedException {
+                    IgniteBiTuple<K, V> entry = iter.nextX();
+
+                    return new CacheEntryImpl<>(entry.getKey(), 
entry.getValue());
+                }
+
+                @Override protected boolean onHasNext() throws 
IgniteCheckedException {
+                    return iter.hasNextX();
+                }
+            };
+        }
+        finally {
+            leaveBusy(); // Runs when the iterator is returned, i.e. before the caller starts iterating over it.
+        }
+    }
+
     /**
      * @param qryInfo Info.
      * @param taskName Task name.
diff --git a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLocalTest.java b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLocalTest.java
index c270d69b2d6..f3780f25abd 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLocalTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryLocalTest.java
@@ -112,7 +112,20 @@ public class IndexQueryLocalTest extends 
GridCommonAbstractTest {
             .setCriteria(lt("id", CNT / 2));
 
         GridTestUtils.assertThrows(null, () -> 
cache.query(qry.setLocal(true)).getAll(),
-            IgniteException.class, "Cluster group is empty");
+            IgniteException.class, "Failed to execute local index query on a 
client node.");
+    }
+
+    /**
+     * Should fail as the local node is a client node and the value type specified for query doesn't exist.
+     * The query is built for value type {@code ValType}, marked local and run against a cache obtained from a
+     * client node; the assertion expects an {@code IgniteException} carrying the client-node failure message.
+     */
+    @Test
+    public void testClientNodeNoValueType() throws Exception {
+        Ignite cln = startClientGrid(6);
+
+        IndexQuery qry = new IndexQuery("ValType");
+
+        IgniteCache cache = cln.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        GridTestUtils.assertThrows(null, () -> 
cache.query(qry.setLocal(true)).getAll(),
+            IgniteException.class, "Failed to execute local index query on a 
client node.");
     }
 
     /** */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java
index df91fea6e0d..73ea7f30532 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/cache/query/IndexQueryPartitionTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -178,8 +179,9 @@ public class IndexQueryPartitionTest extends 
GridCommonAbstractTest {
 
             if (fail) {
                 GridTestUtils.assertThrows(null, () -> 
grid().cache("CACHE").query(qry).getAll(),
-                    IgniteException.class,
-                    "Cluster group is empty.");
+                    client ? IgniteException.class : 
CacheInvalidStateException.class,
+                    client ? "Failed to execute local index query on a client 
node." :
+                        "Failed to execute index query because required 
partition has not been found on local node");
             }
             else
                 
assertTrue(!grid().cache("CACHE").query(qry).getAll().isEmpty());

Reply via email to