This is an automated email from the ASF dual-hosted git repository.
asf-gitbox-commits pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 4bccc3d40d7 IGNITE-28621 SQL: Add H2 map query details to system view
and logs - Fixes #13091.
4bccc3d40d7 is described below
commit 4bccc3d40d7f21ccd210a88c48a7e74fd74e821f
Author: chesnokoff <[email protected]>
AuthorDate: Fri May 15 09:28:31 2026 +0300
IGNITE-28621 SQL: Add H2 map query details to system view and logs - Fixes
#13091.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../common/RunningQueryInfoCheckInitiatorTest.java | 4 +-
.../ignite/jdbc/thin/JdbcThinMetadataSelfTest.java | 1 +
.../query/running/GridRunningQueryInfo.java | 13 ++++
.../query/running/RunningQueryManager.java | 74 ++++++++++++++++++--
.../ignite/spi/systemview/view/SqlQueryView.java | 6 ++
.../processors/query/h2/IgniteH2Indexing.java | 2 +
.../processors/query/h2/MapH2QueryInfo.java | 15 +++-
.../query/h2/twostep/GridMapQueryExecutor.java | 47 ++++++++++++-
.../query/h2/twostep/GridReduceQueryExecutor.java | 3 +
.../query/h2/twostep/MapQueryResult.java | 2 +
.../query/h2/twostep/msg/GridH2DmlRequest.java | 22 ++++++
.../processors/query/LongRunningQueryTest.java | 27 ++++++++
.../processors/query/SqlSystemViewsSelfTest.java | 79 ++++++++++++++++++++++
13 files changed, 285 insertions(+), 10 deletions(-)
diff --git
a/modules/clients/src/test/java/org/apache/ignite/common/RunningQueryInfoCheckInitiatorTest.java
b/modules/clients/src/test/java/org/apache/ignite/common/RunningQueryInfoCheckInitiatorTest.java
index c00e27cf547..86204c379c0 100644
---
a/modules/clients/src/test/java/org/apache/ignite/common/RunningQueryInfoCheckInitiatorTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/common/RunningQueryInfoCheckInitiatorTest.java
@@ -336,7 +336,7 @@ public class RunningQueryInfoCheckInitiatorTest extends
JdbcThinAbstractSelfTest
fail("Timeout. Cannot find query with: " + sqlMatch);
List<List<?>> res = node.context().query().querySqlFields(
- new SqlFieldsQuery("SELECT sql, initiator_id FROM
SYS.SQL_QUERIES"), false).getAll();
+ new SqlFieldsQuery("SELECT sql, initiator_id FROM
SYS.SQL_QUERIES WHERE MAP_QUERY = FALSE"), false).getAll();
for (List<?> row : res) {
if
(((String)row.get(0)).toUpperCase().contains(sqlMatch.toUpperCase()))
@@ -356,7 +356,7 @@ public class RunningQueryInfoCheckInitiatorTest extends
JdbcThinAbstractSelfTest
while (true) {
List<List<?>> res = node.context().query().querySqlFields(
- new SqlFieldsQuery("SELECT * FROM SYS.SQL_QUERIES"),
false).getAll();
+ new SqlFieldsQuery("SELECT * FROM SYS.SQL_QUERIES WHERE
MAP_QUERY = FALSE"), false).getAll();
res.stream().forEach(System.out::println);
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
index b2de892d55e..8fdb7f651e2 100644
---
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinMetadataSelfTest.java
@@ -787,6 +787,7 @@ public class JdbcThinMetadataSelfTest extends
JdbcThinAbstractSelfTest {
"SYS.SQL_QUERIES.DURATION.null",
"SYS.SQL_QUERIES.ORIGIN_NODE_ID.null",
"SYS.SQL_QUERIES.INITIATOR_ID.null",
+ "SYS.SQL_QUERIES.MAP_QUERY.null",
"SYS.SQL_QUERIES.SUBJECT_ID.null",
"SYS.SCAN_QUERIES.START_TIME.null",
"SYS.SCAN_QUERIES.TRANSFORMER.null",
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/GridRunningQueryInfo.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/GridRunningQueryInfo.java
index 69530a59bcb..599102b8c4c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/GridRunningQueryInfo.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/GridRunningQueryInfo.java
@@ -67,6 +67,9 @@ public class GridRunningQueryInfo {
/** Originator. */
private final String qryInitiatorId;
+ /** Map query flag. */
+ private final boolean mapQry;
+
/** Enforce join order flag. */
private final boolean enforceJoinOrder;
@@ -89,6 +92,7 @@ public class GridRunningQueryInfo {
* @param cancel Query cancel.
* @param loc Local query flag.
* @param qryInitiatorId Query's initiator identifier.
+ * @param mapQry Map query flag.
* @param enforceJoinOrder Enforce join order flag.
* @param distributedJoins Distributed joins flag.
* @param subjId Subject ID.
@@ -104,6 +108,7 @@ public class GridRunningQueryInfo {
GridQueryCancel cancel,
boolean loc,
String qryInitiatorId,
+ boolean mapQry,
boolean enforceJoinOrder,
boolean distributedJoins,
UUID subjId
@@ -119,6 +124,7 @@ public class GridRunningQueryInfo {
this.loc = loc;
this.span = MTC.span();
this.qryInitiatorId = qryInitiatorId;
+ this.mapQry = mapQry;
this.enforceJoinOrder = enforceJoinOrder;
this.distributedJoins = distributedJoins;
this.subjId = subjId;
@@ -224,6 +230,13 @@ public class GridRunningQueryInfo {
return qryInitiatorId;
}
+ /**
+ * @return {@code true} if query executes map phase.
+ */
+ public boolean mapQuery() {
+ return mapQry;
+ }
+
/**
* @return Distributed joins.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
index b037037cc88..e35e6809c47 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
@@ -287,6 +287,68 @@ public class RunningQueryManager {
public long register(String qry, GridCacheQueryType qryType, String
schemaName, boolean loc,
@Nullable GridQueryCancel cancel,
String qryInitiatorId, boolean enforceJoinOrder, boolean
distributedJoins) {
+ return register(
+ qry,
+ qryType,
+ schemaName,
+ loc,
+ cancel,
+ qryInitiatorId,
+ enforceJoinOrder,
+ distributedJoins,
+ localNodeId,
+ false
+ );
+ }
+
+ /**
+ * Registers map-side running query and returns an id associated with the
query on the current node.
+ *
+ * @param qry Query text.
+ * @param schemaName Schema name.
+ * @param cancel Query cancel.
+ * @param qryInitiatorId Query initiator ID.
+ * @param originNodeId Query origin node ID.
+ * @param enforceJoinOrder Enforce join order flag.
+ * @param distributedJoins Distributed joins flag.
+ * @return Id of registered query.
+ */
+ public long registerMapQuery(
+ String qry,
+ String schemaName,
+ @Nullable GridQueryCancel cancel,
+ String qryInitiatorId,
+ UUID originNodeId,
+ boolean enforceJoinOrder,
+ boolean distributedJoins
+ ) {
+ return register(
+ qry,
+ SQL_FIELDS,
+ schemaName,
+ false,
+ cancel,
+ qryInitiatorId,
+ enforceJoinOrder,
+ distributedJoins,
+ originNodeId,
+ true
+ );
+ }
+
+ /** Registers running query and returns an id associated with the query. */
+ private long register(
+ String qry,
+ GridCacheQueryType qryType,
+ String schemaName,
+ boolean loc,
+ @Nullable GridQueryCancel cancel,
+ String qryInitiatorId,
+ boolean enforceJoinOrder,
+ boolean distributedJoins,
+ UUID nodeId,
+ boolean mapQry
+ ) {
long qryId = qryIdGen.incrementAndGet();
if (qryInitiatorId == null)
@@ -294,7 +356,7 @@ public class RunningQueryManager {
final GridRunningQueryInfo run = new GridRunningQueryInfo(
qryId,
- localNodeId,
+ nodeId,
qry,
qryType,
schemaName,
@@ -303,6 +365,7 @@ public class RunningQueryManager {
cancel,
loc,
qryInitiatorId,
+ mapQry,
enforceJoinOrder,
distributedJoins,
securitySubjectId(ctx)
@@ -314,7 +377,7 @@ public class RunningQueryManager {
run.span().addTag(SQL_QRY_ID, run::globalQueryId);
- if (!qryStartedListeners.isEmpty()) {
+ if (!mapQry && !qryStartedListeners.isEmpty()) {
GridQueryStartedInfo info = new GridQueryStartedInfo(
run.id(),
localNodeId,
@@ -375,10 +438,13 @@ public class RunningQueryManager {
if (failed)
qrySpan.addTag(ERROR, failReason::getMessage);
- //We need to collect query history and metrics only for SQL
queries.
if (isSqlQuery(qry)) {
qry.runningFuture().onDone();
+ if (qry.mapQuery())
+ return;
+
+ // We need to collect query history and metrics only for SQL
queries initiated by user.
qryHistTracker.collectHistory(qry, failed);
if (!failed)
@@ -553,7 +619,7 @@ public class RunningQueryManager {
long curTime = U.currentTimeMillis();
for (GridRunningQueryInfo runningQryInfo : runs.values()) {
- if (curTime - runningQryInfo.startTime() > duration)
+ if (!runningQryInfo.mapQuery() && curTime -
runningQryInfo.startTime() > duration)
res.add(runningQryInfo);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java
b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java
index cd5f2d7d91b..1537f79ea00 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java
@@ -81,6 +81,12 @@ public class SqlQueryView {
return qry.queryInitiatorId();
}
+ /** @return {@code True} if query executes map phase. */
+ @Order(8)
+ public boolean mapQuery() {
+ return qry.mapQuery();
+ }
+
/** @return {@code True} if query is local. */
public boolean local() {
return qry.local();
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 88ac32a113c..9e84486f16b 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -2177,6 +2177,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
distributedPlan.getCacheIds(),
qryDesc.sql(),
qryParams.arguments(),
+ qryDesc.queryInitiatorId(),
qryDesc.enforceJoinOrder(),
qryParams.pageSize(),
qryParams.timeout(),
@@ -2205,6 +2206,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
.setEnforceJoinOrder(qryDesc.enforceJoinOrder())
.setLocal(qryDesc.local())
.setPageSize(qryParams.pageSize())
+ .setQueryInitiatorId(qryDesc.queryInitiatorId())
.setTimeout(qryParams.timeout(), TimeUnit.MILLISECONDS);
Iterable<List<?>> cur;
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
index a159d2ac2c9..3e10c7733b1 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
@@ -30,11 +30,15 @@ public class MapH2QueryInfo extends H2QueryInfo {
/** Segment. */
private final int segment;
+ /** Local query id. */
+ private final long locQryId;
+
/**
* @param stmt Query statement.
* @param sql Query statement.
* @param nodeId Originator node id.
* @param qryId Query id.
+ * @param locQryId Local query id.
* @param initiatorId Query initiator id.
* @param reqId Request ID.
* @param segment Segment.
@@ -44,19 +48,28 @@ public class MapH2QueryInfo extends H2QueryInfo {
String sql,
UUID nodeId,
long qryId,
+ long locQryId,
String initiatorId,
long reqId,
int segment
) {
super(QueryType.MAP, stmt, sql, nodeId, qryId, initiatorId);
+ this.locQryId = locQryId;
this.reqId = reqId;
this.segment = segment;
}
+ /** @return Local query id. */
+ public long localQueryId() {
+ return locQryId;
+ }
+
/** {@inheritDoc} */
@Override protected void printInfo(StringBuilder msg) {
- msg.append(", reqId=").append(reqId)
+ msg.append(", mapQuery=true")
+ .append(", originNodeId=").append(nodeId())
+ .append(", reqId=").append(reqId)
.append(", segment=").append(segment);
}
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index f2a4b459a7d..36ff10e3146 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -71,6 +71,7 @@ import
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery
import
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest;
import
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse;
import
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.Span;
@@ -454,6 +455,8 @@ public class GridMapQueryExecutor {
MapH2QueryInfo qryInfo = null;
+ long runningQryId = RunningQueryManager.UNDEFINED_QUERY_ID;
+
try {
res.lock();
@@ -468,7 +471,28 @@ public class GridMapQueryExecutor {
H2Utils.bindParameters(stmt, params0);
- qryInfo = new MapH2QueryInfo(stmt, qry.query(),
node.id(), qryId, qryInitiatorId, reqId, segmentId);
+ GridQueryCancel qryCancel =
qryResults.queryCancel(qryIdx);
+
+ runningQryId =
h2.runningQueryManager().registerMapQuery(
+ sql,
+ schemaName,
+ qryCancel,
+ qryInitiatorId,
+ node.id(),
+ enforceJoinOrder,
+ distributedJoins
+ );
+
+ qryInfo = new MapH2QueryInfo(
+ stmt,
+ qry.query(),
+ node.id(),
+ qryId,
+ runningQryId,
+ qryInitiatorId,
+ reqId,
+ segmentId
+ );
h2.heavyQueriesTracker().startTracking(qryInfo);
@@ -482,8 +506,6 @@ public class GridMapQueryExecutor {
);
}
- GridQueryCancel qryCancel =
qryResults.queryCancel(qryIdx);
-
ResultSet rs = h2.executeWithResumableTimeTracking(
() -> h2.executeSqlQueryWithTimer(
stmt,
@@ -567,6 +589,8 @@ public class GridMapQueryExecutor {
if (qryInfo != null)
h2.heavyQueriesTracker().stopTracking(qryInfo, e);
+ h2.runningQueryManager().unregister(runningQryId, e);
+
throw e;
}
finally {
@@ -684,6 +708,8 @@ public class GridMapQueryExecutor {
MapNodeResults nodeResults = resultsForNode(node.id());
+ long runningQryId = RunningQueryManager.UNDEFINED_QUERY_ID;
+
// We don't use try with resources on purpose - the catch block must
also be executed in the context of this span.
TraceSurroundings trace = MTC.support(ctx.tracing()
.create(SpanType.SQL_DML_QRY_EXEC_REQ, MTC.span())
@@ -720,6 +746,7 @@ public class GridMapQueryExecutor {
fldsQry.setEnforceJoinOrder(req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER));
fldsQry.setPageSize(req.pageSize());
+ fldsQry.setQueryInitiatorId(req.queryInitiatorId());
fldsQry.setLocal(true);
if (req.timeout() > 0 || req.explicitTimeout())
@@ -736,6 +763,16 @@ public class GridMapQueryExecutor {
loc = false;
}
+ runningQryId = h2.runningQueryManager().registerMapQuery(
+ req.query(),
+ req.schemaName(),
+ cancel,
+ req.queryInitiatorId(),
+ node.id(),
+ req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER),
+ fldsQry.isDistributedJoins()
+ );
+
UpdateResult updRes = h2.executeUpdateOnDataNode(req.schemaName(),
fldsQry, filter, cancel, loc);
GridCacheContext<?, ?> mainCctx =
@@ -760,6 +797,8 @@ public class GridMapQueryExecutor {
}
sendUpdateResponse(node, reqId, updRes, null);
+
+ h2.runningQueryManager().unregister(runningQryId, null);
}
catch (Exception e) {
MTC.span().addTag(ERROR, e::getMessage);
@@ -767,6 +806,8 @@ public class GridMapQueryExecutor {
U.error(log, "Error processing dml request. [localNodeId=" +
ctx.localNodeId() +
", nodeId=" + node.id() + ", req=" + req + ']', e);
+ h2.runningQueryManager().unregister(runningQryId, e);
+
sendUpdateResponse(node, reqId, null, e.getMessage());
}
finally {
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 951e98a1587..977f44bda68 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -900,6 +900,7 @@ public class GridReduceQueryExecutor {
* @param cacheIds Cache ids.
* @param selectQry Select query.
* @param params SQL parameters.
+ * @param qryInitiatorId Query initiator id.
* @param enforceJoinOrder Enforce join order of tables.
* @param pageSize Page size.
* @param timeoutMillis Timeout.
@@ -914,6 +915,7 @@ public class GridReduceQueryExecutor {
List<Integer> cacheIds,
String selectQry,
Object[] params,
+ String qryInitiatorId,
boolean enforceJoinOrder,
int pageSize,
int timeoutMillis,
@@ -967,6 +969,7 @@ public class GridReduceQueryExecutor {
.query(selectQry)
.pageSize(pageSize)
.parameters(params)
+ .queryInitiatorId(qryInitiatorId)
.timeout(timeoutMillis)
.explicitTimeout(true)
.flags(flags);
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index 0d844d289f5..4e692207987 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -390,6 +390,8 @@ class MapQueryResult {
U.close(rs, log);
h2.heavyQueriesTracker().stopTracking(qryInfo, null);
+
+ h2.runningQueryManager().unregister(qryInfo.localQueryId(), null);
}
}
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
index 56bd37e7a54..725bdaf6ecc 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2DmlRequest.java
@@ -91,6 +91,10 @@ public class GridH2DmlRequest implements Message,
GridCacheQueryMarshallable {
@Order(10)
boolean explicitTimeout;
+ /** Query initiator id. */
+ @Order(11)
+ String qryInitiatorId;
+
/**
* Empty constructor.
*/
@@ -114,6 +118,7 @@ public class GridH2DmlRequest implements Message,
GridCacheQueryMarshallable {
paramsBytes = req.paramsBytes;
schemaName = req.schemaName;
explicitTimeout = req.explicitTimeout;
+ qryInitiatorId = req.qryInitiatorId;
}
/**
@@ -309,6 +314,23 @@ public class GridH2DmlRequest implements Message,
GridCacheQueryMarshallable {
return this;
}
+ /**
+ * @return Query initiator id.
+ */
+ public String queryInitiatorId() {
+ return qryInitiatorId;
+ }
+
+ /**
+ * @param qryInitiatorId Query initiator id.
+ * @return {@code this}.
+ */
+ public GridH2DmlRequest queryInitiatorId(String qryInitiatorId) {
+ this.qryInitiatorId = qryInitiatorId;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public void marshall(BinaryMarshaller m) {
if (paramsBytes != null)
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
index 37c6edfcb89..d6e7cab4284 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
@@ -505,6 +505,33 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
LONG_QUERY_WARNING_TIMEOUT);
}
+ /** Verifies map query information in long-query logs. */
+ @Test
+ @MultiNodeTest
+ public void testLongMapQueryLogInfo() {
+ ListeningTestLogger testLog = testLog();
+
+ String initiatorId = UUID.randomUUID().toString();
+
+ UUID originNodeId = ignite.cluster().localNode().id();
+
+ LogListener lsnr = LogListener.matches(LONG_QUERY_FINISHED_MSG)
+ .andMatches("type=MAP")
+ .andMatches("mapQuery=true")
+ .andMatches("originNodeId=" + originNodeId)
+ .andMatches("initiatorId=" + initiatorId)
+ .build();
+
+ testLog.registerListener(lsnr);
+
+ ignite.cache("test").query(new SqlFieldsQuery("SELECT val FROM test
WHERE id = sleep_func(?, 0)")
+ .setQueryInitiatorId(initiatorId)
+ .setArgs(LONG_QUERY_WARNING_TIMEOUT))
+ .getAll();
+
+ assertTrue(lsnr.check());
+ }
+
/** */
private void checkInitiatorId(ListeningTestLogger log, String type, String
sql, Object... args) {
String initiatorId = UUID.randomUUID().toString();
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlSystemViewsSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlSystemViewsSelfTest.java
index 8defdf0b3d6..4c9d8362822 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlSystemViewsSelfTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlSystemViewsSelfTest.java
@@ -68,6 +68,7 @@ import org.apache.ignite.configuration.SqlConfiguration;
import org.apache.ignite.configuration.TopologyValidator;
import org.apache.ignite.internal.ClusterMetricsSnapshot;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.cache.query.index.IndexProcessor;
@@ -681,6 +682,84 @@ public class SqlSystemViewsSelfTest extends
AbstractIndexingCommonTest {
}
}
+ /** Test SELECT and DML map query flags in running queries system view. */
+ @Test
+ public void testMapQueryRunningQueriesView() throws Exception {
+ IgniteEx ignite = startGrids(2);
+
+ IgniteCache<Integer, Integer> cache = createMapQueryTestCache(ignite);
+
+ checkMapQueryView(ignite, cache, "SELECT * FROM Integer WHERE sleep(?)
>= 0");
+
+ checkMapQueryView(ignite, cache, "DELETE FROM Integer WHERE sleep(?)
>= 0");
+ }
+
+ /** Test map query is unregistered from running queries system view on
error. */
+ @Test
+ public void testMapQueryRunningQueriesViewOnError() throws Exception {
+ IgniteEx ignite = startGrids(2);
+
+ IgniteCache<Integer, Integer> cache = createMapQueryTestCache(ignite);
+
+ String initiatorId = UUID.randomUUID().toString();
+
+ GridTestUtils.assertThrows(log,
+ () -> cache.query(new SqlFieldsQuery("SELECT * FROM Integer WHERE
can_fail(_key = 0) = 0")
+ .setQueryInitiatorId(initiatorId)).getAll(),
+ CacheException.class,
+ "Exception calling user-defined function");
+
+ assertTrue(waitForCondition(() -> !hasMapQueryView(ignite,
initiatorId), 5_000));
+ }
+
+ /** */
+ private void checkMapQueryView(IgniteEx ignite, IgniteCache<Integer,
Integer> cache, String sql) throws Exception {
+ String initiatorId = UUID.randomUUID().toString();
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() ->
+ cache.query(new
SqlFieldsQuery(sql).setQueryInitiatorId(initiatorId).setArgs(1_000)).getAll()
+ );
+
+ try {
+ assertTrue(waitForCondition(() -> hasMapQueryView(ignite,
initiatorId), 5_000));
+ }
+ finally {
+ fut.get();
+ }
+
+ assertTrue(waitForCondition(() -> !hasMapQueryView(ignite,
initiatorId), 5_000));
+ }
+
+ /** */
+ private IgniteCache<Integer, Integer> createMapQueryTestCache(IgniteEx
ignite) throws Exception {
+ IgniteCache<Integer, Integer> cache = ignite.createCache(
+ new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
+ .setCacheMode(CacheMode.PARTITIONED)
+ .setIndexedTypes(Integer.class, Integer.class)
+ .setSqlFunctionClasses(GridTestUtils.SqlTestFunctions.class)
+ );
+
+ cache.put(0, 0);
+
+ awaitPartitionMapExchange();
+
+ return cache;
+ }
+
+ /** */
+ private boolean hasMapQueryView(IgniteEx originNode, String initiatorId) {
+ for (Ignite ignite : G.allGrids()) {
+ SystemView<SqlQueryView> view =
((IgniteEx)ignite).context().systemView().view(SQL_QRY_VIEW);
+
+ for (SqlQueryView qry : view) {
+ if (qry.mapQuery() &&
originNode.localNode().id().equals(qry.originNodeId()) &&
initiatorId.equals(qry.initiatorId()))
+ return true;
+ }
+ }
+
+ return false;
+ }
+
/**
* Test that we can't use cache tables and system views in the same query.
*/