This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new ed3dead1417 IGNITE-19586 SQL Calcite: Fix SQL/Query events - Fixes
#10756.
ed3dead1417 is described below
commit ed3dead1417b90c48a3bd0c5a75f3259971d049b
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Jun 1 11:58:44 2023 +0500
IGNITE-19586 SQL Calcite: Fix SQL/Query events - Fixes #10756.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/CalciteQueryProcessor.java | 13 +++
.../query/calcite/exec/ExecutionServiceImpl.java | 41 ++++++++-
.../calcite/util/ConvertingClosableIterator.java | 9 +-
.../integration/SqlDiagnosticIntegrationTest.java | 99 ++++++++++++++++++++++
.../java/org/apache/ignite/events/EventType.java | 3 +-
.../processors/query/GridQueryProcessor.java | 2 +-
.../internal/processors/query/QueryProperties.java | 13 ++-
7 files changed, 174 insertions(+), 6 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 2d543a919a8..7afc6746ed8 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -48,6 +48,7 @@ import org.apache.ignite.SystemProperty;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.configuration.QueryEngineConfiguration;
+import org.apache.ignite.events.SqlQueryExecutionEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
@@ -98,10 +99,12 @@ import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.LifecycleAware;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
+import org.apache.ignite.internal.processors.security.SecurityUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.getLong;
+import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
/** */
public class CalciteQueryProcessor extends GridProcessorAdapter implements
QueryEngine {
@@ -521,6 +524,16 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
qryReg.register(qry);
+ if (ctx.event().isRecordable(EVT_SQL_QUERY_EXECUTION)) {
+ ctx.event().record(new SqlQueryExecutionEvent(
+ ctx.discovery().localNode(),
+ "SQL query execution.",
+ sql,
+ params,
+ SecurityUtils.securitySubjectId(ctx))
+ );
+ }
+
try {
return action.apply(qry);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 414db873898..6adb151bb3a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -35,6 +35,8 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
@@ -44,6 +46,7 @@ import
org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import
org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
@@ -95,12 +98,14 @@ import
org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import
org.apache.ignite.internal.processors.query.calcite.util.ConvertingClosableIterator;
import
org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryCursor;
+import org.apache.ignite.internal.processors.security.SecurityUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static java.util.Collections.singletonList;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
import static
org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader.fromJson;
@@ -115,6 +120,9 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
/** */
private UUID locNodeId;
+ /** */
+ private GridKernalContext ctx;
+
/** */
private GridEventStorageManager evtMgr;
@@ -400,6 +408,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
/** {@inheritDoc} */
@Override public void onStart(GridKernalContext ctx) {
+ this.ctx = ctx;
localNodeId(ctx.localNodeId());
exchangeManager(ctx.cache().context().exchange());
cacheObjectValueContext(ctx.query().objectContext());
@@ -647,8 +656,38 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
Function<Object, Object> fieldConverter = (qryProps == null ||
qryProps.keepBinary()) ? null :
o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o, false,
true, null);
+ Function<List<Object>, List<Object>> rowConverter = null;
+
+ // Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before return
result to cursor.
+ if (qryProps != null && qryProps.cacheName() != null &&
evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
+ ClusterNode locNode = ctx.discovery().localNode();
+ UUID subjId = SecurityUtils.securitySubjectId(ctx);
+
+ rowConverter = row -> {
+ evtMgr.record(new CacheQueryReadEvent<>(
+ locNode,
+ "SQL fields query result set row read.",
+ EVT_CACHE_QUERY_OBJECT_READ,
+ CacheQueryType.SQL_FIELDS.name(),
+ qryProps.cacheName(),
+ null,
+ qry.sql(),
+ null,
+ null,
+ qry.parameters(),
+ subjId,
+ null,
+ null,
+ null,
+ null,
+ row));
+
+ return row;
+ };
+ }
+
Iterator<List<?>> it = new
ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx,
- fieldConverter);
+ fieldConverter, rowConverter);
return new ListFieldsQueryCursor<>(plan, it, ectx);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
index e3d19e18f6b..998f83bca0a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
@@ -39,15 +39,20 @@ public class ConvertingClosableIterator<Row> implements
Iterator<List<?>>, AutoC
/** */
@Nullable private final Function<Object, Object> fieldConverter;
+ /** */
+ @Nullable Function<List<Object>, List<Object>> rowConverter;
+
/** */
public ConvertingClosableIterator(
Iterator<Row> it,
ExecutionContext<Row> ectx,
- @Nullable Function<Object, Object> fieldConverter
+ @Nullable Function<Object, Object> fieldConverter,
+ @Nullable Function<List<Object>, List<Object>> rowConverter
) {
this.it = it;
rowHnd = ectx.rowHandler();
this.fieldConverter = fieldConverter;
+ this.rowConverter = rowConverter;
}
/**
@@ -70,7 +75,7 @@ public class ConvertingClosableIterator<Row> implements
Iterator<List<?>>, AutoC
for (int i = 0; i < rowSize; i++)
res.add(fieldConverter == null ? rowHnd.get(i, next) :
fieldConverter.apply(rowHnd.get(i, next)));
- return res;
+ return rowConverter == null ? res : rowConverter.apply(res);
}
/**
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index 064fba39a39..e67fb63b379 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -24,13 +24,24 @@ import java.util.Iterator;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.SqlConfiguration;
+import org.apache.ignite.events.CacheQueryExecutedEvent;
+import org.apache.ignite.events.CacheQueryReadEvent;
+import org.apache.ignite.events.SqlQueryExecutionEvent;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.junit.Test;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
import static
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.cleanPerformanceStatisticsDir;
import static
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.startCollectStatistics;
@@ -40,6 +51,34 @@ import static
org.apache.ignite.internal.processors.performancestatistics.Abstra
* Test SQL diagnostic tools.
*/
public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest
{
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setSqlConfiguration(new
SqlConfiguration().setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration()))
+ .setIncludeEventTypes(EVT_SQL_QUERY_EXECUTION,
EVT_CACHE_QUERY_EXECUTED, EVT_CACHE_QUERY_OBJECT_READ);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ startGrids(nodeCount());
+
+ client = startClientGrid();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
/** */
@Override protected int nodeCount() {
return 2;
@@ -125,4 +164,64 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
assertTrue("Query reads expected on nodes: " + readsNodes,
readsNodes.isEmpty());
assertEquals(Collections.singleton(lastQryId.get()), readsQueries);
}
+
+ /** */
+ @Test
+ public void testSqlEvents() {
+ sql("CREATE TABLE test_event (a INT) WITH cache_name=\"test_event\"");
+
+ AtomicIntegerArray evtsSqlExec = new AtomicIntegerArray(nodeCount());
+ AtomicIntegerArray evtsQryExec = new AtomicIntegerArray(nodeCount());
+ AtomicIntegerArray evtsQryRead = new AtomicIntegerArray(nodeCount());
+ for (int i = 0; i < nodeCount(); i++) {
+ int n = i;
+ grid(i).events().localListen(e -> {
+ evtsSqlExec.incrementAndGet(n);
+
+ assertTrue(e instanceof SqlQueryExecutionEvent);
+
assertTrue(((SqlQueryExecutionEvent)e).text().toLowerCase().contains("test_event"));
+
+ return true;
+ }, EVT_SQL_QUERY_EXECUTION);
+
+ grid(i).events().localListen(e -> {
+ evtsQryExec.incrementAndGet(n);
+
+ assertTrue(e instanceof CacheQueryExecutedEvent);
+ assertEquals("test_event", ((CacheQueryExecutedEvent<?,
?>)e).cacheName());
+ assertTrue(((CacheQueryExecutedEvent<?,
?>)e).clause().toLowerCase().contains("test_event"));
+ assertEquals(SQL_FIELDS.name(), ((CacheQueryExecutedEvent<?,
?>)e).queryType());
+ assertEquals(3, ((CacheQueryExecutedEvent<?,
?>)e).arguments().length);
+ assertNull(((CacheQueryExecutedEvent<?,
?>)e).scanQueryFilter());
+ assertNull(((CacheQueryExecutedEvent<?,
?>)e).continuousQueryFilter());
+
+ return true;
+ }, EVT_CACHE_QUERY_EXECUTED);
+
+ grid(i).events().localListen(e -> {
+ evtsQryRead.incrementAndGet(n);
+
+ assertTrue(e instanceof CacheQueryReadEvent);
+ assertEquals(SQL_FIELDS.name(), ((CacheQueryReadEvent<?,
?>)e).queryType());
+ assertTrue(((CacheQueryReadEvent<?,
?>)e).clause().toLowerCase().contains("test_event"));
+ assertNotNull(((CacheQueryReadEvent<?, ?>)e).row());
+
+ return true;
+ }, EVT_CACHE_QUERY_OBJECT_READ);
+ }
+
+ grid(0).cache("test_event").query(new SqlFieldsQuery("INSERT INTO
test_event VALUES (?), (?), (?)")
+ .setArgs(0, 1, 2)).getAll();
+
+ grid(0).cache("test_event").query(new SqlFieldsQuery("SELECT * FROM
test_event WHERE a IN (?, ?, ?)")
+ .setArgs(0, 1, 3)).getAll();
+
+ assertEquals(2, evtsSqlExec.get(0));
+ assertEquals(0, evtsSqlExec.get(1));
+ assertEquals(2, evtsQryExec.get(0));
+ assertEquals(0, evtsQryExec.get(1));
+ // 1 event fired by insert (number of rows inserted) + 2 events (1 per
row selected) fired by the second query.
+ assertEquals(3, evtsQryRead.get(0));
+ assertEquals(0, evtsQryRead.get(1));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index cdacb5cede3..69716c3176f 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -545,6 +545,7 @@ public interface EventType {
* @see CacheEvent
*/
public static final int EVT_CACHE_OBJECT_EXPIRED = 70;
+
/**
* Built-in event type: cache rebalance started.
* <p>
@@ -955,7 +956,7 @@ public interface EventType {
* This event is triggered after a corresponding SQL query validated and
before it is executed.
* Unlike {@link #EVT_CACHE_QUERY_EXECUTED}, {@code
EVT_SQL_QUERY_EXECUTION} is fired only once for a request
* and does not relate to a specific cache.
- * Enet includes the following information: qurey text and its arguments,
security subject id.
+ * Event includes the following information: query text and its arguments,
security subject id.
*
* <p>
* NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 7556fdc17d2..d6a7b7e113f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -3088,7 +3088,7 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
QueryEngine qryEngine = engineForQuery(cliCtx, qry);
if (qryEngine != null) {
- QueryProperties qryProps = new
QueryProperties(keepBinary);
+ QueryProperties qryProps = new
QueryProperties(cctx == null ? null : cctx.name(), keepBinary);
if (qry instanceof SqlFieldsQueryEx &&
((SqlFieldsQueryEx)qry).isBatched()) {
res = qryEngine.queryBatched(
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
index fa41ffb196c..9e2bad26704 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
@@ -17,15 +17,21 @@
package org.apache.ignite.internal.processors.query;
+import org.jetbrains.annotations.Nullable;
+
/**
* Additional properties to execute the query (Stored in {@link QueryContext}).
*/
public final class QueryProperties {
+ /** */
+ @Nullable String cacheName;
+
/** */
private final boolean keepBinary;
/** */
- public QueryProperties(boolean keepBinary) {
+ public QueryProperties(@Nullable String cacheName, boolean keepBinary) {
+ this.cacheName = cacheName;
this.keepBinary = keepBinary;
}
@@ -33,4 +39,9 @@ public final class QueryProperties {
public boolean keepBinary() {
return keepBinary;
}
+
+ /** */
+ public @Nullable String cacheName() {
+ return cacheName;
+ }
}