This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new f51a15f7d72 Load: Optimized the partial path split logic in
modifications coverage judgment (#16212)
f51a15f7d72 is described below
commit f51a15f7d72bcd1042c1ed244f92c163a9a276dd
Author: Caideyipi <[email protected]>
AuthorDate: Wed Aug 20 18:39:57 2025 +0800
Load: Optimized the partial path split logic in modifications coverage
judgment (#16212)
* optimize
* shit
* test
* revert
* opti
* fix
---
.../org/apache/iotdb/it/utils/TsFileGenerator.java | 31 +++++++++++++
.../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 10 +++-
.../plan/analyze/load/LoadTsFileAnalyzer.java | 33 ++++++++------
.../apache/iotdb/db/utils/ModificationUtils.java | 53 +++++++++++-----------
4 files changed, 85 insertions(+), 42 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java
b/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java
index d30c0c36d50..e71d788fc01 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/utils/TsFileGenerator.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.it.utils;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
@@ -248,6 +249,36 @@ public class TsFileGenerator implements AutoCloseable {
new Binary(String.format("test point %d", random.nextInt()),
TSFileConfig.STRING_CHARSET);
}
+ public void generateDeletion(final String device) throws IOException,
IllegalPathException {
+ try (final ModificationFile modificationFile =
+ new ModificationFile(tsFile.getAbsolutePath() +
ModificationFile.FILE_SUFFIX)) {
+ modificationFile.write(
+ new Deletion(
+ new PartialPath(
+ device + TsFileConstant.PATH_SEPARATOR +
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD),
+ tsFile.length(),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE));
+ device2TimeSet.remove(device);
+ device2MeasurementSchema.remove(device);
+ }
+ }
+
+ public void generateDeletion(final String device, final MeasurementSchema
measurement)
+ throws IOException, IllegalPathException {
+ try (final ModificationFile modificationFile =
+ new ModificationFile(tsFile.getAbsolutePath() +
ModificationFile.FILE_SUFFIX)) {
+ modificationFile.write(
+ new Deletion(
+ new PartialPath(
+ device + TsFileConstant.PATH_SEPARATOR +
measurement.getMeasurementId()),
+ tsFile.length(),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE));
+ device2MeasurementSchema.get(device).remove(measurement);
+ }
+ }
+
public void generateDeletion(final String device, final int number)
throws IOException, IllegalPathException {
try (final ModificationFile modificationFile =
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index 13131dc617f..12247e277ab 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.it;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -781,9 +782,11 @@ public class IoTDBLoadTsFileIT {
generator.registerTimeseries(
SchemaConfig.DEVICE_3,
Collections.singletonList(SchemaConfig.MEASUREMENT_30));
generator.registerAlignedTimeseries(
- SchemaConfig.DEVICE_4,
Collections.singletonList(SchemaConfig.MEASUREMENT_40));
+ SchemaConfig.DEVICE_4,
+ new ArrayList<>(Arrays.asList(SchemaConfig.MEASUREMENT_30,
SchemaConfig.MEASUREMENT_40)));
generator.generateData(SchemaConfig.DEVICE_2, 100, PARTITION_INTERVAL /
10_000, false);
generator.generateData(SchemaConfig.DEVICE_3, 100, PARTITION_INTERVAL /
10_000, false);
+ generator.generateDeletion(SchemaConfig.DEVICE_3);
generator.generateData(SchemaConfig.DEVICE_4, 100, PARTITION_INTERVAL /
10_000, true);
generator.generateDeletion(SchemaConfig.DEVICE_2, 2);
generator.generateDeletion(SchemaConfig.DEVICE_4, 2);
@@ -791,6 +794,7 @@ public class IoTDBLoadTsFileIT {
generator.generateData(SchemaConfig.DEVICE_4, 100, PARTITION_INTERVAL /
10_000, true);
generator.generateDeletion(SchemaConfig.DEVICE_2, 2);
generator.generateDeletion(SchemaConfig.DEVICE_4, 2);
+ generator.generateDeletion(SchemaConfig.DEVICE_4,
SchemaConfig.MEASUREMENT_30);
writtenPoint2 = generator.getTotalNumber();
}
@@ -810,6 +814,10 @@ public class IoTDBLoadTsFileIT {
Assert.fail("This ResultSet is empty.");
}
}
+
+ TestUtils.assertSingleResultSetEqual(
+ TestUtils.executeQueryWithRetry(statement, "count timeSeries"),
+ Collections.singletonMap("count(timeseries)", "18"));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 43ae6b5a0e7..a02393d3906 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
@@ -58,7 +59,6 @@ import
org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement;
-import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
@@ -73,6 +73,7 @@ import
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -97,7 +98,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -527,10 +527,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
public void autoCreateAndVerify(
TsFileSequenceReader reader,
- Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadataList)
+ Map<IDeviceID, List<TimeseriesMetadata>> device2TimeSeriesMetadataList)
throws IOException, AuthException, LoadAnalyzeTypeMismatchException {
for (final Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
- device2TimeseriesMetadataList.entrySet()) {
+ device2TimeSeriesMetadataList.entrySet()) {
final IDeviceID device = entry.getKey();
try {
@@ -546,7 +546,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
for (final TimeseriesMetadata timeseriesMetadata : entry.getValue()) {
try {
- if (schemaCache.isTimeseriesDeletedByMods(device,
timeseriesMetadata)) {
+ if (schemaCache.isTimeSeriesDeletedByMods(device,
timeseriesMetadata)) {
continue;
}
} catch (IllegalPathException e) {
@@ -554,7 +554,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
// IllegalPathException.
if (!timeseriesMetadata.getMeasurementId().isEmpty()) {
LOGGER.warn(
- "Failed to check if device {}, timeseries {} is deleted by
mods. Will see it as not deleted.",
+ "Failed to check if device {}, timeSeries {} is deleted by
mods. Will see it as not deleted.",
device,
timeseriesMetadata.getMeasurementId(),
e);
@@ -926,7 +926,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private Map<IDeviceID, Boolean> tsFileDevice2IsAligned;
private Set<PartialPath> alreadySetDatabases;
- private Collection<Modification> currentModifications;
+ private PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
currentModifications;
private ITimeIndex currentTimeIndex;
private long batchDevice2TimeSeriesSchemasMemoryUsageSizeInBytes = 0;
@@ -944,7 +944,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
this.currentBatchDevice2TimeSeriesSchemas = new HashMap<>();
this.tsFileDevice2IsAligned = new HashMap<>();
this.alreadySetDatabases = new HashSet<>();
- this.currentModifications = new ArrayList<>();
+ this.currentModifications =
PatternTreeMapFactory.getModsPatternTreeMap();
}
public Map<IDeviceID, Set<MeasurementSchema>> getDevice2TimeSeries() {
@@ -1003,10 +1003,13 @@ public class LoadTsFileAnalyzer implements
AutoCloseable {
public void setCurrentModificationsAndTimeIndex(TsFileResource resource)
throws IOException {
clearModificationsAndTimeIndex();
- currentModifications = resource.getModFile().getModifications();
- for (final Modification modification : currentModifications) {
- currentModificationsMemoryUsageSizeInBytes += ((Deletion)
modification).getSerializedSize();
- }
+ resource
+ .getModFile()
+ .getModifications()
+ .forEach(
+ modification ->
currentModifications.append(modification.getPath(), modification));
+
+ currentModificationsMemoryUsageSizeInBytes =
currentModifications.ramBytesUsed();
block.addMemoryUsage(currentModificationsMemoryUsageSizeInBytes);
if (resource.resourceFileExists()) {
@@ -1028,9 +1031,9 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
currentModifications, currentTimeIndex, device);
}
- public boolean isTimeseriesDeletedByMods(
+ public boolean isTimeSeriesDeletedByMods(
IDeviceID device, TimeseriesMetadata timeseriesMetadata) throws
IllegalPathException {
- return ModificationUtils.isTimeseriesDeletedByMods(
+ return ModificationUtils.isTimeSeriesDeletedByMods(
currentModifications,
device,
timeseriesMetadata.getMeasurementId(),
@@ -1068,7 +1071,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
}
public void clearModificationsAndTimeIndex() {
- currentModifications.clear();
+ currentModifications = PatternTreeMapFactory.getModsPatternTreeMap();
currentTimeIndex = null;
block.reduceMemoryUsage(currentModificationsMemoryUsageSizeInBytes);
block.reduceMemoryUsage(currentTimeIndexMemoryUsageSizeInBytes);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
index 67a3c3c9e8e..8b70bdd6c6a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
@@ -19,16 +19,18 @@
package org.apache.iotdb.db.utils;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SettleSelectorImpl;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
@@ -37,7 +39,6 @@ import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.Pair;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Objects;
@@ -195,33 +196,31 @@ public class ModificationUtils {
* There are some slight differences from that in {@link SettleSelectorImpl}.
*/
public static boolean isDeviceDeletedByMods(
- Collection<Modification> modifications, IDeviceID device, long
startTime, long endTime)
+ final PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
modifications,
+ final IDeviceID device,
+ final long startTime,
+ final long endTime)
throws IllegalPathException {
- for (Modification modification : modifications) {
- PartialPath path = modification.getPath();
- if (path.include(new PartialPath(device,
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD))
- && ((Deletion) modification).getTimeRange().contains(startTime,
endTime)) {
- return true;
- }
- }
- return false;
+ final List<Modification> mods =
+ modifications.getOverlapped(
+ CompactionPathUtils.getPath(device,
AlignedPath.VECTOR_PLACEHOLDER));
+ return mods.stream()
+ .anyMatch(
+ modification -> ((Deletion)
modification).getTimeRange().contains(startTime, endTime));
}
- public static boolean isTimeseriesDeletedByMods(
- Collection<Modification> modifications,
- IDeviceID device,
- String timeseriesId,
- long startTime,
- long endTime)
+ public static boolean isTimeSeriesDeletedByMods(
+ final PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
modifications,
+ final IDeviceID device,
+ final String timeSeriesId,
+ final long startTime,
+ final long endTime)
throws IllegalPathException {
- for (Modification modification : modifications) {
- PartialPath path = modification.getPath();
- if (path.include(new PartialPath(device, timeseriesId))
- && ((Deletion) modification).getTimeRange().contains(startTime,
endTime)) {
- return true;
- }
- }
- return false;
+ final List<Modification> mods =
+ modifications.getOverlapped(CompactionPathUtils.getPath(device,
timeSeriesId));
+ return mods.stream()
+ .anyMatch(
+ modification -> ((Deletion)
modification).getTimeRange().contains(startTime, endTime));
}
private static void doModifyChunkMetaData(Modification modification,
IChunkMetadata metaData) {
@@ -298,7 +297,9 @@ public class ModificationUtils {
}
public static boolean isDeviceDeletedByMods(
- Collection<Modification> currentModifications, ITimeIndex
currentTimeIndex, IDeviceID device)
+ PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
currentModifications,
+ ITimeIndex currentTimeIndex,
+ IDeviceID device)
throws IllegalPathException {
return isDeviceDeletedByMods(
currentModifications,