This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e457dcc98e5 IGNITE-25449 Fix memory leak in HeavyQueriesTracker - 
Fixes #12087.
e457dcc98e5 is described below

commit e457dcc98e53cf131a90db78d1d5814a1e3a565b
Author: oleg-vlsk <[email protected]>
AuthorDate: Thu Jun 19 10:56:36 2025 +0300

    IGNITE-25449 Fix memory leak in HeavyQueriesTracker - Fixes #12087.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../integration/SqlDiagnosticIntegrationTest.java  |  99 ++++++++
 .../query/running/HeavyQueriesTracker.java         |   4 +-
 .../query/h2/twostep/GridMapQueryExecutor.java     |   3 -
 .../query/h2/twostep/MapQueryResult.java           |   2 +
 .../processors/query/LongRunningQueryTest.java     | 258 +++++++++++++++++++--
 5 files changed, 336 insertions(+), 30 deletions(-)

diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index 141801edea5..ef625931ba9 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -36,7 +36,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCache;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
@@ -53,6 +55,7 @@ import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.events.SqlQueryExecutionEvent;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
 import 
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest;
@@ -61,10 +64,12 @@ import 
org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.calcite.Query;
 import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
+import org.apache.ignite.internal.processors.query.calcite.RootQuery;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.AbstractQueryTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
+import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.typedef.F;
@@ -96,6 +101,7 @@ import static 
org.apache.ignite.internal.processors.query.running.HeavyQueriesTr
 import static 
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG;
 import static 
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_FINISHED_MSG;
 import static 
org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_USER_QUERIES_REG_NAME;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
  * Test SQL diagnostic tools.
@@ -781,6 +787,72 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
         }
 
         assertTrue(logLsnr2.check(1000L));
+
+        assertTrue(isHeavyQueriesTrackerEmpty());
+    }
+
+    /**
+     * Verifies that once the query is fully fetched, it is no longer tracked 
and its information encapsulated in a
+     * {@link RootQuery} instance is removed from {@link HeavyQueriesTracker}.
+     */
+    @Test
+    public void testEmptyHeavyQueriesTrackerWithFullyFetchedIterator() throws 
IgniteInterruptedCheckedException {
+        Iterator<?> it = runNotFullyFetchedQuery(false).iterator();
+
+        assertFalse(isHeavyQueriesTrackerEmpty());
+
+        it.forEachRemaining(x -> {});
+
+        assertTrue(waitForCondition(this::isHeavyQueriesTrackerEmpty, 1_000));
+    }
+
+    /**
+     * Verifies that once the cursor of a not fully fetched query is closed, 
it is no longer tracked and its information
+     * encapsulated in a {@link RootQuery} instance is removed from {@link 
HeavyQueriesTracker}.
+     */
+    @Test
+    public void testEmptyHeavyQueriesTrackerWithClosedCursor() throws 
IgniteInterruptedCheckedException {
+        FieldsQueryCursor<List<?>> cursor = runNotFullyFetchedQuery(false);
+
+        assertFalse(isHeavyQueriesTrackerEmpty());
+
+        cursor.close();
+
+        assertTrue(waitForCondition(this::isHeavyQueriesTrackerEmpty, 1_000));
+    }
+
+    /**
+     * Verifies that once a not fully fetched query is cancelled, it is no 
longer tracked and its information
+     * encapsulated in a {@link RootQuery} instance is removed from {@link 
HeavyQueriesTracker}.
+     */
+    @Test
+    public void testEmptyHeavyQueriesTrackerWithCancelledQuery() throws 
IgniteInterruptedCheckedException {
+        runNotFullyFetchedQuery(false);
+
+        assertFalse(isHeavyQueriesTrackerEmpty());
+
+        RootQuery<?> rootQry = 
(RootQuery<?>)heavyQueriesTracker().getQueries().iterator().next();
+
+        grid(0).context().query().cancelQuery(rootQry.localQueryId(), 
rootQry.initiatorNodeId(), false);
+
+        assertTrue(waitForCondition(this::isHeavyQueriesTrackerEmpty, 1_000));
+    }
+
+    /**
+     * Verifies that once a not fully fetched local query is cancelled, it is 
no longer tracked and its information
+     * encapsulated in a {@link RootQuery} instance is removed from {@link 
HeavyQueriesTracker}.
+     */
+    @Test
+    public void testEmptyHeavyQueriesTrackerWithCancelledLocalQuery() throws 
IgniteInterruptedCheckedException {
+        runNotFullyFetchedQuery(true);
+
+        assertFalse(isHeavyQueriesTrackerEmpty());
+
+        RootQuery<?> rootQry = 
(RootQuery<?>)heavyQueriesTracker().getQueries().iterator().next();
+
+        
grid(0).context().query().cancelLocalQueries(Set.of(rootQry.localQueryId()));
+
+        assertTrue(waitForCondition(this::isHeavyQueriesTrackerEmpty, 1_000));
     }
 
     /** */
