This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch issue2208 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7fdc72a6d543adefa09d7ae8e42c973bf5934c59 Author: Alima777 <[email protected]> AuthorDate: Mon Dec 27 16:52:50 2021 +0800 split some operation from internalExecute() method --- .../iotdb/db/qp/physical/crud/AggregationPlan.java | 12 ++- .../apache/iotdb/db/qp/physical/crud/UDAFPlan.java | 13 +++ .../apache/iotdb/db/qp/physical/crud/UDTFPlan.java | 9 +- .../db/service/thrift/impl/TSServiceImpl.java | 104 +++++++++++++++++---- 4 files changed, 115 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java index fbd6605..9c5d9ed 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AggregationPlan.java @@ -32,7 +32,11 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.thrift.TException; -import java.util.*; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; public class AggregationPlan extends RawDataQueryPlan { @@ -56,11 +60,11 @@ public class AggregationPlan extends RawDataQueryPlan { @Override public TSExecuteStatementResp getTSExecuteStatementResp(boolean isJdbcQuery) throws TException, MetadataException { + TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS); if (isGroupByLevel()) { List<String> respColumns = new ArrayList<>(); List<String> columnsTypes = new ArrayList<>(); - TSExecuteStatementResp resp = RpcUtils.getTSExecuteStatementResp(TSStatusCode.SUCCESS_STATUS); for (Map.Entry<String, AggregateResult> groupPathResult : getGroupPathsResultMap().entrySet()) { respColumns.add(groupPathResult.getKey()); @@ -70,8 +74,10 @@ public class AggregationPlan extends RawDataQueryPlan { resp.setDataTypeList(columnsTypes); return resp; } else { - return super.getTSExecuteStatementResp(isJdbcQuery); + resp = super.getTSExecuteStatementResp(isJdbcQuery); } + resp.setIgnoreTimeStamp(true); + return resp; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDAFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDAFPlan.java index 4f69ea6..761476b 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDAFPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDAFPlan.java @@ -22,6 +22,9 @@ import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; import org.apache.iotdb.db.qp.strategy.PhysicalGenerator; import org.apache.iotdb.db.query.expression.Expression; +import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; + +import org.apache.thrift.TException; import java.time.ZoneId; import java.util.HashSet; @@ -39,6 +42,16 @@ public class UDAFPlan extends UDTFPlan { setOperatorType(OperatorType.UDAF); } + @Override + public TSExecuteStatementResp getTSExecuteStatementResp(boolean isJdbcQuery) + throws TException, MetadataException { + TSExecuteStatementResp resp = super.getTSExecuteStatementResp(isJdbcQuery); + if (getInnerAggregationPlan().getOperatorType() == OperatorType.AGGREGATION) { + resp.setIgnoreTimeStamp(true); + } + return resp; + } + public void setExpressionToInnerResultIndexMap( Map<Expression, Integer> expressionToInnerResultIndexMap) { expressionToInnerResultIndexMap.forEach((k, v) -> pathNameToReaderIndex.put(k.toString(), v)); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java index 999465d..37c46e3 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/UDTFPlan.java @@ -31,7 +31,14 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Pair; import java.time.ZoneId; -import java.util.*; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; public class UDTFPlan extends RawDataQueryPlan implements UDFPlan { diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java index 87cf042..1762ad6 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java @@ -37,8 +37,30 @@ import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.metadata.template.TemplateQueryType; import org.apache.iotdb.db.qp.logical.Operator.OperatorType; import org.apache.iotdb.db.qp.physical.PhysicalPlan; -import org.apache.iotdb.db.qp.physical.crud.*; -import org.apache.iotdb.db.qp.physical.sys.*; +import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan; +import org.apache.iotdb.db.qp.physical.crud.DeletePlan; +import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowsOfOneDevicePlan; +import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan; +import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; +import org.apache.iotdb.db.qp.physical.crud.QueryPlan; +import org.apache.iotdb.db.qp.physical.crud.SelectIntoPlan; +import org.apache.iotdb.db.qp.physical.crud.UDFPlan; +import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan; +import org.apache.iotdb.db.qp.physical.sys.AuthorPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan; +import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan; +import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan; +import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; +import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan; +import org.apache.iotdb.db.qp.physical.sys.ShowPlan; +import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan; +import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.tracing.TracingConstant; import org.apache.iotdb.db.query.dataset.AlignByDeviceDataSet; @@ -56,7 +78,47 @@ import org.apache.iotdb.db.utils.QueryDataSetUtils; import org.apache.iotdb.rpc.RedirectException; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.iotdb.service.rpc.thrift.*; +import org.apache.iotdb.service.rpc.thrift.EndPoint; +import org.apache.iotdb.service.rpc.thrift.ServerProperties; +import org.apache.iotdb.service.rpc.thrift.TSAppendSchemaTemplateReq; +import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq; +import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq; +import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq; +import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq; +import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq; +import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq; +import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq; +import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq; +import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq; +import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq; +import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; +import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq; +import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp; +import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq; +import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp; +import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp; +import org.apache.iotdb.service.rpc.thrift.TSIService; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq; +import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq; +import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq; +import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp; +import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq; +import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; +import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet; +import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq; +import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp; +import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq; +import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq; +import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq; +import org.apache.iotdb.service.rpc.thrift.TSStatus; +import org.apache.iotdb.service.rpc.thrift.TSTracingInfo; +import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq; import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; @@ -73,11 +135,19 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.sql.SQLException; import java.time.ZoneId; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.stream.Collectors; -import static org.apache.iotdb.db.utils.ErrorHandlingUtils.*; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNonQueryException; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.tryCatchQueryException; /** Thrift RPC implementation at server side. */ public class TSServiceImpl extends BasicServiceProvider implements TSIService.Iface { @@ -592,22 +662,13 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If } if (plan instanceof ShowPlan || plan instanceof AuthorPlan) { - resp = getListDataSetHeaders(newDataSet); + resp = getListDataSetHeaders(plan, newDataSet); } else if (plan instanceof UDFPlan || (plan instanceof QueryPlan && ((QueryPlan) plan).isGroupByLevel())) { resp = getQueryColumnHeaders(plan, username, isJdbcQuery); } resp.setOperationType(plan.getOperatorType().toString()); - if (plan.getOperatorType() == OperatorType.AGGREGATION - || (plan instanceof UDAFPlan - && ((UDAFPlan) plan).getInnerAggregationPlan().getOperatorType() - == OperatorType.AGGREGATION)) { - resp.setIgnoreTimeStamp(true); - } else if (plan instanceof ShowQueryProcesslistPlan) { - resp.setIgnoreTimeStamp(false); - } - if (newDataSet instanceof DirectNonAlignDataSet) { resp.setNonAlignQueryDataSet(fillRpcNonAlignReturnData(fetchSize, newDataSet, username)); } else { @@ -663,10 +724,15 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If } } - private TSExecuteStatementResp getListDataSetHeaders(QueryDataSet dataSet) { - return StaticResps.getNoTimeExecuteResp( - dataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()), - dataSet.getDataTypes().stream().map(Enum::toString).collect(Collectors.toList())); + private TSExecuteStatementResp getListDataSetHeaders(PhysicalPlan plan, QueryDataSet dataSet) { + TSExecuteStatementResp resp = + StaticResps.getNoTimeExecuteResp( + dataSet.getPaths().stream().map(Path::getFullPath).collect(Collectors.toList()), + dataSet.getDataTypes().stream().map(Enum::toString).collect(Collectors.toList())); + if (plan instanceof ShowQueryProcesslistPlan) { + resp.setIgnoreTimeStamp(false); + } + return resp; } /** get ResultSet schema */
