This is an automated email from the ASF dual-hosted git repository. zyk pushed a commit to branch concurrent_schema_fetch in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aa1f667ac1745c216bbef22fbf9954799ea5b1f3 Author: Marccos <[email protected]> AuthorDate: Mon Dec 12 11:33:19 2022 +0800 extract AutoCreateSchemaExecutor and ClusterSchemaFetchExecutor --- .../analyze/schema/AutoCreateSchemaExecutor.java | 137 +++++++++++++ .../analyze/schema/ClusterSchemaFetchExecutor.java | 118 +++++++++++ .../plan/analyze/schema/ClusterSchemaFetcher.java | 223 ++++----------------- 3 files changed, 292 insertions(+), 186 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java new file mode 100644 index 0000000000..b5389698b5 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/AutoCreateSchemaExecutor.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.plan.analyze.schema; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree; +import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; +import org.apache.iotdb.db.mpp.plan.statement.Statement; +import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement; +import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +class AutoCreateSchemaExecutor { + + private final Function<Statement, ExecutionResult> statementExecutor; + + AutoCreateSchemaExecutor(Function<Statement, ExecutionResult> statementExecutor) { + this.statementExecutor = statementExecutor; + } + + // try to create the target timeseries and return schemaTree involving successfully created + // timeseries and existing timeseries + ClusterSchemaTree internalCreateTimeseries( + PartialPath devicePath, + List<String> measurements, + List<TSDataType> tsDataTypes, + List<TSEncoding> encodings, + List<CompressionType> compressors, + boolean isAligned) { + List<MeasurementPath> measurementPathList = + executeInternalCreateTimeseriesStatement( + new InternalCreateTimeSeriesStatement( + devicePath, measurements, tsDataTypes, encodings, compressors, isAligned)); + + Set<Integer> alreadyExistingMeasurementIndexSet = + measurementPathList.stream() + .map(o -> measurements.indexOf(o.getMeasurement())) + .collect(Collectors.toSet()); + + ClusterSchemaTree schemaTree = new ClusterSchemaTree(); + schemaTree.appendMeasurementPaths(measurementPathList); + + for (int i = 0, size = measurements.size(); i < size; i++) { + if (alreadyExistingMeasurementIndexSet.contains(i)) { + continue; + } + + schemaTree.appendSingleMeasurement( + devicePath.concatNode(measurements.get(i)), + new MeasurementSchema( + measurements.get(i), tsDataTypes.get(i), encodings.get(i), compressors.get(i)), + null, + null, + isAligned); + } + + return schemaTree; + } + + // auto create timeseries and return the existing timeseries info + private List<MeasurementPath> executeInternalCreateTimeseriesStatement( + InternalCreateTimeSeriesStatement statement) { + + ExecutionResult executionResult = statementExecutor.apply(statement); + + int statusCode = executionResult.status.getCode(); + if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return Collections.emptyList(); + } + + if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { + throw new RuntimeException( + new IoTDBException(executionResult.status.getMessage(), statusCode)); + } + + Set<String> failedCreationSet = new HashSet<>(); + List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>(); + for (TSStatus subStatus : executionResult.status.subStatus) { + if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { + alreadyExistingMeasurements.add( + MeasurementPath.parseDataFromString(subStatus.getMessage())); + } else { + failedCreationSet.add(subStatus.message); + } + } + + if (!failedCreationSet.isEmpty()) { + throw new SemanticException(new MetadataException(String.join("; ", failedCreationSet))); + } + + return alreadyExistingMeasurements; + } + + void internalActivateTemplate(PartialPath devicePath) { + ExecutionResult executionResult = + statementExecutor.apply(new ActivateTemplateStatement(devicePath)); + TSStatus status = executionResult.status; + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { + throw new RuntimeException(new IoTDBException(status.getMessage(), status.getCode())); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java new file mode 100644 index 0000000000..9b0d3d7c81 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.plan.analyze.schema; + +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree; +import org.apache.iotdb.db.mpp.plan.Coordinator; +import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; +import org.apache.iotdb.db.mpp.plan.statement.Statement; +import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement; +import org.apache.iotdb.db.utils.SetThreadName; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Supplier; + +class ClusterSchemaFetchExecutor { + + private final Coordinator coordinator; + private final Supplier<Long> queryIdProvider; + private final BiFunction<Long, Statement, ExecutionResult> statementExecutor; + + ClusterSchemaFetchExecutor( + Coordinator coordinator, + Supplier<Long> queryIdProvider, + BiFunction<Long, Statement, ExecutionResult> statementExecutor) { + this.coordinator = coordinator; + this.queryIdProvider = queryIdProvider; + this.statementExecutor = statementExecutor; + } + + ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) { + long queryId = queryIdProvider.get(); + try { + ExecutionResult executionResult = statementExecutor.apply(queryId, schemaFetchStatement); + if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new RuntimeException( + String.format( + "cannot fetch schema, status is: %s, msg is: %s", + executionResult.status.getCode(), executionResult.status.getMessage())); + } + try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) { + ClusterSchemaTree result = new ClusterSchemaTree(); + Set<String> databaseSet = new HashSet<>(); + while (coordinator.getQueryExecution(queryId).hasNextResult()) { + // The query will be transited to FINISHED when invoking getBatchResult() at the last time + // So we don't need to clean up it manually + Optional<TsBlock> tsBlock; + try { + tsBlock = coordinator.getQueryExecution(queryId).getBatchResult(); + } catch (IoTDBException e) { + throw new RuntimeException("Fetch Schema failed. ", e); + } + if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) { + break; + } + Column column = tsBlock.get().getColumn(0); + for (int i = 0; i < column.getPositionCount(); i++) { + parseFetchedData(column.getBinary(i), result, databaseSet); + } + } + result.setDatabases(databaseSet); + return result; + } + } finally { + coordinator.cleanupQueryExecution(queryId); + } + } + + private void parseFetchedData( + Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) { + InputStream inputStream = new ByteArrayInputStream(data.getValues()); + try { + byte type = ReadWriteIOUtils.readByte(inputStream); + if (type == 0) { + int size = ReadWriteIOUtils.readInt(inputStream); + for (int i = 0; i < size; i++) { + databaseSet.add(ReadWriteIOUtils.readString(inputStream)); + } + } else if (type == 1) { + resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream)); + } else { + throw new RuntimeException( + new MetadataException("Failed to fetch schema because of unrecognized data")); + } + } catch (IOException e) { + // Totally memory operation. This case won't happen. + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java index 67fc47d941..3f88d88825 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ClusterSchemaFetcher.java @@ -18,15 +18,11 @@ */ package org.apache.iotdb.db.mpp.plan.analyze.schema; -import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.exception.IoTDBException; -import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache; import org.apache.iotdb.db.metadata.template.ClusterTemplateManager; import org.apache.iotdb.db.metadata.template.ITemplateManager; @@ -36,37 +32,22 @@ import org.apache.iotdb.db.mpp.common.schematree.DeviceSchemaInfo; import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree; import org.apache.iotdb.db.mpp.plan.Coordinator; import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher; -import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult; -import org.apache.iotdb.db.mpp.plan.statement.Statement; -import org.apache.iotdb.db.mpp.plan.statement.internal.InternalCreateTimeSeriesStatement; import org.apache.iotdb.db.mpp.plan.statement.internal.SchemaFetchStatement; -import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplateStatement; import org.apache.iotdb.db.query.control.SessionManager; -import org.apache.iotdb.db.utils.SetThreadName; -import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import org.apache.iotdb.tsfile.read.common.block.column.Column; -import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -77,11 +58,36 @@ import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncodin public class ClusterSchemaFetcher implements ISchemaFetcher { private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - private final Coordinator coordinator = Coordinator.getInstance(); + private final DataNodeSchemaCache schemaCache = DataNodeSchemaCache.getInstance(); private final ITemplateManager templateManager = ClusterTemplateManager.getInstance(); + private final AutoCreateSchemaExecutor autoCreateSchemaExecutor = + new AutoCreateSchemaExecutor( + statement -> + coordinator.execute( + statement, + SessionManager.getInstance().requestQueryId(), + null, + "", + ClusterPartitionFetcher.getInstance(), + this, + config.getQueryTimeoutThreshold())); + private final ClusterSchemaFetchExecutor clusterSchemaFetchExecutor = + new ClusterSchemaFetchExecutor( + coordinator, + () -> SessionManager.getInstance().requestQueryId(), + (queryId, statement) -> + coordinator.execute( + statement, + queryId, + null, + "", + ClusterPartitionFetcher.getInstance(), + this, + config.getQueryTimeoutThreshold())); + private static final class ClusterSchemaFetcherHolder { private static final ClusterSchemaFetcher INSTANCE = new ClusterSchemaFetcher(); @@ -113,7 +119,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { } if (withTags) { - return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags)); + return clusterSchemaFetchExecutor.executeSchemaFetchQuery( + new SchemaFetchStatement(patternTree, templateMap, withTags)); } List<PartialPath> fullPathList = new ArrayList<>(); @@ -124,7 +131,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { } if (fullPathList.isEmpty()) { - return executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags)); + return clusterSchemaFetchExecutor.executeSchemaFetchQuery( + new SchemaFetchStatement(patternTree, templateMap, withTags)); } // The schema cache R/W and fetch operation must be locked together thus the cache clean @@ -148,13 +156,17 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { } } if (isAllCached) { + // The entry iterating order of HashMap is to some extent decided by the putting order. + // Therefore, we must avoid merge operation on cachedSchemaTree and fetchedSchemaTree, + // since the cache state varies among DataNodes. schemaTree.setDatabases(storageGroupSet); return schemaTree; } } schemaTree = - executeSchemaFetchQuery(new SchemaFetchStatement(patternTree, templateMap, withTags)); + clusterSchemaFetchExecutor.executeSchemaFetchQuery( + new SchemaFetchStatement(patternTree, templateMap, withTags)); // only cache the schema fetched by full path List<MeasurementPath> measurementPathList; @@ -172,73 +184,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { } } - private ClusterSchemaTree executeSchemaFetchQuery(SchemaFetchStatement schemaFetchStatement) { - long queryId = SessionManager.getInstance().requestQueryId(); - try { - ExecutionResult executionResult = - coordinator.execute( - schemaFetchStatement, - queryId, - null, - "", - ClusterPartitionFetcher.getInstance(), - this, - config.getQueryTimeoutThreshold()); - if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - throw new RuntimeException( - String.format( - "cannot fetch schema, status is: %s, msg is: %s", - executionResult.status.getCode(), executionResult.status.getMessage())); - } - try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) { - ClusterSchemaTree result = new ClusterSchemaTree(); - Set<String> databaseSet = new HashSet<>(); - while (coordinator.getQueryExecution(queryId).hasNextResult()) { - // The query will be transited to FINISHED when invoking getBatchResult() at the last time - // So we don't need to clean up it manually - Optional<TsBlock> tsBlock; - try { - tsBlock = coordinator.getQueryExecution(queryId).getBatchResult(); - } catch (IoTDBException e) { - throw new RuntimeException("Fetch Schema failed. ", e); - } - if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) { - break; - } - Column column = tsBlock.get().getColumn(0); - for (int i = 0; i < column.getPositionCount(); i++) { - parseFetchedData(column.getBinary(i), result, databaseSet); - } - } - result.setDatabases(databaseSet); - return result; - } - } finally { - coordinator.cleanupQueryExecution(queryId); - } - } - - private void parseFetchedData( - Binary data, ClusterSchemaTree resultSchemaTree, Set<String> databaseSet) { - InputStream inputStream = new ByteArrayInputStream(data.getValues()); - try { - byte type = ReadWriteIOUtils.readByte(inputStream); - if (type == 0) { - int size = ReadWriteIOUtils.readInt(inputStream); - for (int i = 0; i < size; i++) { - databaseSet.add(ReadWriteIOUtils.readString(inputStream)); - } - } else if (type == 1) { - resultSchemaTree.mergeSchemaTree(ClusterSchemaTree.deserialize(inputStream)); - } else { - throw new RuntimeException( - new MetadataException("Failed to fetch schema because of unrecognized data")); - } - } catch (IOException e) { - // Totally memory operation. This case won't happen. - } - } - @Override public ISchemaTree fetchSchemaWithAutoCreate( PartialPath devicePath, @@ -420,7 +365,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { } if (shouldActivateTemplate) { - internalActivateTemplate(devicePath); + autoCreateSchemaExecutor.internalActivateTemplate(devicePath); List<Integer> recheckedIndexOfMissingMeasurements = new ArrayList<>(); for (int i = 0; i < indexOfMissingMeasurements.size(); i++) { if (!template.hasSchema(measurements[i])) { @@ -470,7 +415,7 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { if (!missingMeasurements.isEmpty()) { schemaTree.mergeSchemaTree( - internalCreateTimeseries( + autoCreateSchemaExecutor.internalCreateTimeseries( devicePath, missingMeasurements, dataTypesOfMissingMeasurement, @@ -499,100 +444,6 @@ public class ClusterSchemaFetcher implements ISchemaFetcher { return indexOfMissingMeasurements; } - // try to create the target timeseries and return schemaTree involving successfully created - // timeseries and existing timeseries - private ClusterSchemaTree internalCreateTimeseries( - PartialPath devicePath, - List<String> measurements, - List<TSDataType> tsDataTypes, - List<TSEncoding> encodings, - List<CompressionType> compressors, - boolean isAligned) { - List<MeasurementPath> measurementPathList = - executeInternalCreateTimeseriesStatement( - new InternalCreateTimeSeriesStatement( - devicePath, measurements, tsDataTypes, encodings, compressors, isAligned)); - - Set<Integer> alreadyExistingMeasurementIndexSet = - measurementPathList.stream() - .map(o -> measurements.indexOf(o.getMeasurement())) - .collect(Collectors.toSet()); - - ClusterSchemaTree schemaTree = new ClusterSchemaTree(); - schemaTree.appendMeasurementPaths(measurementPathList); - - for (int i = 0, size = measurements.size(); i < size; i++) { - if (alreadyExistingMeasurementIndexSet.contains(i)) { - continue; - } - - schemaTree.appendSingleMeasurement( - devicePath.concatNode(measurements.get(i)), - new MeasurementSchema( - measurements.get(i), tsDataTypes.get(i), encodings.get(i), compressors.get(i)), - null, - null, - isAligned); - } - - return schemaTree; - } - - // auto create timeseries and return the existing timeseries info - private List<MeasurementPath> executeInternalCreateTimeseriesStatement( - InternalCreateTimeSeriesStatement statement) { - - ExecutionResult executionResult = executeStatement(statement); - - int statusCode = executionResult.status.getCode(); - if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return Collections.emptyList(); - } - - if (statusCode != TSStatusCode.MULTIPLE_ERROR.getStatusCode()) { - throw new RuntimeException( - new IoTDBException(executionResult.status.getMessage(), statusCode)); - } - - Set<String> failedCreationSet = new HashSet<>(); - List<MeasurementPath> alreadyExistingMeasurements = new ArrayList<>(); - for (TSStatus subStatus : executionResult.status.subStatus) { - if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { - alreadyExistingMeasurements.add( - MeasurementPath.parseDataFromString(subStatus.getMessage())); - } else { - failedCreationSet.add(subStatus.message); - } - } - - if (!failedCreationSet.isEmpty()) { - throw new SemanticException(new MetadataException(String.join("; ", failedCreationSet))); - } - - return alreadyExistingMeasurements; - } - - public void internalActivateTemplate(PartialPath devicePath) { - ExecutionResult executionResult = executeStatement(new ActivateTemplateStatement(devicePath)); - TSStatus status = executionResult.status; - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) { - throw new RuntimeException(new IoTDBException(status.getMessage(), status.getCode())); - } - } - - private ExecutionResult executeStatement(Statement statement) { - long queryId = SessionManager.getInstance().requestQueryId(); - return coordinator.execute( - statement, - queryId, - null, - "", - ClusterPartitionFetcher.getInstance(), - this, - config.getQueryTimeoutThreshold()); - } - @Override public void invalidAllCache() { DataNodeSchemaCache.getInstance().cleanUp();
