This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new ed3dead1417 IGNITE-19586 SQL Calcite: Fix SQL/Query events - Fixes 
#10756.
ed3dead1417 is described below

commit ed3dead1417b90c48a3bd0c5a75f3259971d049b
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Jun 1 11:58:44 2023 +0500

    IGNITE-19586 SQL Calcite: Fix SQL/Query events - Fixes #10756.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../query/calcite/CalciteQueryProcessor.java       | 13 +++
 .../query/calcite/exec/ExecutionServiceImpl.java   | 41 ++++++++-
 .../calcite/util/ConvertingClosableIterator.java   |  9 +-
 .../integration/SqlDiagnosticIntegrationTest.java  | 99 ++++++++++++++++++++++
 .../java/org/apache/ignite/events/EventType.java   |  3 +-
 .../processors/query/GridQueryProcessor.java       |  2 +-
 .../internal/processors/query/QueryProperties.java | 13 ++-
 7 files changed, 174 insertions(+), 6 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 2d543a919a8..7afc6746ed8 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -48,6 +48,7 @@ import org.apache.ignite.SystemProperty;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
 import org.apache.ignite.configuration.QueryEngineConfiguration;
+import org.apache.ignite.events.SqlQueryExecutionEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
@@ -98,10 +99,12 @@ import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.LifecycleAware;
 import org.apache.ignite.internal.processors.query.calcite.util.Service;
+import org.apache.ignite.internal.processors.security.SecurityUtils;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.getLong;
+import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
 
 /** */
 public class CalciteQueryProcessor extends GridProcessorAdapter implements 
QueryEngine {
@@ -521,6 +524,16 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
 
         qryReg.register(qry);
 
+        if (ctx.event().isRecordable(EVT_SQL_QUERY_EXECUTION)) {
+            ctx.event().record(new SqlQueryExecutionEvent(
+                ctx.discovery().localNode(),
+                "SQL query execution.",
+                sql,
+                params,
+                SecurityUtils.securitySubjectId(ctx))
+            );
+        }
+
         try {
             return action.apply(qry);
         }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 414db873898..6adb151bb3a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -35,6 +35,8 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
@@ -44,6 +46,7 @@ import 
org.apache.ignite.internal.processors.cache.CacheObjectUtils;
 import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import 
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import 
org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
@@ -95,12 +98,14 @@ import 
org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import 
org.apache.ignite.internal.processors.query.calcite.util.ConvertingClosableIterator;
 import 
org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryCursor;
+import org.apache.ignite.internal.processors.security.SecurityUtils;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.Collections.singletonList;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 import static 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
 import static 
org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader.fromJson;
 
@@ -115,6 +120,9 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
     /** */
     private UUID locNodeId;
 
+    /** */
+    private GridKernalContext ctx;
+
     /** */
     private GridEventStorageManager evtMgr;
 
@@ -400,6 +408,7 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
 
     /** {@inheritDoc} */
     @Override public void onStart(GridKernalContext ctx) {
+        this.ctx = ctx;
         localNodeId(ctx.localNodeId());
         exchangeManager(ctx.cache().context().exchange());
         cacheObjectValueContext(ctx.query().objectContext());
@@ -647,8 +656,38 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
         Function<Object, Object> fieldConverter = (qryProps == null || 
qryProps.keepBinary()) ? null :
             o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o, false, 
true, null);
 
+        Function<List<Object>, List<Object>> rowConverter = null;
+
+        // Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before return 
result to cursor.
+        if (qryProps != null && qryProps.cacheName() != null && 
evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
+            ClusterNode locNode = ctx.discovery().localNode();
+            UUID subjId = SecurityUtils.securitySubjectId(ctx);
+
+            rowConverter = row -> {
+                evtMgr.record(new CacheQueryReadEvent<>(
+                    locNode,
+                    "SQL fields query result set row read.",
+                    EVT_CACHE_QUERY_OBJECT_READ,
+                    CacheQueryType.SQL_FIELDS.name(),
+                    qryProps.cacheName(),
+                    null,
+                    qry.sql(),
+                    null,
+                    null,
+                    qry.parameters(),
+                    subjId,
+                    null,
+                    null,
+                    null,
+                    null,
+                    row));
+
+                return row;
+            };
+        }
+
         Iterator<List<?>> it = new 
ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx,
-            fieldConverter);
+            fieldConverter, rowConverter);
 
         return new ListFieldsQueryCursor<>(plan, it, ectx);
     }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
