This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch debug-baowu
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0efd3230ca064d06be684097cdb072cc789034ea Author: LiuXuxin <[email protected]> AuthorDate: Thu Oct 27 22:13:01 2022 +0800 temp --- .../inner/utils/InnerSpaceCompactionUtils.java | 32 ++++++++++++++++++++-- .../utils/SingleSeriesCompactionExecutor.java | 28 ++++++++++++------- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java index e21a5c3a57..16f319b3da 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/InnerSpaceCompactionUtils.java @@ -33,13 +33,14 @@ import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.query.control.FileReaderManager; import org.apache.iotdb.db.rescon.SystemInfo; -import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; +import org.apache.iotdb.tsfile.write.chunk.AlignedChunkWriterImpl; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; @@ -50,8 +51,12 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.TreeMap; public class 
InnerSpaceCompactionUtils { @@ -99,7 +104,7 @@ public class InnerSpaceCompactionUtils { private static void checkThreadInterrupted(TsFileResource tsFileResource) throws InterruptedException { - if (Thread.interrupted() || !IoTDB.activated) { + if (Thread.interrupted()) { throw new InterruptedException( String.format( "[Compaction] compaction for target file %s abort", tsFileResource.toString())); @@ -114,6 +119,8 @@ public class InnerSpaceCompactionUtils { throws IOException, MetadataException, InterruptedException { MultiTsFileDeviceIterator.MeasurementIterator seriesIterator = deviceIterator.iterateNotAlignedSeries(device, true); + Map<Long, Map<String, TsPrimitiveType>> valueMap = new TreeMap<>(); + Map<String, IMeasurementSchema> schemaMap = new HashMap<>(); while (seriesIterator.hasNextSeries()) { checkThreadInterrupted(targetResource); // TODO: we can provide a configuration item to enable concurrent between each series @@ -125,9 +132,21 @@ public class InnerSpaceCompactionUtils { LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList = seriesIterator.getMetadataListForCurrentSeries(); SingleSeriesCompactionExecutor compactionExecutorOfCurrentTimeSeries = - new SingleSeriesCompactionExecutor(p, readerAndChunkMetadataList, writer, targetResource); + new SingleSeriesCompactionExecutor( + p, readerAndChunkMetadataList, writer, targetResource, valueMap, schemaMap); compactionExecutorOfCurrentTimeSeries.execute(); } + List<IMeasurementSchema> schemaList = new ArrayList<>(schemaMap.values()); + AlignedChunkWriterImpl chunkWriter = new AlignedChunkWriterImpl(schemaList); + for (Map.Entry<Long, Map<String, TsPrimitiveType>> entry : valueMap.entrySet()) { + Map<String, TsPrimitiveType> vMap = entry.getValue(); + TsPrimitiveType[] types = new TsPrimitiveType[schemaList.size()]; + for (int i = 0; i < schemaList.size(); ++i) { + types[i] = vMap.get(schemaList.get(i).getMeasurementId()); + } + chunkWriter.write(entry.getKey(), types); + } + 
chunkWriter.writeToFileWriter(writer); writer.checkMetadataSizeAndMayFlush(); } @@ -264,4 +283,11 @@ public class InnerSpaceCompactionUtils { targetResource.serialize(); targetResource.close(); } + + public static void main(String[] args) throws Exception { + TsFileResource sourceResource = + new TsFileResource(new File("E:\\1664917514523-394-1-0.tsfile")); + TsFileResource targetResource = new TsFileResource(new File("E:\\Gzip-aligned.tsfile")); + InnerSpaceCompactionUtils.compact(targetResource, Collections.singletonList(sourceResource)); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java index 2d49094f44..32077d7c43 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/inner/utils/SingleSeriesCompactionExecutor.java @@ -28,6 +28,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.metrics.config.MetricConfigDescriptor; import org.apache.iotdb.tsfile.file.header.ChunkHeader; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import org.apache.iotdb.tsfile.read.common.Chunk; @@ -35,6 +36,7 @@ import org.apache.iotdb.tsfile.read.reader.IChunkReader; import org.apache.iotdb.tsfile.read.reader.IPointReader; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReaderByTimestamp; import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import 
org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -43,8 +45,10 @@ import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; import com.google.common.util.concurrent.RateLimiter; import java.io.IOException; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; /** This class is used to compact one series during inner space compaction. */ public class SingleSeriesCompactionExecutor { @@ -75,11 +79,16 @@ public class SingleSeriesCompactionExecutor { private final boolean enableMetrics = MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric(); + private Map<Long, Map<String, TsPrimitiveType>> valueMap; + private Map<String, IMeasurementSchema> schemaMap; + public SingleSeriesCompactionExecutor( PartialPath series, LinkedList<Pair<TsFileSequenceReader, List<ChunkMetadata>>> readerAndChunkMetadataList, TsFileIOWriter fileWriter, - TsFileResource targetResource) { + TsFileResource targetResource, + Map<Long, Map<String, TsPrimitiveType>> valueMap, + Map<String, IMeasurementSchema> schemaMap) { this.device = series.getDevice(); this.readerAndChunkMetadataList = readerAndChunkMetadataList; this.fileWriter = fileWriter; @@ -88,6 +97,8 @@ public class SingleSeriesCompactionExecutor { this.cachedChunk = null; this.cachedChunkMetadata = null; this.targetResource = targetResource; + this.valueMap = valueMap; + this.schemaMap = schemaMap; } /** @@ -110,7 +121,7 @@ public class SingleSeriesCompactionExecutor { currentChunk.getHeader().getSerializedSize() + currentChunk.getHeader().getDataSize()); // if this chunk is modified, deserialize it into points - if (chunkMetadata.getDeleteIntervalList() != null) { + if (chunkMetadata.getStartTime() > 0) { processModifiedChunk(currentChunk); continue; } @@ -147,7 +158,8 @@ public class SingleSeriesCompactionExecutor { series.getMeasurement(), chunkHeader.getDataType(), chunkHeader.getEncodingType(), - chunkHeader.getCompressionType()); + CompressionType.GZIP); + 
schemaMap.put(series.getMeasurement(), schema); this.chunkWriter = new ChunkWriterImpl(this.schema); } @@ -220,13 +232,9 @@ public class SingleSeriesCompactionExecutor { IPointReader batchIterator = chunkReader.nextPageData().getBatchDataIterator(); while (batchIterator.hasNextTimeValuePair()) { TimeValuePair timeValuePair = batchIterator.nextTimeValuePair(); - writeTimeAndValueToChunkWriter(timeValuePair); - if (timeValuePair.getTimestamp() > maxEndTimestamp) { - maxEndTimestamp = timeValuePair.getTimestamp(); - } - if (timeValuePair.getTimestamp() < minStartTimestamp) { - minStartTimestamp = timeValuePair.getTimestamp(); - } + valueMap + .computeIfAbsent(timeValuePair.getTimestamp(), x -> new HashMap<>()) + .put(series.getMeasurement(), timeValuePair.getValue()); } } pointCountInChunkWriter += chunk.getChunkStatistic().getCount();
