This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/intoOperator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a8d74daf90583ce962593733c2243d7b4ef10ada Author: Minghui Liu <[email protected]> AuthorDate: Wed Oct 19 16:17:42 2022 +0800 finish execute insertMultiTabletsStatement --- .../iotdb/db/client/DataNodeInternalClient.java | 96 ++++++++++++++++++++++ .../iotdb/db/exception/IntoProcessException.java | 27 ++++++ .../operator/process/AbstractIntoOperator.java | 20 ++++- .../operator/process/DeviceViewIntoOperator.java | 3 +- .../iotdb/db/query/control/SessionManager.java | 5 ++ 5 files changed, 149 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java new file mode 100644 index 0000000000..43f7271bd2 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeInternalClient.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.client; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.db.auth.AuthorityChecker; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.conf.OperationType; +import org.apache.iotdb.db.mpp.plan.Coordinator; +import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.StandalonePartitionFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher; +import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; +import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.query.control.SessionManager; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; + +import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException; + +public class DataNodeInternalClient { + + private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + + private static final Coordinator COORDINATOR = Coordinator.getInstance(); + + private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); + + private final IPartitionFetcher PARTITION_FETCHER; + + private final ISchemaFetcher SCHEMA_FETCHER; + + private final long sessionId; + + public DataNodeInternalClient() { + if (config.isClusterMode()) { + PARTITION_FETCHER = ClusterPartitionFetcher.getInstance(); + SCHEMA_FETCHER = ClusterSchemaFetcher.getInstance(); + } else { + PARTITION_FETCHER = StandalonePartitionFetcher.getInstance(); + SCHEMA_FETCHER = StandaloneSchemaFetcher.getInstance(); + } + sessionId = SESSION_MANAGER.requestInternalSessionId(); + } + + public TSStatus insertTablets(InsertMultiTabletsStatement statement) { + try { + if (statement.isEmpty()) { + // return success when this statement is empty because server doesn't need to execute it + return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); + } + + // permission check + TSStatus status = AuthorityChecker.checkAuthority(statement, sessionId); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; + } + + // call the coordinator + long queryId = SESSION_MANAGER.requestQueryId(false); + ExecutionResult result = + COORDINATOR.execute( + statement, + queryId, + SESSION_MANAGER.getSessionInfo(sessionId), + "", + PARTITION_FETCHER, + SCHEMA_FETCHER); + return result.status; + } catch (Exception e) { + return onNPEOrUnexpectedException( + e, OperationType.INSERT_TABLETS, TSStatusCode.EXECUTE_STATEMENT_ERROR); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/exception/IntoProcessException.java b/server/src/main/java/org/apache/iotdb/db/exception/IntoProcessException.java new file mode 100644 index 0000000000..a19b049152 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/exception/IntoProcessException.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.exception; + +public class IntoProcessException extends RuntimeException { + + public IntoProcessException(String message) { + super(message); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java index 28b90daae7..21cf6951fa 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/AbstractIntoOperator.java @@ -19,13 +19,17 @@ package org.apache.iotdb.db.mpp.execution.operator.process; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.client.DataNodeInternalClient; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.IntoProcessException; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.mpp.plan.statement.crud.InsertTabletStatement; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -34,6 +38,8 @@ import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; import com.google.common.util.concurrent.ListenableFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -44,6 +50,8 @@ import java.util.concurrent.atomic.AtomicInteger; public abstract class AbstractIntoOperator implements ProcessOperator { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIntoOperator.class); + protected final OperatorContext operatorContext; protected final Operator child; @@ -51,6 +59,8 @@ public abstract class AbstractIntoOperator implements ProcessOperator { protected final Map<String, InputLocation> sourceColumnToInputLocationMap; + private final DataNodeInternalClient client = new DataNodeInternalClient(); + public AbstractIntoOperator( OperatorContext operatorContext, Operator child, @@ -95,7 +105,15 @@ public abstract class AbstractIntoOperator implements ProcessOperator { InsertMultiTabletsStatement insertMultiTabletsStatement = new InsertMultiTabletsStatement(); insertMultiTabletsStatement.setInsertTabletStatementList(insertTabletStatementList); - // TODO: execute insertMultiTabletsStatement + TSStatus executionStatus = client.insertTablets(insertMultiTabletsStatement); + if (executionStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + String message = + String.format( + "Error occurred while inserting tablets in SELECT INTO. %s", + executionStatus.getMessage()); + LOGGER.error(message); + throw new IntoProcessException(message); + } for (IntoOperator.InsertTabletStatementGenerator generator : insertTabletStatementGenerators) { generator.reset(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java index f0eab0b71a..201aaaef3b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/DeviceViewIntoOperator.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.exception.IntoProcessException; import org.apache.iotdb.db.mpp.common.header.ColumnHeader; import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant; import org.apache.iotdb.db.mpp.execution.operator.Operator; @@ -74,7 +75,7 @@ public class DeviceViewIntoOperator extends AbstractIntoOperator { } @Override - public TsBlock next() { + public TsBlock next() throws IntoProcessException { TsBlock inputTsBlock = child.next(); if (inputTsBlock != null) { String device = String.valueOf(inputTsBlock.getValueColumns()[0].getBinary(0)); diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java index 53f140d317..1ee80d5df2 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java @@ -237,6 +237,11 @@ public class SessionManager { return sessionId; } + public long requestInternalSessionId() { + return requestSessionId( + "__internal", ZoneId.systemDefault().getId(), IoTDBConstant.ClientVersion.V_0_13); + } + public boolean releaseSessionResource(long sessionId) { return releaseSessionResource(sessionId, this::releaseQueryResourceNoExceptions); }
