This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch load_v2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f8f9b096f7f3825e8d3ad399b1acf75a1e7dcc32 Author: Tian Jiang <[email protected]> AuthorDate: Mon Sep 4 16:28:05 2023 +0800 add MergedTsFileSplitter --- .../execution/load/AlignedChunkData.java | 20 + .../db/queryengine/execution/load/ChunkData.java | 4 + .../db/queryengine/execution/load/LoadUtils.java | 25 + .../execution/load/MergedTsFileSplitter.java | 713 +++++++++++++++++++++ .../execution/load/NonAlignedChunkData.java | 39 ++ .../queryengine/execution/load/TsFileSplitter.java | 9 +- .../execution/load/MergedTsFileSplitterTest.java | 62 ++ .../db/queryengine/execution/load/TestBase.java | 119 ++++ .../execution/load/TsFileSplitterTest.java | 48 ++ .../threadpool/WrappedThreadPoolExecutor.java | 2 +- .../iotdb/tsfile/file/header/ChunkHeader.java | 4 + .../iotdb/tsfile/file/header/PageHeader.java | 11 +- .../iotdb/tsfile/read/TsFileSequenceReader.java | 12 +- .../org/apache/iotdb/tsfile/utils/TsFileUtils.java | 16 + 14 files changed, 1067 insertions(+), 17 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java index 689a6236bb0..6fb25fec0c7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/AlignedChunkData.java @@ -208,6 +208,21 @@ public class AlignedChunkData implements ChunkData { } } + @Override + public void writeDecodePage(long[] times, Object[] values, int start, int end) + throws IOException { + pageNumbers.set(pageNumbers.size() - 1, pageNumbers.get(pageNumbers.size() - 1) + 1); + satisfiedLengthQueue.offer(end - start); + // serialize needDecode==true + dataSize += ReadWriteIOUtils.write(true, stream); + dataSize += ReadWriteIOUtils.write(end - start, stream); + + for (int i = start; i < end; i++) { + long time = 
times[i]; + dataSize += ReadWriteIOUtils.write(time, stream); + } + } + public void writeDecodeValuePage(long[] times, TsPrimitiveType[] values, TSDataType dataType) throws IOException { pageNumbers.set(pageNumbers.size() - 1, pageNumbers.get(pageNumbers.size() - 1) + 1); @@ -419,6 +434,11 @@ public class AlignedChunkData implements ChunkData { stream.close(); } + @Override + public String firstMeasurement() { + return chunkHeaderList.get(0).getMeasurementID(); + } + @Override public String toString() { return "AlignedChunkData{" diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java index a30a62e2b16..e80993c5ead 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/ChunkData.java @@ -45,6 +45,10 @@ public interface ChunkData extends TsFileData { void writeDecodePage(long[] times, Object[] values, int satisfiedLength) throws IOException; + void writeDecodePage(long[] times, Object[] values, int start, int end) throws IOException; + + String firstMeasurement(); + @Override default boolean isModification() { return false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadUtils.java new file mode 100644 index 00000000000..3cbec3dacf3 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadUtils.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.load; + +public class LoadUtils { + + private LoadUtils() { + // Util class + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java new file mode 100644 index 00000000000..1a969ebe766 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitter.java @@ -0,0 +1,713 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.load; + +import java.io.DataOutputStream; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; +import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.utils.TimePartitionUtils; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.common.constant.TsFileConstant; +import org.apache.iotdb.tsfile.encoding.decoder.Decoder; +import org.apache.iotdb.tsfile.exception.TsFileRuntimeException; +import org.apache.iotdb.tsfile.file.MetaMarker; +import org.apache.iotdb.tsfile.file.header.ChunkGroupHeader; +import org.apache.iotdb.tsfile.file.header.ChunkHeader; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.TsFileSequenceReader; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.reader.page.PageReader; +import org.apache.iotdb.tsfile.read.reader.page.TimePageReader; +import 
org.apache.iotdb.tsfile.read.reader.page.ValuePageReader; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; + +import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.TreeMap; +import java.util.function.Function; + +public class MergedTsFileSplitter { + + private static final Logger logger = LoggerFactory.getLogger(TsFileSplitter.class); + + private final List<File> tsFiles; + private final Function<TsFileData, Boolean> consumer; + private final PriorityQueue<SplitTask> taskPriorityQueue; + private ExecutorService asyncExecutor; + + public MergedTsFileSplitter(List<File> tsFiles, Function<TsFileData, Boolean> consumer, ExecutorService asyncExecutor) { + this.tsFiles = tsFiles; + this.consumer = consumer; + this.asyncExecutor = asyncExecutor; + taskPriorityQueue = new PriorityQueue<>(); + } + + public void splitTsFileByDataPartition() + throws IOException, IllegalStateException { + for (File tsFile : tsFiles) { + SplitTask splitTask = new SplitTask(tsFile, asyncExecutor); + if (splitTask.hasNext()) { + taskPriorityQueue.add(splitTask); + } + } + + List<SplitTask> equalTasks = new ArrayList<>(); + while (!taskPriorityQueue.isEmpty()) { + SplitTask task = taskPriorityQueue.poll(); + equalTasks.add(task); + // find chunks of the same series in other files + while (!taskPriorityQueue.isEmpty()) { + if (taskPriorityQueue.peek().compareTo(task) == 0) { + equalTasks.add(taskPriorityQueue.poll()); + } else { + break; + } + } + + for (SplitTask equalTask : equalTasks) { + TsFileData tsFileData = 
equalTask.removeNext(); + consumer.apply(tsFileData); + if (equalTask.hasNext()) { + taskPriorityQueue.add(equalTask); + } + } + equalTasks.clear(); + } + } + + public void close() throws IOException { + for (SplitTask task : taskPriorityQueue) { + task.close(); + } + taskPriorityQueue.clear(); + } + + private class SplitTask implements Comparable<SplitTask> { + + private final File tsFile; + private TsFileSequenceReader reader; + private final TreeMap<Long, List<Deletion>> offset2Deletions; + + private String curDevice; + private boolean isTimeChunkNeedDecode; + private Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData; + private Map<Integer, long[]> pageIndex2Times; + private Map<Long, IChunkMetadata> offset2ChunkMetadata; + + private BlockingQueue<TsFileData> nextSplits; + private TsFileData nextSplit; + private byte marker = -1; + + private ExecutorService asyncExecutor; + private Future<Void> asyncTask; + + public SplitTask(File tsFile, ExecutorService asyncExecutor) throws IOException { + this.tsFile = tsFile; + this.asyncExecutor = asyncExecutor; + offset2Deletions = new TreeMap<>(); + init(); + } + + @Override + public int compareTo(SplitTask o) { + try { + TsFileData thisNext = showNext(); + TsFileData thatNext = o.showNext(); + // out put modification first + if (thisNext.isModification() && thatNext.isModification()) { + return 0; + } + if (thisNext.isModification()) { + return 1; + } + if (thatNext.isModification()) { + return -1; + } + + ChunkData thisChunk = (ChunkData) thisNext; + ChunkData thatChunk = ((ChunkData) thatNext); + Comparator<ChunkData> chunkDataComparator = + Comparator.comparing(ChunkData::getDevice, String::compareTo) + .thenComparing(ChunkData::firstMeasurement, String::compareTo); + return chunkDataComparator.compare(thisChunk, thatChunk); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void init() throws IOException { + this.reader = new TsFileSequenceReader(tsFile.getAbsolutePath()); + 
getAllModification(offset2Deletions); + + if (!checkMagic(reader)) { + throw new TsFileRuntimeException( + String.format("Magic String check error when parsing TsFile %s.", tsFile.getPath())); + } + reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1); + + curDevice = null; + isTimeChunkNeedDecode = true; + pageIndex2ChunkData = new HashMap<>(); + pageIndex2Times = null; + offset2ChunkMetadata = new HashMap<>(); + getChunkMetadata(reader, offset2ChunkMetadata); + + nextSplits = new LinkedBlockingDeque<>(64); + if (asyncExecutor != null) { + asyncTask = asyncExecutor.submit(() -> { + try { + asyncComputeNext(); + } catch (Throwable e) { + logger.info("Exception during splitting", e); + throw e; + } + return null; + }); + } + } + + private void asyncComputeNext() throws IOException { + while (reader != null && !Thread.interrupted()) { + computeNext(); + } + logger.info("{} end splitting", tsFile); + } + + public boolean hasNext() throws IOException { + if (nextSplit != null && !(nextSplit instanceof EmptyTsFileData)) { + return true; + } + if (reader == null && nextSplits.isEmpty()) { + return false; + } + + if (asyncExecutor == null) { + computeNext(); + if (!nextSplits.isEmpty()) { + nextSplit = nextSplits.poll(); + return true; + } else { + return false; + } + } else { + try { + nextSplit = nextSplits.take(); + } catch (InterruptedException e) { + return false; + } + return !(nextSplit instanceof EmptyTsFileData); + } + } + + public TsFileData removeNext() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException(); + } + TsFileData split = nextSplit; + nextSplit = null; + return split; + } + + public TsFileData showNext() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return nextSplit; + } + + public void close() throws IOException { + if (asyncTask != null) { + asyncTask.cancel(true); + try { + asyncTask.get(); + } catch (CancellationException ignored) { + + } catch (InterruptedException 
| ExecutionException e) { + throw new IOException(e); + } + } + if (reader != null) { + reader.close(); + reader = null; + } + } + + private byte nextMarker() throws IOException { + if (marker != -1) { + // inherit the marker from the previous breakpoint + // e.g. the marker after processing a chunk + return marker; + } + return marker = reader.readMarker(); + } + + private void insertNewChunk(ChunkData chunkData) throws IOException { + if (asyncExecutor == null) { + nextSplits.add(chunkData); + } else { + try { + nextSplits.put(chunkData); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + } + + @SuppressWarnings({"squid:S3776", "squid:S6541"}) + private void computeNext() throws IOException, IllegalStateException { + if (reader == null) { + return; + } + + while (nextMarker() != MetaMarker.SEPARATOR) { + switch (marker) { + case MetaMarker.CHUNK_HEADER: + case MetaMarker.TIME_CHUNK_HEADER: + case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER: + case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER: + long chunkOffset = reader.position(); + boolean chunkDataGenerated = + consumeAllAlignedChunkData(chunkOffset, pageIndex2ChunkData); + handleModification(offset2Deletions, chunkOffset); + if (chunkDataGenerated) { + return; + } + + ChunkHeader header = reader.readChunkHeader(marker); + String measurementId = header.getMeasurementID(); + if (header.getDataSize() == 0) { + throw new TsFileRuntimeException( + String.format( + "Empty Nonaligned Chunk or Time Chunk with offset %d in TsFile %s.", + chunkOffset, tsFile.getPath())); + } + + boolean isAligned = + ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) + == TsFileConstant.TIME_COLUMN_MASK); + IChunkMetadata chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES); + TTimePartitionSlot timePartitionSlot = + TimePartitionUtils.getTimePartition(chunkMetadata.getStartTime()); + ChunkData chunkData = + ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot); + + if 
(!needDecodeChunk(chunkMetadata)) { + chunkData.setNotDecode(); + chunkData.writeEntireChunk(reader.readChunk(-1, header.getDataSize()), chunkMetadata); + if (isAligned) { + isTimeChunkNeedDecode = false; + pageIndex2ChunkData + .computeIfAbsent(1, o -> new ArrayList<>()) + .add((AlignedChunkData) chunkData); + } else { + insertNewChunk(chunkData); + } + break; + } + + Decoder defaultTimeDecoder = + Decoder.getDecoderByType( + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), + TSDataType.INT64); + Decoder valueDecoder = + Decoder.getDecoderByType(header.getEncodingType(), header.getDataType()); + int dataSize = header.getDataSize(); + int pageIndex = 0; + if (isAligned) { + isTimeChunkNeedDecode = true; + pageIndex2Times = new HashMap<>(); + } + + while (dataSize > 0) { + PageHeader pageHeader = + reader.readPageHeader( + header.getDataType(), + (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER); + long pageDataSize = pageHeader.getSerializedPageSize(); + if (!needDecodePage(pageHeader, chunkMetadata)) { // an entire page + long startTime = + pageHeader.getStatistics() == null + ? 
chunkMetadata.getStartTime() + : pageHeader.getStartTime(); + TTimePartitionSlot pageTimePartitionSlot = + TimePartitionUtils.getTimePartition(startTime); + if (!timePartitionSlot.equals(pageTimePartitionSlot)) { + if (!isAligned) { + insertNewChunk(chunkData); + } + timePartitionSlot = pageTimePartitionSlot; + chunkData = + ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot); + } + if (isAligned) { + pageIndex2ChunkData + .computeIfAbsent(pageIndex, o -> new ArrayList<>()) + .add((AlignedChunkData) chunkData); + } + chunkData.writeEntirePage(pageHeader, reader.readCompressedPage(pageHeader)); + } else { // split page + ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType()); + Pair<long[], Object[]> tvArray = + decodePage( + isAligned, pageData, pageHeader, defaultTimeDecoder, valueDecoder, header); + long[] times = tvArray.left; + Object[] values = tvArray.right; + if (isAligned) { + pageIndex2Times.put(pageIndex, times); + } + + int start = 0; + long endTime = + timePartitionSlot.getStartTime() + + TimePartitionUtils.getTimePartitionInterval(); + for (int i = 0; i < times.length; i++) { + if (times[i] >= endTime) { + chunkData.writeDecodePage(times, values, start, i); + if (isAligned) { + pageIndex2ChunkData + .computeIfAbsent(pageIndex, o -> new ArrayList<>()) + .add((AlignedChunkData) chunkData); + } else { + insertNewChunk(chunkData); + } + + timePartitionSlot = TimePartitionUtils.getTimePartition(times[i]); + endTime = + timePartitionSlot.getStartTime() + + TimePartitionUtils.getTimePartitionInterval(); + chunkData = + ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot); + start = i; + } + } + chunkData.writeDecodePage(times, values, start, times.length); + if (isAligned) { + pageIndex2ChunkData + .computeIfAbsent(pageIndex, o -> new ArrayList<>()) + .add((AlignedChunkData) chunkData); + } + } + + pageIndex += 1; + dataSize -= pageDataSize; + } + + if (!isAligned) { + 
insertNewChunk(chunkData); + } + break; + case MetaMarker.VALUE_CHUNK_HEADER: + case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: + chunkOffset = reader.position(); + chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES); + header = reader.readChunkHeader(marker); + if (header.getDataSize() == 0) { + handleEmptyValueChunk( + header, pageIndex2ChunkData, chunkMetadata, isTimeChunkNeedDecode); + break; + } + + if (!isTimeChunkNeedDecode) { + AlignedChunkData alignedChunkData = pageIndex2ChunkData.get(1).get(0); + alignedChunkData.addValueChunk(header); + alignedChunkData.writeEntireChunk( + reader.readChunk(-1, header.getDataSize()), chunkMetadata); + break; + } + + Set<ChunkData> allChunkData = new HashSet<>(); + dataSize = header.getDataSize(); + pageIndex = 0; + valueDecoder = Decoder.getDecoderByType(header.getEncodingType(), header.getDataType()); + + while (dataSize > 0) { + PageHeader pageHeader = + reader.readPageHeader( + header.getDataType(), + (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER); + List<AlignedChunkData> alignedChunkDataList = pageIndex2ChunkData.get(pageIndex); + for (AlignedChunkData alignedChunkData : alignedChunkDataList) { + if (!allChunkData.contains(alignedChunkData)) { + alignedChunkData.addValueChunk(header); + allChunkData.add(alignedChunkData); + } + } + if (alignedChunkDataList.size() == 1) { // write entire page + // write the entire page if it's not an empty page. 
+ alignedChunkDataList + .get(0) + .writeEntirePage(pageHeader, reader.readCompressedPage(pageHeader)); + } else { // decode page + long[] times = pageIndex2Times.get(pageIndex); + TsPrimitiveType[] values = + decodeValuePage(reader, header, pageHeader, times, valueDecoder); + for (AlignedChunkData alignedChunkData : alignedChunkDataList) { + alignedChunkData.writeDecodeValuePage(times, values, header.getDataType()); + } + } + long pageDataSize = pageHeader.getSerializedPageSize(); + pageIndex += 1; + dataSize -= pageDataSize; + } + break; + case MetaMarker.CHUNK_GROUP_HEADER: + ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader(); + curDevice = chunkGroupHeader.getDeviceID(); + break; + case MetaMarker.OPERATION_INDEX_RANGE: + reader.readPlanIndex(); + break; + default: + MetaMarker.handleUnexpectedMarker(marker); + } + marker = -1; + if (!nextSplits.isEmpty()) { + return; + } + } + + consumeAllAlignedChunkData(reader.position(), pageIndex2ChunkData); + handleModification(offset2Deletions, Long.MAX_VALUE); + close(); + if (asyncExecutor != null) { + nextSplits.add(new EmptyTsFileData()); + } + } + + private class EmptyTsFileData implements TsFileData { + + @Override + public long getDataSize() { + return 0; + } + + @Override + public void writeToFileWriter(TsFileIOWriter writer) throws IOException { + + } + + @Override + public boolean isModification() { + return false; + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + + } + } + + private void getAllModification(Map<Long, List<Deletion>> offset2Deletions) throws IOException { + try (ModificationFile modificationFile = + new ModificationFile(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX)) { + for (Modification modification : modificationFile.getModifications()) { + offset2Deletions + .computeIfAbsent(modification.getFileOffset(), o -> new ArrayList<>()) + .add((Deletion) modification); + } + } + } + + private boolean checkMagic(TsFileSequenceReader 
reader) throws IOException { + String magic = reader.readHeadMagic(); + if (!magic.equals(TSFileConfig.MAGIC_STRING)) { + logger.error("the file's MAGIC STRING is incorrect, file path: {}", reader.getFileName()); + return false; + } + + byte versionNumber = reader.readVersionNumber(); + if (versionNumber != TSFileConfig.VERSION_NUMBER) { + logger.error("the file's Version Number is incorrect, file path: {}", reader.getFileName()); + return false; + } + + if (!reader.readTailMagic().equals(TSFileConfig.MAGIC_STRING)) { + logger.error("the file is not closed correctly, file path: {}", reader.getFileName()); + return false; + } + return true; + } + + private void getChunkMetadata( + TsFileSequenceReader reader, Map<Long, IChunkMetadata> offset2ChunkMetadata) + throws IOException { + Map<String, List<TimeseriesMetadata>> device2Metadata = reader.getAllTimeseriesMetadata(true); + for (Map.Entry<String, List<TimeseriesMetadata>> entry : device2Metadata.entrySet()) { + for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) { + for (IChunkMetadata chunkMetadata : timeseriesMetadata.getChunkMetadataList()) { + offset2ChunkMetadata.put(chunkMetadata.getOffsetOfChunkHeader(), chunkMetadata); + } + } + } + } + + private void handleModification( + TreeMap<Long, List<Deletion>> offset2Deletions, long chunkOffset) { + while (!offset2Deletions.isEmpty() && offset2Deletions.firstEntry().getKey() <= chunkOffset) { + offset2Deletions + .pollFirstEntry() + .getValue() + .forEach(o -> nextSplits.add(new DeletionData(o))); + } + } + + private boolean consumeAllAlignedChunkData( + long offset, Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData) { + if (pageIndex2ChunkData.isEmpty()) { + return false; + } + + Set<ChunkData> allChunkData = new HashSet<>(); + for (Map.Entry<Integer, List<AlignedChunkData>> entry : pageIndex2ChunkData.entrySet()) { + allChunkData.addAll(entry.getValue()); + } + nextSplits.addAll(allChunkData); + pageIndex2ChunkData.clear(); + return true; + } + 
+ private boolean needDecodeChunk(IChunkMetadata chunkMetadata) { + return !TimePartitionUtils.getTimePartition(chunkMetadata.getStartTime()) + .equals(TimePartitionUtils.getTimePartition(chunkMetadata.getEndTime())); + } + + private boolean needDecodePage(PageHeader pageHeader, IChunkMetadata chunkMetadata) { + if (pageHeader.getStatistics() == null) { + return !TimePartitionUtils.getTimePartition(chunkMetadata.getStartTime()) + .equals(TimePartitionUtils.getTimePartition(chunkMetadata.getEndTime())); + } + return !TimePartitionUtils.getTimePartition(pageHeader.getStartTime()) + .equals(TimePartitionUtils.getTimePartition(pageHeader.getEndTime())); + } + + private Pair<long[], Object[]> decodePage( + boolean isAligned, + ByteBuffer pageData, + PageHeader pageHeader, + Decoder timeDecoder, + Decoder valueDecoder, + ChunkHeader chunkHeader) + throws IOException { + if (isAligned) { + TimePageReader timePageReader = new TimePageReader(pageHeader, pageData, timeDecoder); + long[] times = timePageReader.getNextTimeBatch(); + return new Pair<>(times, new Object[times.length]); + } + + valueDecoder.reset(); + PageReader pageReader = + new PageReader(pageData, chunkHeader.getDataType(), valueDecoder, timeDecoder, null); + BatchData batchData = pageReader.getAllSatisfiedPageData(); + long[] times = new long[batchData.length()]; + Object[] values = new Object[batchData.length()]; + int index = 0; + while (batchData.hasCurrent()) { + times[index] = batchData.currentTime(); + values[index++] = batchData.currentValue(); + batchData.next(); + } + return new Pair<>(times, values); + } + + private void handleEmptyValueChunk( + ChunkHeader header, + Map<Integer, List<AlignedChunkData>> pageIndex2ChunkData, + IChunkMetadata chunkMetadata, + boolean isTimeChunkNeedDecode) + throws IOException { + Set<ChunkData> allChunkData = new HashSet<>(); + for (Map.Entry<Integer, List<AlignedChunkData>> entry : pageIndex2ChunkData.entrySet()) { + for (AlignedChunkData alignedChunkData : 
entry.getValue()) { + if (!allChunkData.contains(alignedChunkData)) { + alignedChunkData.addValueChunk(header); + if (!isTimeChunkNeedDecode) { + alignedChunkData.writeEntireChunk(ByteBuffer.allocate(0), chunkMetadata); + } + allChunkData.add(alignedChunkData); + } + } + } + } + + /** + * Detects an empty page in an aligned chunk: the page is empty when its uncompressedSize and + * compressedSize are both 0 and its statistics are null. + * + * @param pageHeader page header + * @return true if the page is empty + */ + private boolean isEmptyPage(PageHeader pageHeader) { + return pageHeader.getUncompressedSize() == 0 + && pageHeader.getCompressedSize() == 0 + && pageHeader.getStatistics() == null; + } + + /** + * Reads the value page described by {@code pageHeader} through {@code reader} and decodes it + * against {@code times}. + * + * @param reader reader used to fetch the page data + * @param chunkHeader supplies the compression type and data type of the page + * @param pageHeader header of the value page to decode + * @param times timestamps handed to the value page reader + * @param valueDecoder decoder for the values; it is reset before decoding + * @return the decoded values, or an array of {@code times.length} null elements when the + * serialized page size is 0 + * @throws IOException if reading the page fails + */ + private TsPrimitiveType[] decodeValuePage( + TsFileSequenceReader reader, + ChunkHeader chunkHeader, + PageHeader pageHeader, + long[] times, + Decoder valueDecoder) + throws IOException { + if (pageHeader.getSerializedPageSize() == 0) { + return new TsPrimitiveType[times.length]; + } + + valueDecoder.reset(); + ByteBuffer pageData = reader.readPage(pageHeader, chunkHeader.getCompressionType()); + ValuePageReader valuePageReader = + new ValuePageReader(pageHeader, pageData, chunkHeader.getDataType(), valueDecoder); + return valuePageReader.nextValueBatch( + times); // must be the original times of the page, so recording the satisfied length is necessary + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java index f898a1f6665..6bfcdfadf44 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/NonAlignedChunkData.java @@ -191,6 +191,40 @@ public class NonAlignedChunkData implements ChunkData { } } + /** + * Serializes the points in {@code [start, end)} of a decoded page into this chunk data's stream. + * + * <p>Increments the page number, then writes a {@code true} flag, the point count + * {@code end - start} and, for every point, its timestamp followed by its value cast according + * to the chunk header's data type. The values returned by the write calls are accumulated into + * dataSize. + * + * @param times timestamps of the page + * @param values values of the page, indexed like {@code times} + * @param start index of the first point to write (inclusive) + * @param end index after the last point to write (exclusive) + * @throws IOException if writing to the stream fails + * @throws UnSupportedDataTypeException if the chunk's data type is not handled by the switch + */ + @Override + public void writeDecodePage(long[] times, Object[] values, int start, 
int end) + throws IOException { + pageNumber += 1; + dataSize += ReadWriteIOUtils.write(true, stream); + dataSize += ReadWriteIOUtils.write(end - start, stream); + + for (int i = start; i < end; i++) { + dataSize += ReadWriteIOUtils.write(times[i], stream); + switch (chunkHeader.getDataType()) { + case INT32: + dataSize += ReadWriteIOUtils.write((int) values[i], stream); + break; + case INT64: + dataSize += ReadWriteIOUtils.write((long) values[i], stream); + break; + case FLOAT: + dataSize += ReadWriteIOUtils.write((float) values[i], stream); + break; + case DOUBLE: + dataSize += ReadWriteIOUtils.write((double) values[i], stream); + break; + case BOOLEAN: + dataSize += ReadWriteIOUtils.write((boolean) values[i], stream); + break; + case TEXT: + dataSize += ReadWriteIOUtils.write((Binary) values[i], stream); + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", chunkHeader.getDataType())); + } + } + } + private void deserializeTsFileData(InputStream stream) throws IOException, PageException { if (needDecodeChunk) { buildChunkWriter(stream); @@ -283,6 +317,11 @@ public class NonAlignedChunkData implements ChunkData { stream.close(); } + /** Returns the measurement ID stored in this chunk's header. */ + @Override + public String firstMeasurement() { + return chunkHeader.getMeasurementID(); + } + @Override public String toString() { return "NonAlignedChunkData{" diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java index 7f66c9813f7..2c915c10941 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitter.java @@ -183,13 +183,13 @@ public class TsFileSplitter { pageIndex2Times.put(pageIndex, times); } - int satisfiedLength = 0; + int start = 0; long endTime = 
timePartitionSlot.getStartTime() + TimePartitionUtils.getTimePartitionInterval(); for (int i = 0; i < times.length; i++) { if (times[i] >= endTime) { - chunkData.writeDecodePage(times, values, satisfiedLength); + chunkData.writeDecodePage(times, values, start, i); if (isAligned) { pageIndex2ChunkData .computeIfAbsent(pageIndex, o -> new ArrayList<>()) @@ -199,16 +199,15 @@ public class TsFileSplitter { } timePartitionSlot = TimePartitionUtils.getTimePartition(times[i]); - satisfiedLength = 0; endTime = timePartitionSlot.getStartTime() + TimePartitionUtils.getTimePartitionInterval(); chunkData = ChunkData.createChunkData(isAligned, curDevice, header, timePartitionSlot); + start = i; } - satisfiedLength += 1; } - chunkData.writeDecodePage(times, values, satisfiedLength); + chunkData.writeDecodePage(times, values, start, times.length); if (isAligned) { pageIndex2ChunkData .computeIfAbsent(pageIndex, o -> new ArrayList<>()) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java new file mode 100644 index 00000000000..857a5a9a4e7 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/MergedTsFileSplitterTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.load; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.IoTThreadFactory; +import org.junit.Test; + +/** + * Runs {@link MergedTsFileSplitter} over the files generated by {@link TestBase} and prints how + * many splits were produced and how long it took. The test contains no assertions. + */ +public class MergedTsFileSplitterTest extends TestBase { + + /* + * Collects every split handed to consumeSplit. NOTE(review): this is a plain ArrayList; confirm + * that the splitter never invokes the consumer from several pool threads at once. + */ + private List<TsFileData> resultSet = new ArrayList<>(); + + /** + * Splits all generated files with one splitter backed by a dedicated thread pool, then prints + * the split count and the elapsed milliseconds. The splitter is closed in a finally block. + * + * <p>NOTE(review): the pool created here is not shut down in this method; presumably + * {@code splitter.close()} takes care of it - confirm. + */ + @Test + public void testSplit() throws IOException { + long start = System.currentTimeMillis(); + MergedTsFileSplitter splitter = new MergedTsFileSplitter(files, this::consumeSplit, + IoTDBThreadPoolFactory.newThreadPool(16, Integer.MAX_VALUE, 20, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new IoTThreadFactory("MergedTsFileSplitter"), "MergedTsFileSplitter")); + try { + splitter.splitTsFileByDataPartition(); + for (TsFileData tsFileData : resultSet) { + // System.out.println(tsFileData); + } + } finally { + splitter.close(); + } + System.out.printf("%d splits after %dms\n", resultSet.size(), + System.currentTimeMillis() - start); + } + + /** + * Consumer passed to the splitter: stores the split and, after every 1000 stored splits, prints + * the progress together with the JVM's max and free memory. + * + * @param data the split produced by the splitter + * @return always {@code true} + */ + public boolean consumeSplit(TsFileData data) { + resultSet.add(data); + if (resultSet.size() % 1000 == 0) { + System.out.printf("%d chunks split\n", resultSet.size()); + System.out.printf("Maxmem: %d, freemem: %d\n", Runtime.getRuntime().maxMemory(), Runtime.getRuntime().freeMemory()); + } + return true; + } +} diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java new file mode 100644 index 00000000000..9002b3e6168 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.load; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.iotdb.db.utils.TimePartitionUtils; +import org.apache.iotdb.tsfile.exception.write.WriteProcessException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.utils.TsFileGeneratorUtils; +import org.apache.iotdb.tsfile.write.TsFileWriter; +import org.apache.iotdb.tsfile.write.record.TSRecord; +import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class of the load tests: before each test it generates {@code fileNum} TsFiles, each + * holding {@code seriesNum} non-aligned series under device d1 and one aligned series with + * {@code seriesNum} measurements under device d2, and deletes the generated files afterwards. + */ +public class TestBase { + + private static final Logger logger = LoggerFactory.getLogger(TestBase.class); + public static final String BASE_OUTPUT_PATH = "target".concat(File.separator); + public static final String PARTIAL_PATH_STRING = + "%s" + File.separator + "%d" + File.separator + "%d" + File.separator; + public static final String TEST_TSFILE_PATH = + BASE_OUTPUT_PATH + "testTsFile".concat(File.separator) + PARTIAL_PATH_STRING; + + /* number of files to generate */ + protected int fileNum = 100; + // series number of each file, sn non-aligned series and 1 aligned series with sn measurements + protected int seriesNum = 1000; + // time range of the chunk of each file as a ratio of the time partition interval, each series has only one chunk in a file + protected double chunkTimeRangeRatio = 0.3; + // the interval between two consecutive points of a series + protected long pointInterval = 50_000; + protected List<File> files = new ArrayList<>(); + + @Before + public void setup() throws IOException, WriteProcessException { + setupFiles(); + logger.info("Files set up"); + } + + @After + public void cleanup() { + for (File file : files) { + file.delete(); + } + } + + /** + * Generates the test files and records them in {@code files}. + * + * <p>File {@code i} starts at time {@code chunkTimeRange * i}, where chunkTimeRange is + * {@code chunkTimeRangeRatio} of the time partition interval, and holds one point every + * {@code pointInterval}, so consecutive files cover consecutive time ranges. + * + * @throws IOException if a file cannot be written + * @throws WriteProcessException if registering or writing a series fails + */ + public void setupFiles() throws IOException, 
WriteProcessException { + + for (int i = 0; i < fileNum; i++) { + File file = new File(getTestTsFilePath("root.sg1", 0, 0, i)); + files.add(file); + + try (TsFileWriter writer = new TsFileWriter(file)) { + // seriesNum non-aligned series under d1 and 1 aligned series with seriesNum measurements under d2 + for (int sn = 0; sn < seriesNum; sn++) { + writer.registerTimeseries( + new Path("d1"), new MeasurementSchema("s" + sn, TSDataType.DOUBLE)); + } + List<MeasurementSchema> alignedSchemas = new ArrayList<>(); + for (int sn = 0; sn < seriesNum; sn++) { + alignedSchemas.add(new MeasurementSchema("s" + sn, TSDataType.DOUBLE)); + } + writer.registerAlignedTimeseries(new Path("d2"), alignedSchemas); + + long timePartitionInterval = TimePartitionUtils.getTimePartitionInterval(); + long chunkTimeRange = (long) (timePartitionInterval * chunkTimeRangeRatio); + int chunkPointNum = (int) (chunkTimeRange / pointInterval); + + for (int pn = 0; pn < chunkPointNum; pn++) { + /* offset by the file index (not the file count) so that the files do not all share the same timestamps */ + long currTime = chunkTimeRange * i + pointInterval * pn; + TSRecord record = new TSRecord(currTime, "d1"); + for (int sn = 0; sn < seriesNum; sn++) { + record.addTuple(new DoubleDataPoint("s" + sn, pn * 1.0)); + } + writer.write(record); + + record.deviceId = "d2"; + writer.writeAligned(record); + } + writer.flushAllChunkGroups(); + } + } + } + + /** + * Builds the path of a test TsFile: formats {@code TEST_TSFILE_PATH} with the storage group + * name, virtual storage group id and time partition id, then delegates to + * {@code TsFileGeneratorUtils.getTsFilePath} with the file version. + * + * @return the path returned by {@code TsFileGeneratorUtils.getTsFilePath} + */ + public static String getTestTsFilePath( + String logicalStorageGroupName, + long VirtualStorageGroupId, + long TimePartitionId, + long tsFileVersion) { + String filePath = + String.format( + TEST_TSFILE_PATH, logicalStorageGroupName, VirtualStorageGroupId, TimePartitionId); + return TsFileGeneratorUtils.getTsFilePath(filePath, tsFileVersion); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java new file mode 100644 index 00000000000..78b61c8d664 --- /dev/null +++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitterTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.load; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.junit.Test; + +/** + * Runs one {@link TsFileSplitter} per file generated by {@link TestBase} and prints how many + * splits were produced and how long it took. The test contains no assertions. + */ +public class TsFileSplitterTest extends TestBase{ + /* collects every split handed to consumeSplit */ + private List<TsFileData> resultSet = new ArrayList<>(); + + /** + * Splits the generated files one after another, each with its own splitter, then prints the + * split count and the elapsed milliseconds. + */ + @Test + public void testSplit() throws IOException { + long start = System.currentTimeMillis(); + for (File file : files) { + TsFileSplitter splitter = new TsFileSplitter(file, this::consumeSplit); + splitter.splitTsFileByDataPartition(); + } + for (TsFileData tsFileData : resultSet) { + // System.out.println(tsFileData); + } + System.out.printf("%d splits after %dms\n", resultSet.size(), System.currentTimeMillis() - start); + } + + /** + * Consumer passed to each splitter: stores the split in {@code resultSet}. + * + * @param data the split produced by the splitter + * @return always {@code true} + */ + public boolean consumeSplit(TsFileData data) { + resultSet.add(data); + return true; + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java index d81c97f8454..c36bb6f34ed 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/threadpool/WrappedThreadPoolExecutor.java @@ -107,7 +107,7 @@ public class WrappedThreadPoolExecutor extends ThreadPoolExecutor Thread.currentThread().interrupt(); } } - if (t != null) { + if (t != null && !(t instanceof CancellationException)) { logger.error("Exception in thread pool {}", mbeanName, t); } } diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java index 9fca658693e..956bfef93df 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/ChunkHeader.java @@ -321,4 +321,8 @@ public class ChunkHeader { public void increasePageNums(int i) { numOfPages += i; } + + /** + * Tells whether the chunk type, reduced to its low six bits ({@code & 0x3F}), equals + * {@code MetaMarker.CHUNK_HEADER}. + * + * <p>NOTE(review): presumably this marks chunks whose page headers carry statistics, as the + * result is meant for {@code PageHeader.deserializeFrom(buffer, dataType, hasStatistic)} - + * confirm against MetaMarker. + * + * @return true if the masked chunk type is {@code MetaMarker.CHUNK_HEADER} + */ + public boolean hasStatistic() { + return (getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER; + } } diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java index 1863c986b72..81274c37b18 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/file/header/PageHeader.java @@ -65,13 +65,22 @@ public class PageHeader { return new PageHeader(uncompressedSize, compressedSize, statistics); } + // kept for compatibility: the previous implementation had no 'hasStatistic' parameter, so this form always reads the statistics public static PageHeader deserializeFrom(ByteBuffer buffer, TSDataType dataType) { + return 
deserializeFrom(buffer, dataType, true); + } + + /** + * Deserializes a page header from {@code buffer}. + * + * <p>Reads the uncompressed size first; when it is 0 the page is empty and a header with sizes + * 0 and null statistics is returned without reading further. Otherwise the compressed size is + * read, followed by the statistics only when {@code hasStatistic} is true. + * + * @param buffer buffer positioned at the start of the page header + * @param dataType data type used to deserialize the statistics + * @param hasStatistic whether statistics are serialized in the buffer; when false the returned + * header has null statistics + * @return the deserialized page header + */ + public static PageHeader deserializeFrom( + ByteBuffer buffer, TSDataType dataType, boolean hasStatistic) { int uncompressedSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer); if (uncompressedSize == 0) { // Empty Page return new PageHeader(0, 0, null); } int compressedSize = ReadWriteForEncodingUtils.readUnsignedVarInt(buffer); - Statistics<? extends Serializable> statistics = Statistics.deserialize(buffer, dataType); + Statistics<? extends Serializable> statistics = null; + if (hasStatistic) { + statistics = Statistics.deserialize(buffer, dataType); + } return new PageHeader(uncompressedSize, compressedSize, statistics); } diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index e8925543076..ba4f8f1fe97 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -22,7 +22,6 @@ package org.apache.iotdb.tsfile.read; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; -import org.apache.iotdb.tsfile.compress.IUnCompressor; import org.apache.iotdb.tsfile.encoding.decoder.Decoder; import org.apache.iotdb.tsfile.exception.TsFileRuntimeException; import org.apache.iotdb.tsfile.exception.TsFileStatisticsMistakesException; @@ -59,6 +58,7 @@ import org.apache.iotdb.tsfile.utils.BloomFilter; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.tsfile.utils.TsFileUtils; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; 
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -1397,15 +1397,7 @@ public class TsFileSequenceReader implements AutoCloseable { } public ByteBuffer readPage(PageHeader header, CompressionType type) throws IOException { - ByteBuffer buffer = readData(-1, header.getCompressedSize()); - if (header.getUncompressedSize() == 0 || type == CompressionType.UNCOMPRESSED) { - return buffer; - } // FIXME if the buffer is not array-implemented. - IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type); - ByteBuffer uncompressedBuffer = ByteBuffer.allocate(header.getUncompressedSize()); - unCompressor.uncompress( - buffer.array(), buffer.position(), buffer.remaining(), uncompressedBuffer.array(), 0); - return uncompressedBuffer; + return TsFileUtils.uncompressPage(header, type, readData(-1, header.getCompressedSize())); } /** diff --git a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java index 03ba354c82d..a84c735e549 100644 --- a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java +++ b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java @@ -17,10 +17,14 @@ */ package org.apache.iotdb.tsfile.utils; +import org.apache.iotdb.tsfile.compress.IUnCompressor; +import org.apache.iotdb.tsfile.file.header.PageHeader; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; public class TsFileUtils { @@ -55,4 +59,16 @@ public class TsFileUtils { File folder = tsFile.getParentFile().getParentFile().getParentFile().getParentFile(); return folder.getName().equals("sequence"); } + + /** + * Uncompresses the data of one page. + * + * <p>When the header's uncompressed size is 0 or {@code type} is UNCOMPRESSED, {@code buffer} + * is returned as is. Otherwise the remaining bytes of {@code buffer} are uncompressed into a + * newly allocated heap buffer of the header's uncompressed size. The position of + * {@code buffer} is not changed. Buffers that are not array-backed (direct or read-only) are + * supported by copying their remaining bytes first. + * + * @param header page header supplying the uncompressed size + * @param type compression type of the page data + * @param buffer page data, read from its current position to its limit + * @return {@code buffer} itself or a new buffer holding the uncompressed data + * @throws IOException if uncompressing fails + */ + public static ByteBuffer uncompressPage( + PageHeader header, CompressionType type, ByteBuffer buffer) throws IOException { + if (header.getUncompressedSize() == 0 || type == 
CompressionType.UNCOMPRESSED) { + return buffer; + } + IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type); + ByteBuffer uncompressedBuffer = ByteBuffer.allocate(header.getUncompressedSize()); + int length = buffer.remaining(); + byte[] compressed; + int offset; + if (buffer.hasArray()) { + /* array-backed: read in place; arrayOffset() accounts for sliced buffers */ + compressed = buffer.array(); + offset = buffer.arrayOffset() + buffer.position(); + } else { + /* direct or read-only buffer: copy the remaining bytes through a duplicate so that the caller's position stays untouched */ + compressed = new byte[length]; + buffer.duplicate().get(compressed); + offset = 0; + } + unCompressor.uncompress(compressed, offset, length, uncompressedBuffer.array(), 0); + return uncompressedBuffer; + } }
