This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 531eba2c86e IGNITE-17086 Sql: Fix absence of warnings for long running queries with lazy flag - Fixes #11405.
531eba2c86e is described below

commit 531eba2c86eb7bd8fc5280129690a55284875109
Author: oleg-vlsk <oleg-v...@yandex.ru>
AuthorDate: Thu Jul 25 18:51:50 2024 +0300

    IGNITE-17086 Sql: Fix absence of warnings for long running queries with lazy flag - Fixes #11405.
    
    Signed-off-by: Aleksey Plekhanov <plehanov.a...@gmail.com>
---
 .../query/running/HeavyQueriesTracker.java         |   6 +
 .../internal/processors/query/h2/H2QueryInfo.java  |  39 +++-
 .../processors/query/h2/H2ResultSetIterator.java   |   9 +-
 .../processors/query/h2/IgniteH2Indexing.java      |  48 ++++-
 .../query/h2/twostep/GridMapQueryExecutor.java     |  87 +++++---
 .../query/h2/twostep/GridReduceQueryExecutor.java  |  27 ++-
 .../query/h2/twostep/MapQueryResult.java           |   5 +
 .../processors/query/LongRunningQueryTest.java     | 239 ++++++++++++++++++++-
 8 files changed, 407 insertions(+), 53 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
index d7a06793340..1a3654a415a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.running;
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
@@ -254,6 +255,11 @@ public final class HeavyQueriesTracker {
         this.rsSizeThresholdMult = rsSizeThresholdMult <= 1 ? 1 : 
rsSizeThresholdMult;
     }
 
