This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch calc_commons in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 02bc3f281612eca75ed5bbf5cfac6918f56ae44f Author: shuwenwei <[email protected]> AuthorDate: Mon Apr 20 10:09:41 2026 +0800 move TemporaryQueryDataFileService --- .../iotdb/confignode/conf/ConfigNodeConfig.java | 2 + .../aggregation/TableModeAccumulator.java | 4 +- .../grouped/GroupedModeAccumulator.java | 4 +- .../relational/ColumnTransformerBuilder.java | 6 +- .../datastructure/SerializableList.java | 4 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +-- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 9 +- .../AbstractTemporaryQueryDataFileService.java} | 64 ++++++------- .../ITemporaryQueryDataFileServiceProvider.java | 25 +++++ .../transformation/dag/udf/UDTFContext.java | 4 +- .../java/org/apache/iotdb/db/service/DataNode.java | 3 +- ...aNodeTemporaryQueryDataFileServiceProvider.java | 32 +++++++ .../db/service/TemporaryQueryDataFileService.java | 105 +-------------------- ....service.ITemporaryQueryDataFileServiceProvider | 1 + .../apache/iotdb/commons/conf/CommonConfig.java | 20 ++++ 15 files changed, 135 insertions(+), 159 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 59b318a4b11..3f92340d431 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.confignode.manager.load.balancer.RegionBalancer; import org.apache.iotdb.confignode.manager.load.balancer.router.leader.AbstractLeaderBalancer; @@ -377,6 +378,7 @@ public class ConfigNodeConfig { public void setConfigNodeId(int configNodeId) { this.configNodeId = configNodeId; + CommonDescriptor.getInstance().getConfig().setNodeId(configNodeId); } public String getInternalAddress() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/execution/operator/source/relational/aggregation/TableModeAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/execution/operator/source/relational/aggregation/TableModeAccumulator.java index 01c3609d158..59e240c9bfe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/execution/operator/source/relational/aggregation/TableModeAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/execution/operator/source/relational/aggregation/TableModeAccumulator.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.calc_commons.execution.operator.source.relational.aggregation; -import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.tsfile.block.column.Column; import org.apache.tsfile.block.column.ColumnBuilder; @@ -44,7 +44,7 @@ import static org.apache.tsfile.utils.BytesUtils.bytesToLongFromOffset; public class TableModeAccumulator implements TableAccumulator { private final int MAP_SIZE_THRESHOLD = - IoTDBDescriptor.getInstance().getConfig().getModeMapSizeThreshold(); + CommonDescriptor.getInstance().getConfig().getModeMapSizeThreshold(); private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(TableModeAccumulator.class); private final TSDataType seriesDataType; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/execution/operator/source/relational/aggregation/grouped/GroupedModeAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/execution/operator/source/relational/aggregation/grouped/GroupedModeAccumulator.java index 6697117e3bc..0bdc0755dbf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/execution/operator/source/relational/aggregation/grouped/GroupedModeAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/execution/operator/source/relational/aggregation/grouped/GroupedModeAccumulator.java @@ -19,10 +19,10 @@ package org.apache.iotdb.db.calc_commons.execution.operator.source.relational.aggregation.grouped; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.db.calc_commons.execution.operator.source.relational.aggregation.AggregationMask; import org.apache.iotdb.db.calc_commons.execution.operator.source.relational.aggregation.grouped.array.LongBigArray; import org.apache.iotdb.db.calc_commons.execution.operator.source.relational.aggregation.grouped.array.MapBigArray; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.tsfile.block.column.Column; import org.apache.tsfile.block.column.ColumnBuilder; @@ -48,7 +48,7 @@ import static org.apache.tsfile.utils.TsPrimitiveType.getByType; public class GroupedModeAccumulator implements GroupedAccumulator { private final int MAP_SIZE_THRESHOLD = - IoTDBDescriptor.getInstance().getConfig().getModeMapSizeThreshold(); + CommonDescriptor.getInstance().getConfig().getModeMapSizeThreshold(); private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(GroupedModeAccumulator.class); private final TSDataType seriesDataType; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/execution/relational/ColumnTransformerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/execution/relational/ColumnTransformerBuilder.java index 0f287136560..5dfba318449 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/execution/relational/ColumnTransformerBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/execution/relational/ColumnTransformerBuilder.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.calc_commons.execution.relational; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer; import org.apache.iotdb.db.calc_commons.plan.relational.metadata.ITypeMetadata; @@ -146,7 +147,6 @@ import org.apache.iotdb.db.calc_commons.transformation.dag.column.unary.scalar.T import org.apache.iotdb.db.calc_commons.transformation.dag.column.unary.scalar.UpperColumnTransformer; import org.apache.iotdb.db.calc_commons.transformation.dag.column.unary.scalar.factory.CodecStrategiesFactory; import org.apache.iotdb.db.calc_commons.transformation.dag.column.unary.scalar.factory.NumericCodecStrategiesFactory; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.node_commons.common.SessionInfo; import org.apache.iotdb.db.node_commons.plan.analyze.ITableTypeProvider; @@ -1493,8 +1493,8 @@ public class ColumnTransformerBuilder } throw new IllegalArgumentException( String.format( - "Unknown function %s on DataNode: %d.", - functionName, IoTDBDescriptor.getInstance().getConfig().getDataNodeId())); + "Unknown function %s on Node: %d.", + functionName, CommonDescriptor.getInstance().getConfig().getNodeId())); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/transformation/datastructure/SerializableList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/transformation/datastructure/SerializableList.java index f957d9a0c53..7f43cdf29ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/transformation/datastructure/SerializableList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/calc_commons/transformation/datastructure/SerializableList.java @@ -21,7 +21,7 @@ package org.apache.iotdb.db.calc_commons.transformation.datastructure; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.file.SystemFileFactory; -import org.apache.iotdb.db.service.TemporaryQueryDataFileService; +import org.apache.iotdb.db.node_commons.service.AbstractTemporaryQueryDataFileService; import org.apache.tsfile.utils.PublicBAOS; @@ -129,7 +129,7 @@ public interface SerializableList { public RandomAccessFile getFile() throws IOException { if (file == null) { if (fileName == null) { - fileName = TemporaryQueryDataFileService.getInstance().register(this); + fileName = AbstractTemporaryQueryDataFileService.getInstance().register(this); } file = new RandomAccessFile(SystemFileFactory.INSTANCE.getFile(fileName), "rw"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 6d6556987be..d147e39c659 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -340,8 +340,6 @@ public class IoTDBConfig { private int mergeThresholdOfExplainAnalyze = 10; - private int modeMapSizeThreshold = 10000; - /** How many queries can be concurrently executed. When <= 0, use 1000. */ private int maxAllowedConcurrentQueries = 1000; @@ -3241,6 +3239,7 @@ public class IoTDBConfig { public void setDataNodeId(int dataNodeId) { this.dataNodeId = dataNodeId; + CommonDescriptor.getInstance().getConfig().setNodeId(dataNodeId); } public int getPartitionCacheSize() { @@ -3864,14 +3863,6 @@ public class IoTDBConfig { this.candidateCompactionTaskQueueSize = candidateCompactionTaskQueueSize; } - public void setModeMapSizeThreshold(int modeMapSizeThreshold) { - this.modeMapSizeThreshold = modeMapSizeThreshold; - } - - public int getModeMapSizeThreshold() { - return modeMapSizeThreshold; - } - public double getMaxAllocateMemoryRatioForLoad() { return maxAllocateMemoryRatioForLoad; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 966a5bc9934..e65b96d29ee 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -538,13 +538,14 @@ public class IoTDBDescriptor { "merge_threshold_of_explain_analyze", Integer.toString(conf.getMergeThresholdOfExplainAnalyze())))); - conf.setModeMapSizeThreshold( + commonConfig.setModeMapSizeThreshold( Integer.parseInt( properties.getProperty( - "mode_map_size_threshold", Integer.toString(conf.getModeMapSizeThreshold())))); + "mode_map_size_threshold", + Integer.toString(commonConfig.getModeMapSizeThreshold())))); - if (conf.getModeMapSizeThreshold() <= 0) { - conf.setModeMapSizeThreshold(10000); + if (commonConfig.getModeMapSizeThreshold() <= 0) { + commonConfig.setModeMapSizeThreshold(10000); } conf.setMaxAllowedConcurrentQueries( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/node_commons/service/AbstractTemporaryQueryDataFileService.java similarity index 69% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/node_commons/service/AbstractTemporaryQueryDataFileService.java index 1f789468ad7..9776a5bdeea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/node_commons/service/AbstractTemporaryQueryDataFileService.java @@ -17,14 +17,13 @@ * under the License. */ -package org.apache.iotdb.db.service; +package org.apache.iotdb.db.node_commons.service; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.commons.service.IService; import org.apache.iotdb.commons.service.ServiceType; import org.apache.iotdb.db.calc_commons.transformation.datastructure.SerializableList.SerializationRecorder; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.tsfile.external.commons.io.FileUtils; import org.slf4j.Logger; @@ -36,28 +35,41 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.ServiceLoader; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -public class TemporaryQueryDataFileService implements IService { +public abstract class AbstractTemporaryQueryDataFileService implements IService { - private static final Logger logger = LoggerFactory.getLogger(TemporaryQueryDataFileService.class); + private static final Logger logger = + LoggerFactory.getLogger(AbstractTemporaryQueryDataFileService.class); + private static final AbstractTemporaryQueryDataFileService INSTANCE = loadService(); - private static final String TEMPORARY_FILE_DIR = - IoTDBDescriptor.getInstance().getConfig().getSystemDir() - + File.separator - + "udf" - + File.separator - + "tmp"; + private final AtomicLong uniqueDataId = new AtomicLong(0); + private final Map<String, List<SerializationRecorder>> recorders = new ConcurrentHashMap<>(); - private final AtomicLong uniqueDataId; - private final Map<String, List<SerializationRecorder>> recorders; + public static AbstractTemporaryQueryDataFileService getInstance() { + return INSTANCE; + } - private TemporaryQueryDataFileService() { - uniqueDataId = new AtomicLong(0); - recorders = new ConcurrentHashMap<>(); + private static AbstractTemporaryQueryDataFileService loadService() { + AbstractTemporaryQueryDataFileService service = null; + ServiceLoader<ITemporaryQueryDataFileServiceProvider> loader = + ServiceLoader.load(ITemporaryQueryDataFileServiceProvider.class); + for (ITemporaryQueryDataFileServiceProvider provider : loader) { + if (service != null) { + throw new IllegalStateException("Multiple ITemporaryQueryDataFileServiceProvider found"); + } + service = provider.getService(); + } + if (service == null) { + throw new IllegalStateException("No ITemporaryQueryDataFileServiceProvider found"); + } + return service; } + protected abstract String getTemporaryFileDir(); + public String register(SerializationRecorder recorder) throws IOException { String queryId = recorder.getQueryId(); recorders @@ -99,7 +111,7 @@ public class TemporaryQueryDataFileService implements IService { } private String getDirName(String queryId) { - return TEMPORARY_FILE_DIR + File.separator + queryId + File.separator; + return getTemporaryFileDir() + File.separator + queryId + File.separator; } private String getFileName(String dir, long index) { @@ -110,11 +122,11 @@ public class TemporaryQueryDataFileService implements IService { public void start() throws StartupException { try { // Clean up stale temp directories left from previous runs (e.g., after a crash) - File tmpDir = SystemFileFactory.INSTANCE.getFile(TEMPORARY_FILE_DIR); + File tmpDir = SystemFileFactory.INSTANCE.getFile(getTemporaryFileDir()); if (tmpDir.exists()) { FileUtils.deleteDirectory(tmpDir); } - makeDirIfNecessary(TEMPORARY_FILE_DIR); + makeDirIfNecessary(getTemporaryFileDir()); } catch (IOException e) { throw new StartupException(e); } @@ -124,9 +136,9 @@ public class TemporaryQueryDataFileService implements IService { public void stop() { recorders.clear(); try { - FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(TEMPORARY_FILE_DIR)); + FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(getTemporaryFileDir())); } catch (IOException e) { - logger.warn("Failed to delete temp dir {}.", TEMPORARY_FILE_DIR, e); + logger.warn("Failed to delete temp dir {}.", getTemporaryFileDir(), e); } } @@ -134,16 +146,4 @@ public class TemporaryQueryDataFileService implements IService { public ServiceType getID() { return ServiceType.TEMPORARY_QUERY_DATA_FILE_SERVICE; } - - public static TemporaryQueryDataFileService getInstance() { - return TemporaryQueryDataFileServiceHelper.INSTANCE; - } - - private static class TemporaryQueryDataFileServiceHelper { - - private static final TemporaryQueryDataFileService INSTANCE = - new TemporaryQueryDataFileService(); - - private TemporaryQueryDataFileServiceHelper() {} - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/node_commons/service/ITemporaryQueryDataFileServiceProvider.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/node_commons/service/ITemporaryQueryDataFileServiceProvider.java new file mode 100644 index 00000000000..204825a5573 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/node_commons/service/ITemporaryQueryDataFileServiceProvider.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.node_commons.service; + +public interface ITemporaryQueryDataFileServiceProvider { + + AbstractTemporaryQueryDataFileService getService(); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFContext.java index 8c37045c124..5cd7c56a8b3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFContext.java @@ -21,9 +21,9 @@ package org.apache.iotdb.db.queryengine.transformation.dag.udf; import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager; import org.apache.iotdb.db.calc_commons.transformation.dag.udf.UDTFExecutor; +import org.apache.iotdb.db.node_commons.service.AbstractTemporaryQueryDataFileService; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; -import org.apache.iotdb.db.service.TemporaryQueryDataFileService; import java.time.ZoneId; import java.util.HashMap; @@ -53,7 +53,7 @@ public class UDTFContext { } finally { UDFClassLoaderManager.getInstance().finalizeUDFQuery(queryId); // close and delete UDF temp files - TemporaryQueryDataFileService.getInstance().deregister(queryId); + AbstractTemporaryQueryDataFileService.getInstance().deregister(queryId); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index f042e56f45b..ae506274aca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -78,6 +78,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.IoTDBStartCheck; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl; +import org.apache.iotdb.db.node_commons.service.AbstractTemporaryQueryDataFileService; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; @@ -995,7 +996,7 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { } private void registerUdfServices() throws StartupException { - registerManager.register(TemporaryQueryDataFileService.getInstance()); + registerManager.register(AbstractTemporaryQueryDataFileService.getInstance()); registerManager.register(UDFClassLoaderManager.setupAndGetInstance(config.getUdfDir())); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeTemporaryQueryDataFileServiceProvider.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeTemporaryQueryDataFileServiceProvider.java new file mode 100644 index 00000000000..ce4f49be03a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeTemporaryQueryDataFileServiceProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.service; + +import org.apache.iotdb.db.node_commons.service.AbstractTemporaryQueryDataFileService; +import org.apache.iotdb.db.node_commons.service.ITemporaryQueryDataFileServiceProvider; + +public class DataNodeTemporaryQueryDataFileServiceProvider + implements ITemporaryQueryDataFileServiceProvider { + + @Override + public AbstractTemporaryQueryDataFileService getService() { + return TemporaryQueryDataFileService.getInstance(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java index 1f789468ad7..d9e05143a89 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java @@ -19,29 +19,12 @@ package org.apache.iotdb.db.service; -import org.apache.iotdb.commons.exception.StartupException; -import org.apache.iotdb.commons.file.SystemFileFactory; -import org.apache.iotdb.commons.service.IService; -import org.apache.iotdb.commons.service.ServiceType; -import org.apache.iotdb.db.calc_commons.transformation.datastructure.SerializableList.SerializationRecorder; import org.apache.iotdb.db.conf.IoTDBDescriptor; - -import org.apache.tsfile.external.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.iotdb.db.node_commons.service.AbstractTemporaryQueryDataFileService; import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - -public class TemporaryQueryDataFileService implements IService { - private static final Logger logger = LoggerFactory.getLogger(TemporaryQueryDataFileService.class); +public class TemporaryQueryDataFileService extends AbstractTemporaryQueryDataFileService { private static final String TEMPORARY_FILE_DIR = IoTDBDescriptor.getInstance().getConfig().getSystemDir() @@ -50,89 +33,9 @@ public class TemporaryQueryDataFileService implements IService { + File.separator + "tmp"; - private final AtomicLong uniqueDataId; - private final Map<String, List<SerializationRecorder>> recorders; - - private TemporaryQueryDataFileService() { - uniqueDataId = new AtomicLong(0); - recorders = new ConcurrentHashMap<>(); - } - - public String register(SerializationRecorder recorder) throws IOException { - String queryId = recorder.getQueryId(); - recorders - .computeIfAbsent(queryId, k -> Collections.synchronizedList(new ArrayList<>())) - .add(recorder); - - String dirName = getDirName(queryId); - makeDirIfNecessary(dirName); - return getFileName(dirName, uniqueDataId.getAndIncrement()); - } - - public void deregister(String queryId) { - List<SerializationRecorder> recorderList = recorders.remove(queryId); - if (recorderList == null) { - return; - } - for (SerializationRecorder recorder : recorderList) { - try { - recorder.closeFile(); - } catch (IOException e) { - logger.warn( - String.format("Failed to close file in method deregister(%s), because %s", queryId, e)); - } - } - try { - FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(getDirName(queryId))); - } catch (IOException e) { - logger.warn( - String.format("Failed to clean dir in method deregister(%s), because %s", queryId, e)); - } - } - - private void makeDirIfNecessary(String dir) throws IOException { - File file = SystemFileFactory.INSTANCE.getFile(dir); - if (file.exists() && file.isDirectory()) { - return; - } - FileUtils.forceMkdir(file); - } - - private String getDirName(String queryId) { - return TEMPORARY_FILE_DIR + File.separator + queryId + File.separator; - } - - private String getFileName(String dir, long index) { - return dir + index; - } - - @Override - public void start() throws StartupException { - try { - // Clean up stale temp directories left from previous runs (e.g., after a crash) - File tmpDir = SystemFileFactory.INSTANCE.getFile(TEMPORARY_FILE_DIR); - if (tmpDir.exists()) { - FileUtils.deleteDirectory(tmpDir); - } - makeDirIfNecessary(TEMPORARY_FILE_DIR); - } catch (IOException e) { - throw new StartupException(e); - } - } - - @Override - public void stop() { - recorders.clear(); - try { - FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(TEMPORARY_FILE_DIR)); - } catch (IOException e) { - logger.warn("Failed to delete temp dir {}.", TEMPORARY_FILE_DIR, e); - } - } - @Override - public ServiceType getID() { - return ServiceType.TEMPORARY_QUERY_DATA_FILE_SERVICE; + protected String getTemporaryFileDir() { + return TEMPORARY_FILE_DIR; } public static TemporaryQueryDataFileService getInstance() { diff --git a/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.node_commons.service.ITemporaryQueryDataFileServiceProvider b/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.node_commons.service.ITemporaryQueryDataFileServiceProvider new file mode 100644 index 00000000000..dbe0c7ff4d8 --- /dev/null +++ b/iotdb-core/datanode/src/main/resources/META-INF/services/org.apache.iotdb.db.node_commons.service.ITemporaryQueryDataFileServiceProvider @@ -0,0 +1 @@ +org.apache.iotdb.db.service.DataNodeTemporaryQueryDataFileServiceProvider diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 222dd1fce59..7af2e2738e0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -501,6 +501,10 @@ public class CommonConfig { /** Maximum execution time of a DriverTask */ private int driverTaskExecutionTimeSliceInMs = 200; + private int modeMapSizeThreshold = 10000; + + private int nodeId = -1; + CommonConfig() { // Empty constructor } @@ -2893,4 +2897,20 @@ public class CommonConfig { public void setDriverTaskExecutionTimeSliceInMs(int driverTaskExecutionTimeSliceInMs) { this.driverTaskExecutionTimeSliceInMs = driverTaskExecutionTimeSliceInMs; } + + public void setModeMapSizeThreshold(int modeMapSizeThreshold) { + this.modeMapSizeThreshold = modeMapSizeThreshold; + } + + public int getModeMapSizeThreshold() { + return modeMapSizeThreshold; + } + + public void setNodeId(int nodeId) { + this.nodeId = nodeId; + } + + public int getNodeId() { + return nodeId; + } }
