This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e1a7f8e3989 IGNITE-22183 Recording of IndexQuery events added (#11342)
e1a7f8e3989 is described below

commit e1a7f8e39894200857408aeafa2a4477058286ec
Author: oleg-vlsk <[email protected]>
AuthorDate: Thu May 16 01:21:13 2024 +1000

    IGNITE-22183 Recording of IndexQuery events added (#11342)
---
 .../cache/query/index/IndexQueryProcessor.java     |  30 +++++-
 .../processors/cache/query/CacheQueryType.java     |   5 +-
 .../cache/query/GridCacheQueryManager.java         |  18 +++-
 .../processors/query/GridQueryProcessor.java       |   6 +-
 .../cache/IgniteCacheAbstractQuerySelfTest.java    | 115 +++++++++++++++++++++
 5 files changed, 169 insertions(+), 5 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
index 4048971a38a..bef88ec2e41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
@@ -28,6 +28,7 @@ import java.util.stream.Stream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.query.IndexQuery;
 import org.apache.ignite.cache.query.IndexQueryCriterion;
+import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion;
 import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
 import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
@@ -41,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.IndexQueryDesc;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
@@ -52,6 +54,9 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+import static org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId;
+
 /**
  * Processor of {@link IndexQuery}.
  */
@@ -74,7 +79,8 @@ public class IndexQueryProcessor {
         IndexQueryDesc idxQryDesc,
         @Nullable IgniteBiPredicate<K, V> filter,
         IndexingQueryFilter cacheFilter,
-        boolean keepBinary
+        boolean keepBinary,
+        int taskHash
     ) throws IgniteCheckedException {
         InlineIndexImpl idx = (InlineIndexImpl)findSortedIndex(cctx, idxQryDesc);
 
@@ -86,6 +92,8 @@ public class IndexQueryProcessor {
 
         IndexQueryResultMeta meta = new IndexQueryResultMeta(def, qry.critSize());
 
+        boolean isRecordable = cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+
         // Map IndexRow to Cache Key-Value pair.
         return new IndexQueryResult<>(meta, new GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
             private IgniteBiTuple<K, V> currVal;
@@ -111,6 +119,26 @@ public class IndexQueryProcessor {
                             continue;
                     }
 
+                    if (isRecordable) {
+                        cctx.gridEvents().record(new CacheQueryReadEvent<>(
+                            cctx.localNode(),
+                            "Index query entry read.",
+                            EVT_CACHE_QUERY_OBJECT_READ,
+                            CacheQueryType.INDEX.name(),
+                            cctx.name(),
+                            idxQryDesc.valType(),
+                            null,
+                            filter,
+                            null,
+                            null,
+                            securitySubjectId(cctx),
+                            cctx.kernalContext().task().resolveTaskName(taskHash),
+                            k,
+                            v,
+                            null,
+                            null));
+                    }
+
                     currVal = new IgniteBiTuple<>(k, v);
                 }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryType.java
index fa0f3df0428..979e771a0f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryType.java
@@ -43,5 +43,8 @@ public enum CacheQueryType {
     CONTINUOUS,
 
     /** SPI query. */
-    SPI
+    SPI,
+
+    /** Index query. */
+    INDEX
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 6a3234bc6be..fec2585834a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -629,13 +629,29 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
                     break;
 
                 case INDEX:
+                    if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+                        cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
+                            cctx.localNode(),
+                            "Index query executed.",
+                            EVT_CACHE_QUERY_EXECUTED,
+                            CacheQueryType.INDEX.name(),
+                            cctx.name(),
+                            qry.queryClassName(),
+                            null,
+                            qry.scanFilter(),
+                            null,
+                            null,
+                            securitySubjectId(cctx),
+                            taskName));
+                    }
+
                     int[] parts = null;
 
                     if (qry.partition() != null)
                         parts = new int[]{qry.partition()};
 
                     IndexQueryResult<K, V> idxQryRes = qryProc.queryIndex(cacheName, qry.queryClassName(), qry.idxQryDesc(),
-                        qry.scanFilter(), filter(qry, parts, parts != null), qry.keepBinary());
+                        qry.scanFilter(), filter(qry, parts, parts != null), qry.keepBinary(), qry.taskHash());
 
                     iter = idxQryRes.iter();
                     res.metadata(idxQryRes.metadata());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index da378a5833b..3db44e0239c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -3657,6 +3657,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param entryFilter Optional user defined cache entries filter.
      * @param cacheFilter Ignite specific cache entries filters.
      * @param keepBinary Keep binary flag.
+     * @param taskHash Hashcode of the task.
      * @return Key/value rows.
      * @throws IgniteCheckedException If failed.
      */
@@ -3666,7 +3667,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
         final IndexQueryDesc idxQryDesc,
         @Nullable IgniteBiPredicate<K, V> entryFilter,
         final IndexingQueryFilter cacheFilter,
-        boolean keepBinary
+        boolean keepBinary,
+        int taskHash
     ) throws IgniteCheckedException {
         if (!busyLock.enterBusy())
             throw new IllegalStateException("Failed to execute query (grid is stopping).");
@@ -3678,7 +3680,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                 new IgniteOutClosureX<IndexQueryResult<K, V>>() {
                     @Override public IndexQueryResult<K, V> applyx() throws IgniteCheckedException {
                         try {
-                            return idxQryPrc.queryLocal(cctx, idxQryDesc, entryFilter, cacheFilter, keepBinary);
+                            return idxQryPrc.queryLocal(cctx, idxQryDesc, entryFilter, cacheFilter, keepBinary, taskHash);
                         }
                         catch (IgniteCheckedException e) {
                             String msg = "Failed to execute IndexQuery: " + e.getMessage() + ". Query desc: " + idxQryDesc;
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index e13a6b2ed32..8918f094664 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -55,6 +55,7 @@ import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.QueryIndex;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.IndexQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -80,6 +81,7 @@ import org.apache.ignite.events.SqlQueryExecutionEvent;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.lang.GridPlainCallable;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -101,10 +103,13 @@ import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.gt;
+import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
 import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.FULL_TEXT;
+import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.INDEX;
 import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.SCAN;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
 import static org.junit.Assert.assertArrayEquals;
@@ -1883,6 +1888,116 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac
         }
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testIndexQueryEvents() throws Exception {
+        final Map<Integer, BinaryObject> qryResults = new ConcurrentHashMap<>();
+        final IgniteCache<Integer, Type2> cache = jcache(Integer.class, Type2.class);
+        final boolean evtsDisabled = cache.getConfiguration(CacheConfiguration.class).isEventsDisabled();
+
+        final CountDownLatch readLatch = new CountDownLatch(evtsDisabled ? 0 : 2);
+        final CountDownLatch execLatch = new CountDownLatch(evtsDisabled ? 0 :
+            cacheMode() == REPLICATED ? 1 : gridCount());
+
+        IgnitePredicate[] objReadLsnrs = new IgnitePredicate[gridCount()];
+        IgnitePredicate[] qryExecLsnrs = new IgnitePredicate[gridCount()];
+
+        for (int i = 0; i < gridCount(); i++) {
+            IgnitePredicate<Event> objReadPred = new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    assert evt instanceof CacheQueryReadEvent;
+
+                    if (evtsDisabled)
+                        fail("Cache events are disabled");
+
+                    CacheQueryReadEvent<Integer, Type2> qe = (CacheQueryReadEvent<Integer, Type2>)evt;
+
+                    assertEquals(INDEX.name(), qe.queryType());
+                    assertEquals(cache.getName(), qe.cacheName());
+                    assertEquals("Type2", QueryUtils.typeName(qe.className()));
+                    assertNotNull(qe.scanQueryFilter());
+                    assertNull(qe.clause());
+                    assertNull(qe.continuousQueryFilter());
+                    assertNull(qe.arguments());
+
+                    qryResults.put(qe.key(), (BinaryObject)qe.value());
+
+                    readLatch.countDown();
+
+                    return true;
+                }
+            };
+
+            grid(i).events().localListen(objReadPred, EVT_CACHE_QUERY_OBJECT_READ);
+            objReadLsnrs[i] = objReadPred;
+
+            IgnitePredicate<Event> qryExecPred = new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    assert evt instanceof CacheQueryExecutedEvent;
+
+                    if (evtsDisabled)
+                        fail("Cache events are disabled");
+
+                    CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt;
+
+                    assertEquals(INDEX.name(), qe.queryType());
+                    assertEquals(cache.getName(), qe.cacheName());
+                    assertEquals("Type2", QueryUtils.typeName(qe.className()));
+                    assertNotNull(qe.scanQueryFilter());
+                    assertNull(qe.clause());
+                    assertNull(qe.continuousQueryFilter());
+                    assertNull(qe.arguments());
+
+                    execLatch.countDown();
+
+                    return true;
+                }
+            };
+
+            grid(i).events().localListen(qryExecPred, EVT_CACHE_QUERY_EXECUTED);
+            qryExecLsnrs[i] = qryExecPred;
+        }
+
+        try {
+            cache.put(1, new Type2(1, "John"));
+            cache.put(2, new Type2(2, "Bill"));
+            cache.put(3, new Type2(3, "Sam"));
+            cache.put(4, new Type2(4, "Bill"));
+            cache.put(5, new Type2(5, "Bob"));
+
+            IndexQuery<Integer, Type2> qry = new IndexQuery<Integer, Type2>(Type2.class)
+                .setCriteria(gt("id", 1), lt("id", 5))
+                .setFilter((k, v) -> v.name().contains("Bill"));
+
+            if (cacheMode() == REPLICATED)
+                qry.setLocal(true);
+
+            QueryCursor<Cache.Entry<Integer, Type2>> cursor = cache.query(qry);
+
+            cursor.getAll();
+
+            assert readLatch.await(1000, MILLISECONDS);
+            assert execLatch.await(1000, MILLISECONDS);
+
+            if (!evtsDisabled) {
+                assertEquals(2, qryResults.size());
+
+                assertEquals("Bill", ((Type2)qryResults.get(2).deserialize()).name());
+                assertEquals("Bill", ((Type2)qryResults.get(4).deserialize()).name());
+            }
+            else
+                assert qryResults.isEmpty();
+        }
+        finally {
+            for (int i = 0; i < gridCount(); i++) {
+                grid(i).events().stopLocalListen(objReadLsnrs[i]);
+                grid(i).events().stopLocalListen(qryExecLsnrs[i]);
+            }
+        }
+    }
+
     /**
      * @throws Exception If failed.
      */

Reply via email to