+    /** */
+    public Set<TrackableQuery> getQueries() {
+        return qrys.keySet();
+    }
+
     /**
      * Holds timeout settings for the specified query.
      */
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
index 7cdbed157fe..95ce6c1bf7e 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
@@ -41,6 +41,15 @@ public class H2QueryInfo implements TrackableQuery {
     /** Begin timestamp. */
     private final long beginTs;
 
+    /** The most recent point in time when the tracking of a long query was suspended. */
+    private volatile long lastSuspendTs;
+
+    /** External wait time. */
+    private volatile long extWait;
+
+    /** Long query time tracking suspension flag. */
+    private volatile boolean isSuspended;
+
     /** Query schema. */
     private final String schema;
 
@@ -112,6 +121,11 @@ public class H2QueryInfo implements TrackableQuery {
         return stmt.getPlanSQL();
     }
 
+    /** */
+    public long extWait() {
+        return extWait;
+    }
+
     /**
      * Print info specified by children.
      *
@@ -123,7 +137,25 @@ public class H2QueryInfo implements TrackableQuery {
 
     /** {@inheritDoc} */
     @Override public long time() {
-        return U.currentTimeMillis() - beginTs;
+        return (isSuspended ? lastSuspendTs : U.currentTimeMillis()) - beginTs - extWait;
+    }
+
+    /** */
+    public synchronized void suspendTracking() {
+        if (!isSuspended) {
+            isSuspended = true;
+
+            lastSuspendTs = U.currentTimeMillis();
+        }
+    }
+
+    /** */
+    public synchronized void resumeTracking() {
+        if (isSuspended) {
+            isSuspended = false;
+
+            extWait += U.currentTimeMillis() - lastSuspendTs;
+        }
     }
 
     /**
@@ -156,6 +188,11 @@ public class H2QueryInfo implements TrackableQuery {
         return msgSb.toString();
     }
 
+    /** */
+    public boolean isSuspended() {
+        return isSuspended;
+    }
+
     /**
      * Query type.
      */
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java
index 9529b4430b1..18cbac6b494 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java
@@ -120,6 +120,9 @@ public abstract class H2ResultSetIterator<T> extends 
GridIteratorAdapter<T> impl
     /** */
     private final H2QueryInfo qryInfo;
 
+    /** */
+    final IgniteH2Indexing h2;
+
     /**
      * @param data Data array.
      * @param log Logger.
@@ -141,6 +144,7 @@ public abstract class H2ResultSetIterator<T> extends 
GridIteratorAdapter<T> impl
         this.data = data;
         this.tracing = tracing;
         this.qryInfo = qryInfo;
+        this.h2 = h2;
 
         try {
             res = (ResultInterface)RESULT_FIELD.get(data);
@@ -325,6 +329,9 @@ public abstract class H2ResultSetIterator<T> extends 
GridIteratorAdapter<T> impl
 
         lockTables();
 
+        if (qryInfo != null)
+            h2.heavyQueriesTracker().stopTracking(qryInfo, null);
+
         try {
             resultSetChecker.checkOnClose();
 
@@ -391,7 +398,7 @@ public abstract class H2ResultSetIterator<T> extends 
GridIteratorAdapter<T> impl
         if (closed)
             return false;
 
-        return hasRow || (hasRow = fetchNext());
+        return hasRow || (hasRow = 
h2.executeWithResumableTimeTracking(this::fetchNext, qryInfo));
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 091a15c9230..8e91c71a711 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -119,6 +119,7 @@ import 
org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
 import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
+import org.apache.ignite.internal.util.lang.IgniteThrowableSupplier;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -414,6 +415,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             @Override public GridCloseableIterator<List<?>> iterator() throws 
IgniteCheckedException {
                 H2PooledConnection conn = 
connections().connection(qryDesc.schemaName());
 
+                H2QueryInfo qryInfo = null;
+
                 try (TraceSurroundings ignored = 
MTC.support(ctx.tracing().create(SQL_ITER_OPEN, MTC.span()))) {
                     H2Utils.setupConnection(conn, qctx,
                         qryDesc.distributedJoins(), 
qryDesc.enforceJoinOrder(), qryParams.lazy());
@@ -436,9 +439,11 @@ public class IgniteH2Indexing implements GridQueryIndexing 
{
 
                     H2Utils.bindParameters(stmt, F.asList(params));
 
-                    H2QueryInfo qryInfo = new 
H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry,
+                    qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, 
stmt, qry,
                         ctx.localNodeId(), qryId);
 
+                    heavyQryTracker.startTracking(qryInfo);
+
                     if (ctx.performanceStatistics().enabled()) {
                         ctx.performanceStatistics().queryProperty(
                             GridCacheQueryType.SQL_FIELDS,
@@ -449,13 +454,16 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
                         );
                     }
 
-                    ResultSet rs = executeSqlQueryWithTimer(
-                        stmt,
-                        conn,
-                        qry,
-                        timeout,
-                        cancel,
-                        qryParams.dataPageScanEnabled(),
+                    ResultSet rs = executeWithResumableTimeTracking(
+                        () -> executeSqlQueryWithTimer(
+                            stmt,
+                            conn,
+                            qry,
+                            timeout,
+                            cancel,
+                            qryParams.dataPageScanEnabled(),
+                            null
+                        ),
                         qryInfo
                     );
 
@@ -472,6 +480,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 catch (IgniteCheckedException | RuntimeException | Error e) {
                     conn.close();
 
+                    if (qryInfo != null)
+                        heavyQryTracker.stopTracking(qryInfo, e);
+
                     throw e;
                 }
             }
@@ -2259,4 +2270,25 @@ public class IgniteH2Indexing implements 
GridQueryIndexing {
     public DistributedIndexingConfiguration distributedConfiguration() {
         return distrCfg;
     }
+
+    /**
+     * Resumes time tracking before the task (if needed) and suspends time tracking after the task is finished.
+     *
+     * @param task Query/fetch to execute.
+     * @param qryInfo Query info.
+     * @throws IgniteCheckedException If failed.
+     */
+    public <T> T executeWithResumableTimeTracking(
+        IgniteThrowableSupplier<T> task,
+        final H2QueryInfo qryInfo
+    ) throws IgniteCheckedException {
+        qryInfo.resumeTracking();
+
+        try {
+            return task.get();
+        }
+        finally {
+            qryInfo.suspendTracking();
+        }
+    }
 }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 3d388561840..78a0e0aeb96 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -446,6 +446,8 @@ public class GridMapQueryExecutor {
 
                 qryResults.addResult(qryIdx, res);
 
+                MapH2QueryInfo qryInfo = null;
+
                 try {
                     res.lock();
 
@@ -460,7 +462,9 @@ public class GridMapQueryExecutor {
 
                         H2Utils.bindParameters(stmt, params0);
 
-                        MapH2QueryInfo qryInfo = new MapH2QueryInfo(stmt, 
qry.query(), node.id(), qryId, reqId, segmentId);
+                        qryInfo = new MapH2QueryInfo(stmt, qry.query(), 
node.id(), qryId, reqId, segmentId);
+
+                        h2.heavyQueriesTracker().startTracking(qryInfo);
 
                         if (performanceStatsEnabled) {
                             ctx.performanceStatistics().queryProperty(
@@ -472,14 +476,20 @@ public class GridMapQueryExecutor {
                             );
                         }
 
-                        ResultSet rs = h2.executeSqlQueryWithTimer(
-                            stmt,
-                            conn,
-                            sql,
-                            timeout,
-                            qryResults.queryCancel(qryIdx),
-                            dataPageScanEnabled,
-                            qryInfo);
+                        GridQueryCancel qryCancel = 
qryResults.queryCancel(qryIdx);
+
+                        ResultSet rs = h2.executeWithResumableTimeTracking(
+                            () -> h2.executeSqlQueryWithTimer(
+                                stmt,
+                                conn,
+                                sql,
+                                timeout,
+                                qryCancel,
+                                dataPageScanEnabled,
+                                null
+                            ),
+                            qryInfo
+                        );
 
                         if (evt) {
                             ctx.event().record(new CacheQueryExecutedEvent<>(
@@ -507,14 +517,21 @@ public class GridMapQueryExecutor {
 
                         res.openResult(rs, qryInfo);
 
-                        final GridQueryNextPageResponse msg = prepareNextPage(
-                            nodeRess,
-                            node,
-                            qryResults,
-                            qryIdx,
-                            segmentId,
-                            pageSize,
-                            dataPageScanEnabled
+                        MapQueryResults qryResults0 = qryResults;
+
+                        int qryIdx0 = qryIdx;
+
+                        final GridQueryNextPageResponse msg = 
h2.executeWithResumableTimeTracking(
+                            () -> prepareNextPage(
+                                nodeRess,
+                                node,
+                                qryResults0,
+                                qryIdx0,
+                                segmentId,
+                                pageSize,
+                                dataPageScanEnabled
+                            ),
+                            qryInfo
                         );
 
                         if (msg != null)
@@ -528,6 +545,12 @@ public class GridMapQueryExecutor {
 
                     qryIdx++;
                 }
+                catch (Throwable e) {
+                    if (qryInfo != null)
+                        h2.heavyQueriesTracker().stopTracking(qryInfo, e);
+
+                    throw e;
+                }
                 finally {
                     try {
                         res.unlockTables();
@@ -843,13 +866,15 @@ public class GridMapQueryExecutor {
 
             final MapQueryResults qryResults = nodeRess.get(reqId, 
req.segmentId());
 
+            MapQueryResult res = null;
+
             if (qryResults == null)
                 sendError(node, reqId, new CacheException("No query result 
found for request: " + req));
             else if (qryResults.cancelled())
                 sendQueryCancel(node, reqId);
             else {
                 try {
-                    MapQueryResult res = qryResults.result(req.query());
+                    res = qryResults.result(req.query());
 
                     assert res != null;
 
@@ -862,14 +887,18 @@ public class GridMapQueryExecutor {
 
                         Boolean dataPageScanEnabled = 
isDataPageScanEnabled(req.getFlags());
 
-                        GridQueryNextPageResponse msg = prepareNextPage(
-                            nodeRess,
-                            node,
-                            qryResults,
-                            req.query(),
-                            req.segmentId(),
-                            req.pageSize(),
-                            dataPageScanEnabled);
+                        GridQueryNextPageResponse msg = 
h2.executeWithResumableTimeTracking(
+                            () -> prepareNextPage(
+                                nodeRess,
+                                node,
+                                qryResults,
+                                req.query(),
+                                req.segmentId(),
+                                req.pageSize(),
+                                dataPageScanEnabled
+                            ),
+                            res.qryInfo()
+                        );
 
                         if (msg != null)
                             sendNextPage(node, msg);
@@ -884,6 +913,9 @@ public class GridMapQueryExecutor {
                     }
                 }
                 catch (Exception e) {
+                    if (res.qryInfo() != null)
+                        h2.heavyQueriesTracker().stopTracking(res.qryInfo(), 
e);
+
                     QueryRetryException retryEx = X.cause(e, 
QueryRetryException.class);
 
                     if (retryEx != null)
@@ -939,6 +971,9 @@ public class GridMapQueryExecutor {
             if (last) {
                 qr.closeResult(qry);
 
+                if (res.qryInfo() != null)
+                    h2.heavyQueriesTracker().stopTracking(res.qryInfo(), null);
+
                 if (qr.isAllClosed()) {
                     nodeRess.remove(qr.queryRequestId(), segmentId, qr);
 
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3fdadabe361..cffa0dfd9b8 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -419,6 +419,8 @@ public class GridReduceQueryExecutor {
 
                 runs.put(qryReqId, r);
 
+                ReduceH2QueryInfo qryInfo = null;
+
                 try {
                     cancel.add(() -> send(nodes, new 
GridQueryCancelRequest(qryReqId), null, true));
 
@@ -509,9 +511,11 @@ public class GridReduceQueryExecutor {
 
                         H2Utils.bindParameters(stmt, 
F.asList(rdc.parameters(params)));
 
-                        ReduceH2QueryInfo qryInfo = new 
ReduceH2QueryInfo(stmt, qry.originalSql(),
+                        qryInfo = new ReduceH2QueryInfo(stmt, 
qry.originalSql(),
                             ctx.localNodeId(), qryId, qryReqId);
 
+                        h2.heavyQueriesTracker().startTracking(qryInfo);
+
                         if (ctx.performanceStatistics().enabled()) {
                             ctx.performanceStatistics().queryProperty(
                                 GridCacheQueryType.SQL_FIELDS,
@@ -522,12 +526,18 @@ public class GridReduceQueryExecutor {
                             );
                         }
 
-                        ResultSet res = h2.executeSqlQueryWithTimer(stmt,
-                            conn,
-                            rdc.query(),
-                            timeoutMillis,
-                            cancel,
-                            dataPageScanEnabled,
+                        H2PooledConnection conn0 = conn;
+
+                        ResultSet res = h2.executeWithResumableTimeTracking(
+                            () -> h2.executeSqlQueryWithTimer(
+                                stmt,
+                                conn0,
+                                rdc.query(),
+                                timeoutMillis,
+                                cancel,
+                                dataPageScanEnabled,
+                                null
+                            ),
                             qryInfo
                         );
 
@@ -549,6 +559,9 @@ public class GridReduceQueryExecutor {
                 catch (IgniteCheckedException | RuntimeException e) {
                     release = true;
 
+                    if (qryInfo != null)
+                        h2.heavyQueriesTracker().stopTracking(qryInfo, e);
+
                     if (e instanceof CacheException) {
                         if (QueryUtils.wasCancelled(e))
                             throw new CacheException("Failed to run reduce 
query locally.",
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index d7d8736e43e..f644a7805e1 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -324,6 +324,11 @@ class MapQueryResult {
             GridH2Table.checkTablesVersions(ses);
     }
 
+    /** */
+    public MapH2QueryInfo qryInfo() {
+        return res.qryInfo;
+    }
+
     /** */
     private class Result {
         /** */
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
index 8ba7223c998..a7faf240c2d 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
@@ -22,17 +22,21 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.QueryCancelledException;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
 import org.apache.ignite.configuration.CacheConfiguration;
 import 
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.processors.query.h2.H2QueryInfo;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.ListeningTestLogger;
@@ -41,6 +45,7 @@ import org.junit.Test;
 
 import static java.lang.Thread.currentThread;
 import static 
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG;
+import static org.h2.engine.Constants.DEFAULT_PAGE_SIZE;
 
 /**
  * Tests for log print for long-running query.
@@ -49,21 +54,35 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
     /** Keys count. */
     private static final int KEY_CNT = 1000;
 
+    /** External wait time. */
+    private static final int EXT_WAIT_TIME = 2000;
+
+    /** Page size. */
+    private int pageSize = DEFAULT_PAGE_SIZE;
+
     /** Local query mode. */
     private boolean local;
 
     /** Lazy query mode. */
     private boolean lazy;
 
+    /** Merge table usage flag. */
+    private boolean withMergeTable;
+
+    /** Distributed joins flag. */
+    private boolean distributedJoins;
+
+    /** Ignite instance. */
+    private Ignite ignite;
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
-        startGrid();
+        ignite = startGrid();
 
         IgniteCache c = grid().createCache(new CacheConfiguration<Long, Long>()
             .setName("test")
-            .setSqlSchema("TEST")
             .setQueryEntities(Collections.singleton(new 
QueryEntity(Long.class, Long.class)
                 .setTableName("test")
                 .addQueryField("id", Long.class.getName(), null)
@@ -85,6 +104,18 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
         super.afterTest();
     }
 
+    /**
+     * @param name Name.
+     * @param idxTypes Index types.
+     */
+    @SuppressWarnings("unchecked")
+    private static CacheConfiguration cacheConfig(String name, Class<?>... 
idxTypes) {
+        return new CacheConfiguration()
+            .setName(name)
+            .setIndexedTypes(idxTypes)
+            .setSqlFunctionClasses(TestSQLFunctions.class);
+    }
+
     /**
      *
      */
@@ -109,6 +140,72 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
         checkFastQueries();
     }
 
+    /**
+     *
+     */
+    @Test
+    public void testLongDistributedLazy() {
+        local = false;
+        lazy = true;
+
+        checkLongRunning();
+        checkFastQueries();
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testLongDistributedLazyWithMergeTable() {
+        local = false;
+        lazy = true;
+
+        withMergeTable = true;
+
+        try {
+            checkLongRunning();
+        }
+        finally {
+            withMergeTable = false;
+        }
+    }
+
+    /**
+     *
+     */
+    @Test
+    public void testLongLocalLazy() {
+        local = true;
+        lazy = true;
+
+        checkLongRunning();
+        checkFastQueries();
+    }
+
+    /**
+     * Test checks that no long-running queries warnings are printed in case of external waits during
+     * the execution of distributed queries.
+     */
+    @Test
+    public void testDistributedLazyWithExternalWait() {
+        local = false;
+        lazy = true;
+
+        checkLazyWithExternalWait();
+    }
+
+    /**
+     * Test checks that no long-running queries warnings are printed in case of external waits during
+     * the execution of local queries.
+     */
+    @Test
+    public void testlocalLazyWithExternalWait() {
+        local = true;
+        lazy = true;
+
+        checkLazyWithExternalWait();
+    }
+
     /**
      * Test checks the correctness of thread name when displaying errors
      * about long queries.
@@ -166,7 +263,7 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
 
         // Several fast queries.
         for (int i = 0; i < 10; ++i)
-            sql("SELECT * FROM test").getAll();
+            sql("test", "SELECT * FROM test").getAll();
 
         assertFalse(lsnr.check());
     }
@@ -200,7 +297,7 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
 
         testLog.registerListener(lsnr);
 
-        try (FieldsQueryCursor cur = sql("SELECT T0.id FROM test AS T0, test 
AS T1")) {
+        try (FieldsQueryCursor cur = sql("test", "SELECT T0.id FROM test AS 
T0, test AS T1")) {
             Iterator it = cur.iterator();
 
             while (it.hasNext())
@@ -215,28 +312,116 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
      * @param args Query parameters.
      */
     private void sqlCheckLongRunning(String sql, Object... args) {
-        GridTestUtils.assertThrowsAnyCause(log, () -> sql(sql, args).getAll(), 
QueryCancelledException.class, "");
+        GridTestUtils.assertThrowsAnyCause(log, () -> sql("test", sql, 
args).getAll(), QueryCancelledException.class, "");
+    }
+
+    /**
+     * @param sql SQL query.
+     * @param args Query parameters.
+     */
+    private void sqlCheckLongRunningLazy(String sql, Object... args) {
+        pageSize = 1;
+
+        try {
+            assertFalse(sql("test", sql, args).iterator().next().isEmpty());
+        }
+        finally {
+            pageSize = DEFAULT_PAGE_SIZE;
+        }
+    }
+
+    /**
+     * @param sql SQL query.
+     * @param args Query parameters.
+     */
+    private void sqlCheckLongRunningLazyWithMergeTable(String sql, Object... 
args) {
+        distributedJoins = true;
+
+        try {
+            CacheConfiguration ccfg1 = cacheConfig("pers", Integer.class, 
Person.class);
+            CacheConfiguration ccfg2 = cacheConfig("org", Integer.class, 
Organization.class);
+
+            IgniteCache<Integer, Person> cache1 = 
ignite.getOrCreateCache(ccfg1);
+            IgniteCache<Integer, Organization> cache2 = 
ignite.getOrCreateCache(ccfg2);
+
+            cache2.put(1, new Organization("o1"));
+            cache2.put(2, new Organization("o2"));
+            cache1.put(3, new Person(1, "p1"));
+            cache1.put(4, new Person(2, "p2"));
+            cache1.put(5, new Person(3, "p3"));
+
+            assertFalse(sql("pers", sql, args).getAll().isEmpty());
+        }
+        finally {
+            distributedJoins = false;
+        }
     }
 
     /**
      * Execute long-running sql with a check for errors.
      */
     private void sqlCheckLongRunning() {
-        sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, test AS 
T2 where T0.id > ?", 0);
+        if (lazy && withMergeTable) {
+            String select = "select o.name n1, p.name n2 from Person p, 
\"org\".Organization o" +
+                " where p.orgId = o._key and o._key=1 and o._key < 
sleep_func(?)" +
+                " union select o.name n1, p.name n2 from Person p, 
\"org\".Organization o" +
+                " where p.orgId = o._key and o._key=2";
+
+            sqlCheckLongRunningLazyWithMergeTable(select, 2000);
+        }
+        else if (lazy && !withMergeTable)
+            sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key < 
sleep_func(?)", 2000);
+        else
+            sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, 
test AS T2 where T0.id > ?", 0);
     }
 
     /**
+     * @param cacheName Cache name.
      * @param sql SQL query.
      * @param args Query parameters.
      * @return Results cursor.
      */
-    private FieldsQueryCursor<List<?>> sql(String sql, Object... args) {
-        return grid().context().query().querySqlFields(new SqlFieldsQuery(sql)
+    private FieldsQueryCursor<List<?>> sql(String cacheName, String sql, 
Object... args) {
+        return ignite.cache(cacheName).query(new SqlFieldsQuery(sql)
             .setTimeout(10, TimeUnit.SECONDS)
             .setLocal(local)
             .setLazy(lazy)
-            .setSchema("TEST")
-            .setArgs(args), false);
+            .setPageSize(pageSize)
+            .setDistributedJoins(distributedJoins)
+            .setArgs(args));
+    }
+
+    /** */
+    public void checkLazyWithExternalWait() {
+        pageSize = 1;
+
+        LogListener lsnr = LogListener
+            .matches(LONG_QUERY_EXEC_MSG)
+            .build();
+
+        testLog().registerListener(lsnr);
+
+        try {
+            Iterator<List<?>> it = sql("test", "select * from 
test").iterator();
+
+            it.next();
+
+            long sleepStartTs = U.currentTimeMillis();
+
+            while (U.currentTimeMillis() - sleepStartTs <= EXT_WAIT_TIME)
+                doSleep(100L);
+
+            it.next();
+
+            H2QueryInfo qry = 
(H2QueryInfo)heavyQueriesTracker().getQueries().iterator().next();
+
+            assertTrue(qry.extWait() >= EXT_WAIT_TIME);
+
+            assertFalse(lsnr.check());
+        }
+        finally {
+            pageSize = DEFAULT_PAGE_SIZE;
+        }
     }
 
     /**
@@ -287,4 +472,38 @@ public class LongRunningQueryTest extends 
AbstractIndexingCommonTest {
     private HeavyQueriesTracker heavyQueriesTracker() {
         return 
((IgniteH2Indexing)grid().context().query().getIndexing()).heavyQueriesTracker();
     }
+
+    /** */
+    private static class Person {
+        /** */
+        @QuerySqlField(index = true)
+        int orgId;
+
+        /** */
+        @QuerySqlField(index = true)
+        String name;
+
+        /**
+         * @param orgId Organization ID.
+         * @param name Name.
+         */
+        public Person(int orgId, String name) {
+            this.orgId = orgId;
+            this.name = name;
+        }
+    }
+
+    /** */
+    private static class Organization {
+        /** */
+        @QuerySqlField
+        String name;
+
+        /**
+         * @param name Organization name.
+         */
+        public Organization(String name) {
+            this.name = name;
+        }
+    }
 }

Reply via email to