This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 77723f0d95c IGNITE-19588 SQL Calcite: Add long-running queries
warnings - Fixes #10769.
77723f0d95c is described below
commit 77723f0d95c60a780680b2b01ab92c9aff576759
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Tue Jun 13 18:19:32 2023 +0500
IGNITE-19588 SQL Calcite: Add long-running queries warnings - Fixes #10769.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/CalciteQueryProcessor.java | 40 ++---
.../internal/processors/query/calcite/Query.java | 32 ++--
.../processors/query/calcite/QueryRegistry.java | 11 +-
.../query/calcite/QueryRegistryImpl.java | 27 ++--
.../processors/query/calcite}/QueryState.java | 2 +-
.../processors/query/calcite/RootQuery.java | 67 ++++++--
.../query/calcite/exec/ExchangeService.java | 5 +
.../query/calcite/exec/ExchangeServiceImpl.java | 16 +-
.../query/calcite/exec/ExecutionServiceImpl.java | 33 ++--
.../query/calcite/exec/rel/RootNode.java | 62 +++++++-
.../calcite/util/ConvertingClosableIterator.java | 10 +-
.../processors/query/calcite/CancelTest.java | 6 +-
.../exec/rel/TimeCalculationExecutionTest.java | 170 +++++++++++++++++++++
.../KillQueryCommandDdlIntegrationTest.java | 2 +-
.../integration/RunningQueriesIntegrationTest.java | 25 ++-
.../integration/SqlDiagnosticIntegrationTest.java | 106 ++++++++++++-
.../ignite/testsuites/ExecutionTestSuite.java | 2 +
.../thin/JdbcThinPartitionAwarenessSelfTest.java | 2 +-
...ThinPartitionAwarenessTransactionsSelfTest.java | 2 +-
.../internal/managers/IgniteMBeansManager.java | 8 +-
.../processors/bulkload/BulkLoadProcessor.java | 4 +-
.../processors/query/GridQueryIndexing.java | 10 +-
.../processors/query/GridQueryProcessor.java | 4 +-
.../internal/processors/query/NoOpQueryEngine.java | 12 --
.../internal/processors/query/QueryEngine.java | 8 -
.../query/{ => running}/GridRunningQueryInfo.java | 13 +-
.../query/running/HeavyQueriesTracker.java} | 108 +++++++++++--
.../query/{ => running}/QueryHistory.java | 2 +-
.../query/{ => running}/QueryHistoryKey.java | 2 +-
.../{ => running}/QueryHistoryMetricsValue.java | 2 +-
.../query/{ => running}/QueryHistoryTracker.java | 2 +-
.../query/{ => running}/QueryRunningFuture.java | 2 +-
.../query/{ => running}/RunningQueryManager.java | 28 +++-
.../processors/query/running}/SqlQueryMXBean.java | 2 +-
.../query/running}/SqlQueryMXBeanImpl.java | 23 ++-
.../TrackableQuery.java} | 19 +--
.../spi/systemview/view/SqlQueryHistoryView.java | 2 +-
.../ignite/spi/systemview/view/SqlQueryView.java | 2 +-
.../main/resources/META-INF/classnames.properties | 3 +-
.../processors/query/DummyQueryIndexing.java | 7 +-
.../query/h2/H2QueryFetchSizeInterceptor.java | 92 -----------
.../internal/processors/query/h2/H2QueryInfo.java | 22 ++-
.../processors/query/h2/H2ResultSetIterator.java | 13 +-
.../processors/query/h2/IgniteH2Indexing.java | 43 ++----
.../processors/query/h2/RegisteredQueryCursor.java | 5 +-
.../query/h2/twostep/MapQueryResult.java | 10 +-
.../query/h2/twostep/msg/GridH2QueryRequest.java | 3 +-
.../metric/SqlStatisticsUserQueriesFastTest.java | 2 +-
.../metric/SqlStatisticsUserQueriesLongTest.java | 2 +-
.../internal/metric/UserQueriesTestBase.java | 4 +-
.../cache/CacheSqlQueryValueCopySelfTest.java | 2 +-
.../IgniteSqlSkipReducerOnUpdateDmlSelfTest.java | 1 +
.../query/KillQueryErrorOnCancelTest.java | 1 +
.../query/KillQueryOnClientDisconnectTest.java | 1 +
.../internal/processors/query/KillQueryTest.java | 1 +
.../processors/query/LongRunningQueryTest.java | 22 +--
.../processors/query/RunningQueriesTest.java | 1 +
.../processors/query/SqlQueryHistorySelfTest.java | 1 +
.../systemview/SystemViewSecurityTest.java | 2 +-
59 files changed, 728 insertions(+), 383 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 0579a891dae..c52c8f4180e 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -61,7 +61,6 @@ import
org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryContext;
import org.apache.ignite.internal.processors.query.QueryEngine;
import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.RunningQuery;
import
org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeServiceImpl;
@@ -375,14 +374,7 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
String schemaName,
String sql
) throws IgniteSQLException {
- return parseAndProcessQuery(ctx, (qry, plan) -> {
- try {
- return fieldsMeta(plan, true);
- }
- finally {
- qryReg.unregister(qry.id());
- }
- }, schemaName, sql);
+ return parseAndProcessQuery(ctx, (qry, plan) -> fieldsMeta(plan,
true), schemaName, sql);
}
/** {@inheritDoc} */
@@ -391,14 +383,7 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
String schemaName,
String sql
) throws IgniteSQLException {
- return parseAndProcessQuery(ctx, (qry, plan) -> {
- try {
- return fieldsMeta(plan, false);
- }
- finally {
- qryReg.unregister(qry.id());
- }
- }, schemaName, sql);
+ return parseAndProcessQuery(ctx, (qry, plan) -> fieldsMeta(plan,
false), schemaName, sql);
}
/** {@inheritDoc} */
@@ -534,7 +519,7 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
params,
qryCtx,
exchangeSvc,
- (q) -> qryReg.unregister(q.id()),
+ (q, ex) -> qryReg.unregister(q.id(), ex),
log,
queryPlannerTimeout
);
@@ -555,7 +540,14 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
}
try {
- return action.apply(qry);
+ T res = action.apply(qry);
+
+ // Queries without iterator (DDL, EXPLAIN, metadata requests) can
be unregistered right after executing,
+ // queries with iterator (SELECT, DML) must be unregistered only
after fetching all data or on error/cancel.
+ if (qry.iterator() == null)
+ qryReg.unregister(qry.id(), null);
+
+ return res;
}
catch (Throwable e) {
boolean isCanceled = qry.isCancelled();
@@ -563,7 +555,7 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
if (qrys != null)
qrys.forEach(RootQuery::cancel);
- qryReg.unregister(qry.id());
+ qryReg.unregister(qry.id(), e);
if (isCanceled)
throw new IgniteSQLException("The query was cancelled while
planning", IgniteQueryErrorCode.QUERY_CANCELED, e);
@@ -613,13 +605,13 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
}
}
- /** {@inheritDoc} */
- @Override public RunningQuery runningQuery(UUID id) {
+ /** */
+ public Query<?> runningQuery(UUID id) {
return qryReg.query(id);
}
- /** {@inheritDoc} */
- @Override public Collection<? extends RunningQuery> runningQueries() {
+ /** */
+ public Collection<? extends Query<?>> runningQueries() {
return qryReg.runningQueries();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
index 29f2348d551..9d42e051ac6 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
@@ -25,23 +25,22 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.QueryState;
-import org.apache.ignite.internal.processors.query.RunningQuery;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.QueryMemoryTracker;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
/** */
-public class Query<RowT> implements RunningQuery {
+public class Query<RowT> {
/** */
private final UUID initNodeId;
@@ -58,7 +57,7 @@ public class Query<RowT> implements RunningQuery {
protected final GridQueryCancel cancel;
/** */
- protected final Consumer<Query<RowT>> unregister;
+ protected final BiConsumer<Query<RowT>, Throwable> unregister;
/** */
protected volatile QueryState state = QueryState.INITED;
@@ -87,7 +86,7 @@ public class Query<RowT> implements RunningQuery {
UUID initNodeId,
GridQueryCancel cancel,
ExchangeService exch,
- Consumer<Query<RowT>> unregister,
+ BiConsumer<Query<RowT>, Throwable> unregister,
IgniteLogger log,
int totalFragmentsCnt
) {
@@ -103,13 +102,13 @@ public class Query<RowT> implements RunningQuery {
this.totalFragmentsCnt = totalFragmentsCnt;
}
- /** {@inheritDoc} */
- @Override public UUID id() {
+ /** Query ID. */
+ public UUID id() {
return id;
}
- /** {@inheritDoc} */
- @Override public QueryState state() {
+ /** Query state. */
+ public QueryState state() {
return state;
}
@@ -119,7 +118,7 @@ public class Query<RowT> implements RunningQuery {
}
/** */
- protected void tryClose() {
+ protected void tryClose(@Nullable Throwable failure) {
List<RunningFragment<RowT>> fragments = new
ArrayList<>(this.fragments);
AtomicInteger cntDown = new AtomicInteger(fragments.size());
@@ -130,8 +129,7 @@ public class Query<RowT> implements RunningQuery {
frag.context().cancel();
if (cntDown.decrementAndGet() == 0)
- unregister.accept(this);
-
+ unregister.accept(this, failure);
}, frag.root()::onError);
}
@@ -141,8 +139,8 @@ public class Query<RowT> implements RunningQuery {
}
}
- /** {@inheritDoc} */
- @Override public void cancel() {
+ /** Cancel query. */
+ public void cancel() {
synchronized (mux) {
if (state == QueryState.CLOSED)
return;
@@ -169,7 +167,7 @@ public class Query<RowT> implements RunningQuery {
for (RunningFragment<RowT> frag : fragments)
frag.context().execute(() -> frag.root().onError(new
ExecutionCancelledException()), frag.root()::onError);
- tryClose();
+ tryClose(new ExecutionCancelledException());
}
/** */
@@ -238,7 +236,7 @@ public class Query<RowT> implements RunningQuery {
}
if (state0 == QueryState.EXECUTING)
- tryClose();
+ tryClose(null);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistry.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistry.java
index f3a411d2710..fbe990db9be 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistry.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistry.java
@@ -19,8 +19,8 @@ package org.apache.ignite.internal.processors.query.calcite;
import java.util.Collection;
import java.util.UUID;
-import org.apache.ignite.internal.processors.query.RunningQuery;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
+import org.jetbrains.annotations.Nullable;
/**
* Registry of the running queries.
@@ -32,7 +32,7 @@ public interface QueryRegistry extends Service {
* @param qry Query to register.
* @return Registered query.
*/
- RunningQuery register(RunningQuery qry);
+ Query<?> register(Query<?> qry);
/**
* Lookup query by identifier.
@@ -40,15 +40,16 @@ public interface QueryRegistry extends Service {
* @param id Query identified.
* @return Registered query or {@code null} if the query with specified
identifier isn't found.
*/
- RunningQuery query(UUID id);
+ Query<?> query(UUID id);
/**
* Unregister query by identifier.
*
* @param id Query identifier.
+ * @param failReason Fail reason.
*/
- void unregister(UUID id);
+ void unregister(UUID id, @Nullable Throwable failReason);
/** */
- Collection<? extends RunningQuery> runningQueries();
+ Collection<? extends Query<?>> runningQueries();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
index d023a258e1a..7fe09c6bc4a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
@@ -26,17 +26,17 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.RunningQuery;
-import org.apache.ignite.internal.processors.query.RunningQueryManager;
import
org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
+import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
/**
* Registry of the running queries.
*/
public class QueryRegistryImpl extends AbstractService implements
QueryRegistry {
/** */
- private final ConcurrentMap<UUID, RunningQuery> runningQrys = new
ConcurrentHashMap<>();
+ private final ConcurrentMap<UUID, Query<?>> runningQrys = new
ConcurrentHashMap<>();
/** */
protected final GridKernalContext kctx;
@@ -49,7 +49,7 @@ public class QueryRegistryImpl extends AbstractService
implements QueryRegistry
}
/** {@inheritDoc} */
- @Override public RunningQuery register(RunningQuery qry) {
+ @Override public Query<?> register(Query<?> qry) {
return runningQrys.computeIfAbsent(qry.id(), k -> {
if (!(qry instanceof RootQuery))
return qry;
@@ -67,24 +67,29 @@ public class QueryRegistryImpl extends AbstractService
implements QueryRegistry
rootQry.localQueryId(locId);
+ qryMgr.heavyQueriesTracker().startTracking(rootQry);
+
return qry;
});
}
/** {@inheritDoc} */
- @Override public RunningQuery query(UUID id) {
+ @Override public Query<?> query(UUID id) {
return runningQrys.get(id);
}
/** {@inheritDoc} */
- @Override public void unregister(UUID id) {
- RunningQuery val = runningQrys.remove(id);
- if (val instanceof RootQuery<?>)
-
kctx.query().runningQueryManager().unregister(((RootQuery<?>)val).localQueryId(),
null);
+ @Override public void unregister(UUID id, @Nullable Throwable failReason) {
+ Query<?> val = runningQrys.remove(id);
+ if (val instanceof RootQuery<?>) {
+ RootQuery<?> qry = (RootQuery<?>)val;
+ kctx.query().runningQueryManager().unregister(qry.localQueryId(),
failReason);
+
kctx.query().runningQueryManager().heavyQueriesTracker().stopTracking(qry,
failReason);
+ }
}
/** {@inheritDoc} */
- @Override public Collection<? extends RunningQuery> runningQueries() {
+ @Override public Collection<? extends Query<?>> runningQueries() {
return runningQrys.values();
}
@@ -95,7 +100,7 @@ public class QueryRegistryImpl extends AbstractService
implements QueryRegistry
}
/** */
- private static GridQueryCancel createCancelToken(RunningQuery qry) {
+ private static GridQueryCancel createCancelToken(Query<?> qry) {
GridQueryCancel token = new GridQueryCancel();
try {
token.add(qry::cancel);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryState.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryState.java
similarity index 94%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryState.java
rename to
modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryState.java
index a6cd8cb7a15..95a04564559 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryState.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryState.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query;
+package org.apache.ignite.internal.processors.query.calcite;
/** */
public enum QueryState {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
index 1b81f637309..c1f5b2d16df 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -25,7 +25,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.calcite.plan.Context;
import org.apache.calcite.schema.SchemaPlus;
@@ -40,8 +40,9 @@ import
org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryContext;
-import org.apache.ignite.internal.processors.query.QueryState;
+import org.apache.ignite.internal.processors.query.QueryUtils;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
+import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
@@ -50,8 +51,11 @@ import
org.apache.ignite.internal.processors.query.calcite.prepare.Fragment;
import
org.apache.ignite.internal.processors.query.calcite.prepare.MultiStepPlan;
import
org.apache.ignite.internal.processors.query.calcite.prepare.PlanningContext;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.processors.query.running.TrackableQuery;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
@@ -60,7 +64,7 @@ import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryPr
* It contains the information about query state, contexts, remote fragments;
* It provides 'cancel' functionality for running query like a base query
class.
*/
-public class RootQuery<RowT> extends Query<RowT> {
+public class RootQuery<RowT> extends Query<RowT> implements TrackableQuery {
/** SQL query. */
private final String sql;
@@ -88,6 +92,12 @@ public class RootQuery<RowT> extends Query<RowT> {
/** */
private volatile long locQryId;
+ /** Query start timestamp (millis). */
+ private final long startTs;
+
+ /** Planning time (millys). */
+ private long planningTime;
+
/** */
public RootQuery(
String sql,
@@ -95,13 +105,13 @@ public class RootQuery<RowT> extends Query<RowT> {
Object[] params,
QueryContext qryCtx,
ExchangeService exch,
- Consumer<Query<RowT>> unregister,
+ BiConsumer<Query<RowT>, Throwable> unregister,
IgniteLogger log,
long plannerTimeout
) {
super(
UUID.randomUUID(),
- null,
+ exch.localNodeId(),
qryCtx != null ? qryCtx.unwrap(GridQueryCancel.class) : null,
exch,
unregister,
@@ -112,6 +122,8 @@ public class RootQuery<RowT> extends Query<RowT> {
this.sql = sql;
this.params = params;
+ startTs = U.currentTimeMillis();
+
remoteFragments = new HashMap<>();
waiting = new HashSet<>();
@@ -185,6 +197,8 @@ public class RootQuery<RowT> extends Query<RowT> {
);
}
+ planningTime = U.currentTimeMillis() - startTs;
+
RootNode<RowT> rootNode = new RootNode<>(ctx,
plan.fieldsMetadata().rowType(), this::tryClose);
rootNode.register(root);
@@ -218,7 +232,7 @@ public class RootQuery<RowT> extends Query<RowT> {
* Can be called multiple times after receive each error
* at {@link #onResponse(RemoteFragmentKey, Throwable)}.
*/
- @Override protected void tryClose() {
+ @Override protected void tryClose(@Nullable Throwable failure) {
QueryState state0 = null;
synchronized (mux) {
@@ -264,7 +278,7 @@ public class RootQuery<RowT> extends Query<RowT> {
log.warning("An exception occures during the query
cancel", wrpEx);
}
finally {
- super.tryClose();
+ super.tryClose(failure);
}
}
}
@@ -273,7 +287,7 @@ public class RootQuery<RowT> extends Query<RowT> {
@Override public void cancel() {
cancel.cancel();
- tryClose();
+ tryClose(new ExecutionCancelledException());
}
/** */
@@ -364,14 +378,14 @@ public class RootQuery<RowT> extends Query<RowT> {
if (error != null)
onError(error);
else if (state == QueryState.CLOSING)
- tryClose();
+ tryClose(null);
}
/** */
public void onError(Throwable error) {
root.onError(error);
- tryClose();
+ tryClose(error);
}
/** {@inheritDoc} */
@@ -398,6 +412,39 @@ public class RootQuery<RowT> extends Query<RowT> {
// No-op.
}
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return id().hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String queryInfo(@Nullable String additionalInfo) {
+ StringBuilder msgSb = new StringBuilder();
+
+ msgSb.append(" [queryId=").append(id());
+ msgSb.append(",
globalQueryId=").append(QueryUtils.globalQueryId(initiatorNodeId(),
localQueryId()));
+
+ if (additionalInfo != null)
+ msgSb.append(", ").append(additionalInfo);
+
+ msgSb.append(", planningTime=").append(root == null ?
U.currentTimeMillis() - startTs : planningTime).append("ms")
+ .append(", execTime=").append(root == null ? 0 :
root.execTime()).append("ms")
+ .append(", idleTime=").append(root == null ? 0 :
root.idleTime()).append("ms")
+ .append(", type=CALCITE")
+ .append(", state=").append(state)
+ .append(", schema=").append(ctx.schemaName())
+ .append(", sql='").append(sql);
+
+ msgSb.append(']');
+
+ return msgSb.toString();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long time() {
+ return root == null ? U.currentTimeMillis() - startTs : planningTime +
root.execTime();
+ }
+
/** */
@Override public String toString() {
return S.toString(RootQuery.class, this);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
index 5c618d20dbe..9b893509b90 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeService.java
@@ -90,4 +90,9 @@ public interface ExchangeService extends Service {
* Callback after the last batch of the query fragment from the node is
processed.
*/
void onInboundExchangeFinished(UUID nodeId, UUID qryId, long exchangeId);
+
+ /**
+ * Local node ID.
+ */
+ UUID localNodeId();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 9f260f1bb45..bb99f549093 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.query.RunningQuery;
import
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
import org.apache.ignite.internal.processors.query.calcite.Query;
import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
@@ -128,7 +127,7 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
messageService().send(nodeId, new QueryBatchMessage(qryId, fragmentId,
exchangeId, batchId, last, Commons.cast(rows)));
if (batchId == 0) {
- Query<?> qry = (Query<?>)qryRegistry.query(qryId);
+ Query<?> qry = qryRegistry.query(qryId);
if (qry != null)
qry.onOutboundExchangeStarted(nodeId, exchangeId);
@@ -184,7 +183,7 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
/** {@inheritDoc} */
@Override public void onOutboundExchangeFinished(UUID qryId, long
exchangeId) {
- Query<?> qry = (Query<?>)qryRegistry.query(qryId);
+ Query<?> qry = qryRegistry.query(qryId);
if (qry != null)
qry.onOutboundExchangeFinished(exchangeId);
@@ -192,12 +191,17 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
/** {@inheritDoc} */
@Override public void onInboundExchangeFinished(UUID nodeId, UUID qryId,
long exchangeId) {
- Query<?> qry = (Query<?>)qryRegistry.query(qryId);
+ Query<?> qry = qryRegistry.query(qryId);
if (qry != null)
qry.onInboundExchangeFinished(nodeId, exchangeId);
}
+ /** {@inheritDoc} */
+ @Override public UUID localNodeId() {
+ return locaNodeId;
+ }
+
/** */
protected void onMessage(UUID nodeId, InboxCloseMessage msg) {
Collection<Inbox<?>> inboxes =
mailboxRegistry().inboxes(msg.queryId(), msg.fragmentId(), msg.exchangeId());
@@ -217,7 +221,7 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
/** */
protected void onMessage(UUID nodeId, QueryCloseMessage msg) {
- RunningQuery qry = qryRegistry.query(msg.queryId());
+ Query<?> qry = qryRegistry.query(msg.queryId());
if (qry != null)
qry.cancel();
@@ -270,7 +274,7 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
if (inbox != null) {
try {
if (msg.batchId() == 0) {
- Query<?> qry = (Query<?>)qryRegistry.query(msg.queryId());
+ Query<?> qry = qryRegistry.query(msg.queryId());
if (qry != null)
qry.onInboundExchangeStarted(nodeId, msg.exchangeId());
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 0cdccd6bfa4..4d31c32ef66 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -52,11 +52,10 @@ import
org.apache.ignite.internal.processors.failure.FailureProcessor;
import
org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryProperties;
-import org.apache.ignite.internal.processors.query.QueryState;
-import org.apache.ignite.internal.processors.query.RunningQuery;
import
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
import org.apache.ignite.internal.processors.query.calcite.Query;
import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
+import org.apache.ignite.internal.processors.query.calcite.QueryState;
import org.apache.ignite.internal.processors.query.calcite.RootQuery;
import org.apache.ignite.internal.processors.query.calcite.RunningFragment;
import
org.apache.ignite.internal.processors.query.calcite.exec.ddl.DdlCommandHandler;
@@ -98,6 +97,7 @@ import
org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import
org.apache.ignite.internal.processors.query.calcite.util.ConvertingClosableIterator;
import
org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryCursor;
+import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker;
import org.apache.ignite.internal.processors.security.SecurityUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
@@ -524,9 +524,6 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
throw new IgniteSQLException("Failed to execute DDL statement
[stmt=" + qry.sql() +
", err=" + e.getMessage() + ']', e);
}
- finally {
- qryReg.unregister(qry.id());
- }
if (plan.command() instanceof CreateTableCommand
&& ((CreateTableCommand)plan.command()).insertStatement() != null)
{
@@ -656,7 +653,10 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
Function<Object, Object> fieldConverter = (qryProps == null ||
qryProps.keepBinary()) ? null :
o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o, false,
true, null);
- Function<List<Object>, List<Object>> rowConverter = null;
+ HeavyQueriesTracker.ResultSetChecker resultSetChecker =
ctx.query().runningQueryManager()
+ .heavyQueriesTracker().resultSetChecker(qry);
+
+ Function<List<Object>, List<Object>> rowConverter;
// Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before return
result to cursor.
if (qryProps != null && qryProps.cacheName() != null &&
evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
@@ -682,12 +682,21 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
null,
row));
+ resultSetChecker.checkOnFetchNext();
+
+ return row;
+ };
+ }
+ else {
+ rowConverter = row -> {
+ resultSetChecker.checkOnFetchNext();
+
return row;
};
}
Iterator<List<?>> it = new
ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx,
- fieldConverter, rowConverter);
+ fieldConverter, rowConverter, resultSetChecker::checkOnClose);
return new ListFieldsQueryCursor<>(plan, it, ectx);
}
@@ -697,8 +706,6 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
QueryCursorImpl<List<?>> cur = new
QueryCursorImpl<>(singletonList(singletonList(plan.plan())));
cur.fieldsMeta(plan.fieldsMeta().queryFieldsMetadata(Commons.typeFactory()));
- qryReg.unregister(qry.id());
-
return cur;
}
@@ -742,7 +749,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
nodeId,
null,
exchangeSvc,
- (q) -> qryReg.unregister(q.id()),
+ (q, ex) -> qryReg.unregister(q.id(), ex),
log,
msg.totalFragmentsCount()
)
@@ -806,7 +813,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
private void onMessage(UUID nodeId, QueryStartResponse msg) {
assert nodeId != null && msg != null;
- RunningQuery qry = qryReg.query(msg.queryId());
+ Query<?> qry = qryReg.query(msg.queryId());
if (qry != null) {
assert qry instanceof RootQuery : "Unexpected query object: " +
qry;
@@ -819,7 +826,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
private void onMessage(UUID nodeId, ErrorMessage msg) {
assert nodeId != null && msg != null;
- RunningQuery qry = qryReg.query(msg.queryId());
+ Query<?> qry = qryReg.query(msg.queryId());
if (qry != null && qry.state() != QueryState.CLOSED) {
assert qry instanceof RootQuery : "Unexpected query object: " +
qry;
@@ -841,7 +848,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
/** */
private void onNodeLeft(UUID nodeId) {
qryReg.runningQueries()
- .forEach((qry) -> ((Query<Row>)qry).onNodeLeft(nodeId));
+ .forEach((qry) -> qry.onNodeLeft(nodeId));
}
/** */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index dbc1a81fe4b..0ee802873bb 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -22,9 +22,11 @@ import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.calcite.rel.type.RelDataType;
@@ -49,7 +51,7 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
private final Condition cond = lock.newCondition();
/** */
- private final Runnable onClose;
+ private final Consumer<Throwable> onClose;
/** */
private final AtomicReference<Throwable> ex = new AtomicReference<>();
@@ -66,6 +68,15 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
/** */
private Deque<Row> outBuff = new ArrayDeque<>(IN_BUFFER_SIZE);
+ /** */
+ private long idleTime;
+
+ /** */
+ private long execTime;
+
+ /** Timestamp of last change between idle and execution. */
+ private long prevTs;
+
/** */
private volatile boolean closed;
@@ -75,18 +86,20 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
public RootNode(ExecutionContext<Row> ctx, RelDataType rowType) {
super(ctx, rowType);
- onClose = this::closeInternal;
+ onClose = t -> closeInternal();
converter = TypeUtils.resultTypeConverter(ctx, rowType);
+ prevTs = System.nanoTime();
}
/**
* @param ctx Execution context.
*/
- public RootNode(ExecutionContext<Row> ctx, RelDataType rowType, Runnable
onClose) {
+ public RootNode(ExecutionContext<Row> ctx, RelDataType rowType,
Consumer<Throwable> onClose) {
super(ctx, rowType);
this.onClose = onClose;
converter = TypeUtils.resultTypeConverter(ctx, rowType);
+ prevTs = System.nanoTime();
}
/** */
@@ -112,7 +125,7 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
lock.unlock();
}
- onClose.run();
+ onClose.accept(ex.get());
}
/** {@inheritDoc} */
@@ -139,6 +152,14 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
if (inBuff.size() == IN_BUFFER_SIZE)
cond.signalAll();
+
+ if (waiting == 0) {
+ long curTs = System.nanoTime();
+
+ execTime += curTs - prevTs;
+
+ prevTs = curTs;
+ }
}
finally {
lock.unlock();
@@ -155,6 +176,9 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
waiting = -1;
+ execTime += System.nanoTime() - prevTs;
+ prevTs = 0;
+
cond.signalAll();
}
finally {
@@ -216,6 +240,30 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
throw new UnsupportedOperationException();
}
+ /** Query execution time (millis). */
+ public long execTime() {
+ lock.lock();
+
+ try {
+ return TimeUnit.NANOSECONDS.toMillis(execTime + (waiting > 0 ?
System.nanoTime() - prevTs : 0));
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** Query idle time (waiting for users action, millis). */
+ public long idleTime() {
+ lock.lock();
+
+ try {
+ return TimeUnit.NANOSECONDS.toMillis(idleTime + (waiting == 0 ?
System.nanoTime() - prevTs : 0));
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
/** */
private void exchangeBuffers() {
assert !F.isEmpty(sources()) && sources().size() == 1;
@@ -236,6 +284,12 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
else if (inBuff.isEmpty() && waiting == 0) {
int req = waiting = IN_BUFFER_SIZE;
context().execute(() -> source().request(req),
this::onError);
+
+ long curTs = System.nanoTime();
+
+ idleTime += curTs - prevTs;
+
+ prevTs = curTs;
}
if (!outBuff.isEmpty() || waiting == -1)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
index 998f83bca0a..3e8311ff186 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
@@ -42,17 +42,22 @@ public class ConvertingClosableIterator<Row> implements
Iterator<List<?>>, AutoC
/** */
@Nullable Function<List<Object>, List<Object>> rowConverter;
+ /** */
+ @Nullable Runnable onClose;
+
/** */
public ConvertingClosableIterator(
Iterator<Row> it,
ExecutionContext<Row> ectx,
@Nullable Function<Object, Object> fieldConverter,
- @Nullable Function<List<Object>, List<Object>> rowConverter
+ @Nullable Function<List<Object>, List<Object>> rowConverter,
+ @Nullable Runnable onClose
) {
this.it = it;
rowHnd = ectx.rowHandler();
this.fieldConverter = fieldConverter;
this.rowConverter = rowConverter;
+ this.onClose = onClose;
}
/**
@@ -83,5 +88,8 @@ public class ConvertingClosableIterator<Row> implements
Iterator<List<?>>, AutoC
*/
@Override public void close() throws Exception {
Commons.close(it);
+
+ if (onClose != null)
+ onClose.run();
}
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
index 7727c53f28e..e427f05f623 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CancelTest.java
@@ -118,7 +118,8 @@ public class CancelTest extends GridCommonAbstractTest {
*/
@Test
public void testNotOriginatorNodeStop() throws Exception {
- QueryEngine engine = Commons.lookupComponent(grid(0).context(),
QueryEngine.class);
+ CalciteQueryProcessor engine =
(CalciteQueryProcessor)Commons.lookupComponent(
+ grid(0).context(), QueryEngine.class);
List<FieldsQueryCursor<List<?>>> cursors =
engine.query(null, "PUBLIC",
@@ -172,7 +173,8 @@ public class CancelTest extends GridCommonAbstractTest {
stopGrid(0);
- QueryEngine engine1 = Commons.lookupComponent(grid(1).context(),
QueryEngine.class);
+ CalciteQueryProcessor engine1 =
(CalciteQueryProcessor)Commons.lookupComponent(
+ grid(1).context(), QueryEngine.class);
Assert.assertTrue(GridTestUtils.waitForCondition(
() -> engine1.runningQueries().isEmpty(), 10_000));
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TimeCalculationExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TimeCalculationExecutionTest.java
new file mode 100644
index 00000000000..91b8a760270
--- /dev/null
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/TimeCalculationExecutionTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import org.apache.calcite.rel.type.RelDataType;
+import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Test execution/idle time calculation.
+ */
+public class TimeCalculationExecutionTest extends AbstractExecutionTest {
+ /** */
+ @Test
+ public void testTime() throws Exception {
+ long sleepTime = 200;
+
+ ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()),
UUID.randomUUID(), 0);
+ IgniteTypeFactory tf = ctx.getTypeFactory();
+ RelDataType rowType = TypeUtils.createRowType(tf, int.class);
+
+ long ts0 = System.nanoTime();
+
+ RootNode<Object[]> rootNode = new RootNode<>(ctx, rowType);
+ SourceNode srcNode = new SourceNode(ctx, rowType);
+
+ rootNode.register(srcNode);
+
+ // Check time calculation after execution.
+ while (System.nanoTime() < ts0 +
TimeUnit.MILLISECONDS.toNanos(sleepTime)) {
+ assertTrue(rootNode.hasNext());
+ rootNode.next();
+ }
+
+ srcNode.latch = new CountDownLatch(1);
+
+ rootNode.hasNext();
+
+ srcNode.latch.await(1_000L, TimeUnit.MILLISECONDS);
+
+ long execTime0 = rootNode.execTime();
+ long idleTime0 = rootNode.idleTime();
+
+ long ts1 = System.nanoTime();
+
+ assertTrue(execTime0 + idleTime0 <= TimeUnit.NANOSECONDS.toMillis(ts1
- ts0) + 1);
+ assertTrue(execTime0 >= idleTime0);
+
+ // Check time calculation during idle.
+ doSleep(sleepTime);
+
+ long ts2 = System.nanoTime();
+
+ long execTime1 = rootNode.execTime();
+ long idleTime1 = rootNode.idleTime();
+
+ assertEquals(execTime0, execTime1);
+ assertTrue(idleTime1 - idleTime0 >= sleepTime);
+
+ // Check time calculation during execution.
+ while (System.nanoTime() < ts2 +
TimeUnit.MILLISECONDS.toNanos(sleepTime)) {
+ assertTrue(rootNode.hasNext());
+ rootNode.next();
+
+ long execTime2 = rootNode.execTime();
+ long idleTime2 = rootNode.idleTime();
+
+ assertTrue(idleTime2 >= idleTime1);
+ assertTrue(idleTime2 <= idleTime1 +
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ts2) + 1);
+ assertTrue(execTime2 >= execTime1);
+ assertTrue(execTime2 <= execTime1 +
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ts2) + 1);
+ }
+
+ // Check time calculation after stop.
+ srcNode.stop = true;
+ rootNode.hasNext();
+ assertTrue(GridTestUtils.waitForCondition(() -> srcNode.stopped,
1_000L));
+
+ long execTime3 = rootNode.execTime();
+ long idleTime3 = rootNode.idleTime();
+
+ doSleep(sleepTime);
+
+ assertEquals(execTime3, rootNode.execTime());
+ assertEquals(idleTime3, rootNode.idleTime());
+ }
+
+ /** */
+ private static class SourceNode extends AbstractNode<Object[]> {
+ /** */
+ volatile int requested;
+
+ /** */
+ volatile boolean stop;
+
+ /** */
+ volatile boolean stopped;
+
+ /** */
+ volatile CountDownLatch latch;
+
+ /** */
+ public SourceNode(ExecutionContext<Object[]> ctx, RelDataType rowType)
{
+ super(ctx, rowType);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void rewindInternal() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Downstream<Object[]> requestDownstream(int idx) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void request(int rowsCnt) {
+ requested = rowsCnt;
+
+ context().execute(this::push, this::onError);
+ }
+
+ /** */
+ public void push() throws Exception {
+ if (stop) {
+ downstream().end();
+ stopped = true;
+ return;
+ }
+
+ int rowsCnt = ThreadLocalRandom.current().nextInt(requested) + 1;
+
+ for (int i = 0; i < rowsCnt; i++)
+ downstream().push(new Object[] {0});
+
+ requested -= rowsCnt;
+
+ if (requested > 0)
+ context().execute(this::push, this::onError);
+ else {
+ if (latch != null)
+ latch.countDown();
+ }
+ }
+ }
+}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillQueryCommandDdlIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillQueryCommandDdlIntegrationTest.java
index 05f0fdad2bc..454e93ee7ca 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillQueryCommandDdlIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/KillQueryCommandDdlIntegrationTest.java
@@ -49,7 +49,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import static
org.apache.ignite.internal.processors.query.RunningQueryManager.SQL_QRY_VIEW;
+import static
org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_QRY_VIEW;
/**
* Tests `KILL QUERY` command.
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
index b927ff6b3d7..bd841718a15 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
@@ -41,11 +41,10 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.QueryEngine;
-import org.apache.ignite.internal.processors.query.QueryState;
import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.RunningQuery;
import
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.Query;
+import org.apache.ignite.internal.processors.query.calcite.QueryState;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableImpl;
@@ -63,8 +62,8 @@ import org.junit.Test;
import static java.util.stream.Collectors.joining;
import static org.apache.ignite.IgniteSystemProperties.getLong;
-import static
org.apache.ignite.internal.processors.query.RunningQueryManager.SQL_QRY_VIEW;
import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_PLANNER_TIMEOUT;
+import static
org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_QRY_VIEW;
/**
*
@@ -101,7 +100,7 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
*/
@Test
public void testCancelAtPlanningPhase() throws IgniteCheckedException {
- QueryEngine engine = queryProcessor(client);
+ CalciteQueryProcessor engine = queryProcessor(client);
int cnt = 9;
for (int i = 0; i < cnt; i++)
@@ -115,11 +114,11 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
Assert.assertTrue(GridTestUtils.waitForCondition(
() -> !engine.runningQueries().isEmpty() || fut.isDone(),
TIMEOUT_IN_MS));
- Collection<? extends RunningQuery> running = engine.runningQueries();
+ Collection<? extends Query<?>> running = engine.runningQueries();
assertEquals("Running: " + running, 1, running.size());
- RunningQuery qry = F.first(running);
+ Query<?> qry = F.first(running);
assertSame(qry, engine.runningQuery(qry.id()));
@@ -195,7 +194,7 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
// Check state on client.
assertEquals(1, cliEngine.runningQueries().size());
- RunningQuery qry = F.first(cliEngine.runningQueries());
+ Query<?> qry = F.first(cliEngine.runningQueries());
assertEquals(QueryState.EXECUTING, qry.state());
qry.cancel();
@@ -221,8 +220,8 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
*/
@Test
public void testCancelByRemoteFragment() throws IgniteCheckedException {
- QueryEngine clientEngine = queryProcessor(client);
- QueryEngine serverEngine = queryProcessor(srv);
+ CalciteQueryProcessor clientEngine = queryProcessor(client);
+ CalciteQueryProcessor serverEngine = queryProcessor(srv);
int cnt = 6;
sql("CREATE TABLE t (id int, val varchar)");
@@ -239,7 +238,7 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
Assert.assertTrue(GridTestUtils.waitForCondition(
() -> {
- Collection<? extends RunningQuery> queries =
clientEngine.runningQueries();
+ Collection<? extends Query<?>> queries =
clientEngine.runningQueries();
return !queries.isEmpty() && F.first(queries).state() ==
QueryState.EXECUTING;
},
@@ -247,8 +246,8 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
Assert.assertTrue(GridTestUtils.waitForCondition(() ->
!serverEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
- Collection<? extends RunningQuery> running =
serverEngine.runningQueries();
- RunningQuery qry = F.first(running);
+ Collection<? extends Query<?>> running = serverEngine.runningQueries();
+ Query<?> qry = F.first(running);
assertSame(qry, serverEngine.runningQuery(qry.id()));
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index e3a19264bc6..63294238a3b 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -42,11 +42,13 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest;
import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.RunningQuery;
+import org.apache.ignite.internal.processors.query.calcite.Query;
import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
import org.junit.Test;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
@@ -56,15 +58,31 @@ import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
import static
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.cleanPerformanceStatisticsDir;
import static
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.startCollectStatistics;
import static
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.stopCollectStatisticsAndRead;
+import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.BIG_RESULT_SET_MSG;
+import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_ERROR_MSG;
+import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG;
+import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_FINISHED_MSG;
/**
* Test SQL diagnostic tools.
*/
public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest
{
+ /** */
+ private static final long LONG_QRY_TIMEOUT = 1_000L;
+
+ /** */
+ private static final int BIG_RESULT_SET_THRESHOLD = 10_000;
+
+ /** */
+ private ListeningTestLogger log;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
- .setSqlConfiguration(new
SqlConfiguration().setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration()))
+ .setGridLogger(log)
+ .setSqlConfiguration(new SqlConfiguration()
+ .setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration())
+ .setLongQueryWarningTimeout(LONG_QRY_TIMEOUT))
.setIncludeEventTypes(EVT_SQL_QUERY_EXECUTION,
EVT_CACHE_QUERY_EXECUTED, EVT_CACHE_QUERY_OBJECT_READ);
}
@@ -77,6 +95,8 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
@Override protected void beforeTest() throws Exception {
super.beforeTest();
+ log = new ListeningTestLogger(log());
+
startGrids(nodeCount());
client = startClientGrid();
@@ -256,7 +276,7 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
List<List<?>> res = sql(grid(0), "SELECT * FROM
(VALUES('sensitive')) t(v) " +
- "WHERE v = 'sensitive' and waitLatch()");
+ "WHERE v = 'sensitive' and waitLatch(1000)");
assertEquals(1, res.size());
assertEquals(1, res.get(0).size());
@@ -266,7 +286,7 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
try {
QueryRegistry qreg =
queryProcessor(grid(0)).queryRegistry();
assertTrue(GridTestUtils.waitForCondition(() ->
qreg.runningQueries().size() == 1, 1000L));
- RunningQuery qry = F.first(qreg.runningQueries());
+ Query<?> qry = F.first(qreg.runningQueries());
assertFalse(qry.toString().contains("sensitive"));
}
finally {
@@ -300,6 +320,79 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
}
}
+ /** */
+ @Test
+ public void testLongRunningQueries() throws Exception {
+ client.getOrCreateCache(new CacheConfiguration<Integer,
Integer>("func_cache")
+ .setSqlFunctionClasses(FunctionsLibrary.class)
+ .setSqlSchema("PUBLIC")
+ );
+
+ LogListener logLsnr0 =
LogListener.matches(LONG_QUERY_EXEC_MSG).build();
+
+ log.registerListener(logLsnr0);
+
+ FunctionsLibrary.latch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() ->
sql(grid(0), "SELECT waitLatch(10000)"));
+
+ doSleep(LONG_QRY_TIMEOUT * 3);
+
+ assertTrue(logLsnr0.check());
+
+ LogListener logLsnr1 =
LogListener.matches(LONG_QUERY_FINISHED_MSG).build();
+
+ log.registerListener(logLsnr1);
+
+ FunctionsLibrary.latch.countDown();
+
+ fut.get();
+
+ assertTrue(logLsnr1.check(1000L));
+
+ FunctionsLibrary.latch = new CountDownLatch(1);
+
+ fut = GridTestUtils.runAsync(() -> sql(grid(0), "SELECT
waitLatch(2000)"));
+
+ LogListener logLsnr2 =
LogListener.matches(LONG_QUERY_ERROR_MSG).build();
+
+ log.registerListener(logLsnr2);
+
+ doSleep(LONG_QRY_TIMEOUT * 2);
+
+ try {
+ fut.get();
+ }
+ catch (Exception ignore) {
+ // Expected.
+ }
+
+ assertTrue(logLsnr2.check(1000L));
+ }
+
+ /** */
+ @Test
+ public void testBigResultSet() throws Exception {
+ grid(0).context().query().runningQueryManager().heavyQueriesTracker()
+ .setResultSetSizeThreshold(BIG_RESULT_SET_THRESHOLD);
+
+ int rowCnt = BIG_RESULT_SET_THRESHOLD * 5 + 1;
+
+ LogListener logLsnr0 = LogListener.matches(BIG_RESULT_SET_MSG).build();
+ LogListener logLsnr1 = LogListener.matches("fetched=" +
BIG_RESULT_SET_THRESHOLD).build();
+ LogListener logLsnr2 = LogListener.matches("fetched=" +
rowCnt).build();
+
+ log.registerListener(logLsnr0);
+ log.registerListener(logLsnr1);
+ log.registerListener(logLsnr2);
+
+ sql(grid(0), "SELECT * FROM TABLE(SYSTEM_RANGE(1, ?))", rowCnt);
+
+ assertTrue(logLsnr0.check(1000L));
+ assertTrue(logLsnr1.check(1000L));
+ assertTrue(logLsnr2.check(1000L));
+ }
+
/** */
public static class FunctionsLibrary {
/** */
@@ -307,9 +400,10 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
/** */
@QuerySqlFunction
- public static boolean waitLatch() {
+ public static boolean waitLatch(long time) {
try {
- latch.await(1000L, TimeUnit.MILLISECONDS);
+ if (!latch.await(time, TimeUnit.MILLISECONDS))
+ throw new RuntimeException();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
index 0854cf95d4a..ee4547cd4ea 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/ExecutionTestSuite.java
@@ -31,6 +31,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.rel.NestedLoopJo
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.SortAggregateExecutionTest;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.SortedIndexSpoolExecutionTest;
import
org.apache.ignite.internal.processors.query.calcite.exec.rel.TableSpoolExecutionTest;
+import
org.apache.ignite.internal.processors.query.calcite.exec.rel.TimeCalculationExecutionTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -53,6 +54,7 @@ import org.junit.runners.Suite;
IntersectExecutionTest.class,
RuntimeSortedIndexTest.class,
LimitExecutionTest.class,
+ TimeCalculationExecutionTest.class,
})
public class ExecutionTestSuite {
}
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPartitionAwarenessSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPartitionAwarenessSelfTest.java
index df4b2fd55ff..7dfe06128e3 100644
---
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPartitionAwarenessSelfTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPartitionAwarenessSelfTest.java
@@ -43,7 +43,7 @@ import org.apache.ignite.internal.jdbc.thin.AffinityCache;
import org.apache.ignite.internal.jdbc.thin.JdbcThinPartitionResultDescriptor;
import org.apache.ignite.internal.jdbc.thin.QualifiedSQLQuery;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
-import org.apache.ignite.internal.processors.query.QueryHistory;
+import org.apache.ignite.internal.processors.query.running.QueryHistory;
import org.apache.ignite.internal.sql.optimizer.affinity.PartitionResult;
import org.apache.ignite.internal.util.GridBoundedLinkedHashMap;
import org.apache.ignite.internal.util.typedef.F;
diff --git
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPartitionAwarenessTransactionsSelfTest.java
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPartitionAwarenessTransactionsSelfTest.java
index 4bd20ccbed8..47f1da18f42 100644
---
a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPartitionAwarenessTransactionsSelfTest.java
+++
b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcThinPartitionAwarenessTransactionsSelfTest.java
@@ -29,7 +29,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.query.NestedTxMode;
-import org.apache.ignite.internal.processors.query.QueryHistory;
+import org.apache.ignite.internal.processors.query.running.QueryHistory;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
index f204c0907b6..34f99dc1d78 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/IgniteMBeansManager.java
@@ -41,6 +41,8 @@ import
org.apache.ignite.internal.processors.cache.warmup.WarmUpMXBeanImpl;
import
org.apache.ignite.internal.processors.cluster.BaselineAutoAdjustMXBeanImpl;
import org.apache.ignite.internal.processors.metric.MetricsMxBeanImpl;
import
org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsMBeanImpl;
+import org.apache.ignite.internal.processors.query.running.SqlQueryMXBean;
+import org.apache.ignite.internal.processors.query.running.SqlQueryMXBeanImpl;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.worker.FailureHandlingMxBeanImpl;
import org.apache.ignite.internal.worker.WorkersControlMXBeanImpl;
@@ -165,8 +167,10 @@ public class IgniteMBeansManager {
registerMBean("Kernal", blockOpCtrlMXBean.getClass().getSimpleName(),
blockOpCtrlMXBean,
FailureHandlingMxBean.class);
- if (ctx.query().indexingEnabled())
- ctx.query().getIndexing().registerMxBeans(this);
+ if (ctx.query().moduleEnabled()) {
+ SqlQueryMXBean sqlQryMXBean = new SqlQueryMXBeanImpl(ctx);
+ registerMBean("SQL Query",
sqlQryMXBean.getClass().getSimpleName(), sqlQryMXBean, SqlQueryMXBean.class);
+ }
PerformanceStatisticsMBeanImpl performanceStatMbean = new
PerformanceStatisticsMBeanImpl(ctx);
registerMBean("PerformanceStatistics",
performanceStatMbean.getClass().getSimpleName(), performanceStatMbean,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
index af863ea8200..bdb800f0674 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/bulkload/BulkLoadProcessor.java
@@ -21,8 +21,8 @@ import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteIllegalStateException;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
-import org.apache.ignite.internal.processors.query.RunningQueryManager;
+import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
+import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.NoopSpan;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 273a3d73289..f0e3c20db31 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -25,13 +25,13 @@ import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.IgniteMBeansManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcParameterMeta;
+import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -267,12 +267,4 @@ public interface GridQueryIndexing {
* @param sql sql statement.
*/
public boolean isStreamableInsertStatement(String schemaName,
SqlFieldsQuery sql) throws SQLException;
-
- /**
- * Register SQL JMX beans.
- *
- * @param mbMgr Ignite MXBean manager.
- * @throws IgniteCheckedException On bean registration error.
- */
- public void registerMxBeans(IgniteMBeansManager mbMgr) throws
IgniteCheckedException;
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index d6a7b7e113f..7d5e3e244dd 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -101,6 +101,8 @@ import
org.apache.ignite.internal.processors.platform.PlatformProcessor;
import
org.apache.ignite.internal.processors.query.aware.IndexBuildStatusStorage;
import
org.apache.ignite.internal.processors.query.aware.IndexRebuildFutureStorage;
import
org.apache.ignite.internal.processors.query.property.QueryBinaryProperty;
+import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
+import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
import
org.apache.ignite.internal.processors.query.schema.IndexRebuildCancelToken;
import
org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
import
org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
@@ -3376,7 +3378,7 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
* @return Collection of long running queries.
*/
public Collection<GridRunningQueryInfo> runningQueries(long duration) {
- return runningQryMgr.longRunningQueries(duration);
+ return runningQryMgr.runningQueries(duration);
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java
index 51ea2abb275..62dbdeb3d4f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java
@@ -17,10 +17,8 @@
package org.apache.ignite.internal.processors.query;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.UUID;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -72,14 +70,4 @@ public class NoOpQueryEngine extends GridProcessorAdapter
implements QueryEngine
) throws IgniteSQLException {
return Collections.emptyList();
}
-
- /** {@inheritDoc} */
- @Override public Collection<? extends RunningQuery> runningQueries() {
- return Collections.emptyList();
- }
-
- /** {@inheritDoc} */
- @Override public RunningQuery runningQuery(UUID id) {
- return null;
- }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
index d6ffeef4507..faf2f78ad6b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
@@ -17,9 +17,7 @@
package org.apache.ignite.internal.processors.query;
-import java.util.Collection;
import java.util.List;
-import java.util.UUID;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.internal.processors.GridProcessor;
import org.jetbrains.annotations.Nullable;
@@ -83,10 +81,4 @@ public interface QueryEngine extends GridProcessor {
String qry,
List<Object[]> batchedParams
) throws IgniteSQLException;
-
- /** */
- Collection<? extends RunningQuery> runningQueries();
-
- /** */
- RunningQuery runningQuery(UUID id);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/GridRunningQueryInfo.java
similarity index 94%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/GridRunningQueryInfo.java
index 65686bcf213..9f001585a34 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/GridRunningQueryInfo.java
@@ -15,10 +15,12 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query;
+package org.apache.ignite.internal.processors.query.running;
import java.util.UUID;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -177,15 +179,6 @@ public class GridRunningQueryInfo {
return startTimeNanos;
}
- /**
- * @param curTime Current time.
- * @param duration Duration of long query.
- * @return {@code true} if this query should be considered as long running
query.
- */
- public boolean longQuery(long curTime, long duration) {
- return curTime - startTime > duration;
- }
-
/**
* Cancel query.
*/
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/LongRunningQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
similarity index 68%
rename from
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/LongRunningQueryManager.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
index 3500597cf50..17cfdde0a91 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/LongRunningQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
@@ -15,21 +15,23 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.h2;
+package org.apache.ignite.internal.processors.query.running;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
/**
- * Long running query manager.
+ * Class to track heavy queries (long-running or producing a big result-set).
*/
-public final class LongRunningQueryManager {
+public final class HeavyQueriesTracker {
/** Check period in ms. */
private static final long CHECK_PERIOD = 1_000;
@@ -42,8 +44,17 @@ public final class LongRunningQueryManager {
/** Message about the long execution of the query. */
public static final String LONG_QUERY_EXEC_MSG = "Query execution is too
long";
+ /** */
+ public static final String LONG_QUERY_FINISHED_MSG = "Long running query
is finished";
+
+ /** */
+ public static final String LONG_QUERY_ERROR_MSG = "Long running query is
finished with error: ";
+
+ /** */
+ public static final String BIG_RESULT_SET_MSG = "Query produced big result
set.";
+
/** Queries collection. Sorted collection isn't used to reduce 'put' time.
*/
- private final ConcurrentHashMap<H2QueryInfo, TimeoutChecker> qrys = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<TrackableQuery, TimeoutChecker> qrys = new
ConcurrentHashMap<>();
/** Check worker. */
private final GridWorker checkWorker;
@@ -82,8 +93,8 @@ public final class LongRunningQueryManager {
/**
* @param ctx Kernal context.
*/
- public LongRunningQueryManager(GridKernalContext ctx) {
- log = ctx.log(LongRunningQueryManager.class);
+ public HeavyQueriesTracker(GridKernalContext ctx) {
+ log = ctx.log(HeavyQueriesTracker.class);
checkWorker = new GridWorker(ctx.igniteInstanceName(), "long-qry",
log) {
@Override protected void body() throws InterruptedException,
IgniteInterruptedCheckedException {
@@ -115,7 +126,7 @@ public final class LongRunningQueryManager {
/**
* @param qryInfo Query info to register.
*/
- public void registerQuery(H2QueryInfo qryInfo) {
+ public void startTracking(TrackableQuery qryInfo) {
assert qryInfo != null;
final long timeout0 = timeout;
@@ -126,22 +137,38 @@ public final class LongRunningQueryManager {
/**
* @param qryInfo Query info to remove.
+ * @param err Exception if query executed with error.
*/
- public void unregisterQuery(H2QueryInfo qryInfo) {
+ public void stopTracking(TrackableQuery qryInfo, @Nullable Throwable err) {
assert qryInfo != null;
qrys.remove(qryInfo);
+
+ if (qryInfo.time() > timeout) {
+ if (err == null)
+ LT.warn(log, LONG_QUERY_FINISHED_MSG +
qryInfo.queryInfo(null));
+ else
+ LT.warn(log, LONG_QUERY_ERROR_MSG + err.getMessage() +
qryInfo.queryInfo(null));
+ }
+ }
+
+ /**
+ * Creates new result-set checker.
+ * @param qryInfo Query info.
+ */
+ public ResultSetChecker resultSetChecker(TrackableQuery qryInfo) {
+ return new ResultSetChecker(log, qryInfo, rsSizeThreshold,
rsSizeThresholdMult);
}
/**
*
*/
private void checkLongRunning() {
- for (Map.Entry<H2QueryInfo, TimeoutChecker> e : qrys.entrySet()) {
- H2QueryInfo qinfo = e.getKey();
+ for (Map.Entry<TrackableQuery, TimeoutChecker> e : qrys.entrySet()) {
+ TrackableQuery qinfo = e.getKey();
if (e.getValue().checkTimeout(qinfo.time())) {
- qinfo.printLogMessage(log, LONG_QUERY_EXEC_MSG, null);
+ LT.warn(log, LONG_QUERY_EXEC_MSG + qinfo.queryInfo(null));
if (e.getValue().timeoutMult <= 1)
qrys.remove(qinfo);
@@ -235,7 +262,7 @@ public final class LongRunningQueryManager {
private long timeout;
/** */
- private int timeoutMult;
+ private final int timeoutMult;
/**
* @param timeout Initial timeout.
@@ -247,7 +274,6 @@ public final class LongRunningQueryManager {
}
/**
- * @param time Query execution time.
* @return {@code true} if timeout occurred.
*/
public boolean checkTimeout(long time) {
@@ -261,4 +287,60 @@ public final class LongRunningQueryManager {
return false;
}
}
+
+ /**
+ * Holds result-set limit settings for the specified query.
+ */
+ public static class ResultSetChecker {
+ /** Logger. */
+ private final IgniteLogger log;
+
+ /** Query info. */
+ private final TrackableQuery qryInfo;
+
+ /** Result set size threshold. */
+ private long threshold;
+
+ /** Result set size threshold multiplier. */
+ private final int thresholdMult;
+
+ /** Fetched count of rows. */
+ private long fetchedSize;
+
+ /** Big results flag. */
+ private boolean bigResults;
+
+ /** Ctor. */
+ private ResultSetChecker(IgniteLogger log, TrackableQuery qryInfo,
long threshold, int thresholdMult) {
+ this.log = log;
+ this.qryInfo = qryInfo;
+ this.threshold = threshold;
+ this.thresholdMult = thresholdMult;
+ }
+
+ /**
+ * Print warning message to log when query result size fetch count is
bigger than specified threshold.
+ * Threshold may be recalculated with multiplier.
+ */
+ public void checkOnFetchNext() {
+ ++fetchedSize;
+
+ if (threshold > 0 && fetchedSize >= threshold) {
+ LT.warn(log, BIG_RESULT_SET_MSG + qryInfo.queryInfo("fetched="
+ fetchedSize));
+
+ if (thresholdMult > 1)
+ threshold *= thresholdMult;
+ else
+ threshold = 0;
+
+ bigResults = true;
+ }
+ }
+
+ /** */
+ public void checkOnClose() {
+ if (bigResults)
+ LT.warn(log, BIG_RESULT_SET_MSG + qryInfo.queryInfo("fetched="
+ fetchedSize));
+ }
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistory.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/QueryHistory.java
similarity index 98%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistory.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/QueryHistory.java
index 1e5ec3ff03f..6ce73a7335e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/QueryHistory.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.ignite.internal.processors.query;
+package org.apache.ignite.internal.processors.query.running;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryKey.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/QueryHistoryKey.java
similarity index 97%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryKey.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/QueryHistoryKey.java
index 7052ff7c7df..4b0c53473e3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryKey.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/QueryHistoryKey.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.ignite.internal.processors.query;
+package org.apache.ignite.internal.processors.query.running;
import org.apache.ignite.internal.util.typedef.F;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryMetricsValue.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/QueryHistoryMetricsValue.java
similarity index 97%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryMetricsValue.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/QueryHistoryMetricsValue.java
index babac6dcb15..976e1a0930c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryMetricsValue.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/QueryHistoryMetricsValue.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.ignite.internal.processors.query;
+package org.apache.ignite.internal.processors.query.running;
/**
* Immutable query metrics.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryTracker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/QueryHistoryTracker.java
similarity index 98%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryTracker.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/QueryHistoryTracker.java
index 5c94d34ad88..33b986590a3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryHistoryTracker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/QueryHistoryTracker.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.ignite.internal.processors.query;
+package org.apache.ignite.internal.processors.query.running;
import java.util.Collections;
import java.util.Map;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryRunningFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/QueryRunningFuture.java
similarity index 94%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryRunningFuture.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/QueryRunningFuture.java
index 29c23242bd3..fead8857885 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryRunningFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/QueryRunningFuture.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.ignite.internal.processors.query;
+package org.apache.ignite.internal.processors.query.running;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
similarity index 96%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
index 013f0538c6e..f9632fbfa2f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query;
+package org.apache.ignite.internal.processors.query.running;
import java.util.ArrayList;
import java.util.Collection;
@@ -50,6 +50,11 @@ import
org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridQueryFinishedInfo;
+import org.apache.ignite.internal.processors.query.GridQueryStartedInfo;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.QueryUtils;
import
org.apache.ignite.internal.processors.query.messages.GridQueryKillRequest;
import
org.apache.ignite.internal.processors.query.messages.GridQueryKillResponse;
import org.apache.ignite.internal.processors.tracing.Span;
@@ -152,6 +157,9 @@ public class RunningQueryManager {
}
};
+ /** Heavy query tracker. */
+ private final HeavyQueriesTracker heavyQrysTracker;
+
/** */
private final List<Consumer<GridQueryStartedInfo>> qryStartedListeners =
new CopyOnWriteArrayList<>();
@@ -175,6 +183,8 @@ public class RunningQueryManager {
qryHistTracker = new QueryHistoryTracker(histSz);
+ heavyQrysTracker = ctx.query().moduleEnabled() ? new
HeavyQueriesTracker(ctx) : null;
+
ctx.systemView().registerView(SQL_QRY_VIEW, SQL_QRY_VIEW_DESC,
new SqlQueryViewWalker(),
runs.values(),
@@ -426,6 +436,11 @@ public class RunningQueryManager {
return res;
}
+ /** */
+ public HeavyQueriesTracker heavyQueriesTracker() {
+ return heavyQrysTracker;
+ }
+
/**
* @param lsnr Listener.
*/
@@ -475,18 +490,18 @@ public class RunningQueryManager {
}
/**
- * Return long running user queries.
+ * Return running user queries.
*
- * @param duration Duration of long query.
+ * @param duration Duration of query.
* @return Collection of queries which running longer than given duration.
*/
- public Collection<GridRunningQueryInfo> longRunningQueries(long duration) {
+ public Collection<GridRunningQueryInfo> runningQueries(long duration) {
Collection<GridRunningQueryInfo> res = new ArrayList<>();
long curTime = System.currentTimeMillis();
for (GridRunningQueryInfo runningQryInfo : runs.values()) {
- if (runningQryInfo.longQuery(curTime, duration))
+ if (curTime - runningQryInfo.startTime() > duration)
res.add(runningQryInfo);
}
@@ -527,6 +542,9 @@ public class RunningQueryManager {
// No-op.
}
}
+
+ if (heavyQrysTracker != null)
+ heavyQrysTracker.stop();
}
/**
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/mxbean/SqlQueryMXBean.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/SqlQueryMXBean.java
similarity index 98%
rename from
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/mxbean/SqlQueryMXBean.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/SqlQueryMXBean.java
index 852b063adf8..06233e65743 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/mxbean/SqlQueryMXBean.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/SqlQueryMXBean.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.h2.mxbean;
+package org.apache.ignite.internal.processors.query.running;
import org.apache.ignite.mxbean.MXBeanDescription;
import org.apache.ignite.mxbean.MXBeanParameter;
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/mxbean/SqlQueryMXBeanImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/SqlQueryMXBeanImpl.java
similarity index 69%
rename from
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/mxbean/SqlQueryMXBeanImpl.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/SqlQueryMXBeanImpl.java
index 4130579b0bf..a4894dd85b7 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/mxbean/SqlQueryMXBeanImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/SqlQueryMXBeanImpl.java
@@ -15,62 +15,61 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.h2.mxbean;
+package org.apache.ignite.internal.processors.query.running;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
/**
* QueryMXBean implementation.
*/
public class SqlQueryMXBeanImpl implements SqlQueryMXBean {
/** */
- private final IgniteH2Indexing h2idx;
+ private final HeavyQueriesTracker heavyQrysTracker;
/**
* @param ctx Context.
*/
public SqlQueryMXBeanImpl(GridKernalContext ctx) {
- h2idx = (IgniteH2Indexing)ctx.query().getIndexing();
+ heavyQrysTracker =
ctx.query().runningQueryManager().heavyQueriesTracker();
}
/** {@inheritDoc} */
@Override public long getLongQueryWarningTimeout() {
- return h2idx.longRunningQueries().getTimeout();
+ return heavyQrysTracker.getTimeout();
}
/** {@inheritDoc} */
@Override public void setLongQueryWarningTimeout(long
longQryWarningTimeout) {
- h2idx.longRunningQueries().setTimeout(longQryWarningTimeout);
+ heavyQrysTracker.setTimeout(longQryWarningTimeout);
}
/** {@inheritDoc} */
@Override public int getLongQueryTimeoutMultiplier() {
- return h2idx.longRunningQueries().getTimeoutMultiplier();
+ return heavyQrysTracker.getTimeoutMultiplier();
}
/** {@inheritDoc} */
@Override public void setLongQueryTimeoutMultiplier(int
longQryTimeoutMultiplier) {
-
h2idx.longRunningQueries().setTimeoutMultiplier(longQryTimeoutMultiplier);
+ heavyQrysTracker.setTimeoutMultiplier(longQryTimeoutMultiplier);
}
/** {@inheritDoc} */
@Override public long getResultSetSizeThreshold() {
- return h2idx.longRunningQueries().getResultSetSizeThreshold();
+ return heavyQrysTracker.getResultSetSizeThreshold();
}
/** {@inheritDoc} */
@Override public void setResultSetSizeThreshold(long rsSizeThreshold) {
- h2idx.longRunningQueries().setResultSetSizeThreshold(rsSizeThreshold);
+ heavyQrysTracker.setResultSetSizeThreshold(rsSizeThreshold);
}
/** {@inheritDoc} */
@Override public int getResultSetSizeThresholdMultiplier() {
- return
h2idx.longRunningQueries().getResultSetSizeThresholdMultiplier();
+ return heavyQrysTracker.getResultSetSizeThresholdMultiplier();
}
/** {@inheritDoc} */
@Override public void setResultSetSizeThresholdMultiplier(int
rsSizeThresholdMultiplier) {
-
h2idx.longRunningQueries().setResultSetSizeThresholdMultiplier(rsSizeThresholdMultiplier);
+
heavyQrysTracker.setResultSetSizeThresholdMultiplier(rsSizeThresholdMultiplier);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQuery.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/TrackableQuery.java
similarity index 69%
rename from
modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQuery.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/TrackableQuery.java
index 447e92519ba..af5a413a088 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/RunningQuery.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/TrackableQuery.java
@@ -15,20 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query;
+package org.apache.ignite.internal.processors.query.running;
-import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
/**
- *
+ * Query, that can be tracked by {@link HeavyQueriesTracker}.
*/
-public interface RunningQuery {
- /** */
- UUID id();
-
- /** */
- QueryState state();
+public interface TrackableQuery {
+ /** Query execution time. */
+ public long time();
- /** */
- void cancel();
+ /** Query info to print to log. */
+ public String queryInfo(@Nullable String additinalInfo);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryHistoryView.java
b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryHistoryView.java
index 588e1480182..f60cd79ee0d 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryHistoryView.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryHistoryView.java
@@ -19,7 +19,7 @@ package org.apache.ignite.spi.systemview.view;
import java.util.Date;
import org.apache.ignite.internal.managers.systemview.walker.Order;
-import org.apache.ignite.internal.processors.query.QueryHistory;
+import org.apache.ignite.internal.processors.query.running.QueryHistory;
/**
* SQL query history representation for a {@link SystemView}.
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java
b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java
index 045efa9171e..df7190ec81f 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/systemview/view/SqlQueryView.java
@@ -20,7 +20,7 @@ package org.apache.ignite.spi.systemview.view;
import java.util.Date;
import java.util.UUID;
import org.apache.ignite.internal.managers.systemview.walker.Order;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index 4189348893d..f3895b26ed4 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1684,8 +1684,7 @@ org.apache.ignite.internal.processors.query.QueryEntityEx
org.apache.ignite.internal.processors.query.QueryField
org.apache.ignite.internal.processors.query.QueryIndexKey
org.apache.ignite.internal.processors.query.QuerySchema
-org.apache.ignite.internal.processors.query.QueryState
-org.apache.ignite.internal.processors.query.RunningQueryManager$1
+org.apache.ignite.internal.processors.query.running.RunningQueryManager$1
org.apache.ignite.internal.processors.query.UpdateSourceIterator
org.apache.ignite.internal.processors.query.aware.IndexBuildStatusHolder$Status
org.apache.ignite.internal.processors.query.aware.IndexRebuildCacheInfo
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
index 37f8f8ff12f..44be692e128 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/query/DummyQueryIndexing.java
@@ -24,13 +24,13 @@ import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.IgniteMBeansManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheContextInfo;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.odbc.jdbc.JdbcParameterMeta;
+import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -197,9 +197,4 @@ public class DummyQueryIndexing implements
GridQueryIndexing {
@Override public boolean isStreamableInsertStatement(String schemaName,
SqlFieldsQuery sql) {
return false;
}
-
- /** {@inheritDoc} */
- @Override public void registerMxBeans(IgniteMBeansManager mbMgr) {
-
- }
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryFetchSizeInterceptor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryFetchSizeInterceptor.java
deleted file mode 100644
index 526793b0ee2..00000000000
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryFetchSizeInterceptor.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.h2;
-
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.query.h2.mxbean.SqlQueryMXBean;
-
-/**
- * Print warning message to log when query result size fetch count is bigger
then specified threshold.
- * Threshold may be recalculated with multiplier.
- *
- * @see SqlQueryMXBean
- */
-public class H2QueryFetchSizeInterceptor {
- /** Query info. */
- private final IgniteLogger log;
-
- /** Query info. */
- private final H2QueryInfo qryInfo;
-
- /** Result set size threshold. */
- private long threshold;
-
- /** Result set size threshold multiplier. */
- private final int thresholdMult;
-
- /** Fetched count of rows. */
- private long fetchedSize;
-
- /** Big results flag. */
- private boolean bigResults;
-
- /**
- * @param h2 Indexing.
- * @param qryInfo Query will be print when fetch size will be greater than
threshold.
- * @param log Logger to print warning.
- */
- public H2QueryFetchSizeInterceptor(IgniteH2Indexing h2, H2QueryInfo
qryInfo, IgniteLogger log) {
- assert log != null;
- assert qryInfo != null;
-
- this.log = log;
- this.qryInfo = qryInfo;
-
- threshold = h2.longRunningQueries().getResultSetSizeThreshold();
- thresholdMult =
h2.longRunningQueries().getResultSetSizeThresholdMultiplier();
- }
-
- /**
- *
- */
- public void checkOnFetchNext() {
- ++fetchedSize;
-
- if (threshold > 0 && fetchedSize >= threshold) {
- qryInfo.printLogMessage(log, "Query produced big result set.",
- "fetched=" + fetchedSize);
-
- if (thresholdMult > 1)
- threshold *= thresholdMult;
- else
- threshold = 0;
-
- bigResults = true;
- }
- }
-
- /**
- *
- */
- public void checkOnClose() {
- if (bigResults) {
- qryInfo.printLogMessage(log, "Query produced big result set.",
- "fetched=" + fetchedSize);
- }
- }
-}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
index 09afcbbdc0f..f3c508a8620 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
@@ -20,21 +20,21 @@ package org.apache.ignite.internal.processors.query.h2;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.UUID;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.RunningQueryManager;
import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
-import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
+import org.apache.ignite.internal.processors.query.running.TrackableQuery;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.h2.command.Prepared;
import org.h2.engine.Session;
+import org.jetbrains.annotations.Nullable;
/**
* Base H2 query info with commons for MAP, LOCAL, REDUCE queries.
*/
-public class H2QueryInfo {
+public class H2QueryInfo implements TrackableQuery {
/** Type. */
private final QueryType type;
@@ -106,20 +106,16 @@ public class H2QueryInfo {
// No-op.
}
- /**
- * @return Query execution time.
- */
- public long time() {
+ /** {@inheritDoc} */
+ @Override public long time() {
return U.currentTimeMillis() - beginTs;
}
/**
- * @param log Logger.
- * @param msg Log message
* @param additionalInfo Additional query info.
*/
- public void printLogMessage(IgniteLogger log, String msg, String
additionalInfo) {
- StringBuilder msgSb = new StringBuilder(msg);
+ @Override public String queryInfo(@Nullable String additionalInfo) {
+ StringBuilder msgSb = new StringBuilder();
if (queryId == RunningQueryManager.UNDEFINED_QUERY_ID)
msgSb.append(" [globalQueryId=(undefined), node=").append(nodeId);
@@ -142,7 +138,7 @@ public class H2QueryInfo {
msgSb.append(']');
- LT.warn(log, msgSb.toString());
+ return msgSb.toString();
}
/**
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java
index 3fba0b17a5c..f9bdd321c9b 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java
@@ -30,6 +30,7 @@ import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import
org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
+import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.Tracing;
@@ -104,8 +105,8 @@ public abstract class H2ResultSetIterator<T> extends
GridIteratorAdapter<T> impl
/** Canceled. */
private boolean canceled;
- /** Fetch size interceptor. */
- final H2QueryFetchSizeInterceptor fetchSizeInterceptor;
+ /** Fetch size checker. */
+ final HeavyQueriesTracker.ResultSetChecker resultSetChecker;
/** Tracing processor. */
protected final Tracing tracing;
@@ -161,7 +162,7 @@ public abstract class H2ResultSetIterator<T> extends
GridIteratorAdapter<T> impl
assert h2 != null;
assert qryInfo != null;
- fetchSizeInterceptor = new H2QueryFetchSizeInterceptor(h2, qryInfo,
log);
+ resultSetChecker = h2.heavyQueriesTracker().resultSetChecker(qryInfo);
}
/**
@@ -277,7 +278,7 @@ public abstract class H2ResultSetIterator<T> extends
GridIteratorAdapter<T> impl
if (rowIter != null && rowIter.hasNext()) {
row = rowIter.next();
- fetchSizeInterceptor.checkOnFetchNext();
+ resultSetChecker.checkOnFetchNext();
return true;
}
@@ -291,7 +292,7 @@ public abstract class H2ResultSetIterator<T> extends
GridIteratorAdapter<T> impl
if (rowIter != null && rowIter.hasNext()) {
row = rowIter.next();
- fetchSizeInterceptor.checkOnFetchNext();
+ resultSetChecker.checkOnFetchNext();
return true;
}
@@ -315,7 +316,7 @@ public abstract class H2ResultSetIterator<T> extends
GridIteratorAdapter<T> impl
lockTables();
try {
- fetchSizeInterceptor.checkOnClose();
+ resultSetChecker.checkOnClose();
data.close();
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index f54a223cc6f..bf5e54e1393 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -53,7 +53,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryUtils;
import
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.managers.IgniteMBeansManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -92,7 +91,6 @@ import
org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.RunningQueryManager;
import org.apache.ignite.internal.processors.query.SqlClientContext;
import org.apache.ignite.internal.processors.query.UpdateSourceIterator;
import
org.apache.ignite.internal.processors.query.h2.affinity.H2PartitionResolver;
@@ -103,8 +101,6 @@ import
org.apache.ignite.internal.processors.query.h2.dml.DmlUpdateSingleEntryIt
import org.apache.ignite.internal.processors.query.h2.dml.DmlUtils;
import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
-import org.apache.ignite.internal.processors.query.h2.mxbean.SqlQueryMXBean;
-import
org.apache.ignite.internal.processors.query.h2.mxbean.SqlQueryMXBeanImpl;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContextRegistry;
@@ -120,6 +116,8 @@ import
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery
import
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest;
import
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse;
import
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker;
+import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
import
org.apache.ignite.internal.processors.query.schema.AbstractSchemaChangeListener;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
@@ -256,8 +254,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** Schema manager. */
private H2SchemaManager schemaMgr;
- /** H2 Connection manager. */
- private LongRunningQueryManager longRunningQryMgr;
+ /** Heavy queries tracker. */
+ private HeavyQueriesTracker heavyQryTracker;
/** Discovery event listener. */
private GridLocalEventListener discoLsnr;
@@ -837,27 +835,20 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
final H2QueryInfo qryInfo
) throws IgniteCheckedException {
if (qryInfo != null)
- longRunningQryMgr.registerQuery(qryInfo);
+ heavyQryTracker.startTracking(qryInfo);
enableDataPageScan(dataPageScanEnabled);
+ Throwable err = null;
try (
TraceSurroundings ignored = MTC.support(ctx.tracing()
.create(SQL_QRY_EXECUTE, MTC.span())
.addTag(SQL_QRY_TEXT, () -> sql))
) {
- ResultSet rs = executeSqlQuery(conn, stmt, timeoutMillis, cancel);
-
- if (qryInfo != null && qryInfo.time() >
longRunningQryMgr.getTimeout())
- qryInfo.printLogMessage(log, "Long running query is finished",
null);
-
- return rs;
+ return executeSqlQuery(conn, stmt, timeoutMillis, cancel);
}
catch (Throwable e) {
- if (qryInfo != null && qryInfo.time() >
longRunningQryMgr.getTimeout()) {
- qryInfo.printLogMessage(log, "Long running query is finished
with error: "
- + e.getMessage(), null);
- }
+ err = e;
throw e;
}
@@ -865,7 +856,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
CacheDataTree.setDataPageScanEnabled(false);
if (qryInfo != null)
- longRunningQryMgr.unregisterQuery(qryInfo);
+ heavyQryTracker.stopTracking(qryInfo, err);
}
}
@@ -1866,7 +1857,7 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
connMgr = new ConnectionManager(ctx);
- longRunningQryMgr = new LongRunningQueryManager(ctx);
+ heavyQryTracker =
ctx.query().runningQueryManager().heavyQueriesTracker();
parser = new QueryParser(this, connMgr, cmd ->
cmdProc.isCommandSupported(cmd));
@@ -2118,7 +2109,6 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
qryCtxRegistry.clearSharedOnLocalNodeStop();
- longRunningQryMgr.stop();
connMgr.stop();
if (log.isDebugEnabled())
@@ -2785,18 +2775,11 @@ public class IgniteH2Indexing implements
GridQueryIndexing {
}
}
- /** {@inheritDoc} */
- @Override public void registerMxBeans(IgniteMBeansManager mbMgr) throws
IgniteCheckedException {
- SqlQueryMXBean qryMXBean = new SqlQueryMXBeanImpl(ctx);
-
- mbMgr.registerMBean("SQL Query", qryMXBean.getClass().getSimpleName(),
qryMXBean, SqlQueryMXBean.class);
- }
-
/**
- * @return Long running queries manager.
+ * @return Heavy queries tracker.
*/
- public LongRunningQueryManager longRunningQueries() {
- return longRunningQryMgr;
+ public HeavyQueriesTracker heavyQueriesTracker() {
+ return heavyQryTracker;
}
/**
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/RegisteredQueryCursor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/RegisteredQueryCursor.java
index 34fe8d62aeb..11f0eb9a0a6 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/RegisteredQueryCursor.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/RegisteredQueryCursor.java
@@ -23,15 +23,16 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.RunningQueryManager;
+import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
+import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
import org.apache.ignite.internal.processors.tracing.MTC;
import org.apache.ignite.internal.processors.tracing.MTC.TraceSurroundings;
import org.apache.ignite.internal.processors.tracing.NoopSpan;
import org.apache.ignite.internal.processors.tracing.Span;
import org.apache.ignite.internal.processors.tracing.TraceableIterator;
import org.apache.ignite.internal.processors.tracing.Tracing;
+
import static org.apache.ignite.internal.processors.tracing.SpanTags.ERROR;
import static
org.apache.ignite.internal.processors.tracing.SpanType.SQL_CURSOR_CANCEL;
import static
org.apache.ignite.internal.processors.tracing.SpanType.SQL_CURSOR_CLOSE;
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index c1106f244b0..5f96cacef51 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -31,13 +31,13 @@ import
org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.query.h2.H2PooledConnection;
-import
org.apache.ignite.internal.processors.query.h2.H2QueryFetchSizeInterceptor;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.MapH2QueryInfo;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import
org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
import org.apache.ignite.internal.processors.query.h2.opt.QueryContext;
+import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.h2.engine.Session;
@@ -248,7 +248,7 @@ class MapQueryResult {
rows.add(res.res.currentRow());
- res.fetchSizeInterceptor.checkOnFetchNext();
+ res.resultSetChecker.checkOnFetchNext();
}
return !res.res.hasNext();
@@ -337,7 +337,7 @@ class MapQueryResult {
private final int rowCnt;
/** */
- private final H2QueryFetchSizeInterceptor fetchSizeInterceptor;
+ private final HeavyQueriesTracker.ResultSetChecker resultSetChecker;
/**
* Constructor.
@@ -357,12 +357,12 @@ class MapQueryResult {
rowCnt = (res instanceof LazyResult) ? -1 : res.getRowCount();
cols = res.getVisibleColumnCount();
- fetchSizeInterceptor = new H2QueryFetchSizeInterceptor(h2,
qryInfo, log);
+ resultSetChecker =
h2.heavyQueriesTracker().resultSetChecker(qryInfo);
}
/** */
void close() {
- fetchSizeInterceptor.checkOnClose();
+ resultSetChecker.checkOnClose();
U.close(rs, log);
}
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 2ffc781a171..f7a64443c02 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -36,8 +36,8 @@ import
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import
org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
-import org.apache.ignite.internal.processors.query.RunningQueryManager;
import org.apache.ignite.internal.processors.query.h2.QueryTable;
+import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -47,6 +47,7 @@ import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
+
import static
org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
/**
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/metric/SqlStatisticsUserQueriesFastTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/metric/SqlStatisticsUserQueriesFastTest.java
index 41d0559f934..36101662046 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/metric/SqlStatisticsUserQueriesFastTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/metric/SqlStatisticsUserQueriesFastTest.java
@@ -28,7 +28,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.RunningQueryManager;
+import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.TransactionDuplicateKeyException;
import org.junit.Test;
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/metric/SqlStatisticsUserQueriesLongTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/metric/SqlStatisticsUserQueriesLongTest.java
index abe9afefcc7..cc0c61e1b4b 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/metric/SqlStatisticsUserQueriesLongTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/metric/SqlStatisticsUserQueriesLongTest.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.metric;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.internal.processors.query.RunningQueryManager;
+import org.apache.ignite.internal.processors.query.running.RunningQueryManager;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.After;
import org.junit.Assert;
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/metric/UserQueriesTestBase.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/metric/UserQueriesTestBase.java
index 0fce6991c99..b6ab705e396 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/metric/UserQueriesTestBase.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/metric/UserQueriesTestBase.java
@@ -31,7 +31,7 @@ import org.apache.ignite.events.SqlQueryExecutionEvent;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.spi.metric.Metric;
@@ -39,7 +39,7 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Assert;
import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
-import static
org.apache.ignite.internal.processors.query.RunningQueryManager.SQL_USER_QUERIES_REG_NAME;
+import static
org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_USER_QUERIES_REG_NAME;
/**
* Test base for the tests for user metrics. Contains methods that are common
for the scenarios that require and don't
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
index 0ca70499c4b..edc6db63734 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheSqlQueryValueCopySelfTest.java
@@ -33,7 +33,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
index 7440759e3f3..8a483949293 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSkipReducerOnUpdateDmlSelfTest.java
@@ -50,6 +50,7 @@ import
org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import
org.apache.ignite.internal.processors.query.h2.twostep.GridMapQueryExecutor;
import
org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor;
+import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryErrorOnCancelTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryErrorOnCancelTest.java
index d058f16f037..1cf5b62707c 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryErrorOnCancelTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryErrorOnCancelTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import
org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
+import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryOnClientDisconnectTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryOnClientDisconnectTest.java
index cab832cbc15..0969f1d6731 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryOnClientDisconnectTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryOnClientDisconnectTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import
org.apache.ignite.internal.processors.query.messages.GridQueryKillResponse;
+import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
index d98b260d2e4..73f67d0b1a9 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/KillQueryTest.java
@@ -77,6 +77,7 @@ import
org.apache.ignite.internal.processors.query.h2.twostep.PartitionReservati
import
org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapResult;
import
org.apache.ignite.internal.processors.query.h2.twostep.ReducePartitionMapper;
import
org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
import
org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.typedef.F;
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
index 66819f44aa8..8ba7223c998 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/LongRunningQueryTest.java
@@ -32,7 +32,7 @@ import
org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
-import org.apache.ignite.internal.processors.query.h2.LongRunningQueryManager;
+import org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
@@ -40,10 +40,10 @@ import org.apache.ignite.testframework.LogListener;
import org.junit.Test;
import static java.lang.Thread.currentThread;
-import static
org.apache.ignite.internal.processors.query.h2.LongRunningQueryManager.LONG_QUERY_EXEC_MSG;
+import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG;
/**
- * Tests for log print for long running query.
+ * Tests for log print for long-running query.
*/
public class LongRunningQueryTest extends AbstractIndexingCommonTest {
/** Keys count. */
@@ -115,7 +115,7 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
*/
@Test
public void testCorrectThreadName() {
- GridWorker checkWorker =
GridTestUtils.getFieldValue(longRunningQueryManager(), "checkWorker");
+ GridWorker checkWorker =
GridTestUtils.getFieldValue(heavyQueriesTracker(), "checkWorker");
LogListener logLsnr = LogListener
.matches(LONG_QUERY_EXEC_MSG)
@@ -172,7 +172,7 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
}
/**
- * Do long running query canceled by timeout and check log output.
+ * Do long-running query canceled by timeout and check log output.
* Log messages must contain info about long query.
*/
private void checkLongRunning() {
@@ -219,7 +219,7 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
}
/**
- * Execute long running sql with a check for errors.
+ * Execute long-running sql with a check for errors.
*/
private void sqlCheckLongRunning() {
sqlCheckLongRunning("SELECT T0.id FROM test AS T0, test AS T1, test AS
T2 where T0.id > ?", 0);
@@ -268,7 +268,7 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
private ListeningTestLogger testLog() {
ListeningTestLogger testLog = new ListeningTestLogger(log);
-
GridTestUtils.setFieldValue(((IgniteH2Indexing)grid().context().query().getIndexing()).longRunningQueries(),
+
GridTestUtils.setFieldValue(((IgniteH2Indexing)grid().context().query().getIndexing()).heavyQueriesTracker(),
"log", testLog);
GridTestUtils.setFieldValue(((IgniteH2Indexing)grid().context().query().getIndexing()).mapQueryExecutor(),
@@ -280,11 +280,11 @@ public class LongRunningQueryTest extends
AbstractIndexingCommonTest {
}
/**
- * Getting {@link LongRunningQueryManager} from the node.
+ * Getting {@link HeavyQueriesTracker} from the node.
*
- * @return LongRunningQueryManager.
+ * @return Heavy queries tracker.
*/
- private LongRunningQueryManager longRunningQueryManager() {
- return
((IgniteH2Indexing)grid().context().query().getIndexing()).longRunningQueries();
+ private HeavyQueriesTracker heavyQueriesTracker() {
+ return
((IgniteH2Indexing)grid().context().query().getIndexing()).heavyQueriesTracker();
}
}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
index 4ec5685f7b6..fa09f8ebc33 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/RunningQueriesTest.java
@@ -54,6 +54,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe
import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import
org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import
org.apache.ignite.internal.processors.query.running.GridRunningQueryInfo;
import
org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.lang.IgniteInClosure;
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueryHistorySelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueryHistorySelfTest.java
index c7ab85427b7..0b4bb85a29c 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueryHistorySelfTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/SqlQueryHistorySelfTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.SqlConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.query.running.QueryHistory;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/systemview/SystemViewSecurityTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/systemview/SystemViewSecurityTest.java
index 2c654dce32c..87c072567dc 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/systemview/SystemViewSecurityTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/systemview/SystemViewSecurityTest.java
@@ -31,7 +31,7 @@ import org.apache.ignite.spi.systemview.view.SqlQueryView;
import org.apache.ignite.spi.systemview.view.SystemView;
import org.junit.Test;
-import static
org.apache.ignite.internal.processors.query.RunningQueryManager.SQL_QRY_VIEW;
+import static
org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_QRY_VIEW;
import static
org.apache.ignite.plugin.security.SecurityPermissionSetBuilder.ALL_PERMISSIONS;
/**