@@ -925,6 +997,33 @@ public class SqlDiagnosticIntegrationTest extends 
AbstractBasicIntegrationTest {
         }
     }
 
+    /** */
+    private FieldsQueryCursor<List<?>> runNotFullyFetchedQuery(boolean loc) {
+        IgniteCache<Long, Long> cache = grid(0).createCache(new 
CacheConfiguration<Long, Long>()
+            .setName("test")
+            .setQueryEntities(Collections.singleton(new 
QueryEntity(Long.class, Long.class)
+                .setTableName("test")
+                .addQueryField("id", Long.class.getName(), null)
+                .addQueryField("val", Long.class.getName(), null)
+                .setKeyFieldName("id")
+                .setValueFieldName("val"))));
+
+        for (long i = 0; i < 10; ++i)
+            cache.put(i, i);
+
+        return cache.query(new SqlFieldsQuery("select * from 
test").setLocal(loc).setPageSize(1));
+    }
+
+    /** */
+    private HeavyQueriesTracker heavyQueriesTracker() {
+        return 
grid(0).context().query().runningQueryManager().heavyQueriesTracker();
+    }
+
+    /** */
+    private boolean isHeavyQueriesTrackerEmpty() {
+        return heavyQueriesTracker().getQueries().isEmpty();
+    }
+
     /** */
     public static class FunctionsLibrary {
         /** */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
index 1a3654a415a..69b8e635198 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
@@ -143,9 +143,7 @@ public final class HeavyQueriesTracker {
     public void stopTracking(TrackableQuery qryInfo, @Nullable Throwable err) {
         assert qryInfo != null;
 
-        qrys.remove(qryInfo);
-
-        if (qryInfo.time() > timeout) {
+        if (qrys.remove(qryInfo) != null && qryInfo.time() > timeout) {
             if (err == null)
                 LT.warn(log, LONG_QUERY_FINISHED_MSG + 
qryInfo.queryInfo(null));
             else
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 2a0e8824190..47d3000acf0 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -985,9 +985,6 @@ public class GridMapQueryExecutor {
             if (last) {
                 qr.closeResult(qry);
 
-                if (res.qryInfo() != null)
-                    h2.heavyQueriesTracker().stopTracking(res.qryInfo(), null);
-
                 if (qr.isAllClosed()) {
                     nodeRess.remove(qr.queryRequestId(), segmentId, qr);
 
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index 441a7f0a97d..0d844d289f5 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -388,6 +388,8 @@ class MapQueryResult {
             }
 
             U.close(rs, log);
+
+            h2.heavyQueriesTracker().stopTracking(qryInfo, null);
         }
     }
 }
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
index d9db970ebdd..c381e8e60f9 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
@@ -17,13 +17,24 @@
 
 package org.apache.ignite.internal.processors.query;
 
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+import java.util.ArrayDeque;
 import java.util.Collections;
+import java.util.Deque;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
@@ -34,6 +45,7 @@ import 
org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.SqlConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import 
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
 import org.apache.ignite.internal.processors.query.h2.H2QueryInfo;
@@ -44,10 +56,15 @@ import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.ListeningTestLogger;
 import org.apache.ignite.testframework.LogListener;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
 
 import static java.lang.Thread.currentThread;
 import static 
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 import static org.h2.engine.Constants.DEFAULT_PAGE_SIZE;
 
 /**
@@ -57,6 +74,9 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
     /** Keys count. */
     private static final int KEY_CNT = 1000;
 
+    /** Number of keys to be queries in lazy queries. */
+    private static final int LAZY_QRYS_KEY_CNT = 5;
+
     /** Long query warning timeout. */
     private static final int LONG_QUERY_WARNING_TIMEOUT = 1000;
 
@@ -87,6 +107,15 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
     /** Log listener for long DMLs. */
     private static LogListener lsnrDml;
 
+    /** Multi-node test rule. */
+    @Rule
+    public final MultiNodeTestRule multiNodeTestRule = new MultiNodeTestRule();
+
+    /** Annotation for the {@link MultiNodeTestRule}. */
+    @Retention(RetentionPolicy.RUNTIME)
+    @Target(ElementType.METHOD)
+    public @interface MultiNodeTest {}
+
     /** Page size. */
     private int pageSize = DEFAULT_PAGE_SIZE;
 
@@ -112,13 +141,9 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
         return cfg.setSqlConfiguration(new 
SqlConfiguration().setLongQueryWarningTimeout(LONG_QUERY_WARNING_TIMEOUT));
     }
 
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        super.beforeTest();
-
-        ignite = startGrid();
-
-        IgniteCache c = grid().createCache(new CacheConfiguration<Long, Long>()
+    /** */
+    private void prepareTestEnvironment() {
+        IgniteCache c = grid(0).createCache(new CacheConfiguration<Long, 
Long>()
             .setName("test")
             .setQueryEntities(Collections.singleton(new 
QueryEntity(Long.class, Long.class)
                 .setTableName("test")
@@ -133,7 +158,7 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
         for (long i = 0; i < KEY_CNT; ++i)
             c.put(i, i);
 
-        IgniteCache c2 = grid().createCache(cacheConfig("pers", Integer.class, 
Person.class));
+        IgniteCache c2 = grid(0).createCache(cacheConfig("pers", 
Integer.class, Person.class));
 
         c2.put(1001, new Person(1, "p1"));
 
@@ -147,6 +172,8 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
+        checkQryInfoCount(0);
+
         stopAllGrids();
 
         super.afterTest();
@@ -392,6 +419,104 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
         checkBigResultSet();
     }
 
+    /**
+     * Verifies that while a query is not fully fetched, its {@link 
H2QueryInfo} is kept in {@link HeavyQueriesTracker}
+     * on all cluster nodes and its {@link H2QueryInfo#isSuspended()} returns 
{@code true}. Then, once the query is fully
+     * fetched, its {@link H2QueryInfo} is removed from {@link 
HeavyQueriesTracker}.
+     */
+    @Test
+    @MultiNodeTest
+    public void testEmptyHeavyQueriesTrackerWithFullyFetchedIterator() {
+        Iterator<?> it = queryCursor(false).iterator();
+
+        checkQryInfoCount(gridCount());
+
+        H2QueryInfo qry = 
(H2QueryInfo)heavyQueriesTracker().getQueries().iterator().next();
+
+        assertTrue(qry.isSuspended());
+
+        it.forEachRemaining(x -> {});
+    }
+
+    /**
+     * Verifies that when the cursor of a not fully fetched query is closed, 
its {@link H2QueryInfo} is removed from
+     * {@link HeavyQueriesTracker} on all cluster nodes.
+     */
+    @Test
+    @MultiNodeTest
+    public void testEmptyHeavyQueriesTrackerWithClosedCursor() {
+        FieldsQueryCursor<List<?>> cursor = queryCursor(false);
+
+        cursor.iterator().next();
+
+        checkQryInfoCount(gridCount());
+
+        H2QueryInfo qryInfo = 
(H2QueryInfo)heavyQueriesTracker().getQueries().iterator().next();
+
+        assertTrue(qryInfo.isSuspended());
+
+        cursor.close();
+    }
+
+    /**
+     * Verifies that when a not fully fetched query is cancelled, its {@link 
H2QueryInfo} is removed from
+     * {@link HeavyQueriesTracker} on all cluster nodes.
+     */
+    @Test
+    @MultiNodeTest
+    public void testEmptyHeavyQueriesTrackerWithCancelledQuery() {
+        long qryId = runNotFullyFetchedQuery(false);
+
+        checkQryInfoCount(gridCount());
+
+        cancelQuery(qryId);
+    }
+
+    /**
+     * Verifies that when a not fully fetched local query is cancelled, its 
{@link H2QueryInfo} is removed from
+     * {@link HeavyQueriesTracker} on all cluster nodes.
+     */
+    @Test
+    @MultiNodeTest
+    public void testEmptyHeavyQueriesTrackerWithCancelledLocalQuery() {
+        long qryId = runNotFullyFetchedQuery(true);
+
+        checkQryInfoCount(1);
+
+        ((IgniteEx)ignite).context().query().cancelLocalQueries(Set.of(qryId));
+    }
+
+    /**
+     * Verifies that when multiple not fully fetched queries are cancelled 
separately, corresponding
+     * {@link H2QueryInfo} instances are removed from {@link 
HeavyQueriesTracker} on all cluster nodes.
+     */
+    @Test
+    @MultiNodeTest
+    public void testEmptyHeavyQueriesTrackerWithMultipleCancelledQueries() {
+        int qryCnt = 4;
+        int cnldQryCnt = 2;
+
+        for (int i = 0; i < qryCnt; i++)
+            runNotFullyFetchedQuery(false);
+
+        checkQryInfoCount(gridCount() * qryCnt);
+
+        Deque<Long> qryIds = new ArrayDeque<>(getQueryIdsOnNode(0));
+
+        Set<Long> cnldQryIds = new HashSet<>();
+
+        for (int i = 0; i < cnldQryCnt; i++)
+            cnldQryIds.add(cancelQuery(qryIds.poll()));
+
+        checkQryInfoCount(gridCount() * (qryCnt - cnldQryCnt));
+
+        for (int i = 0; i < gridCount(); i++)
+            assertTrue(getQueryIdsOnNode(i).stream().allMatch(id -> 
!cnldQryIds.contains(id) && qryIds.contains(id)));
+
+        while (!qryIds.isEmpty())
+            cancelQuery(qryIds.poll());
+    }
+
     /**
      * Do several fast queries.
      * Log messages must not contain info about long query.
@@ -467,7 +592,7 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
         pageSize = 1;
 
         try {
-            assertFalse(sql("test", sql, args).iterator().next().isEmpty());
+            assertEquals(LAZY_QRYS_KEY_CNT, sql("test", sql, 
args).getAll().size());
         }
         finally {
             pageSize = DEFAULT_PAGE_SIZE;
@@ -507,14 +632,14 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
     private void sqlCheckLongRunning() {
         if (lazy && withMergeTable) {
             String select = "select o.name n1, p.name n2 from Person p, 
\"org\".Organization o" +
-                " where p.orgId = o._key and o._key=1 and o._key < 
sleep_func(?)" +
+                " where p.orgId = o._key and o._key=1 and o._key < 
sleep_func(?, ?)" +
                 " union select o.name n1, p.name n2 from Person p, 
\"org\".Organization o" +
                 " where p.orgId = o._key and o._key=2";
 
-            sqlCheckLongRunningLazyWithMergeTable(select, 2000);
+            sqlCheckLongRunningLazyWithMergeTable(select, 2000, 
LAZY_QRYS_KEY_CNT);
         }
         else if (lazy && !withMergeTable)
-            sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key < 
sleep_func(?)", 2000);
+            sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key < 
sleep_func(?, ?)", 2000, LAZY_QRYS_KEY_CNT);
         else
             sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, 
test AS T2 where T0.id > ?", 0);
     }
@@ -545,9 +670,9 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
 
         testLog().registerListener(lsnr);
 
-        try {
-            Iterator<List<?>> it = sql("test", "select * from 
test").iterator();
+        Iterator<List<?>> it = sql("test", "select * from test").iterator();
 
+        try {
             it.next();
 
             long sleepStartTs = U.currentTimeMillis();
@@ -565,6 +690,8 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
         }
         finally {
             pageSize = DEFAULT_PAGE_SIZE;
+
+            it.forEachRemaining(x -> {});
         }
     }
 
@@ -583,24 +710,66 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
         assertTrue(lsnrDml.check());
     }
 
+    /**
+     * @param loc Flag indicating if the query is local.
+     * @return Query id.
+     */
+    private long runNotFullyFetchedQuery(boolean loc) {
+        queryCursor(loc).iterator().next();
+
+        H2QueryInfo qryInfo = 
(H2QueryInfo)heavyQueriesTracker().getQueries().iterator().next();
+
+        assertTrue(qryInfo.isSuspended());
+
+        return qryInfo.queryId();
+    }
+
+    /**
+     * @param loc Flag indicating if the query is local.
+     * @return Query cursor.
+     */
+    private FieldsQueryCursor<List<?>> queryCursor(boolean loc) {
+        return ignite.cache("test").query(new SqlFieldsQuery("select * from 
test").setLocal(loc).setPageSize(1));
+    }
+
+    /**
+     * @param qryId Query id.
+     * @return Cancelled query id.
+     */
+    private long cancelQuery(long qryId) {
+        ((IgniteEx)ignite).context().query().cancelQuery(qryId, 
ignite.cluster().node().id(), false);
+
+        return qryId;
+    }
+
+    /**
+     * @param nodeIdx Node index.
+     * @return Set of query ids registered on a node.
+     */
+    private Set<Long> getQueryIdsOnNode(int nodeIdx) {
+        return heavyQueriesTracker(nodeIdx).getQueries().stream()
+            .map(query -> ((H2QueryInfo)query).queryId())
+            .collect(Collectors.toSet());
+    }
+
     /**
      * Utility class with custom SQL functions.
      */
     public static class TestSQLFunctions {
         /**
-         * @param v amount of milliseconds to sleep
-         * @return amount of milliseconds to sleep
+         * @param sleep amount of milliseconds to sleep
+         * @param val value to be returned by the function
          */
         @SuppressWarnings("unused")
         @QuerySqlFunction
-        public static int sleep_func(int v) {
+        public static int sleep_func(int sleep, int val) {
             try {
-                Thread.sleep(v);
+                Thread.sleep(sleep);
             }
             catch (InterruptedException ignored) {
                 // No-op
             }
-            return v;
+            return val;
         }
 
         /** */
@@ -608,7 +777,7 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
         @QuerySqlFunction
         public static int wait_func() {
             try {
-                GridTestUtils.waitForCondition(() -> lsnrDml.check(), 10_000);
+                waitForCondition(() -> lsnrDml.check(), 10_000);
             }
             catch (IgniteInterruptedCheckedException ignored) {
                 // No-op
@@ -626,13 +795,13 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
     private ListeningTestLogger testLog() {
         ListeningTestLogger testLog = new ListeningTestLogger(log);
 
-        
GridTestUtils.setFieldValue(((IgniteH2Indexing)grid().context().query().getIndexing()).heavyQueriesTracker(),
+        
GridTestUtils.setFieldValue(((IgniteH2Indexing)grid(0).context().query().getIndexing()).heavyQueriesTracker(),
             "log", testLog);
 
-        
GridTestUtils.setFieldValue(((IgniteH2Indexing)grid().context().query().getIndexing()).mapQueryExecutor(),
+        
GridTestUtils.setFieldValue(((IgniteH2Indexing)grid(0).context().query().getIndexing()).mapQueryExecutor(),
             "log", testLog);
 
-        GridTestUtils.setFieldValue(grid().context().query().getIndexing(), 
"log", testLog);
+        GridTestUtils.setFieldValue(grid(0).context().query().getIndexing(), 
"log", testLog);
 
         return testLog;
     }
@@ -643,7 +812,32 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
      * @return Heavy queries tracker.
      */
     private HeavyQueriesTracker heavyQueriesTracker() {
-        return 
((IgniteH2Indexing)grid().context().query().getIndexing()).heavyQueriesTracker();
+        return heavyQueriesTracker(0);
+    }
+
+    /** */
+    private HeavyQueriesTracker heavyQueriesTracker(int idx) {
+        return 
((IgniteH2Indexing)grid(idx).context().query().getIndexing()).heavyQueriesTracker();
+    }
+
+    /**
+     * @param exp Expected number of {@link H2QueryInfo} instances registered 
in {@link HeavyQueriesTracker}
+     * on all cluster nodes.
+     */
+    private void checkQryInfoCount(int exp) {
+        try {
+            assertTrue(waitForCondition(
+                () -> IntStream.range(0, gridCount()).map(i -> 
heavyQueriesTracker(i).getQueries().size()).sum() == exp,
+                3_000));
+        }
+        catch (IgniteInterruptedCheckedException ignored) {
+            // No-op
+        }
+    }
+
+    /** */
+    private int gridCount() {
+        return Ignition.allGrids().size();
     }
 
     /** */
@@ -679,4 +873,20 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
             this.name = name;
         }
     }
+
+    /** Test rule that allows starting a test in the multi-node mode via the 
{@link MultiNodeTest} annotation. */
+    private class MultiNodeTestRule implements TestRule {
+        /** {@inheritDoc} */
+        @Override public Statement apply(Statement base, Description 
description) {
+            return new Statement() {
+                @Override public void evaluate() throws Throwable {
+                    ignite = 
startGrids(description.getAnnotation(MultiNodeTest.class) == null ? 1 : 3);
+
+                    prepareTestEnvironment();
+
+                    base.evaluate();
+                }
+            };
+        }
+    }
 }

Reply via email to