This is an automated email from the ASF dual-hosted git repository. yschengzi pushed a commit to branch revert-11756-load_memory_framework in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 872a3e8530921ca74aeb56e62bc689c03d9cbe8c Author: yschengzi <[email protected]> AuthorDate: Fri Dec 29 19:47:44 2023 +0800 Revert "[IOTDB-6273] Load: Memory Management Framework (#11756)" This reverts commit afde755d4f7d68013282e8e83701b4e20ada9e48. --- .../it/env/cluster/config/MppDataNodeConfig.java | 9 - .../it/env/remote/config/RemoteDataNodeConfig.java | 6 - .../apache/iotdb/itbase/env/DataNodeConfig.java | 3 - .../org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java | 4 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 57 +---- .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 11 +- .../exception/LoadRuntimeOutOfMemoryException.java | 26 -- .../db/queryengine/execution/load/ChunkData.java | 3 - .../queryengine/execution/load/DeletionData.java | 17 +- .../execution/load/LoadTsFileManager.java | 51 +--- .../db/queryengine/execution/load/TsFileData.java | 3 + .../load/LoadTsFileAbstractMemoryBlock.java | 66 ----- .../load/LoadTsFileAnalyzeSchemaMemoryBlock.java | 98 ------- .../load/LoadTsFileDataCacheMemoryBlock.java | 147 ----------- .../queryengine/load/LoadTsFileMemoryManager.java | 149 ----------- .../queryengine/metric/LoadTsFileMemMetricSet.java | 101 -------- .../plan/analyze/LoadTsfileAnalyzer.java | 284 ++++----------------- .../plan/planner/LocalExecutionPlanner.java | 33 +-- .../plan/scheduler/load/LoadTsFileScheduler.java | 34 +-- .../db/service/metrics/DataNodeMetricsHelper.java | 4 - .../dataregion/modification/Deletion.java | 4 - .../dataregion/modification/ModificationFile.java | 19 -- .../io/LocalTextModificationAccessor.java | 9 +- .../modification/io/ModificationWriter.java | 9 - .../iotdb/commons/service/metric/enums/Metric.java | 3 +- ...leSequenceReaderTimeseriesMetadataIterator.java | 42 +-- ...quenceReaderTimeseriesMetadataIteratorTest.java | 8 +- 27 files changed, 103 insertions(+), 1097 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java index 0f22e0d4286..c188ad388c4 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java @@ -67,13 +67,4 @@ public class MppDataNodeConfig extends MppBaseConfig implements DataNodeConfig { properties.setProperty("dn_connection_timeout_ms", String.valueOf(connectionTimeoutInMS)); return this; } - - @Override - public DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes( - long loadTsFileAnalyzeSchemaMemorySizeInBytes) { - properties.setProperty( - "load_tsfile_analyze_schema_memory_size_in_bytes", - String.valueOf(loadTsFileAnalyzeSchemaMemorySizeInBytes)); - return this; - } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java index fe89997bc41..19f0ab36132 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java @@ -37,10 +37,4 @@ public class RemoteDataNodeConfig implements DataNodeConfig { public DataNodeConfig setConnectionTimeoutInMS(int connectionTimeoutInMS) { return this; } - - @Override - public DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes( - long loadTsFileAnalyzeSchemaMemorySizeInBytes) { - return this; - } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java index 2887b0a9871..db39fa62b42 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java @@ -28,7 +28,4 @@ public interface DataNodeConfig { DataNodeConfig setEnableRestService(boolean enableRestService); DataNodeConfig setConnectionTimeoutInMS(int connectionTimeoutInMS); - - DataNodeConfig setLoadTsFileAnalyzeSchemaMemorySizeInBytes( - long loadTsFileAnalyzeSchemaMemorySizeInBytes); } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java index d887bc5a53b..b449afc553b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IOTDBLoadTsFileIT.java @@ -65,7 +65,6 @@ public class IOTDBLoadTsFileIT { private static final Logger LOGGER = LoggerFactory.getLogger(IOTDBLoadTsFileIT.class); private static final long PARTITION_INTERVAL = 10 * 1000L; private static final int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(300); - private static final long loadTsFileAnalyzeSchemaMemorySizeInBytes = 10 * 1024L; private File tmpDir; @@ -76,8 +75,7 @@ public class IOTDBLoadTsFileIT { EnvFactory.getEnv() .getConfig() .getDataNodeConfig() - .setConnectionTimeoutInMS(connectionTimeoutInMS) - .setLoadTsFileAnalyzeSchemaMemorySizeInBytes(loadTsFileAnalyzeSchemaMemorySizeInBytes); + .setConnectionTimeoutInMS(connectionTimeoutInMS); EnvFactory.getEnv().initClusterEnvironment(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index a12192cf29c..5cc59bd4837 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -174,6 +174,8 @@ public class IoTDBConfig { /** The proportion of write memory for loading TsFile */ private double loadTsFileProportion = 0.125; + private int maxLoadingTimeseriesNumber = 2000; + /** * If memory cost of data region increased more than proportion of {@linkplain * IoTDBConfig#getAllocateMemoryForStorageEngine()}*{@linkplain @@ -233,7 +235,6 @@ public class IoTDBConfig { /** The period when outdated wal files are periodically deleted. Unit: millisecond */ private volatile long deleteWalFilesPeriodInMs = 20 * 1000L; - // endregion /** @@ -542,7 +543,6 @@ public class IoTDBConfig { * tasks containing mods files are selected first. */ private long innerCompactionTaskSelectionModsFileThreshold = 10 * 1024 * 1024L; - /** * When disk availability is lower than the sum of (disk_space_warning_threshold + * inner_compaction_task_selection_disk_redundancy), inner compaction tasks containing mods files @@ -968,7 +968,6 @@ public class IoTDBConfig { /** Number of queues per forwarding trigger */ private int triggerForwardMaxQueueNumber = 8; - /** The length of one of the queues per forwarding trigger */ private int triggerForwardMaxSizePerQueue = 2000; @@ -1094,15 +1093,6 @@ public class IoTDBConfig { private int maxPendingBatchesNum = 5; private double maxMemoryRatioForQueue = 0.6; - /** Load related */ - private int loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber = 4096; - - private long loadTsFileAnalyzeSchemaMemorySizeInBytes = - 0; // 0 means that the decision will be adaptive based on the number of sequences - - private long loadMemoryAllocateRetryIntervalMs = 1000; - private int loadMemoryAllocateMaxRetries = 5; - /** Pipe related */ /** initialized as empty, updated based on the latest `systemDir` during querying */ private String[] pipeReceiverFileDirs = new String[0]; @@ -3283,6 +3273,14 @@ public class IoTDBConfig { return loadTsFileProportion; } + public int getMaxLoadingTimeseriesNumber() { + return maxLoadingTimeseriesNumber; + } + + public void setMaxLoadingTimeseriesNumber(int maxLoadingTimeseriesNumber) { + this.maxLoadingTimeseriesNumber = maxLoadingTimeseriesNumber; + } + public static String getEnvironmentVariables() { return "\n\t" + IoTDBConstant.IOTDB_HOME @@ -3737,41 +3735,6 @@ public class IoTDBConfig { return modeMapSizeThreshold; } - public int getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber() { - return loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber; - } - - public void setLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber( - int loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber) { - this.loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber = - loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber; - } - - public long getLoadTsFileAnalyzeSchemaMemorySizeInBytes() { - return loadTsFileAnalyzeSchemaMemorySizeInBytes; - } - - public void setLoadTsFileAnalyzeSchemaMemorySizeInBytes( - long loadTsFileAnalyzeSchemaMemorySizeInBytes) { - this.loadTsFileAnalyzeSchemaMemorySizeInBytes = loadTsFileAnalyzeSchemaMemorySizeInBytes; - } - - public long getLoadMemoryAllocateRetryIntervalMs() { - return loadMemoryAllocateRetryIntervalMs; - } - - public void setLoadMemoryAllocateRetryIntervalMs(long loadMemoryAllocateRetryIntervalMs) { - this.loadMemoryAllocateRetryIntervalMs = loadMemoryAllocateRetryIntervalMs; - } - - public int getLoadMemoryAllocateMaxRetries() { - return loadMemoryAllocateMaxRetries; - } - - public void setLoadMemoryAllocateMaxRetries(int loadMemoryAllocateMaxRetries) { - this.loadMemoryAllocateMaxRetries = loadMemoryAllocateMaxRetries; - } - public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) { this.pipeReceiverFileDirs = pipeReceiverFileDirs; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 494e9d5d1c4..6a8eb822521 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -893,16 +893,11 @@ public class IoTDBDescriptor { conf.setIntoOperationExecutionThreadCount(2); } - conf.setLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber( + conf.setMaxLoadingTimeseriesNumber( Integer.parseInt( properties.getProperty( - "load_tsfile_analyze_schema_batch_flush_time_series_number", - String.valueOf(conf.getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber())))); - conf.setLoadTsFileAnalyzeSchemaMemorySizeInBytes( - Long.parseLong( - properties.getProperty( - "load_tsfile_analyze_schema_memory_size_in_bytes", - String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes())))); + "max_loading_timeseries_number", + String.valueOf(conf.getMaxLoadingTimeseriesNumber())))); conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadRuntimeOutOfMemoryException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadRuntimeOutOfMemoryException.java deleted file mode 100644 index 050274e9940..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/LoadRuntimeOutOfMemoryException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.exception; - -public class LoadRuntimeOutOfMemoryException extends RuntimeException { - public LoadRuntimeOutOfMemoryException(String message) { - super(message); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java index fde207266e9..a30a62e2b16 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java @@ -25,7 +25,6 @@ import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.header.PageHeader; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import java.io.IOException; import java.io.InputStream; @@ -46,8 +45,6 @@ public interface ChunkData extends TsFileData { void writeDecodePage(long[] times, Object[] values, int satisfiedLength) throws IOException; - void writeToFileWriter(TsFileIOWriter writer) throws IOException; - @Override default boolean isModification() { return false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java index 18bb50c9c6c..3f486af4adb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/DeletionData.java @@ -23,9 +23,11 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -38,13 +40,18 @@ public class DeletionData implements TsFileData { @Override public long getDataSize() { - return deletion.getSerializedSize(); + return Long.BYTES; } - public void writeToModificationFile(ModificationFile modificationFile, long fileOffset) - throws IOException { - deletion.setFileOffset(fileOffset); - modificationFile.writeWithoutSync(deletion); + @Override + public void writeToFileWriter(TsFileIOWriter writer) throws IOException { + File tsFile = writer.getFile(); + try (ModificationFile modificationFile = + new ModificationFile(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX)) { + writer.flush(); + deletion.setFileOffset(tsFile.length()); + modificationFile.write(deletion); + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java index 8b8eb97946c..7c92b327a39 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java @@ -34,7 +34,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePie import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler; import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; -import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; import org.apache.iotdb.metrics.utils.MetricLevel; @@ -133,7 +132,7 @@ public class LoadTsFileManager { writerManager.write( new DataPartitionInfo(dataRegion, chunkData.getTimePartitionSlot()), chunkData); } else { - writerManager.writeDeletion(dataRegion, (DeletionData) tsFileData); + writerManager.writeDeletion(tsFileData); } } } @@ -199,14 +198,12 @@ public class LoadTsFileManager { private final File taskDir; private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer; private Map<DataPartitionInfo, String> dataPartition2LastDevice; - private Map<DataPartitionInfo, ModificationFile> dataPartition2ModificationFile; private boolean isClosed; private TsFileWriterManager(File taskDir) { this.taskDir = taskDir; this.dataPartition2Writer = new HashMap<>(); this.dataPartition2LastDevice = new HashMap<>(); - this.dataPartition2ModificationFile = new HashMap<>(); this.isClosed = false; clearDir(taskDir); @@ -248,32 +245,12 @@ public class LoadTsFileManager { chunkData.writeToFileWriter(writer); } - private void writeDeletion(DataRegion dataRegion, DeletionData deletionData) - throws IOException { + private void writeDeletion(TsFileData deletionData) throws IOException { if (isClosed) { throw new IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir)); } for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) { - final DataPartitionInfo partitionInfo = entry.getKey(); - if (partitionInfo.getDataRegion().equals(dataRegion)) { - final TsFileIOWriter writer = entry.getValue(); - if (!dataPartition2ModificationFile.containsKey(partitionInfo)) { - File newModificationFile = - SystemFileFactory.INSTANCE.getFile( - writer.getFile().getAbsolutePath() + ModificationFile.FILE_SUFFIX); - if (!newModificationFile.createNewFile()) { - LOGGER.error( - "Can not create ModificationFile {} for writing.", newModificationFile.getPath()); - return; - } - - dataPartition2ModificationFile.put( - partitionInfo, new ModificationFile(newModificationFile.getAbsolutePath())); - } - ModificationFile modificationFile = dataPartition2ModificationFile.get(partitionInfo); - writer.flush(); - deletionData.writeToModificationFile(modificationFile, writer.getFile().length()); - } + deletionData.writeToFileWriter(entry.getValue()); } } @@ -281,10 +258,6 @@ public class LoadTsFileManager { if (isClosed) { throw new IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir)); } - for (Map.Entry<DataPartitionInfo, ModificationFile> entry : - dataPartition2ModificationFile.entrySet()) { - entry.getValue().close(); - } for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) { TsFileIOWriter writer = entry.getValue(); if (writer.isWritingChunkGroup()) { @@ -329,7 +302,7 @@ public class LoadTsFileManager { if (dataPartition2Writer != null) { for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) { try { - final TsFileIOWriter writer = entry.getValue(); + TsFileIOWriter writer = entry.getValue(); if (writer.canWrite()) { writer.close(); } @@ -342,21 +315,6 @@ public class LoadTsFileManager { } } } - if (dataPartition2ModificationFile != null) { - for (Map.Entry<DataPartitionInfo, ModificationFile> entry : - dataPartition2ModificationFile.entrySet()) { - try { - final ModificationFile modificationFile = entry.getValue(); - modificationFile.close(); - final Path modificationFilePath = new File(modificationFile.getFilePath()).toPath(); - if (Files.exists(modificationFilePath)) { - Files.delete(modificationFilePath); - } - } catch (IOException e) { - LOGGER.warn("Close ModificationFile {} error.", entry.getValue().getFilePath(), e); - } - } - } try { Files.delete(taskDir.toPath()); } catch (DirectoryNotEmptyException e) { @@ -366,7 +324,6 @@ public class LoadTsFileManager { } dataPartition2Writer = null; dataPartition2LastDevice = null; - dataPartition2ModificationFile = null; isClosed = true; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java index c5df4f3f8d3..a53a3f5fc30 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileData.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.execution.load; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.tsfile.exception.write.PageException; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import java.io.DataOutputStream; import java.io.IOException; @@ -30,6 +31,8 @@ import java.io.InputStream; public interface TsFileData { long getDataSize(); + void writeToFileWriter(TsFileIOWriter writer) throws IOException; + boolean isModification(); void serialize(DataOutputStream stream) throws IOException; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAbstractMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAbstractMemoryBlock.java deleted file mode 100644 index 4f2ad607ccb..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAbstractMemoryBlock.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.load; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.atomic.AtomicBoolean; - -public abstract class LoadTsFileAbstractMemoryBlock implements AutoCloseable { - - private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileAbstractMemoryBlock.class); - protected static final LoadTsFileMemoryManager MEMORY_MANAGER = - LoadTsFileMemoryManager.getInstance(); - - private final AtomicBoolean isClosed = new AtomicBoolean(false); - - public boolean hasEnoughMemory() { - return hasEnoughMemory(0L); - } - - public abstract boolean hasEnoughMemory(long memoryTobeAddedInBytes); - - public abstract void addMemoryUsage(long memoryInBytes); - - public abstract void reduceMemoryUsage(long memoryInBytes); - - /** - * Release all memory of this block. - * - * <p>NOTE: This method should be called only by {@link LoadTsFileAbstractMemoryBlock#close()}. - */ - protected abstract void releaseAllMemory(); - - public boolean isClosed() { - return isClosed.get(); - } - - @Override - public void close() { - if (isClosed.compareAndSet(false, true)) { - try { - releaseAllMemory(); - } catch (Exception e) { - LOGGER.error("Release memory block {} failed", this, e); - } - } - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java deleted file mode 100644 index 17aa4150bcd..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileAnalyzeSchemaMemoryBlock.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.load; - -import org.apache.iotdb.commons.service.metric.MetricService; -import org.apache.iotdb.commons.service.metric.enums.Metric; -import org.apache.iotdb.commons.service.metric.enums.Tag; -import org.apache.iotdb.db.queryengine.metric.LoadTsFileMemMetricSet; -import org.apache.iotdb.metrics.utils.MetricLevel; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.atomic.AtomicLong; - -public class LoadTsFileAnalyzeSchemaMemoryBlock extends LoadTsFileAbstractMemoryBlock { - private static final Logger LOGGER = - LoggerFactory.getLogger(LoadTsFileAnalyzeSchemaMemoryBlock.class); - - private final long totalMemorySizeInBytes; - private final AtomicLong memoryUsageInBytes; - - LoadTsFileAnalyzeSchemaMemoryBlock(long totalMemorySizeInBytes) { - super(); - - this.totalMemorySizeInBytes = totalMemorySizeInBytes; - this.memoryUsageInBytes = new AtomicLong(0); - } - - @Override - public boolean hasEnoughMemory(long memoryTobeAddedInBytes) { - return memoryUsageInBytes.get() + memoryTobeAddedInBytes <= totalMemorySizeInBytes; - } - - @Override - public void addMemoryUsage(long memoryInBytes) { - memoryUsageInBytes.addAndGet(memoryInBytes); - - MetricService.getInstance() - .getOrCreateGauge( - Metric.LOAD_MEM.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - LoadTsFileMemMetricSet.LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY) - .incr(memoryInBytes); - } - - @Override - public void reduceMemoryUsage(long memoryInBytes) { - if (memoryUsageInBytes.addAndGet(-memoryInBytes) < 0) { - LOGGER.warn("{} has reduce memory usage to negative", this); - } - - MetricService.getInstance() - .getOrCreateGauge( - Metric.LOAD_MEM.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - LoadTsFileMemMetricSet.LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY) - .decr(memoryInBytes); - } - - @Override - protected void releaseAllMemory() { - if (memoryUsageInBytes.get() != 0) { - LOGGER.warn( - "Try to release memory from a memory block {} which has not released all memory", this); - } - MEMORY_MANAGER.releaseToQuery(totalMemorySizeInBytes); - } - - @Override - public String toString() { - return "LoadTsFileAnalyzeSchemaMemoryBlock{" - + "totalMemorySizeInBytes=" - + totalMemorySizeInBytes - + ", memoryUsageInBytes=" - + memoryUsageInBytes.get() - + '}'; - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileDataCacheMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileDataCacheMemoryBlock.java deleted file mode 100644 index 7a22951d672..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileDataCacheMemoryBlock.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.load; - -import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -public class LoadTsFileDataCacheMemoryBlock extends LoadTsFileAbstractMemoryBlock { - private static final Logger LOGGER = - LoggerFactory.getLogger(LoadTsFileDataCacheMemoryBlock.class); - private static final long MINIMUM_MEMORY_SIZE_IN_BYTES = 1024 * 1024L; // 1 MB - private static final int MAX_ASK_FOR_MEMORY_COUNT = 256; // must be a power of 2 - private static final long EACH_ASK_MEMORY_SIZE_IN_BYTES = - Math.max( - MINIMUM_MEMORY_SIZE_IN_BYTES, - LoadTsFileMemoryManager.MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 4); - - private final AtomicLong limitedMemorySizeInBytes; - private final AtomicLong memoryUsageInBytes; - private final AtomicInteger askForMemoryCount; - private final AtomicInteger referenceCount; - - LoadTsFileDataCacheMemoryBlock(long initialLimitedMemorySizeInBytes) { - super(); - - if (initialLimitedMemorySizeInBytes < MINIMUM_MEMORY_SIZE_IN_BYTES) { - throw new LoadRuntimeOutOfMemoryException( - String.format( - "The initial limited memory size %d is less than the minimum memory size %d", - initialLimitedMemorySizeInBytes, MINIMUM_MEMORY_SIZE_IN_BYTES)); - } - - this.limitedMemorySizeInBytes = new AtomicLong(initialLimitedMemorySizeInBytes); - this.memoryUsageInBytes = new AtomicLong(0L); - this.askForMemoryCount = new AtomicInteger(1); - this.referenceCount = new AtomicInteger(0); - } - - @Override - public boolean hasEnoughMemory(long memoryTobeAddedInBytes) { - return memoryUsageInBytes.get() + memoryTobeAddedInBytes <= limitedMemorySizeInBytes.get(); - } - - @Override - public void addMemoryUsage(long memoryInBytes) { - memoryUsageInBytes.addAndGet(memoryInBytes); - - askForMemoryCount.getAndUpdate( - count -> { - if ((count & (count - 1)) == 0) { - // count is a power of 2 - long actuallyAllocateMemorySizeInBytes = - MEMORY_MANAGER.tryAllocateFromQuery(EACH_ASK_MEMORY_SIZE_IN_BYTES); - limitedMemorySizeInBytes.addAndGet(actuallyAllocateMemorySizeInBytes); - if (actuallyAllocateMemorySizeInBytes < EACH_ASK_MEMORY_SIZE_IN_BYTES) { - return (count & (MAX_ASK_FOR_MEMORY_COUNT - 1)) + 1; - } else { - return 1; - } - } - return (count & (MAX_ASK_FOR_MEMORY_COUNT - 1)) + 1; - }); - } - - @Override - public void reduceMemoryUsage(long memoryInBytes) { - memoryUsageInBytes.addAndGet(-memoryInBytes); - } - - @Override - protected void releaseAllMemory() { - if (memoryUsageInBytes.get() != 0) { - LOGGER.warn( - "Try to release memory from a memory block {} which has not released all memory", this); - } - MEMORY_MANAGER.releaseToQuery(limitedMemorySizeInBytes.get()); - } - - public boolean doShrink(long shrinkMemoryInBytes) { - if (shrinkMemoryInBytes < 0) { - LOGGER.warn( - "Try to shrink a negative memory size {} from memory block {}", - shrinkMemoryInBytes, - this); - return false; - } else if (shrinkMemoryInBytes == 0) { - return true; - } - - if (limitedMemorySizeInBytes.get() - shrinkMemoryInBytes <= MINIMUM_MEMORY_SIZE_IN_BYTES) { - return false; - } - - limitedMemorySizeInBytes.addAndGet(-shrinkMemoryInBytes); - return true; - } - - void updateReferenceCount(int delta) { - referenceCount.addAndGet(delta); - } - - int getReferenceCount() { - return referenceCount.get(); - } - - long getMemoryUsageInBytes() { - return memoryUsageInBytes.get(); - } - - long getLimitedMemorySizeInBytes() { - return limitedMemorySizeInBytes.get(); - } - - @Override - public String toString() { - return "LoadTsFileDataCacheMemoryBlock{" - + "limitedMemorySizeInBytes=" - + limitedMemorySizeInBytes.get() - + ", memoryUsageInBytes=" - + memoryUsageInBytes.get() - + ", askForMemoryCount=" - + askForMemoryCount.get() - + '}'; - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java deleted file mode 100644 index aa4d28bbd02..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/load/LoadTsFileMemoryManager.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.load; - -import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException; -import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.atomic.AtomicLong; - -public class LoadTsFileMemoryManager { - - private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileMemoryManager.class); - private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); - private static final LocalExecutionPlanner QUERY_ENGINE_MEMORY_MANAGER = - LocalExecutionPlanner.getInstance(); - public static final long MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES = - QUERY_ENGINE_MEMORY_MANAGER.getAllocateMemoryForOperators(); - private static final int MEMORY_ALLOCATE_MAX_RETRIES = CONFIG.getLoadMemoryAllocateMaxRetries(); - private static final long MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS = - CONFIG.getLoadMemoryAllocateRetryIntervalMs(); - - private final AtomicLong usedMemorySizeInBytes = new AtomicLong(0); - private LoadTsFileDataCacheMemoryBlock dataCacheMemoryBlock; - - private synchronized void forceAllocatedFromQuery(long sizeInBytes) - throws LoadRuntimeOutOfMemoryException { - for (int i = 0; i < MEMORY_ALLOCATE_MAX_RETRIES; i++) { - // allocate memory from queryEngine - if (QUERY_ENGINE_MEMORY_MANAGER.forceAllocateFreeMemoryForOperators(sizeInBytes)) { - usedMemorySizeInBytes.addAndGet(sizeInBytes); - return; - } - - // wait for available memory - try { - this.wait(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOGGER.warn("forceAllocate: interrupted while waiting for available memory", e); - } - } - - throw new LoadRuntimeOutOfMemoryException( - String.format( - "forceAllocate: failed to allocate memory from query engine after %d retries, " - + "total query memory %s, used memory size %d bytes, " - + "requested memory size %d bytes", - MEMORY_ALLOCATE_MAX_RETRIES, - IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators(), - usedMemorySizeInBytes.get(), - sizeInBytes)); - } - - public synchronized long tryAllocateFromQuery(long sizeInBytes) { - long actuallyAllocateMemoryInBytes = - Math.max(0L, QUERY_ENGINE_MEMORY_MANAGER.tryAllocateFreeMemoryForOperators(sizeInBytes)); - usedMemorySizeInBytes.addAndGet(actuallyAllocateMemoryInBytes); - return actuallyAllocateMemoryInBytes; - } - - public synchronized void releaseToQuery(long sizeInBytes) { - usedMemorySizeInBytes.addAndGet(-sizeInBytes); - QUERY_ENGINE_MEMORY_MANAGER.releaseToFreeMemoryForOperators(sizeInBytes); - this.notifyAll(); - } - - public synchronized LoadTsFileAnalyzeSchemaMemoryBlock allocateAnalyzeSchemaMemoryBlock( - long sizeInBytes) throws LoadRuntimeOutOfMemoryException { - try { - forceAllocatedFromQuery(sizeInBytes); - } catch (LoadRuntimeOutOfMemoryException e) { - if (dataCacheMemoryBlock != null && dataCacheMemoryBlock.doShrink(sizeInBytes)) { - return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes); - } - throw e; - } - return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes); - } - - public synchronized LoadTsFileDataCacheMemoryBlock allocateDataCacheMemoryBlock() - throws LoadRuntimeOutOfMemoryException { - if (dataCacheMemoryBlock == null) { - long actuallyAllocateMemoryInBytes = - tryAllocateFromQuery(MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 2); - dataCacheMemoryBlock = new LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes); - LOGGER.info( - "Create Data Cache Memory Block {}, allocate memory {}", - dataCacheMemoryBlock, - actuallyAllocateMemoryInBytes); - } - dataCacheMemoryBlock.updateReferenceCount(1); - return dataCacheMemoryBlock; - } - - public synchronized void releaseDataCacheMemoryBlock() { - dataCacheMemoryBlock.updateReferenceCount(-1); - if (dataCacheMemoryBlock.getReferenceCount() == 0) { - LOGGER.info("Release Data Cache Memory Block {}", dataCacheMemoryBlock); - dataCacheMemoryBlock.close(); - dataCacheMemoryBlock = null; - } - } - - // used for Metrics - public long getUsedMemorySizeInBytes() { - return usedMemorySizeInBytes.get(); - } - - public long getDataCacheUsedMemorySizeInBytes() { - return dataCacheMemoryBlock == null ? 0 : dataCacheMemoryBlock.getMemoryUsageInBytes(); - } - - public long getDataCacheLimitedMemorySizeInBytes() { - return dataCacheMemoryBlock == null ? 0 : dataCacheMemoryBlock.getLimitedMemorySizeInBytes(); - } - - ///////////////////////////// SINGLETON ///////////////////////////// - private LoadTsFileMemoryManager() {} - - public static LoadTsFileMemoryManager getInstance() { - return LoadTsFileMemoryManagerHolder.INSTANCE; - } - - public static class LoadTsFileMemoryManagerHolder { - private static final LoadTsFileMemoryManager INSTANCE = new LoadTsFileMemoryManager(); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/LoadTsFileMemMetricSet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/LoadTsFileMemMetricSet.java deleted file mode 100644 index 7e822421302..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/metric/LoadTsFileMemMetricSet.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.queryengine.metric; - -import org.apache.iotdb.commons.service.metric.enums.Metric; -import org.apache.iotdb.commons.service.metric.enums.Tag; -import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager; -import org.apache.iotdb.metrics.AbstractMetricService; -import org.apache.iotdb.metrics.metricsets.IMetricSet; -import org.apache.iotdb.metrics.utils.MetricLevel; -import org.apache.iotdb.metrics.utils.MetricType; - -public class LoadTsFileMemMetricSet implements IMetricSet { - - private static final String LOAD_TSFILE_USED_MEMORY = "LoadTsFileUsedMemory"; - public static final String LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY = "LoadTsFileAnalyzeSchemaMemory"; - - private static final String LOAD_TSFILE_DATA_CACHE_MEMORY = "LoadTsFileDataCacheMemory"; - - @Override - public void bindTo(AbstractMetricService metricService) { - metricService.createAutoGauge( - Metric.LOAD_MEM.toString(), - MetricLevel.IMPORTANT, - LoadTsFileMemoryManager.getInstance(), - LoadTsFileMemoryManager::getUsedMemorySizeInBytes, - Tag.NAME.toString(), - LOAD_TSFILE_USED_MEMORY); - - metricService.createAutoGauge( - Metric.LOAD_MEM.toString(), - MetricLevel.IMPORTANT, - LoadTsFileMemoryManager.getInstance(), - LoadTsFileMemoryManager::getDataCacheUsedMemorySizeInBytes, - Tag.NAME.toString(), - LOAD_TSFILE_DATA_CACHE_MEMORY); - - metricService - .getOrCreateGauge( - Metric.LOAD_MEM.toString(), - MetricLevel.IMPORTANT, - Tag.NAME.toString(), - LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY) - .set(0L); - } - - @Override - public void unbindFrom(AbstractMetricService metricService) { - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.LOAD_MEM.toString(), - Tag.NAME.toString(), - LOAD_TSFILE_USED_MEMORY); - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.LOAD_MEM.toString(), - Tag.NAME.toString(), - LOAD_TSFILE_DATA_CACHE_MEMORY); - metricService.remove( - MetricType.AUTO_GAUGE, - Metric.LOAD_MEM.toString(), - Tag.NAME.toString(), - LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY); - } - - //////////////////////////// singleton //////////////////////////// - - private static class LoadTsFileMemMetricSetHolder { - - private static final LoadTsFileMemMetricSet INSTANCE = new LoadTsFileMemMetricSet(); - - private LoadTsFileMemMetricSetHolder() { - // empty constructor - } - } - - public static LoadTsFileMemMetricSet getInstance() { - return LoadTsFileMemMetricSet.LoadTsFileMemMetricSetHolder.INSTANCE; - } - - private LoadTsFileMemMetricSet() { - // empty constructor - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java index 84860f5d847..57b15dbdc02 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java @@ -33,11 +33,9 @@ import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq; import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp; import org.apache.iotdb.db.auth.AuthorityChecker; -import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.LoadFileException; import org.apache.iotdb.db.exception.LoadReadOnlyException; -import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException; import org.apache.iotdb.db.exception.VerifyMetadataException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; @@ -47,8 +45,6 @@ import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; -import org.apache.iotdb.db.queryengine.load.LoadTsFileAnalyzeSchemaMemoryBlock; -import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator; @@ -85,7 +81,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -97,19 +92,6 @@ public class LoadTsfileAnalyzer { private static final IClientManager<ConfigRegionId, ConfigNodeClient> CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance(); - private static final int BATCH_FLUSH_TIME_SERIES_NUMBER; - private static final long ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES; - private static final long FLUSH_ALIGNED_CACHE_MEMORY_SIZE_IN_BYTES; - - static { - final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); - BATCH_FLUSH_TIME_SERIES_NUMBER = CONFIG.getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber(); - ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES = - CONFIG.getLoadTsFileAnalyzeSchemaMemorySizeInBytes() <= 0 - ? ((long) BATCH_FLUSH_TIME_SERIES_NUMBER) << 10 - : CONFIG.getLoadTsFileAnalyzeSchemaMemorySizeInBytes(); - FLUSH_ALIGNED_CACHE_MEMORY_SIZE_IN_BYTES = ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES >> 1; - } private final LoadTsFileStatement loadTsFileStatement; private final MPPQueryContext context; @@ -117,7 +99,8 @@ public class LoadTsfileAnalyzer { private final IPartitionFetcher partitionFetcher; private final ISchemaFetcher schemaFetcher; - private final SchemaAutoCreatorAndVerifier schemaAutoCreatorAndVerifier; + private final SchemaAutoCreatorAndVerifier schemaAutoCreatorAndVerifier = + new SchemaAutoCreatorAndVerifier(); LoadTsfileAnalyzer( LoadTsFileStatement loadTsFileStatement, @@ -129,16 +112,6 @@ public class LoadTsfileAnalyzer { this.partitionFetcher = partitionFetcher; this.schemaFetcher = schemaFetcher; - - try { - this.schemaAutoCreatorAndVerifier = new SchemaAutoCreatorAndVerifier(); - } catch (LoadRuntimeOutOfMemoryException e) { - LOGGER.warn("Can not allocate memory for analyze TsFile schema.", e); - throw new SemanticException( - String.format( - "Can not allocate memory for analyze TsFile schema when executing statement %s.", - loadTsFileStatement)); - } } public Analysis analyzeFileByFile() { @@ -177,16 +150,15 @@ public class LoadTsfileAnalyzer { i + 1, tsfileNum, String.format("%.3f", (i + 1) * 100.00 / tsfileNum)); } } catch (IllegalArgumentException e) { - schemaAutoCreatorAndVerifier.close(); + schemaAutoCreatorAndVerifier.clear(); LOGGER.warn( "Parse file {} to resource error, this TsFile maybe empty.", tsFile.getPath(), e); throw new SemanticException( String.format("TsFile %s is empty or incomplete.", tsFile.getPath())); } catch (AuthException e) { - schemaAutoCreatorAndVerifier.close(); return createFailAnalysisForAuthException(e); } catch (Exception e) { - schemaAutoCreatorAndVerifier.close(); + schemaAutoCreatorAndVerifier.clear(); LOGGER.warn("Parse file {} to resource error.", tsFile.getPath(), e); throw new SemanticException( String.format( @@ -196,10 +168,9 @@ public class LoadTsfileAnalyzer { try { schemaAutoCreatorAndVerifier.flush(); + schemaAutoCreatorAndVerifier.clear(); } catch (AuthException e) { return createFailAnalysisForAuthException(e); - } finally { - schemaAutoCreatorAndVerifier.close(); } LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed."); @@ -214,7 +185,7 @@ public class LoadTsfileAnalyzer { try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { // can be reused when constructing tsfile resource final TsFileSequenceReaderTimeseriesMetadataIterator timeseriesMetadataIterator = - new TsFileSequenceReaderTimeseriesMetadataIterator(reader, true, 1); + new TsFileSequenceReaderTimeseriesMetadataIterator(reader, true); long writePointCount = 0; @@ -268,6 +239,7 @@ public class LoadTsfileAnalyzer { } private Analysis createFailAnalysisForAuthException(AuthException e) { + schemaAutoCreatorAndVerifier.clear(); Analysis analysis = new Analysis(); analysis.setFinishQueryAfterAnalyze(true); analysis.setFailStatus(RpcUtils.getStatus(e.getCode(), e.getMessage())); @@ -275,11 +247,14 @@ public class LoadTsfileAnalyzer { } private final class SchemaAutoCreatorAndVerifier { - private final LoadTsFileAnalyzeSchemaCache schemaCache; - private SchemaAutoCreatorAndVerifier() throws LoadRuntimeOutOfMemoryException { - this.schemaCache = new LoadTsFileAnalyzeSchemaCache(); - } + private final Map<String, Boolean> tsfileDevice2IsAligned = new HashMap<>(); + private final Map<String, Set<MeasurementSchema>> currentBatchDevice2TimeseriesSchemas = + new HashMap<>(); + + private final Set<PartialPath> alreadySetDatabases = new HashSet<>(); + + private SchemaAutoCreatorAndVerifier() {} public void autoCreateAndVerify( TsFileSequenceReader reader, @@ -292,9 +267,7 @@ public class LoadTsfileAnalyzer { for (final TimeseriesMetadata timeseriesMetadata : entry.getValue()) { final TSDataType dataType = timeseriesMetadata.getTsDataType(); if (TSDataType.VECTOR.equals(dataType)) { - schemaCache - .clearDeviceIsAlignedCacheIfNecessary(); // must execute before add aligned cache - schemaCache.addIsAlignedCache(device, true, false); + tsfileDevice2IsAligned.put(device, true); // not a timeseries, skip } else { @@ -328,25 +301,21 @@ public class LoadTsfileAnalyzer { } final Pair<CompressionType, TSEncoding> compressionEncodingPair = reader.readTimeseriesCompressionTypeAndEncoding(timeseriesMetadata); - schemaCache.addTimeSeries( - device, - new MeasurementSchema( - timeseriesMetadata.getMeasurementId(), - dataType, - compressionEncodingPair.getRight(), - compressionEncodingPair.getLeft())); - - schemaCache.addIsAlignedCache(device, false, true); - if (!schemaCache.getDeviceIsAligned(device)) { - schemaCache.clearDeviceIsAlignedCacheIfNecessary(); - } - } - - if (schemaCache.shouldFlushTimeSeries()) { - flush(); + currentBatchDevice2TimeseriesSchemas + .computeIfAbsent(device, o -> new HashSet<>()) + .add( + new MeasurementSchema( + timeseriesMetadata.getMeasurementId(), + dataType, + compressionEncodingPair.getRight(), + compressionEncodingPair.getLeft())); + + tsfileDevice2IsAligned.putIfAbsent(device, false); } } } + + flush(); } /** @@ -357,17 +326,20 @@ public class LoadTsfileAnalyzer { throws SemanticException, AuthException { // avoid OOM when loading a tsfile with too many timeseries // or loading too many tsfiles at the same time - schemaCache.clearDeviceIsAlignedCacheIfNecessary(); + if (tsfileDevice2IsAligned.size() > 10000) { + flush(); + tsfileDevice2IsAligned.clear(); + } } public void flush() throws AuthException { doAutoCreateAndVerify(); - schemaCache.clearTimeSeries(); + currentBatchDevice2TimeseriesSchemas.clear(); } private void doAutoCreateAndVerify() throws SemanticException, AuthException { - if (schemaCache.getDevice2TimeSeries().isEmpty()) { + if (currentBatchDevice2TimeseriesSchemas.isEmpty()) { return; } @@ -400,7 +372,7 @@ public class LoadTsfileAnalyzer { private void makeSureNoDuplicatedMeasurementsInDevices() throws VerifyMetadataException { for (final Map.Entry<String, Set<MeasurementSchema>> entry : - schemaCache.getDevice2TimeSeries().entrySet()) { + currentBatchDevice2TimeseriesSchemas.entrySet()) { final String device = entry.getKey(); final Map<String, MeasurementSchema> measurement2Schema = new HashMap<>(); for (final MeasurementSchema timeseriesSchema : entry.getValue()) { @@ -419,7 +391,7 @@ public class LoadTsfileAnalyzer { final int databasePrefixNodesLength = loadTsFileStatement.getDatabaseLevel() + 1; final Set<PartialPath> databasesNeededToBeSet = new HashSet<>(); - for (final String device : schemaCache.getDevice2TimeSeries().keySet()) { + for (final String device : currentBatchDevice2TimeseriesSchemas.keySet()) { final PartialPath devicePath = new PartialPath(device); final String[] devicePrefixNodes = devicePath.getNodes(); @@ -437,7 +409,7 @@ public class LoadTsfileAnalyzer { } // 1. filter out the databases that already exist - if (schemaCache.getAlreadySetDatabases().isEmpty()) { + if (alreadySetDatabases.isEmpty()) { try (final ConfigNodeClient configNodeClient = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final TGetDatabaseReq req = @@ -450,13 +422,13 @@ public class LoadTsfileAnalyzer { final TShowDatabaseResp resp = configNodeClient.showDatabase(req); for (final String databaseName : resp.getDatabaseInfoMap().keySet()) { - schemaCache.addAlreadySetDatabase(new PartialPath(databaseName)); + alreadySetDatabases.add(new PartialPath(databaseName)); } } catch (IOException | TException | ClientManagerException e) { throw new LoadFileException(e); } } - databasesNeededToBeSet.removeAll(schemaCache.getAlreadySetDatabases()); + databasesNeededToBeSet.removeAll(alreadySetDatabases); // 2. create the databases that do not exist for (final PartialPath databasePath : databasesNeededToBeSet) { @@ -467,7 +439,7 @@ public class LoadTsfileAnalyzer { statement.setEnablePrintExceptionLog(false); executeSetDatabaseStatement(statement); - schemaCache.addAlreadySetDatabase(databasePath); + alreadySetDatabases.add(databasePath); } } @@ -512,7 +484,7 @@ public class LoadTsfileAnalyzer { final List<Boolean> isAlignedList = new ArrayList<>(); for (final Map.Entry<String, Set<MeasurementSchema>> entry : - schemaCache.getDevice2TimeSeries().entrySet()) { + currentBatchDevice2TimeseriesSchemas.entrySet()) { final int measurementSize = entry.getValue().size(); final String[] measurements = new String[measurementSize]; final TSDataType[] tsDataTypes = new TSDataType[measurementSize]; @@ -532,7 +504,7 @@ public class LoadTsfileAnalyzer { dataTypeList.add(tsDataTypes); encodingsList.add(encodings); compressionTypesList.add(compressionTypes); - isAlignedList.add(schemaCache.getDeviceIsAligned(entry.getKey())); + isAlignedList.add(tsfileDevice2IsAligned.get(entry.getKey())); } return SchemaValidator.validate( @@ -549,7 +521,7 @@ public class LoadTsfileAnalyzer { private void verifySchema(ISchemaTree schemaTree) throws VerifyMetadataException, IllegalPathException { for (final Map.Entry<String, Set<MeasurementSchema>> entry : - schemaCache.getDevice2TimeSeries().entrySet()) { + currentBatchDevice2TimeseriesSchemas.entrySet()) { final String device = entry.getKey(); final List<MeasurementSchema> tsfileTimeseriesSchemas = new ArrayList<>(entry.getValue()); final DeviceSchemaInfo iotdbDeviceSchemaInfo = @@ -568,7 +540,7 @@ public class LoadTsfileAnalyzer { } // check device schema: is aligned or not - final boolean isAlignedInTsFile = schemaCache.getDeviceIsAligned(device); + final boolean isAlignedInTsFile = tsfileDevice2IsAligned.get(device); final boolean isAlignedInIoTDB = iotdbDeviceSchemaInfo.isAligned(); if (isAlignedInTsFile != isAlignedInIoTDB) { throw new VerifyMetadataException( @@ -636,174 +608,10 @@ public class LoadTsfileAnalyzer { } } - public void close() { - schemaCache.close(); - } - } - - private static class LoadTsFileAnalyzeSchemaCache { - - private final LoadTsFileAnalyzeSchemaMemoryBlock block; - - private Map<String, Set<MeasurementSchema>> currentBatchDevice2TimeSeriesSchemas; - private Map<String, Boolean> tsFileDevice2IsAligned; - private Set<PartialPath> alreadySetDatabases; - - private long batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes = 0; - private long tsFileDevice2IsAlignedMemoryUsageSizeInBytes = 0; - private long alreadySetDatabasesMemoryUsageSizeInBytes = 0; - - private int currentBatchTimeSeriesCount = 0; - - public LoadTsFileAnalyzeSchemaCache() throws LoadRuntimeOutOfMemoryException { - this.block = - LoadTsFileMemoryManager.getInstance() - .allocateAnalyzeSchemaMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES); - this.currentBatchDevice2TimeSeriesSchemas = new HashMap<>(); - this.tsFileDevice2IsAligned = new HashMap<>(); - this.alreadySetDatabases = new HashSet<>(); - } - - public Map<String, Set<MeasurementSchema>> getDevice2TimeSeries() { - return currentBatchDevice2TimeSeriesSchemas; - } - - public boolean getDeviceIsAligned(String device) { - if (!tsFileDevice2IsAligned.containsKey(device)) { - LOGGER.warn( - "Device {} is not in the tsFileDevice2IsAligned cache {}.", - device, - tsFileDevice2IsAligned); - } - return tsFileDevice2IsAligned.get(device); - } - - public Set<PartialPath> getAlreadySetDatabases() { - return alreadySetDatabases; - } - - public void addTimeSeries(String device, MeasurementSchema measurementSchema) { - long memoryUsageSizeInBytes = 0; - if (!currentBatchDevice2TimeSeriesSchemas.containsKey(device)) { - memoryUsageSizeInBytes += estimateStringSize(device); - } - if (currentBatchDevice2TimeSeriesSchemas - .computeIfAbsent(device, k -> new HashSet<>()) - .add(measurementSchema)) { - memoryUsageSizeInBytes += measurementSchema.serializedSize(); - currentBatchTimeSeriesCount++; - } - - if (memoryUsageSizeInBytes > 0) { - batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes += memoryUsageSizeInBytes; - block.addMemoryUsage(memoryUsageSizeInBytes); - } - } - - public void addIsAlignedCache(String device, boolean isAligned, boolean addIfAbsent) { - long memoryUsageSizeInBytes = 0; - if (!tsFileDevice2IsAligned.containsKey(device)) { - memoryUsageSizeInBytes += estimateStringSize(device); - } - if (addIfAbsent - ? (tsFileDevice2IsAligned.putIfAbsent(device, isAligned) == null) - : (tsFileDevice2IsAligned.put(device, isAligned) == null)) { - memoryUsageSizeInBytes += Byte.BYTES; - } - - if (memoryUsageSizeInBytes > 0) { - tsFileDevice2IsAlignedMemoryUsageSizeInBytes += memoryUsageSizeInBytes; - block.addMemoryUsage(memoryUsageSizeInBytes); - } - } - - public void addAlreadySetDatabase(PartialPath database) { - long memoryUsageSizeInBytes = 0; - if (alreadySetDatabases.add(database)) { - memoryUsageSizeInBytes += PartialPath.estimateSize(database); - } - - if (memoryUsageSizeInBytes > 0) { - alreadySetDatabasesMemoryUsageSizeInBytes += memoryUsageSizeInBytes; - block.addMemoryUsage(memoryUsageSizeInBytes); - } - } - - public boolean shouldFlushTimeSeries() { - return !block.hasEnoughMemory() - || currentBatchTimeSeriesCount >= BATCH_FLUSH_TIME_SERIES_NUMBER; - } - - public boolean shouldFlushAlignedCache() { - return tsFileDevice2IsAlignedMemoryUsageSizeInBytes - >= FLUSH_ALIGNED_CACHE_MEMORY_SIZE_IN_BYTES; - } - - public void clearTimeSeries() { - currentBatchDevice2TimeSeriesSchemas.clear(); - block.reduceMemoryUsage(batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes); - batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes = 0; - currentBatchTimeSeriesCount = 0; - } - - public void clearAlignedCache() { - tsFileDevice2IsAligned.clear(); - block.reduceMemoryUsage(tsFileDevice2IsAlignedMemoryUsageSizeInBytes); - tsFileDevice2IsAlignedMemoryUsageSizeInBytes = 0; - } - - public void clearDeviceIsAlignedCacheIfNecessary() { - if (!shouldFlushAlignedCache()) { - return; - } - - long releaseMemoryInBytes = 0; - final Set<String> timeSeriesCacheKeySet = - new HashSet<>(currentBatchDevice2TimeSeriesSchemas.keySet()); - Iterator<Map.Entry<String, Boolean>> iterator = tsFileDevice2IsAligned.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<String, Boolean> entry = iterator.next(); - if (!timeSeriesCacheKeySet.contains(entry.getKey())) { - releaseMemoryInBytes += estimateStringSize(entry.getKey()) + Byte.BYTES; - iterator.remove(); - } - } - if (releaseMemoryInBytes > 0) { - tsFileDevice2IsAlignedMemoryUsageSizeInBytes -= releaseMemoryInBytes; - block.reduceMemoryUsage(releaseMemoryInBytes); - } - } - - private void clearDatabasesCache() { + public void clear() { + tsfileDevice2IsAligned.clear(); + currentBatchDevice2TimeseriesSchemas.clear(); alreadySetDatabases.clear(); - block.reduceMemoryUsage(alreadySetDatabasesMemoryUsageSizeInBytes); - alreadySetDatabasesMemoryUsageSizeInBytes = 0; - } - - public void close() { - clearTimeSeries(); - clearAlignedCache(); - clearDatabasesCache(); - - block.close(); - - currentBatchDevice2TimeSeriesSchemas = null; - tsFileDevice2IsAligned = null; - alreadySetDatabases = null; - } - - /** - * String basic total, 32B - * - * <ul> - * <li>Object header, 8B - * <li>char[] reference + header + length, 8 + 4 + 8= 20B - * <li>hash code, 4B - * </ul> - */ - private static int estimateStringSize(String string) { - // each char takes 2B in Java - return string == null ? 0 : 32 + 2 * string.length(); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index 0b1fc6cecd7..d4d51195607 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -45,11 +45,10 @@ import java.util.List; public class LocalExecutionPlanner { private static final Logger LOGGER = LoggerFactory.getLogger(LocalExecutionPlanner.class); - private static final long ALLOCATE_MEMORY_FOR_OPERATORS = - IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators(); /** allocated memory for operator execution */ - private long freeMemoryForOperators = ALLOCATE_MEMORY_FOR_OPERATORS; + private long freeMemoryForOperators = + IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForOperators(); public long getFreeMemoryForOperators() { return freeMemoryForOperators; @@ -164,34 +163,6 @@ public class LocalExecutionPlanner { return sourcePaths; } - public synchronized boolean forceAllocateFreeMemoryForOperators(long memoryInBytes) { - if (freeMemoryForOperators < memoryInBytes) { - return false; - } else { - freeMemoryForOperators -= memoryInBytes; - return true; - } - } - - public synchronized long tryAllocateFreeMemoryForOperators(long memoryInBytes) { - if (freeMemoryForOperators < memoryInBytes) { - long result = freeMemoryForOperators; - freeMemoryForOperators = 0; - return result; - } else { - freeMemoryForOperators -= memoryInBytes; - return memoryInBytes; - } - } - - public synchronized void releaseToFreeMemoryForOperators(long memoryInBytes) { - freeMemoryForOperators += memoryInBytes; - } - - public long getAllocateMemoryForOperators() { - return ALLOCATE_MEMORY_FOR_OPERATORS; - } - private static class InstanceHolder { private InstanceHolder() {} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index 51c4b44df38..92a3ecd1047 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -46,8 +46,6 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo; import org.apache.iotdb.db.queryengine.execution.load.ChunkData; import org.apache.iotdb.db.queryengine.execution.load.TsFileData; import org.apache.iotdb.db.queryengine.execution.load.TsFileSplitter; -import org.apache.iotdb.db.queryengine.load.LoadTsFileDataCacheMemoryBlock; -import org.apache.iotdb.db.queryengine.load.LoadTsFileMemoryManager; import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; @@ -95,17 +93,15 @@ import java.util.stream.IntStream; public class LoadTsFileScheduler implements IScheduler { private static final Logger logger = LoggerFactory.getLogger(LoadTsFileScheduler.class); public static final long LOAD_TASK_MAX_TIME_IN_SECOND = 900L; // 15min - private static final long SINGLE_SCHEDULER_MAX_MEMORY_SIZE; + private static final long MAX_MEMORY_SIZE; private static final int TRANSMIT_LIMIT; static { IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - SINGLE_SCHEDULER_MAX_MEMORY_SIZE = + MAX_MEMORY_SIZE = Math.min( config.getThriftMaxFrameSize() >> 2, - (long) - (config.getAllocateMemoryForStorageEngine() - * config.getLoadTsFileProportion())); // TODO: change it to query engine + (long) (config.getAllocateMemoryForStorageEngine() * config.getLoadTsFileProportion())); TRANSMIT_LIMIT = CommonDescriptor.getInstance().getConfig().getTTimePartitionSlotTransmitLimit(); } @@ -118,7 +114,6 @@ public class LoadTsFileScheduler implements IScheduler { private final PlanFragmentId fragmentId; private final Set<TRegionReplicaSet> allReplicaSets; private final boolean isGeneratedByPipe; - private final LoadTsFileDataCacheMemoryBlock block; public LoadTsFileScheduler( DistributedQueryPlan distributedQueryPlan, @@ -135,7 +130,6 @@ public class LoadTsFileScheduler implements IScheduler { this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher); this.allReplicaSets = new HashSet<>(); this.isGeneratedByPipe = isGeneratedByPipe; - this.block = LoadTsFileMemoryManager.getInstance().allocateDataCacheMemoryBlock(); for (FragmentInstance fragmentInstance : distributedQueryPlan.getInstances()) { tsFileNodeList.add((LoadSingleTsFileNode) fragmentInstance.getFragment().getPlanNodeTree()); @@ -206,11 +200,10 @@ public class LoadTsFileScheduler implements IScheduler { if (isLoadSuccess) { stateMachine.transitionToFinished(); } - LoadTsFileMemoryManager.getInstance().releaseDataCacheMemoryBlock(); } private boolean firstPhase(LoadSingleTsFileNode node) { - final TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node, block); + final TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node); try { new TsFileSplitter( node.getTsFileResource().getTsFile(), tsFileDataManager::addOrSendTsFileData) @@ -414,18 +407,13 @@ public class LoadTsFileScheduler implements IScheduler { private long dataSize; private final Map<TRegionReplicaSet, LoadTsFilePieceNode> replicaSet2Piece; private final List<ChunkData> nonDirectionalChunkData; - private final LoadTsFileDataCacheMemoryBlock block; - public TsFileDataManager( - LoadTsFileScheduler scheduler, - LoadSingleTsFileNode singleTsFileNode, - LoadTsFileDataCacheMemoryBlock block) { + public TsFileDataManager(LoadTsFileScheduler scheduler, LoadSingleTsFileNode singleTsFileNode) { this.scheduler = scheduler; this.singleTsFileNode = singleTsFileNode; this.dataSize = 0; this.replicaSet2Piece = new HashMap<>(); this.nonDirectionalChunkData = new ArrayList<>(); - this.block = block; } private boolean addOrSendTsFileData(TsFileData tsFileData) { @@ -434,16 +422,11 @@ public class LoadTsFileScheduler implements IScheduler { : addOrSendChunkData((ChunkData) tsFileData); } - private boolean isMemoryEnough() { - return dataSize <= SINGLE_SCHEDULER_MAX_MEMORY_SIZE && block.hasEnoughMemory(); - } - private boolean addOrSendChunkData(ChunkData chunkData) { nonDirectionalChunkData.add(chunkData); dataSize += chunkData.getDataSize(); - block.addMemoryUsage(chunkData.getDataSize()); - if (!isMemoryEnough()) { + if (dataSize > MAX_MEMORY_SIZE) { routeChunkData(); // start to dispatch from the biggest TsFilePieceNode @@ -463,7 +446,6 @@ public class LoadTsFileScheduler implements IScheduler { } dataSize -= pieceNode.getDataSize(); - block.reduceMemoryUsage(pieceNode.getDataSize()); replicaSet2Piece.put( sortedReplicaSet, new LoadTsFilePieceNode( @@ -471,7 +453,7 @@ public class LoadTsFileScheduler implements IScheduler { singleTsFileNode .getTsFileResource() .getTsFile())); // can not just remove, because of deletion - if (isMemoryEnough()) { + if (dataSize <= MAX_MEMORY_SIZE) { break; } } @@ -510,7 +492,6 @@ public class LoadTsFileScheduler implements IScheduler { for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : replicaSet2Piece.entrySet()) { dataSize += deletionData.getDataSize(); - block.addMemoryUsage(deletionData.getDataSize()); entry.getValue().addTsFileData(deletionData); } return true; @@ -520,7 +501,6 @@ public class LoadTsFileScheduler implements IScheduler { routeChunkData(); for (Map.Entry<TRegionReplicaSet, LoadTsFilePieceNode> entry : replicaSet2Piece.entrySet()) { - block.reduceMemoryUsage(entry.getValue().getDataSize()); if (!scheduler.dispatchOnePieceNode(entry.getValue(), entry.getKey())) { logger.warn( "Dispatch piece node {} of TsFile {} error.", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java index ef06d98cebb..6e6e2afa2e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/DataNodeMetricsHelper.java @@ -33,7 +33,6 @@ import org.apache.iotdb.db.pipe.metric.PipeDataNodeMetrics; import org.apache.iotdb.db.queryengine.metric.DataExchangeCostMetricSet; import org.apache.iotdb.db.queryengine.metric.DataExchangeCountMetricSet; import org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet; -import org.apache.iotdb.db.queryengine.metric.LoadTsFileMemMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet; import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet; @@ -86,9 +85,6 @@ public class DataNodeMetricsHelper { // bind pipe related metrics MetricService.getInstance().addMetricSet(PipeDataNodeMetrics.getInstance()); - - // bind load tsfile memory related metrics - MetricService.getInstance().addMetricSet(LoadTsFileMemMetricSet.getInstance()); } private static void initSystemMetrics() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java index 67c8969dd84..5d0c53aa611 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java @@ -118,10 +118,6 @@ public class Deletion extends Modification implements Cloneable { new PartialPath(ReadWriteIOUtils.readString(stream)), 0, startTime, endTime); } - public long getSerializedSize() { - return Long.BYTES * 2 + Integer.BYTES + (long) getPathString().length() * Character.BYTES; - } - @Override public boolean equals(Object obj) { if (this == obj) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java index f4f8ab9f153..6491f4c4eaa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/ModificationFile.java @@ -66,7 +66,6 @@ public class ModificationFile implements AutoCloseable { private static final long COMPACT_THRESHOLD = 1024 * 1024; private boolean hasCompacted = false; - /** * Construct a ModificationFile using a file as its storage. * @@ -104,24 +103,6 @@ public class ModificationFile implements AutoCloseable { } } - /** - * Write a modification in this file. The modification will first be written to the persistent - * store then the memory cache. Notice that this method does not synchronize to physical disk - * after - * - * @param mod the modification to be written. - * @throws IOException if IOException is thrown when writing the modification to the store. - */ - public void writeWithoutSync(Modification mod) throws IOException { - synchronized (this) { - if (needVerify && new File(filePath).exists()) { - writer.mayTruncateLastLine(); - needVerify = false; - } - writer.writeWithOutSync(mod); - } - } - @GuardedBy("TsFileResource-WriteLock") public void truncate(long size) { writer.truncate(size); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java index 7f666eab511..75c32adf5e8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/LocalTextModificationAccessor.java @@ -163,17 +163,12 @@ public class LocalTextModificationAccessor @Override public void write(Modification mod) throws IOException { - writeWithOutSync(mod); - force(); - } - - @Override - public void writeWithOutSync(Modification mod) throws IOException { if (fos == null) { fos = new FileOutputStream(filePath, true); } fos.write(encodeModification(mod).getBytes()); fos.write(System.lineSeparator().getBytes()); + force(); } @TestOnly @@ -215,7 +210,7 @@ public class LocalTextModificationAccessor public void mayTruncateLastLine() { try (RandomAccessFile file = new RandomAccessFile(filePath, "r")) { long filePointer = file.length() - 1; - if (filePointer <= 0) { + if (filePointer == 0) { return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java index b4f538ac62d..314405c633c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/io/ModificationWriter.java @@ -37,15 +37,6 @@ public interface ModificationWriter { */ void write(Modification mod) throws IOException; - /** - * Write a new modification to the persistent medium. Notice that after calling write(), a - * fileWriter is opened. Notice that this method does not synchronize to physical disk after - * writing. - * - * @param mod the modification to be written. - */ - void writeWithOutSync(Modification mod) throws IOException; - void truncate(long size); void mayTruncateLastLine(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index 61aff528e38..d2b0eb89e1c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -147,8 +147,7 @@ public enum Metric { PIPE_EVENT_COMMIT_QUEUE_SIZE("pipe_event_commit_queue_size"), PIPE_PROCEDURE("pipe_procedure"), PIPE_TASK_STATUS("pipe_task_status"), - // load related - LOAD_MEM("load_mem"); + ; final String value; diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIterator.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIterator.java index dacdbc54fc5..9f077c6806e 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIterator.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIterator.java @@ -39,22 +39,17 @@ import java.util.NoSuchElementException; public class TsFileSequenceReaderTimeseriesMetadataIterator implements Iterator<Map<String, List<TimeseriesMetadata>>> { - private static final int DEFAULT_TIMESERIES_BATCH_READ_NUMBER = 4000; + private static final int MAX_TIMESERIES_METADATA_COUNT = 2000; private final TsFileSequenceReader reader; private final boolean needChunkMetadata; - private final int timeseriesBatchReadNumber; private ByteBuffer currentBuffer = null; - private long currentEndOffset = Long.MIN_VALUE; private final Deque<MetadataIndexEntryInfo> metadataIndexEntryStack = new ArrayDeque<>(); private String currentDeviceId; private int currentTimeseriesMetadataCount = 0; public TsFileSequenceReaderTimeseriesMetadataIterator( - TsFileSequenceReader reader, boolean needChunkMetadata, int timeseriesBatchReadNumber) - throws IOException { + TsFileSequenceReader reader, boolean needChunkMetadata) throws IOException { this.reader = reader; - this.needChunkMetadata = needChunkMetadata; - this.timeseriesBatchReadNumber = timeseriesBatchReadNumber; if (this.reader.tsFileMetaData == null) { this.reader.readFileMetadata(); @@ -63,6 +58,7 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator final MetadataIndexNode metadataIndexNode = reader.tsFileMetaData.getMetadataIndex(); long curEntryEndOffset = metadataIndexNode.getEndOffset(); List<MetadataIndexEntry> metadataIndexEntryList = metadataIndexNode.getChildren(); + this.needChunkMetadata = needChunkMetadata; for (int i = metadataIndexEntryList.size() - 1; i >= 0; i--) { metadataIndexEntryStack.push( @@ -72,11 +68,6 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator } } - public TsFileSequenceReaderTimeseriesMetadataIterator( - TsFileSequenceReader reader, boolean needChunkMetadata) throws IOException { - this(reader, needChunkMetadata, DEFAULT_TIMESERIES_BATCH_READ_NUMBER); - } - @Override public boolean hasNext() { return !metadataIndexEntryStack.isEmpty() @@ -91,7 +82,7 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator final Map<String, List<TimeseriesMetadata>> timeseriesMetadataMap = new HashMap<>(); - while (currentTimeseriesMetadataCount < timeseriesBatchReadNumber) { + while (currentTimeseriesMetadataCount < MAX_TIMESERIES_METADATA_COUNT) { // 1. Check Buffer // currentTimeseriesMetadataCount has reached the limit in the previous // loop and maybe there is still some data that remains in the buffer. @@ -99,22 +90,9 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator timeseriesMetadataMap .computeIfAbsent(currentDeviceId, k -> new ArrayList<>()) .addAll(deserializeTimeseriesMetadata()); - } else if (currentEndOffset > Long.MIN_VALUE) { - try { - timeseriesMetadataMap - .computeIfAbsent(currentDeviceId, k -> new ArrayList<>()) - .addAll(deserializeTimeseriesMetadataUsingTsFileInput(currentEndOffset)); - } catch (IOException e) { - throw new TsFileSequenceReaderTimeseriesMetadataIteratorException( - String.format( - "TsFileSequenceReaderTimeseriesMetadataIterator: deserializeTimeseriesMetadataUsingTsFileInput failed, " - + "currentEndOffset: %d, " - + e.getMessage(), - currentEndOffset)); - } } - if (currentTimeseriesMetadataCount >= timeseriesBatchReadNumber + if (currentTimeseriesMetadataCount >= MAX_TIMESERIES_METADATA_COUNT || metadataIndexEntryStack.isEmpty()) { break; } @@ -135,7 +113,7 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator } // 3. Reset currentTimeseriesMetadataCount - if (currentTimeseriesMetadataCount >= timeseriesBatchReadNumber) { + if (currentTimeseriesMetadataCount >= MAX_TIMESERIES_METADATA_COUNT) { currentTimeseriesMetadataCount = 0; } @@ -179,7 +157,6 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator .computeIfAbsent(currentDeviceId, k -> new ArrayList<>()) .addAll(deserializeTimeseriesMetadata()); } else { - currentEndOffset = endOffset; reader.position(metadataIndexEntry.getOffset()); timeseriesMetadataMap .computeIfAbsent(currentDeviceId, k -> new ArrayList<>()) @@ -190,7 +167,7 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator private List<TimeseriesMetadata> deserializeTimeseriesMetadata() { final List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); while (currentBuffer.hasRemaining() - && currentTimeseriesMetadataCount < timeseriesBatchReadNumber) { + && currentTimeseriesMetadataCount < MAX_TIMESERIES_METADATA_COUNT) { timeseriesMetadataList.add( TimeseriesMetadata.deserializeFrom(currentBuffer, needChunkMetadata)); currentTimeseriesMetadataCount++; @@ -202,14 +179,11 @@ public class TsFileSequenceReaderTimeseriesMetadataIterator throws IOException { final List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>(); while (reader.position() < endOffset - && currentTimeseriesMetadataCount < DEFAULT_TIMESERIES_BATCH_READ_NUMBER) { + && currentTimeseriesMetadataCount < MAX_TIMESERIES_METADATA_COUNT) { timeseriesMetadataList.add( TimeseriesMetadata.deserializeFrom(reader.tsFileInput, needChunkMetadata)); currentTimeseriesMetadataCount++; } - if (reader.position() >= endOffset) { - currentEndOffset = Long.MIN_VALUE; - } return timeseriesMetadataList; } diff --git a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIteratorTest.java b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIteratorTest.java index 5648473662b..54ddaabf03c 100644 --- a/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIteratorTest.java +++ b/iotdb-core/tsfile/src/test/java/org/apache/iotdb/tsfile/read/TsFileSequenceReaderTimeseriesMetadataIteratorTest.java @@ -36,8 +36,8 @@ public class TsFileSequenceReaderTimeseriesMetadataIteratorTest { @Before public void before() throws IOException { - // create 4040 timeseries, 101 measurements per device. - FileGenerator.generateFile(100, 40, 101); + // create 2020 timeseries, 101 measurements per device. + FileGenerator.generateFile(100, 20, 101); TsFileSequenceReader fileReader = new TsFileSequenceReader(FILE_PATH); tsFile = new TsFileReader(fileReader); } @@ -55,8 +55,8 @@ public class TsFileSequenceReaderTimeseriesMetadataIteratorTest { new TsFileSequenceReaderTimeseriesMetadataIterator(fileReader, false); Assert.assertTrue(iterator.hasNext()); - Assert.assertEquals(4000, iterator.next().values().stream().mapToLong(List::size).sum()); + Assert.assertEquals(2000, iterator.next().values().stream().mapToLong(List::size).sum()); Assert.assertTrue(iterator.hasNext()); - Assert.assertEquals(40, iterator.next().values().stream().mapToLong(List::size).sum()); + Assert.assertEquals(20, iterator.next().values().stream().mapToLong(List::size).sum()); } }
