This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch TableModelIngestion in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3ef660940b5ef68dfcf60e811de279b5336ec60b Merge: b354e0884f4 7060d76c6b5 Author: jt2594838 <[email protected]> AuthorDate: Wed Jul 3 18:51:30 2024 +0800 Merge branch 'enhance_ideviceid_partition_cache' into TableModelIngestion .../plan/analyze/ClusterPartitionFetcher.java | 106 ++--- .../plan/analyze/IPartitionFetcher.java | 5 +- ...upCacheResult.java => DatabaseCacheResult.java} | 4 +- .../analyze/cache/partition/PartitionCache.java | 445 ++++++++++++--------- .../plan/relational/planner/LogicalPlanner.java | 6 +- .../plan/relational/planner/node/CollectNode.java | 19 + .../plan/relational/sql/ast/InsertTablet.java | 13 +- .../plan/statement/crud/InsertBaseStatement.java | 13 + .../schemaengine/schemaregion/utils/MetaUtils.java | 4 +- .../iotdb/db/service/metrics/CacheMetrics.java | 28 +- .../apache/iotdb/db/metadata/MetaUtilsTest.java | 8 +- .../plan/analyze/cache/PartitionCacheTest.java | 36 +- .../plan/relational/analyzer/TestMatadata.java | 6 +- .../iotdb/commons/partition/SchemaPartition.java | 4 +- .../executor/SeriesPartitionExecutor.java | 2 + .../partition/executor/hash/APHashExecutor.java | 18 +- .../partition/executor/hash/BKDRHashExecutor.java | 11 +- .../partition/executor/hash/JSHashExecutor.java | 12 +- .../partition/executor/hash/SDBMHashExecutor.java | 12 +- 19 files changed, 460 insertions(+), 292 deletions(-) diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java index d5eb7b9dade,76d76fd000a..171159f57b6 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java @@@ -93,8 -91,8 +93,10 @@@ public class LogicalPlanner public LogicalQueryPlan plan(Analysis analysis) { PlanNode planNode = planStatement(analysis, analysis.getStatement()); -- relationalPlanOptimizers.forEach( -- optimizer -> optimizer.optimize(planNode, analysis, metadata, sessionInfo, context)); ++ if (analysis.getStatement() instanceof Query) { ++ relationalPlanOptimizers.forEach( ++ optimizer -> optimizer.optimize(planNode, analysis, metadata, sessionInfo, context)); ++ } return new LogicalQueryPlan(context, planNode); } diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java index bc230a47690,00000000000..522bd052234 mode 100644,000000..100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java @@@ -1,102 -1,0 +1,105 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; + +import org.apache.tsfile.file.metadata.IDeviceID; + +import java.util.ArrayList; +import java.util.List; + +public class InsertTablet extends WrappedInsertStatement { + + public InsertTablet(InsertTabletStatement insertTabletStatement, MPPQueryContext context) { + super(insertTabletStatement, context); + } + + @Override + public <R, C> R accept(AstVisitor<R, C> visitor, C context) { + return visitor.visitInsertTablet(this, context); + } + + @Override + public InsertTabletStatement getInnerTreeStatement() { + return ((InsertTabletStatement) super.getInnerTreeStatement()); + } + + @Override + public void updateAfterSchemaValidation(MPPQueryContext context) throws QueryProcessException { + getInnerTreeStatement().updateAfterSchemaValidation(context); + } + + @Override + public String getDatabase() { + return context.getSession().getDatabaseName().get(); + } + + @Override + public String getTableName() { + return getInnerTreeStatement().getDevicePath().getFullPath(); + } + + @Override + public List<Object[]> getDeviceIdList() { + List<Object[]> deviceIdList = new ArrayList<>(); + final InsertTabletStatement insertTabletStatement = getInnerTreeStatement(); + for (int i = 0; i < insertTabletStatement.getRowCount(); i++) { + IDeviceID deviceID = insertTabletStatement.getTableDeviceID(i); + Object[] deviceIdSegments = new Object[deviceID.segmentNum()]; + for (int j = 0; j < deviceIdSegments.length; j++) { + deviceIdSegments[j] = deviceID.segment(j); + } + deviceIdList.add(deviceIdSegments); + } + return deviceIdList; + } + + @Override + public List<String> getAttributeColumnNameList() { + final InsertTabletStatement insertTabletStatement = getInnerTreeStatement(); + List<String> result = new ArrayList<>(); + for (int i = 0; i < insertTabletStatement.getColumnCategories().length; i++) { + if (insertTabletStatement.getColumnCategories()[i] == TsTableColumnCategory.ATTRIBUTE) { + result.add(insertTabletStatement.getMeasurements()[i]); + } + } + return result; + } + + @Override + public List<Object[]> getAttributeValueList() { + final InsertTabletStatement insertTabletStatement = getInnerTreeStatement(); - List<Object[]> result = new ArrayList<>(); - for (int i = 0; i < insertTabletStatement.getColumnCategories().length; i++) { - if (insertTabletStatement.getColumnCategories()[i] == TsTableColumnCategory.ATTRIBUTE) { - result.add(((Object[]) insertTabletStatement.getColumns()[i])); ++ List<Object[]> result = new ArrayList<>(insertTabletStatement.getRowCount()); ++ final List<Integer> attrColumnIndices = insertTabletStatement.getAttrColumnIndices(); ++ for (int i = 0; i < insertTabletStatement.getRowCount(); i++) { ++ Object[] attrValues = new Object[attrColumnIndices.size()]; ++ for (int j = 0; j < attrColumnIndices.size(); j++) { ++ final int columnIndex = attrColumnIndices.get(j); ++ attrValues[j] = ((Object[]) insertTabletStatement.getColumns()[columnIndex])[i]; + } ++ result.add(attrValues); + } - + return result; + } +} diff --cc iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java index 15f238e3e75,bc31e71ec80..84b7e7a2ef2 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java @@@ -72,9 -68,6 +72,10 @@@ public abstract class InsertBaseStateme /** index of failed measurements -> info including measurement, data type and value */ protected Map<Integer, FailedMeasurementInfo> failedMeasurementIndex2Info; + protected TsTableColumnCategory[] columnCategories; + protected List<Integer> idColumnIndices; ++ protected List<Integer> attrColumnIndices; + // region params used by analyzing logical views. /** This param records the logical view schema appeared in this statement. */ @@@ -245,34 -238,6 +246,46 @@@ return false; } + public TsTableColumnCategory[] getColumnCategories() { + return columnCategories; + } + + public void setColumnCategories(TsTableColumnCategory[] columnCategories) { + this.columnCategories = columnCategories; + if (columnCategories != null) { + idColumnIndices = new ArrayList<>(); + for (int i = 0; i < columnCategories.length; i++) { + if (columnCategories[i].equals(TsTableColumnCategory.ID)) { + idColumnIndices.add(i); + } + } + } + } + + public List<Integer> getIdColumnIndices() { + if (idColumnIndices == null && columnCategories != null) { + idColumnIndices = new ArrayList<>(); + for (int i = 0; i < columnCategories.length; i++) { + if (columnCategories[i].equals(TsTableColumnCategory.ID)) { + idColumnIndices.add(i); + } + } + } + return idColumnIndices; + } + ++ public List<Integer> getAttrColumnIndices() { ++ if (attrColumnIndices == null && columnCategories != null) { ++ attrColumnIndices = new ArrayList<>(); ++ for (int i = 0; i < columnCategories.length; i++) { ++ if (columnCategories[i].equals(TsTableColumnCategory.ATTRIBUTE)) { ++ attrColumnIndices.add(i); ++ } ++ } ++ } ++ return attrColumnIndices; ++ } ++ public boolean hasFailedMeasurements() { return failedMeasurementIndex2Info != null && !failedMeasurementIndex2Info.isEmpty(); } diff --cc iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java index ca4cd74bb10,ca4cd74bb10..c32b46d6fcd --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SchemaPartition.java @@@ -73,8 -73,8 +73,8 @@@ public class SchemaPartition extends Pa * <p>The device id shall be [table, seg1, ....] */ public TRegionReplicaSet getSchemaRegionReplicaSet(String database, IDeviceID deviceID) { -- // todo implement this interface, @Potato -- throw new UnsupportedOperationException(); ++ TSeriesPartitionSlot seriesPartitionSlot = calculateDeviceGroupId(deviceID); ++ return schemaPartitionMap.get(database).get(seriesPartitionSlot); } // [root, db, ....]
