This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e2a24fc68db IGNITE-27313 SQL: Add information about query initiator id to log - Fixes #12573.
e2a24fc68db is described below

commit e2a24fc68dbded7aa7d75251f64c37232610ca4e
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Dec 12 10:30:07 2025 +0300

    IGNITE-27313 SQL: Add information about query initiator id to log - Fixes #12573.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../query/calcite/CalciteQueryProcessor.java       |  3 +-
 .../query/calcite/QueryRegistryImpl.java           |  7 +---
 .../processors/query/calcite/RootQuery.java        | 19 +++++++++--
 .../integration/SqlDiagnosticIntegrationTest.java  | 24 ++++++++++++-
 .../qa/query/WarningOnBigQueryResultsBaseTest.java |  2 +-
 .../internal/processors/query/h2/H2DmlInfo.java    |  7 +++-
 .../internal/processors/query/h2/H2QueryInfo.java  | 20 +++++++----
 .../processors/query/h2/IgniteH2Indexing.java      |  4 ++-
 .../processors/query/h2/MapH2QueryInfo.java        | 14 ++++++--
 .../processors/query/h2/ReduceH2QueryInfo.java     |  5 +--
 .../query/h2/twostep/GridMapQueryExecutor.java     |  6 +++-
 .../query/h2/twostep/GridReduceQueryExecutor.java  |  8 +++--
 .../query/h2/twostep/msg/GridH2QueryRequest.java   | 35 +++++++++++++++++++
 .../processors/query/LongRunningQueryTest.java     | 39 ++++++++++++++++++++++
 14 files changed, 164 insertions(+), 29 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 123fdad34d5..0d357e0c3b0 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -752,7 +752,8 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
             (q, ex) -> qryReg.unregister(q.id(), ex),
             log,
             qryPlannerTimeout,
-            timeout
+            timeout,
+            fldsQry != null ? fldsQry.getQueryInitiatorId() : null
         );
 
         if (qrys != null)
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
index aecf0fe69d8..fd8b9062e97 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
@@ -22,7 +22,6 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.cache.query.QueryCancelledException;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -58,12 +57,8 @@ public class QueryRegistryImpl extends AbstractService 
implements QueryRegistry
 
             RunningQueryManager qryMgr = kctx.query().runningQueryManager();
 
-            SqlFieldsQuery fieldsQry = 
rootQry.context().unwrap(SqlFieldsQuery.class);
-
-            String initiatorId = fieldsQry != null ? 
fieldsQry.getQueryInitiatorId() : null;
-
             long locId = qryMgr.register(rootQry.sql(), 
GridCacheQueryType.SQL_FIELDS, rootQry.context().schemaName(),
-                false, createCancelToken(qry), initiatorId, false, true, 
false);
+                false, createCancelToken(qry), rootQry.initiatorId(), false, 
true, false);
 
             rootQry.localQueryId(locId);
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
index d0203251038..2fdd84e9152 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -91,6 +91,9 @@ public class RootQuery<RowT> extends Query<RowT> implements 
TrackableQuery {
     /** */
     private final long totalTimeout;
 
+    /** */
+    private final String initiatorId;
+
     /** */
     private volatile long locQryId;
 
@@ -113,7 +116,8 @@ public class RootQuery<RowT> extends Query<RowT> implements 
TrackableQuery {
         BiConsumer<Query<RowT>, Throwable> unregister,
         IgniteLogger log,
         long plannerTimeout,
-        long totalTimeout
+        long totalTimeout,
+        String initiatorId
     ) {
         super(
             UUID.randomUUID(),
@@ -135,6 +139,7 @@ public class RootQuery<RowT> extends Query<RowT> implements 
TrackableQuery {
 
         this.plannerTimeout = totalTimeout > 0 ? Math.min(plannerTimeout, 
totalTimeout) : plannerTimeout;
         this.totalTimeout = totalTimeout;
+        this.initiatorId = initiatorId;
 
         Context parent = Commons.convert(qryCtx);
 
@@ -172,7 +177,9 @@ public class RootQuery<RowT> extends Query<RowT> implements 
TrackableQuery {
             unregister,
             log,
             plannerTimeout,
-            totalTimeout);
+            totalTimeout,
+            initiatorId
+        );
     }
 
     /** */
@@ -444,6 +451,7 @@ public class RootQuery<RowT> extends Query<RowT> implements 
TrackableQuery {
             .append(", type=CALCITE")
             .append(", state=").append(state)
             .append(", schema=").append(ctx.schemaName())
+            .append(", initiatorId=").append(initiatorId)
             .append(", sql='").append(sql);
 
         msgSb.append(']');
@@ -468,6 +476,13 @@ public class RootQuery<RowT> extends Query<RowT> 
implements TrackableQuery {
         return curTimeout <= 0 ? 0 : curTimeout;
     }
 
+    /**
+     * @return Query initiator ID.
+     */
+    public String initiatorId() {
+        return initiatorId;
+    }
+
     /** */
     @Override public String toString() {
         return S.toString(RootQuery.class, this);
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index 00bf5ebd4d1..40af652725f 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -1002,7 +1002,7 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
         }
     }
 
-    /** Verifies that user-defined query initiator ID is present in the 
SQL_QUERY_HISTORY system view. */
+    /** Verifies that user-defined query initiator ID is present in the 
SQL_QUERY_HISTORY system view and logs. */
     @Test
     public void testSqlFieldsQueryWithInitiatorId() throws Exception {
         IgniteEx grid = grid(0);
@@ -1027,6 +1027,18 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
                 return testId.equals(view.initiatorId());
             }, 3_000));
         }
