This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch IOTDB-4834 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 58a00a19cbb49965b73c9bfbd1c517e270de3166 Author: Liu Xuxin <[email protected]> AuthorDate: Thu Nov 3 10:44:42 2022 +0800 add data type checking in single series compaction executor --- .../db/engine/compaction/CompactionUtils.java | 27 +++++++++++++ .../utils/SingleSeriesCompactionExecutor.java | 44 +++++++++++++++++++++- 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java index 5e5561b43e..ecaceed585 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionUtils.java @@ -19,15 +19,24 @@ package org.apache.iotdb.db.engine.compaction; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.PathPatternTree; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.TsFileNameGenerator; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.mpp.common.schematree.ISchemaTree; +import org.apache.iotdb.db.mpp.plan.analyze.ClusterSchemaFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.ISchemaFetcher; +import org.apache.iotdb.db.mpp.plan.analyze.StandaloneSchemaFetcher; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -217,4 +226,22 @@ public class CompactionUtils { } } } + + public static IMeasurementSchema fetchSchema(String device, String measurementId) + throws IllegalPathException { + ISchemaFetcher schemaFetcher = + IoTDBDescriptor.getInstance().getConfig().isClusterMode() + ? ClusterSchemaFetcher.getInstance() + : StandaloneSchemaFetcher.getInstance(); + PathPatternTree patternTree = new PathPatternTree(); + patternTree.appendFullPath(new PartialPath(device, measurementId)); + patternTree.constructTree(); + ISchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree); + if (!schemaTree.getAllMeasurement().isEmpty()) { + MeasurementPath path = schemaTree.getAllMeasurement().get(0); + return path.getMeasurementSchema(); + } else { + return null; + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java index d1d4a366e7..56ee824fea 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java @@ -18,9 +18,12 @@ */ package org.apache.iotdb.db.engine.compaction.inner.utils; +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.CompactionTaskManager; +import org.apache.iotdb.db.engine.compaction.CompactionUtils; import org.apache.iotdb.db.engine.compaction.constant.CompactionType; import org.apache.iotdb.db.engine.compaction.constant.ProcessChunkType; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -40,6 +43,8 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import com.google.common.util.concurrent.RateLimiter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.LinkedList; @@ -47,6 +52,7 @@ import java.util.List; /** This class is used to compact one series during inner space compaction. */ public class SingleSeriesCompactionExecutor { + private static final Logger log = LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); private String device; private PartialPath series; private LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList; @@ -63,6 +69,7 @@ public class SingleSeriesCompactionExecutor { private long minStartTimestamp = Long.MAX_VALUE; private long maxEndTimestamp = Long.MIN_VALUE; private long pointCountInChunkWriter = 0; + private boolean alreadyFetchSchema = false; private final long targetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize(); @@ -110,7 +117,7 @@ public class SingleSeriesCompactionExecutor { * This function execute the compaction of a single time series. Notice, the result of single * series compaction may contain more than one chunk. */ - public void execute() throws IOException { + public void execute() throws IOException, IllegalPathException { while (readerAndChunkMetadataList.size() > 0) { Pair<TsFileSequenceReader, List<ChunkMetadata>> readerListPair = readerAndChunkMetadataList.removeFirst(); @@ -121,6 +128,20 @@ public class SingleSeriesCompactionExecutor { if (this.chunkWriter == null) { constructChunkWriterFromReadChunk(currentChunk); } + + if (!checkDataType(currentChunk)) { + // after fetching the correct schema + // the datatype of current chunk is still inconsistent with schema + // abort current chunk + log.warn( + "Abort a chunk from {}, because the datatype is inconsistent, " + + "type of schema is {}, but type of chunk is {}", + reader.getFileName(), + schema.getType().toString(), + currentChunk.getHeader().getDataType().toString()); + continue; + } + CompactionMetricsRecorder.recordReadInfo( currentChunk.getHeader().getSerializedSize() + currentChunk.getHeader().getDataSize()); @@ -156,6 +177,27 @@ public class SingleSeriesCompactionExecutor { targetResource.updateEndTime(device, maxEndTimestamp); } + private boolean checkDataType(Chunk currentChunk) throws IllegalPathException { + if (currentChunk.getHeader().getDataType() != schema.getType()) { + // the datatype is not consistent + fixSchemaInconsistent(); + } + return currentChunk.getHeader().getDataType() == schema.getType(); + } + + private void fixSchemaInconsistent() throws IllegalPathException { + if (alreadyFetchSchema) { + return; + } + IMeasurementSchema correctSchema = + CompactionUtils.fetchSchema(device, schema.getMeasurementId()); + if (schema.getType() != correctSchema.getType()) { + chunkWriter = new ChunkWriterImpl(correctSchema); + schema = correctSchema; + } + alreadyFetchSchema = true; + } + private void constructChunkWriterFromReadChunk(Chunk chunk) { ChunkHeader chunkHeader = chunk.getHeader(); this.schema =
