This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 01ba73c10b3 IGNITE-22925 Add long queries tracking for DML operations
- Fixes #11535.
01ba73c10b3 is described below
commit 01ba73c10b303292d3f98e61545579e276389286
Author: 21518201 <[email protected]>
AuthorDate: Mon Oct 14 17:59:55 2024 +0300
IGNITE-22925 Add long queries tracking for DML operations - Fixes #11535.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../internal/processors/query/h2/H2DmlInfo.java | 83 ++++++++++
.../processors/query/h2/IgniteH2Indexing.java | 15 ++
.../processors/query/LongRunningQueryTest.java | 173 +++++++++++++++++++++
3 files changed, 271 insertions(+)
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlInfo.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlInfo.java
new file mode 100644
index 00000000000..8f95b57e7dd
--- /dev/null
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.util.UUID;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
+import org.apache.ignite.internal.processors.query.running.TrackableQuery;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public class H2DmlInfo implements TrackableQuery {
+ /** Begin timestamp. */
+ private final long beginTs;
+
+ /** Query id. */
+ private final long qryId;
+
+ /** Initiator node id. */
+ private final UUID initNodeId;
+
+ /** Schema name. */
+ private final String schema;
+
+ /** Dml command. */
+ private final String sql;
+
+ /**
+ * @param beginTs Begin timestamp.
+ * @param qryId Query id.
+ * @param initNodeId Initiator node id.
+ * @param schema Schema name.
+ * @param sql Dml command.
+ */
+ public H2DmlInfo(long beginTs, long qryId, UUID initNodeId, String schema,
String sql) {
+ this.beginTs = beginTs;
+ this.qryId = qryId;
+ this.initNodeId = initNodeId;
+ this.schema = schema;
+ this.sql = sql;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long time() {
+ return U.currentTimeMillis() - beginTs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String queryInfo(@Nullable String additionalInfo) {
+ StringBuilder msgSb = new StringBuilder();
+
+ if (qryId == RunningQueryManager.UNDEFINED_QUERY_ID)
+ msgSb.append(" [globalQueryId=(undefined),
node=").append(initNodeId);
+ else
+ msgSb.append("
[globalQueryId=").append(QueryUtils.globalQueryId(initNodeId, qryId));
+
+ if (additionalInfo != null)
+ msgSb.append(", ").append(additionalInfo);
+
+ msgSb.append(", duration=").append(time()).append("ms")
+ .append(", type=DML")
+ .append(", schema=").append(schema)
+ .append(", sql='").append(sql).append("']");
+
+ return msgSb.toString();
+ }
+}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 65600be66a7..a759fe3f09f 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1044,6 +1044,8 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
Exception failReason = null;
+ H2DmlInfo dmlInfo = null;
+
try (TraceSurroundings ignored =
MTC.support(ctx.tracing().create(SQL_DML_QRY_EXECUTE, MTC.span()))) {
if (!updateInTxAllowed && ctx.cache().context().tm().inUserTx()) {
throw new IgniteSQLException("DML statements are not allowed
inside a transaction over " +
@@ -1051,6 +1053,16 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
"\"-DIGNITE_ALLOW_DML_INSIDE_TRANSACTION=true\")");
}
+ dmlInfo = new H2DmlInfo(
+ U.currentTimeMillis(),
+ qryId,
+ ctx.localNodeId(),
+ qryDesc.schemaName(),
+ qryDesc.sql()
+ );
+
+ heavyQueriesTracker().startTracking(dmlInfo);
+
if (!qryDesc.local()) {
return executeUpdateDistributed(
qryId,
@@ -1101,6 +1113,9 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
", params=" + S.toString(QueryParameters.class, qryParams)
+ "]", e);
}
finally {
+ if (dmlInfo != null)
+ heavyQueriesTracker().stopTracking(dmlInfo, failReason);
+
runningQueryManager().unregister(qryId, failReason);
}
}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
index a7faf240c2d..d9db970ebdd 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
@@ -32,6 +32,9 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.SqlConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
import org.apache.ignite.internal.processors.query.h2.H2QueryInfo;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
@@ -54,9 +57,36 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
/** Keys count. */
private static final int KEY_CNT = 1000;
+ /** Long query warning timeout. */
+ private static final int LONG_QUERY_WARNING_TIMEOUT = 1000;
+
/** External wait time. */
private static final int EXT_WAIT_TIME = 2000;
+ /** Insert. */
+ private static final String INSERT_SQL = "insert into test (_key, _val)
values (1001, wait_func())";
+
+ /** Insert with a subquery. */
+ private static final String INSERT_WITH_SUBQUERY_SQL = "insert into test
(_key, _val) select p._key, p.orgId from " +
+ "\"pers\".Person p where p._key < wait_func()";
+
+ /** Update. */
+ private static final String UPDATE_SQL = "update test set _val =
wait_func() where _key = 1";
+
+ /** Update with a subquery. */
+ private static final String UPDATE_WITH_SUBQUERY_SQL = "update test set
_val = 111 where _key in " +
+ "(select p._key from \"pers\".Person p where p._key < wait_func())";
+
+ /** Delete. */
+ private static final String DELETE_SQL = "delete from test where _key =
wait_func()";
+
+ /** Delete with a subquery. */
+ private static final String DELETE_WITH_SUBQUERY_SQL = "delete from test
where _key in " +
+ "(select p._key from \"pers\".Person p where p._key < wait_func())";
+
+ /** Log listener for long DMLs. */
+ private static LogListener lsnrDml;
+
/** Page size. */
private int pageSize = DEFAULT_PAGE_SIZE;
@@ -75,6 +105,13 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
/** Ignite instance. */
private Ignite ignite;
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ return cfg.setSqlConfiguration(new
SqlConfiguration().setLongQueryWarningTimeout(LONG_QUERY_WARNING_TIMEOUT));
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
@@ -95,6 +132,17 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
for (long i = 0; i < KEY_CNT; ++i)
c.put(i, i);
+
+ IgniteCache c2 = grid().createCache(cacheConfig("pers", Integer.class,
Person.class));
+
+ c2.put(1001, new Person(1, "p1"));
+
+ lsnrDml = LogListener
+ .matches(LONG_QUERY_EXEC_MSG)
+ .andMatches(s -> s.contains("type=DML"))
+ .build();
+
+ testLog().registerListener(lsnrDml);
}
/** {@inheritDoc} */
@@ -182,6 +230,102 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
checkFastQueries();
}
+ /** */
+ @Test
+ public void testLongRunningInsert() {
+ local = false;
+
+ runDml(INSERT_SQL);
+ }
+
+ /** */
+ @Test
+ public void testLongRunningInsertWithSubquery() {
+ local = false;
+
+ runDml(INSERT_WITH_SUBQUERY_SQL);
+ }
+
+ /** */
+ @Test
+ public void testLongRunningInsertLocal() {
+ local = true;
+
+ runDml(INSERT_SQL);
+ }
+
+ /** */
+ @Test
+ public void testLongRunningInsertWithSubqueryLocal() {
+ local = true;
+
+ runDml(INSERT_WITH_SUBQUERY_SQL);
+ }
+
+ /** */
+ @Test
+ public void testLongRunningUpdate() {
+ local = false;
+
+ runDml(UPDATE_SQL);
+ }
+
+ /** */
+ @Test
+ public void testLongRunningUpdateWithSubquery() {
+ local = false;
+
+ runDml(UPDATE_WITH_SUBQUERY_SQL);
+ }
+
+ /** */
+ @Test
+ public void testLongRunningUpdateLocal() {
+ local = true;
+
+ runDml(UPDATE_SQL);
+ }
+
+ /** */
+ @Test
+ public void testLongRunningUpdateWithSubqueryLocal() {
+ local = true;
+
+ runDml(UPDATE_WITH_SUBQUERY_SQL);
+ }
+
+ /** */
+ @Test
+ public void testLongRunningDelete() {
+ local = false;
+
+ runDml(DELETE_SQL);
+ }
+
+ /** */
+ @Test
+ public void testLongRunningDeleteWithSubquery() {
+ local = false;
+
+ runDml(DELETE_WITH_SUBQUERY_SQL);
+ }
+
+ /** */
+ @Test
+ public void testLongRunningDeleteLocal() {
+ local = true;
+
+ runDml(DELETE_SQL);
+ }
+
+ /** */
+ @Test
+ public void testLongRunningDeleteWithSubqueryLocal() {
+ local = true;
+
+ runDml(DELETE_WITH_SUBQUERY_SQL);
+ }
+
/**
* Test checks that no long-running queries warnings are printed in case
of external waits during
* the execution of distributed queries.
@@ -424,6 +568,21 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
}
}
+ /**
+ * @param dml Dml command.
+ */
+ public void runDml(String dml) {
+ lazy = false;
+
+ long start = U.currentTimeMillis();
+
+ sql("test", dml);
+
+ assertTrue((U.currentTimeMillis() - start) >
LONG_QUERY_WARNING_TIMEOUT);
+
+ assertTrue(lsnrDml.check());
+ }
+
/**
* Utility class with custom SQL functions.
*/
@@ -443,6 +602,20 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
}
return v;
}
+
+ /** */
+ @SuppressWarnings("unused")
+ @QuerySqlFunction
+ public static int wait_func() {
+ try {
+ GridTestUtils.waitForCondition(() -> lsnrDml.check(), 10_000);
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ // No-op
+ }
+
+ return 1;
+ }
}
/**