This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f39cf38d570 Ignore data of deprecated table in compaction (#16543)
f39cf38d570 is described below
commit f39cf38d57035e5daa8023d43b64f0f825bac30f
Author: shuwenwei <[email protected]>
AuthorDate: Fri Oct 10 11:51:54 2025 +0800
Ignore data of deprecated table in compaction (#16543)
* ignore data of deprecated table in compaction
* fix ut
---
.../performer/impl/FastCompactionPerformer.java | 7 +-
.../impl/ReadChunkCompactionPerformer.java | 5 +-
.../impl/ReadPointCompactionPerformer.java | 6 +-
.../execute/utils/CompactionTableSchema.java | 11 +-
.../utils/CompactionTableSchemaCollector.java | 33 +++++-
.../execute/utils/MultiTsFileDeviceIterator.java | 23 ++++
.../CompactionTableSchemaCollectorTest.java | 8 +-
.../TableModelFastCompactionPerformerTest.java | 125 +++++++++++++++------
...TableModelReadChunkCompactionPerformerTest.java | 94 +++++++++-------
9 files changed, 216 insertions(+), 96 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
index 35383269fe1..fafba7ac1bd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java
@@ -125,7 +125,8 @@ public class FastCompactionPerformer
? new FastCrossCompactionWriter(targetFiles, seqFiles,
readerCacheMap)
: new FastInnerCompactionWriter(targetFiles)) {
List<Schema> schemas =
- CompactionTableSchemaCollector.collectSchema(seqFiles, unseqFiles,
readerCacheMap);
+ CompactionTableSchemaCollector.collectSchema(
+ seqFiles, unseqFiles, readerCacheMap,
deviceIterator.getDeprecatedTableSchemaMap());
compactionWriter.setSchemaForAllTargetFile(schemas);
readModification(seqFiles);
readModification(unseqFiles);
@@ -210,6 +211,10 @@ public class FastCompactionPerformer
measurementSchemas.add(entry.getValue().left);
timeseriesMetadataOffsetMap.put(entry.getKey(), entry.getValue().right);
}
+ // current device may be ignored by some conditions
+ if (measurementSchemas.isEmpty()) {
+ return;
+ }
FastCompactionTaskSummary taskSummary = new FastCompactionTaskSummary();
new FastCompactionPerformerSubTask(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
index 9ce42946416..837978b4ce1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java
@@ -91,7 +91,10 @@ public class ReadChunkCompactionPerformer implements
ISeqCompactionPerformer {
PageException {
try (MultiTsFileDeviceIterator deviceIterator = new
MultiTsFileDeviceIterator(seqFiles)) {
schema =
- CompactionTableSchemaCollector.collectSchema(seqFiles,
deviceIterator.getReaderMap());
+ CompactionTableSchemaCollector.collectSchema(
+ seqFiles,
+ deviceIterator.getReaderMap(),
+ deviceIterator.getDeprecatedTableSchemaMap());
while (deviceIterator.hasNextDevice()) {
currentWriter = getAvailableCompactionWriter();
Pair<IDeviceID, Boolean> deviceInfo = deviceIterator.nextDevice();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
index 447ca149462..ca650f28a75 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
@@ -118,7 +118,10 @@ public class ReadPointCompactionPerformer
new MultiTsFileDeviceIterator(seqFiles, unseqFiles);
List<Schema> schemas =
CompactionTableSchemaCollector.collectSchema(
- seqFiles, unseqFiles, deviceIterator.getReaderMap());
+ seqFiles,
+ unseqFiles,
+ deviceIterator.getReaderMap(),
+ deviceIterator.getDeprecatedTableSchemaMap());
compactionWriter.setSchemaForAllTargetFile(schemas);
while (deviceIterator.hasNextDevice()) {
checkThreadInterrupted();
@@ -271,6 +274,7 @@ public class ReadPointCompactionPerformer
} else {
seriesPath = new NonAlignedFullPath(deviceId, measurementSchemas.get(0));
}
+
return new SeriesDataBlockReader(
seriesPath, new HashSet<>(allSensors), fragmentInstanceContext,
queryDataSource, true);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java
index 605cdda0230..3f6e83cbe96 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchema.java
@@ -33,9 +33,9 @@ public class CompactionTableSchema extends TableSchema {
super(tableName);
}
- public void merge(TableSchema tableSchema) {
+ public boolean merge(TableSchema tableSchema) {
if (tableSchema == null) {
- return;
+ return true;
}
if (!tableSchema.getTableName().equals(this.tableName)) {
throw new CompactionTableSchemaNotMatchException(
@@ -60,11 +60,7 @@ public class CompactionTableSchema extends TableSchema {
IMeasurementSchema idColumnToMerge = idColumnSchemasToMerge.get(i);
IMeasurementSchema currentIdColumn = measurementSchemas.get(i);
if
(!idColumnToMerge.getMeasurementName().equals(currentIdColumn.getMeasurementName()))
{
- throw new CompactionTableSchemaNotMatchException(
- "current id column name is "
- + currentIdColumn.getMeasurementName()
- + ", other id column name in same position is "
- + idColumnToMerge.getMeasurementName());
+ return false;
}
}
@@ -75,6 +71,7 @@ public class CompactionTableSchema extends TableSchema {
columnCategories.add(ColumnCategory.TAG);
measurementSchemas.add(newIdColumn);
}
+ return true;
}
public CompactionTableSchema copy() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java
index 002858d93cf..55640c3fcfa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionTableSchemaCollector.java
@@ -28,8 +28,10 @@ import org.apache.tsfile.write.schema.Schema;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -39,13 +41,17 @@ public class CompactionTableSchemaCollector {
public static List<Schema> collectSchema(
List<TsFileResource> seqFiles,
List<TsFileResource> unseqFiles,
- Map<TsFileResource, TsFileSequenceReader> readerMap)
+ Map<TsFileResource, TsFileSequenceReader> readerMap,
+ Map<TsFileResource, Set<String>> deprecatedTableSchemaMap)
throws IOException {
List<Schema> targetSchemas = new ArrayList<>(seqFiles.size());
Schema schema =
collectSchema(
- Stream.concat(seqFiles.stream(),
unseqFiles.stream()).collect(Collectors.toList()),
- readerMap);
+ Stream.concat(seqFiles.stream(), unseqFiles.stream())
+ .sorted(TsFileResource::compareFileName)
+ .collect(Collectors.toList()),
+ readerMap,
+ deprecatedTableSchemaMap);
targetSchemas.add(schema);
for (int i = 1; i < seqFiles.size(); i++) {
@@ -64,11 +70,14 @@ public class CompactionTableSchemaCollector {
}
public static Schema collectSchema(
- List<TsFileResource> sourceFiles, Map<TsFileResource,
TsFileSequenceReader> readerMap)
+ List<TsFileResource> sourceFiles,
+ Map<TsFileResource, TsFileSequenceReader> readerMap,
+ Map<TsFileResource, Set<String>> deprecatedTableSchemaMap)
throws IOException {
Schema targetSchema = new Schema();
Map<String, TableSchema> targetTableSchemaMap = new HashMap<>();
- for (TsFileResource resource : sourceFiles) {
+ for (int i = 0; i < sourceFiles.size(); i++) {
+ TsFileResource resource = sourceFiles.get(i);
TsFileSequenceReader reader = readerMap.get(resource);
Map<String, TableSchema> tableSchemaMap = reader.getTableSchemaMap();
if (tableSchemaMap == null) {
@@ -89,7 +98,19 @@ public class CompactionTableSchemaCollector {
collectedTableSchema = new CompactionTableSchema(tableName);
targetTableSchemaMap.put(tableName, collectedTableSchema);
}
- collectedTableSchema.merge(currentTableSchema);
+ boolean canMerge = collectedTableSchema.merge(currentTableSchema);
+ if (!canMerge) {
+ // mark resources with deprecated table schema
+ for (int j = 0; j < i; j++) {
+ deprecatedTableSchemaMap
+ .computeIfAbsent(sourceFiles.get(j), k -> new HashSet<>())
+ .add(tableName);
+ }
+ // replace old table schema in targetTableSchemaMap
+ collectedTableSchema = new CompactionTableSchema(tableName);
+ collectedTableSchema.merge(currentTableSchema);
+ targetTableSchemaMap.put(tableName, collectedTableSchema);
+ }
}
}
targetTableSchemaMap.values().forEach(targetSchema::registerTableSchema);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
index 2a49d28bae3..e662e8ed452 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java
@@ -57,6 +57,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -69,6 +70,7 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
// sort from the oldest to the newest by version (Used by ReadChunkPerformer)
private List<TsFileResource> tsFileResourcesSortedByAsc;
private Map<TsFileResource, TsFileSequenceReader> readerMap = new
HashMap<>();
+ private final Map<TsFileResource, Set<String>> deprecatedTableSchemaMap =
new HashMap<>();
private final Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap =
new HashMap<>();
private final Map<TsFileResource, PatternTreeMap<ModEntry,
PatternTreeMapFactory.ModsSerializer>>
modificationCache = new HashMap<>();
@@ -344,6 +346,9 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
// which means this tsfile does not contain the current device, then
skip it.
continue;
}
+ if (isCurrentDeviceDataInDeprecatedTable(resource)) {
+ continue;
+ }
TsFileSequenceReader reader = readerMap.get(resource);
for (Map.Entry<String, Pair<List<IChunkMetadata>, Pair<Long, Long>>>
entrySet :
@@ -408,6 +413,9 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
if (!currentDevice.equals(iterator.current())) {
continue;
}
+ if (isCurrentDeviceDataInDeprecatedTable(tsFileResource)) {
+ continue;
+ }
MetadataIndexNode firstMeasurementNodeOfCurrentDevice =
iterator.getFirstMeasurementNodeOfCurrentDevice();
TsFileSequenceReader reader = readerMap.get(tsFileResource);
@@ -479,6 +487,10 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
return readerMap;
}
+ public Map<TsFileResource, Set<String>> getDeprecatedTableSchemaMap() {
+ return deprecatedTableSchemaMap;
+ }
+
@Override
public void close() throws IOException {
for (TsFileSequenceReader reader : readerMap.values()) {
@@ -695,4 +707,15 @@ public class MultiTsFileDeviceIterator implements
AutoCloseable {
return readerAndChunkMetadataForThisSeries;
}
}
+
+ // skip data of deleted table
+ private boolean isCurrentDeviceDataInDeprecatedTable(TsFileResource
resource) {
+ if (ignoreAllNullRows) {
+ return false;
+ }
+ String tableName = currentDevice.getLeft().getTableName();
+ Set<String> deprecatedTablesInCurrentFile =
deprecatedTableSchemaMap.get(resource);
+ return deprecatedTablesInCurrentFile != null
+ && deprecatedTablesInCurrentFile.contains(tableName);
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionTableSchemaCollectorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionTableSchemaCollectorTest.java
index 932573ddbce..97006f6d43a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionTableSchemaCollectorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/CompactionTableSchemaCollectorTest.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.tablemodel;
-import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionTableSchemaNotMatchException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionTableSchema;
import org.apache.tsfile.enums.ColumnCategory;
@@ -104,11 +103,6 @@ public class CompactionTableSchemaCollectorTest {
columnTypeList4.add(ColumnCategory.FIELD);
columnTypeList4.add(ColumnCategory.TAG);
TableSchema tableSchema4 = new TableSchema("t1", measurementSchemaList4,
columnTypeList4);
- try {
- compactionTableSchema.merge(tableSchema4);
- } catch (CompactionTableSchemaNotMatchException e) {
- return;
- }
- Assert.fail();
+ Assert.assertFalse(compactionTableSchema.merge(tableSchema4));
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelFastCompactionPerformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelFastCompactionPerformerTest.java
index 66602eb347e..778220d8263 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelFastCompactionPerformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelFastCompactionPerformerTest.java
@@ -32,6 +32,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -41,6 +42,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
public class TableModelFastCompactionPerformerTest extends
AbstractCompactionTest {
@@ -63,8 +66,8 @@ public class TableModelFastCompactionPerformerTest extends
AbstractCompactionTes
new CompactionTableModelTestFileWriter(resource1)) {
writer.registerTableSchema("t1", Arrays.asList("id1", "id2"));
writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field2"));
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
@@ -76,13 +79,8 @@ public class TableModelFastCompactionPerformerTest extends
AbstractCompactionTes
new CompactionTableModelTestFileWriter(resource2)) {
writer.registerTableSchema("t2", Arrays.asList("id1", "id2", "id3"));
writer.startChunkGroup("t2", Arrays.asList("id_field1", "id_field2",
"id_field3"));
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
- new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(20, 22)}}},
- TSEncoding.PLAIN,
- CompressionType.LZ4);
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s2",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Arrays.asList("s1", "s2"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(20, 22)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
@@ -95,13 +93,8 @@ public class TableModelFastCompactionPerformerTest extends
AbstractCompactionTes
new CompactionTableModelTestFileWriter(resource3)) {
writer.registerTableSchema("t1", Arrays.asList("id1", "id2"));
writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field2"));
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
- new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
- TSEncoding.PLAIN,
- CompressionType.LZ4);
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s2",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Arrays.asList("s1", "s2"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
@@ -113,13 +106,8 @@ public class TableModelFastCompactionPerformerTest extends
AbstractCompactionTes
new CompactionTableModelTestFileWriter(resource4)) {
writer.registerTableSchema("t2", Arrays.asList("id1", "id2", "id3"));
writer.startChunkGroup("t2", Arrays.asList("id_field1", "id_field2",
"id_field3"));
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
- new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(20, 22)}}},
- TSEncoding.PLAIN,
- CompressionType.LZ4);
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s2",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Arrays.asList("s1", "s2"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(20, 22)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
@@ -172,24 +160,24 @@ public class TableModelFastCompactionPerformerTest
extends AbstractCompactionTes
new CompactionTableModelTestFileWriter(resource2)) {
writer.registerTableSchema("db1.t1", Arrays.asList("id1", "id2"));
writer.startChunkGroup("db1.t1", Arrays.asList("id_field1",
"id_field2"));
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
writer.endChunkGroup();
writer.startChunkGroup("d1");
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
writer.endChunkGroup();
writer.startChunkGroup("node1.node2.device");
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
@@ -218,16 +206,16 @@ public class TableModelFastCompactionPerformerTest
extends AbstractCompactionTes
new CompactionTableModelTestFileWriter(resource1)) {
writer.registerTableSchema("db1.t1", Arrays.asList("id1", "id2"));
writer.startChunkGroup("db1.t1", Arrays.asList("id_field1",
"id_field2"));
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
writer.endChunkGroup();
writer.startChunkGroup("d1");
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
@@ -248,4 +236,75 @@ public class TableModelFastCompactionPerformerTest extends
AbstractCompactionTes
Assert.assertEquals(1, reader.getTableSchemaMap().size());
}
}
+
+ @Test
+ public void testCrossSpaceCompactionOfTableModelCanNotMatchTableSchema()
throws IOException {
+ TsFileResource resource1 = createEmptyFileAndResource(true);
+ try (CompactionTableModelTestFileWriter writer =
+ new CompactionTableModelTestFileWriter(resource1)) {
+ writer.registerTableSchema("t1", Arrays.asList("id1", "id2"));
+ writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field2"));
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
+ new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4);
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ TsFileResource resource2 = createEmptyFileAndResource(false);
+ try (CompactionTableModelTestFileWriter writer =
+ new CompactionTableModelTestFileWriter(resource2)) {
+ writer.registerTableSchema("t1", Arrays.asList("id1", "id3"));
+ writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field3"));
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Arrays.asList("s1", "s2"),
+ new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(1, 2)}}},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4);
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ TsFileResource resource3 = createEmptyFileAndResource(true);
+ try (CompactionTableModelTestFileWriter writer =
+ new CompactionTableModelTestFileWriter(resource3)) {
+ writer.registerTableSchema("t1", Arrays.asList("id1", "id4"));
+ writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field3"));
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Arrays.asList("s1", "s2"),
+ new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(100, 200)}}},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4);
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ seqResources.add(resource1);
+ unseqResources.add(resource2);
+ seqResources.add(resource3);
+ tsFileManager.addAll(seqResources, true);
+
+ CrossSpaceCompactionTask task =
+ new CrossSpaceCompactionTask(
+ 0,
+ tsFileManager,
+ seqResources,
+ unseqResources,
+ new FastCompactionPerformer(true),
+ 0,
+ 0);
+ Assert.assertTrue(task.start());
+ TsFileResource targetResource = tsFileManager.getTsFileList(true).get(0);
+ Assert.assertEquals(1, targetResource.getDevices().size());
+ try (TsFileSequenceReader reader =
+ new
TsFileSequenceReader(targetResource.getTsFile().getAbsolutePath())) {
+ Assert.assertEquals(1, reader.getTableSchemaMap().size());
+ Assert.assertTrue(reader.getTableSchemaMap().containsKey("t1"));
+ List<IMeasurementSchema> tableSchema =
+ reader.getTableSchemaMap().get("t1").getColumnSchemas();
+ Assert.assertEquals("id1", tableSchema.get(0).getMeasurementName());
+ Assert.assertEquals("id4", tableSchema.get(1).getMeasurementName());
+ Assert.assertEquals("s1", tableSchema.get(2).getMeasurementName());
+ Assert.assertEquals("s2", tableSchema.get(3).getMeasurementName());
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelReadChunkCompactionPerformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelReadChunkCompactionPerformerTest.java
index 7c175a16c84..f2721652f99 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelReadChunkCompactionPerformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/tablemodel/TableModelReadChunkCompactionPerformerTest.java
@@ -36,6 +36,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.TimeRange;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -45,6 +46,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -78,8 +80,8 @@ public class TableModelReadChunkCompactionPerformerTest
extends AbstractCompacti
new CompactionTableModelTestFileWriter(resource1)) {
writer.registerTableSchema("t1", Arrays.asList("id1", "id2"));
writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field2"));
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
@@ -91,13 +93,8 @@ public class TableModelReadChunkCompactionPerformerTest
extends AbstractCompacti
new CompactionTableModelTestFileWriter(resource2)) {
writer.registerTableSchema("t1", Arrays.asList("id1", "id2", "id3"));
writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field2",
"id_field3"));
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
- new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(20, 22)}}},
- TSEncoding.PLAIN,
- CompressionType.LZ4);
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s2",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Arrays.asList("s1", "s2"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(20, 22)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
@@ -128,8 +125,8 @@ public class TableModelReadChunkCompactionPerformerTest
extends AbstractCompacti
TsFileResource resource1 = createEmptyFileAndResource(true);
try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource1)) {
writer.startChunkGroup("d1");
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
@@ -139,13 +136,8 @@ public class TableModelReadChunkCompactionPerformerTest
extends AbstractCompacti
TsFileResource resource2 = createEmptyFileAndResource(true);
try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource2)) {
writer.startChunkGroup("d1");
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
- new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(20, 22)}}},
- TSEncoding.PLAIN,
- CompressionType.LZ4);
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s2",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Arrays.asList("s1", "s2"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(20, 22)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
@@ -168,7 +160,7 @@ public class TableModelReadChunkCompactionPerformerTest
extends AbstractCompacti
reader.getAllTimeseriesMetadata(true);
for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry :
allTimeseriesMetadata.entrySet()) {
- Assert.assertEquals(2, entry.getValue().size());
+ Assert.assertEquals(3, entry.getValue().size());
}
}
}
@@ -178,8 +170,8 @@ public class TableModelReadChunkCompactionPerformerTest
extends AbstractCompacti
TsFileResource resource1 = createEmptyFileAndResource(true);
try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource1)) {
writer.startChunkGroup("d1");
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
@@ -191,8 +183,8 @@ public class TableModelReadChunkCompactionPerformerTest
extends AbstractCompacti
new CompactionTableModelTestFileWriter(resource2)) {
writer.registerTableSchema("t1", Arrays.asList("id1", "id2"));
writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field2"));
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
@@ -215,15 +207,15 @@ public class TableModelReadChunkCompactionPerformerTest
extends AbstractCompacti
}
@Test
- public void
testSequenceInnerSpaceCompactionOfTwoV4TreeModelCanNotMatchTableSchema()
+ public void
testSequenceInnerSpaceCompactionOfTableModelCanNotMatchTableSchema()
throws IOException {
TsFileResource resource1 = createEmptyFileAndResource(true);
try (CompactionTableModelTestFileWriter writer =
new CompactionTableModelTestFileWriter(resource1)) {
writer.registerTableSchema("t1", Arrays.asList("id1", "id2"));
writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field2"));
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
@@ -235,14 +227,22 @@ public class TableModelReadChunkCompactionPerformerTest
extends AbstractCompacti
new CompactionTableModelTestFileWriter(resource2)) {
writer.registerTableSchema("t1", Arrays.asList("id1", "id3"));
writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field3"));
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Arrays.asList("s1", "s2"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(20, 22)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s2",
- new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(20, 22)}}},
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ TsFileResource resource3 = createEmptyFileAndResource(true);
+ try (CompactionTableModelTestFileWriter writer =
+ new CompactionTableModelTestFileWriter(resource3)) {
+ writer.registerTableSchema("t1", Arrays.asList("id1", "id4"));
+ writer.startChunkGroup("t1", Arrays.asList("id_field1", "id_field3"));
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Arrays.asList("s1", "s2"),
+ new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(100, 200)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
writer.endChunkGroup();
@@ -250,12 +250,26 @@ public class TableModelReadChunkCompactionPerformerTest
extends AbstractCompacti
}
seqResources.add(resource1);
seqResources.add(resource2);
+ seqResources.add(resource3);
tsFileManager.addAll(seqResources, true);
InnerSpaceCompactionTask task =
new InnerSpaceCompactionTask(
0, tsFileManager, seqResources, true, new
ReadChunkCompactionPerformer(), 0);
- Assert.assertFalse(task.start());
+ Assert.assertTrue(task.start());
+ TsFileResource targetResource = tsFileManager.getTsFileList(true).get(0);
+ Assert.assertEquals(1, targetResource.getDevices().size());
+ try (TsFileSequenceReader reader =
+ new
TsFileSequenceReader(targetResource.getTsFile().getAbsolutePath())) {
+ Assert.assertEquals(1, reader.getTableSchemaMap().size());
+ Assert.assertTrue(reader.getTableSchemaMap().containsKey("t1"));
+ List<IMeasurementSchema> tableSchema =
+ reader.getTableSchemaMap().get("t1").getColumnSchemas();
+ Assert.assertEquals("id1", tableSchema.get(0).getMeasurementName());
+ Assert.assertEquals("id4", tableSchema.get(1).getMeasurementName());
+ Assert.assertEquals("s1", tableSchema.get(2).getMeasurementName());
+ Assert.assertEquals("s2", tableSchema.get(3).getMeasurementName());
+ }
}
@Test
@@ -275,32 +289,32 @@ public class TableModelReadChunkCompactionPerformerTest
extends AbstractCompacti
new CompactionTableModelTestFileWriter(resource2)) {
writer.registerTableSchema("db1.t1", Arrays.asList("id1", "id2"));
writer.startChunkGroup("db1.t1", Arrays.asList("id_field1",
"id_field2"));
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
writer.endChunkGroup();
writer.startChunkGroup("d3");
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
writer.endChunkGroup();
writer.startChunkGroup("node1.node2.device");
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);
writer.endChunkGroup();
writer.startChunkGroup("node1.node2.node3.device");
- writer.generateSimpleNonAlignedSeriesToCurrentDevice(
- "s1",
+ writer.generateSimpleAlignedSeriesToCurrentDevice(
+ Collections.singletonList("s1"),
new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new
TimeRange(10, 12)}}},
TSEncoding.PLAIN,
CompressionType.LZ4);