This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e1a7f8e3989 IGNITE-22183 Recording of IndexQuery events added (#11342)
e1a7f8e3989 is described below
commit e1a7f8e39894200857408aeafa2a4477058286ec
Author: oleg-vlsk <[email protected]>
AuthorDate: Thu May 16 01:21:13 2024 +1000
IGNITE-22183 Recording of IndexQuery events added (#11342)
---
.../cache/query/index/IndexQueryProcessor.java | 30 +++++-
.../processors/cache/query/CacheQueryType.java | 5 +-
.../cache/query/GridCacheQueryManager.java | 18 +++-
.../processors/query/GridQueryProcessor.java | 6 +-
.../cache/IgniteCacheAbstractQuerySelfTest.java | 115 +++++++++++++++++++++
5 files changed, 169 insertions(+), 5 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
index 4048971a38a..bef88ec2e41 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/IndexQueryProcessor.java
@@ -28,6 +28,7 @@ import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.query.IndexQuery;
import org.apache.ignite.cache.query.IndexQueryCriterion;
+import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyDefinition;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
@@ -41,6 +42,7 @@ import
org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.IndexQueryDesc;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
@@ -52,6 +54,9 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+import static
org.apache.ignite.internal.processors.security.SecurityUtils.securitySubjectId;
+
/**
* Processor of {@link IndexQuery}.
*/
@@ -74,7 +79,8 @@ public class IndexQueryProcessor {
IndexQueryDesc idxQryDesc,
@Nullable IgniteBiPredicate<K, V> filter,
IndexingQueryFilter cacheFilter,
- boolean keepBinary
+ boolean keepBinary,
+ int taskHash
) throws IgniteCheckedException {
InlineIndexImpl idx = (InlineIndexImpl)findSortedIndex(cctx,
idxQryDesc);
@@ -86,6 +92,8 @@ public class IndexQueryProcessor {
IndexQueryResultMeta meta = new IndexQueryResultMeta(def,
qry.critSize());
+ boolean isRecordable =
cctx.events().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+
// Map IndexRow to Cache Key-Value pair.
return new IndexQueryResult<>(meta, new
GridCloseableIteratorAdapter<IgniteBiTuple<K, V>>() {
private IgniteBiTuple<K, V> currVal;
@@ -111,6 +119,26 @@ public class IndexQueryProcessor {
continue;
}
+ if (isRecordable) {
+ cctx.gridEvents().record(new CacheQueryReadEvent<>(
+ cctx.localNode(),
+ "Index query entry read.",
+ EVT_CACHE_QUERY_OBJECT_READ,
+ CacheQueryType.INDEX.name(),
+ cctx.name(),
+ idxQryDesc.valType(),
+ null,
+ filter,
+ null,
+ null,
+ securitySubjectId(cctx),
+
cctx.kernalContext().task().resolveTaskName(taskHash),
+ k,
+ v,
+ null,
+ null));
+ }
+
currVal = new IgniteBiTuple<>(k, v);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryType.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryType.java
index fa0f3df0428..979e771a0f1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryType.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryType.java
@@ -43,5 +43,8 @@ public enum CacheQueryType {
CONTINUOUS,
/** SPI query. */
- SPI
+ SPI,
+
+ /** Index query. */
+ INDEX
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 6a3234bc6be..fec2585834a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -629,13 +629,29 @@ public abstract class GridCacheQueryManager<K, V> extends
GridCacheManagerAdapte
break;
case INDEX:
+ if (cctx.events().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
+ cctx.gridEvents().record(new CacheQueryExecutedEvent<>(
+ cctx.localNode(),
+ "Index query executed.",
+ EVT_CACHE_QUERY_EXECUTED,
+ CacheQueryType.INDEX.name(),
+ cctx.name(),
+ qry.queryClassName(),
+ null,
+ qry.scanFilter(),
+ null,
+ null,
+ securitySubjectId(cctx),
+ taskName));
+ }
+
int[] parts = null;
if (qry.partition() != null)
parts = new int[]{qry.partition()};
IndexQueryResult<K, V> idxQryRes =
qryProc.queryIndex(cacheName, qry.queryClassName(), qry.idxQryDesc(),
- qry.scanFilter(), filter(qry, parts, parts != null),
qry.keepBinary());
+ qry.scanFilter(), filter(qry, parts, parts != null),
qry.keepBinary(), qry.taskHash());
iter = idxQryRes.iter();
res.metadata(idxQryRes.metadata());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index da378a5833b..3db44e0239c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -3657,6 +3657,7 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
* @param entryFilter Optional user defined cache entries filter.
* @param cacheFilter Ignite specific cache entries filters.
* @param keepBinary Keep binary flag.
+ * @param taskHash Hashcode of the task.
* @return Key/value rows.
* @throws IgniteCheckedException If failed.
*/
@@ -3666,7 +3667,8 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
final IndexQueryDesc idxQryDesc,
@Nullable IgniteBiPredicate<K, V> entryFilter,
final IndexingQueryFilter cacheFilter,
- boolean keepBinary
+ boolean keepBinary,
+ int taskHash
) throws IgniteCheckedException {
if (!busyLock.enterBusy())
throw new IllegalStateException("Failed to execute query (grid is
stopping).");
@@ -3678,7 +3680,7 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
new IgniteOutClosureX<IndexQueryResult<K, V>>() {
@Override public IndexQueryResult<K, V> applyx() throws
IgniteCheckedException {
try {
- return idxQryPrc.queryLocal(cctx, idxQryDesc,
entryFilter, cacheFilter, keepBinary);
+ return idxQryPrc.queryLocal(cctx, idxQryDesc,
entryFilter, cacheFilter, keepBinary, taskHash);
}
catch (IgniteCheckedException e) {
String msg = "Failed to execute IndexQuery: " +
e.getMessage() + ". Query desc: " + idxQryDesc;
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
index e13a6b2ed32..8918f094664 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java
@@ -55,6 +55,7 @@ import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.IndexQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -80,6 +81,7 @@ import org.apache.ignite.events.SqlQueryExecutionEvent;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -101,10 +103,13 @@ import static
org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.gt;
+import static org.apache.ignite.cache.query.IndexQueryCriteriaBuilder.lt;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
import static
org.apache.ignite.internal.processors.cache.query.CacheQueryType.FULL_TEXT;
+import static
org.apache.ignite.internal.processors.cache.query.CacheQueryType.INDEX;
import static
org.apache.ignite.internal.processors.cache.query.CacheQueryType.SCAN;
import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.junit.Assert.assertArrayEquals;
@@ -1883,6 +1888,116 @@ public abstract class IgniteCacheAbstractQuerySelfTest
extends GridCommonAbstrac
}
}
+ /**
+ * Checks that running an {@link IndexQuery} records cache query events.
+ * <p>
+ * Local listeners for {@code EVT_CACHE_QUERY_OBJECT_READ} and {@code EVT_CACHE_QUERY_EXECUTED} are
+ * registered on every grid. Five {@code Type2} entries (ids 1 to 5) are put and an index query with
+ * criteria {@code gt("id", 1)}, {@code lt("id", 5)} and a filter on names containing "Bill" is run;
+ * the query is made local when the cache mode is {@code REPLICATED}.
+ * <p>
+ * Unless cache events are disabled, the test waits for two object-read events (keys 2 and 4 are then
+ * checked) and for query-executed events: one in {@code REPLICATED} mode, {@code gridCount()} otherwise.
+ * Each listener also checks the query type, cache name, class name and filter carried by the event.
+ * When cache events are disabled both latches start at zero, the listeners call {@code fail} if an
+ * event arrives, and the collected results are expected to stay empty.
+ * <p>
+ * The listeners are removed from every grid in the {@code finally} block.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testIndexQueryEvents() throws Exception {
+ final Map<Integer, BinaryObject> qryResults = new
ConcurrentHashMap<>();
+ final IgniteCache<Integer, Type2> cache = jcache(Integer.class,
Type2.class);
+ final boolean evtsDisabled =
cache.getConfiguration(CacheConfiguration.class).isEventsDisabled();
+
+ final CountDownLatch readLatch = new CountDownLatch(evtsDisabled ? 0 :
2);
+ final CountDownLatch execLatch = new CountDownLatch(evtsDisabled ? 0 :
+ cacheMode() == REPLICATED ? 1 : gridCount());
+
+ IgnitePredicate[] objReadLsnrs = new IgnitePredicate[gridCount()];
+ IgnitePredicate[] qryExecLsnrs = new IgnitePredicate[gridCount()];
+
+ for (int i = 0; i < gridCount(); i++) {
+ IgnitePredicate<Event> objReadPred = new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ assert evt instanceof CacheQueryReadEvent;
+
+ if (evtsDisabled)
+ fail("Cache events are disabled");
+
+ CacheQueryReadEvent<Integer, Type2> qe =
(CacheQueryReadEvent<Integer, Type2>)evt;
+
+ assertEquals(INDEX.name(), qe.queryType());
+ assertEquals(cache.getName(), qe.cacheName());
+ assertEquals("Type2", QueryUtils.typeName(qe.className()));
+ assertNotNull(qe.scanQueryFilter());
+ assertNull(qe.clause());
+ assertNull(qe.continuousQueryFilter());
+ assertNull(qe.arguments());
+
+ qryResults.put(qe.key(), (BinaryObject)qe.value());
+
+ readLatch.countDown();
+
+ return true;
+ }
+ };
+
+ grid(i).events().localListen(objReadPred,
EVT_CACHE_QUERY_OBJECT_READ);
+ objReadLsnrs[i] = objReadPred;
+
+ IgnitePredicate<Event> qryExecPred = new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ assert evt instanceof CacheQueryExecutedEvent;
+
+ if (evtsDisabled)
+ fail("Cache events are disabled");
+
+ CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt;
+
+ assertEquals(INDEX.name(), qe.queryType());
+ assertEquals(cache.getName(), qe.cacheName());
+ assertEquals("Type2", QueryUtils.typeName(qe.className()));
+ assertNotNull(qe.scanQueryFilter());
+ assertNull(qe.clause());
+ assertNull(qe.continuousQueryFilter());
+ assertNull(qe.arguments());
+
+ execLatch.countDown();
+
+ return true;
+ }
+ };
+
+ grid(i).events().localListen(qryExecPred,
EVT_CACHE_QUERY_EXECUTED);
+ qryExecLsnrs[i] = qryExecPred;
+ }
+
+ try {
+ cache.put(1, new Type2(1, "John"));
+ cache.put(2, new Type2(2, "Bill"));
+ cache.put(3, new Type2(3, "Sam"));
+ cache.put(4, new Type2(4, "Bill"));
+ cache.put(5, new Type2(5, "Bob"));
+
+ IndexQuery<Integer, Type2> qry = new IndexQuery<Integer,
Type2>(Type2.class)
+ .setCriteria(gt("id", 1), lt("id", 5))
+ .setFilter((k, v) -> v.name().contains("Bill"));
+
+ if (cacheMode() == REPLICATED)
+ qry.setLocal(true);
+
+ QueryCursor<Cache.Entry<Integer, Type2>> cursor = cache.query(qry);
+
+ cursor.getAll();
+
+ assert readLatch.await(1000, MILLISECONDS);
+ assert execLatch.await(1000, MILLISECONDS);
+
+ if (!evtsDisabled) {
+ assertEquals(2, qryResults.size());
+
+ assertEquals("Bill",
((Type2)qryResults.get(2).deserialize()).name());
+ assertEquals("Bill",
((Type2)qryResults.get(4).deserialize()).name());
+ }
+ else
+ assert qryResults.isEmpty();
+ }
+ finally {
+ for (int i = 0; i < gridCount(); i++) {
+ grid(i).events().stopLocalListen(objReadLsnrs[i]);
+ grid(i).events().stopLocalListen(qryExecLsnrs[i]);
+ }
+ }
+ }
+
/**
* @throws Exception If failed.
*/