This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e2a24fc68db IGNITE-27313 SQL: Add information about query initiator id
to log - Fixes #12573.
e2a24fc68db is described below
commit e2a24fc68dbded7aa7d75251f64c37232610ca4e
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Dec 12 10:30:07 2025 +0300
IGNITE-27313 SQL: Add information about query initiator id to log - Fixes
#12573.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/CalciteQueryProcessor.java | 3 +-
.../query/calcite/QueryRegistryImpl.java | 7 +---
.../processors/query/calcite/RootQuery.java | 19 +++++++++--
.../integration/SqlDiagnosticIntegrationTest.java | 24 ++++++++++++-
.../qa/query/WarningOnBigQueryResultsBaseTest.java | 2 +-
.../internal/processors/query/h2/H2DmlInfo.java | 7 +++-
.../internal/processors/query/h2/H2QueryInfo.java | 20 +++++++----
.../processors/query/h2/IgniteH2Indexing.java | 4 ++-
.../processors/query/h2/MapH2QueryInfo.java | 14 ++++++--
.../processors/query/h2/ReduceH2QueryInfo.java | 5 +--
.../query/h2/twostep/GridMapQueryExecutor.java | 6 +++-
.../query/h2/twostep/GridReduceQueryExecutor.java | 8 +++--
.../query/h2/twostep/msg/GridH2QueryRequest.java | 35 +++++++++++++++++++
.../processors/query/LongRunningQueryTest.java | 39 ++++++++++++++++++++++
14 files changed, 164 insertions(+), 29 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 123fdad34d5..0d357e0c3b0 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -752,7 +752,8 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
(q, ex) -> qryReg.unregister(q.id(), ex),
log,
qryPlannerTimeout,
- timeout
+ timeout,
+ fldsQry != null ? fldsQry.getQueryInitiatorId() : null
);
if (qrys != null)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
index aecf0fe69d8..fd8b9062e97 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
@@ -22,7 +22,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.cache.query.QueryCancelledException;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -58,12 +57,8 @@ public class QueryRegistryImpl extends AbstractService
implements QueryRegistry
RunningQueryManager qryMgr = kctx.query().runningQueryManager();
- SqlFieldsQuery fieldsQry =
rootQry.context().unwrap(SqlFieldsQuery.class);
-
- String initiatorId = fieldsQry != null ?
fieldsQry.getQueryInitiatorId() : null;
-
long locId = qryMgr.register(rootQry.sql(),
GridCacheQueryType.SQL_FIELDS, rootQry.context().schemaName(),
- false, createCancelToken(qry), initiatorId, false, true,
false);
+ false, createCancelToken(qry), rootQry.initiatorId(), false,
true, false);
rootQry.localQueryId(locId);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
index d0203251038..2fdd84e9152 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -91,6 +91,9 @@ public class RootQuery<RowT> extends Query<RowT> implements
TrackableQuery {
/** */
private final long totalTimeout;
+ /** Query initiator ID. */
+ private final String initiatorId;
+
/** */
private volatile long locQryId;
@@ -113,7 +116,8 @@ public class RootQuery<RowT> extends Query<RowT> implements
TrackableQuery {
BiConsumer<Query<RowT>, Throwable> unregister,
IgniteLogger log,
long plannerTimeout,
- long totalTimeout
+ long totalTimeout,
+ String initiatorId
) {
super(
UUID.randomUUID(),
@@ -135,6 +139,7 @@ public class RootQuery<RowT> extends Query<RowT> implements
TrackableQuery {
this.plannerTimeout = totalTimeout > 0 ? Math.min(plannerTimeout,
totalTimeout) : plannerTimeout;
this.totalTimeout = totalTimeout;
+ this.initiatorId = initiatorId;
Context parent = Commons.convert(qryCtx);
@@ -172,7 +177,9 @@ public class RootQuery<RowT> extends Query<RowT> implements
TrackableQuery {
unregister,
log,
plannerTimeout,
- totalTimeout);
+ totalTimeout,
+ initiatorId
+ );
}
/** */
@@ -444,6 +451,7 @@ public class RootQuery<RowT> extends Query<RowT> implements
TrackableQuery {
.append(", type=CALCITE")
.append(", state=").append(state)
.append(", schema=").append(ctx.schemaName())
+ .append(", initiatorId=").append(initiatorId)
.append(", sql='").append(sql);
msgSb.append(']');
@@ -468,6 +476,13 @@ public class RootQuery<RowT> extends Query<RowT>
implements TrackableQuery {
return curTimeout <= 0 ? 0 : curTimeout;
}
+ /**
+ * @return Query initiator ID as passed to the constructor, may be {@code null}.
+ */
+ public String initiatorId() {
+ return initiatorId;
+ }
+
/** */
@Override public String toString() {
return S.toString(RootQuery.class, this);
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index 00bf5ebd4d1..40af652725f 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -1002,7 +1002,7 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
}
}
- /** Verifies that user-defined query initiator ID is present in the
SQL_QUERY_HISTORY system view. */
+ /** Verifies that user-defined query initiator ID is present in the
SQL_QUERY_HISTORY system view and logs. */
@Test
public void testSqlFieldsQueryWithInitiatorId() throws Exception {
IgniteEx grid = grid(0);
@@ -1027,6 +1027,18 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
return testId.equals(view.initiatorId());
}, 3_000));
}
+
+ String initiatorId = "testId2";
+
+ LogListener logLsnr = LogListener.matches(LONG_QUERY_FINISHED_MSG)
+ .andMatches("initiatorId=" + initiatorId).build();
+
+ log.registerListener(logLsnr);
+
+ cache.query(new SqlFieldsQuery("SELECT
sleep(?)").setArgs(LONG_QRY_TIMEOUT).setQueryInitiatorId(initiatorId))
+ .getAll();
+
+ assertTrue(logLsnr.check(1000));
}
/**
@@ -1136,6 +1148,16 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
return true;
}
+ /** Test SQL function: sleeps via {@code doSleep}, updates {@link GridTestClockTimer}; always returns {@code true}. */
+ @QuerySqlFunction
+ public static boolean sleep(int sleep) {
+ doSleep(sleep);
+
+ GridTestClockTimer.update();
+
+ return true;
+ }
+
/** */
@QuerySqlFunction
public static String innerSql(String ignite, String cache, String val)
{
diff --git
a/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java
b/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java
index d9e6863c51d..74842386bae 100644
---
a/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/qa/query/WarningOnBigQueryResultsBaseTest.java
@@ -65,7 +65,7 @@ public class WarningOnBigQueryResultsBaseTest extends
AbstractIndexingCommonTest
/** Log message pattern. */
private static final Pattern logPtrn = Pattern.compile(
"fetched=([0-9]+), duration=([0-9]+)ms, type=(MAP|LOCAL|REDUCE),
distributedJoin=(true|false), " +
- "enforceJoinOrder=(true|false), lazy=(true|false), schema=(\\S+),
sql");
+ "enforceJoinOrder=(true|false), lazy=(true|false), schema=(\\S+),
initiatorId=(\\S+), sql");
/** Test log. */
private static Map<String, BigResultsLogListener> logListeners = new
HashMap<>();
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlInfo.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlInfo.java
index 8f95b57e7dd..2a781a13d36 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlInfo.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlInfo.java
@@ -38,6 +38,9 @@ public class H2DmlInfo implements TrackableQuery {
/** Schema name. */
private final String schema;
+ /** Query initiator id. */
+ private final String initiatorId;
+
/** Dml command. */
private final String sql;
@@ -48,11 +51,12 @@ public class H2DmlInfo implements TrackableQuery {
* @param schema Schema name.
* @param sql Dml command.
*/
- public H2DmlInfo(long beginTs, long qryId, UUID initNodeId, String schema,
String sql) {
+ public H2DmlInfo(long beginTs, long qryId, UUID initNodeId, String schema,
String initiatorId, String sql) {
this.beginTs = beginTs;
this.qryId = qryId;
this.initNodeId = initNodeId;
this.schema = schema;
+ this.initiatorId = initiatorId;
this.sql = sql;
}
@@ -76,6 +80,7 @@ public class H2DmlInfo implements TrackableQuery {
msgSb.append(", duration=").append(time()).append("ms")
.append(", type=DML")
.append(", schema=").append(schema)
+ .append(", initiatorId=").append(initiatorId)
.append(", sql='").append(sql).append("']");
return msgSb.toString();
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
index a94d7a63861..5fb931e5d42 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
@@ -72,7 +72,10 @@ public class H2QueryInfo implements TrackableQuery {
private final UUID nodeId;
/** Query id. */
- private final long queryId;
+ private final long qryId;
+
+ /** Query initiator ID. */
+ private final String initiatorId;
/** Query SQL plan. */
private volatile String plan;
@@ -82,16 +85,18 @@ public class H2QueryInfo implements TrackableQuery {
* @param stmt Query statement.
* @param sql Query statement.
* @param nodeId Originator node id.
- * @param queryId Query id.
+ * @param qryId Query id.
+ * @param initiatorId Query initiator id.
*/
- public H2QueryInfo(QueryType type, PreparedStatement stmt, String sql,
UUID nodeId, long queryId) {
+ public H2QueryInfo(QueryType type, PreparedStatement stmt, String sql,
UUID nodeId, long qryId, String initiatorId) {
try {
assert stmt != null;
this.type = type;
this.sql = sql;
this.nodeId = nodeId;
- this.queryId = queryId;
+ this.qryId = qryId;
+ this.initiatorId = initiatorId;
beginTs = U.currentTimeMillis();
@@ -116,7 +121,7 @@ public class H2QueryInfo implements TrackableQuery {
/** */
public long queryId() {
- return queryId;
+ return qryId;
}
/** */
@@ -183,10 +188,10 @@ public class H2QueryInfo implements TrackableQuery {
@Override public String queryInfo(@Nullable String additionalInfo) {
StringBuilder msgSb = new StringBuilder();
- if (queryId == RunningQueryManager.UNDEFINED_QUERY_ID)
+ if (qryId == RunningQueryManager.UNDEFINED_QUERY_ID)
msgSb.append(" [globalQueryId=(undefined), node=").append(nodeId);
else
- msgSb.append("
[globalQueryId=").append(QueryUtils.globalQueryId(nodeId, queryId));
+ msgSb.append("
[globalQueryId=").append(QueryUtils.globalQueryId(nodeId, qryId));
if (additionalInfo != null)
msgSb.append(", ").append(additionalInfo);
@@ -197,6 +202,7 @@ public class H2QueryInfo implements TrackableQuery {
.append(", enforceJoinOrder=").append(enforceJoinOrder)
.append(", lazy=").append(lazy)
.append(", schema=").append(schema)
+ .append(", initiatorId=").append(initiatorId)
.append(", sql='").append(sql)
.append("', plan=").append(plan());
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index d1ee3aeed1c..59fdfda0a83 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -435,7 +435,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
H2Utils.bindParameters(stmt, F.asList(params));
qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL,
stmt, qry,
- ctx.localNodeId(), qryId);
+ ctx.localNodeId(), qryId, qryDesc.queryInitiatorId());
heavyQryTracker.startTracking(qryInfo);
@@ -1064,6 +1064,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
qryId,
ctx.localNodeId(),
qryDesc.schemaName(),
+ qryDesc.queryInitiatorId(),
qryDesc.sql()
);
@@ -1436,6 +1437,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
return IgniteH2Indexing.this.rdcQryExec.query(
qryId,
qryDesc.schemaName(),
+ qryDesc.queryInitiatorId(),
twoStepQry,
keepBinary,
qryDesc.enforceJoinOrder(),
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
index 5b1cccd495c..a159d2ac2c9 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/MapH2QueryInfo.java
@@ -35,12 +35,20 @@ public class MapH2QueryInfo extends H2QueryInfo {
* @param sql Query statement.
* @param nodeId Originator node id.
* @param qryId Query id.
+ * @param initiatorId Query initiator id.
* @param reqId Request ID.
* @param segment Segment.
*/
- public MapH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId,
long qryId, long reqId,
- int segment) {
- super(QueryType.MAP, stmt, sql, nodeId, qryId);
+ public MapH2QueryInfo(
+ PreparedStatement stmt,
+ String sql,
+ UUID nodeId,
+ long qryId,
+ String initiatorId,
+ long reqId,
+ int segment
+ ) {
+ super(QueryType.MAP, stmt, sql, nodeId, qryId, initiatorId);
this.reqId = reqId;
this.segment = segment;
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
index ea88594a00d..9edbf595ede 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/ReduceH2QueryInfo.java
@@ -32,10 +32,11 @@ public class ReduceH2QueryInfo extends H2QueryInfo {
* @param sql Query statement.
* @param nodeId Originator node id.
* @param qryId Query id.
+ * @param initiatorId Query initiator id.
* @param reqId Request ID.
*/
- public ReduceH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId,
long qryId, long reqId) {
- super(QueryType.REDUCE, stmt, sql, nodeId, qryId);
+ public ReduceH2QueryInfo(PreparedStatement stmt, String sql, UUID nodeId,
long qryId, String initiatorId, long reqId) {
+ super(QueryType.REDUCE, stmt, sql, nodeId, qryId, initiatorId);
this.reqId = reqId;
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index e08b9a2be51..f2a4b459a7d 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -257,6 +257,7 @@ public class GridMapQueryExecutor {
req.requestId(),
segment0,
req.schemaName(),
+ req.queryInitiatorId(),
req.queries(),
cacheIds,
req.topologyVersion(),
@@ -285,6 +286,7 @@ public class GridMapQueryExecutor {
req.requestId(),
firstSegment,
req.schemaName(),
+ req.queryInitiatorId(),
req.queries(),
cacheIds,
req.topologyVersion(),
@@ -312,6 +314,7 @@ public class GridMapQueryExecutor {
* @param reqId Request ID.
* @param segmentId index segment ID.
* @param schemaName Schema name.
+ * @param qryInitiatorId Query initiator ID.
* @param qrys Queries to execute.
* @param cacheIds Caches which will be affected by these queries.
* @param topVer Topology version.
@@ -332,6 +335,7 @@ public class GridMapQueryExecutor {
final long reqId,
final int segmentId,
final String schemaName,
+ final String qryInitiatorId,
final Collection<GridCacheSqlQuery> qrys,
final List<Integer> cacheIds,
final AffinityTopologyVersion topVer,
@@ -464,7 +468,7 @@ public class GridMapQueryExecutor {
H2Utils.bindParameters(stmt, params0);
- qryInfo = new MapH2QueryInfo(stmt, qry.query(),
node.id(), qryId, reqId, segmentId);
+ qryInfo = new MapH2QueryInfo(stmt, qry.query(),
node.id(), qryId, qryInitiatorId, reqId, segmentId);
h2.heavyQueriesTracker().startTracking(qryInfo);
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 47040b1767c..951e98a1587 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -326,6 +326,7 @@ public class GridReduceQueryExecutor {
/**
* @param qryId Query ID.
* @param schemaName Schema name.
+ * @param qryInitiatorId Query initiator ID.
* @param qry Query.
* @param keepBinary Keep binary.
* @param enforceJoinOrder Enforce join order of tables.
@@ -342,6 +343,7 @@ public class GridReduceQueryExecutor {
public Iterator<List<?>> query(
long qryId,
String schemaName,
+ String qryInitiatorId,
final GridCacheTwoStepQuery qry,
boolean keepBinary,
boolean enforceJoinOrder,
@@ -438,7 +440,8 @@ public class GridReduceQueryExecutor {
.flags(queryFlags(qry, enforceJoinOrder, lazy,
dataPageScanEnabled))
.timeout(timeoutMillis)
.explicitTimeout(true)
- .schemaName(schemaName);
+ .schemaName(schemaName)
+ .queryInitiatorId(qryInitiatorId);
final C2<ClusterNode, Message, Message> spec =
parts == null ? null : new
ReducePartitionsSpecializer(mapping.queryPartitionsMap());
@@ -513,7 +516,7 @@ public class GridReduceQueryExecutor {
H2Utils.bindParameters(stmt,
F.asList(rdc.parameters(params)));
qryInfo = new ReduceH2QueryInfo(stmt,
qry.originalSql(),
- ctx.localNodeId(), qryId, qryReqId);
+ ctx.localNodeId(), qryId, qryInitiatorId,
qryReqId);
h2.heavyQueriesTracker().startTracking(qryInfo);
@@ -1257,7 +1260,6 @@ public class GridReduceQueryExecutor {
* @return Table.
* @throws IgniteCheckedException If failed.
*/
- @SuppressWarnings("unchecked")
private ReduceTable createMergeTable(H2PooledConnection conn,
GridCacheSqlQuery qry, boolean explain)
throws IgniteCheckedException {
try {
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 9b6fc078746..649e93ba958 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -148,6 +148,9 @@ public class GridH2QueryRequest implements Message,
GridCacheQueryMarshallable {
/** Schema name. */
private String schemaName;
+ /** Query initiator id. */
+ private String qryInitiatorId;
+
/** Id of the query assigned by {@link RunningQueryManager} on originator
node. */
private long qryId;
@@ -178,6 +181,7 @@ public class GridH2QueryRequest implements Message,
GridCacheQueryMarshallable {
params = req.params;
paramsBytes = req.paramsBytes;
schemaName = req.schemaName;
+ qryInitiatorId = req.qryInitiatorId;
qryId = req.qryId;
explicitTimeout = req.explicitTimeout;
}
@@ -413,6 +417,23 @@ public class GridH2QueryRequest implements Message,
GridCacheQueryMarshallable {
return this;
}
+ /**
+ * @return Query initiator id, {@code null} if it was never set.
+ */
+ public String queryInitiatorId() {
+ return qryInitiatorId;
+ }
+
+ /**
+ * @param qryInitiatorId Query initiator id.
+ * @return {@code this} for chaining.
+ */
+ public GridH2QueryRequest queryInitiatorId(String qryInitiatorId) {
+ this.qryInitiatorId = qryInitiatorId;
+
+ return this;
+ }
+
/**
* @param flags Flags.
* @param dataPageScanEnabled {@code true} If data page scan enabled,
{@code false} if not, and {@code null} if not set.
@@ -632,6 +653,12 @@ public class GridH2QueryRequest implements Message,
GridCacheQueryMarshallable {
return false;
writer.incrementState();
+
+ case 14:
+ if (!writer.writeString(qryInitiatorId))
+ return false;
+
+ writer.incrementState();
}
return true;
@@ -749,6 +776,14 @@ public class GridH2QueryRequest implements Message,
GridCacheQueryMarshallable {
case 13:
qryId = reader.readLong();
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 14:
+ qryInitiatorId = reader.readString();
+
if (!reader.isLastRead())
return false;
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
index c381e8e60f9..a46f489cae3 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -51,6 +52,7 @@ import
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonT
import org.apache.ignite.internal.processors.query.h2.H2QueryInfo;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker;
+import org.apache.ignite.internal.util.GridTestClockTimer;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.testframework.GridTestUtils;
@@ -64,6 +66,7 @@ import org.junit.runners.model.Statement;
import static java.lang.Thread.currentThread;
import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG;
+import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_FINISHED_MSG;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.h2.engine.Constants.DEFAULT_PAGE_SIZE;
@@ -517,6 +520,40 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
cancelQuery(qryIds.poll());
}
+ /**
+ * Verifies that the query initiator id is present in the long-query log message for LOCAL, MAP, REDUCE and DML queries.
+ */
+ @Test
+ @MultiNodeTest
+ public void testQueryInitiatorId() {
+ ListeningTestLogger testLog = testLog();
+
+ checkInitiatorId(testLog, "LOCAL", "SELECT sleep_func(?, 0)",
LONG_QUERY_WARNING_TIMEOUT);
+
+ checkInitiatorId(testLog, "MAP", "SELECT val FROM test WHERE id =
sleep_func(?, 0)",
+ LONG_QUERY_WARNING_TIMEOUT);
+
+ checkInitiatorId(testLog, "REDUCE", "SELECT sleep_func(?, sum(val))
FROM test WHERE id + 1 = 1",
+ LONG_QUERY_WARNING_TIMEOUT);
+
+ checkInitiatorId(testLog, "DML", "UPDATE test SET val = sleep_func(?,
val) WHERE id = 0",
+ LONG_QUERY_WARNING_TIMEOUT);
+ }
+
+ /** Runs {@code sql} under a random initiator id and asserts it is logged together with {@code type=<type>}. */
+ private void checkInitiatorId(ListeningTestLogger log, String type, String
sql, Object... args) {
+ String initiatorId = UUID.randomUUID().toString();
+
+ LogListener lsnr =
LogListener.matches(LONG_QUERY_FINISHED_MSG).andMatches("type=" + type)
+ .andMatches("initiatorId=" + initiatorId).build();
+
+ log.registerListener(lsnr);
+
+ ignite.cache("test").query(new
SqlFieldsQuery(sql).setQueryInitiatorId(initiatorId).setArgs(args)).getAll();
+
+ assertTrue(lsnr.check());
+ }
+
/**
* Do several fast queries.
* Log messages must not contain info about long query.
@@ -765,6 +802,8 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
public static int sleep_func(int sleep, int val) {
try {
Thread.sleep(sleep);
+
+ GridTestClockTimer.update();
}
catch (InterruptedException ignored) {
// No-op