index e3d19e18f6b..998f83bca0a 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
@@ -39,15 +39,20 @@ public class ConvertingClosableIterator<Row> implements 
Iterator<List<?>>, AutoC
     /** */
     @Nullable private final Function<Object, Object> fieldConverter;
 
+    /** */
+    @Nullable Function<List<Object>, List<Object>> rowConverter;
+
     /** */
     public ConvertingClosableIterator(
         Iterator<Row> it,
         ExecutionContext<Row> ectx,
-        @Nullable Function<Object, Object> fieldConverter
+        @Nullable Function<Object, Object> fieldConverter,
+        @Nullable Function<List<Object>, List<Object>> rowConverter
     ) {
         this.it = it;
         rowHnd = ectx.rowHandler();
         this.fieldConverter = fieldConverter;
+        this.rowConverter = rowConverter;
     }
 
     /**
@@ -70,7 +75,7 @@ public class ConvertingClosableIterator<Row> implements 
Iterator<List<?>>, AutoC
         for (int i = 0; i < rowSize; i++)
             res.add(fieldConverter == null ? rowHnd.get(i, next) : 
fieldConverter.apply(rowHnd.get(i, next)));
 
-        return res;
+        return rowConverter == null ? res : rowConverter.apply(res);
     }
 
     /**
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index 064fba39a39..e67fb63b379 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -24,13 +24,24 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.SqlConfiguration;
+import org.apache.ignite.events.CacheQueryExecutedEvent;
+import org.apache.ignite.events.CacheQueryReadEvent;
+import org.apache.ignite.events.SqlQueryExecutionEvent;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import 
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.junit.Test;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
 import static 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
 import static 
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.cleanPerformanceStatisticsDir;
 import static 
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.startCollectStatistics;
@@ -40,6 +51,34 @@ import static 
org.apache.ignite.internal.processors.performancestatistics.Abstra
  * Test SQL diagnostic tools.
  */
 public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest 
{
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setSqlConfiguration(new 
SqlConfiguration().setQueryEnginesConfiguration(new 
CalciteQueryEngineConfiguration()))
+            .setIncludeEventTypes(EVT_SQL_QUERY_EXECUTION, 
EVT_CACHE_QUERY_EXECUTED, EVT_CACHE_QUERY_OBJECT_READ);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGrids(nodeCount());
+
+        client = startClientGrid();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
     /** */
     @Override protected int nodeCount() {
         return 2;
@@ -125,4 +164,64 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
         assertTrue("Query reads expected on nodes: " + readsNodes, 
readsNodes.isEmpty());
         assertEquals(Collections.singleton(lastQryId.get()), readsQueries);
     }
+
+    /** */
+    @Test
+    public void testSqlEvents() {
+        sql("CREATE TABLE test_event (a INT) WITH cache_name=\"test_event\"");
+
+        AtomicIntegerArray evtsSqlExec = new AtomicIntegerArray(nodeCount());
+        AtomicIntegerArray evtsQryExec = new AtomicIntegerArray(nodeCount());
+        AtomicIntegerArray evtsQryRead = new AtomicIntegerArray(nodeCount());
+        for (int i = 0; i < nodeCount(); i++) {
+            int n = i;
+            grid(i).events().localListen(e -> {
+                evtsSqlExec.incrementAndGet(n);
+
+                assertTrue(e instanceof SqlQueryExecutionEvent);
+                
assertTrue(((SqlQueryExecutionEvent)e).text().toLowerCase().contains("test_event"));
+
+                return true;
+            }, EVT_SQL_QUERY_EXECUTION);
+
+            grid(i).events().localListen(e -> {
+                evtsQryExec.incrementAndGet(n);
+
+                assertTrue(e instanceof CacheQueryExecutedEvent);
+                assertEquals("test_event", ((CacheQueryExecutedEvent<?, 
?>)e).cacheName());
+                assertTrue(((CacheQueryExecutedEvent<?, 
?>)e).clause().toLowerCase().contains("test_event"));
+                assertEquals(SQL_FIELDS.name(), ((CacheQueryExecutedEvent<?, 
?>)e).queryType());
+                assertEquals(3, ((CacheQueryExecutedEvent<?, 
?>)e).arguments().length);
+                assertNull(((CacheQueryExecutedEvent<?, 
?>)e).scanQueryFilter());
+                assertNull(((CacheQueryExecutedEvent<?, 
?>)e).continuousQueryFilter());
+
+                return true;
+            }, EVT_CACHE_QUERY_EXECUTED);
+
+            grid(i).events().localListen(e -> {
+                evtsQryRead.incrementAndGet(n);
+
+                assertTrue(e instanceof CacheQueryReadEvent);
+                assertEquals(SQL_FIELDS.name(), ((CacheQueryReadEvent<?, 
?>)e).queryType());
+                assertTrue(((CacheQueryReadEvent<?, 
?>)e).clause().toLowerCase().contains("test_event"));
+                assertNotNull(((CacheQueryReadEvent<?, ?>)e).row());
+
+                return true;
+            }, EVT_CACHE_QUERY_OBJECT_READ);
+        }
+
+        grid(0).cache("test_event").query(new SqlFieldsQuery("INSERT INTO 
test_event VALUES (?), (?), (?)")
+                .setArgs(0, 1, 2)).getAll();
+
+        grid(0).cache("test_event").query(new SqlFieldsQuery("SELECT * FROM 
test_event WHERE a IN (?, ?, ?)")
+                .setArgs(0, 1, 3)).getAll();
+
+        assertEquals(2, evtsSqlExec.get(0));
+        assertEquals(0, evtsSqlExec.get(1));
+        assertEquals(2, evtsQryExec.get(0));
+        assertEquals(0, evtsQryExec.get(1));
+        // 1 event fired by insert (number of rows inserted) + 2 events (1 per 
row selected) fired by the second query.
+        assertEquals(3, evtsQryRead.get(0));
+        assertEquals(0, evtsQryRead.get(1));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java 
b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index cdacb5cede3..69716c3176f 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -545,6 +545,7 @@ public interface EventType {
      * @see CacheEvent
      */
     public static final int EVT_CACHE_OBJECT_EXPIRED = 70;
+
     /**
      * Built-in event type: cache rebalance started.
      * <p>
@@ -955,7 +956,7 @@ public interface EventType {
      * This event is triggered after a corresponding SQL query validated and 
before it is executed.
      * Unlike {@link #EVT_CACHE_QUERY_EXECUTED}, {@code 
EVT_SQL_QUERY_EXECUTION} is fired only once for a request
      * and does not relate to a specific cache.
-     * Enet includes the following information: qurey text and its arguments, 
security subject id.
+     * Event includes the following information: query text and its arguments, 
security subject id.
      *
      * <p>
      * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 7556fdc17d2..d6a7b7e113f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -3088,7 +3088,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
                         QueryEngine qryEngine = engineForQuery(cliCtx, qry);
 
                         if (qryEngine != null) {
-                            QueryProperties qryProps = new 
QueryProperties(keepBinary);
+                            QueryProperties qryProps = new 
QueryProperties(cctx == null ? null : cctx.name(), keepBinary);
 
                             if (qry instanceof SqlFieldsQueryEx && 
((SqlFieldsQueryEx)qry).isBatched()) {
                                 res = qryEngine.queryBatched(
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
index fa41ffb196c..9e2bad26704 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
@@ -17,15 +17,21 @@
 
 package org.apache.ignite.internal.processors.query;
 
+import org.jetbrains.annotations.Nullable;
+
 /**
  * Additional properties to execute the query (Stored in {@link QueryContext}).
  */
 public final class QueryProperties {
+    /** */
+    @Nullable String cacheName;
+
     /** */
     private final boolean keepBinary;
 
     /** */
-    public QueryProperties(boolean keepBinary) {
+    public QueryProperties(@Nullable String cacheName, boolean keepBinary) {
+        this.cacheName = cacheName;
         this.keepBinary = keepBinary;
     }
 
@@ -33,4 +39,9 @@ public final class QueryProperties {
     public boolean keepBinary() {
         return keepBinary;
     }
+
+    /** */
+    public @Nullable String cacheName() {
+        return cacheName;
+    }
 }

Reply via email to