+
+        String initiatorId = "testId2";
+
+        LogListener logLsnr = LogListener.matches(LONG_QUERY_FINISHED_MSG)
+            .andMatches("initiatorId=" + initiatorId).build();
+
+        log.registerListener(logLsnr);
+
+        cache.query(new SqlFieldsQuery("SELECT 
sleep(?)").setArgs(LONG_QRY_TIMEOUT).setQueryInitiatorId(initiatorId))
+            .getAll();
+
+        assertTrue(logLsnr.check(1000));
     }
 
     /**
@@ -1136,6 +1148,16 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
             return true;
         }
 
+        /** */
+        @QuerySqlFunction
+        public static boolean sleep(int sleep) {
+            doSleep(sleep);
+
+            GridTestClockTimer.update();
+
+            return true;
+        }
+
         /** */
         @QuerySqlFunction
         public static String innerSql(String ignite, String cache, String val) 
{
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java
index d9e6863c51d..74842386bae 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java
@@ -65,7 +65,7 @@ public class WarningOnBigQueryResultsBaseTest extends 
AbstractIndexingCommonTest
     /** Log message pattern. */
     private static final Pattern logPtrn = Pattern.compile(
         "fetched=([0-9]+), duration=([0-9]+)ms, type=(MAP|LOCAL|REDUCE), 
distributedJoin=(true|false), " +
-            "enforceJoinOrder=(true|false), lazy=(true|false), schema=(\\S+), 
sql");
+            "enforceJoinOrder=(true|false), lazy=(true|false), schema=(\\S+), 
initiatorId=(\\S+), sql");
 
     /** Test log. */
     private static Map<String, BigResultsLogListener> logListeners = new 
HashMap<>();
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlInfo.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlInfo.java
index 8f95b57e7dd..2a781a13d36 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlInfo.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlInfo.java
@@ -38,6 +38,9 @@ public class H2DmlInfo implements TrackableQuery {
     /** Schema name. */
     private final String schema;
 
+    /** Query initiator id. */
+    private final String initiatorId;
+
     /** Dml command. */
     private final String sql;
 
@@ -48,11 +51,12 @@ public class H2DmlInfo implements TrackableQuery {
      * @param schema Schema name.
      * @param sql Dml command.
      */
-    public H2DmlInfo(long beginTs, long qryId, UUID initNodeId, String schema, 
String sql) {
+    public H2DmlInfo(long beginTs, long qryId, UUID initNodeId, String schema, 
String initiatorId, String sql) {
         this.beginTs = beginTs;
         this.qryId = qryId;
         this.initNodeId = initNodeId;
         this.schema = schema;
+        this.initiatorId = initiatorId;
         this.sql = sql;
     }
 
@@ -76,6 +80,7 @@ public class H2DmlInfo implements TrackableQuery {
         msgSb.append(", duration=").append(time()).append("ms")
             .append(", type=DML")
             .append(", schema=").append(schema)
+            .append(", initiatorId=").append(initiatorId)
             .append(", sql='").append(sql).append("']");
 
         return msgSb.toString();
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
index a94d7a63861..5fb931e5d42 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
@@ -72,7 +72,10 @@ public class H2QueryInfo implements TrackableQuery {
     private final UUID nodeId;
 
     /** Query id. */
-    private final long queryId;
+    private final long qryId;
+
+    /** Query initiator ID. */
+    private final String initiatorId;
 
     /** Query SQL plan. */
     private volatile String plan;
@@ -82,16 +85,18 @@ public class H2QueryInfo implements TrackableQuery {
      * @param stmt Query statement.
      * @param sql Query statement.
      * @param nodeId Originator node id.
-     * @param queryId Query id.
+     * @param qryId Query id.
+     * @param initiatorId Query initiator id.
      */
-    public H2QueryInfo(QueryType type, PreparedStatement stmt, String sql, 
UUID nodeId, long queryId) {
+    public H2QueryInfo(QueryType type, PreparedStatement stmt, String sql, 
UUID nodeId, long qryId, String initiatorId) {
         try {
             assert stmt != null;
 
             this.type = type;
             this.sql = sql;
             this.nodeId = nodeId;
-            this.queryId = queryId;
+            this.qryId = qryId;
+            this.initiatorId = initiatorId;
 
             beginTs = U.currentTimeMillis();
 
@@ -116,7 +121,7 @@ public class H2QueryInfo implements TrackableQuery {
 
     /** */
     public long queryId() {
-        return queryId;
+        return qryId;
     }
 
     /** */
@@ -183,10 +188,10 @@ public class H2QueryInfo implements TrackableQuery {
     @Override public String queryInfo(@Nullable String additionalInfo) {
         StringBuilder msgSb = new StringBuilder();
 
-        if (queryId == RunningQueryManager.UNDEFINED_QUERY_ID)
+        if (qryId == RunningQueryManager.UNDEFINED_QUERY_ID)
             msgSb.append(" [globalQueryId=(undefined), node=").append(nodeId);
         else
-            msgSb.append(" 
[globalQueryId=").append(QueryUtils.globalQueryId(nodeId, queryId));
+            msgSb.append(" 
[globalQueryId=").append(QueryUtils.globalQueryId(nodeId, qryId));
 
         if (additionalInfo != null)
             msgSb.append(", ").append(additionalInfo);
@@ -197,6 +202,7 @@ public class H2QueryInfo implements TrackableQuery {
                 .append(", enforceJoinOrder=").append(enforceJoinOrder)
                 .append(", lazy=").append(lazy)
                 .append(", schema=").append(schema)
+                .append(", initiatorId=").append(initiatorId)
                 .append(", sql='").append(sql)
                 .append("', plan=").append(plan());
 
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index d1ee3aeed1c..59fdfda0a83 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -435,7 +435,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     H2Utils.bindParameters(stmt, F.asList(params));
 
                     qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, 
stmt, qry,
-                        ctx.localNodeId(), qryId);
+                        ctx.localNodeId(), qryId, qryDesc.queryInitiatorId());
 
                     heavyQryTracker.startTracking(qryInfo);
 
@@ -1064,6 +1064,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                 qryId,
                 ctx.localNodeId(),
                 qryDesc.schemaName(),
+                qryDesc.queryInitiatorId(),
                 qryDesc.sql()
             );
 
@@ -1436,6 +1437,7 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                         return IgniteH2Indexing.this.rdcQryExec.query(
                             qryId,
                             qryDesc.schemaName(),
+                            qryDesc.queryInitiatorId(),
                             twoStepQry,
                             keepBinary,
                             qryDesc.enforceJoinOrder(),
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
index 5b1cccd495c..a159d2ac2c9 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
@@ -35,12 +35,20 @@ public class MapH2QueryInfo extends H2QueryInfo {
      * @param sql Query statement.
      * @param nodeId Originator node id.
      * @param qryId Query id.
+     * @param initiatorId Query initiator id.
      * @param reqId Request ID.
      * @param segment Segment.
      */
-    public MapH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId, 
long qryId, long reqId,
-        int segment) {
-        super(QueryType.MAP, stmt, sql, nodeId, qryId);
+    public MapH2QueryInfo(
+        PreparedStatement stmt,
+        String sql,
+        UUID nodeId,
+        long qryId,
+        String initiatorId,
+        long reqId,
+        int segment
+    ) {
+        super(QueryType.MAP, stmt, sql, nodeId, qryId, initiatorId);
 
         this.reqId = reqId;
         this.segment = segment;
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
index ea88594a00d..9edbf595ede 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
@@ -32,10 +32,11 @@ public class ReduceH2QueryInfo extends H2QueryInfo {
      * @param sql Query statement.
      * @param nodeId Originator node id.
      * @param qryId Query id.
+     * @param initiatorId Query initiator id.
      * @param reqId Request ID.
      */
-    public ReduceH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId, 
long qryId, long reqId) {
-        super(QueryType.REDUCE, stmt, sql, nodeId, qryId);
+    public ReduceH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId, 
long qryId, String initiatorId, long reqId) {
+        super(QueryType.REDUCE, stmt, sql, nodeId, qryId, initiatorId);
 
         this.reqId = reqId;
     }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index e08b9a2be51..f2a4b459a7d 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -257,6 +257,7 @@ public class GridMapQueryExecutor {
                                 req.requestId(),
                                 segment0,
                                 req.schemaName(),
+                                req.queryInitiatorId(),
                                 req.queries(),
                                 cacheIds,
                                 req.topologyVersion(),
@@ -285,6 +286,7 @@ public class GridMapQueryExecutor {
                 req.requestId(),
                 firstSegment,
                 req.schemaName(),
+                req.queryInitiatorId(),
                 req.queries(),
                 cacheIds,
                 req.topologyVersion(),
@@ -312,6 +314,7 @@ public class GridMapQueryExecutor {
      * @param reqId Request ID.
      * @param segmentId index segment ID.
      * @param schemaName Schema name.
+     * @param qryInitiatorId Query initiator ID.
      * @param qrys Queries to execute.
      * @param cacheIds Caches which will be affected by these queries.
      * @param topVer Topology version.
@@ -332,6 +335,7 @@ public class GridMapQueryExecutor {
         final long reqId,
         final int segmentId,
         final String schemaName,
+        final String qryInitiatorId,
         final Collection<GridCacheSqlQuery> qrys,
         final List<Integer> cacheIds,
         final AffinityTopologyVersion topVer,
@@ -464,7 +468,7 @@ public class GridMapQueryExecutor {
 
                         H2Utils.bindParameters(stmt, params0);
 
-                        qryInfo = new MapH2QueryInfo(stmt, qry.query(), 
node.id(), qryId, reqId, segmentId);
+                        qryInfo = new MapH2QueryInfo(stmt, qry.query(), 
node.id(), qryId, qryInitiatorId, reqId, segmentId);
 
                         h2.heavyQueriesTracker().startTracking(qryInfo);
 
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 47040b1767c..951e98a1587 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -326,6 +326,7 @@ public class GridReduceQueryExecutor {
     /**
      * @param qryId Query ID.
      * @param schemaName Schema name.
+     * @param qryInitiatorId Query initiator ID.
      * @param qry Query.
      * @param keepBinary Keep binary.
      * @param enforceJoinOrder Enforce join order of tables.
@@ -342,6 +343,7 @@ public class GridReduceQueryExecutor {
     public Iterator<List<?>> query(
         long qryId,
         String schemaName,
+        String qryInitiatorId,
         final GridCacheTwoStepQuery qry,
         boolean keepBinary,
         boolean enforceJoinOrder,
@@ -438,7 +440,8 @@ public class GridReduceQueryExecutor {
                         .flags(queryFlags(qry, enforceJoinOrder, lazy, 
dataPageScanEnabled))
                         .timeout(timeoutMillis)
                         .explicitTimeout(true)
-                        .schemaName(schemaName);
+                        .schemaName(schemaName)
+                        .queryInitiatorId(qryInitiatorId);
 
                     final C2<ClusterNode, Message, Message> spec =
                         parts == null ? null : new 
ReducePartitionsSpecializer(mapping.queryPartitionsMap());
@@ -513,7 +516,7 @@ public class GridReduceQueryExecutor {
                         H2Utils.bindParameters(stmt, 
F.asList(rdc.parameters(params)));
 
                         qryInfo = new ReduceH2QueryInfo(stmt, 
qry.originalSql(),
-                            ctx.localNodeId(), qryId, qryReqId);
+                            ctx.localNodeId(), qryId, qryInitiatorId, 
qryReqId);
 
                         h2.heavyQueriesTracker().startTracking(qryInfo);
 
@@ -1257,7 +1260,6 @@ public class GridReduceQueryExecutor {
      * @return Table.
      * @throws IgniteCheckedException If failed.
      */
-    @SuppressWarnings("unchecked")
     private ReduceTable createMergeTable(H2PooledConnection conn, 
GridCacheSqlQuery qry, boolean explain)
         throws IgniteCheckedException {
         try {
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 9b6fc078746..649e93ba958 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -148,6 +148,9 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
     /** Schema name. */
     private String schemaName;
 
+    /** Query initiator id. */
+    private String qryInitiatorId;
+
     /** Id of the query assigned by {@link RunningQueryManager} on originator 
node. */
     private long qryId;
 
@@ -178,6 +181,7 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
         params = req.params;
         paramsBytes = req.paramsBytes;
         schemaName = req.schemaName;
+        qryInitiatorId = req.qryInitiatorId;
         qryId = req.qryId;
         explicitTimeout = req.explicitTimeout;
     }
@@ -413,6 +417,23 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
         return this;
     }
 
+    /**
+     * @return Query initiator id.
+     */
+    public String queryInitiatorId() {
+        return qryInitiatorId;
+    }
+
+    /**
+     * @param qryInitiatorId Query initiator id.
+     * @return {@code this}.
+     */
+    public GridH2QueryRequest queryInitiatorId(String qryInitiatorId) {
+        this.qryInitiatorId = qryInitiatorId;
+
+        return this;
+    }
+
     /**
      * @param flags Flags.
      * @param dataPageScanEnabled {@code true} If data page scan enabled, 
{@code false} if not, and {@code null} if not set.
@@ -632,6 +653,12 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
                     return false;
 
                 writer.incrementState();
+
+            case 14:
+                if (!writer.writeString(qryInitiatorId))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -749,6 +776,14 @@ public class GridH2QueryRequest implements Message, 
GridCacheQueryMarshallable {
             case 13:
                 qryId = reader.readLong();
 
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 14:
+                qryInitiatorId = reader.readString();
+
                 if (!reader.isLastRead())
                     return false;
 
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
index c381e8e60f9..a46f489cae3 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -51,6 +52,7 @@ import 
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonT
 import org.apache.ignite.internal.processors.query.h2.H2QueryInfo;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker;
+import org.apache.ignite.internal.util.GridTestClockTimer;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -64,6 +66,7 @@ import org.junit.runners.model.Statement;
 
 import static java.lang.Thread.currentThread;
 import static 
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG;
+import static 
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_FINISHED_MSG;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 import static org.h2.engine.Constants.DEFAULT_PAGE_SIZE;
 
@@ -517,6 +520,40 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
             cancelQuery(qryIds.poll());
     }
 
+    /**
+     * Verifies query initiator id information in logs.
+     */
+    @Test
+    @MultiNodeTest
+    public void testQueryInitiatorId() {
+        ListeningTestLogger testLog = testLog();
+
+        checkInitiatorId(testLog, "LOCAL", "SELECT sleep_func(?, 0)", 
LONG_QUERY_WARNING_TIMEOUT);
+
+        checkInitiatorId(testLog, "MAP", "SELECT val FROM test WHERE id = 
sleep_func(?, 0)",
+            LONG_QUERY_WARNING_TIMEOUT);
+
+        checkInitiatorId(testLog, "REDUCE", "SELECT sleep_func(?, sum(val)) 
FROM test WHERE id + 1 = 1",
+            LONG_QUERY_WARNING_TIMEOUT);
+
+        checkInitiatorId(testLog, "DML", "UPDATE test SET val = sleep_func(?, 
val) WHERE id = 0",
+            LONG_QUERY_WARNING_TIMEOUT);
+    }
+
+    /** */
+    private void checkInitiatorId(ListeningTestLogger log, String type, String 
sql, Object... args) {
+        String initiatorId = UUID.randomUUID().toString();
+
+        LogListener lsnr = 
LogListener.matches(LONG_QUERY_FINISHED_MSG).andMatches("type=" + type)
+            .andMatches("initiatorId=" + initiatorId).build();
+
+        log.registerListener(lsnr);
+
+        ignite.cache("test").query(new 
SqlFieldsQuery(sql).setQueryInitiatorId(initiatorId).setArgs(args)).getAll();
+
+        assertTrue(lsnr.check());
+    }
+
     /**
      * Do several fast queries.
      * Log messages must not contain info about long query.
@@ -765,6 +802,8 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
         public static int sleep_func(int sleep, int val) {
             try {
                 Thread.sleep(sleep);
+
+                GridTestClockTimer.update();
             }
             catch (InterruptedException ignored) {
                 // No-op

Reply via email to