This is an automated email from the ASF dual-hosted git repository. alexpl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 531eba2c86e IGNITE-17086 Sql: Fix absence of warnings for long running queries with lazy flag - Fixes #11405. 531eba2c86e is described below commit 531eba2c86eb7bd8fc5280129690a55284875109 Author: oleg-vlsk <oleg-v...@yandex.ru> AuthorDate: Thu Jul 25 18:51:50 2024 +0300 IGNITE-17086 Sql: Fix absence of warnings for long running queries with lazy flag - Fixes #11405. Signed-off-by: Aleksey Plekhanov <plehanov.a...@gmail.com> --- .../query/running/HeavyQueriesTracker.java | 6 + .../internal/processors/query/h2/H2QueryInfo.java | 39 +++- .../processors/query/h2/H2ResultSetIterator.java | 9 +- .../processors/query/h2/IgniteH2Indexing.java | 48 ++++- .../query/h2/twostep/GridMapQueryExecutor.java | 87 +++++--- .../query/h2/twostep/GridReduceQueryExecutor.java | 27 ++- .../query/h2/twostep/MapQueryResult.java | 5 + .../processors/query/LongRunningQueryTest.java | 239 ++++++++++++++++++++- 8 files changed, 407 insertions(+), 53 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java index d7a06793340..1a3654a415a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.running; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; @@ -254,6 +255,11 @@ public final class HeavyQueriesTracker { this.rsSizeThresholdMult = rsSizeThresholdMult <= 1 ? 1 : rsSizeThresholdMult; } + /** */ + public Set<TrackableQuery> getQueries() { + return qrys.keySet(); + } + /** * Holds timeout settings for the specified query. */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java index 7cdbed157fe..95ce6c1bf7e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java @@ -41,6 +41,15 @@ public class H2QueryInfo implements TrackableQuery { /** Begin timestamp. */ private final long beginTs; + /** The most recent point in time when the tracking of a long query was suspended. */ + private volatile long lastSuspendTs; + + /** External wait time. */ + private volatile long extWait; + + /** Long query time tracking suspension flag. */ + private volatile boolean isSuspended; + /** Query schema. */ private final String schema; @@ -112,6 +121,11 @@ public class H2QueryInfo implements TrackableQuery { return stmt.getPlanSQL(); } + /** */ + public long extWait() { + return extWait; + } + /** * Print info specified by children. * @@ -123,7 +137,25 @@ public class H2QueryInfo implements TrackableQuery { /** {@inheritDoc} */ @Override public long time() { - return U.currentTimeMillis() - beginTs; + return (isSuspended ? lastSuspendTs : U.currentTimeMillis()) - beginTs - extWait; + } + + /** */ + public synchronized void suspendTracking() { + if (!isSuspended) { + isSuspended = true; + + lastSuspendTs = U.currentTimeMillis(); + } + } + + /** */ + public synchronized void resumeTracking() { + if (isSuspended) { + isSuspended = false; + + extWait += U.currentTimeMillis() - lastSuspendTs; + } } /** @@ -156,6 +188,11 @@ public class H2QueryInfo implements TrackableQuery { return msgSb.toString(); } + /** */ + public boolean isSuspended() { + return isSuspended; + } + /** * Query type. */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java index 9529b4430b1..18cbac6b494 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java @@ -120,6 +120,9 @@ public abstract class H2ResultSetIterator<T> extends GridIteratorAdapter<T> impl /** */ private final H2QueryInfo qryInfo; + /** */ + final IgniteH2Indexing h2; + /** * @param data Data array. * @param log Logger. @@ -141,6 +144,7 @@ public abstract class H2ResultSetIterator<T> extends GridIteratorAdapter<T> impl this.data = data; this.tracing = tracing; this.qryInfo = qryInfo; + this.h2 = h2; try { res = (ResultInterface)RESULT_FIELD.get(data); @@ -325,6 +329,9 @@ public abstract class H2ResultSetIterator<T> extends GridIteratorAdapter<T> impl lockTables(); + if (qryInfo != null) + h2.heavyQueriesTracker().stopTracking(qryInfo, null); + try { resultSetChecker.checkOnClose(); @@ -391,7 +398,7 @@ public abstract class H2ResultSetIterator<T> extends GridIteratorAdapter<T> impl if (closed) return false; - return hasRow || (hasRow = fetchNext()); + return hasRow || (hasRow = h2.executeWithResumableTimeTracking(this::fetchNext, qryInfo)); } /** {@inheritDoc} */ diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index 091a15c9230..8e91c71a711 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -119,6 +119,7 @@ import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.lang.IgniteSingletonIterator; +import org.apache.ignite.internal.util.lang.IgniteThrowableSupplier; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; @@ -414,6 +415,8 @@ public class IgniteH2Indexing implements GridQueryIndexing { @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException { H2PooledConnection conn = connections().connection(qryDesc.schemaName()); + H2QueryInfo qryInfo = null; + try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_ITER_OPEN, MTC.span()))) { H2Utils.setupConnection(conn, qctx, qryDesc.distributedJoins(), qryDesc.enforceJoinOrder(), qryParams.lazy()); @@ -436,9 +439,11 @@ public class IgniteH2Indexing implements GridQueryIndexing { H2Utils.bindParameters(stmt, F.asList(params)); - H2QueryInfo qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry, + qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry, ctx.localNodeId(), qryId); + heavyQryTracker.startTracking(qryInfo); + if (ctx.performanceStatistics().enabled()) { ctx.performanceStatistics().queryProperty( GridCacheQueryType.SQL_FIELDS, @@ -449,13 +454,16 @@ public class IgniteH2Indexing implements GridQueryIndexing { ); } - ResultSet rs = executeSqlQueryWithTimer( - stmt, - conn, - qry, - timeout, - cancel, - qryParams.dataPageScanEnabled(), + ResultSet rs = executeWithResumableTimeTracking( + () -> executeSqlQueryWithTimer( + stmt, + conn, + qry, + timeout, + cancel, + qryParams.dataPageScanEnabled(), + null + ), qryInfo ); @@ -472,6 +480,9 @@ public class IgniteH2Indexing implements GridQueryIndexing { catch (IgniteCheckedException | RuntimeException | Error e) { conn.close(); + if (qryInfo != null) + heavyQryTracker.stopTracking(qryInfo, e); + throw e; } } @@ -2259,4 +2270,25 @@ public class IgniteH2Indexing implements GridQueryIndexing { public DistributedIndexingConfiguration distributedConfiguration() { return distrCfg; } + + /** + * Resumes time tracking before the task (if needed) and suspends time tracking after the task is finished. + * + * @param task Query/fetch to execute. + * @param qryInfo Query info. + * @throws IgniteCheckedException If failed. + */ + public <T> T executeWithResumableTimeTracking( + IgniteThrowableSupplier<T> task, + final H2QueryInfo qryInfo + ) throws IgniteCheckedException { + qryInfo.resumeTracking(); + + try { + return task.get(); + } + finally { + qryInfo.suspendTracking(); + } + } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 3d388561840..78a0e0aeb96 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -446,6 +446,8 @@ public class GridMapQueryExecutor { qryResults.addResult(qryIdx, res); + MapH2QueryInfo qryInfo = null; + try { res.lock(); @@ -460,7 +462,9 @@ public class GridMapQueryExecutor { H2Utils.bindParameters(stmt, params0); - MapH2QueryInfo qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, reqId, segmentId); + qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, reqId, segmentId); + + h2.heavyQueriesTracker().startTracking(qryInfo); if (performanceStatsEnabled) { ctx.performanceStatistics().queryProperty( @@ -472,14 +476,20 @@ public class GridMapQueryExecutor { ); } - ResultSet rs = h2.executeSqlQueryWithTimer( - stmt, - conn, - sql, - timeout, - qryResults.queryCancel(qryIdx), - dataPageScanEnabled, - qryInfo); + GridQueryCancel qryCancel = qryResults.queryCancel(qryIdx); + + ResultSet rs = h2.executeWithResumableTimeTracking( + () -> h2.executeSqlQueryWithTimer( + stmt, + conn, + sql, + timeout, + qryCancel, + dataPageScanEnabled, + null + ), + qryInfo + ); if (evt) { ctx.event().record(new CacheQueryExecutedEvent<>( @@ -507,14 +517,21 @@ public class GridMapQueryExecutor { res.openResult(rs, qryInfo); - final GridQueryNextPageResponse msg = prepareNextPage( - nodeRess, - node, - qryResults, - qryIdx, - segmentId, - pageSize, - dataPageScanEnabled + MapQueryResults qryResults0 = qryResults; + + int qryIdx0 = qryIdx; + + final GridQueryNextPageResponse msg = h2.executeWithResumableTimeTracking( + () -> prepareNextPage( + nodeRess, + node, + qryResults0, + qryIdx0, + segmentId, + pageSize, + dataPageScanEnabled + ), + qryInfo ); if (msg != null) @@ -528,6 +545,12 @@ public class GridMapQueryExecutor { qryIdx++; } + catch (Throwable e) { + if (qryInfo != null) + h2.heavyQueriesTracker().stopTracking(qryInfo, e); + + throw e; + } finally { try { res.unlockTables(); @@ -843,13 +866,15 @@ public class GridMapQueryExecutor { final MapQueryResults qryResults = nodeRess.get(reqId, req.segmentId()); + MapQueryResult res = null; + if (qryResults == null) sendError(node, reqId, new CacheException("No query result found for request: " + req)); else if (qryResults.cancelled()) sendQueryCancel(node, reqId); else { try { - MapQueryResult res = qryResults.result(req.query()); + res = qryResults.result(req.query()); assert res != null; @@ -862,14 +887,18 @@ public class GridMapQueryExecutor { Boolean dataPageScanEnabled = isDataPageScanEnabled(req.getFlags()); - GridQueryNextPageResponse msg = prepareNextPage( - nodeRess, - node, - qryResults, - req.query(), - req.segmentId(), - req.pageSize(), - dataPageScanEnabled); + GridQueryNextPageResponse msg = h2.executeWithResumableTimeTracking( + () -> prepareNextPage( + nodeRess, + node, + qryResults, + req.query(), + req.segmentId(), + req.pageSize(), + dataPageScanEnabled + ), + res.qryInfo() + ); if (msg != null) sendNextPage(node, msg); @@ -884,6 +913,9 @@ public class GridMapQueryExecutor { } } catch (Exception e) { + if (res.qryInfo() != null) + h2.heavyQueriesTracker().stopTracking(res.qryInfo(), e); + QueryRetryException retryEx = X.cause(e, QueryRetryException.class); if (retryEx != null) @@ -939,6 +971,9 @@ public class GridMapQueryExecutor { if (last) { qr.closeResult(qry); + if (res.qryInfo() != null) + h2.heavyQueriesTracker().stopTracking(res.qryInfo(), null); + if (qr.isAllClosed()) { nodeRess.remove(qr.queryRequestId(), segmentId, qr); diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 3fdadabe361..cffa0dfd9b8 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -419,6 +419,8 @@ public class GridReduceQueryExecutor { runs.put(qryReqId, r); + ReduceH2QueryInfo qryInfo = null; + try { cancel.add(() -> send(nodes, new GridQueryCancelRequest(qryReqId), null, true)); @@ -509,9 +511,11 @@ public class GridReduceQueryExecutor { H2Utils.bindParameters(stmt, F.asList(rdc.parameters(params))); - ReduceH2QueryInfo qryInfo = new ReduceH2QueryInfo(stmt, qry.originalSql(), + qryInfo = new ReduceH2QueryInfo(stmt, qry.originalSql(), ctx.localNodeId(), qryId, qryReqId); + h2.heavyQueriesTracker().startTracking(qryInfo); + if (ctx.performanceStatistics().enabled()) { ctx.performanceStatistics().queryProperty( GridCacheQueryType.SQL_FIELDS, @@ -522,12 +526,18 @@ public class GridReduceQueryExecutor { ); } - ResultSet res = h2.executeSqlQueryWithTimer(stmt, - conn, - rdc.query(), - timeoutMillis, - cancel, - dataPageScanEnabled, + H2PooledConnection conn0 = conn; + + ResultSet res = h2.executeWithResumableTimeTracking( + () -> h2.executeSqlQueryWithTimer( + stmt, + conn0, + rdc.query(), + timeoutMillis, + cancel, + dataPageScanEnabled, + null + ), qryInfo ); @@ -549,6 +559,9 @@ public class GridReduceQueryExecutor { catch (IgniteCheckedException | RuntimeException e) { release = true; + if (qryInfo != null) + h2.heavyQueriesTracker().stopTracking(qryInfo, e); + if (e instanceof CacheException) { if (QueryUtils.wasCancelled(e)) throw new CacheException("Failed to run reduce query locally.", diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java index d7d8736e43e..f644a7805e1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java @@ -324,6 +324,11 @@ class MapQueryResult { GridH2Table.checkTablesVersions(ses); } + /** */ + public MapH2QueryInfo qryInfo() { + return res.qryInfo; + } + /** */ private class Result { /** */ diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java index 8ba7223c998..a7faf240c2d 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java @@ -22,17 +22,21 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.QueryCancelledException; import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; +import org.apache.ignite.internal.processors.query.h2.H2QueryInfo; import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.ListeningTestLogger; @@ -41,6 +45,7 @@ import org.junit.Test; import static java.lang.Thread.currentThread; import static org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG; +import static org.h2.engine.Constants.DEFAULT_PAGE_SIZE; /** * Tests for log print for long-running query. @@ -49,21 +54,35 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { /** Keys count. */ private static final int KEY_CNT = 1000; + /** External wait time. */ + private static final int EXT_WAIT_TIME = 2000; + + /** Page size. */ + private int pageSize = DEFAULT_PAGE_SIZE; + /** Local query mode. */ private boolean local; /** Lazy query mode. */ private boolean lazy; + /** Merge table usage flag. */ + private boolean withMergeTable; + + /** Distributed joins flag. */ + private boolean distributedJoins; + + /** Ignite instance. */ + private Ignite ignite; + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); - startGrid(); + ignite = startGrid(); IgniteCache c = grid().createCache(new CacheConfiguration<Long, Long>() .setName("test") - .setSqlSchema("TEST") .setQueryEntities(Collections.singleton(new QueryEntity(Long.class, Long.class) .setTableName("test") .addQueryField("id", Long.class.getName(), null) @@ -85,6 +104,18 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { super.afterTest(); } + /** + * @param name Name. + * @param idxTypes Index types. + */ + @SuppressWarnings("unchecked") + private static CacheConfiguration cacheConfig(String name, Class<?>... idxTypes) { + return new CacheConfiguration() + .setName(name) + .setIndexedTypes(idxTypes) + .setSqlFunctionClasses(TestSQLFunctions.class); + } + /** * */ @@ -109,6 +140,72 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { checkFastQueries(); } + /** + * + */ + @Test + public void testLongDistributedLazy() { + local = false; + lazy = true; + + checkLongRunning(); + checkFastQueries(); + } + + /** + * + */ + @Test + public void testLongDistributedLazyWithMergeTable() { + local = false; + lazy = true; + + withMergeTable = true; + + try { + checkLongRunning(); + } + finally { + withMergeTable = false; + } + } + + /** + * + */ + @Test + public void testLongLocalLazy() { + local = true; + lazy = true; + + checkLongRunning(); + checkFastQueries(); + } + + /** + * Test checks that no long-running queries warnings are printed in case of external waits during + * the execution of distributed queries. + */ + @Test + public void testDistributedLazyWithExternalWait() { + local = false; + lazy = true; + + checkLazyWithExternalWait(); + } + + /** + * Test checks that no long-running queries warnings are printed in case of external waits during + * the execution of local queries. + */ + @Test + public void testlocalLazyWithExternalWait() { + local = true; + lazy = true; + + checkLazyWithExternalWait(); + } + /** * Test checks the correctness of thread name when displaying errors * about long queries. @@ -166,7 +263,7 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { // Several fast queries. for (int i = 0; i < 10; ++i) - sql("SELECT * FROM test").getAll(); + sql("test", "SELECT * FROM test").getAll(); assertFalse(lsnr.check()); } @@ -200,7 +297,7 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { testLog.registerListener(lsnr); - try (FieldsQueryCursor cur = sql("SELECT T0.id FROM test AS T0, test AS T1")) { + try (FieldsQueryCursor cur = sql("test", "SELECT T0.id FROM test AS T0, test AS T1")) { Iterator it = cur.iterator(); while (it.hasNext()) @@ -215,28 +312,116 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { * @param args Query parameters. */ private void sqlCheckLongRunning(String sql, Object... args) { - GridTestUtils.assertThrowsAnyCause(log, () -> sql(sql, args).getAll(), QueryCancelledException.class, ""); + GridTestUtils.assertThrowsAnyCause(log, () -> sql("test", sql, args).getAll(), QueryCancelledException.class, ""); + } + + /** + * @param sql SQL query. + * @param args Query parameters. + */ + private void sqlCheckLongRunningLazy(String sql, Object... args) { + pageSize = 1; + + try { + assertFalse(sql("test", sql, args).iterator().next().isEmpty()); + } + finally { + pageSize = DEFAULT_PAGE_SIZE; + } + } + + /** + * @param sql SQL query. + * @param args Query parameters. + */ + private void sqlCheckLongRunningLazyWithMergeTable(String sql, Object... args) { + distributedJoins = true; + + try { + CacheConfiguration ccfg1 = cacheConfig("pers", Integer.class, Person.class); + CacheConfiguration ccfg2 = cacheConfig("org", Integer.class, Organization.class); + + IgniteCache<Integer, Person> cache1 = ignite.getOrCreateCache(ccfg1); + IgniteCache<Integer, Organization> cache2 = ignite.getOrCreateCache(ccfg2); + + cache2.put(1, new Organization("o1")); + cache2.put(2, new Organization("o2")); + cache1.put(3, new Person(1, "p1")); + cache1.put(4, new Person(2, "p2")); + cache1.put(5, new Person(3, "p3")); + + assertFalse(sql("pers", sql, args).getAll().isEmpty()); + } + finally { + distributedJoins = false; + } } /** * Execute long-running sql with a check for errors. */ private void sqlCheckLongRunning() { - sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, test AS T2 where T0.id > ?", 0); + if (lazy && withMergeTable) { + String select = "select o.name n1, p.name n2 from Person p, \"org\".Organization o" + + " where p.orgId = o._key and o._key=1 and o._key < sleep_func(?)" + + " union select o.name n1, p.name n2 from Person p, \"org\".Organization o" + + " where p.orgId = o._key and o._key=2"; + + sqlCheckLongRunningLazyWithMergeTable(select, 2000); + } + else if (lazy && !withMergeTable) + sqlCheckLongRunningLazy("SELECT * FROM test WHERE _key < sleep_func(?)", 2000); + else + sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, test AS T2 where T0.id > ?", 0); } /** + * @param cacheName Cache name. * @param sql SQL query. * @param args Query parameters. * @return Results cursor. */ - private FieldsQueryCursor<List<?>> sql(String sql, Object... args) { - return grid().context().query().querySqlFields(new SqlFieldsQuery(sql) + private FieldsQueryCursor<List<?>> sql(String cacheName, String sql, Object... args) { + return ignite.cache(cacheName).query(new SqlFieldsQuery(sql) .setTimeout(10, TimeUnit.SECONDS) .setLocal(local) .setLazy(lazy) - .setSchema("TEST") - .setArgs(args), false); + .setPageSize(pageSize) + .setDistributedJoins(distributedJoins) + .setArgs(args)); + } + + /** */ + public void checkLazyWithExternalWait() { + pageSize = 1; + + LogListener lsnr = LogListener + .matches(LONG_QUERY_EXEC_MSG) + .build(); + + testLog().registerListener(lsnr); + + try { + Iterator<List<?>> it = sql("test", "select * from test").iterator(); + + it.next(); + + long sleepStartTs = U.currentTimeMillis(); + + while (U.currentTimeMillis() - sleepStartTs <= EXT_WAIT_TIME) + doSleep(100L); + + it.next(); + + H2QueryInfo qry = (H2QueryInfo)heavyQueriesTracker().getQueries().iterator().next(); + + assertTrue(qry.extWait() >= EXT_WAIT_TIME); + + assertFalse(lsnr.check()); + } + finally { + pageSize = DEFAULT_PAGE_SIZE; + } } /** @@ -287,4 +472,38 @@ public class LongRunningQueryTest extends AbstractIndexingCommonTest { private HeavyQueriesTracker heavyQueriesTracker() { return ((IgniteH2Indexing)grid().context().query().getIndexing()).heavyQueriesTracker(); } + + /** */ + private static class Person { + /** */ + @QuerySqlField(index = true) + int orgId; + + /** */ + @QuerySqlField(index = true) + String name; + + /** + * @param orgId Organization ID. + * @param name Name. + */ + public Person(int orgId, String name) { + this.orgId = orgId; + this.name = name; + } + } + + /** */ + private static class Organization { + /** */ + @QuerySqlField + String name; + + /** + * @param name Organization name. + */ + public Organization(String name) { + this.name = name; + } + } }