This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch multi-cyclic-pipe
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3d4a14731cffd5fd553f9c5eccda9ccdd58ca6be
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Aug 14 22:06:22 2023 +0800

    ..
---
 .../org/apache/iotdb/db/audit/AuditLogger.java     |   1 +
 .../org/apache/iotdb/db/auth/AuthorityChecker.java |   1 +
 .../receiver/thrift/IoTDBThriftReceiverV1.java     |  16 +-
 .../queryengine/plan/statement/StatementType.java  |   1 +
 .../plan/statement/StatementVisitor.java           |  42 ++--
 .../plan/statement/crud/InsertBaseStatement.java   |   3 +-
 .../crud/PipeEnrichedInsertBaseStatement.java      | 220 +++++++++++++++++++++
 .../quotas/DataNodeThrottleQuotaManager.java       |   1 +
 .../rescon/quotas/DefaultOperationQuota.java       |   8 +-
 9 files changed, 270 insertions(+), 23 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
index 5c7b9da4923..fa1fd88dbd8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
@@ -205,6 +205,7 @@ public class AuditLogger {
       case BATCH_INSERT_ROWS:
       case BATCH_INSERT_ONE_DEVICE:
       case MULTI_BATCH_INSERT:
+      case PIPE_ENRICHED_INSERT:
       case DELETE:
       case SELECT_INTO:
       case LOAD_FILES:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
index 905f895d349..18be7540625 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/AuthorityChecker.java
@@ -213,6 +213,7 @@ public class AuthorityChecker {
       case BATCH_INSERT_ONE_DEVICE:
       case BATCH_INSERT_ROWS:
       case MULTI_BATCH_INSERT:
+      case PIPE_ENRICHED_INSERT:
         return new int[] {PrivilegeType.WRITE_DATA.ordinal()};
       case CREATE_USER:
       case DELETE_USER:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index 1f79ebcdf5d..39c47acf439 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -37,8 +37,10 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -412,12 +414,22 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
           TSStatusCode.PIPE_TRANSFER_EXECUTE_STATEMENT_ERROR, "Execute null 
statement.");
     }
 
-    final long queryId = SessionManager.getInstance().requestQueryId();
+    switch (statement.getType()) {
+      case INSERT:
+      case BATCH_INSERT:
+      case BATCH_INSERT_ROWS:
+      case BATCH_INSERT_ONE_DEVICE:
+      case MULTI_BATCH_INSERT:
+        statement = new PipeEnrichedInsertBaseStatement((InsertBaseStatement) 
statement);
+        break;
+        // TODO: LOAD
+    }
+
     final ExecutionResult result =
         Coordinator.getInstance()
             .execute(
                 statement,
-                queryId,
+                SessionManager.getInstance().requestQueryId(),
                 null,
                 "",
                 partitionFetcher,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
index b8089c96e22..b818f763e7e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
@@ -64,6 +64,7 @@ public enum StatementType {
   BATCH_INSERT_ROWS,
   BATCH_INSERT_ONE_DEVICE,
   MULTI_BATCH_INSERT,
+  PIPE_ENRICHED_INSERT,
 
   DELETE,
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
index e3eb02b7927..1ffa3d8f108 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalBatchActivateTemplateStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.internal.InternalCreateMultiTimeSeriesStatement;
@@ -302,6 +303,29 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(loadTsFileStatement, context);
   }
 
+  public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
+    return visitStatement(insertRowStatement, context);
+  }
+
+  public R visitInsertRows(InsertRowsStatement insertRowsStatement, C context) 
{
+    return visitStatement(insertRowsStatement, context);
+  }
+
+  public R visitInsertMultiTablets(
+      InsertMultiTabletsStatement insertMultiTabletsStatement, C context) {
+    return visitStatement(insertMultiTabletsStatement, context);
+  }
+
+  public R visitInsertRowsOfOneDevice(
+      InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, C 
context) {
+    return visitStatement(insertRowsOfOneDeviceStatement, context);
+  }
+
+  public R visitPipeEnrichedInsert(
+      PipeEnrichedInsertBaseStatement pipeEnrichedInsertBaseStatement, C 
context) {
+    return visitStatement(pipeEnrichedInsertBaseStatement, context);
+  }
+
   /** Data Control Language (DCL) */
   public R visitAuthor(AuthorStatement authorStatement, C context) {
     return visitStatement(authorStatement, context);
@@ -339,24 +363,6 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(countStatement, context);
   }
 
-  public R visitInsertRow(InsertRowStatement insertRowStatement, C context) {
-    return visitStatement(insertRowStatement, context);
-  }
-
-  public R visitInsertRows(InsertRowsStatement insertRowsStatement, C context) 
{
-    return visitStatement(insertRowsStatement, context);
-  }
-
-  public R visitInsertMultiTablets(
-      InsertMultiTabletsStatement insertMultiTabletsStatement, C context) {
-    return visitStatement(insertMultiTabletsStatement, context);
-  }
-
-  public R visitInsertRowsOfOneDevice(
-      InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, C 
context) {
-    return visitStatement(insertRowsOfOneDeviceStatement, context);
-  }
-
   public R visitSchemaFetch(SchemaFetchStatement schemaFetchStatement, C 
context) {
     return visitStatement(schemaFetchStatement, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index d118884cef6..848d5806f53 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -279,8 +279,7 @@ public abstract class InsertBaseStatement extends Statement 
{
    *
    * @return map from device path to its measurements.
    */
-  protected final Map<PartialPath, List<Pair<String, Integer>>>
-      getMapFromDeviceToMeasurementAndIndex() {
+  protected Map<PartialPath, List<Pair<String, Integer>>> 
getMapFromDeviceToMeasurementAndIndex() {
     boolean[] isLogicalView = new boolean[this.measurements.length];
     int[] indexMapToLogicalViewList = new int[this.measurements.length];
     Arrays.fill(isLogicalView, false);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
new file mode 100644
index 00000000000..0abe0a6eaa9
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/PipeEnrichedInsertBaseStatement.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.crud;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.util.List;
+import java.util.Map;
+
+public class PipeEnrichedInsertBaseStatement extends InsertBaseStatement {
+
+  private final InsertBaseStatement insertBaseStatement;
+
+  public PipeEnrichedInsertBaseStatement(InsertBaseStatement 
insertBaseStatement) {
+    statementType = StatementType.PIPE_ENRICHED_INSERT;
+    this.insertBaseStatement = insertBaseStatement;
+  }
+
+  public InsertBaseStatement getInsertBaseStatement() {
+    return insertBaseStatement;
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitPipeEnrichedInsert(this, context);
+  }
+
+  @Override
+  public boolean isDebug() {
+    return insertBaseStatement.isDebug();
+  }
+
+  @Override
+  public void setDebug(boolean debug) {
+    insertBaseStatement.setDebug(debug);
+  }
+
+  @Override
+  public boolean isQuery() {
+    return insertBaseStatement.isQuery();
+  }
+
+  @Override
+  public boolean isAuthenticationRequired() {
+    return insertBaseStatement.isAuthenticationRequired();
+  }
+
+  @Override
+  public PartialPath getDevicePath() {
+    return insertBaseStatement.getDevicePath();
+  }
+
+  @Override
+  public void setDevicePath(PartialPath devicePath) {
+    insertBaseStatement.setDevicePath(devicePath);
+  }
+
+  @Override
+  public String[] getMeasurements() {
+    return insertBaseStatement.getMeasurements();
+  }
+
+  @Override
+  public void setMeasurements(String[] measurements) {
+    insertBaseStatement.setMeasurements(measurements);
+  }
+
+  @Override
+  public MeasurementSchema[] getMeasurementSchemas() {
+    return insertBaseStatement.getMeasurementSchemas();
+  }
+
+  @Override
+  public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) {
+    insertBaseStatement.setMeasurementSchemas(measurementSchemas);
+  }
+
+  @Override
+  public boolean isAligned() {
+    return insertBaseStatement.isAligned();
+  }
+
+  @Override
+  public void setAligned(boolean aligned) {
+    insertBaseStatement.setAligned(aligned);
+  }
+
+  @Override
+  public TSDataType[] getDataTypes() {
+    return insertBaseStatement.getDataTypes();
+  }
+
+  @Override
+  public void setDataTypes(TSDataType[] dataTypes) {
+    insertBaseStatement.setDataTypes(dataTypes);
+  }
+
+  @Override
+  public List<PartialPath> getPaths() {
+    return insertBaseStatement.getPaths();
+  }
+
+  @Override
+  public void updateAfterSchemaValidation() throws QueryProcessException {
+    insertBaseStatement.updateAfterSchemaValidation();
+  }
+
+  @Override
+  protected void selfCheckDataTypes(int index)
+      throws DataTypeMismatchException, PathNotExistException {
+    insertBaseStatement.selfCheckDataTypes(index);
+  }
+
+  @Override
+  public void markFailedMeasurement(int index, Exception cause) {
+    insertBaseStatement.markFailedMeasurement(index, cause);
+  }
+
+  @Override
+  public boolean hasValidMeasurements() {
+    return insertBaseStatement.hasValidMeasurements();
+  }
+
+  @Override
+  public boolean hasFailedMeasurements() {
+    return insertBaseStatement.hasFailedMeasurements();
+  }
+
+  @Override
+  public int getFailedMeasurementNumber() {
+    return insertBaseStatement.getFailedMeasurementNumber();
+  }
+
+  @Override
+  public List<String> getFailedMeasurements() {
+    return insertBaseStatement.getFailedMeasurements();
+  }
+
+  @Override
+  public List<Exception> getFailedExceptions() {
+    return insertBaseStatement.getFailedExceptions();
+  }
+
+  @Override
+  public List<String> getFailedMessages() {
+    return insertBaseStatement.getFailedMessages();
+  }
+
+  @Override
+  public void setFailedMeasurementIndex2Info(
+      Map<Integer, InsertBaseStatement.FailedMeasurementInfo> 
failedMeasurementIndex2Info) {
+    
insertBaseStatement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info);
+  }
+
+  @Override
+  protected Map<PartialPath, List<Pair<String, Integer>>> 
getMapFromDeviceToMeasurementAndIndex() {
+    return insertBaseStatement.getMapFromDeviceToMeasurementAndIndex();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return insertBaseStatement.isEmpty();
+  }
+
+  @Override
+  public ISchemaValidation getSchemaValidation() {
+    return insertBaseStatement.getSchemaValidation();
+  }
+
+  @Override
+  public List<ISchemaValidation> getSchemaValidationList() {
+    return insertBaseStatement.getSchemaValidationList();
+  }
+
+  @Override
+  protected boolean checkAndCastDataType(int columnIndex, TSDataType dataType) 
{
+    return insertBaseStatement.checkAndCastDataType(columnIndex, dataType);
+  }
+
+  @Override
+  public long getMinTime() {
+    return insertBaseStatement.getMinTime();
+  }
+
+  @Override
+  public Object getFirstValueOfIndex(int index) {
+    return insertBaseStatement.getFirstValueOfIndex(index);
+  }
+
+  @Override
+  public InsertBaseStatement removeLogicalView() {
+    return new 
PipeEnrichedInsertBaseStatement(insertBaseStatement.removeLogicalView());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
index bb07e4457de..70037a99089 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DataNodeThrottleQuotaManager.java
@@ -85,6 +85,7 @@ public class DataNodeThrottleQuotaManager {
       case BATCH_INSERT_ONE_DEVICE:
       case BATCH_INSERT_ROWS:
       case MULTI_BATCH_INSERT:
+      case PIPE_ENRICHED_INSERT:
         return checkQuota(userName, 1, 0, s);
       case QUERY:
       case GROUP_BY_TIME:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
index db9c12114e8..750258eef55 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.rescon.quotas;
 
 import org.apache.iotdb.commons.exception.RpcThrottlingException;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -28,6 +29,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.BitMap;
@@ -92,7 +94,11 @@ public class DefaultOperationQuota implements OperationQuota 
{
   protected void updateEstimateConsumeQuota(int numWrites, int numReads, 
Statement s) {
     if (numWrites > 0) {
       long avgSize = 0;
-      switch (s.getType()) {
+      final StatementType statementType =
+          s.getType() == StatementType.PIPE_ENRICHED_INSERT
+              ? ((PipeEnrichedInsertBaseStatement) 
s).getInsertBaseStatement().getType()
+              : s.getType();
+      switch (statementType) {
         case INSERT:
           // InsertStatement  InsertRowStatement
           if (s instanceof InsertStatement) {

Reply via email to