This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d4074920d44 Load: modify TsFile version check to enable V3 load to V4 (#13400)
d4074920d44 is described below
commit d4074920d44ff14a328187a8a4c843465c3a1a54
Author: Zikun Ma <[email protected]>
AuthorDate: Wed Sep 11 12:42:30 2024 +0800
Load: modify TsFile version check to enable V3 load to V4 (#13400)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 ++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +++
.../plan/analyze/LoadTsFileAnalyzer.java | 44 +++++++++++++++++-----
.../load/memory/LoadTsFileAbstractMemoryBlock.java | 4 ++
.../memory/LoadTsFileAnalyzeSchemaMemoryBlock.java | 28 +++++++++++---
.../memory/LoadTsFileDataCacheMemoryBlock.java | 7 ++++
.../load/memory/LoadTsFileMemoryManager.java | 28 +++++++++++++-
.../load/splitter/TsFileSplitter.java | 13 ++++++-
.../apache/iotdb/db/utils/ModificationUtils.java | 14 ++++---
9 files changed, 130 insertions(+), 25 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4c4b0e824d0..08245360c4b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1136,6 +1136,8 @@ public class IoTDBConfig {
private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
0L; // 0 means that the decision will be adaptive based on the number of sequences
+ private int loadTsFileMaxDeviceCountToUseDeviceTimeIndex = 10000;
+
private long loadMemoryAllocateRetryIntervalMs = 1000L;
private int loadMemoryAllocateMaxRetries = 5;
@@ -3958,6 +3960,16 @@ public class IoTDBConfig {
this.loadTsFileAnalyzeSchemaMemorySizeInBytes =
loadTsFileAnalyzeSchemaMemorySizeInBytes;
}
+ public int getLoadTsFileMaxDeviceCountToUseDeviceTimeIndex() {
+ return loadTsFileMaxDeviceCountToUseDeviceTimeIndex;
+ }
+
+ public void setLoadTsFileMaxDeviceCountToUseDeviceTimeIndex(
+ int loadTsFileMaxDeviceCountToUseDeviceTimeIndex) {
+ this.loadTsFileMaxDeviceCountToUseDeviceTimeIndex =
+ loadTsFileMaxDeviceCountToUseDeviceTimeIndex;
+ }
+
public long getLoadMemoryAllocateRetryIntervalMs() {
return loadMemoryAllocateRetryIntervalMs;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 58de8530dc4..27532b56ae4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2248,6 +2248,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"load_tsfile_analyze_schema_memory_size_in_bytes",
String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes()))));
+ conf.setLoadTsFileMaxDeviceCountToUseDeviceTimeIndex(
+ Integer.parseInt(
+ properties.getProperty(
+ "load_tsfile_max_device_count_to_use_device_index",
+ String.valueOf(conf.getLoadTsFileMaxDeviceCountToUseDeviceTimeIndex()))));
conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
Long.parseLong(
properties.getProperty(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 34f99c64388..05ff28b6a8c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -101,6 +101,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class LoadTsFileAnalyzer implements AutoCloseable {
@@ -110,12 +111,15 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private static final IClientManager<ConfigRegionId, ConfigNodeClient>
CONFIG_NODE_CLIENT_MANAGER =
ConfigNodeClientManager.getInstance();
private static final int BATCH_FLUSH_TIME_SERIES_NUMBER;
+ private static final int MAX_DEVICE_COUNT_TO_USE_DEVICE_TIME_INDEX;
private static final long ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES;
private static final long FLUSH_ALIGNED_CACHE_MEMORY_SIZE_IN_BYTES;
static {
final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
BATCH_FLUSH_TIME_SERIES_NUMBER =
CONFIG.getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber();
+ MAX_DEVICE_COUNT_TO_USE_DEVICE_TIME_INDEX =
+ CONFIG.getLoadTsFileMaxDeviceCountToUseDeviceTimeIndex();
ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES =
CONFIG.getLoadTsFileAnalyzeSchemaMemorySizeInBytes() <= 0
? ((long) BATCH_FLUSH_TIME_SERIES_NUMBER) << 10
@@ -250,7 +254,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
tsFileResource.deserialize();
}
- schemaAutoCreatorAndVerifier.setCurrentModificationsAndTimeIndex(tsFileResource);
+ schemaAutoCreatorAndVerifier.setCurrentModificationsAndTimeIndex(tsFileResource, reader);
// check if the tsfile is empty
if (!timeseriesMetadataIterator.hasNext()) {
@@ -317,8 +321,9 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
this.schemaCache = new LoadTsFileAnalyzeSchemaCache();
}
- public void setCurrentModificationsAndTimeIndex(TsFileResource resource) throws IOException {
- schemaCache.setCurrentModificationsAndTimeIndex(resource);
+ public void setCurrentModificationsAndTimeIndex(
+ TsFileResource resource, TsFileSequenceReader reader) throws IOException {
+ schemaCache.setCurrentModificationsAndTimeIndex(resource, reader);
}
public void autoCreateAndVerify(
@@ -790,22 +795,41 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
}
}
- public void setCurrentModificationsAndTimeIndex(TsFileResource resource) throws IOException {
+ public void setCurrentModificationsAndTimeIndex(
+ TsFileResource resource, TsFileSequenceReader reader) throws IOException {
clearModificationsAndTimeIndex();
currentModifications = resource.getModFile().getModifications();
for (final Modification modification : currentModifications) {
currentModificationsMemoryUsageSizeInBytes += ((Deletion) modification).getSerializedSize();
}
+
+ // If there are too many modifications, a larger memory block is needed to avoid frequent
+ // flush.
+ long newMemorySize =
+ currentModificationsMemoryUsageSizeInBytes > ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES / 2
+ ? currentModificationsMemoryUsageSizeInBytes + ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES
+ : ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES;
+ block.forceResize(newMemorySize);
block.addMemoryUsage(currentModificationsMemoryUsageSizeInBytes);
- if (resource.resourceFileExists()) {
- currentTimeIndex = resource.getTimeIndex();
- if (currentTimeIndex instanceof FileTimeIndex) {
- currentTimeIndex = resource.buildDeviceTimeIndex();
+ // No need to build device time index if there are no modifications
+ if (currentModifications.size() > 0 && resource.resourceFileExists()) {
+ final AtomicInteger deviceCount = new AtomicInteger();
+ reader
+ .getAllDevicesIteratorWithIsAligned()
+ .forEachRemaining(o -> deviceCount.getAndIncrement());
+
+ // Use device time index only if the device count is less than the threshold, avoiding too
+ // much memory usage
+ if (deviceCount.get() < MAX_DEVICE_COUNT_TO_USE_DEVICE_TIME_INDEX) {
+ currentTimeIndex = resource.getTimeIndex();
+ if (currentTimeIndex instanceof FileTimeIndex) {
+ currentTimeIndex = resource.buildDeviceTimeIndex();
+ }
+ currentTimeIndexMemoryUsageSizeInBytes = currentTimeIndex.calculateRamSize();
+ block.addMemoryUsage(currentTimeIndexMemoryUsageSizeInBytes);
}
- currentTimeIndexMemoryUsageSizeInBytes = currentTimeIndex.calculateRamSize();
- block.addMemoryUsage(currentTimeIndexMemoryUsageSizeInBytes);
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java
index f0df55a9f63..bca1591b1ef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java
@@ -42,6 +42,10 @@ public abstract class LoadTsFileAbstractMemoryBlock
implements AutoCloseable {
public abstract void reduceMemoryUsage(long memoryInBytes);
+ abstract long getMemoryUsageInBytes();
+
+ public abstract void forceResize(long newSizeInBytes);
+
/**
* Release all memory of this block.
*
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
index c7add4b446f..bd7ff8c2df4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
@@ -34,7 +34,7 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends
LoadTsFileAbstractMemory
private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileAnalyzeSchemaMemoryBlock.class);
- private final long totalMemorySizeInBytes;
+ private long totalMemorySizeInBytes;
private final AtomicLong memoryUsageInBytes;
LoadTsFileAnalyzeSchemaMemoryBlock(long totalMemorySizeInBytes) {
@@ -45,12 +45,12 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends
LoadTsFileAbstractMemory
}
@Override
- public boolean hasEnoughMemory(long memoryTobeAddedInBytes) {
+ public synchronized boolean hasEnoughMemory(long memoryTobeAddedInBytes) {
return memoryUsageInBytes.get() + memoryTobeAddedInBytes <=
totalMemorySizeInBytes;
}
@Override
- public void addMemoryUsage(long memoryInBytes) {
+ public synchronized void addMemoryUsage(long memoryInBytes) {
memoryUsageInBytes.addAndGet(memoryInBytes);
MetricService.getInstance()
@@ -63,7 +63,7 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends
LoadTsFileAbstractMemory
}
@Override
- public void reduceMemoryUsage(long memoryInBytes) {
+ public synchronized void reduceMemoryUsage(long memoryInBytes) {
if (memoryUsageInBytes.addAndGet(-memoryInBytes) < 0) {
LOGGER.warn("{} has reduce memory usage to negative", this);
}
@@ -78,7 +78,25 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends
LoadTsFileAbstractMemory
}
@Override
- protected void releaseAllMemory() {
+ synchronized long getMemoryUsageInBytes() {
+ return memoryUsageInBytes.get();
+ }
+
+ synchronized long getTotalMemorySizeInBytes() {
+ return totalMemorySizeInBytes;
+ }
+
+ synchronized void setTotalMemorySizeInBytes(long totalMemorySizeInBytes) {
+ this.totalMemorySizeInBytes = totalMemorySizeInBytes;
+ }
+
+ @Override
+ public synchronized void forceResize(long newSizeInBytes) {
+ MEMORY_MANAGER.forceResize(this, newSizeInBytes);
+ }
+
+ @Override
+ protected synchronized void releaseAllMemory() {
if (memoryUsageInBytes.get() != 0) {
LOGGER.warn(
"Try to release memory from a memory block {} which has not released all memory", this);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
index e0709cece9e..aefdf84fc0f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
@@ -89,6 +89,12 @@ public class LoadTsFileDataCacheMemoryBlock extends
LoadTsFileAbstractMemoryBloc
memoryUsageInBytes.addAndGet(-memoryInBytes);
}
+ @Override
+ public void forceResize(long newSizeInBytes) {
+ throw new UnsupportedOperationException(
+ "resize is not supported for LoadTsFileDataCacheMemoryBlock");
+ }
+
@Override
protected void releaseAllMemory() {
if (memoryUsageInBytes.get() != 0) {
@@ -125,6 +131,7 @@ public class LoadTsFileDataCacheMemoryBlock extends
LoadTsFileAbstractMemoryBloc
return referenceCount.get();
}
+ @Override
long getMemoryUsageInBytes() {
return memoryUsageInBytes.get();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
index be6e8dcef97..ab6ba1e77fe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
@@ -44,7 +44,7 @@ public class LoadTsFileMemoryManager {
private final AtomicLong usedMemorySizeInBytes = new AtomicLong(0);
private LoadTsFileDataCacheMemoryBlock dataCacheMemoryBlock;
- private synchronized void forceAllocatedFromQuery(long sizeInBytes)
+ private synchronized void forceAllocateFromQuery(long sizeInBytes)
throws LoadRuntimeOutOfMemoryException {
for (int i = 0; i < MEMORY_ALLOCATE_MAX_RETRIES; i++) {
// allocate memory from queryEngine
@@ -90,7 +90,7 @@ public class LoadTsFileMemoryManager {
public synchronized LoadTsFileAnalyzeSchemaMemoryBlock
allocateAnalyzeSchemaMemoryBlock(
long sizeInBytes) throws LoadRuntimeOutOfMemoryException {
try {
- forceAllocatedFromQuery(sizeInBytes);
+ forceAllocateFromQuery(sizeInBytes);
} catch (LoadRuntimeOutOfMemoryException e) {
if (dataCacheMemoryBlock != null &&
dataCacheMemoryBlock.doShrink(sizeInBytes)) {
return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes);
@@ -100,6 +100,30 @@ public class LoadTsFileMemoryManager {
return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes);
}
+ /**
+ * Resize the memory block to the new size.
+ *
+ * @throws LoadRuntimeOutOfMemoryException if failed to allocate enough memory
+ */
+ synchronized void forceResize(LoadTsFileAnalyzeSchemaMemoryBlock
memoryBlock, long newSizeInBytes)
+ throws LoadRuntimeOutOfMemoryException {
+ if (memoryBlock.getTotalMemorySizeInBytes() >= newSizeInBytes) {
+ releaseToQuery(memoryBlock.getTotalMemorySizeInBytes() - newSizeInBytes);
+ memoryBlock.setTotalMemorySizeInBytes(newSizeInBytes);
+ return;
+ }
+
+ long bytesNeeded = newSizeInBytes -
memoryBlock.getTotalMemorySizeInBytes();
+ try {
+ forceAllocateFromQuery(bytesNeeded);
+ } catch (LoadRuntimeOutOfMemoryException e) {
+ if (dataCacheMemoryBlock == null ||
!dataCacheMemoryBlock.doShrink(bytesNeeded)) {
+ throw e;
+ }
+ }
+ memoryBlock.setTotalMemorySizeInBytes(newSizeInBytes);
+ }
+
public synchronized LoadTsFileDataCacheMemoryBlock
allocateDataCacheMemoryBlock()
throws LoadRuntimeOutOfMemoryException {
if (dataCacheMemoryBlock == null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
index 8d6919062ca..f2536f94568 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
@@ -389,8 +389,17 @@ public class TsFileSplitter {
}
byte versionNumber = reader.readVersionNumber();
- if (versionNumber != TSFileConfig.VERSION_NUMBER) {
- logger.error("the file's Version Number is incorrect, file path: {}", reader.getFileName());
+ if (versionNumber < TSFileConfig.VERSION_NUMBER) {
+ if (versionNumber == TSFileConfig.VERSION_NUMBER_V3 && TSFileConfig.VERSION_NUMBER == 4) {
+ logger.info(
+ "try to load TsFile V3 into current version (V4), file path: {}", reader.getFileName());
+ } else {
+ logger.error("the file's Version Number is too old, file path: {}", reader.getFileName());
+ return false;
+ }
+ } else if (versionNumber > TSFileConfig.VERSION_NUMBER) {
+ logger.error(
+ "the file's Version Number is higher than current, file path: {}", reader.getFileName());
return false;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
index 9e7b6f7ecb8..6d10c731e7e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
@@ -19,10 +19,8 @@
package org.apache.iotdb.db.utils;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.MeasurementPath;
-import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SettleSelectorImpl;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
@@ -38,6 +36,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+
public class ModificationUtils {
private ModificationUtils() {
@@ -194,9 +194,10 @@ public class ModificationUtils {
public static boolean isDeviceDeletedByMods(
Collection<Modification> modifications, IDeviceID device, long
startTime, long endTime)
throws IllegalPathException {
+ final MeasurementPath deviceWithWildcard = new MeasurementPath(device,
ONE_LEVEL_PATH_WILDCARD);
for (Modification modification : modifications) {
- PartialPath path = modification.getPath();
- if (path.include(new MeasurementPath(device,
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD))
+ MeasurementPath path = modification.getPath();
+ if (path.matchFullPath(deviceWithWildcard)
&& ((Deletion) modification).getTimeRange().contains(startTime,
endTime)) {
return true;
}
@@ -211,9 +212,10 @@ public class ModificationUtils {
long startTime,
long endTime)
throws IllegalPathException {
+ final MeasurementPath measurementPath = new MeasurementPath(device,
timeseriesId);
for (Modification modification : modifications) {
- PartialPath path = modification.getPath();
- if (path.include(new MeasurementPath(device, timeseriesId))
+ MeasurementPath path = modification.getPath();
+ if (path.matchFullPath(measurementPath)
&& ((Deletion) modification).getTimeRange().contains(startTime,
endTime)) {
return true;
}