This is an automated email from the ASF dual-hosted git repository. alexpl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new b26a4c52f87 IGNITE-20079 SQL Calcite: Write additional performance statistics info for queries - Fixes #10880. b26a4c52f87 is described below commit b26a4c52f87b09e99ca0ddd0e9e3e3d6ff4d1935 Author: Aleksey Plekhanov <plehanov.a...@gmail.com> AuthorDate: Thu Sep 14 11:04:43 2023 +0300 IGNITE-20079 SQL Calcite: Write additional performance statistics info for queries - Fixes #10880. Signed-off-by: Aleksey Plekhanov <plehanov.a...@gmail.com> --- .../query/calcite/CalciteQueryProcessor.java | 21 +++ .../query/calcite/QueryRegistryImpl.java | 2 +- .../query/calcite/exec/ExecutionServiceImpl.java | 27 +++- .../query/calcite/exec/LogicalRelImplementor.java | 22 ++-- .../query/calcite/exec/rel/ScanStorageNode.java | 27 +++- .../query/calcite/exec/tracker/IoTracker.java | 15 +++ .../query/calcite/exec/tracker/NoOpIoTracker.java | 13 ++ .../tracker/PerformanceStatisticsIoTracker.java | 51 +++++++- .../calcite/prepare/AbstractMultiStepPlan.java | 10 ++ .../query/calcite/prepare/MultiStepDmlPlan.java | 5 +- .../query/calcite/prepare/MultiStepPlan.java | 5 + .../query/calcite/prepare/MultiStepQueryPlan.java | 5 +- .../query/calcite/prepare/PlanExtractor.java | 108 ++++++++++++++++ .../query/calcite/prepare/PrepareServiceImpl.java | 13 +- .../query/calcite/prepare/bounds/ExactBounds.java | 6 + .../query/calcite/prepare/bounds/MultiBounds.java | 8 ++ .../query/calcite/prepare/bounds/RangeBounds.java | 12 ++ .../query/calcite/prepare/bounds/SearchBounds.java | 6 + .../query/calcite/schema/CacheTableImpl.java | 5 + .../query/calcite/schema/IgniteTable.java | 5 + .../query/calcite/schema/SystemViewTableImpl.java | 5 + .../query/calcite/sql/IgniteSqlCreateTable.java | 19 ++- .../query/calcite/sql/IgniteSqlOption.java | 6 - .../calcite/util/ConvertingClosableIterator.java | 12 +- .../integration/SqlDiagnosticIntegrationTest.java | 111 +++++++++++++--- .../query/calcite/planner/PlannerTest.java | 2 +- 
.../query/calcite/planner/TestTable.java | 2 +- .../FilePerformanceStatisticsReader.java | 67 ++++++++++ .../FilePerformanceStatisticsWriter.java | 47 ++++++- .../performancestatistics/OperationType.java | 28 +++- .../PerformanceStatisticsHandler.java | 24 ++++ .../PerformanceStatisticsProcessor.java | 22 ++++ .../query/running/HeavyQueriesTracker.java | 5 + .../query/running/RunningQueryManager.java | 25 ++++ .../AbstractPerformanceStatisticsTest.java | 12 ++ .../internal/processors/query/h2/H2QueryInfo.java | 15 +++ .../processors/query/h2/H2ResultSetIterator.java | 26 +++- .../processors/query/h2/IgniteH2Indexing.java | 10 ++ .../query/h2/twostep/GridMapQueryExecutor.java | 10 ++ .../query/h2/twostep/GridReduceQueryExecutor.java | 11 ++ .../query/h2/twostep/MapQueryResult.java | 18 +++ .../PerformanceStatisticsQueryTest.java | 141 +++++++++++++++++++-- 42 files changed, 913 insertions(+), 71 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index 3a675e8882c..7be8acb3ed5 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -36,12 +36,14 @@ import org.apache.calcite.rel.RelCollationTraitDef; import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.hint.HintStrategyTable; import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlDynamicParam; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.parser.SqlParserPos; import 
org.apache.calcite.sql.util.SqlOperatorTables; import org.apache.calcite.sql.util.SqlShuttle; import org.apache.calcite.sql.validate.SqlValidator; @@ -94,7 +96,10 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCach import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCacheImpl; import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder; import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolderImpl; +import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlAlterUser; import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlConformance; +import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateUser; +import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlOption; import org.apache.ignite.internal.processors.query.calcite.sql.fun.IgniteOwnSqlOperatorTable; import org.apache.ignite.internal.processors.query.calcite.sql.fun.IgniteStdSqlOperatorTable; import org.apache.ignite.internal.processors.query.calcite.sql.generated.IgniteSqlParserImpl; @@ -527,6 +532,22 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query @Override public SqlNode visit(SqlLiteral literal) { return new SqlDynamicParam(-1, literal.getParserPosition()); } + + @Override public SqlNode visit(SqlCall call) { + // Handle some special cases. 
+ if (call instanceof IgniteSqlOption) + return call; + else if (call instanceof IgniteSqlCreateUser) { + return new IgniteSqlCreateUser(call.getParserPosition(), ((IgniteSqlCreateUser)call).user(), + SqlLiteral.createCharString("hidden", SqlParserPos.ZERO)); + } + else if (call instanceof IgniteSqlAlterUser) { + return new IgniteSqlAlterUser(call.getParserPosition(), ((IgniteSqlAlterUser)call).user(), + SqlLiteral.createCharString("hidden", SqlParserPos.ZERO)); + } + + return super.visit(call); + } } ).toString(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java index 7fe09c6bc4a..aecf0fe69d8 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/QueryRegistryImpl.java @@ -63,7 +63,7 @@ public class QueryRegistryImpl extends AbstractService implements QueryRegistry String initiatorId = fieldsQry != null ? 
fieldsQry.getQueryInitiatorId() : null; long locId = qryMgr.register(rootQry.sql(), GridCacheQueryType.SQL_FIELDS, rootQry.context().schemaName(), - false, createCancelToken(qry), initiatorId, false, false, false); + false, createCancelToken(qry), initiatorId, false, true, false); rootQry.localQueryId(locId); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java index 5472042b80a..1d79b50a930 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; import org.apache.ignite.internal.processors.cache.QueryCursorImpl; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor; @@ -673,6 +674,16 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut } } + if (perfStatProc.enabled()) { + perfStatProc.queryProperty( + GridCacheQueryType.SQL_FIELDS, + qry.initiatorNodeId(), + qry.localQueryId(), + "Query plan", + plan.textPlan() + ); + } + QueryProperties qryProps = qry.context().unwrap(QueryProperties.class); Function<Object, Object> fieldConverter = (qryProps == null || qryProps.keepBinary()) ? 
null : @@ -720,8 +731,22 @@ public class ExecutionServiceImpl<Row> extends AbstractService implements Execut }; } + Runnable onClose = () -> { + if (perfStatProc.enabled()) { + perfStatProc.queryRowsProcessed( + GridCacheQueryType.SQL_FIELDS, + qry.initiatorNodeId(), + qry.localQueryId(), + "Fetched", + resultSetChecker.fetchedSize() + ); + } + + resultSetChecker.checkOnClose(); + }; + Iterator<List<?>> it = new ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx, - fieldConverter, rowConverter, resultSetChecker::checkOnClose); + fieldConverter, rowConverter, onClose); return new ListFieldsQueryCursor<>(plan, it, ectx); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java index 3300f01b58e..cb816373109 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java @@ -315,7 +315,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> { if (idx != null && !tbl.isIndexRebuildInProgress()) { Iterable<Row> rowsIter = idx.scan(ctx, grp, ranges, requiredColumns); - return new ScanStorageNode<>(ctx, rowType, rowsIter, filters, prj); + return new ScanStorageNode<>(idx.name(), ctx, rowType, rowsIter, filters, prj); } else { // Index was invalidated after planning, workaround through table-scan -> sort -> index spool. @@ -339,7 +339,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> { if (!spoolNodeRequired && projects != null) rowType = rel.getRowType(); - Node<Row> node = new ScanStorageNode<>(ctx, rowType, rowsIter, filterHasCorrelation ? 
null : filters, + Node<Row> node = new ScanStorageNode<>(tbl.name(), ctx, rowType, rowsIter, filterHasCorrelation ? null : filters, projNodeRequired ? null : prj); RelCollation collation = rel.collation(); @@ -406,14 +406,14 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> { IgniteIndex idx = tbl.getIndex(rel.indexName()); if (idx != null && !tbl.isIndexRebuildInProgress()) { - return new ScanStorageNode<>(ctx, rel.getRowType(), () -> Collections.singletonList(ctx.rowHandler() - .factory(ctx.getTypeFactory(), rel.getRowType()) - .create(idx.count(ctx, ctx.group(rel.sourceId()), rel.notNull()))).iterator()); + return new ScanStorageNode<>(idx.name() + "_COUNT", ctx, rel.getRowType(), + () -> Collections.singletonList(ctx.rowHandler().factory(ctx.getTypeFactory(), rel.getRowType()) + .create(idx.count(ctx, ctx.group(rel.sourceId()), rel.notNull()))).iterator()); } else { CollectNode<Row> replacement = CollectNode.createCountCollector(ctx); - replacement.register(new ScanStorageNode<>(ctx, rel.getTable().getRowType(), tbl.scan(ctx, + replacement.register(new ScanStorageNode<>(tbl.name(), ctx, rel.getTable().getRowType(), tbl.scan(ctx, ctx.group(rel.sourceId()), ImmutableBitSet.of(0)))); return replacement; @@ -429,14 +429,16 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> { ImmutableBitSet requiredColumns = idxBndRel.requiredColumns(); RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns); - if (idx != null && !tbl.isIndexRebuildInProgress()) - return new ScanStorageNode<>(ctx, rowType, idx.firstOrLast(idxBndRel.first(), ctx, grp, requiredColumns)); + if (idx != null && !tbl.isIndexRebuildInProgress()) { + return new ScanStorageNode<>(idx.name() + "_BOUND", ctx, rowType, + idx.firstOrLast(idxBndRel.first(), ctx, grp, requiredColumns)); + } else { assert requiredColumns.cardinality() == 1; Iterable<Row> rowsIter = tbl.scan(ctx, grp, idxBndRel.requiredColumns()); - Node<Row> scanNode = new 
ScanStorageNode<>(ctx, rowType, rowsIter, + Node<Row> scanNode = new ScanStorageNode<>(tbl.name(), ctx, rowType, rowsIter, r -> ctx.rowHandler().get(0, r) != null, null); RelCollation collation = idx.collation().apply(LogicalScanConverterRule.createMapping( @@ -481,7 +483,7 @@ public class LogicalRelImplementor<Row> implements IgniteRelVisitor<Node<Row>> { Iterable<Row> rowsIter = tbl.scan(ctx, group, requiredColunms); - return new ScanStorageNode<>(ctx, rowType, rowsIter, filters, prj); + return new ScanStorageNode<>(tbl.name(), ctx, rowType, rowsIter, filters, prj); } /** {@inheritDoc} */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java index 4d126208431..48c98dbd2b4 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ScanStorageNode.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.calcite.exec.rel; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.function.Predicate; import org.apache.calcite.rel.type.RelDataType; @@ -27,7 +28,11 @@ import org.jetbrains.annotations.Nullable; * Scan storage node. */ public class ScanStorageNode<Row> extends ScanNode<Row> { + /** */ + @Nullable private final AtomicLong processedRowsCntr; + /** + * @param storageName Storage (index or table) name. * @param ctx Execution context. * @param rowType Row type. * @param src Source. @@ -35,6 +40,7 @@ public class ScanStorageNode<Row> extends ScanNode<Row> { * @param rowTransformer Row transformer (projection). 
*/ public ScanStorageNode( + String storageName, ExecutionContext<Row> ctx, RelDataType rowType, Iterable<Row> src, @@ -42,15 +48,18 @@ public class ScanStorageNode<Row> extends ScanNode<Row> { @Nullable Function<Row, Row> rowTransformer ) { super(ctx, rowType, src, filter, rowTransformer); + + processedRowsCntr = context().ioTracker().processedRowsCounter("Scanned " + storageName); } /** + * @param storageName Storage (index or table) name. * @param ctx Execution context. * @param rowType Row type. * @param src Source. */ - public ScanStorageNode(ExecutionContext<Row> ctx, RelDataType rowType, Iterable<Row> src) { - super(ctx, rowType, src); + public ScanStorageNode(String storageName, ExecutionContext<Row> ctx, RelDataType rowType, Iterable<Row> src) { + this(storageName, ctx, rowType, src, null, null); } /** {@inheritDoc} */ @@ -58,10 +67,22 @@ public class ScanStorageNode<Row> extends ScanNode<Row> { try { context().ioTracker().startTracking(); - return super.processNextBatch(); + int processed = super.processNextBatch(); + + if (processedRowsCntr != null) + processedRowsCntr.addAndGet(processed); + + return processed; } finally { context().ioTracker().stopTracking(); } } + + /** */ + @Override public void closeInternal() { + super.closeInternal(); + + context().ioTracker().flush(); + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java index fb8d9e15e51..a246cb175b2 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/IoTracker.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.processors.query.calcite.exec.tracker; +import java.util.concurrent.atomic.AtomicLong; +import 
org.jetbrains.annotations.Nullable; + /** * I/O operations tracker interface. */ @@ -26,4 +29,16 @@ public interface IoTracker { /** Stop tracking and save result. */ public void stopTracking(); + + /** + * Register counter for processed rows. + * + * @param action Action with rows. + */ + @Nullable public AtomicLong processedRowsCounter(String action); + + /** + * Flush tracked data. + */ + public void flush(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java index 5c4f51a50fb..4cc26d1967b 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/NoOpIoTracker.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.processors.query.calcite.exec.tracker; +import java.util.concurrent.atomic.AtomicLong; +import org.jetbrains.annotations.Nullable; + /** * I/O operations tracker that does nothing. */ @@ -33,4 +36,14 @@ public class NoOpIoTracker implements IoTracker { @Override public void stopTracking() { // No-op. } + + /** {@inheritDoc} */ + @Nullable @Override public AtomicLong processedRowsCounter(String action) { + return null; + } + + /** {@inheritDoc} */ + @Override public void flush() { + // No-op. 
+ } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java index fef1f56bb4a..2978e19cf28 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/PerformanceStatisticsIoTracker.java @@ -17,11 +17,15 @@ package org.apache.ignite.internal.processors.query.calcite.exec.tracker; +import java.util.List; import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.internal.metric.IoStatisticsHolder; import org.apache.ignite.internal.metric.IoStatisticsQueryHelper; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor; +import org.apache.ignite.internal.util.typedef.T2; /** * Performance statistics gathering I/O operations tracker. 
@@ -36,6 +40,15 @@ public class PerformanceStatisticsIoTracker implements IoTracker { /** */ private final long originatingQryId; + /** */ + private final AtomicLong logicalReads = new AtomicLong(); + + /** */ + private final AtomicLong physicalReads = new AtomicLong(); + + /** */ + private final List<T2<String, AtomicLong>> cntrs = new CopyOnWriteArrayList<>(); + /** */ public PerformanceStatisticsIoTracker( PerformanceStatisticsProcessor perfStatProc, @@ -56,13 +69,45 @@ public class PerformanceStatisticsIoTracker implements IoTracker { @Override public void stopTracking() { IoStatisticsHolder stat = IoStatisticsQueryHelper.finishGatheringQueryStatistics(); - if (stat.logicalReads() > 0 || stat.physicalReads() > 0) { + logicalReads.addAndGet(stat.logicalReads()); + physicalReads.addAndGet(stat.physicalReads()); + } + + /** {@inheritDoc} */ + @Override public AtomicLong processedRowsCounter(String action) { + AtomicLong cntr = new AtomicLong(); + + cntrs.add(new T2<>(action, cntr)); + + return cntr; + } + + /** {@inheritDoc} */ + @Override public void flush() { + long logicalReads = this.logicalReads.getAndSet(0); + long physicalReads = this.physicalReads.getAndSet(0); + + if (logicalReads > 0 || physicalReads > 0) { perfStatProc.queryReads( GridCacheQueryType.SQL_FIELDS, originatingNodeId, originatingQryId, - stat.logicalReads(), - stat.physicalReads()); + logicalReads, + physicalReads); + } + + for (T2<String, AtomicLong> cntr : cntrs) { + long rowsCnt = cntr.get2().getAndSet(0); + + if (rowsCnt > 0) { + perfStatProc.queryRowsProcessed( + GridCacheQueryType.SQL_FIELDS, + originatingNodeId, + originatingQryId, + cntr.get1(), + rowsCnt + ); + } } } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java index 0e4ef5e739a..0184ba91f86 100644 --- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/AbstractMultiStepPlan.java @@ -47,15 +47,20 @@ public abstract class AbstractMultiStepPlan extends AbstractQueryPlan implements /** */ protected ExecutionPlan executionPlan; + /** */ + private final String textPlan; + /** */ protected AbstractMultiStepPlan( String qry, + String textPlan, QueryTemplate queryTemplate, FieldsMetadata fieldsMetadata, @Nullable FieldsMetadata paramsMetadata ) { super(qry); + this.textPlan = textPlan; this.queryTemplate = queryTemplate; this.fieldsMetadata = fieldsMetadata; this.paramsMetadata = paramsMetadata; @@ -119,4 +124,9 @@ public abstract class AbstractMultiStepPlan extends AbstractQueryPlan implements "fragments=" + fragments() + "]")) .mapping(); } + + /** {@inheritDoc} */ + @Override public String textPlan() { + return textPlan; + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java index 6d844bd0de9..5cbc4e6c187 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepDmlPlan.java @@ -28,11 +28,12 @@ public class MultiStepDmlPlan extends AbstractMultiStepPlan { */ public MultiStepDmlPlan( String qry, + String textPlan, QueryTemplate queryTemplate, FieldsMetadata fieldsMeta, @Nullable FieldsMetadata paramsMetadata ) { - super(qry, queryTemplate, fieldsMeta, paramsMetadata); + super(qry, textPlan, queryTemplate, fieldsMeta, paramsMetadata); } /** {@inheritDoc} */ @@ -42,6 +43,6 @@ public class MultiStepDmlPlan extends AbstractMultiStepPlan { /** {@inheritDoc} 
*/ @Override public QueryPlan copy() { - return new MultiStepDmlPlan(query(), queryTemplate, fieldsMetadata, paramsMetadata); + return new MultiStepDmlPlan(query(), textPlan(), queryTemplate, fieldsMetadata, paramsMetadata); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java index 1636bcc1124..e738d9b92a0 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepPlan.java @@ -61,4 +61,9 @@ public interface MultiStepPlan extends QueryPlan { * @param ctx Planner context. */ void init(MappingService mappingService, MappingQueryContext ctx); + + /** + * @return Text representation of query plan + */ + String textPlan(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java index ea7164c0df3..53125413160 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/MultiStepQueryPlan.java @@ -28,11 +28,12 @@ public class MultiStepQueryPlan extends AbstractMultiStepPlan { */ public MultiStepQueryPlan( String qry, + String textPlan, QueryTemplate queryTemplate, FieldsMetadata fieldsMeta, @Nullable FieldsMetadata paramsMetadata ) { - super(qry, queryTemplate, fieldsMeta, paramsMetadata); + super(qry, textPlan, queryTemplate, fieldsMeta, paramsMetadata); } /** {@inheritDoc} */ @@ -42,6 +43,6 @@ public class MultiStepQueryPlan extends AbstractMultiStepPlan { /** {@inheritDoc} */ 
@Override public QueryPlan copy() { - return new MultiStepQueryPlan(query(), queryTemplate, fieldsMetadata, paramsMetadata); + return new MultiStepQueryPlan(query(), textPlan(), queryTemplate, fieldsMetadata, paramsMetadata); } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanExtractor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanExtractor.java new file mode 100644 index 00000000000..4c1273222a6 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlanExtractor.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.prepare; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collection; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelWriter; +import org.apache.calcite.rel.externalize.RelWriterImpl; +import org.apache.calcite.rex.RexDynamicParam; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.sql.SqlExplainLevel; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor; +import org.apache.ignite.internal.processors.query.QueryUtils; +import org.apache.ignite.internal.processors.query.calcite.prepare.bounds.SearchBounds; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel; +import org.apache.ignite.internal.util.typedef.F; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Sensitive data aware plan extractor. + */ +public class PlanExtractor { + /** */ + private final PerformanceStatisticsProcessor perfStatProc; + + /** */ + public PlanExtractor(GridKernalContext ctx) { + perfStatProc = ctx.performanceStatistics(); + } + + /** */ + public String extract(IgniteRel rel) { + // Currently, plan required only for performance statistics, skip it if performance statistics disabled. 
+ if (!perfStatProc.enabled()) + return null; + + if (QueryUtils.INCLUDE_SENSITIVE) + return RelOptUtil.toString(rel, SqlExplainLevel.ALL_ATTRIBUTES); + else { + StringWriter sw = new StringWriter(); + RelWriter planWriter = new SensitiveDataAwarePlanWriter(new PrintWriter(sw)); + rel.explain(planWriter); + return sw.toString(); + } + } + + /** */ + private static final class LiteralRemoveShuttle extends RexShuttle { + /** */ + private static final LiteralRemoveShuttle INSTANCE = new LiteralRemoveShuttle(); + + /** {@inheritDoc} */ + @Override public RexNode visitLiteral(RexLiteral literal) { + return new RexDynamicParam(literal.getType(), -1); + } + } + + /** */ + private static class SensitiveDataAwarePlanWriter extends RelWriterImpl { + /** */ + public SensitiveDataAwarePlanWriter(PrintWriter pw) { + super(pw, SqlExplainLevel.ALL_ATTRIBUTES, false); + } + + /** {@inheritDoc} */ + @Override public RelWriter item(String term, @Nullable Object val) { + return super.item(term, removeSensitive(val)); + } + + /** */ + private Object removeSensitive(Object val) { + if (val instanceof RexNode) + return LiteralRemoveShuttle.INSTANCE.apply((RexNode)val); + else if (val instanceof Collection) + return F.transform((Collection<?>)val, this::removeSensitive); + else if (val instanceof SearchBounds) + return ((SearchBounds)val).transform(LiteralRemoveShuttle.INSTANCE::apply); + + return val; + } + + /** {@inheritDoc} */ + @Override public boolean nest() { + // Don't try to expand some values by rel nodes, use original values. 
+ return true; + } + } +} diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareServiceImpl.java index bc6e2b99ddc..8efb4a882f0 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PrepareServiceImpl.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.query.calcite.prepare; import java.util.List; - import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.type.RelDataType; @@ -55,12 +54,16 @@ public class PrepareServiceImpl extends AbstractService implements PrepareServic /** */ private final DdlSqlToCommandConverter ddlConverter; + /** */ + private final PlanExtractor planExtractor; + /** * @param ctx Kernal. */ public PrepareServiceImpl(GridKernalContext ctx) { super(ctx); + planExtractor = new PlanExtractor(ctx); ddlConverter = new DdlSqlToCommandConverter(); } @@ -163,6 +166,8 @@ public class PrepareServiceImpl extends AbstractService implements PrepareServic IgniteRel igniteRel = optimize(sqlNode, planner, log); + String plan = planExtractor.extract(igniteRel); + // Extract parameters meta. 
FieldsMetadata params = DynamicParamTypeExtractor.go(igniteRel); @@ -171,7 +176,7 @@ public class PrepareServiceImpl extends AbstractService implements PrepareServic QueryTemplate template = new QueryTemplate(fragments); - return new MultiStepQueryPlan(ctx.query(), template, + return new MultiStepQueryPlan(ctx.query(), plan, template, queryFieldsMetadata(ctx, validated.dataType(), validated.origins()), params); } @@ -185,6 +190,8 @@ public class PrepareServiceImpl extends AbstractService implements PrepareServic // Convert to Relational operators graph IgniteRel igniteRel = optimize(sqlNode, planner, log); + String plan = planExtractor.extract(igniteRel); + // Extract parameters meta. FieldsMetadata params = DynamicParamTypeExtractor.go(igniteRel); @@ -193,7 +200,7 @@ public class PrepareServiceImpl extends AbstractService implements PrepareServic QueryTemplate template = new QueryTemplate(fragments); - return new MultiStepDmlPlan(ctx.query(), template, + return new MultiStepDmlPlan(ctx.query(), plan, template, queryFieldsMetadata(ctx, igniteRel.getRowType(), null), params); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/ExactBounds.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/ExactBounds.java index 2e67a1d22a3..6a672c75904 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/ExactBounds.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/ExactBounds.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.calcite.prepare.bounds; import java.util.Objects; +import java.util.function.Function; import org.apache.calcite.rex.RexNode; import org.apache.ignite.internal.util.typedef.internal.S; @@ -58,6 +59,11 @@ public class ExactBounds extends SearchBounds { return bound.equals(((ExactBounds)o).bound); } + /** 
{@inheritDoc} */ + @Override public SearchBounds transform(Function<RexNode, RexNode> tranformFunction) { + return new ExactBounds(tranformFunction.apply(condition()), tranformFunction.apply(bound)); + } + /** {@inheritDoc} */ @Override public int hashCode() { return Objects.hash(bound); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/MultiBounds.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/MultiBounds.java index 5101fe85b69..e0376124abe 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/MultiBounds.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/MultiBounds.java @@ -19,7 +19,9 @@ package org.apache.ignite.internal.processors.query.calcite.prepare.bounds; import java.util.List; import java.util.Objects; +import java.util.function.Function; import org.apache.calcite.rex.RexNode; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -45,6 +47,12 @@ public class MultiBounds extends SearchBounds { return bounds; } + /** {@inheritDoc} */ + @Override public SearchBounds transform(Function<RexNode, RexNode> tranformFunction) { + return new MultiBounds(tranformFunction.apply(condition()), + Commons.transform(bounds, b -> b.transform(tranformFunction))); + } + /** {@inheritDoc} */ @Override public Type type() { return Type.MULTI; diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/RangeBounds.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/RangeBounds.java index 0d61987adf5..1a6e3cbb0da 100644 --- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/RangeBounds.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/RangeBounds.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.query.calcite.prepare.bounds; import java.util.Objects; +import java.util.function.Function; import org.apache.calcite.rex.RexNode; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; @@ -87,6 +88,17 @@ public class RangeBounds extends SearchBounds { return Type.RANGE; } + /** {@inheritDoc} */ + @Override public SearchBounds transform(Function<RexNode, RexNode> tranformFunction) { + return new RangeBounds( + tranformFunction.apply(condition()), + tranformFunction.apply(lowerBound), + tranformFunction.apply(upperBound), + lowerInclude, + upperInclude + ); + } + /** {@inheritDoc} */ @Override public boolean equals(Object o) { if (this == o) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/SearchBounds.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/SearchBounds.java index 42caf9e4488..05f997efa10 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/SearchBounds.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/bounds/SearchBounds.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.calcite.prepare.bounds; +import java.util.function.Function; import org.apache.calcite.rex.RexNode; import org.jetbrains.annotations.Nullable; @@ -42,6 +43,11 @@ public abstract class SearchBounds { /** */ public abstract Type type(); + /** + * Create a transformed copy of search bounds. 
+ */ + public abstract SearchBounds transform(Function<RexNode, RexNode> tranformFunction); + /** */ public enum Type { /** Exact search value. */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java index 8e2b80390bb..784201d0099 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/CacheTableImpl.java @@ -158,6 +158,11 @@ public class CacheTableImpl extends AbstractTable implements IgniteCacheTable { return idxRebuildInProgress; } + /** {@inheritDoc} */ + @Override public String name() { + return desc.typeDescription().tableName(); + } + /** {@inheritDoc} */ @Override public <C> C unwrap(Class<C> aCls) { if (aCls.isInstance(desc)) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java index 1c69a60ed39..492fca9c445 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java @@ -149,4 +149,9 @@ public interface IgniteTable extends TranslatableTable { * @return {@code True} if index rebuild in progress. */ boolean isIndexRebuildInProgress(); + + /** + * @return Table name. 
+ */ + String name(); } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java index 44394991a7f..f54d56b5005 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/SystemViewTableImpl.java @@ -159,6 +159,11 @@ public class SystemViewTableImpl extends AbstractTable implements IgniteTable { return false; } + /** {@inheritDoc} */ + @Override public String name() { + return desc.name(); + } + /** */ private static class StatisticsImpl implements Statistic { /** {@inheritDoc} */ diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlCreateTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlCreateTable.java index f00b1cabfdb..f43e7ef7f88 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlCreateTable.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlCreateTable.java @@ -18,9 +18,11 @@ package org.apache.ignite.internal.processors.query.calcite.sql; import java.util.List; import java.util.Objects; +import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlCreate; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlOperator; @@ -47,8 +49,21 @@ public class IgniteSqlCreateTable extends SqlCreate { private final @Nullable SqlNodeList createOptionList; /** */ - private static final SqlOperator OPERATOR = - new 
SqlSpecialOperator("CREATE TABLE", SqlKind.CREATE_TABLE); + private static final SqlOperator OPERATOR = new SqlSpecialOperator("CREATE TABLE", SqlKind.CREATE_TABLE) { + /** + * Required to override this method to correctly copy SQL nodes on SqlShuttle. + */ + @Override public SqlCall createCall( + @Nullable SqlLiteral functionQualifier, + SqlParserPos pos, + @Nullable SqlNode... operands + ) { + assert operands != null && operands.length == 4 : operands; + + return new IgniteSqlCreateTable(pos, false, (SqlIdentifier)operands[0], + (SqlNodeList)operands[1], operands[2], (SqlNodeList)operands[3]); + } + }; /** Creates a SqlCreateTable. */ public IgniteSqlCreateTable(SqlParserPos pos, boolean ifNotExists, SqlIdentifier name, diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlOption.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlOption.java index 6381403bd33..502debe2aa3 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlOption.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/sql/IgniteSqlOption.java @@ -26,7 +26,6 @@ import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.sql.util.SqlVisitor; import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.sql.validate.SqlValidatorScope; import org.apache.calcite.util.Litmus; @@ -75,11 +74,6 @@ public abstract class IgniteSqlOption<E extends Enum<E>> extends SqlCall { throw new UnsupportedOperationException(); } - /** {@inheritDoc} */ - @Override public <R> R accept(SqlVisitor<R> visitor) { - throw new UnsupportedOperationException(); - } - /** {@inheritDoc} */ @Override public boolean equalsDeep(SqlNode node, Litmus litmus) { if (node == this) { diff 
--git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java index 3e8311ff186..db131c3996d 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.query.calcite.util; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; @@ -45,6 +46,9 @@ public class ConvertingClosableIterator<Row> implements Iterator<List<?>>, AutoC /** */ @Nullable Runnable onClose; + /** */ + private final AtomicBoolean closed = new AtomicBoolean(); + /** */ public ConvertingClosableIterator( Iterator<Row> it, @@ -87,9 +91,11 @@ public class ConvertingClosableIterator<Row> implements Iterator<List<?>>, AutoC * {@inheritDoc} */ @Override public void close() throws Exception { - Commons.close(it); + if (closed.compareAndSet(false, true)) { + Commons.close(it); - if (onClose != null) - onClose.run(); + if (onClose != null) + onClose.run(); + } } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java index a58e9ae9a6e..25c702bac5c 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java +++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java @@ -23,9 +23,11 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.Statement; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -37,8 +39,11 @@ import org.apache.ignite.cache.query.FieldsQueryCursor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.annotations.QuerySqlFunction; import org.apache.ignite.calcite.CalciteQueryEngineConfiguration; +import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.ClientConnectorConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.SqlConfiguration; import org.apache.ignite.events.CacheQueryExecutedEvent; @@ -52,6 +57,7 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.processors.query.calcite.Query; import org.apache.ignite.internal.processors.query.calcite.QueryRegistry; +import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.metric.LongMetric; @@ -63,6 +69,9 @@ import org.junit.Test; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; import static 
org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION; +import static org.apache.ignite.internal.processors.authentication.AuthenticationProcessorSelfTest.authenticate; +import static org.apache.ignite.internal.processors.authentication.AuthenticationProcessorSelfTest.withSecurityContextOnAllNodes; +import static org.apache.ignite.internal.processors.authentication.User.DFAULT_USER_NAME; import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS; import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.cleanPerformanceStatisticsDir; import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.startCollectStatistics; @@ -90,14 +99,22 @@ public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest { /** */ private ListeningTestLogger log; + /** */ + private SecurityContext secCtxDflt; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { return super.getConfiguration(igniteInstanceName) .setGridLogger(log) + .setAuthenticationEnabled(true) .setSqlConfiguration(new SqlConfiguration() .setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration()) .setLongQueryWarningTimeout(LONG_QRY_TIMEOUT)) - .setIncludeEventTypes(EVT_SQL_QUERY_EXECUTION, EVT_CACHE_QUERY_EXECUTED, EVT_CACHE_QUERY_OBJECT_READ); + .setIncludeEventTypes(EVT_SQL_QUERY_EXECUTION, EVT_CACHE_QUERY_EXECUTED, EVT_CACHE_QUERY_OBJECT_READ) + .setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true) + ) + ); } /** {@inheritDoc} */ @@ -109,11 +126,17 @@ public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest { @Override protected void beforeTest() throws Exception { super.beforeTest(); + cleanPersistenceDir(); + log = new ListeningTestLogger(log()); 
startGrids(nodeCount()); client = startClientGrid(); + + client.cluster().state(ClusterState.ACTIVE); + + secCtxDflt = authenticate(grid(0), DFAULT_USER_NAME, "ignite"); } /** {@inheritDoc} */ @@ -169,6 +192,8 @@ public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest { /** */ @Test public void testBatchParserMetrics() throws Exception { + withSecurityContextOnAllNodes(secCtxDflt); + MetricRegistry mreg0 = grid(0).context().metric().registry(QUERY_PARSER_METRIC_GROUP_NAME); MetricRegistry mreg1 = grid(1).context().metric().registry(QUERY_PARSER_METRIC_GROUP_NAME); mreg0.reset(); @@ -186,7 +211,7 @@ public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest { assertEquals(0, misses0.value()); assertEquals(0, misses1.value()); - try (Connection conn = DriverManager.getConnection(jdbcUrl)) { + try (Connection conn = DriverManager.getConnection(jdbcUrl, DFAULT_USER_NAME, "ignite")) { conn.setSchema("PUBLIC"); try (Statement stmt = conn.createStatement()) { @@ -291,7 +316,7 @@ public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest { sql(grid(0), "SELECT * FROM table(system_range(1, 1000))"); sql(grid(0), "CREATE TABLE test_perf_stat (a INT)"); sql(grid(0), "INSERT INTO test_perf_stat VALUES (0), (1), (2), (3), (4)"); - sql(grid(0), "SELECT * FROM test_perf_stat"); + sql(grid(0), "SELECT * FROM test_perf_stat WHERE a > 0"); assertTrue(GridTestUtils.waitForCondition(() -> finishQryCnt.get() == 4, 1_000L)); @@ -302,18 +327,17 @@ public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest { // ScanNode page reads, since table/index scans are local and executed in current thread. ModifyNode uses // distributed `invoke` operation, which can be executed by other threads or on other nodes. It's hard to // obtain correct value of page reads for these types of operations, so, currently we just ignore page reads - // performed by ModifyNode. 
Despite static values scan themself doesn't require any page reads, it still can - // catch some page reads performed by insert operation. But, taking into account small amount of inserted - // values, it's not enough rows to trigger batch insert during values scan, and we expect zero page-reads - // for this query in this test. + // performed by ModifyNode. // The fourth query is a table scan and should perform page reads on all data nodes. AtomicInteger qryCnt = new AtomicInteger(); - AtomicInteger readsCnt = new AtomicInteger(); + AtomicLong rowsScanned = new AtomicLong(); Iterator<String> sqlIt = F.asList("SELECT", "CREATE", "INSERT", "SELECT").iterator(); Set<UUID> dataNodesIds = new HashSet<>(F.asList(grid(0).localNode().id(), grid(1).localNode().id())); Set<UUID> readsNodes = new HashSet<>(dataNodesIds); Set<Long> readsQueries = new HashSet<>(); + Map<Long, Long> rowsFetchedPerQuery = new HashMap<>(); + AtomicLong firstQryId = new AtomicLong(-1); AtomicLong lastQryId = new AtomicLong(); stopCollectStatisticsAndRead(new AbstractPerformanceStatisticsTest.TestHandler() { @@ -328,13 +352,14 @@ public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest { ) { qryCnt.incrementAndGet(); - assertTrue(nodeId.equals(grid(0).localNode().id())); + assertEquals(grid(0).localNode().id(), nodeId); assertEquals(SQL_FIELDS, type); assertTrue(text.startsWith(sqlIt.next())); assertTrue(qryStartTime >= startTime); assertTrue(duration >= 0); assertTrue(success); + firstQryId.compareAndSet(-1, id); lastQryId.set(id); } @@ -346,21 +371,43 @@ public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest { long logicalReads, long physicalReads ) { - readsCnt.incrementAndGet(); - readsQueries.add(id); - assertTrue(dataNodesIds.contains(qryNodeId)); + assertTrue(dataNodesIds.contains(nodeId)); readsNodes.remove(nodeId); - assertTrue(grid(0).localNode().id().equals(qryNodeId)); + assertEquals(grid(0).localNode().id(), qryNodeId); 
assertEquals(SQL_FIELDS, type); assertTrue(logicalReads > 0); } + + @Override public void queryRows( + UUID nodeId, + GridCacheQueryType type, + UUID qryNodeId, + long id, + String action, + long rows + ) { + assertEquals(grid(0).localNode().id(), qryNodeId); + assertEquals(SQL_FIELDS, type); + + if (action.toLowerCase().contains("test_perf_stat")) { + assertTrue(dataNodesIds.contains(nodeId)); + rowsScanned.addAndGet(rows); + } + else if ("Fetched".equals(action)) { + assertEquals(grid(0).localNode().id(), nodeId); + assertNull(rowsFetchedPerQuery.put(id, rows)); + } + } }); assertEquals(4, qryCnt.get()); assertTrue("Query reads expected on nodes: " + readsNodes, readsNodes.isEmpty()); assertEquals(Collections.singleton(lastQryId.get()), readsQueries); + assertEquals((Long)1000L, rowsFetchedPerQuery.get(firstQryId.get())); + assertEquals((Long)4L, rowsFetchedPerQuery.get(lastQryId.get())); + assertEquals(5L, rowsScanned.get()); } /** */ @@ -426,6 +473,8 @@ public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest { /** */ @Test public void testSensitiveInformationHiding() throws Exception { + withSecurityContextOnAllNodes(secCtxDflt); + cleanPerformanceStatisticsDir(); startCollectStatistics(); @@ -464,7 +513,24 @@ public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest { fut.get(); } + // Test bounds hiding in index scans. + sql(grid(0), "CREATE TABLE test_sens (id int, val varchar)"); + sql(grid(0), "CREATE INDEX test_sens_idx ON test_sens(val)"); + sql(grid(0), "INSERT INTO test_sens (id, val) VALUES (0, 'sensitive0'), (1, 'sensitive1'), " + + "(2, 'sensitive2'), (3, 'sensitive3'), (4, 'sensitive4'), (5, 'sensitive5'), (6, 'sensitive6')"); + sql(grid(0), "SELECT * FROM test_sens WHERE val IN ('sensitive0', 'sensitive1')"); + sql(grid(0), "SELECT * FROM test_sens WHERE val BETWEEN 'sensitive1' AND 'sensitive3'"); + sql(grid(0), "SELECT * FROM test_sens WHERE val = 'sensitive4'"); + + // Test CREATE AS SELECT rewrite. 
+ sql(grid(0), "CREATE TABLE test_sens1 (val) WITH CACHE_NAME=\"test_sens1\" AS SELECT 'sensitive' AS val"); + + // Test CREATE/ALTER USER commands rewrite. + sql(grid(0), "CREATE USER test WITH PASSWORD 'sensitive'"); + sql(grid(0), "ALTER USER test WITH PASSWORD 'sensitive'"); + AtomicInteger qryCnt = new AtomicInteger(); + AtomicInteger planCnt = new AtomicInteger(); stopCollectStatisticsAndRead(new AbstractPerformanceStatisticsTest.TestHandler() { @Override public void query( @@ -477,11 +543,26 @@ public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest { boolean success ) { qryCnt.incrementAndGet(); - assertFalse(text.contains("sensitive")); + assertFalse(text, text.contains("sensitive")); + } + + @Override public void queryProperty( + UUID nodeId, + GridCacheQueryType type, + UUID qryNodeId, + long id, + String name, + String val + ) { + if ("Query plan".equals(name)) { + planCnt.incrementAndGet(); + assertFalse(val, val.contains("sensitive")); + } } }); - assertEquals(2, qryCnt.get()); + assertEquals(12, qryCnt.get()); // CREATE AS SELECT counts as two queries. + assertEquals(7, planCnt.get()); // DDL queries don't produce plans, except CREATE AS SELECT. 
} finally { QueryUtils.INCLUDE_SENSITIVE = true; diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java index e17b1a2067d..aa4cd3e95f2 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/PlannerTest.java @@ -362,7 +362,7 @@ public class PlannerTest extends AbstractPlannerTest { private MultiStepPlan splitPlan(IgniteRel phys) { assertNotNull(phys); - MultiStepPlan plan = new MultiStepQueryPlan(null, new QueryTemplate(new Splitter().go(phys)), null, null); + MultiStepPlan plan = new MultiStepQueryPlan(null, null, new QueryTemplate(new Splitter().go(phys)), null, null); assertNotNull(plan); diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java index 8cf051f358c..530d9fa1ffc 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/planner/TestTable.java @@ -292,7 +292,7 @@ public class TestTable implements IgniteCacheTable { } /** */ - public String name() { + @Override public String name() { return name; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java index 752755fbcd7..b0c8244734a 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsReader.java @@ -52,7 +52,9 @@ import static org.apache.ignite.internal.processors.performancestatistics.Operat import static org.apache.ignite.internal.processors.performancestatistics.OperationType.JOB; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.PAGES_WRITE_THROTTLE; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY; +import static org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY_PROPERTY; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY_READS; +import static org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY_ROWS; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.TASK; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.TX_COMMIT; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.cacheOperation; @@ -305,6 +307,45 @@ public class FilePerformanceStatisticsReader { return true; } + else if (opType == QUERY_ROWS) { + String action = readCacheableString(buf); + + if (action == null || buf.remaining() < 1 + 16 + 8 + 8) + return false; + + GridCacheQueryType qryType = GridCacheQueryType.fromOrdinal(buf.get()); + UUID uuid = readUuid(buf); + long id = buf.getLong(); + long rows = buf.getLong(); + + for (PerformanceStatisticsHandler handler : curHnd) + handler.queryRows(nodeId, qryType, uuid, id, action, rows); + + return true; + } + else if (opType == QUERY_PROPERTY) { + String name = readCacheableString(buf); + + if (name == null) + return false; + + String val = readCacheableString(buf); + + if (val == null) + return false; + + 
if (buf.remaining() < 1 + 16 + 8) + return false; + + GridCacheQueryType qryType = GridCacheQueryType.fromOrdinal(buf.get()); + UUID uuid = readUuid(buf); + long id = buf.getLong(); + + for (PerformanceStatisticsHandler handler : curHnd) + handler.queryProperty(nodeId, qryType, uuid, id, name, val); + + return true; + } else if (opType == TASK) { if (buf.remaining() < 1) return false; @@ -538,6 +579,32 @@ public class FilePerformanceStatisticsReader { return str; } + /** + * Reads cacheable string from byte buffer. + * + * @return String or {@code null} in case of buffer underflow. + */ + private String readCacheableString(ByteBuffer buf) { + if (buf.remaining() < 1 + 4) + return null; + + boolean cached = buf.get() != 0; + + if (cached) { + int hash = buf.getInt(); + + return knownStrs.get(hash); + } + else { + int textLen = buf.getInt(); + + if (buf.remaining() < textLen) + return null; + + return readString(buf, textLen); + } + } + /** Reads {@link UUID} from buffer. */ private static UUID readUuid(ByteBuffer buf) { return new UUID(buf.getLong(), buf.getLong()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsWriter.java index 405169b1e04..293b675738e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsWriter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/FilePerformanceStatisticsWriter.java @@ -55,7 +55,9 @@ import static org.apache.ignite.internal.processors.performancestatistics.Operat import static org.apache.ignite.internal.processors.performancestatistics.OperationType.JOB; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.PAGES_WRITE_THROTTLE; import static 
org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY; +import static org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY_PROPERTY; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY_READS; +import static org.apache.ignite.internal.processors.performancestatistics.OperationType.QUERY_ROWS; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.TASK; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.TX_COMMIT; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.TX_ROLLBACK; @@ -64,8 +66,10 @@ import static org.apache.ignite.internal.processors.performancestatistics.Operat import static org.apache.ignite.internal.processors.performancestatistics.OperationType.checkpointRecordSize; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.jobRecordSize; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.pagesWriteThrottleRecordSize; +import static org.apache.ignite.internal.processors.performancestatistics.OperationType.queryPropertyRecordSize; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.queryReadsRecordSize; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.queryRecordSize; +import static org.apache.ignite.internal.processors.performancestatistics.OperationType.queryRowsRecordSize; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.taskRecordSize; import static org.apache.ignite.internal.processors.performancestatistics.OperationType.transactionRecordSize; @@ -90,7 +94,7 @@ public class FilePerformanceStatisticsWriter { public static final int DFLT_FLUSH_SIZE = (int)(8 * U.MB); /** Default maximum cached strings threshold. String caching will stop on threshold excess. 
*/ - public static final int DFLT_CACHED_STRINGS_THRESHOLD = 1024; + public static final int DFLT_CACHED_STRINGS_THRESHOLD = 10 * 1024; /** File writer thread name. */ static final String WRITER_THREAD_NAME = "performance-statistics-writer"; @@ -280,6 +284,47 @@ public class FilePerformanceStatisticsWriter { }); } + /** + * @param type Cache query type. + * @param qryNodeId Originating node id. + * @param id Query id. + * @param action Action with rows. + * @param rows Number of rows. + */ + public void queryRows(GridCacheQueryType type, UUID qryNodeId, long id, String action, long rows) { + boolean cached = cacheIfPossible(action); + + doWrite(QUERY_ROWS, queryRowsRecordSize(cached ? 0 : action.getBytes().length, cached), buf -> { + writeString(buf, action, cached); + buf.put((byte)type.ordinal()); + writeUuid(buf, qryNodeId); + buf.putLong(id); + buf.putLong(rows); + }); + } + + /** + * @param type Cache query type. + * @param qryNodeId Originating node id. + * @param id Query id. + * @param name Query property name. + * @param val Query property value. + */ + public void queryProperty(GridCacheQueryType type, UUID qryNodeId, long id, String name, String val) { + boolean cachedName = cacheIfPossible(name); + boolean cachedVal = cacheIfPossible(val); + + doWrite(QUERY_PROPERTY, + queryPropertyRecordSize(cachedName ? 0 : name.getBytes().length, cachedName, cachedVal ? 0 : val.getBytes().length, cachedVal), + buf -> { + writeString(buf, name, cachedName); + writeString(buf, val, cachedVal); + buf.put((byte)type.ordinal()); + writeUuid(buf, qryNodeId); + buf.putLong(id); + }); + } + /** * @param sesId Session id. * @param taskName Task name. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java index 4ee2f6e5a16..df18195b653 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/OperationType.java @@ -85,7 +85,13 @@ public enum OperationType { CHECKPOINT(18), /** Pages write throttle. */ - PAGES_WRITE_THROTTLE(19); + PAGES_WRITE_THROTTLE(19), + + /** Count of rows processed by query. */ + QUERY_ROWS(20), + + /** Custom query property. */ + QUERY_PROPERTY(21); /** Cache operations. */ public static final EnumSet<OperationType> CACHE_OPS = EnumSet.of(CACHE_GET, CACHE_PUT, CACHE_REMOVE, @@ -175,6 +181,26 @@ public enum OperationType { return 1 + 16 + 8 + 8 + 8; } + /** + * @param actionLen Rows action length. + * @param cached {@code True} if action is cached. + * @return Query rows record size. + */ + public static int queryRowsRecordSize(int actionLen, boolean cached) { + return 1 + (cached ? 4 : 4 + actionLen) + 1 + 16 + 8 + 8; + } + + /** + * @param nameLen Property name length. + * @param nameCached {@code True} if property name is cached. + * @param valLen Property value length. + * @param valCached {@code True} if property value is cached. + * @return Query property record size. + */ + public static int queryPropertyRecordSize(int nameLen, boolean nameCached, int valLen, boolean valCached) { + return 1 + (nameCached ? 4 : 4 + nameLen) + 1 + (valCached ? 4 : 4 + valLen) + 1 + 16 + 8; + } + /** * @param nameLen Task name length. * @param cached {@code True} if task name cached. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsHandler.java index 0596cfc4add..cc1a203a03f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsHandler.java @@ -75,6 +75,30 @@ public interface PerformanceStatisticsHandler { void queryReads(UUID nodeId, GridCacheQueryType type, UUID queryNodeId, long id, long logicalReads, long physicalReads); + /** + * Count of rows processed by query. + * + * @param nodeId Node id. + * @param type Cache query type. + * @param qryNodeId Originating node id. + * @param id Query id. + * @param action Action with rows. + * @param rows Number of rows processed. + */ + void queryRows(UUID nodeId, GridCacheQueryType type, UUID qryNodeId, long id, String action, long rows); + + /** + * Custom query property. + * + * @param nodeId Node id. + * @param type Cache query type. + * @param qryNodeId Originating node id. + * @param id Query id. + * @param name Query property name. + * @param val Query property value. + */ + void queryProperty(UUID nodeId, GridCacheQueryType type, UUID qryNodeId, long id, String name, String val); + /** * @param nodeId Node id. * @param sesId Session id. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java index 0d720b4ced8..fbafb352afe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsProcessor.java @@ -185,6 +185,28 @@ public class PerformanceStatisticsProcessor extends GridProcessorAdapter { write(writer -> writer.queryReads(type, queryNodeId, id, logicalReads, physicalReads)); } + /** + * @param type Cache query type. + * @param qryNodeId Originating node id. + * @param id Query id. + * @param action Action with rows. + * @param rows Number of rows processed. + */ + public void queryRowsProcessed(GridCacheQueryType type, UUID qryNodeId, long id, String action, long rows) { + write(writer -> writer.queryRows(type, qryNodeId, id, action, rows)); + } + + /** + * @param type Cache query type. + * @param qryNodeId Originating node id. + * @param id Query id. + * @param name Query property name. + * @param val Query property value. + */ + public void queryProperty(GridCacheQueryType type, UUID qryNodeId, long id, String name, String val) { + write(writer -> writer.queryProperty(type, qryNodeId, id, name, val)); + } + /** * @param sesId Session id. * @param taskName Task name. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java index 17cfdde0a91..d7a06793340 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/HeavyQueriesTracker.java @@ -342,5 +342,10 @@ public final class HeavyQueriesTracker { if (bigResults) LT.warn(log, BIG_RESULT_SET_MSG + qryInfo.queryInfo("fetched=" + fetchedSize)); } + + /** */ + public long fetchedSize() { + return fetchedSize; + } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java index 0e3e1f3df25..50617b288e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/running/RunningQueryManager.java @@ -369,6 +369,31 @@ public class RunningQueryManager { } if (ctx.performanceStatistics().enabled() && qry.startTimeNanos() > 0) { + String flags = null; + + // Create string for flags with non-default values. + if (qry.local()) + flags = "local"; + + if (!qry.lazy()) + flags = (flags == null ? "" : flags + ", ") + "notLazy"; + + if (qry.distributedJoins()) + flags = (flags == null ? "" : flags + ", ") + "distributedJoins"; + + if (qry.enforceJoinOrder()) + flags = (flags == null ? 
"" : flags + ", ") + "enforceJoinOrder"; + + if (flags != null) { + ctx.performanceStatistics().queryProperty( + qry.queryType(), + qry.nodeId(), + qry.id(), + "Flags", + flags + ); + } + ctx.performanceStatistics().query( qry.queryType(), qry.query(), diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/AbstractPerformanceStatisticsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/AbstractPerformanceStatisticsTest.java index 779f15a4fe9..4384d5471bd 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/AbstractPerformanceStatisticsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/performancestatistics/AbstractPerformanceStatisticsTest.java @@ -186,6 +186,18 @@ public abstract class AbstractPerformanceStatisticsTest extends GridCommonAbstra // No-op. } + /** {@inheritDoc} */ + @Override public void queryRows(UUID nodeId, GridCacheQueryType type, UUID qryNodeId, long id, String action, + long rows) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void queryProperty(UUID nodeId, GridCacheQueryType type, UUID qryNodeId, long id, String name, + String val) { + // No-op. 
+ } + /** {@inheritDoc} */ @Override public void task(UUID nodeId, IgniteUuid sesId, String taskName, long startTime, long duration, int affPartId) { diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java index f3c508a8620..7cdbed157fe 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2QueryInfo.java @@ -97,6 +97,21 @@ public class H2QueryInfo implements TrackableQuery { } } + /** */ + public UUID nodeId() { + return nodeId; + } + + /** */ + public long queryId() { + return queryId; + } + + /** */ + public String plan() { + return stmt.getPlanSQL(); + } + /** * Print info specified by children. * diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java index f9bdd321c9b..9529b4430b1 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2ResultSetIterator.java @@ -27,6 +27,9 @@ import java.util.NoSuchElementException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.query.QueryCancelledException; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; +import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor; import org.apache.ignite.internal.processors.query.IgniteSQLException; import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table; import 
org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject; @@ -111,6 +114,12 @@ public abstract class H2ResultSetIterator<T> extends GridIteratorAdapter<T> impl /** Tracing processor. */ protected final Tracing tracing; + /** */ + private final GridKernalContext ctx; + + /** */ + private final H2QueryInfo qryInfo; + /** * @param data Data array. * @param log Logger. @@ -126,11 +135,12 @@ public abstract class H2ResultSetIterator<T> extends GridIteratorAdapter<T> impl IgniteH2Indexing h2, H2QueryInfo qryInfo, Tracing tracing - ) - throws IgniteCheckedException { + ) throws IgniteCheckedException { + ctx = h2.ctx; this.pageSize = pageSize; this.data = data; this.tracing = tracing; + this.qryInfo = qryInfo; try { res = (ResultInterface)RESULT_FIELD.get(data); @@ -318,6 +328,18 @@ public abstract class H2ResultSetIterator<T> extends GridIteratorAdapter<T> impl try { resultSetChecker.checkOnClose(); + PerformanceStatisticsProcessor perfStat = ctx.performanceStatistics(); + + if (perfStat.enabled() && resultSetChecker.fetchedSize() > 0) { + perfStat.queryRowsProcessed( + GridCacheQueryType.SQL_FIELDS, + qryInfo.nodeId(), + qryInfo.queryId(), + "Fetched on reducer", + resultSetChecker.fetchedSize() + ); + } + data.close(); } catch (SQLException e) { diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java index e622f2846ea..37559cc4758 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java @@ -474,6 +474,16 @@ public class IgniteH2Indexing implements GridQueryIndexing { H2QueryInfo qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry, ctx.localNodeId(), qryId); + if (ctx.performanceStatistics().enabled()) { + 
ctx.performanceStatistics().queryProperty( + GridCacheQueryType.SQL_FIELDS, + qryInfo.nodeId(), + qryInfo.queryId(), + "Local plan", + qryInfo.plan() + ); + } + ResultSet rs = executeSqlQueryWithTimer( stmt, conn, diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java index 18bb7abb412..f2afcfd57bb 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java @@ -468,6 +468,16 @@ public class GridMapQueryExecutor { MapH2QueryInfo qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, reqId, segmentId); + if (performanceStatsEnabled) { + ctx.performanceStatistics().queryProperty( + GridCacheQueryType.SQL_FIELDS, + qryInfo.nodeId(), + qryInfo.queryId(), + "Map phase plan", + qryInfo.plan() + ); + } + ResultSet rs = h2.executeSqlQueryWithTimer( stmt, conn, diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index 0a56fb32b60..125771e865b 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; +import 
org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator; import org.apache.ignite.internal.processors.query.GridQueryCancel; @@ -533,6 +534,16 @@ public class GridReduceQueryExecutor { ReduceH2QueryInfo qryInfo = new ReduceH2QueryInfo(stmt, qry.originalSql(), ctx.localNodeId(), qryId, qryReqId); + if (ctx.performanceStatistics().enabled()) { + ctx.performanceStatistics().queryProperty( + GridCacheQueryType.SQL_FIELDS, + qryInfo.nodeId(), + qryInfo.queryId(), + "Reduce phase plan", + qryInfo.plan() + ); + } + ResultSet res = h2.executeSqlQueryWithTimer(stmt, conn, rdc.query(), diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java index 5f96cacef51..d7d8736e43e 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapQueryResult.java @@ -28,8 +28,10 @@ import org.apache.ignite.events.CacheQueryReadEvent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; +import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; import org.apache.ignite.internal.processors.cache.tree.CacheDataTree; +import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor; import org.apache.ignite.internal.processors.query.h2.H2PooledConnection; import org.apache.ignite.internal.processors.query.h2.H2Utils; import 
org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; @@ -339,6 +341,9 @@ class MapQueryResult { /** */ private final HeavyQueriesTracker.ResultSetChecker resultSetChecker; + /** */ + private final MapH2QueryInfo qryInfo; + /** * Constructor. * @@ -346,6 +351,7 @@ class MapQueryResult { */ Result(@NotNull ResultSet rs, MapH2QueryInfo qryInfo) { this.rs = rs; + this.qryInfo = qryInfo; try { res = (ResultInterface)RESULT_FIELD.get(rs); @@ -364,6 +370,18 @@ class MapQueryResult { void close() { resultSetChecker.checkOnClose(); + PerformanceStatisticsProcessor perfStat = cctx.kernalContext().performanceStatistics(); + + if (perfStat.enabled() && resultSetChecker.fetchedSize() > 0) { + perfStat.queryRowsProcessed( + GridCacheQueryType.SQL_FIELDS, + qryInfo.nodeId(), + qryInfo.queryId(), + "Fetched on mapper", + resultSetChecker.fetchedSize() + ); + } + U.close(rs, log); } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsQueryTest.java index d6e912ef19c..1ab897286b7 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsQueryTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/performancestatistics/PerformanceStatisticsQueryTest.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCache; import org.apache.ignite.Ignition; import org.apache.ignite.cache.QueryEntity; @@ -46,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType; import org.apache.ignite.internal.processors.cache.query.IndexQueryDesc; import 
org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.junit.Assume; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -153,7 +156,7 @@ public class PerformanceStatisticsQueryTest extends AbstractPerformanceStatistic for (int i = 0; i < ENTRY_COUNT; i++) { cache.put(i, i); - cache2.put(i, i * 2); + cache2.put((long)i, (long)i * 2); } } @@ -179,7 +182,7 @@ public class PerformanceStatisticsQueryTest extends AbstractPerformanceStatistic public void testScanQuery() throws Exception { ScanQuery<Object, Object> qry = new ScanQuery<>().setPageSize(pageSize); - checkQuery(SCAN, qry, DEFAULT_CACHE_NAME); + checkQuery(SCAN, qry, DEFAULT_CACHE_NAME, false); } /** @throws Exception If failed. */ @@ -193,7 +196,7 @@ public class PerformanceStatisticsQueryTest extends AbstractPerformanceStatistic String expText = indexQueryText(DEFAULT_CACHE_NAME, new IndexQueryDesc(qry.getCriteria(), qry.getIndexName(), qry.getValueType())); - checkQuery(INDEX, qry, expText); + checkQuery(INDEX, qry, expText, false); } /** @throws Exception If failed. */ @@ -203,7 +206,7 @@ public class PerformanceStatisticsQueryTest extends AbstractPerformanceStatistic SqlFieldsQuery qry = new SqlFieldsQuery(sql).setPageSize(pageSize); - checkQuery(SQL_FIELDS, qry, sql); + checkQuery(SQL_FIELDS, qry, sql, false); } /** @throws Exception If failed. */ @@ -213,17 +216,61 @@ public class PerformanceStatisticsQueryTest extends AbstractPerformanceStatistic SqlFieldsQuery qry = new SqlFieldsQuery(sql).setPageSize(pageSize); - checkQuery(SQL_FIELDS, qry, sql); + checkQuery(SQL_FIELDS, qry, sql, false); + } + + /** @throws Exception If failed. 
*/ + @Test + public void testSqlFieldsQueryWithReducer() throws Exception { + String sql = "select sum(_key) from " + DEFAULT_CACHE_NAME; + + SqlFieldsQuery qry = new SqlFieldsQuery(sql).setPageSize(pageSize); + + checkQuery(SQL_FIELDS, qry, sql, true); + } + + /** @throws Exception If failed. */ + @Test + public void testSqlFieldsLocalQuery() throws Exception { + Assume.assumeTrue(clientType == SERVER); + + String sql = "select * from " + DEFAULT_CACHE_NAME; + + SqlFieldsQuery qry = new SqlFieldsQuery(sql).setPageSize(pageSize).setLocal(true); + + cleanPerformanceStatisticsDir(); + + startCollectStatistics(); + + srv.cache(DEFAULT_CACHE_NAME).query(qry).getAll(); + + AtomicReference<String> flags = new AtomicReference<>(); + + stopCollectStatisticsAndRead(new TestHandler() { + @Override public void queryProperty( + UUID nodeId, + GridCacheQueryType type, + UUID qryNodeId, + long id, + String name, + String val + ) { + if ("Flags".equals(name)) + assertTrue(flags.compareAndSet(null, val)); + } + }); + + assertEquals("local", flags.get()); } /** Check query. */ - private void checkQuery(GridCacheQueryType type, Query<?> qry, String text) throws Exception { + private void checkQuery(GridCacheQueryType type, Query<?> qry, String text, boolean hasReducer) throws Exception { client.cluster().state(INACTIVE); client.cluster().state(ACTIVE); - runQueryAndCheck(type, qry, text, true, true); + runQueryAndCheck(type, qry, text, true, true, hasReducer); - runQueryAndCheck(type, qry, text, true, false); + runQueryAndCheck(type, qry, text, true, false, hasReducer); } /** @throws Exception If failed. 
*/ @@ -231,21 +278,26 @@ public class PerformanceStatisticsQueryTest extends AbstractPerformanceStatistic public void testDdlAndDmlQueries() throws Exception { String sql = "create table " + SQL_TABLE + " (id int, val varchar, primary key (id))"; - runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(sql), sql, false, false); + runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(sql), sql, false, false, false); sql = "insert into " + SQL_TABLE + " (id) values (1)"; - runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(sql), sql, false, false); + runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(sql), sql, false, false, false); sql = "update " + SQL_TABLE + " set val = 'abc'"; - runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(sql), sql, true, false); + runQueryAndCheck(SQL_FIELDS, new SqlFieldsQuery(sql), sql, true, false, false); } /** Runs query and checks statistics. */ - private void runQueryAndCheck(GridCacheQueryType expType, Query<?> qry, String expText, boolean hasLogicalReads, - boolean hasPhysicalReads) - throws Exception { + private void runQueryAndCheck( + GridCacheQueryType expType, + Query<?> qry, + String expText, + boolean hasLogicalReads, + boolean hasPhysicalReads, + boolean hasReducer + ) throws Exception { long startTime = U.currentTimeMillis(); cleanPerformanceStatisticsDir(); @@ -275,9 +327,14 @@ public class PerformanceStatisticsQueryTest extends AbstractPerformanceStatistic if (hasLogicalReads) srv.cluster().forServers().nodes().forEach(node -> readsNodes.add(node.id())); + Set<UUID> dataNodes = new HashSet<>(readsNodes); AtomicInteger queryCnt = new AtomicInteger(); AtomicInteger readsCnt = new AtomicInteger(); HashSet<Long> qryIds = new HashSet<>(); + AtomicLong mapRowCnt = new AtomicLong(); + AtomicLong rdcRowCnt = new AtomicLong(); + AtomicInteger planMapCnt = new AtomicInteger(); + AtomicInteger planRdcCnt = new AtomicInteger(); stopCollectStatisticsAndRead(new TestHandler() { @Override public void query(UUID nodeId, GridCacheQueryType type, String 
text, long id, long queryStartTime, @@ -304,11 +361,67 @@ public class PerformanceStatisticsQueryTest extends AbstractPerformanceStatistic assertTrue(logicalReads > 0); assertTrue(hasPhysicalReads ? physicalReads > 0 : physicalReads == 0); } + + @Override public void queryRows( + UUID nodeId, + GridCacheQueryType type, + UUID qryNodeId, + long id, + String action, + long rows + ) { + assertEquals(expType, SQL_FIELDS); + assertTrue(expNodeIds.contains(qryNodeId)); + + if ("Fetched on mapper".equals(action)) { + assertTrue(dataNodes.contains(nodeId)); + mapRowCnt.addAndGet(rows); + } + else if ("Fetched on reducer".equals(action)) { + assertTrue(expNodeIds.contains(nodeId)); + rdcRowCnt.addAndGet(rows); + } + } + + @Override public void queryProperty( + UUID nodeId, + GridCacheQueryType type, + UUID qryNodeId, + long id, + String name, + String val + ) { + assertEquals(expType, SQL_FIELDS); + assertTrue(expNodeIds.contains(qryNodeId)); + + if ("Map phase plan".equals(name)) { + assertTrue(dataNodes.contains(nodeId)); + planMapCnt.incrementAndGet(); + } + else if ("Reduce phase plan".equals(name)) { + assertTrue(expNodeIds.contains(nodeId)); + planRdcCnt.incrementAndGet(); + } + } }); assertEquals(1, queryCnt.get()); assertTrue("Query reads expected on nodes: " + readsNodes, readsNodes.isEmpty()); assertEquals(1, qryIds.size()); + + // If query has logical reads, plan and rows info also expected. + if (hasLogicalReads && expType == SQL_FIELDS) { + assertEquals(dataNodes.size(), planMapCnt.get()); + assertTrue(mapRowCnt.get() > 0); + if (hasReducer) { + assertTrue(rdcRowCnt.get() > 0); + assertEquals(1, planRdcCnt.get()); + } + else { + assertEquals(0, rdcRowCnt.get()); + assertEquals(0, planRdcCnt.get()); + } + } } /** @throws Exception If failed. */