This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit f7fc12741d125e3232b9d08254f840e7ddca71cd Author: Ali Alsuliman <[email protected]> AuthorDate: Sun Nov 6 20:39:02 2022 -0800 [ASTERIXDB-3085][IDX] Collect secondary index stats while ANALYZE DATASET - user model changes: no - storage format changes: no - interface changes: no Details: As part of collecting stats about a dataset using ANALYZE DATASET, collect stats about the secondary indexes. The stats include the number of pages. Backport changes: - Compensate for not having the additional profiler stats added before this Change-Id: I800ed3015832c311c7075f7cc8c5325b2fc62265 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17277 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17335 Reviewed-by: Michael Blow <[email protected]> Tested-by: Michael Blow <[email protected]> --- .../asterix/app/translator/QueryTranslator.java | 12 ++-- .../analyze-dataset-with-indexes.01.ddl.sqlpp | 42 +++++-------- .../analyze-dataset-with-indexes.02.update.sqlpp | 34 ++-------- .../analyze-dataset-with-indexes.03.ddl.sqlpp | 33 ++-------- .../analyze-dataset-with-indexes.04.query.sqlpp | 33 ++-------- .../analyze-dataset-with-indexes.05.ddl.sqlpp | 32 +--------- .../analyze-dataset-with-indexes.06.query.sqlpp | 33 ++-------- .../analyze-dataset-with-indexes.99.ddl.sqlpp | 31 +-------- .../analyze-dataset-with-indexes.04.adm | 3 + .../analyze-dataset-with-indexes.06.adm | 3 + .../asterix/common/utils/StoragePathUtil.java | 6 +- .../metadata/declared/MetadataProvider.java | 20 ++++++ .../apache/asterix/metadata/entities/Index.java | 10 ++- .../IndexTupleTranslator.java | 70 ++++++++++++++++++++- .../metadata/utils/SampleOperationsHelper.java | 21 +++++-- .../{StreamStats.java => DatasetStreamStats.java} | 19 ++++-- ...a => DatasetStreamStatsOperatorDescriptor.java} | 50 +++++++++++++-- .../hyracks/api/job/profiling/IOperatorStats.java | 7 +++ .../hyracks/api/job/profiling/IndexStats.java | 73 ++++++++++++++++++++++ .../hyracks/api/job/profiling/OperatorStats.java | 56 ++++++++++++++++- .../common/job/profiling/om/JobProfile.java | 6 +- 21 files changed, 365 insertions(+), 229 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index fa5507fdab..b017648cb4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -211,7 +211,7 @@ import org.apache.asterix.runtime.fulltext.AbstractFullTextFilterDescriptor; import org.apache.asterix.runtime.fulltext.FullTextConfigDescriptor; import org.apache.asterix.runtime.fulltext.IFullTextFilterDescriptor; import org.apache.asterix.runtime.fulltext.StopwordsFullTextFilterDescriptor; -import org.apache.asterix.runtime.operators.StreamStats; +import org.apache.asterix.runtime.operators.DatasetStreamStats; import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory; import org.apache.asterix.translator.AbstractLangTranslator; import org.apache.asterix.translator.ClientRequest; @@ -4266,9 +4266,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen int sampleCardinalityTarget = stmtAnalyze.getSampleSize(); long sampleSeed = stmtAnalyze.getOrCreateSampleSeed(); - Index.SampleIndexDetails newIndexDetailsPendingAdd = - new Index.SampleIndexDetails(dsDetails.getPrimaryKey(), dsDetails.getKeySourceIndicator(), - dsDetails.getPrimaryKeyType(), sampleCardinalityTarget, 0, 0, sampleSeed); + Index.SampleIndexDetails newIndexDetailsPendingAdd = new Index.SampleIndexDetails(dsDetails.getPrimaryKey(), + dsDetails.getKeySourceIndicator(), dsDetails.getPrimaryKeyType(), sampleCardinalityTarget, 0, 0, + sampleSeed, Collections.emptyMap()); newIndexPendingAdd = new Index(dataverseName, datasetName, newIndexName, sampleIndexType, newIndexDetailsPendingAdd, false, false, MetadataUtil.PENDING_ADD_OP); @@ -4306,11 +4306,11 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (opStats == null || opStats.size() == 0) { throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, "", sourceLoc); } - StreamStats stats = new StreamStats(opStats.get(0)); + DatasetStreamStats stats = new DatasetStreamStats(opStats.get(0)); Index.SampleIndexDetails newIndexDetailsFinal = new Index.SampleIndexDetails(dsDetails.getPrimaryKey(), dsDetails.getKeySourceIndicator(), dsDetails.getPrimaryKeyType(), sampleCardinalityTarget, - stats.getCardinality(), stats.getAvgTupleSize(), sampleSeed); + stats.getCardinality(), stats.getAvgTupleSize(), sampleSeed, stats.getIndexesStats()); Index newIndexFinal = new Index(dataverseName, datasetName, newIndexName, sampleIndexType, newIndexDetailsFinal, false, false, MetadataUtil.PENDING_NO_OP); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.01.ddl.sqlpp similarity index 52% copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.01.ddl.sqlpp index 181249abac..8c374a2a69 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.01.ddl.sqlpp @@ -16,34 +16,26 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.api.job.profiling; -import java.io.Serializable; - -import org.apache.hyracks.api.io.IWritable; -import org.apache.hyracks.api.job.profiling.counters.ICounter; +/* + * Description: Test collecting secondary indexes stats with ANALYZE DATASET statement + */ -public interface IOperatorStats extends IWritable, Serializable { +DROP DATAVERSE test IF EXISTS; +CREATE DATAVERSE test; +USE test; - /** - * @return The name of the operator - */ - String getName(); +CREATE TYPE testType AS OPEN { + id : uuid +}; - /** - * @return A counter used to track the number of tuples - * accessed by an operator - */ - ICounter getTupleCounter(); +CREATE DATASET ds1(testType) PRIMARY KEY id AUTOGENERATED; +CREATE DATASET ds2(testType) PRIMARY KEY id AUTOGENERATED; +CREATE DATASET ds3(testType) PRIMARY KEY id AUTOGENERATED; +CREATE DATASET ds4(testType) PRIMARY KEY id AUTOGENERATED; - /** - * @return A counter used to track the execution time - * of an operator - */ - ICounter getTimeCounter(); +CREATE INDEX ds1_idx1 ON ds1(name: string); +CREATE INDEX ds1_idx2 ON ds1(UNNEST interests: string) EXCLUDE UNKNOWN KEY;; +CREATE PRIMARY INDEX ds1_idx3 ON ds1; - /** - * @return A counter used to track the number of pages pinned by an opeartor - */ - ICounter getDiskIoCounter(); -} +CREATE INDEX ds2_idx1 ON ds2(name: string); \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.02.update.sqlpp similarity index 52% copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.02.update.sqlpp index 181249abac..711b91433d 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.02.update.sqlpp @@ -16,34 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.api.job.profiling; -import java.io.Serializable; +USE test; -import org.apache.hyracks.api.io.IWritable; -import org.apache.hyracks.api.job.profiling.counters.ICounter; - -public interface IOperatorStats extends IWritable, Serializable { - - /** - * @return The name of the operator - */ - String getName(); - - /** - * @return A counter used to track the number of tuples - * accessed by an operator - */ - ICounter getTupleCounter(); - - /** - * @return A counter used to track the execution time - * of an operator - */ - ICounter getTimeCounter(); - - /** - * @return A counter used to track the number of pages pinned by an opeartor - */ - ICounter getDiskIoCounter(); -} +LOAD DATASET ds1 USING localfs (("path"="asterix_nc1://data/semistructured/co1k_olist/customer.adm"),("format"="adm")); +LOAD DATASET ds2 USING localfs (("path"="asterix_nc1://data/semistructured/co1k_olist/customer.adm"),("format"="adm")); +LOAD DATASET ds3 USING localfs (("path"="asterix_nc1://data/semistructured/co1k_olist/customer.adm"),("format"="adm")); +LOAD DATASET ds4 USING localfs (("path"="asterix_nc1://data/semistructured/co1k_olist/customer.adm"),("format"="adm")); \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.03.ddl.sqlpp similarity index 52% copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.03.ddl.sqlpp index 181249abac..97e07ba08a 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.03.ddl.sqlpp @@ -16,34 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.api.job.profiling; -import java.io.Serializable; +USE test; -import org.apache.hyracks.api.io.IWritable; -import org.apache.hyracks.api.job.profiling.counters.ICounter; - -public interface IOperatorStats extends IWritable, Serializable { - - /** - * @return The name of the operator - */ - String getName(); - - /** - * @return A counter used to track the number of tuples - * accessed by an operator - */ - ICounter getTupleCounter(); - - /** - * @return A counter used to track the execution time - * of an operator - */ - ICounter getTimeCounter(); - - /** - * @return A counter used to track the number of pages pinned by an opeartor - */ - ICounter getDiskIoCounter(); -} +ANALYZE DATASET ds1; +ANALYZE DATASET ds2; +ANALYZE DATASET ds3; \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.query.sqlpp similarity index 52% copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.query.sqlpp index 181249abac..9c01681ddf 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.query.sqlpp @@ -16,34 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.api.job.profiling; -import java.io.Serializable; +USE test; -import org.apache.hyracks.api.io.IWritable; -import org.apache.hyracks.api.job.profiling.counters.ICounter; - -public interface IOperatorStats extends IWritable, Serializable { - - /** - * @return The name of the operator - */ - String getName(); - - /** - * @return A counter used to track the number of tuples - * accessed by an operator - */ - ICounter getTupleCounter(); - - /** - * @return A counter used to track the execution time - * of an operator - */ - ICounter getTimeCounter(); - - /** - * @return A counter used to track the number of pages pinned by an opeartor - */ - ICounter getDiskIoCounter(); -} +FROM `Metadata`.`Index` t WHERE t.IndexStructure = "SAMPLE" +SELECT t.* EXCLUDE DataverseName, SearchKey, IsPrimary, Timestamp, PendingOp, SampleSeed +ORDER BY t.DatasetName, t.IndexName; \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.05.ddl.sqlpp similarity index 52% copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.05.ddl.sqlpp index 181249abac..4a2edf05ab 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.05.ddl.sqlpp @@ -16,34 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.api.job.profiling; -import java.io.Serializable; +USE test; -import org.apache.hyracks.api.io.IWritable; -import org.apache.hyracks.api.job.profiling.counters.ICounter; - -public interface IOperatorStats extends IWritable, Serializable { - - /** - * @return The name of the operator - */ - String getName(); - - /** - * @return A counter used to track the number of tuples - * accessed by an operator - */ - ICounter getTupleCounter(); - - /** - * @return A counter used to track the execution time - * of an operator - */ - ICounter getTimeCounter(); - - /** - * @return A counter used to track the number of pages pinned by an opeartor - */ - ICounter getDiskIoCounter(); -} +DROP INDEX ds1.ds1_idx3; +ANALYZE DATASET ds1; \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.query.sqlpp similarity index 52% copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.query.sqlpp index 181249abac..9c01681ddf 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.query.sqlpp @@ -16,34 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.api.job.profiling; -import java.io.Serializable; +USE test; -import org.apache.hyracks.api.io.IWritable; -import org.apache.hyracks.api.job.profiling.counters.ICounter; - -public interface IOperatorStats extends IWritable, Serializable { - - /** - * @return The name of the operator - */ - String getName(); - - /** - * @return A counter used to track the number of tuples - * accessed by an operator - */ - ICounter getTupleCounter(); - - /** - * @return A counter used to track the execution time - * of an operator - */ - ICounter getTimeCounter(); - - /** - * @return A counter used to track the number of pages pinned by an opeartor - */ - ICounter getDiskIoCounter(); -} +FROM `Metadata`.`Index` t WHERE t.IndexStructure = "SAMPLE" +SELECT t.* EXCLUDE DataverseName, SearchKey, IsPrimary, Timestamp, PendingOp, SampleSeed +ORDER BY t.DatasetName, t.IndexName; \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.99.ddl.sqlpp similarity index 52% copy from hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java copy to asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.99.ddl.sqlpp index 181249abac..36b2bab543 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.99.ddl.sqlpp @@ -16,34 +16,5 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.api.job.profiling; -import java.io.Serializable; - -import org.apache.hyracks.api.io.IWritable; -import org.apache.hyracks.api.job.profiling.counters.ICounter; - -public interface IOperatorStats extends IWritable, Serializable { - - /** - * @return The name of the operator - */ - String getName(); - - /** - * @return A counter used to track the number of tuples - * accessed by an operator - */ - ICounter getTupleCounter(); - - /** - * @return A counter used to track the execution time - * of an operator - */ - ICounter getTimeCounter(); - - /** - * @return A counter used to track the number of pages pinned by an opeartor - */ - ICounter getDiskIoCounter(); -} +DROP DATAVERSE test IF EXISTS; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.adm new file mode 100644 index 0000000000..bc2792b6fa --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.04.adm @@ -0,0 +1,3 @@ +{ "DatasetName": "ds1", "IndexName": "sample_idx_1_ds1", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369, "IndexesStats": [ { "IndexName": "ds1_idx3", "NumPages": 8 }, { "IndexName": "ds1_idx2", "NumPages": 8 }, { "IndexName": "ds1_idx1", "NumPages": 8 } ] } +{ "DatasetName": "ds2", "IndexName": "sample_idx_1_ds2", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369, "IndexesStats": [ { "IndexName": "ds2_idx1", "NumPages": 8 } ] } +{ "DatasetName": "ds3", "IndexName": "sample_idx_1_ds3", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.adm new file mode 100644 index 0000000000..08f8659cea --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/ddl/analyze-dataset-with-indexes/analyze-dataset-with-indexes.06.adm @@ -0,0 +1,3 @@ +{ "DatasetName": "ds1", "IndexName": "sample_idx_2_ds1", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369, "IndexesStats": [ { "IndexName": "ds1_idx2", "NumPages": 8 }, { "IndexName": "ds1_idx1", "NumPages": 8 } ] } +{ "DatasetName": "ds2", "IndexName": "sample_idx_1_ds2", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369, "IndexesStats": [ { "IndexName": "ds2_idx1", "NumPages": 8 } ] } +{ "DatasetName": "ds3", "IndexName": "sample_idx_1_ds3", "IndexStructure": "SAMPLE", "SampleCardinalityTarget": 1063, "SourceCardinality": 1000, "SourceAvgItemSize": 369 } \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java index f84472e376..c87f3685e0 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java @@ -46,9 +46,13 @@ public class StoragePathUtil { private StoragePathUtil() { } + public static IFileSplitProvider splitProvider(FileSplit[] splits) { + return new ConstantFileSplitProvider(splits); + } + public static Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraints( FileSplit[] splits) { - IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits); + IFileSplitProvider splitProvider = splitProvider(splits); String[] loc = new String[splits.length]; for (int p = 0; p < splits.length; p++) { loc[p] = splits[p].getNodeName(); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 712a216d8b..53cf3d0058 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -26,11 +26,13 @@ import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.config.DatasetConfig.DatasetType; @@ -1830,6 +1832,24 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> return StoragePathUtil.splitProviderAndPartitionConstraints(splits); } + public List<Pair<IFileSplitProvider, String>> getSplitProviderOfAllIndexes(Dataset ds) throws AlgebricksException { + List<Index> dsIndexes = getDatasetIndexes(ds.getDataverseName(), ds.getDatasetName()).stream() + .filter(idx -> idx.getIndexType() != IndexType.SAMPLE && idx.isSecondaryIndex()) + .collect(Collectors.toList()); + if (dsIndexes.isEmpty()) { + return Collections.emptyList(); + } + List<String> datasetNodes = findNodes(ds.getNodeGroupName()); + List<Pair<IFileSplitProvider, String>> indexesSplits = + dsIndexes.stream() + .map(idx -> new Pair<>( + StoragePathUtil.splitProvider(SplitsAndConstraintsUtil.getIndexSplits( + appCtx.getClusterStateManager(), ds, idx.getIndexName(), datasetNodes)), + idx.getIndexName())) + .collect(Collectors.toList()); + return indexesSplits; + } + public LockList getLocks() { return locks; } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java index 21d2aaac6b..bbccd65cf3 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java @@ -22,6 +22,7 @@ package org.apache.asterix.metadata.entities; import java.io.Serializable; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import org.apache.asterix.common.config.DatasetConfig.IndexType; @@ -39,6 +40,7 @@ import org.apache.asterix.om.types.IAType; import org.apache.asterix.om.utils.NonTaggedFormatUtil; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.api.job.profiling.IndexStats; import org.apache.hyracks.util.OptionalBoolean; /** @@ -554,10 +556,11 @@ public class Index implements IMetadataEntity<Index>, Comparable<Index> { private final int sourceAvgItemSize; private final long sampleSeed; + private final Map<String, IndexStats> indexesStats; public SampleIndexDetails(List<List<String>> keyFieldNames, List<Integer> keyFieldSourceIndicators, List<IAType> keyFieldTypes, int sampleCardinalityTarget, long sourceCardinality, int sourceAvgItemSize, - long sampleSeed) { + long sampleSeed, Map<String, IndexStats> indexesStats) { this.keyFieldNames = keyFieldNames; this.keyFieldSourceIndicators = keyFieldSourceIndicators; this.keyFieldTypes = keyFieldTypes; @@ -565,6 +568,7 @@ public class Index implements IMetadataEntity<Index>, Comparable<Index> { this.sourceCardinality = sourceCardinality; this.sourceAvgItemSize = sourceAvgItemSize; this.sampleSeed = sampleSeed; + this.indexesStats = indexesStats; } @Override @@ -604,6 +608,10 @@ public class Index implements IMetadataEntity<Index>, Comparable<Index> { public long getSampleSeed() { return sampleSeed; } + + public Map<String, IndexStats> getIndexesStats() { + return indexesStats; + } } @Deprecated diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java index 9c742ed9c2..fc78d10ffd 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/IndexTupleTranslator.java @@ -25,7 +25,9 @@ import static org.apache.asterix.metadata.bootstrap.MetadataRecordTypes.FIELD_NA import java.util.ArrayList; import java.util.Calendar; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.apache.asterix.builders.IARecordBuilder; @@ -49,6 +51,7 @@ import org.apache.asterix.om.base.ACollectionCursor; import org.apache.asterix.om.base.AInt32; import org.apache.asterix.om.base.AInt64; import org.apache.asterix.om.base.AInt8; +import org.apache.asterix.om.base.AMutableInt64; import org.apache.asterix.om.base.AMutableInt8; import org.apache.asterix.om.base.ANull; import org.apache.asterix.om.base.AOrderedList; @@ -67,6 +70,7 @@ import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.common.utils.Triple; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.job.profiling.IndexStats; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.util.OptionalBoolean; @@ -95,6 +99,9 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { public static final String SAMPLE_CARDINALITY_TARGET = "SampleCardinalityTarget"; public static final String SOURCE_CARDINALITY = "SourceCardinality"; public static final String SOURCE_AVG_ITEM_SIZE = "SourceAvgItemSize"; + public static final String INDEXES_STATS = "IndexesStats"; + public static final String STATS_NUM_PAGES = "NumPages"; + public static final String STATS_INDEX_NAME = "IndexName"; protected final TxnId txnId; protected final MetadataNode metadataNode; @@ -105,13 +112,15 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { protected OrderedListBuilder complexSearchKeyNameListBuilder; protected IARecordBuilder complexSearchKeyNameRecordBuilder; protected IARecordBuilder castRecordBuilder; + protected OrderedListBuilder indexesStatsListBuilder; + protected IARecordBuilder indexStatsRecordBuilder; protected AOrderedListType stringList; protected AOrderedListType int8List; protected ArrayBackedValueStorage nameValue; protected ArrayBackedValueStorage itemValue; protected AMutableInt8 aInt8; + protected AMutableInt64 aInt64; protected ISerializerDeserializer<AInt8> int8Serde; - protected ISerializerDeserializer<ANull> nullSerde; @SuppressWarnings("unchecked") protected IndexTupleTranslator(TxnId txnId, MetadataNode metadataNode, boolean getTuple) { @@ -124,14 +133,16 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { primaryKeyListBuilder = new OrderedListBuilder(); complexSearchKeyNameRecordBuilder = new RecordBuilder(); castRecordBuilder = new RecordBuilder(); + indexesStatsListBuilder = new OrderedListBuilder(); + indexStatsRecordBuilder = new RecordBuilder(); complexSearchKeyNameListBuilder = new OrderedListBuilder(); stringList = new AOrderedListType(BuiltinType.ASTRING, null); int8List = new AOrderedListType(BuiltinType.AINT8, null); nameValue = new ArrayBackedValueStorage(); itemValue = new ArrayBackedValueStorage(); aInt8 = new AMutableInt8((byte) 0); + aInt64 = new AMutableInt64(0); int8Serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8); - nullSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL); } } @@ -490,8 +501,26 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { } int sourceAvgItemSize = ((AInt32) indexRecord.getValueByPos(sourceAvgItemSizePos)).getIntegerValue(); + int indexesStatsPos = indexRecord.getType().getFieldIndex(INDEXES_STATS); + Map<String, IndexStats> indexesStats; + if (indexesStatsPos >= 0) { + AOrderedList indexesStatsList = (AOrderedList) indexRecord.getValueByPos(indexesStatsPos); + int numIndexes = indexesStatsList.size(); + indexesStats = numIndexes > 0 ? new HashMap<>() : Collections.emptyMap(); + for (int i = 0; i < numIndexes; i++) { + ARecord stats = (ARecord) indexesStatsList.getItem(i); + IAObject numPages = stats.getValueByPos(stats.getType().getFieldIndex(STATS_NUM_PAGES)); + IAObject idxNameObj = stats.getValueByPos(stats.getType().getFieldIndex(STATS_INDEX_NAME)); + String idxName = ((AString) idxNameObj).getStringValue(); + IndexStats idxStats = new IndexStats(idxName, ((AInt64) numPages).getLongValue()); + indexesStats.put(idxName, idxStats); + } + } else { + indexesStats = Collections.emptyMap(); + } + indexDetails = new Index.SampleIndexDetails(keyFieldNames, keyFieldSourceIndicator, keyFieldTypes, - sampleCardinalityTarget, sourceCardinality, sourceAvgItemSize, sampleSeed); + sampleCardinalityTarget, sourceCardinality, sourceAvgItemSize, sampleSeed, indexesStats); break; default: throw new AsterixException(ErrorCode.METADATA_ERROR, indexType.toString()); @@ -935,6 +964,41 @@ public class IndexTupleTranslator extends AbstractTupleTranslator<Index> { stringSerde.serialize(aString, nameValue.getDataOutput()); int32Serde.serialize(new AInt32(indexDetails.getSourceAvgItemSize()), fieldValue.getDataOutput()); recordBuilder.addField(nameValue, fieldValue); + + Map<String, IndexStats> indexesStats = indexDetails.getIndexesStats(); + if (!indexesStats.isEmpty()) { + indexesStatsListBuilder.reset(AOrderedListType.FULL_OPEN_ORDEREDLIST_TYPE); + for (Map.Entry<String, IndexStats> stats : indexesStats.entrySet()) { + indexStatsRecordBuilder.reset(RecordUtil.FULLY_OPEN_RECORD_TYPE); + // index name + nameValue.reset(); + itemValue.reset(); + aString.setValue(STATS_INDEX_NAME); + stringSerde.serialize(aString, nameValue.getDataOutput()); + aString.setValue(stats.getKey()); + stringSerde.serialize(aString, itemValue.getDataOutput()); + indexStatsRecordBuilder.addField(nameValue, itemValue); + + // index number of pages + nameValue.reset(); + itemValue.reset(); + aString.setValue(STATS_NUM_PAGES); + stringSerde.serialize(aString, nameValue.getDataOutput()); + aInt64.setValue(stats.getValue().getNumPages()); + int64Serde.serialize(aInt64, itemValue.getDataOutput()); + indexStatsRecordBuilder.addField(nameValue, itemValue); + + itemValue.reset(); + indexStatsRecordBuilder.write(itemValue.getDataOutput(), true); + indexesStatsListBuilder.addItem(itemValue); + } + nameValue.reset(); + fieldValue.reset(); + aString.setValue(INDEXES_STATS); + stringSerde.serialize(aString, nameValue.getDataOutput()); + indexesStatsListBuilder.write(fieldValue.getDataOutput(), true); + recordBuilder.addField(nameValue, fieldValue); + } } } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java index 0d3e015c0f..056a8c22f2 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java @@ -19,6 +19,7 @@ package org.apache.asterix.metadata.utils; +import java.util.List; import java.util.Map; import java.util.Set; @@ -36,8 +37,8 @@ import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.runtime.aggregates.collections.FirstElementEvalFactory; import org.apache.asterix.runtime.evaluators.comparisons.GreaterThanDescriptor; +import org.apache.asterix.runtime.operators.DatasetStreamStatsOperatorDescriptor; import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor; -import org.apache.asterix.runtime.operators.StreamStatsOperatorDescriptor; import org.apache.asterix.runtime.runningaggregates.std.SampleSlotRunningAggregateFunctionFactory; import org.apache.asterix.runtime.runningaggregates.std.TidRunningAggregateDescriptor; import org.apache.asterix.runtime.utils.RuntimeUtils; @@ -82,6 +83,7 @@ import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; import org.apache.hyracks.storage.common.IResourceFactory; +import org.apache.hyracks.storage.common.IStorageManager; /** * Utility class for sampling operations. @@ -164,10 +166,10 @@ public class SampleOperationsHelper implements ISecondaryIndexOperationsHelper { for (int i = 0; i < nFields; i++) { columns[i] = i; } - + IStorageManager storageMgr = metadataProvider.getStorageComponentProvider().getStorageManager(); JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory( - metadataProvider.getStorageComponentProvider().getStorageManager(), fileSplitProvider); + IIndexDataflowHelperFactory dataflowHelperFactory = + new IndexDataflowHelperFactory(storageMgr, fileSplitProvider); // job spec: IndexUtil.bindJobEventListener(spec, metadataProvider); @@ -179,7 +181,16 @@ public class SampleOperationsHelper implements ISecondaryIndexOperationsHelper { sourceOp = targetOp; // primary index scan ----> stream stats op - targetOp = new StreamStatsOperatorDescriptor(spec, recordDesc, DATASET_STATS_OPERATOR_NAME); + List<Pair<IFileSplitProvider, String>> indexesInfo = metadataProvider.getSplitProviderOfAllIndexes(dataset); + IndexDataflowHelperFactory[] indexes = new IndexDataflowHelperFactory[indexesInfo.size()]; + String[] names = new String[indexesInfo.size()]; + for (int i = 0; i < indexes.length; i++) { + Pair<IFileSplitProvider, String> indexInfo = indexesInfo.get(i); + indexes[i] = new IndexDataflowHelperFactory(storageMgr, indexInfo.first); + names[i] = indexInfo.second; + } + targetOp = + new DatasetStreamStatsOperatorDescriptor(spec, recordDesc, DATASET_STATS_OPERATOR_NAME, indexes, names); spec.connect(new OneToOneConnectorDescriptor(spec), sourceOp, 0, targetOp, 0); sourceOp = targetOp; diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStats.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStats.java similarity index 72% rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStats.java rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStats.java index 268009f360..8ea267ac88 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStats.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStats.java @@ -19,26 +19,33 @@ package org.apache.asterix.runtime.operators; +import java.util.Map; + import org.apache.hyracks.api.job.profiling.IOperatorStats; +import org.apache.hyracks.api.job.profiling.IndexStats; /** - * Helper method to access stats produced by {@link org.apache.asterix.runtime.operators.StreamStatsOperatorDescriptor} + * Helper method to access stats produced by {@link DatasetStreamStatsOperatorDescriptor} */ -public final class StreamStats { +public final class DatasetStreamStats { private final long cardinality; private final int avgTupleSize; - public StreamStats(IOperatorStats opStats) { + private final Map<String, IndexStats> indexesStats; + + public DatasetStreamStats(IOperatorStats opStats) { this.cardinality = opStats.getTupleCounter().get(); long totalTupleSize = opStats.getDiskIoCounter().get(); this.avgTupleSize = cardinality > 0 ? (int) (totalTupleSize / cardinality) : 0; + this.indexesStats = opStats.getIndexesStats(); } - static void update(IOperatorStats opStats, long tupleCount, long tupleSize) { + static void update(IOperatorStats opStats, long tupleCount, long tupleSize, Map<String, IndexStats> indexStats) { opStats.getTupleCounter().update(tupleCount); opStats.getDiskIoCounter().update(tupleSize); + opStats.updateIndexesStats(indexStats); } public long getCardinality() { @@ -48,4 +55,8 @@ public final class StreamStats { public int getAvgTupleSize() { return avgTupleSize; } + + public Map<String, IndexStats> getIndexesStats() { + return indexesStats; + } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStatsOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java similarity index 57% rename from asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStatsOperatorDescriptor.java rename to asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java index 353401acee..ba294506e8 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StreamStatsOperatorDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java @@ -20,7 +20,10 @@ package org.apache.asterix.runtime.operators; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; @@ -29,26 +32,38 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.IOperatorDescriptorRegistry; import org.apache.hyracks.api.job.profiling.IOperatorStats; import org.apache.hyracks.api.job.profiling.IStatsCollector; +import org.apache.hyracks.api.job.profiling.IndexStats; +import org.apache.hyracks.api.job.profiling.OperatorStats; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor; import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable; +import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; +import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.lsm.common.api.AbstractLSMWithBloomFilterDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; /** * Computes total tuple count and total tuple length for all input tuples, * and emits these values as operator stats. */ -public final class StreamStatsOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { +public final class DatasetStreamStatsOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { private static final long serialVersionUID = 1L; private final String operatorName; + private final IIndexDataflowHelperFactory[] indexes; + private final String[] indexesNames; + private Map<String, IndexStats> indexStats; - public StreamStatsOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, - String operatorName) { + public DatasetStreamStatsOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, + String operatorName, IIndexDataflowHelperFactory[] indexes, String[] indexesNames) { super(spec, 1, 1); outRecDescs[0] = rDesc; this.operatorName = operatorName; + this.indexes = indexes; + this.indexesNames = indexesNames; } @Override @@ -66,6 +81,33 @@ public final class StreamStatsOperatorDescriptor extends AbstractSingleActivityO fta = new FrameTupleAccessor(outRecDescs[0]); totalTupleCount = 0; writer.open(); + IStatsCollector coll = ctx.getStatsCollector(); + if (coll != null) { + coll.add(new OperatorStats(operatorName)); + } + INCServiceContext serviceCtx = ctx.getJobletContext().getServiceContext(); + indexStats = new HashMap<>(); + for (int i = 0; i < indexes.length; i++) { + IIndexDataflowHelper idxFlowHelper = indexes[i].create(serviceCtx, partition); + try { + idxFlowHelper.open(); + ILSMIndex indexInstance = (ILSMIndex) idxFlowHelper.getIndexInstance(); + long numPages = 0; + synchronized (indexInstance.getOperationTracker()) { + for (ILSMDiskComponent component : indexInstance.getDiskComponents()) { + long componentSize = component.getComponentSize(); + if (component instanceof AbstractLSMWithBloomFilterDiskComponent) { + componentSize -= ((AbstractLSMWithBloomFilterDiskComponent) component) + .getBloomFilter().getFileReference().getFile().length(); + } + numPages += componentSize / indexInstance.getBufferCache().getPageSize(); + } + } + indexStats.put(indexesNames[i], new IndexStats(indexesNames[i], numPages)); + } finally { + idxFlowHelper.close(); + } + } } @Override @@ -93,7 +135,7 @@ public final class StreamStatsOperatorDescriptor extends AbstractSingleActivityO IStatsCollector statsCollector = ctx.getStatsCollector(); if (statsCollector != null) { IOperatorStats stats = statsCollector.getOrAddOperatorStats(operatorName); - StreamStats.update(stats, totalTupleCount, totalTupleLength); + DatasetStreamStats.update(stats, totalTupleCount, totalTupleLength, indexStats); } writer.close(); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java index 181249abac..0d38fac685 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IOperatorStats.java @@ -19,6 +19,7 @@ package org.apache.hyracks.api.job.profiling; import java.io.Serializable; +import java.util.Map; import org.apache.hyracks.api.io.IWritable; import org.apache.hyracks.api.job.profiling.counters.ICounter; @@ -46,4 +47,10 @@ public interface IOperatorStats extends IWritable, Serializable { * @return A counter used to track the number of pages pinned by an opeartor */ ICounter getDiskIoCounter(); + + void updateIndexesStats(Map<String, IndexStats> indexesStats); + + Map<String, IndexStats> getIndexesStats(); + + void updateFrom(IOperatorStats stats); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IndexStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IndexStats.java new file mode 100644 index 0000000000..0c471efddb --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/IndexStats.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.job.profiling; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.hyracks.api.com.job.profiling.counters.Counter; +import org.apache.hyracks.api.io.IWritable; +import org.apache.hyracks.api.job.profiling.counters.ICounter; + +public class IndexStats implements IWritable, Serializable { + + private static final long serialVersionUID = 1L; + + private final ICounter numPages; + private String indexName; + + public IndexStats(String indexName, long numPages) { + this.indexName = indexName; + this.numPages = new Counter("numPages"); + this.numPages.set(numPages); + } + + public static IndexStats create(DataInput input) throws IOException { + String indexName = input.readUTF(); + long numPages = input.readLong(); + return new IndexStats(indexName, numPages); + } + + @Override + public void writeFields(DataOutput output) throws IOException { + output.writeUTF(indexName); + output.writeLong(numPages.get()); + } + + @Override + public void readFields(DataInput input) throws IOException { + indexName = input.readUTF(); + numPages.set(input.readLong()); + } + + public void updateNumPages(long delta) { + numPages.update(delta); + } + + public long getNumPages() { + return numPages.get(); + } + + @Override + public String toString() { + return "IndexStats{indexName='" + indexName + "', numPages=" + numPages.get() + '}'; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java index 08c1adcc75..6b7deb6cf8 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/profiling/OperatorStats.java @@ -21,6 +21,8 @@ package org.apache.hyracks.api.job.profiling; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import org.apache.hyracks.api.com.job.profiling.counters.Counter; import org.apache.hyracks.api.job.profiling.counters.ICounter; @@ -32,6 +34,7 @@ public class OperatorStats implements IOperatorStats { public final ICounter tupleCounter; public final ICounter timeCounter; public final ICounter diskIoCounter; + private final Map<String, IndexStats> indexesStats; public OperatorStats(String operatorName) { if (operatorName == null || operatorName.isEmpty()) { @@ -41,6 +44,7 @@ public class OperatorStats implements IOperatorStats { tupleCounter = new Counter("tupleCounter"); timeCounter = new Counter("timeCounter"); diskIoCounter = new Counter("diskIoCounter"); + indexesStats = new HashMap<>(); } public static IOperatorStats create(DataInput input) throws IOException { @@ -70,12 +74,43 @@ public class OperatorStats implements IOperatorStats { return diskIoCounter; } + @Override + public void updateIndexesStats(Map<String, IndexStats> stats) { + if (stats == null) { + return; + } + for (Map.Entry<String, IndexStats> stat : stats.entrySet()) { + String indexName = stat.getKey(); + IndexStats indexStat = stat.getValue(); + IndexStats existingIndexStat = indexesStats.get(indexName); + if (existingIndexStat == null) { + indexesStats.put(indexName, new IndexStats(indexName, indexStat.getNumPages())); + } else { + existingIndexStat.updateNumPages(indexStat.getNumPages()); + } + } + } + + @Override + public Map<String, IndexStats> getIndexesStats() { + return indexesStats; + } + + @Override + public void updateFrom(IOperatorStats stats) { + tupleCounter.update(stats.getTupleCounter().get()); + timeCounter.update(stats.getTimeCounter().get()); + diskIoCounter.update(stats.getDiskIoCounter().get()); + updateIndexesStats(stats.getIndexesStats()); + } + @Override public void writeFields(DataOutput output) throws IOException { output.writeUTF(operatorName); output.writeLong(tupleCounter.get()); output.writeLong(timeCounter.get()); output.writeLong(diskIoCounter.get()); + writeIndexesStats(output); } @Override @@ -83,11 +118,30 @@ public class OperatorStats implements IOperatorStats { tupleCounter.set(input.readLong()); timeCounter.set(input.readLong()); diskIoCounter.set(input.readLong()); + readIndexesStats(input); + } + + private void writeIndexesStats(DataOutput output) throws IOException { + output.writeInt(indexesStats.size()); + for (Map.Entry<String, IndexStats> indexStat : indexesStats.entrySet()) { + output.writeUTF(indexStat.getKey()); + indexStat.getValue().writeFields(output); + } + } + + private void readIndexesStats(DataInput input) throws IOException { + int numIndexes = input.readInt(); + for (int i = 0; i < numIndexes; i++) { + String indexName = input.readUTF(); + IndexStats indexStats = IndexStats.create(input); + indexesStats.put(indexName, indexStats); + } } @Override public String toString() { return "{ " + "\"operatorName\": \"" + operatorName + "\", " + "\"" + tupleCounter.getName() + "\": " - + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": " + timeCounter.get() + " }"; + + tupleCounter.get() + ", \"" + timeCounter.getName() + "\": " + timeCounter.get() + ", \"" + + ", \"indexesStats\": \"" + indexesStats + "\" }"; } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java index 0026af4645..ee4990866d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/job/profiling/om/JobProfile.java @@ -39,7 +39,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; public class JobProfile extends AbstractProfile { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private JobId jobId; @@ -149,9 +149,7 @@ public class JobProfile extends AbstractProfile { opOutStats = new OperatorStats(operatorName); outStats[i] = opOutStats; } - opOutStats.getTupleCounter().update(opTaskStats.getTupleCounter().get()); - opOutStats.getTimeCounter().update(opTaskStats.getTimeCounter().get()); - opOutStats.getDiskIoCounter().update(opTaskStats.getDiskIoCounter().get()); + opOutStats.updateFrom(opTaskStats); } } return Arrays.asList(outStats);
