This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new c85131866d0 IGNITE-19584 SQL Calcite: Fix SQL metrics
c85131866d0 is described below
commit c85131866d08aa3b4047f1c21fb1262f46003bc6
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Mon Jun 19 11:51:59 2023 +0500
IGNITE-19584 SQL Calcite: Fix SQL metrics
IGNITE-19586 SQL Calcite: Fix SQL metrics - Fixes #10777.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
docs/_docs/monitoring-metrics/new-metrics.adoc | 26 ++++
.../query/calcite/CalciteQueryProcessor.java | 39 ++++-
.../internal/processors/query/calcite/Query.java | 24 +--
.../processors/query/calcite/RootQuery.java | 30 ++--
.../calcite/exec/ExecutionCancelledException.java | 24 ---
.../query/calcite/exec/ExecutionServiceImpl.java | 3 +-
.../query/calcite/exec/rel/AbstractNode.java | 6 +-
.../query/calcite/exec/rel/RootNode.java | 13 +-
.../query/calcite/prepare/QueryPlanCacheImpl.java | 16 +-
.../integration/RunningQueriesIntegrationTest.java | 41 ++++--
.../integration/SqlDiagnosticIntegrationTest.java | 163 +++++++++++++++++++++
.../query}/QueryParserMetricsHolder.java | 4 +-
.../internal/processors/query/h2/QueryParser.java | 1 +
.../query/h2/QueryParserMetricsHolderSelfTest.java | 3 +-
14 files changed, 297 insertions(+), 96 deletions(-)
diff --git a/docs/_docs/monitoring-metrics/new-metrics.adoc
b/docs/_docs/monitoring-metrics/new-metrics.adoc
index 7eae8cd24da..1b260d435e0 100644
--- a/docs/_docs/monitoring-metrics/new-metrics.adoc
+++ b/docs/_docs/monitoring-metrics/new-metrics.adoc
@@ -471,3 +471,29 @@ Register name: `cache`
|LastDataVer| long | The latest data version on the node.
|DataVersionClusterId| integer | Data version cluster id.
|===
+
+== SQL parser metrics
+
+Register name: `sql.parser.cache`
+
+[cols="2,1,3",opts="header"]
+|===
+|Name| Type| Description
+|hits| long | The number of SQL queries that were found in the parser cache
(and so did not need to be parsed and planned before execution).
+|misses| long | The number of SQL queries that were parsed and planned.
+|===
+
+== SQL executor metrics
+
+Register name: `sql.queries.user`
+
+[cols="2,1,3",opts="header"]
+|===
+|Name| Type| Description
+|success| long | The number of successfully executed SQL queries.
+|failed| long | The number of failed SQL queries (including canceled).
+|canceled| long | The number of canceled SQL queries.
+|===
+
+
+
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index c52c8f4180e..6bc9cccf5fa 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.calcite.DataContexts;
@@ -49,6 +50,7 @@ import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.SystemProperty;
import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.configuration.QueryEngineConfiguration;
import org.apache.ignite.events.SqlQueryExecutionEvent;
@@ -60,6 +62,7 @@ import
org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryContext;
import org.apache.ignite.internal.processors.query.QueryEngine;
+import org.apache.ignite.internal.processors.query.QueryParserMetricsHolder;
import org.apache.ignite.internal.processors.query.QueryUtils;
import
org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
@@ -177,6 +180,9 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
/** */
private final QueryPlanCache qryPlanCache;
+ /** */
+ private final QueryParserMetricsHolder parserMetrics;
+
/** */
private final QueryTaskExecutor taskExecutor;
@@ -225,6 +231,7 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
failureProcessor = ctx.failure();
schemaHolder = new SchemaHolderImpl(ctx);
qryPlanCache = new QueryPlanCacheImpl(ctx);
+ parserMetrics = new QueryParserMetricsHolder(ctx.metric());
mailboxRegistry = new MailboxRegistryImpl(ctx);
taskExecutor = new QueryTaskExecutorImpl(ctx);
executionSvc = new ExecutionServiceImpl<>(ctx,
ArrayRowHandler.INSTANCE);
@@ -422,9 +429,18 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
@Override public QueryPlan apply(RootQuery<Object[]> qry,
Object[] params) {
if (plan == null) {
- plan = queryPlanCache().queryPlan(new
CacheKey(schema.getName(), sql, null, params), () ->
- prepareSvc.prepareSingle(qryNode,
qry.planningContext())
- );
+ AtomicBoolean miss = new AtomicBoolean();
+
+ plan = queryPlanCache().queryPlan(new
CacheKey(schema.getName(), sql, null, params), () -> {
+ miss.set(true);
+
+ return prepareSvc.prepareSingle(qryNode,
qry.planningContext());
+ });
+
+ if (miss.get())
+ parserMetrics.countCacheMiss();
+ else
+ parserMetrics.countCacheHit();
}
return plan;
@@ -457,11 +473,15 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
QueryPlan plan = queryPlanCache().queryPlan(new
CacheKey(schema.getName(), sql, null, params));
if (plan != null) {
+ parserMetrics.countCacheHit();
+
return Collections.singletonList(
processQuery(qryCtx, qry -> action.apply(qry, plan),
schema.getName(), plan.query(), null, params)
);
}
+ parserMetrics.countCacheMiss();
+
SqlNodeList qryList = Commons.parse(sql,
FRAMEWORK_CONFIG.getParserConfig());
List<T> res = new ArrayList<>(qryList.size());
@@ -555,12 +575,17 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
if (qrys != null)
qrys.forEach(RootQuery::cancel);
- qryReg.unregister(qry.id(), e);
+ if (isCanceled) {
+ qryReg.unregister(qry.id(), new QueryCancelledException());
+
+ throw new IgniteSQLException("The query was cancelled while
planning",
+ IgniteQueryErrorCode.QUERY_CANCELED, e);
+ }
+ else {
+ qryReg.unregister(qry.id(), e);
- if (isCanceled)
- throw new IgniteSQLException("The query was cancelled while
planning", IgniteQueryErrorCode.QUERY_CANCELED, e);
- else
throw e;
+ }
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
index 9d42e051ac6..38f67f59d1f 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java
@@ -28,11 +28,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
-import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.MemoryTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.QueryMemoryTracker;
@@ -165,9 +165,9 @@ public class Query<RowT> {
}
for (RunningFragment<RowT> frag : fragments)
- frag.context().execute(() -> frag.root().onError(new
ExecutionCancelledException()), frag.root()::onError);
+ frag.context().execute(() -> frag.root().onError(new
QueryCancelledException()), frag.root()::onError);
- tryClose(new ExecutionCancelledException());
+ tryClose(queryCanceledException());
}
/** */
@@ -176,13 +176,8 @@ public class Query<RowT> {
if (state == QueryState.INITED)
state = QueryState.EXECUTING;
- if (state == QueryState.CLOSING || state == QueryState.CLOSED) {
- throw new IgniteSQLException(
- "The query was cancelled",
- IgniteQueryErrorCode.QUERY_CANCELED,
- new ExecutionCancelledException()
- );
- }
+ if (state == QueryState.CLOSING || state == QueryState.CLOSED)
+ throw queryCanceledException();
fragments.add(f);
}
@@ -193,6 +188,15 @@ public class Query<RowT> {
return cancel.isCanceled();
}
+ /** */
+ protected IgniteSQLException queryCanceledException() {
+ return new IgniteSQLException(
+ "The query was cancelled",
+ IgniteQueryErrorCode.QUERY_CANCELED,
+ new QueryCancelledException()
+ );
+ }
+
/** */
public void onNodeLeft(UUID nodeId) {
if (initNodeId.equals(nodeId))
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
index c1f5b2d16df..22c1764a77b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java
@@ -42,7 +42,6 @@ import
org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryContext;
import org.apache.ignite.internal.processors.query.QueryUtils;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeService;
-import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Node;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.RootNode;
@@ -174,12 +173,8 @@ public class RootQuery<RowT> extends Query<RowT>
implements TrackableQuery {
*/
public void mapping() {
synchronized (mux) {
- if (state == QueryState.CLOSED) {
- throw new IgniteSQLException(
- "The query was cancelled while executing.",
- IgniteQueryErrorCode.QUERY_CANCELED
- );
- }
+ if (state == QueryState.CLOSED)
+ throw queryCanceledException();
state = QueryState.MAPPING;
}
@@ -190,12 +185,8 @@ public class RootQuery<RowT> extends Query<RowT>
implements TrackableQuery {
*/
public void run(ExecutionContext<RowT> ctx, MultiStepPlan plan, Node<RowT>
root) {
synchronized (mux) {
- if (state == QueryState.CLOSED) {
- throw new IgniteSQLException(
- "The query was cancelled while executing.",
- IgniteQueryErrorCode.QUERY_CANCELED
- );
- }
+ if (state == QueryState.CLOSED)
+ throw queryCanceledException();
planningTime = U.currentTimeMillis() - startTs;
@@ -278,7 +269,7 @@ public class RootQuery<RowT> extends Query<RowT> implements
TrackableQuery {
log.warning("An exception occures during the query
cancel", wrpEx);
}
finally {
- super.tryClose(failure);
+ super.tryClose(failure == null && root != null ?
root.failure() : failure);
}
}
}
@@ -287,18 +278,15 @@ public class RootQuery<RowT> extends Query<RowT>
implements TrackableQuery {
@Override public void cancel() {
cancel.cancel();
- tryClose(new ExecutionCancelledException());
+ U.closeQuiet(root);
+ tryClose(queryCanceledException());
}
/** */
public PlanningContext planningContext() {
synchronized (mux) {
- if (state == QueryState.CLOSED || state == QueryState.CLOSING) {
- throw new IgniteSQLException(
- "The query was cancelled while executing.",
- IgniteQueryErrorCode.QUERY_CANCELED
- );
- }
+ if (state == QueryState.CLOSED || state == QueryState.CLOSING)
+ throw queryCanceledException();
if (state == QueryState.EXECUTING || state == QueryState.MAPPING) {
throw new IgniteSQLException(
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionCancelledException.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionCancelledException.java
deleted file mode 100644
index fb4a8d8a9c4..00000000000
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionCancelledException.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.exec;
-
-import org.apache.ignite.IgniteCheckedException;
-
-/** */
-public class ExecutionCancelledException extends IgniteCheckedException {
-}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 4d31c32ef66..92c0b70a10b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -34,6 +34,7 @@ import org.apache.calcite.tools.Frameworks;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheQueryReadEvent;
@@ -833,7 +834,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
Exception e = new RemoteException(nodeId, msg.queryId(),
msg.fragmentId(), msg.error());
- if (X.hasCause(msg.error(), ExecutionCancelledException.class)) {
+ if (X.hasCause(msg.error(), QueryCancelledException.class)) {
e = new IgniteSQLException(
"The query was cancelled while executing.",
IgniteQueryErrorCode.QUERY_CANCELED,
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
index 5a4006f42c5..19c2927cbd1 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractNode.java
@@ -22,8 +22,8 @@ import java.util.Comparator;
import java.util.List;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionCancelledException;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -146,7 +146,7 @@ public abstract class AbstractNode<Row> implements
Node<Row> {
* @param e Exception.
*/
public void onError(Throwable e) {
- if (e instanceof ExecutionCancelledException)
+ if (e instanceof QueryCancelledException)
U.warn(context().logger(), "Execution is cancelled.", e);
else
onErrorInternal(e);
@@ -184,7 +184,7 @@ public abstract class AbstractNode<Row> implements
Node<Row> {
/** */
protected void checkState() throws Exception {
if (context().isCancelled())
- throw new ExecutionCancelledException();
+ throw new QueryCancelledException();
if (Thread.interrupted())
throw new IgniteInterruptedCheckedException("Thread was
interrupted.");
if (!U.assertionsEnabled())
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
index 0ee802873bb..5449e19eb38 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/RootNode.java
@@ -31,12 +31,14 @@ import java.util.function.Function;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.cache.query.QueryCancelledException;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.query.QueryCancelledException.ERR_MSG;
@@ -114,8 +116,10 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
lock.lock();
try {
- if (waiting != -1)
- ex.compareAndSet(null, new IgniteSQLException(ERR_MSG,
IgniteQueryErrorCode.QUERY_CANCELED));
+ if (waiting != -1) {
+ ex.compareAndSet(null, new IgniteSQLException(ERR_MSG,
IgniteQueryErrorCode.QUERY_CANCELED,
+ new QueryCancelledException()));
+ }
closed = true; // an exception has to be set first to get right
check order
@@ -128,6 +132,11 @@ public class RootNode<Row> extends AbstractNode<Row>
implements SingleNode<Row>,
onClose.accept(ex.get());
}
+ /** */
+ public @Nullable Throwable failure() {
+ return ex.get();
+ }
+
/** {@inheritDoc} */
@Override protected boolean isClosed() {
return closed;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
index 178faf8cd5e..a1b038abfff 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryPlanCacheImpl.java
@@ -38,7 +38,7 @@ public class QueryPlanCacheImpl extends AbstractService
implements QueryPlanCach
private static final int CACHE_SIZE = 1024;
/** */
- private GridInternalSubscriptionProcessor subscriptionProcessor;
+ private final GridInternalSubscriptionProcessor subscriptionProc;
/** */
private volatile Map<CacheKey, QueryPlan> cache;
@@ -50,21 +50,14 @@ public class QueryPlanCacheImpl extends AbstractService
implements QueryPlanCach
super(ctx);
cache = new GridBoundedConcurrentLinkedHashMap<>(CACHE_SIZE);
- subscriptionProcessor(ctx.internalSubscriptionProcessor());
+ subscriptionProc = ctx.internalSubscriptionProcessor();
init();
}
- /**
- * @param subscriptionProcessor Subscription processor.
- */
- public void subscriptionProcessor(GridInternalSubscriptionProcessor
subscriptionProcessor) {
- this.subscriptionProcessor = subscriptionProcessor;
- }
-
/** {@inheritDoc} */
@Override public void init() {
- subscriptionProcessor.registerSchemaChangeListener(new
SchemaListener());
+ subscriptionProc.registerSchemaChangeListener(new SchemaListener());
}
/** {@inheritDoc} */
@@ -74,8 +67,6 @@ public class QueryPlanCacheImpl extends AbstractService
implements QueryPlanCach
/** {@inheritDoc} */
@Override public QueryPlan queryPlan(CacheKey key, Supplier<QueryPlan>
planSupplier) {
- Map<CacheKey, QueryPlan> cache = this.cache;
-
QueryPlan plan = cache.computeIfAbsent(key, k -> planSupplier.get());
return plan.copy();
@@ -83,7 +74,6 @@ public class QueryPlanCacheImpl extends AbstractService
implements QueryPlanCach
/** {@inheritDoc} */
@Override public QueryPlan queryPlan(CacheKey key) {
- Map<CacheKey, QueryPlan> cache = this.cache;
QueryPlan plan = cache.get(key);
return plan != null ? plan.copy() : null;
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
index bd841718a15..b069eea3622 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java
@@ -40,6 +40,7 @@ import
org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryUtils;
import
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
@@ -51,19 +52,20 @@ import
org.apache.ignite.internal.processors.query.calcite.schema.CacheTableImpl
import
org.apache.ignite.internal.processors.query.calcite.schema.IgniteCacheTable;
import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.spi.systemview.view.SqlQueryView;
import org.apache.ignite.spi.systemview.view.SystemView;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.junit.Assert;
import org.junit.Test;
import static java.util.stream.Collectors.joining;
import static org.apache.ignite.IgniteSystemProperties.getLong;
import static
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.IGNITE_CALCITE_PLANNER_TIMEOUT;
import static
org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_QRY_VIEW;
+import static
org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_USER_QUERIES_REG_NAME;
/**
*
@@ -100,6 +102,9 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
*/
@Test
public void testCancelAtPlanningPhase() throws IgniteCheckedException {
+ MetricRegistry mreg =
client.context().metric().registry(SQL_USER_QUERIES_REG_NAME);
+ mreg.reset();
+
CalciteQueryProcessor engine = queryProcessor(client);
int cnt = 9;
@@ -111,7 +116,7 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() ->
sql(sql));
- Assert.assertTrue(GridTestUtils.waitForCondition(
+ assertTrue(GridTestUtils.waitForCondition(
() -> !engine.runningQueries().isEmpty() || fut.isDone(),
TIMEOUT_IN_MS));
Collection<? extends Query<?>> running = engine.runningQueries();
@@ -123,15 +128,17 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
assertSame(qry, engine.runningQuery(qry.id()));
// Waits for planning.
- Assert.assertTrue(GridTestUtils.waitForCondition(
+ assertTrue(GridTestUtils.waitForCondition(
() -> qry.state() == QueryState.PLANNING, TIMEOUT_IN_MS));
qry.cancel();
- Assert.assertTrue(GridTestUtils.waitForCondition(
+ assertTrue(GridTestUtils.waitForCondition(
() -> engine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(0),
IgniteSQLException.class, "The query was cancelled while planning");
+
+ assertEquals(1, ((LongMetric)mreg.findMetric("canceled")).value());
}
/**
@@ -140,6 +147,9 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
*/
@Test
public void testCancelAtExecutionPhase() throws Exception {
+ MetricRegistry mreg =
client.context().metric().registry(SQL_USER_QUERIES_REG_NAME);
+ mreg.reset();
+
CalciteQueryProcessor cliEngine = queryProcessor(client);
CalciteQueryProcessor srvEngine = queryProcessor(srv);
@@ -199,10 +209,10 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
qry.cancel();
- Assert.assertTrue(GridTestUtils.waitForCondition(
+ assertTrue(GridTestUtils.waitForCondition(
() -> srvEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
- Assert.assertTrue(GridTestUtils.waitForCondition(
+ assertTrue(GridTestUtils.waitForCondition(
() -> cliEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
}
finally {
@@ -210,7 +220,9 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
}
GridTestUtils.assertThrowsAnyCause(log,
- () -> fut.get(100), IgniteSQLException.class, "The query was
cancelled while executing.");
+ () -> fut.get(100), IgniteSQLException.class, "The query was
cancelled");
+
+ assertEquals(1, ((LongMetric)mreg.findMetric("canceled")).value());
}
/**
@@ -220,6 +232,9 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
*/
@Test
public void testCancelByRemoteFragment() throws IgniteCheckedException {
+ MetricRegistry mreg =
client.context().metric().registry(SQL_USER_QUERIES_REG_NAME);
+ mreg.reset();
+
CalciteQueryProcessor clientEngine = queryProcessor(client);
CalciteQueryProcessor serverEngine = queryProcessor(srv);
int cnt = 6;
@@ -236,7 +251,7 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() ->
sql(sql));
- Assert.assertTrue(GridTestUtils.waitForCondition(
+ assertTrue(GridTestUtils.waitForCondition(
() -> {
Collection<? extends Query<?>> queries =
clientEngine.runningQueries();
@@ -244,7 +259,7 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
},
TIMEOUT_IN_MS));
- Assert.assertTrue(GridTestUtils.waitForCondition(() ->
!serverEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
+ assertTrue(GridTestUtils.waitForCondition(() ->
!serverEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
Collection<? extends Query<?>> running = serverEngine.runningQueries();
Query<?> qry = F.first(running);
@@ -253,13 +268,15 @@ public class RunningQueriesIntegrationTest extends
AbstractBasicIntegrationTest
qry.cancel();
- Assert.assertTrue(GridTestUtils.waitForCondition(
+ assertTrue(GridTestUtils.waitForCondition(
() -> clientEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
- Assert.assertTrue(GridTestUtils.waitForCondition(
+ assertTrue(GridTestUtils.waitForCondition(
() -> serverEngine.runningQueries().isEmpty(), TIMEOUT_IN_MS));
- GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100),
IgniteSQLException.class, "The query was cancelled while executing.");
+ GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100),
IgniteSQLException.class, "The query was cancelled");
+
+ assertEquals(1, ((LongMetric)mreg.findMetric("canceled")).value());
}
/** */
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index 63294238a3b..2bb661e01d1 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -18,6 +18,10 @@
package org.apache.ignite.internal.processors.query.calcite.integration;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.Statement;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -29,10 +33,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.SqlConfiguration;
import org.apache.ignite.events.CacheQueryExecutedEvent;
@@ -40,12 +46,15 @@ import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.events.SqlQueryExecutionEvent;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
import
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.calcite.Query;
import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
@@ -58,15 +67,20 @@ import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
import static
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.cleanPerformanceStatisticsDir;
import static
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.startCollectStatistics;
import static
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.stopCollectStatisticsAndRead;
+import static
org.apache.ignite.internal.processors.query.QueryParserMetricsHolder.QUERY_PARSER_METRIC_GROUP_NAME;
import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.BIG_RESULT_SET_MSG;
import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_ERROR_MSG;
import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_EXEC_MSG;
import static
org.apache.ignite.internal.processors.query.running.HeavyQueriesTracker.LONG_QUERY_FINISHED_MSG;
+import static
org.apache.ignite.internal.processors.query.running.RunningQueryManager.SQL_USER_QUERIES_REG_NAME;
/**
* Test SQL diagnostic tools.
*/
public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest
{
+ /** JDBC thin driver URL pointing at 127.0.0.1 on the default client connector port. */
+ private static final String jdbcUrl = "jdbc:ignite:thin://127.0.0.1:" +
ClientConnectorConfiguration.DFLT_PORT;
+
/** */
private static final long LONG_QRY_TIMEOUT = 1_000L;
@@ -114,6 +128,155 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
return 2;
}
+ /** Checks the "hits" and "misses" parser cache metrics on grid(0) and grid(1) after a DDL statement, repeated INSERTs and repeated SELECTs. */
+ @Test
+ public void testParserMetrics() {
+ MetricRegistry mreg0 =
grid(0).context().metric().registry(QUERY_PARSER_METRIC_GROUP_NAME);
+ MetricRegistry mreg1 =
grid(1).context().metric().registry(QUERY_PARSER_METRIC_GROUP_NAME);
+ mreg0.reset();
+ mreg1.reset();
+
+ LongMetric hits0 = mreg0.findMetric("hits");
+ LongMetric hits1 = mreg1.findMetric("hits");
+ LongMetric misses0 = mreg0.findMetric("misses");
+ LongMetric misses1 = mreg1.findMetric("misses");
+
+ // Parse and plan on client.
+ sql("CREATE TABLE test_parse(a INT)");
+
+ assertEquals(0, hits0.value()); // All four counters are expected to stay at zero after the DDL.
+ assertEquals(0, hits1.value());
+ assertEquals(0, misses0.value());
+ assertEquals(0, misses1.value());
+
+ for (int i = 0; i < 10; i++)
+ sql(grid(0), "INSERT INTO test_parse VALUES (?)", i);
+
+ assertEquals(9, hits0.value()); // Same parameterized INSERT run 10 times on grid(0): 9 hits and 1 miss expected there.
+ assertEquals(0, hits1.value());
+ assertEquals(1, misses0.value());
+ assertEquals(0, misses1.value());
+
+ for (int i = 0; i < 10; i++)
+ sql(grid(1), "SELECT * FROM test_parse WHERE a = ?", i);
+
+ assertEquals(9, hits0.value()); // grid(0) counters must not change while the SELECTs run on grid(1).
+ assertEquals(9, hits1.value());
+ assertEquals(1, misses0.value());
+ assertEquals(1, misses1.value());
+ }
+
+ /** Checks the "hits" and "misses" parser cache metrics for JDBC batches: one Statement batch with distinct SQL texts and two PreparedStatement batches sharing one SQL text. */
+ @Test
+ public void testBatchParserMetrics() throws Exception {
+ MetricRegistry mreg0 =
grid(0).context().metric().registry(QUERY_PARSER_METRIC_GROUP_NAME);
+ MetricRegistry mreg1 =
grid(1).context().metric().registry(QUERY_PARSER_METRIC_GROUP_NAME);
+ mreg0.reset();
+ mreg1.reset();
+
+ LongMetric hits0 = mreg0.findMetric("hits");
+ LongMetric hits1 = mreg1.findMetric("hits");
+ LongMetric misses0 = mreg0.findMetric("misses");
+ LongMetric misses1 = mreg1.findMetric("misses");
+
+ sql("CREATE TABLE test_batch(a INT)");
+
+ assertEquals(0, hits0.value()); // All four counters are expected to stay at zero after the DDL.
+ assertEquals(0, hits1.value());
+ assertEquals(0, misses0.value());
+ assertEquals(0, misses1.value());
+
+ try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
+ conn.setSchema("PUBLIC");
+
+ try (Statement stmt = conn.createStatement()) {
+ for (int i = 0; i < 10; i++)
+ stmt.addBatch(String.format("INSERT INTO test_batch VALUES
(%d)", i));
+
+ stmt.executeBatch();
+
+ assertEquals(0, hits0.value());
+ assertEquals(0, hits1.value());
+ assertEquals(10, misses0.value()); // Ten batched statements, each with a different literal and therefore a different SQL text.
+ assertEquals(0, misses1.value());
+ }
+
+ String sql = "INSERT INTO test_batch VALUES (?)";
+
+ try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+ for (int i = 10; i < 20; i++) {
+ stmt.setInt(1, i);
+ stmt.addBatch();
+ }
+
+ stmt.executeBatch();
+
+ assertEquals(0, hits0.value());
+ assertEquals(0, hits1.value());
+ assertEquals(11, misses0.value()); // Only one increment per
batch.
+ assertEquals(0, misses1.value());
+ }
+
+ try (PreparedStatement stmt = conn.prepareStatement(sql)) {
+ for (int i = 20; i < 30; i++) {
+ stmt.setInt(1, i);
+ stmt.addBatch();
+ }
+
+ stmt.executeBatch();
+
+ assertEquals(1, hits0.value()); // Only one increment per
batch.
+ assertEquals(0, hits1.value());
+ assertEquals(11, misses0.value());
+ assertEquals(0, misses1.value());
+ }
+ }
+ }
+
+ /** Checks the "success", "failed" and "canceled" user query metrics on grid(0) after two successful queries, one failing query and one cursor closed early; grid(1) counters are expected to stay at zero. */
+ @Test
+ public void testUserQueriesMetrics() throws Exception {
+ sql(grid(0), "CREATE TABLE test_metric (a INT)");
+
+ MetricRegistry mreg0 =
grid(0).context().metric().registry(SQL_USER_QUERIES_REG_NAME);
+ MetricRegistry mreg1 =
grid(1).context().metric().registry(SQL_USER_QUERIES_REG_NAME);
+ mreg0.reset();
+ mreg1.reset();
+
+ AtomicInteger qryCnt = new AtomicInteger();
+
grid(0).context().query().runningQueryManager().registerQueryFinishedListener(q
-> qryCnt.incrementAndGet());
+
+ sql(grid(0), "INSERT INTO test_metric VALUES (?)", 0);
+ sql(grid(0), "SELECT * FROM test_metric WHERE a = ?", 0);
+
+ try {
+ sql(grid(0), "SELECT * FROM test_fail"); // Table test_fail is never created in this test.
+
+ fail();
+ }
+ catch (IgniteSQLException ignored) {
+ // Expected.
+ }
+
+ FieldsQueryCursor<?> cur = grid(0).getOrCreateCache("test_metric")
+ .query(new SqlFieldsQuery("SELECT * FROM table(system_range(1,
10000))"));
+
+ assertTrue(cur.iterator().hasNext());
+
+ cur.close(); // Closed after only hasNext() was called on the iterator.
+
+ // Query unregistering is async process, wait for it before metrics
check.
+ assertTrue(GridTestUtils.waitForCondition(() -> qryCnt.get() == 4,
1_000L));
+
+ assertEquals(2, ((LongMetric)mreg0.findMetric("success")).value());
+ assertEquals(2, ((LongMetric)mreg0.findMetric("failed")).value()); //
1 error + 1 cancelled.
+ assertEquals(1, ((LongMetric)mreg0.findMetric("canceled")).value());
+
+ assertEquals(0, ((LongMetric)mreg1.findMetric("success")).value()); // Every statement above was issued through grid(0).
+ assertEquals(0, ((LongMetric)mreg1.findMetric("failed")).value());
+ assertEquals(0, ((LongMetric)mreg1.findMetric("canceled")).value());
+ }
+
/** */
@Test
public void testPerformanceStatistics() throws Exception {
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserMetricsHolder.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryParserMetricsHolder.java
similarity index 94%
rename from
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserMetricsHolder.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryParserMetricsHolder.java
index 73efd43d618..157620c3af7 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParserMetricsHolder.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryParserMetricsHolder.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.query.h2;
+package org.apache.ignite.internal.processors.query;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
@@ -26,7 +26,7 @@ import
org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
*/
public class QueryParserMetricsHolder {
/** Query parser metric group name. */
- static final String QUERY_PARSER_METRIC_GROUP_NAME = "sql.parser.cache";
+ public static final String QUERY_PARSER_METRIC_GROUP_NAME =
"sql.parser.cache";
/** Query cache hits counter. */
private final LongAdderMetric qryCacheHits;
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
index 952e33c58bf..e53ea65b477 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/QueryParser.java
@@ -40,6 +40,7 @@ import
org.apache.ignite.internal.processors.odbc.jdbc.JdbcParameterMeta;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.NestedTxMode;
+import org.apache.ignite.internal.processors.query.QueryParserMetricsHolder;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.h2.dml.DmlAstUtils;
import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryParserMetricsHolderSelfTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryParserMetricsHolderSelfTest.java
index 3695cec0594..a6f53e686f8 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryParserMetricsHolderSelfTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/QueryParserMetricsHolderSelfTest.java
@@ -21,12 +21,13 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.query.QueryParserMetricsHolder;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assert;
import org.junit.Test;
-import static
org.apache.ignite.internal.processors.query.h2.QueryParserMetricsHolder.QUERY_PARSER_METRIC_GROUP_NAME;
+import static
org.apache.ignite.internal.processors.query.QueryParserMetricsHolder.QUERY_PARSER_METRIC_GROUP_NAME;
/**
* Test to check {@link QueryParserMetricsHolder}