This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch multi-cyclic-pipe in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3d4a14731cffd5fd553f9c5eccda9ccdd58ca6be Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Aug 14 22:06:22 2023 +0800 .. --- .../org/apache/iotdb/db/audit/AuditLogger.java | 1 + .../org/apache/iotdb/db/auth/AuthorityChecker.java | 1 + .../receiver/thrift/IoTDBThriftReceiverV1.java | 16 +- .../queryengine/plan/statement/StatementType.java | 1 + .../plan/statement/StatementVisitor.java | 42 ++-- .../plan/statement/crud/InsertBaseStatement.java | 3 +- .../crud/PipeEnrichedInsertBaseStatement.java | 220 +++++++++++++++++++++ .../quotas/DataNodeThrottleQuotaManager.java | 1 + .../rescon/quotas/DefaultOperationQuota.java | 8 +- 9 files changed, 270 insertions(+), 23 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java index 5c7b9da4923..fa1fd88dbd8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java @@ -205,6 +205,7 @@ public class AuditLogger { case BATCH_INSERT_ROWS: case BATCH_INSERT_ONE_DEVICE: case MULTI_BATCH_INSERT: + case PIPE_ENRICHED_INSERT: case DELETE: case SELECT_INTO: case LOAD_FILES: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java index 905f895d349..18be7540625 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java @@ -213,6 +213,7 @@ public class AuthorityChecker { case BATCH_INSERT_ONE_DEVICE: case BATCH_INSERT_ROWS: case MULTI_BATCH_INSERT: + case PIPE_ENRICHED_INSERT: return new int[] {PrivilegeType.WRITE_DATA.ordinal()}; case CREATE_USER: case DELETE_USER: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java index 1f79ebcdf5d..39c47acf439 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java @@ -37,8 +37,10 @@ import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher; import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; @@ -412,12 +414,22 @@ public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver { TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null statement."); } - final long queryId = SessionManager.getInstance().requestQueryId(); + switch (statement.getType()) { + case INSERT: + case BATCH_INSERT: + case BATCH_INSERT_ROWS: + case BATCH_INSERT_ONE_DEVICE: + case MULTI_BATCH_INSERT: + statement = new PipeEnrichedInsertBaseStatement((InsertBaseStatement) statement); + break; + // TODO: LOAD + } + final ExecutionResult result = Coordinator.getInstance() .execute( statement, - queryId, + SessionManager.getInstance().requestQueryId(), null, "", partitionFetcher, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java index b8089c96e22..b818f763e7e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java @@ -64,6 +64,7 @@ public enum StatementType { BATCH_INSERT_ROWS, BATCH_INSERT_ONE_DEVICE, MULTI_BATCH_INSERT, + PIPE_ENRICHED_INSERT, DELETE, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java index e3eb02b7927..1ffa3d8f108 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement; import org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement; import org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement; @@ -302,6 +303,29 @@ public abstract class StatementVisitor<R, C> { return visitStatement(loadTsFileStatement, context); } + public R visitInsertRow(InsertRowStatement insertRowStatement, C context) { + return visitStatement(insertRowStatement, context); + } + + public R visitInsertRows(InsertRowsStatement insertRowsStatement, C context) { + return visitStatement(insertRowsStatement, context); + } + + public R visitInsertMultiTablets( + InsertMultiTabletsStatement insertMultiTabletsStatement, C context) { + return visitStatement(insertMultiTabletsStatement, context); + } + + public R visitInsertRowsOfOneDevice( + InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, C context) { + return visitStatement(insertRowsOfOneDeviceStatement, context); + } + + public R visitPipeEnrichedInsert( + PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, C context) { + return visitStatement(pipeEnrichedInsertBaseStatement, context); + } + /** Data Control Language (DCL) */ public R visitAuthor(AuthorStatement authorStatement, C context) { return visitStatement(authorStatement, context); @@ -339,24 +363,6 @@ public abstract class StatementVisitor<R, C> { return visitStatement(countStatement, context); } - public R visitInsertRow(InsertRowStatement insertRowStatement, C context) { - return visitStatement(insertRowStatement, context); - } - - public R visitInsertRows(InsertRowsStatement insertRowsStatement, C context) { - return visitStatement(insertRowsStatement, context); - } - - public R visitInsertMultiTablets( - InsertMultiTabletsStatement insertMultiTabletsStatement, C context) { - return visitStatement(insertMultiTabletsStatement, context); - } - - public R visitInsertRowsOfOneDevice( - InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, C context) { - return visitStatement(insertRowsOfOneDeviceStatement, context); - } - public R visitSchemaFetch(SchemaFetchStatement schemaFetchStatement, C context) { return visitStatement(schemaFetchStatement, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java index d118884cef6..848d5806f53 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java @@ -279,8 +279,7 @@ public abstract class InsertBaseStatement extends Statement { * * @return map from device path to its measurements. */ - protected final Map<PartialPath, List<Pair<String, Integer>>> - getMapFromDeviceToMeasurementAndIndex() { + protected Map<PartialPath, List<Pair<String, Integer>>> getMapFromDeviceToMeasurementAndIndex() { boolean[] isLogicalView = new boolean[this.measurements.length]; int[] indexMapToLogicalViewList = new int[this.measurements.length]; Arrays.fill(isLogicalView, false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java new file mode 100644 index 00000000000..0abe0a6eaa9 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.statement.crud; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; +import org.apache.iotdb.db.exception.metadata.PathNotExistException; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; +import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import java.util.List; +import java.util.Map; + +public class PipeEnrichedInsertBaseStatement extends InsertBaseStatement { + + private final InsertBaseStatement insertBaseStatement; + + public PipeEnrichedInsertBaseStatement(InsertBaseStatement insertBaseStatement) { + statementType = StatementType.PIPE_ENRICHED_INSERT; + this.insertBaseStatement = insertBaseStatement; + } + + public InsertBaseStatement getInsertBaseStatement() { + return insertBaseStatement; + } + + @Override + public <R, C> R accept(StatementVisitor<R, C> visitor, C context) { + return visitor.visitPipeEnrichedInsert(this, context); + } + + @Override + public boolean isDebug() { + return insertBaseStatement.isDebug(); + } + + @Override + public void setDebug(boolean debug) { + insertBaseStatement.setDebug(debug); + } + + @Override + public boolean isQuery() { + return insertBaseStatement.isQuery(); + } + + @Override + public boolean isAuthenticationRequired() { + return insertBaseStatement.isAuthenticationRequired(); + } + + @Override + public PartialPath getDevicePath() { + return insertBaseStatement.getDevicePath(); + } + + @Override + public void setDevicePath(PartialPath devicePath) { + insertBaseStatement.setDevicePath(devicePath); + } + + @Override + public String[] getMeasurements() { + return insertBaseStatement.getMeasurements(); + } + + @Override + public void setMeasurements(String[] measurements) { + insertBaseStatement.setMeasurements(measurements); + } + + @Override + public MeasurementSchema[] getMeasurementSchemas() { + return insertBaseStatement.getMeasurementSchemas(); + } + + @Override + public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) { + insertBaseStatement.setMeasurementSchemas(measurementSchemas); + } + + @Override + public boolean isAligned() { + return insertBaseStatement.isAligned(); + } + + @Override + public void setAligned(boolean aligned) { + insertBaseStatement.setAligned(aligned); + } + + @Override + public TSDataType[] getDataTypes() { + return insertBaseStatement.getDataTypes(); + } + + @Override + public void setDataTypes(TSDataType[] dataTypes) { + insertBaseStatement.setDataTypes(dataTypes); + } + + @Override + public List<PartialPath> getPaths() { + return insertBaseStatement.getPaths(); + } + + @Override + public void updateAfterSchemaValidation() throws QueryProcessException { + insertBaseStatement.updateAfterSchemaValidation(); + } + + @Override + protected void selfCheckDataTypes(int index) + throws DataTypeMismatchException, PathNotExistException { + insertBaseStatement.selfCheckDataTypes(index); + } + + @Override + public void markFailedMeasurement(int index, Exception cause) { + insertBaseStatement.markFailedMeasurement(index, cause); + } + + @Override + public boolean hasValidMeasurements() { + return insertBaseStatement.hasValidMeasurements(); + } + + @Override + public boolean hasFailedMeasurements() { + return insertBaseStatement.hasFailedMeasurements(); + } + + @Override + public int getFailedMeasurementNumber() { + return insertBaseStatement.getFailedMeasurementNumber(); + } + + @Override + public List<String> getFailedMeasurements() { + return insertBaseStatement.getFailedMeasurements(); + } + + @Override + public List<Exception> getFailedExceptions() { + return insertBaseStatement.getFailedExceptions(); + } + + @Override + public List<String> getFailedMessages() { + return insertBaseStatement.getFailedMessages(); + } + + @Override + public void setFailedMeasurementIndex2Info( + Map<Integer, InsertBaseStatement.FailedMeasurementInfo> failedMeasurementIndex2Info) { + insertBaseStatement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info); + } + + @Override + protected Map<PartialPath, List<Pair<String, Integer>>> getMapFromDeviceToMeasurementAndIndex() { + return insertBaseStatement.getMapFromDeviceToMeasurementAndIndex(); + } + + @Override + public boolean isEmpty() { + return insertBaseStatement.isEmpty(); + } + + @Override + public ISchemaValidation getSchemaValidation() { + return insertBaseStatement.getSchemaValidation(); + } + + @Override + public List<ISchemaValidation> getSchemaValidationList() { + return insertBaseStatement.getSchemaValidationList(); + } + + @Override + protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) { + return insertBaseStatement.checkAndCastDataType(columnIndex, dataType); + } + + @Override + public long getMinTime() { + return insertBaseStatement.getMinTime(); + } + + @Override + public Object getFirstValueOfIndex(int index) { + return insertBaseStatement.getFirstValueOfIndex(index); + } + + @Override + public InsertBaseStatement removeLogicalView() { + return new PipeEnrichedInsertBaseStatement(insertBaseStatement.removeLogicalView()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java index bb07e4457de..70037a99089 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java @@ -85,6 +85,7 @@ public class DataNodeThrottleQuotaManager { case BATCH_INSERT_ONE_DEVICE: case BATCH_INSERT_ROWS: case MULTI_BATCH_INSERT: + case PIPE_ENRICHED_INSERT: return checkQuota(userName, 1, 0, s); case QUERY: case GROUP_BY_TIME: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java index db9c12114e8..750258eef55 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.rescon.quotas; import org.apache.iotdb.commons.exception.RpcThrottlingException; import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement; @@ -28,6 +29,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement; import org.apache.iotdb.db.utils.TypeInferenceUtils; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.BitMap; @@ -92,7 +94,11 @@ public class DefaultOperationQuota implements OperationQuota { protected void updateEstimateConsumeQuota(int numWrites, int numReads, Statement s) { if (numWrites > 0) { long avgSize = 0; - switch (s.getType()) { + final StatementType statementType = + s.getType() == StatementType.PIPE_ENRICHED_INSERT + ? ((PipeEnrichedInsertBaseStatement) s).getInsertBaseStatement().getType() + : s.getType(); + switch (statementType) { case INSERT: // InsertStatement InsertRowStatement if (s instanceof InsertStatement) {
