This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b26a4c52f87 IGNITE-20079 SQL Calcite: Write additional performance
statistics info for queries - Fixes #10880.
b26a4c52f87 is described below
commit b26a4c52f87b09e99ca0ddd0e9e3e3d6ff4d1935
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Thu Sep 14 11:04:43 2023 +0300
IGNITE-20079 SQL Calcite: Write additional performance statistics info for
queries - Fixes #10880.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/CalciteQueryProcessor.java | 21 +++
.../query/calcite/QueryRegistryImpl.java | 2 +-
.../query/calcite/exec/ExecutionServiceImpl.java | 27 +++-
.../query/calcite/exec/LogicalRelImplementor.java | 22 ++--
.../query/calcite/exec/rel/ScanStorageNode.java | 27 +++-
.../query/calcite/exec/tracker/IoTracker.java | 15 +++
.../query/calcite/exec/tracker/NoOpIoTracker.java | 13 ++
.../tracker/PerformanceStatisticsIoTracker.java | 51 +++++++-
.../calcite/prepare/AbstractMultiStepPlan.java | 10 ++
.../query/calcite/prepare/MultiStepDmlPlan.java | 5 +-
.../query/calcite/prepare/MultiStepPlan.java | 5 +
.../query/calcite/prepare/MultiStepQueryPlan.java | 5 +-
.../query/calcite/prepare/PlanExtractor.java | 108 ++++++++++++++++
.../query/calcite/prepare/PrepareServiceImpl.java | 13 +-
.../query/calcite/prepare/bounds/ExactBounds.java | 6 +
.../query/calcite/prepare/bounds/MultiBounds.java | 8 ++
.../query/calcite/prepare/bounds/RangeBounds.java | 12 ++
.../query/calcite/prepare/bounds/SearchBounds.java | 6 +
.../query/calcite/schema/CacheTableImpl.java | 5 +
.../query/calcite/schema/IgniteTable.java | 5 +
.../query/calcite/schema/SystemViewTableImpl.java | 5 +
.../query/calcite/sql/IgniteSqlCreateTable.java | 19 ++-
.../query/calcite/sql/IgniteSqlOption.java | 6 -
.../calcite/util/ConvertingClosableIterator.java | 12 +-
.../integration/SqlDiagnosticIntegrationTest.java | 111 +++++++++++++---
.../query/calcite/planner/PlannerTest.java | 2 +-
.../query/calcite/planner/TestTable.java | 2 +-
.../FilePerformanceStatisticsReader.java | 67 ++++++++++
.../FilePerformanceStatisticsWriter.java | 47 ++++++-
.../performancestatistics/OperationType.java | 28 +++-
.../PerformanceStatisticsHandler.java | 24 ++++
.../PerformanceStatisticsProcessor.java | 22 ++++
.../query/running/HeavyQueriesTracker.java | 5 +
.../query/running/RunningQueryManager.java | 25 ++++
.../AbstractPerformanceStatisticsTest.java | 12 ++
.../internal/processors/query/h2/H2QueryInfo.java | 15 +++
.../processors/query/h2/H2ResultSetIterator.java | 26 +++-
.../processors/query/h2/IgniteH2Indexing.java | 10 ++
.../query/h2/twostep/GridMapQueryExecutor.java | 10 ++
.../query/h2/twostep/GridReduceQueryExecutor.java | 11 ++
.../query/h2/twostep/MapQueryResult.java | 18 +++
.../PerformanceStatisticsQueryTest.java | 141 +++++++++++++++++++--
42 files changed, 913 insertions(+), 71 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 3a675e8882c..7be8acb3ed5 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -36,12 +36,14 @@ import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlDynamicParam;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.util.SqlOperatorTables;
import org.apache.calcite.sql.util.SqlShuttle;
import org.apache.calcite.sql.validate.SqlValidator;
@@ -94,7 +96,10 @@ import
org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCach
import
org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCacheImpl;
import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder;
import
org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolderImpl;
+import
org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlAlterUser;
import
org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlConformance;
+import
org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateUser;
+import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlOption;
import
org.apache.ignite.internal.processors.query.calcite.sql.fun.IgniteOwnSqlOperatorTable;
import
org.apache.ignite.internal.processors.query.calcite.sql.fun.IgniteStdSqlOperatorTable;
import
org.apache.ignite.internal.processors.query.calcite.sql.generated.IgniteSqlParserImpl;
@@ -527,6 +532,22 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
@Override public SqlNode visit(SqlLiteral literal) {
return new SqlDynamicParam(-1,
literal.getParserPosition());
}
+
+ @Override public SqlNode visit(SqlCall call) {
+ // Handle some special cases.
+ if (call instanceof IgniteSqlOption)
+ return call;
+ else if (call instanceof IgniteSqlCreateUser) {
+ return new
IgniteSqlCreateUser(call.getParserPosition(),
((IgniteSqlCreateUser)call).user(),
+ SqlLiteral.createCharString("hidden",
SqlParserPos.ZERO));
+ }
+ else if (call instanceof IgniteSqlAlterUser) {
+ return new
IgniteSqlAlterUser(call.getParserPosition(), ((IgniteSqlAlterUser)call).user(),
+ SqlLiteral.createCharString("hidden",
SqlParserPos.ZERO));
+ }
+
+ return super.visit(call);
+ }
}
).toString();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
index 7fe09c6bc4a..aecf0fe69d8 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java
@@ -63,7 +63,7 @@ public class QueryRegistryImpl extends AbstractService
implements QueryRegistry
String initiatorId = fieldsQry != null ?
fieldsQry.getQueryInitiatorId() : null;
long locId = qryMgr.register(rootQry.sql(),
GridCacheQueryType.SQL_FIELDS, rootQry.context().schemaName(),
- false, createCancelToken(qry), initiatorId, false, false,
false);
+ false, createCancelToken(qry), initiatorId, false, true,
false);
rootQry.localQueryId(locId);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 5472042b80a..1d79b50a930 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -48,6 +48,7 @@ import
org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import
org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
@@ -673,6 +674,16 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
}
}
+ if (perfStatProc.enabled()) {
+ perfStatProc.queryProperty(
+ GridCacheQueryType.SQL_FIELDS,
+ qry.initiatorNodeId(),
+ qry.localQueryId(),
+ "Query plan",
+ plan.textPlan()
+ );
+ }
+
QueryProperties qryProps = qry.context().unwrap(QueryProperties.class);
Function<Object, Object> fieldConverter = (qryProps == null ||
qryProps.keepBinary()) ? null :
@@ -720,8 +731,22 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
};
}
+ Runnable onClose = () -> {
+ if (perfStatProc.enabled()) {
+ perfStatProc.queryRowsProcessed(
+ GridCacheQueryType.SQL_FIELDS,
+ qry.initiatorNodeId(),
+ qry.localQueryId(),
+ "Fetched",
+ resultSetChecker.fetchedSize()
+ );
+ }
+
+ resultSetChecker.checkOnClose();
+ };
+
Iterator<List<?>> it = new
ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx,
- fieldConverter, rowConverter, resultSetChecker::checkOnClose);
+ fieldConverter, rowConverter, onClose);
return new ListFieldsQueryCursor<>(plan, it, ectx);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index 3300f01b58e..cb816373109 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -315,7 +315,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
if (idx != null && !tbl.isIndexRebuildInProgress()) {
Iterable<Row> rowsIter = idx.scan(ctx, grp, ranges,
requiredColumns);
- return new ScanStorageNode<>(ctx, rowType, rowsIter, filters, prj);
+ return new ScanStorageNode<>(idx.name(), ctx, rowType, rowsIter,
filters, prj);
}
else {
// Index was invalidated after planning, workaround through
table-scan -> sort -> index spool.
@@ -339,7 +339,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
if (!spoolNodeRequired && projects != null)
rowType = rel.getRowType();
- Node<Row> node = new ScanStorageNode<>(ctx, rowType, rowsIter,
filterHasCorrelation ? null : filters,
+ Node<Row> node = new ScanStorageNode<>(tbl.name(), ctx, rowType,
rowsIter, filterHasCorrelation ? null : filters,
projNodeRequired ? null : prj);
RelCollation collation = rel.collation();
@@ -406,14 +406,14 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
IgniteIndex idx = tbl.getIndex(rel.indexName());
if (idx != null && !tbl.isIndexRebuildInProgress()) {
- return new ScanStorageNode<>(ctx, rel.getRowType(), () ->
Collections.singletonList(ctx.rowHandler()
- .factory(ctx.getTypeFactory(), rel.getRowType())
- .create(idx.count(ctx, ctx.group(rel.sourceId()),
rel.notNull()))).iterator());
+ return new ScanStorageNode<>(idx.name() + "_COUNT", ctx,
rel.getRowType(),
+ () ->
Collections.singletonList(ctx.rowHandler().factory(ctx.getTypeFactory(),
rel.getRowType())
+ .create(idx.count(ctx, ctx.group(rel.sourceId()),
rel.notNull()))).iterator());
}
else {
CollectNode<Row> replacement =
CollectNode.createCountCollector(ctx);
- replacement.register(new ScanStorageNode<>(ctx,
rel.getTable().getRowType(), tbl.scan(ctx,
+ replacement.register(new ScanStorageNode<>(tbl.name(), ctx,
rel.getTable().getRowType(), tbl.scan(ctx,
ctx.group(rel.sourceId()), ImmutableBitSet.of(0))));
return replacement;
@@ -429,14 +429,16 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
ImmutableBitSet requiredColumns = idxBndRel.requiredColumns();
RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
- if (idx != null && !tbl.isIndexRebuildInProgress())
- return new ScanStorageNode<>(ctx, rowType,
idx.firstOrLast(idxBndRel.first(), ctx, grp, requiredColumns));
+ if (idx != null && !tbl.isIndexRebuildInProgress()) {
+ return new ScanStorageNode<>(idx.name() + "_BOUND", ctx, rowType,
+ idx.firstOrLast(idxBndRel.first(), ctx, grp, requiredColumns));
+ }
else {
assert requiredColumns.cardinality() == 1;
Iterable<Row> rowsIter = tbl.scan(ctx, grp,
idxBndRel.requiredColumns());
- Node<Row> scanNode = new ScanStorageNode<>(ctx, rowType, rowsIter,
+ Node<Row> scanNode = new ScanStorageNode<>(tbl.name(), ctx,
rowType, rowsIter,
r -> ctx.rowHandler().get(0, r) != null, null);
RelCollation collation =
idx.collation().apply(LogicalScanConverterRule.createMapping(
@@ -481,7 +483,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
Iterable<Row> rowsIter = tbl.scan(ctx, group, requiredColunms);
- return new ScanStorageNode<>(ctx, rowType, rowsIter, filters, prj);
+ return new ScanStorageNode<>(tbl.name(), ctx, rowType, rowsIter,
filters, prj);
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
index 4d126208431..48c98dbd2b4 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.calcite.rel.type.RelDataType;
@@ -27,7 +28,11 @@ import org.jetbrains.annotations.Nullable;
* Scan storage node.
*/
public class ScanStorageNode<Row> extends ScanNode<Row> {
+ /** */
+ @Nullable private final AtomicLong processedRowsCntr;
+
/**
+ * @param storageName Storage (index or table) name.
* @param ctx Execution context.
* @param rowType Row type.
* @param src Source.
@@ -35,6 +40,7 @@ public class ScanStorageNode<Row> extends ScanNode<Row> {
* @param rowTransformer Row transformer (projection).
*/
public ScanStorageNode(
+ String storageName,
ExecutionContext<Row> ctx,
RelDataType rowType,
Iterable<Row> src,
@@ -42,15 +48,18 @@ public class ScanStorageNode<Row> extends ScanNode<Row> {
@Nullable Function<Row, Row> rowTransformer
) {
super(ctx, rowType, src, filter, rowTransformer);
+
+ processedRowsCntr =
context().ioTracker().processedRowsCounter("Scanned " + storageName);
}
/**
+ * @param storageName Storage (index or table) name.
* @param ctx Execution context.
* @param rowType Row type.
* @param src Source.
*/
- public ScanStorageNode(ExecutionContext<Row> ctx, RelDataType rowType,
Iterable<Row> src) {
- super(ctx, rowType, src);
+ public ScanStorageNode(String storageName, ExecutionContext<Row> ctx,
RelDataType rowType, Iterable<Row> src) {
+ this(storageName, ctx, rowType, src, null, null);
}
/** {@inheritDoc} */
@@ -58,10 +67,22 @@ public class ScanStorageNode<Row> extends ScanNode<Row> {
try {
context().ioTracker().startTracking();
- return super.processNextBatch();
+ int processed = super.processNextBatch();
+
+ if (processedRowsCntr != null)
+ processedRowsCntr.addAndGet(processed);
+
+ return processed;
}
finally {
context().ioTracker().stopTracking();
}
}
+
+ /** */
+ @Override public void closeInternal() {
+ super.closeInternal();
+
+ context().ioTracker().flush();
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java
index fb8d9e15e51..a246cb175b2 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.processors.query.calcite.exec.tracker;
+import java.util.concurrent.atomic.AtomicLong;
+import org.jetbrains.annotations.Nullable;
+
/**
* I/O operations tracker interface.
*/
@@ -26,4 +29,16 @@ public interface IoTracker {
/** Stop tracking and save result. */
public void stopTracking();
+
+ /**
+ * Register counter for processed rows.
+ *
+ * @param action Action with rows.
+ */
+ @Nullable public AtomicLong processedRowsCounter(String action);
+
+ /**
+ * Flush tracked data.
+ */
+ public void flush();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java
index 5c4f51a50fb..4cc26d1967b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.processors.query.calcite.exec.tracker;
+import java.util.concurrent.atomic.AtomicLong;
+import org.jetbrains.annotations.Nullable;
+
/**
* I/O operations tracker that does nothing.
*/
@@ -33,4 +36,14 @@ public class NoOpIoTracker implements IoTracker {
@Override public void stopTracking() {
// No-op.
}
+
+ /** {@inheritDoc} */
+ @Nullable @Override public AtomicLong processedRowsCounter(String action) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void flush() {
+ // No-op.
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java
index fef1f56bb4a..2978e19cf28 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java
@@ -17,11 +17,15 @@
package org.apache.ignite.internal.processors.query.calcite.exec.tracker;
+import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.metric.IoStatisticsQueryHelper;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import
org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
+import org.apache.ignite.internal.util.typedef.T2;
/**
* Performance statistics gathering I/O operations tracker.
@@ -36,6 +40,15 @@ public class PerformanceStatisticsIoTracker implements
IoTracker {
/** */
private final long originatingQryId;
+ /** */
+ private final AtomicLong logicalReads = new AtomicLong();
+
+ /** */
+ private final AtomicLong physicalReads = new AtomicLong();
+
+ /** */
+ private final List<T2<String, AtomicLong>> cntrs = new
CopyOnWriteArrayList<>();
+
/** */
public PerformanceStatisticsIoTracker(
PerformanceStatisticsProcessor perfStatProc,
@@ -56,13 +69,45 @@ public class PerformanceStatisticsIoTracker implements
IoTracker {
@Override public void stopTracking() {
IoStatisticsHolder stat =
IoStatisticsQueryHelper.finishGatheringQueryStatistics();
- if (stat.logicalReads() > 0 || stat.physicalReads() > 0) {
+ logicalReads.addAndGet(stat.logicalReads());
+ physicalReads.addAndGet(stat.physicalReads());
+ }
+
+ /** {@inheritDoc} */
+ @Override public AtomicLong processedRowsCounter(String action) {
+ AtomicLong cntr = new AtomicLong();
+
+ cntrs.add(new T2<>(action, cntr));
+
+ return cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void flush() {
+ long logicalReads = this.logicalReads.getAndSet(0);
+ long physicalReads = this.physicalReads.getAndSet(0);
+
+ if (logicalReads > 0 || physicalReads > 0) {
perfStatProc.queryReads(
GridCacheQueryType.SQL_FIELDS,
originatingNodeId,
originatingQryId,
- stat.logicalReads(),
- stat.physicalReads());
+ logicalReads,
+ physicalReads);
+ }
+
+ for (T2<String, AtomicLong> cntr : cntrs) {
+ long rowsCnt = cntr.get2().getAndSet(0);
+
+ if (rowsCnt > 0) {
+ perfStatProc.queryRowsProcessed(
+ GridCacheQueryType.SQL_FIELDS,
+ originatingNodeId,
+ originatingQryId,
+ cntr.get1(),
+ rowsCnt
+ );
+ }
}
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
index 0e4ef5e739a..0184ba91f86 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java
@@ -47,15 +47,20 @@ public abstract class AbstractMultiStepPlan extends
AbstractQueryPlan implements
/** */
protected ExecutionPlan executionPlan;
+ /** */
+ private final String textPlan;
+
/** */
protected AbstractMultiStepPlan(
String qry,
+ String textPlan,
QueryTemplate queryTemplate,
FieldsMetadata fieldsMetadata,
@Nullable FieldsMetadata paramsMetadata
) {
super(qry);
+ this.textPlan = textPlan;
this.queryTemplate = queryTemplate;
this.fieldsMetadata = fieldsMetadata;
this.paramsMetadata = paramsMetadata;
@@ -119,4 +124,9 @@ public abstract class AbstractMultiStepPlan extends
AbstractQueryPlan implements
"fragments=" + fragments() + "]"))
.mapping();
}
+
+ /** {@inheritDoc} */
+ @Override public String textPlan() {
+ return textPlan;
+ }
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java
index 6d844bd0de9..5cbc4e6c187 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java
@@ -28,11 +28,12 @@ public class MultiStepDmlPlan extends AbstractMultiStepPlan
{
*/
public MultiStepDmlPlan(
String qry,
+ String textPlan,
QueryTemplate queryTemplate,
FieldsMetadata fieldsMeta,
@Nullable FieldsMetadata paramsMetadata
) {
- super(qry, queryTemplate, fieldsMeta, paramsMetadata);
+ super(qry, textPlan, queryTemplate, fieldsMeta, paramsMetadata);
}
/** {@inheritDoc} */
@@ -42,6 +43,6 @@ public class MultiStepDmlPlan extends AbstractMultiStepPlan {
/** {@inheritDoc} */
@Override public QueryPlan copy() {
- return new MultiStepDmlPlan(query(), queryTemplate, fieldsMetadata,
paramsMetadata);
+ return new MultiStepDmlPlan(query(), textPlan(), queryTemplate,
fieldsMetadata, paramsMetadata);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
index 1636bcc1124..e738d9b92a0 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java
@@ -61,4 +61,9 @@ public interface MultiStepPlan extends QueryPlan {
* @param ctx Planner context.
*/
void init(MappingService mappingService, MappingQueryContext ctx);
+
+ /**
+ * @return Text representation of query plan
+ */
+ String textPlan();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
index ea7164c0df3..53125413160 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java
@@ -28,11 +28,12 @@ public class MultiStepQueryPlan extends
AbstractMultiStepPlan {
*/
public MultiStepQueryPlan(
String qry,
+ String textPlan,
QueryTemplate queryTemplate,
FieldsMetadata fieldsMeta,
@Nullable FieldsMetadata paramsMetadata
) {
- super(qry, queryTemplate, fieldsMeta, paramsMetadata);
+ super(qry, textPlan, queryTemplate, fieldsMeta, paramsMetadata);
}
/** {@inheritDoc} */
@@ -42,6 +43,6 @@ public class MultiStepQueryPlan extends AbstractMultiStepPlan
{
/** {@inheritDoc} */
@Override public QueryPlan copy() {
- return new MultiStepQueryPlan(query(), queryTemplate, fieldsMetadata,
paramsMetadata);
+ return new MultiStepQueryPlan(query(), textPlan(), queryTemplate,
fieldsMetadata, paramsMetadata);
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanExtractor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanExtractor.java
new file mode 100644
index 00000000000..4c1273222a6
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanExtractor.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collection;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.externalize.RelWriterImpl;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexShuttle;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.ignite.internal.GridKernalContext;
+import
org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import
org.apache.ignite.internal.processors.query.calcite.prepare.bounds.SearchBounds;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.util.typedef.F;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Sensitive data aware plan extractor.
+ */
+public class PlanExtractor {
+ /** */
+ private final PerformanceStatisticsProcessor perfStatProc;
+
+ /** */
+ public PlanExtractor(GridKernalContext ctx) {
+ perfStatProc = ctx.performanceStatistics();
+ }
+
+ /** */
+ public String extract(IgniteRel rel) {
+ // Currently, plan required only for preformance statistics, skip it
if performance statistics disabled.
+ if (!perfStatProc.enabled())
+ return null;
+
+ if (QueryUtils.INCLUDE_SENSITIVE)
+ return RelOptUtil.toString(rel, SqlExplainLevel.ALL_ATTRIBUTES);
+ else {
+ StringWriter sw = new StringWriter();
+ RelWriter planWriter = new SensitiveDataAwarePlanWriter(new
PrintWriter(sw));
+ rel.explain(planWriter);
+ return sw.toString();
+ }
+ }
+
+ /** */
+ private static final class LiteralRemoveShuttle extends RexShuttle {
+ /** */
+ private static final LiteralRemoveShuttle INSTANCE = new
LiteralRemoveShuttle();
+
+ /** {@inheritDoc} */
+ @Override public RexNode visitLiteral(RexLiteral literal) {
+ return new RexDynamicParam(literal.getType(), -1);
+ }
+ }
+
+ /** */
+ private static class SensitiveDataAwarePlanWriter extends RelWriterImpl {
+ /** */
+ public SensitiveDataAwarePlanWriter(PrintWriter pw) {
+ super(pw, SqlExplainLevel.ALL_ATTRIBUTES, false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RelWriter item(String term, @Nullable Object val) {
+ return super.item(term, removeSensitive(val));
+ }
+
+ /** */
+ private Object removeSensitive(Object val) {
+ if (val instanceof RexNode)
+ return LiteralRemoveShuttle.INSTANCE.apply((RexNode)val);
+ else if (val instanceof Collection)
+ return F.transform((Collection<?>)val, this::removeSensitive);
+ else if (val instanceof SearchBounds)
+ return
((SearchBounds)val).transform(LiteralRemoveShuttle.INSTANCE::apply);
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean nest() {
+ // Don't try to expand some values by rel nodes, use original
values.
+ return true;
+ }
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareServiceImpl.java
index bc6e2b99ddc..8efb4a882f0 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareServiceImpl.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.query.calcite.prepare;
import java.util.List;
-
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.type.RelDataType;
@@ -55,12 +54,16 @@ public class PrepareServiceImpl extends AbstractService
implements PrepareServic
/** */
private final DdlSqlToCommandConverter ddlConverter;
+ /** */
+ private final PlanExtractor planExtractor;
+
/**
* @param ctx Kernal.
*/
public PrepareServiceImpl(GridKernalContext ctx) {
super(ctx);
+ planExtractor = new PlanExtractor(ctx);
ddlConverter = new DdlSqlToCommandConverter();
}
@@ -163,6 +166,8 @@ public class PrepareServiceImpl extends AbstractService
implements PrepareServic
IgniteRel igniteRel = optimize(sqlNode, planner, log);
+ String plan = planExtractor.extract(igniteRel);
+
// Extract parameters meta.
FieldsMetadata params = DynamicParamTypeExtractor.go(igniteRel);
@@ -171,7 +176,7 @@ public class PrepareServiceImpl extends AbstractService
implements PrepareServic
QueryTemplate template = new QueryTemplate(fragments);
- return new MultiStepQueryPlan(ctx.query(), template,
+ return new MultiStepQueryPlan(ctx.query(), plan, template,
queryFieldsMetadata(ctx, validated.dataType(),
validated.origins()), params);
}
@@ -185,6 +190,8 @@ public class PrepareServiceImpl extends AbstractService
implements PrepareServic
// Convert to Relational operators graph
IgniteRel igniteRel = optimize(sqlNode, planner, log);
+ String plan = planExtractor.extract(igniteRel);
+
// Extract parameters meta.
FieldsMetadata params = DynamicParamTypeExtractor.go(igniteRel);
@@ -193,7 +200,7 @@ public class PrepareServiceImpl extends AbstractService
implements PrepareServic
QueryTemplate template = new QueryTemplate(fragments);
- return new MultiStepDmlPlan(ctx.query(), template,
+ return new MultiStepDmlPlan(ctx.query(), plan, template,
queryFieldsMetadata(ctx, igniteRel.getRowType(), null), params);
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/ExactBounds.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/ExactBounds.java
index 2e67a1d22a3..6a672c75904 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/ExactBounds.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/ExactBounds.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.prepare.bounds;
import java.util.Objects;
+import java.util.function.Function;
import org.apache.calcite.rex.RexNode;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -58,6 +59,11 @@ public class ExactBounds extends SearchBounds {
return bound.equals(((ExactBounds)o).bound);
}
+ /** {@inheritDoc} */
+ @Override public SearchBounds transform(Function<RexNode, RexNode>
tranformFunction) {
+ return new ExactBounds(tranformFunction.apply(condition()),
tranformFunction.apply(bound));
+ }
+
/** {@inheritDoc} */
@Override public int hashCode() {
return Objects.hash(bound);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/MultiBounds.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/MultiBounds.java
index 5101fe85b69..e0376124abe 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/MultiBounds.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/MultiBounds.java
@@ -19,7 +19,9 @@ package
org.apache.ignite.internal.processors.query.calcite.prepare.bounds;
import java.util.List;
import java.util.Objects;
+import java.util.function.Function;
import org.apache.calcite.rex.RexNode;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -45,6 +47,12 @@ public class MultiBounds extends SearchBounds {
return bounds;
}
+ /** {@inheritDoc} */
+ @Override public SearchBounds transform(Function<RexNode, RexNode>
tranformFunction) {
+ return new MultiBounds(tranformFunction.apply(condition()),
+ Commons.transform(bounds, b -> b.transform(tranformFunction)));
+ }
+
/** {@inheritDoc} */
@Override public Type type() {
return Type.MULTI;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/RangeBounds.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/RangeBounds.java
index 0d61987adf5..1a6e3cbb0da 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/RangeBounds.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/RangeBounds.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.prepare.bounds;
import java.util.Objects;
+import java.util.function.Function;
import org.apache.calcite.rex.RexNode;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
@@ -87,6 +88,17 @@ public class RangeBounds extends SearchBounds {
return Type.RANGE;
}
+ /** {@inheritDoc} */
+ @Override public SearchBounds transform(Function<RexNode, RexNode>
tranformFunction) {
+ return new RangeBounds(
+ tranformFunction.apply(condition()),
+ tranformFunction.apply(lowerBound),
+ tranformFunction.apply(upperBound),
+ lowerInclude,
+ upperInclude
+ );
+ }
+
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/SearchBounds.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/SearchBounds.java
index 42caf9e4488..05f997efa10 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/SearchBounds.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/SearchBounds.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.calcite.prepare.bounds;
+import java.util.function.Function;
import org.apache.calcite.rex.RexNode;
import org.jetbrains.annotations.Nullable;
@@ -42,6 +43,11 @@ public abstract class SearchBounds {
/** */
public abstract Type type();
+ /**
+ * Create a transformed copy of search bounds.
+ */
+ public abstract SearchBounds transform(Function<RexNode, RexNode>
tranformFunction);
+
/** */
public enum Type {
/** Exact search value. */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
index 8e2b80390bb..784201d0099 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java
@@ -158,6 +158,11 @@ public class CacheTableImpl extends AbstractTable
implements IgniteCacheTable {
return idxRebuildInProgress;
}
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return desc.typeDescription().tableName();
+ }
+
/** {@inheritDoc} */
@Override public <C> C unwrap(Class<C> aCls) {
if (aCls.isInstance(desc))
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index 1c69a60ed39..492fca9c445 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -149,4 +149,9 @@ public interface IgniteTable extends TranslatableTable {
* @return {@code True} if index rebuild in progress.
*/
boolean isIndexRebuildInProgress();
+
+ /**
+ * @return Table name.
+ */
+ String name();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
index 44394991a7f..f54d56b5005 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java
@@ -159,6 +159,11 @@ public class SystemViewTableImpl extends AbstractTable
implements IgniteTable {
return false;
}
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return desc.name();
+ }
+
/** */
private static class StatisticsImpl implements Statistic {
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlCreateTable.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlCreateTable.java
index f00b1cabfdb..f43e7ef7f88 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlCreateTable.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlCreateTable.java
@@ -18,9 +18,11 @@ package
org.apache.ignite.internal.processors.query.calcite.sql;
import java.util.List;
import java.util.Objects;
+import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlCreate;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
@@ -47,8 +49,21 @@ public class IgniteSqlCreateTable extends SqlCreate {
private final @Nullable SqlNodeList createOptionList;
/** */
- private static final SqlOperator OPERATOR =
- new SqlSpecialOperator("CREATE TABLE", SqlKind.CREATE_TABLE);
+ private static final SqlOperator OPERATOR = new SqlSpecialOperator("CREATE
TABLE", SqlKind.CREATE_TABLE) {
+ /**
+ * Required to override this method to correctly copy SQL nodes on
SqlShuttle.
+ */
+ @Override public SqlCall createCall(
+ @Nullable SqlLiteral functionQualifier,
+ SqlParserPos pos,
+ @Nullable SqlNode... operands
+ ) {
+ assert operands != null && operands.length == 4 : operands;
+
+ return new IgniteSqlCreateTable(pos, false,
(SqlIdentifier)operands[0],
+ (SqlNodeList)operands[1], operands[2],
(SqlNodeList)operands[3]);
+ }
+ };
/** Creates a SqlCreateTable. */
public IgniteSqlCreateTable(SqlParserPos pos, boolean ifNotExists,
SqlIdentifier name,
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlOption.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlOption.java
index 6381403bd33..502debe2aa3 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlOption.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlOption.java
@@ -26,7 +26,6 @@ import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.util.SqlVisitor;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorScope;
import org.apache.calcite.util.Litmus;
@@ -75,11 +74,6 @@ public abstract class IgniteSqlOption<E extends Enum<E>>
extends SqlCall {
throw new UnsupportedOperationException();
}
- /** {@inheritDoc} */
- @Override public <R> R accept(SqlVisitor<R> visitor) {
- throw new UnsupportedOperationException();
- }
-
/** {@inheritDoc} */
@Override public boolean equalsDeep(SqlNode node, Litmus litmus) {
if (node == this) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
index 3e8311ff186..db131c3996d 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
@@ -20,6 +20,7 @@ package
org.apache.ignite.internal.processors.query.calcite.util;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
@@ -45,6 +46,9 @@ public class ConvertingClosableIterator<Row> implements
Iterator<List<?>>, AutoC
/** */
@Nullable Runnable onClose;
+ /** */
+ private final AtomicBoolean closed = new AtomicBoolean();
+
/** */
public ConvertingClosableIterator(
Iterator<Row> it,
@@ -87,9 +91,11 @@ public class ConvertingClosableIterator<Row> implements
Iterator<List<?>>, AutoC
* {@inheritDoc}
*/
@Override public void close() throws Exception {
- Commons.close(it);
+ if (closed.compareAndSet(false, true)) {
+ Commons.close(it);
- if (onClose != null)
- onClose.run();
+ if (onClose != null)
+ onClose.run();
+ }
}
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index a58e9ae9a6e..25c702bac5c 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -23,9 +23,11 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -37,8 +39,11 @@ import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.SqlConfiguration;
import org.apache.ignite.events.CacheQueryExecutedEvent;
@@ -52,6 +57,7 @@ import
org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.calcite.Query;
import org.apache.ignite.internal.processors.query.calcite.QueryRegistry;
+import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.metric.LongMetric;
@@ -63,6 +69,9 @@ import org.junit.Test;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
+import static
org.apache.ignite.internal.processors.authentication.AuthenticationProcessorSelfTest.authenticate;
+import static
org.apache.ignite.internal.processors.authentication.AuthenticationProcessorSelfTest.withSecurityContextOnAllNodes;
+import static
org.apache.ignite.internal.processors.authentication.User.DFAULT_USER_NAME;
import static
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
import static
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.cleanPerformanceStatisticsDir;
import static
org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.startCollectStatistics;
@@ -90,14 +99,22 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
/** */
private ListeningTestLogger log;
+ /** */
+ private SecurityContext secCtxDflt;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
.setGridLogger(log)
+ .setAuthenticationEnabled(true)
.setSqlConfiguration(new SqlConfiguration()
.setQueryEnginesConfiguration(new
CalciteQueryEngineConfiguration())
.setLongQueryWarningTimeout(LONG_QRY_TIMEOUT))
- .setIncludeEventTypes(EVT_SQL_QUERY_EXECUTION,
EVT_CACHE_QUERY_EXECUTED, EVT_CACHE_QUERY_OBJECT_READ);
+ .setIncludeEventTypes(EVT_SQL_QUERY_EXECUTION,
EVT_CACHE_QUERY_EXECUTED, EVT_CACHE_QUERY_OBJECT_READ)
+ .setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new
DataRegionConfiguration().setPersistenceEnabled(true)
+ )
+ );
}
/** {@inheritDoc} */
@@ -109,11 +126,17 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
@Override protected void beforeTest() throws Exception {
super.beforeTest();
+ cleanPersistenceDir();
+
log = new ListeningTestLogger(log());
startGrids(nodeCount());
client = startClientGrid();
+
+ client.cluster().state(ClusterState.ACTIVE);
+
+ secCtxDflt = authenticate(grid(0), DFAULT_USER_NAME, "ignite");
}
/** {@inheritDoc} */
@@ -169,6 +192,8 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
/** */
@Test
public void testBatchParserMetrics() throws Exception {
+ withSecurityContextOnAllNodes(secCtxDflt);
+
MetricRegistry mreg0 =
grid(0).context().metric().registry(QUERY_PARSER_METRIC_GROUP_NAME);
MetricRegistry mreg1 =
grid(1).context().metric().registry(QUERY_PARSER_METRIC_GROUP_NAME);
mreg0.reset();
@@ -186,7 +211,7 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
assertEquals(0, misses0.value());
assertEquals(0, misses1.value());
- try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
+ try (Connection conn = DriverManager.getConnection(jdbcUrl,
DFAULT_USER_NAME, "ignite")) {
conn.setSchema("PUBLIC");
try (Statement stmt = conn.createStatement()) {
@@ -291,7 +316,7 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
sql(grid(0), "SELECT * FROM table(system_range(1, 1000))");
sql(grid(0), "CREATE TABLE test_perf_stat (a INT)");
sql(grid(0), "INSERT INTO test_perf_stat VALUES (0), (1), (2), (3),
(4)");
- sql(grid(0), "SELECT * FROM test_perf_stat");
+ sql(grid(0), "SELECT * FROM test_perf_stat WHERE a > 0");
assertTrue(GridTestUtils.waitForCondition(() -> finishQryCnt.get() ==
4, 1_000L));
@@ -302,18 +327,17 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
// ScanNode page reads, since table/index scans are local and executed
in current thread. ModifyNode uses
// distributed `invoke` operation, which can be executed by other
threads or on other nodes. It's hard to
// obtain correct value of page reads for these types of operations,
so, currently we just ignore page reads
- // performed by ModifyNode. Despite static values scan themself
doesn't require any page reads, it still can
- // catch some page reads performed by insert operation. But, taking
into account small amount of inserted
- // values, it's not enough rows to trigger batch insert during values
scan, and we expect zero page-reads
- // for this query in this test.
+ // performed by ModifyNode.
// The fourth query is a table scan and should perform page reads on
all data nodes.
AtomicInteger qryCnt = new AtomicInteger();
- AtomicInteger readsCnt = new AtomicInteger();
+ AtomicLong rowsScanned = new AtomicLong();
Iterator<String> sqlIt = F.asList("SELECT", "CREATE", "INSERT",
"SELECT").iterator();
Set<UUID> dataNodesIds = new
HashSet<>(F.asList(grid(0).localNode().id(), grid(1).localNode().id()));
Set<UUID> readsNodes = new HashSet<>(dataNodesIds);
Set<Long> readsQueries = new HashSet<>();
+ Map<Long, Long> rowsFetchedPerQuery = new HashMap<>();
+ AtomicLong firstQryId = new AtomicLong(-1);
AtomicLong lastQryId = new AtomicLong();
stopCollectStatisticsAndRead(new
AbstractPerformanceStatisticsTest.TestHandler() {
@@ -328,13 +352,14 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
) {
qryCnt.incrementAndGet();
- assertTrue(nodeId.equals(grid(0).localNode().id()));
+ assertEquals(grid(0).localNode().id(), nodeId);
assertEquals(SQL_FIELDS, type);
assertTrue(text.startsWith(sqlIt.next()));
assertTrue(qryStartTime >= startTime);
assertTrue(duration >= 0);
assertTrue(success);
+ firstQryId.compareAndSet(-1, id);
lastQryId.set(id);
}
@@ -346,21 +371,43 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
long logicalReads,
long physicalReads
) {
- readsCnt.incrementAndGet();
-
readsQueries.add(id);
- assertTrue(dataNodesIds.contains(qryNodeId));
+ assertTrue(dataNodesIds.contains(nodeId));
readsNodes.remove(nodeId);
- assertTrue(grid(0).localNode().id().equals(qryNodeId));
+ assertEquals(grid(0).localNode().id(), qryNodeId);
assertEquals(SQL_FIELDS, type);
assertTrue(logicalReads > 0);
}
+
+ @Override public void queryRows(
+ UUID nodeId,
+ GridCacheQueryType type,
+ UUID qryNodeId,
+ long id,
+ String action,
+ long rows
+ ) {
+ assertEquals(grid(0).localNode().id(), qryNodeId);
+ assertEquals(SQL_FIELDS, type);
+
+ if (action.toLowerCase().contains("test_perf_stat")) {
+ assertTrue(dataNodesIds.contains(nodeId));
+ rowsScanned.addAndGet(rows);
+ }
+ else if ("Fetched".equals(action)) {
+ assertEquals(grid(0).localNode().id(), nodeId);
+ assertNull(rowsFetchedPerQuery.put(id, rows));
+ }
+ }
});
assertEquals(4, qryCnt.get());
assertTrue("Query reads expected on nodes: " + readsNodes,
readsNodes.isEmpty());
assertEquals(Collections.singleton(lastQryId.get()), readsQueries);
+ assertEquals((Long)1000L, rowsFetchedPerQuery.get(firstQryId.get()));
+ assertEquals((Long)4L, rowsFetchedPerQuery.get(lastQryId.get()));
+ assertEquals(5L, rowsScanned.get());
}
/** */
@@ -426,6 +473,8 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
/** */
@Test
public void testSensitiveInformationHiding() throws Exception {
+ withSecurityContextOnAllNodes(secCtxDflt);
+
cleanPerformanceStatisticsDir();
startCollectStatistics();
@@ -464,7 +513,24 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
fut.get();
}
+ // Test bounds hiding in index scans.
+ sql(grid(0), "CREATE TABLE test_sens (id int, val varchar)");
+ sql(grid(0), "CREATE INDEX test_sens_idx ON test_sens(val)");
+ sql(grid(0), "INSERT INTO test_sens (id, val) VALUES (0,
'sensitive0'), (1, 'sensitive1'), " +
+ "(2, 'sensitive2'), (3, 'sensitive3'), (4, 'sensitive4'), (5,
'sensitive5'), (6, 'sensitive6')");
+ sql(grid(0), "SELECT * FROM test_sens WHERE val IN ('sensitive0',
'sensitive1')");
+ sql(grid(0), "SELECT * FROM test_sens WHERE val BETWEEN
'sensitive1' AND 'sensitive3'");
+ sql(grid(0), "SELECT * FROM test_sens WHERE val = 'sensitive4'");
+
+ // Test CREATE AS SELECT rewrite.
+ sql(grid(0), "CREATE TABLE test_sens1 (val) WITH
CACHE_NAME=\"test_sens1\" AS SELECT 'sensitive' AS val");
+
+ // Test CREATE/ALTER USER commands rewrite.
+ sql(grid(0), "CREATE USER test WITH PASSWORD 'sensitive'");
+ sql(grid(0), "ALTER USER test WITH PASSWORD 'sensitive'");
+
AtomicInteger qryCnt = new AtomicInteger();
+ AtomicInteger planCnt = new AtomicInteger();
stopCollectStatisticsAndRead(new
AbstractPerformanceStatisticsTest.TestHandler() {
@Override public void query(
@@ -477,11 +543,26 @@ public class SqlDiagnosticIntegrationTest extends
AbstractBasicIntegrationTest {
boolean success
) {
qryCnt.incrementAndGet();
- assertFalse(text.contains("sensitive"));
+ assertFalse(text, text.contains("sensitive"));
+ }
+
+ @Override public void queryProperty(
+ UUID nodeId,
+ GridCacheQueryType type,
+ UUID qryNodeId,
+ long id,
+ String name,
+ String val
+ ) {
+ if ("Query plan".equals(name)) {
+ planCnt.incrementAndGet();
+ assertFalse(val, val.contains("sensitive"));
+ }
}
});
- assertEquals(2, qryCnt.get());
+ assertEquals(12, qryCnt.get()); // CREATE AS SELECT counts as two
queries.
+ assertEquals(7, planCnt.get()); // DDL queries don't produce
plans, except CREATE AS SELECT.
}
finally {
QueryUtils.INCLUDE_SENSITIVE = true;
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
index e17b1a2067d..aa4cd3e95f2 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java
@@ -362,7 +362,7 @@ public class PlannerTest extends AbstractPlannerTest {
private MultiStepPlan splitPlan(IgniteRel phys) {
assertNotNull(phys);
- MultiStepPlan plan = new MultiStepQueryPlan(null, new
QueryTemplate(new Splitter().go(phys)), null, null);
+ MultiStepPlan plan = new MultiStepQueryPlan(null, null, new
QueryTemplate(new Splitter().go(phys)), null, null);
assertNotNull(plan);
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
index 8cf051f358c..530d9fa1ffc 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java
@@ -292,7 +292,7 @@ public class TestTable implements IgniteCacheTable {
}
/** */
- public String name() {
+ @Override public String name() {
return name;
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java
index 752755fbcd7..b0c8244734a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java
@@ -52,7 +52,9 @@ import static
org.apache.ignite.internal.processors.performancestatistics.Operat
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.JOB;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.PAGES_WRITE_THROTTLE;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY;
+import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY_PROPERTY;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY_READS;
+import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY_ROWS;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.TASK;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.TX_COMMIT;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.cacheOperation;
@@ -305,6 +307,45 @@ public class FilePerformanceStatisticsReader {
return true;
}
+ else if (opType == QUERY_ROWS) {
+ String action = readCacheableString(buf);
+
+ if (action == null || buf.remaining() < 1 + 16 + 8 + 8)
+ return false;
+
+ GridCacheQueryType qryType =
GridCacheQueryType.fromOrdinal(buf.get());
+ UUID uuid = readUuid(buf);
+ long id = buf.getLong();
+ long rows = buf.getLong();
+
+ for (PerformanceStatisticsHandler handler : curHnd)
+ handler.queryRows(nodeId, qryType, uuid, id, action, rows);
+
+ return true;
+ }
+ else if (opType == QUERY_PROPERTY) {
+ String name = readCacheableString(buf);
+
+ if (name == null)
+ return false;
+
+ String val = readCacheableString(buf);
+
+ if (val == null)
+ return false;
+
+ if (buf.remaining() < 1 + 16 + 8)
+ return false;
+
+ GridCacheQueryType qryType =
GridCacheQueryType.fromOrdinal(buf.get());
+ UUID uuid = readUuid(buf);
+ long id = buf.getLong();
+
+ for (PerformanceStatisticsHandler handler : curHnd)
+ handler.queryProperty(nodeId, qryType, uuid, id, name, val);
+
+ return true;
+ }
else if (opType == TASK) {
if (buf.remaining() < 1)
return false;
@@ -538,6 +579,32 @@ public class FilePerformanceStatisticsReader {
return str;
}
+ /**
+ * Reads cacheable string from byte buffer.
+ *
+ * @return String or {@code null} in case of buffer underflow.
+ */
+ private String readCacheableString(ByteBuffer buf) {
+ if (buf.remaining() < 1 + 4)
+ return null;
+
+ boolean cached = buf.get() != 0;
+
+ if (cached) {
+ int hash = buf.getInt();
+
+ return knownStrs.get(hash);
+ }
+ else {
+ int textLen = buf.getInt();
+
+ if (buf.remaining() < textLen)
+ return null;
+
+ return readString(buf, textLen);
+ }
+ }
+
/** Reads {@link UUID} from buffer. */
private static UUID readUuid(ByteBuffer buf) {
return new UUID(buf.getLong(), buf.getLong());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsWriter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsWriter.java
index 405169b1e04..293b675738e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsWriter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsWriter.java
@@ -55,7 +55,9 @@ import static
org.apache.ignite.internal.processors.performancestatistics.Operat
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.JOB;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.PAGES_WRITE_THROTTLE;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY;
+import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY_PROPERTY;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY_READS;
+import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY_ROWS;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.TASK;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.TX_COMMIT;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.TX_ROLLBACK;
@@ -64,8 +66,10 @@ import static
org.apache.ignite.internal.processors.performancestatistics.Operat
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.checkpointRecordSize;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.jobRecordSize;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.pagesWriteThrottleRecordSize;
+import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.queryPropertyRecordSize;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.queryReadsRecordSize;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.queryRecordSize;
+import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.queryRowsRecordSize;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.taskRecordSize;
import static
org.apache.ignite.internal.processors.performancestatistics.OperationType.transactionRecordSize;
@@ -90,7 +94,7 @@ public class FilePerformanceStatisticsWriter {
public static final int DFLT_FLUSH_SIZE = (int)(8 * U.MB);
/** Default maximum cached strings threshold. String caching will stop on
threshold excess. */
- public static final int DFLT_CACHED_STRINGS_THRESHOLD = 1024;
+ public static final int DFLT_CACHED_STRINGS_THRESHOLD = 10 * 1024;
/** File writer thread name. */
static final String WRITER_THREAD_NAME = "performance-statistics-writer";
@@ -280,6 +284,47 @@ public class FilePerformanceStatisticsWriter {
});
}
+ /**
+ * @param type Cache query type.
+ * @param qryNodeId Originating node id.
+ * @param id Query id.
+ * @param action Action with rows.
+ * @param rows Number of rows.
+ */
+ public void queryRows(GridCacheQueryType type, UUID qryNodeId, long id,
String action, long rows) {
+ boolean cached = cacheIfPossible(action);
+
+ doWrite(QUERY_ROWS, queryRowsRecordSize(cached ? 0 :
action.getBytes().length, cached), buf -> {
+ writeString(buf, action, cached);
+ buf.put((byte)type.ordinal());
+ writeUuid(buf, qryNodeId);
+ buf.putLong(id);
+ buf.putLong(rows);
+ });
+ }
+
+ /**
+ * @param type Cache query type.
+ * @param qryNodeId Originating node id.
+ * @param id Query id.
+ * @param name Query property name.
+ * @param val Query property value.
+ */
+ public void queryProperty(GridCacheQueryType type, UUID qryNodeId, long
id, String name, String val) {
+ boolean cachedName = cacheIfPossible(name);
+ boolean cachedVal = cacheIfPossible(val);
+
+ doWrite(QUERY_PROPERTY,
+ queryPropertyRecordSize(cachedName ? 0 : name.getBytes().length,
cachedName, cachedVal ? 0 : val.getBytes().length, cachedVal),
+ buf -> {
+ writeString(buf, name, cachedName);
+ writeString(buf, val, cachedVal);
+ buf.put((byte)type.ordinal());
+ writeUuid(buf, qryNodeId);
+ buf.putLong(id);
+ });
+ }
+
/**
* @param sesId Session id.
* @param taskName Task name.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java
index 4ee2f6e5a16..df18195b653 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java
@@ -85,7 +85,13 @@ public enum OperationType {
CHECKPOINT(18),
/** Pages write throttle. */
- PAGES_WRITE_THROTTLE(19);
+ PAGES_WRITE_THROTTLE(19),
+
+ /** Count of processed by query rows. */
+ QUERY_ROWS(20),
+
+ /** Custom query property. */
+ QUERY_PROPERTY(21);
/** Cache operations. */
public static final EnumSet<OperationType> CACHE_OPS =
EnumSet.of(CACHE_GET, CACHE_PUT, CACHE_REMOVE,
@@ -175,6 +181,26 @@ public enum OperationType {
return 1 + 16 + 8 + 8 + 8;
}
+ /**
+ * @param actionLen Rows action length.
+ * @param cached {@code True} if action is cached.
+ * @return Query rows record size.
+ */
+ public static int queryRowsRecordSize(int actionLen, boolean cached) {
+ return 1 + (cached ? 4 : 4 + actionLen) + 1 + 16 + 8 + 8;
+ }
+
+ /**
+ * @param nameLen Propery name length.
+ * @param nameCached {@code True} if property name is cached.
+ * @param valLen Propery value length.
+ * @param valCached {@code True} if property value is cached.
+ * @return Query property record size.
+ */
+ public static int queryPropertyRecordSize(int nameLen, boolean nameCached,
int valLen, boolean valCached) {
+ return 1 + (nameCached ? 4 : 4 + nameLen) + 1 + (valCached ? 4 : 4 +
valLen) + 1 + 16 + 8;
+ }
+
/**
* @param nameLen Task name length.
* @param cached {@code True} if task name cached.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsHandler.java
index 0596cfc4add..cc1a203a03f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsHandler.java
@@ -75,6 +75,30 @@ public interface PerformanceStatisticsHandler {
void queryReads(UUID nodeId, GridCacheQueryType type, UUID queryNodeId,
long id, long logicalReads,
long physicalReads);
+ /**
+ * Count of rows processed by query.
+ *
+ * @param nodeId Node id.
+ * @param type Cache query type.
+ * @param qryNodeId Originating node id.
+ * @param id Query id.
+ * @param action Action with rows.
+ * @param rows Number of rows processed.
+ */
+ void queryRows(UUID nodeId, GridCacheQueryType type, UUID qryNodeId, long
id, String action, long rows);
+
+ /**
+ * Custom query property.
+ *
+ * @param nodeId Node id.
+ * @param type Cache query type.
+ * @param qryNodeId Originating node id.
+ * @param id Query id.
+ * @param name Query property name.
+ * @param val Query property value.
+ */
+ void queryProperty(UUID nodeId, GridCacheQueryType type, UUID qryNodeId,
long id, String name, String val);
+
/**
* @param nodeId Node id.
* @param sesId Session id.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
index 0d720b4ced8..fbafb352afe 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java
@@ -185,6 +185,28 @@ public class PerformanceStatisticsProcessor extends
GridProcessorAdapter {
write(writer -> writer.queryReads(type, queryNodeId, id, logicalReads,
physicalReads));
}
+ /**
+ * @param type Cache query type.
+ * @param qryNodeId Originating node id.
+ * @param id Query id.
+ * @param action Action with rows.
+ * @param rows Number of rows processed.
+ */
+ public void queryRowsProcessed(GridCacheQueryType type, UUID qryNodeId,
long id, String action, long rows) {
+ write(writer -> writer.queryRows(type, qryNodeId, id, action, rows));
+ }
+
+ /**
+ * @param type Cache query type.
+ * @param qryNodeId Originating node id.
+ * @param id Query id.
+ * @param name Query property name.
+ * @param val Query property value.
+ */
+ public void queryProperty(GridCacheQueryType type, UUID qryNodeId, long
id, String name, String val) {
+ write(writer -> writer.queryProperty(type, qryNodeId, id, name, val));
+ }
+
/**
* @param sesId Session id.
* @param taskName Task name.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
index 17cfdde0a91..d7a06793340 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java
@@ -342,5 +342,10 @@ public final class HeavyQueriesTracker {
if (bigResults)
LT.warn(log, BIG_RESULT_SET_MSG + qryInfo.queryInfo("fetched="
+ fetchedSize));
}
+
+ /** */
+ public long fetchedSize() {
+ return fetchedSize;
+ }
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
index 0e3e1f3df25..50617b288e3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java
@@ -369,6 +369,31 @@ public class RunningQueryManager {
}
if (ctx.performanceStatistics().enabled() && qry.startTimeNanos()
> 0) {
+ String flags = null;
+
+ // Create string for flags with not default values.
+ if (qry.local())
+ flags = "local";
+
+ if (!qry.lazy())
+ flags = (flags == null ? "" : flags + ", ") + "notLazy";
+
+ if (qry.distributedJoins())
+ flags = (flags == null ? "" : flags + ", ") +
"distributedJoins";
+
+ if (qry.enforceJoinOrder())
+ flags = (flags == null ? "" : flags + ", ") +
"enforceJoinOrder";
+
+ if (flags != null) {
+ ctx.performanceStatistics().queryProperty(
+ qry.queryType(),
+ qry.nodeId(),
+ qry.id(),
+ "Flags",
+ flags
+ );
+ }
+
ctx.performanceStatistics().query(
qry.queryType(),
qry.query(),
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/AbstractPerformanceStatisticsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/AbstractPerformanceStatisticsTest.java
index 779f15a4fe9..4384d5471bd 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/AbstractPerformanceStatisticsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/AbstractPerformanceStatisticsTest.java
@@ -186,6 +186,18 @@ public abstract class AbstractPerformanceStatisticsTest
extends GridCommonAbstra
// No-op.
}
+ /** {@inheritDoc} */
+ @Override public void queryRows(UUID nodeId, GridCacheQueryType type,
UUID qryNodeId, long id, String action,
+ long rows) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void queryProperty(UUID nodeId, GridCacheQueryType
type, UUID qryNodeId, long id, String name,
+ String val) {
+ // No-op.
+ }
+
/** {@inheritDoc} */
@Override public void task(UUID nodeId, IgniteUuid sesId, String
taskName, long startTime, long duration,
int affPartId) {
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
index f3c508a8620..7cdbed157fe 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java
@@ -97,6 +97,21 @@ public class H2QueryInfo implements TrackableQuery {
}
}
+ /** */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /** */
+ public long queryId() {
+ return queryId;
+ }
+
+ /** */
+ public String plan() {
+ return stmt.getPlanSQL();
+ }
+
/**
* Print info specified by children.
*
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java
index f9bdd321c9b..9529b4430b1 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java
@@ -27,6 +27,9 @@ import java.util.NoSuchElementException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import
org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import
org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
@@ -111,6 +114,12 @@ public abstract class H2ResultSetIterator<T> extends
GridIteratorAdapter<T> impl
/** Tracing processor. */
protected final Tracing tracing;
+ /** */
+ private final GridKernalContext ctx;
+
+ /** */
+ private final H2QueryInfo qryInfo;
+
/**
* @param data Data array.
* @param log Logger.
@@ -126,11 +135,12 @@ public abstract class H2ResultSetIterator<T> extends
GridIteratorAdapter<T> impl
IgniteH2Indexing h2,
H2QueryInfo qryInfo,
Tracing tracing
- )
- throws IgniteCheckedException {
+ ) throws IgniteCheckedException {
+ ctx = h2.ctx;
this.pageSize = pageSize;
this.data = data;
this.tracing = tracing;
+ this.qryInfo = qryInfo;
try {
res = (ResultInterface)RESULT_FIELD.get(data);
@@ -318,6 +328,18 @@ public abstract class H2ResultSetIterator<T> extends
GridIteratorAdapter<T> impl
try {
resultSetChecker.checkOnClose();
+ PerformanceStatisticsProcessor perfStat =
ctx.performanceStatistics();
+
+ if (perfStat.enabled() && resultSetChecker.fetchedSize() > 0) {
+ perfStat.queryRowsProcessed(
+ GridCacheQueryType.SQL_FIELDS,
+ qryInfo.nodeId(),
+ qryInfo.queryId(),
+ "Fetched on reducer",
+ resultSetChecker.fetchedSize()
+ );
+ }
+
data.close();
}
catch (SQLException e) {
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index e622f2846ea..37559cc4758 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -474,6 +474,16 @@ public class IgniteH2Indexing implements GridQueryIndexing
{
H2QueryInfo qryInfo = new
H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry,
ctx.localNodeId(), qryId);
+ if (ctx.performanceStatistics().enabled()) {
+ ctx.performanceStatistics().queryProperty(
+ GridCacheQueryType.SQL_FIELDS,
+ qryInfo.nodeId(),
+ qryInfo.queryId(),
+ "Local plan",
+ qryInfo.plan()
+ );
+ }
+
ResultSet rs = executeSqlQueryWithTimer(
stmt,
conn,
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 18bb7abb412..f2afcfd57bb 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -468,6 +468,16 @@ public class GridMapQueryExecutor {
MapH2QueryInfo qryInfo = new MapH2QueryInfo(stmt,
qry.query(), node.id(), qryId, reqId, segmentId);
+ if (performanceStatsEnabled) {
+ ctx.performanceStatistics().queryProperty(
+ GridCacheQueryType.SQL_FIELDS,
+ qryInfo.nodeId(),
+ qryInfo.queryId(),
+ "Map phase plan",
+ qryInfo.plan()
+ );
+ }
+
ResultSet rs = h2.executeSqlQueryWithTimer(
stmt,
conn,
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 0a56fb32b60..125771e865b 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -55,6 +55,7 @@ import
org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import
org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -533,6 +534,16 @@ public class GridReduceQueryExecutor {
ReduceH2QueryInfo qryInfo = new
ReduceH2QueryInfo(stmt, qry.originalSql(),
ctx.localNodeId(), qryId, qryReqId);
+ if (ctx.performanceStatistics().enabled()) {
+ ctx.performanceStatistics().queryProperty(
+ GridCacheQueryType.SQL_FIELDS,
+ qryInfo.nodeId(),
+ qryInfo.queryId(),
+ "Reduce phase plan",
+ qryInfo.plan()
+ );
+ }
+
ResultSet res = h2.executeSqlQueryWithTimer(stmt,
conn,
rdc.query(),
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
index 5f96cacef51..d7d8736e43e 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java
@@ -28,8 +28,10 @@ import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
+import
org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
import org.apache.ignite.internal.processors.query.h2.H2PooledConnection;
import org.apache.ignite.internal.processors.query.h2.H2Utils;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
@@ -339,6 +341,9 @@ class MapQueryResult {
/** */
private final HeavyQueriesTracker.ResultSetChecker resultSetChecker;
+ /** */
+ private final MapH2QueryInfo qryInfo;
+
/**
* Constructor.
*
@@ -346,6 +351,7 @@ class MapQueryResult {
*/
Result(@NotNull ResultSet rs, MapH2QueryInfo qryInfo) {
this.rs = rs;
+ this.qryInfo = qryInfo;
try {
res = (ResultInterface)RESULT_FIELD.get(rs);
@@ -364,6 +370,18 @@ class MapQueryResult {
void close() {
resultSetChecker.checkOnClose();
+ PerformanceStatisticsProcessor perfStat =
cctx.kernalContext().performanceStatistics();
+
+ if (perfStat.enabled() && resultSetChecker.fetchedSize() > 0) {
+ perfStat.queryRowsProcessed(
+ GridCacheQueryType.SQL_FIELDS,
+ qryInfo.nodeId(),
+ qryInfo.queryId(),
+ "Fetched on mapper",
+ resultSetChecker.fetchedSize()
+ );
+ }
+
U.close(rs, log);
}
}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsQueryTest.java
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsQueryTest.java
index d6e912ef19c..1ab897286b7 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsQueryTest.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsQueryTest.java
@@ -26,6 +26,8 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.QueryEntity;
@@ -46,6 +48,7 @@ import
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.IndexQueryDesc;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -153,7 +156,7 @@ public class PerformanceStatisticsQueryTest extends
AbstractPerformanceStatistic
for (int i = 0; i < ENTRY_COUNT; i++) {
cache.put(i, i);
- cache2.put(i, i * 2);
+ cache2.put((long)i, (long)i * 2);
}
}
@@ -179,7 +182,7 @@ public class PerformanceStatisticsQueryTest extends
AbstractPerformanceStatistic
public void testScanQuery() throws Exception {
ScanQuery<Object, Object> qry = new
ScanQuery<>().setPageSize(pageSize);
- checkQuery(SCAN, qry, DEFAULT_CACHE_NAME);
+ checkQuery(SCAN, qry, DEFAULT_CACHE_NAME, false);
}
/** @throws Exception If failed. */
@@ -193,7 +196,7 @@ public class PerformanceStatisticsQueryTest extends
AbstractPerformanceStatistic
String expText = indexQueryText(DEFAULT_CACHE_NAME,
new IndexQueryDesc(qry.getCriteria(), qry.getIndexName(),
qry.getValueType()));
- checkQuery(INDEX, qry, expText);
+ checkQuery(INDEX, qry, expText, false);
}
/** @throws Exception If failed. */
@@ -203,7 +206,7 @@ public class PerformanceStatisticsQueryTest extends
AbstractPerformanceStatistic
SqlFieldsQuery qry = new SqlFieldsQuery(sql).setPageSize(pageSize);
- checkQuery(SQL_FIELDS, qry, sql);
+ checkQuery(SQL_FIELDS, qry, sql, false);
}
/** @throws Exception If failed. */
@@ -213,17 +216,61 @@ public class PerformanceStatisticsQueryTest extends
AbstractPerformanceStatistic
SqlFieldsQuery qry = new SqlFieldsQuery(sql).setPageSize(pageSize);
- checkQuery(SQL_FIELDS, qry, sql);
+ checkQuery(SQL_FIELDS, qry, sql, false);
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testSqlFieldsQueryWithReducer() throws Exception {
+ String sql = "select sum(_key) from " + DEFAULT_CACHE_NAME;
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(sql).setPageSize(pageSize);
+
+ checkQuery(SQL_FIELDS, qry, sql, true);
+ }
+
+ /** @throws Exception If failed. */
+ @Test
+ public void testSqlFieldsLocalQuery() throws Exception {
+ Assume.assumeTrue(clientType == SERVER);
+
+ String sql = "select * from " + DEFAULT_CACHE_NAME;
+
+ SqlFieldsQuery qry = new
SqlFieldsQuery(sql).setPageSize(pageSize).setLocal(true);
+
+ cleanPerformanceStatisticsDir();
+
+ startCollectStatistics();
+
+ srv.cache(DEFAULT_CACHE_NAME).query(qry).getAll();
+
+ AtomicReference<String> flags = new AtomicReference<>();
+
+ stopCollectStatisticsAndRead(new TestHandler() {
+ @Override public void queryProperty(
+ UUID nodeId,
+ GridCacheQueryType type,
+ UUID qryNodeId,
+ long id,
+ String name,
+ String val
+ ) {
+ if ("Flags".equals(name))
+ assertTrue(flags.compareAndSet(null, val));
+ }
+ });
+
+ assertEquals("local", flags.get());
}
/** Check query. */
- private void checkQuery(GridCacheQueryType type, Query<?> qry, String
text) throws Exception {
+ private void checkQuery(GridCacheQueryType type, Query<?> qry, String
text, boolean hasReducer) throws Exception {
client.cluster().state(INACTIVE);
client.cluster().state(ACTIVE);
- runQueryAndCheck(type, qry, text, true, true);
+ runQueryAndCheck(type, qry, text, true, true, hasReducer);
- runQueryAndCheck(type, qry, text, true, false);
+ runQueryAndCheck(type, qry, text, true, false, hasReducer);
}
/** @throws Exception If failed. */
@@ -231,21 +278,26 @@ public class PerformanceStatisticsQueryTest extends
AbstractPerformanceStatistic
public void testDdlAndDmlQueries() throws Exception {
String sql = "create table " + SQL_TABLE + " (id int, val varchar,
primary key (id))";
- runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(sql), sql, false,
false);
+ runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(sql), sql, false,
false, false);
sql = "insert into " + SQL_TABLE + " (id) values (1)";
- runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(sql), sql, false,
false);
+ runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(sql), sql, false,
false, false);
sql = "update " + SQL_TABLE + " set val = 'abc'";
- runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(sql), sql, true,
false);
+ runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(sql), sql, true,
false, false);
}
/** Runs query and checks statistics. */
- private void runQueryAndCheck(GridCacheQueryType expType, Query<?> qry,
String expText, boolean hasLogicalReads,
- boolean hasPhysicalReads)
- throws Exception {
+ private void runQueryAndCheck(
+ GridCacheQueryType expType,
+ Query<?> qry,
+ String expText,
+ boolean hasLogicalReads,
+ boolean hasPhysicalReads,
+ boolean hasReducer
+ ) throws Exception {
long startTime = U.currentTimeMillis();
cleanPerformanceStatisticsDir();
@@ -275,9 +327,14 @@ public class PerformanceStatisticsQueryTest extends
AbstractPerformanceStatistic
if (hasLogicalReads)
srv.cluster().forServers().nodes().forEach(node ->
readsNodes.add(node.id()));
+ Set<UUID> dataNodes = new HashSet<>(readsNodes);
AtomicInteger queryCnt = new AtomicInteger();
AtomicInteger readsCnt = new AtomicInteger();
HashSet<Long> qryIds = new HashSet<>();
+ AtomicLong mapRowCnt = new AtomicLong();
+ AtomicLong rdcRowCnt = new AtomicLong();
+ AtomicInteger planMapCnt = new AtomicInteger();
+ AtomicInteger planRdcCnt = new AtomicInteger();
stopCollectStatisticsAndRead(new TestHandler() {
@Override public void query(UUID nodeId, GridCacheQueryType type,
String text, long id, long queryStartTime,
@@ -304,11 +361,67 @@ public class PerformanceStatisticsQueryTest extends
AbstractPerformanceStatistic
assertTrue(logicalReads > 0);
assertTrue(hasPhysicalReads ? physicalReads > 0 :
physicalReads == 0);
}
+
+ @Override public void queryRows(
+ UUID nodeId,
+ GridCacheQueryType type,
+ UUID qryNodeId,
+ long id,
+ String action,
+ long rows
+ ) {
+ assertEquals(expType, SQL_FIELDS);
+ assertTrue(expNodeIds.contains(qryNodeId));
+
+ if ("Fetched on mapper".equals(action)) {
+ assertTrue(dataNodes.contains(nodeId));
+ mapRowCnt.addAndGet(rows);
+ }
+ else if ("Fetched on reducer".equals(action)) {
+ assertTrue(expNodeIds.contains(nodeId));
+ rdcRowCnt.addAndGet(rows);
+ }
+ }
+
+ @Override public void queryProperty(
+ UUID nodeId,
+ GridCacheQueryType type,
+ UUID qryNodeId,
+ long id,
+ String name,
+ String val
+ ) {
+ assertEquals(expType, SQL_FIELDS);
+ assertTrue(expNodeIds.contains(qryNodeId));
+
+ if ("Map phase plan".equals(name)) {
+ assertTrue(dataNodes.contains(nodeId));
+ planMapCnt.incrementAndGet();
+ }
+ else if ("Reduce phase plan".equals(name)) {
+ assertTrue(expNodeIds.contains(nodeId));
+ planRdcCnt.incrementAndGet();
+ }
+ }
});
assertEquals(1, queryCnt.get());
assertTrue("Query reads expected on nodes: " + readsNodes,
readsNodes.isEmpty());
assertEquals(1, qryIds.size());
+
+ // If query has logical reads, plan and rows info also expected.
+ if (hasLogicalReads && expType == SQL_FIELDS) {
+ assertEquals(dataNodes.size(), planMapCnt.get());
+ assertTrue(mapRowCnt.get() > 0);
+ if (hasReducer) {
+ assertTrue(rdcRowCnt.get() > 0);
+ assertEquals(1, planRdcCnt.get());
+ }
+ else {
+ assertEquals(0, rdcRowCnt.get());
+ assertEquals(0, planRdcCnt.get());
+ }
+ }
}
/** @throws Exception If failed. */