This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch fix_3326 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d23155c81ddce7cf1ff7b5105cec731ed2a65161 Author: jt <[email protected]> AuthorDate: Wed Jun 1 14:33:40 2022 +0800 fix new series in unseq files are not merged into correct seqfiles --- .../db/engine/merge/task/MergeMultiChunkTask.java | 265 ++++++++++----------- .../iotdb/db/engine/merge/MergeNewSeriesTest.java | 115 +++++++++ .../iotdb/db/engine/merge/MergeOverLapTest.java | 62 +---- .../iotdb/db/engine/merge/MergeTaskTest.java | 6 +- .../apache/iotdb/db/engine/merge/MergeTest.java | 216 ++++++++++++----- .../iotdb/tsfile/read/TsFileSequenceReader.java | 83 +++++++ .../apache/iotdb/tsfile/utils/PreviewIterator.java | 27 +++ 7 files changed, 507 insertions(+), 267 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java index 36032f9269..c94bc38a58 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java @@ -41,6 +41,8 @@ import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.Chunk; import org.apache.iotdb.tsfile.read.reader.IPointReader; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.utils.PreviewIterator; import org.apache.iotdb.tsfile.write.chunk.IChunkWriter; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter; @@ -53,10 +55,8 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.PriorityQueue; import java.util.TreeMap; import java.util.concurrent.Callable; @@ -68,6 +68,7 @@ import static org.apache.iotdb.db.utils.MergeUtils.writeBatchPoint; import static org.apache.iotdb.db.utils.MergeUtils.writeTVPair; import static org.apache.iotdb.db.utils.QueryUtils.modifyChunkMetaData; +@SuppressWarnings("java:S107") // suppress too many args public class MergeMultiChunkTask { private static final Logger logger = LoggerFactory.getLogger(MergeMultiChunkTask.class); @@ -75,38 +76,34 @@ public class MergeMultiChunkTask { IoTDBDescriptor.getInstance().getConfig().getMergeChunkPointNumberThreshold(); private MergeLogger mergeLogger; + // this task will be merging the series private List<PartialPath> unmergedSeries; + private String storageGroupName; private String taskName; private MergeResource resource; - private TimeValuePair[] currTimeValuePairs; + private MergeContext mergeContext; + + private int concurrentMergeSeriesNum; private boolean fullMerge; - private MergeContext mergeContext; + private List<PartialPath> currMergingPaths = new ArrayList<>(); + // the earliest point of each "currMergingPaths" from unseq files + private TimeValuePair[] currUnseqTimeValuePairs; private AtomicInteger mergedChunkNum = new AtomicInteger(); private AtomicInteger unmergedChunkNum = new AtomicInteger(); private int mergedSeriesCnt; private double progress; - private int concurrentMergeSeriesNum; - private List<PartialPath> currMergingPaths = new ArrayList<>(); - // need to be cleared every device - private final Map<TsFileSequenceReader, Iterator<Map<String, List<ChunkMetadata>>>> - measurementChunkMetadataListMapIteratorCache = + // the key of the map is a seq file, and the value is an iterator that traverse chunk lists of + // the current merging device. The chunk lists are ordered by measurements. + private final Map<TsFileSequenceReader, PreviewIterator<Pair<String, List<ChunkMetadata>>>> + seqFileChunkMetadataListIteratorCache = new TreeMap<>( (o1, o2) -> TsFileManagement.compareFileName( new File(o1.getFileName()), new File(o2.getFileName()))); - // need to be cleared every device - private final Map<TsFileSequenceReader, Map<String, List<ChunkMetadata>>> - chunkMetadataListCacheForMerge = - new TreeMap<>( - (o1, o2) -> - TsFileManagement.compareFileName( - new File(o1.getFileName()), new File(o2.getFileName()))); - - private String storageGroupName; public MergeMultiChunkTask( MergeContext context, @@ -152,8 +149,7 @@ public class MergeMultiChunkTask { mergedSeriesCnt += currMergingPaths.size(); logMergeProgress(); } - measurementChunkMetadataListMapIteratorCache.clear(); - chunkMetadataListCacheForMerge.clear(); + seqFileChunkMetadataListIteratorCache.clear(); } if (logger.isInfoEnabled()) { logger.info( @@ -176,15 +172,17 @@ public class MergeMultiChunkTask { } private void mergePaths() throws IOException, MetadataException { + // read the first point of each series from unseq files IPointReader[] unseqReaders = resource.getUnseqReaders(currMergingPaths); - currTimeValuePairs = new TimeValuePair[currMergingPaths.size()]; + currUnseqTimeValuePairs = new TimeValuePair[currMergingPaths.size()]; for (int i = 0; i < currMergingPaths.size(); i++) { if (unseqReaders[i].hasNextTimeValuePair()) { - currTimeValuePairs[i] = unseqReaders[i].currentTimeValuePair(); + currUnseqTimeValuePairs[i] = unseqReaders[i].currentTimeValuePair(); } } for (int i = 0; i < resource.getSeqFiles().size(); i++) { + // merge unseq data into seq files one by one pathsMergeOneFile(i, unseqReaders); if (Thread.interrupted()) { @@ -194,14 +192,14 @@ public class MergeMultiChunkTask { } } - private String getMaxSensor(List<PartialPath> sensors) { - String maxSensor = sensors.get(0).getMeasurement(); - for (int i = 1; i < sensors.size(); i++) { - if (maxSensor.compareTo(sensors.get(i).getMeasurement()) < 0) { - maxSensor = sensors.get(i).getMeasurement(); + private long calculateDeviceMinTime(TsFileResource currTsFile, String deviceId) { + long currDeviceMinTime = currTsFile.getStartTime(deviceId); + for (TimeValuePair timeValuePair : currUnseqTimeValuePairs) { + if (timeValuePair != null && timeValuePair.getTimestamp() < currDeviceMinTime) { + currDeviceMinTime = timeValuePair.getTimestamp(); } } - return maxSensor; + return currDeviceMinTime; } private void pathsMergeOneFile(int seqFileIdx, IPointReader[] unseqReaders) @@ -209,30 +207,24 @@ public class MergeMultiChunkTask { TsFileResource currTsFile = resource.getSeqFiles().get(seqFileIdx); // all paths in one call are from the same device String deviceId = currMergingPaths.get(0).getDevice(); - long currDeviceMinTime = currTsFile.getStartTime(deviceId); + long currDeviceMinTime = calculateDeviceMinTime(currTsFile, deviceId); for (PartialPath path : currMergingPaths) { mergeContext.getUnmergedChunkStartTimes().get(currTsFile).put(path, new ArrayList<>()); } - // if this TsFile receives data later than fileLimitTime, it will overlap the next TsFile, - // which is forbidden - for (TimeValuePair timeValuePair : currTimeValuePairs) { - if (timeValuePair != null && timeValuePair.getTimestamp() < currDeviceMinTime) { - currDeviceMinTime = timeValuePair.getTimestamp(); - } - } boolean isLastFile = seqFileIdx + 1 == resource.getSeqFiles().size(); TsFileSequenceReader fileSequenceReader = resource.getFileReader(currTsFile); List<Modification>[] modifications = new List[currMergingPaths.size()]; List<ChunkMetadata>[] seqChunkMeta = new List[currMergingPaths.size()]; - Iterator<Map<String, List<ChunkMetadata>>> measurementChunkMetadataListMapIterator = - measurementChunkMetadataListMapIteratorCache.computeIfAbsent( + + PreviewIterator<Pair<String, List<ChunkMetadata>>> measurementChunkMetadataListIterator = + seqFileChunkMetadataListIteratorCache.computeIfAbsent( fileSequenceReader, (tsFileSequenceReader -> { try { - return tsFileSequenceReader.getMeasurementChunkMetadataListMapIterator(deviceId); + return tsFileSequenceReader.getMeasurementChunkMetadataListIterator(deviceId); } catch (IOException e) { logger.error( "unseq compaction task {}, getMeasurementChunkMetadataListMapIterator meets error. iterator create failed.", @@ -241,70 +233,12 @@ public class MergeMultiChunkTask { return null; } })); - if (measurementChunkMetadataListMapIterator == null) { + if (measurementChunkMetadataListIterator == null) { return; } - String lastSensor = getMaxSensor(currMergingPaths); - String currSensor = null; - Map<String, List<ChunkMetadata>> measurementChunkMetadataListMap = new TreeMap<>(); - // find all sensor to merge in order, if exceed, then break - while (currSensor == null || currSensor.compareTo(lastSensor) < 0) { - measurementChunkMetadataListMap = - chunkMetadataListCacheForMerge.computeIfAbsent( - fileSequenceReader, tsFileSequenceReader -> new TreeMap<>()); - // if empty, get measurementChunkMetadataList block to use later - if (measurementChunkMetadataListMap.isEmpty()) { - // if do not have more sensor, just break - if (measurementChunkMetadataListMapIterator.hasNext()) { - measurementChunkMetadataListMap.putAll(measurementChunkMetadataListMapIterator.next()); - } else { - break; - } - } - - Iterator<Entry<String, List<ChunkMetadata>>> measurementChunkMetadataListEntryIterator = - measurementChunkMetadataListMap.entrySet().iterator(); - while (measurementChunkMetadataListEntryIterator.hasNext()) { - Entry<String, List<ChunkMetadata>> measurementChunkMetadataListEntry = - measurementChunkMetadataListEntryIterator.next(); - currSensor = measurementChunkMetadataListEntry.getKey(); - - // fill modifications and seqChunkMetas to be used later - for (int i = 0; i < currMergingPaths.size(); i++) { - if (currMergingPaths.get(i).getMeasurement().equals(currSensor)) { - modifications[i] = resource.getModifications(currTsFile, currMergingPaths.get(i)); - seqChunkMeta[i] = measurementChunkMetadataListEntry.getValue(); - modifyChunkMetaData(seqChunkMeta[i], modifications[i]); - for (ChunkMetadata chunkMetadata : seqChunkMeta[i]) { - resource.updateStartTime(currTsFile, deviceId, chunkMetadata.getStartTime()); - resource.updateEndTime(currTsFile, deviceId, chunkMetadata.getEndTime()); - } - - if (Thread.interrupted()) { - Thread.currentThread().interrupt(); - return; - } - break; - } - } - - // current sensor larger than last needed sensor, just break out to outer loop - if (currSensor.compareTo(lastSensor) > 0) { - break; - } else { - measurementChunkMetadataListEntryIterator.remove(); - } - } - } - // update measurementChunkMetadataListMap - chunkMetadataListCacheForMerge.put(fileSequenceReader, measurementChunkMetadataListMap); - - List<Integer> unskippedPathIndices = filterNoDataPaths(seqChunkMeta, seqFileIdx); - - if (unskippedPathIndices.isEmpty()) { - return; - } + readChunkMetaAndModifications( + measurementChunkMetadataListIterator, modifications, seqChunkMeta, currTsFile, deviceId); RestorableTsFileIOWriter mergeFileWriter = resource.getMergeFileWriter(currTsFile, false); for (PartialPath path : currMergingPaths) { @@ -328,18 +262,57 @@ public class MergeMultiChunkTask { } } - private List<Integer> filterNoDataPaths(List[] seqChunkMeta, int seqFileIdx) { - // if the last seqFile does not contains this series but the unseqFiles do, data of this - // series should also be written into a new chunk - List<Integer> ret = new ArrayList<>(); - for (int i = 0; i < currMergingPaths.size(); i++) { - if ((seqChunkMeta[i] == null || seqChunkMeta[i].isEmpty()) - && !(seqFileIdx + 1 == resource.getSeqFiles().size() && currTimeValuePairs[i] != null)) { - continue; + private void readChunkMetaAndModifications( + PreviewIterator<Pair<String, List<ChunkMetadata>>> measurementChunkMetadataListIterator, + List<Modification>[] modifications, + List<ChunkMetadata>[] seqChunkMeta, + TsFileResource currTsFile, + String deviceId) { + if (!measurementChunkMetadataListIterator.hasNext()) { + return; + } + Pair<String, List<ChunkMetadata>> nextFileMeasurement = + measurementChunkMetadataListIterator.previewNext(); + boolean noMoreMeasurement = false; + int i = 0; + while (!noMoreMeasurement) { + String currMergingMeasurement = currMergingPaths.get(i).getMeasurement(); + if (currMergingMeasurement.equals(nextFileMeasurement.left)) { + // current series is found in the file + modifications[i] = resource.getModifications(currTsFile, currMergingPaths.get(i)); + seqChunkMeta[i] = nextFileMeasurement.right; + modifyChunkMetaData(seqChunkMeta[i], modifications[i]); + for (ChunkMetadata chunkMetadata : seqChunkMeta[i]) { + resource.updateStartTime(currTsFile, deviceId, chunkMetadata.getStartTime()); + resource.updateEndTime(currTsFile, deviceId, chunkMetadata.getEndTime()); + } + + // move both cursors to the next + i++; + measurementChunkMetadataListIterator.next(); + } else if (currMergingPaths.get(i).getMeasurement().compareTo(nextFileMeasurement.left) < 0) { + // current series cannot be found in the file, move to the next merging series + i++; + } else { + // check the next series in the file + measurementChunkMetadataListIterator.next(); + } + + if (measurementChunkMetadataListIterator.hasNext()) { + nextFileMeasurement = measurementChunkMetadataListIterator.previewNext(); + } else { + noMoreMeasurement = true; } - ret.add(i); + + if (i >= currMergingPaths.size()) { + noMoreMeasurement = true; + } + } + + if (Thread.interrupted()) { + Thread.currentThread().interrupt(); + return; } - return ret; } @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning @@ -355,8 +328,9 @@ public class MergeMultiChunkTask { int[] ptWrittens = new int[seqChunkMeta.length]; int mergeChunkSubTaskNum = IoTDBDescriptor.getInstance().getConfig().getMergeChunkSubThreadNum(); - MetaListEntry[] metaListEntries = new MetaListEntry[currMergingPaths.size()]; - PriorityQueue<Integer>[] chunkIdxHeaps = new PriorityQueue[mergeChunkSubTaskNum]; + MetaListEntry[] seqMetaListEntries = new MetaListEntry[currMergingPaths.size()]; + // split merge task into sub-tasks + PriorityQueue<Integer>[] pathIdxHeaps = new PriorityQueue[mergeChunkSubTaskNum]; // if merge path is smaller than mergeChunkSubTaskNum, will use merge path number. // so thread are not wasted. @@ -365,18 +339,20 @@ public class MergeMultiChunkTask { } for (int i = 0; i < mergeChunkSubTaskNum; i++) { - chunkIdxHeaps[i] = new PriorityQueue<>(); + pathIdxHeaps[i] = new PriorityQueue<>(); } int idx = 0; for (int i = 0; i < currMergingPaths.size(); i++) { - chunkIdxHeaps[idx % mergeChunkSubTaskNum].add(i); + // assign paths to sub-tasks in a round-robin way + pathIdxHeaps[idx % mergeChunkSubTaskNum].add(i); if (seqChunkMeta[i] == null || seqChunkMeta[i].isEmpty()) { continue; } MetaListEntry entry = new MetaListEntry(i, seqChunkMeta[i]); + // move to the first chunk metadata entry.next(); - metaListEntries[i] = entry; + seqMetaListEntries[i] = entry; idx++; ptWrittens[i] = 0; } @@ -390,8 +366,8 @@ public class MergeMultiChunkTask { MergeManager.getINSTANCE() .submitChunkSubTask( new MergeChunkHeapTask( - chunkIdxHeaps[i], - metaListEntries, + pathIdxHeaps[i], + seqMetaListEntries, ptWrittens, reader, mergeFileWriter, @@ -531,12 +507,12 @@ public class MergeMultiChunkTask { IChunkWriter chunkWriter, IPointReader unseqReader, long timeLimit, int pathIdx) throws IOException { int ptWritten = 0; - while (currTimeValuePairs[pathIdx] != null - && currTimeValuePairs[pathIdx].getTimestamp() < timeLimit) { - writeTVPair(currTimeValuePairs[pathIdx], chunkWriter); + while (currUnseqTimeValuePairs[pathIdx] != null + && currUnseqTimeValuePairs[pathIdx].getTimestamp() < timeLimit) { + writeTVPair(currUnseqTimeValuePairs[pathIdx], chunkWriter); ptWritten++; unseqReader.nextTimeValuePair(); - currTimeValuePairs[pathIdx] = + currUnseqTimeValuePairs[pathIdx] = unseqReader.hasNextTimeValuePair() ? unseqReader.currentTimeValuePair() : null; } return ptWritten; @@ -568,14 +544,14 @@ public class MergeMultiChunkTask { // merge data in batch and data in unseqReader boolean overwriteSeqPoint = false; // unseq point.time <= sequence point.time, write unseq point - while (currTimeValuePairs[pathIdx] != null - && currTimeValuePairs[pathIdx].getTimestamp() <= time) { - writeTVPair(currTimeValuePairs[pathIdx], chunkWriter); - if (currTimeValuePairs[pathIdx].getTimestamp() == time) { + while (currUnseqTimeValuePairs[pathIdx] != null + && currUnseqTimeValuePairs[pathIdx].getTimestamp() <= time) { + writeTVPair(currUnseqTimeValuePairs[pathIdx], chunkWriter); + if (currUnseqTimeValuePairs[pathIdx].getTimestamp() == time) { overwriteSeqPoint = true; } unseqReader.nextTimeValuePair(); - currTimeValuePairs[pathIdx] = + currUnseqTimeValuePairs[pathIdx] = unseqReader.hasNextTimeValuePair() ? unseqReader.currentTimeValuePair() : null; cnt++; } @@ -590,21 +566,21 @@ public class MergeMultiChunkTask { public class MergeChunkHeapTask implements Callable<Void> { - private PriorityQueue<Integer> chunkIdxHeap; + private PriorityQueue<Integer> pathIdxHeap; private MetaListEntry[] metaListEntries; private int[] ptWrittens; private TsFileSequenceReader reader; private RestorableTsFileIOWriter mergeFileWriter; private IPointReader[] unseqReaders; private TsFileResource currFile; - private boolean isLastFile; private int taskNum; private long endTimeOfCurrentResource; + private boolean isLastFile; private int totalSeriesNum; public MergeChunkHeapTask( - PriorityQueue<Integer> chunkIdxHeap, + PriorityQueue<Integer> pathIdxHeap, MetaListEntry[] metaListEntries, int[] ptWrittens, TsFileSequenceReader reader, @@ -614,16 +590,15 @@ public class MergeMultiChunkTask { boolean isLastFile, long endTimeOfCurrentResource, int taskNum) { - this.chunkIdxHeap = chunkIdxHeap; + this.pathIdxHeap = pathIdxHeap; this.metaListEntries = metaListEntries; this.ptWrittens = ptWrittens; this.reader = reader; this.mergeFileWriter = mergeFileWriter; this.unseqReaders = unseqReaders; this.currFile = currFile; - this.isLastFile = isLastFile; this.taskNum = taskNum; - this.totalSeriesNum = chunkIdxHeap.size(); + this.totalSeriesNum = pathIdxHeap.size(); this.endTimeOfCurrentResource = endTimeOfCurrentResource; } @@ -635,8 +610,8 @@ public class MergeMultiChunkTask { @SuppressWarnings("java:S2445") // avoid reading the same reader concurrently private void mergeChunkHeap() throws IOException, MetadataException { - while (!chunkIdxHeap.isEmpty()) { - int pathIdx = chunkIdxHeap.poll(); + while (!pathIdxHeap.isEmpty()) { + int pathIdx = pathIdxHeap.poll(); PartialPath path = currMergingPaths.get(pathIdx); MeasurementSchema measurementSchema = IoTDB.metaManager.getSeriesSchema(path); IChunkWriter chunkWriter = resource.getChunkWriter(measurementSchema); @@ -646,12 +621,16 @@ public class MergeMultiChunkTask { } if (metaListEntries[pathIdx] != null) { + // the seq file has this series, read a chunk and merge it with unseq data MetaListEntry metaListEntry = metaListEntries[pathIdx]; ChunkMetadata currMeta = metaListEntry.current(); boolean isLastChunk = !metaListEntry.hasNext(); boolean chunkOverflowed = MergeUtils.isChunkOverflowed( - currTimeValuePairs[pathIdx], currMeta, isLastChunk, endTimeOfCurrentResource); + currUnseqTimeValuePairs[pathIdx], + currMeta, + isLastChunk, + endTimeOfCurrentResource); boolean chunkTooSmall = MergeUtils.isChunkTooSmall( ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum); @@ -676,16 +655,22 @@ public class MergeMultiChunkTask { currFile); if (!isLastChunk) { + // put the series back so its remaining chunks can be merged metaListEntry.next(); - chunkIdxHeap.add(pathIdx); + pathIdxHeap.add(pathIdx); continue; } } - // this only happens when the seqFiles do not contain this series, otherwise the remaining + // the seq file does not have the series, otherwise the remaining // data will be merged with the last chunk in the seqFiles - if (isLastFile && currTimeValuePairs[pathIdx] != null) { + // writing device-level overlapped data into this file + if (currUnseqTimeValuePairs[pathIdx] != null) { ptWrittens[pathIdx] += - writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx], Long.MAX_VALUE, pathIdx); + writeRemainingUnseq( + chunkWriter, + unseqReaders[pathIdx], + isLastFile ? Long.MAX_VALUE : endTimeOfCurrentResource + 1, + pathIdx); mergedChunkNum.incrementAndGet(); } // the last merged chunk may still be smaller than the threshold, flush it anyway @@ -707,7 +692,7 @@ public class MergeMultiChunkTask { public String getProgress() { return String.format( - "Processed %d/%d series", totalSeriesNum - chunkIdxHeap.size(), totalSeriesNum); + "Processed %d/%d series", totalSeriesNum - pathIdxHeap.size(), totalSeriesNum); } } } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeNewSeriesTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeNewSeriesTest.java new file mode 100644 index 0000000000..4b1e8942a7 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeNewSeriesTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.engine.merge; + +import org.apache.iotdb.db.constant.TestConstant; +import org.apache.iotdb.db.engine.merge.manage.MergeResource; +import org.apache.iotdb.db.engine.merge.task.MergeTask; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; +import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.tsfile.exception.write.WriteProcessException; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * In the test, the unseq file has a new series that has never been written to seq files. However, + * other timeseries of its belonging device have already been written to the seq files. As a result, + * the unseq file and seq file are device-level overlapped but not series-level overlapped, and the + * new series in the unseq file should be written into seq files according to their device time + * range. + */ +public class MergeNewSeriesTest extends MergeTest { + + private MeasurementSchema[] unseqSchemas = new MeasurementSchema[1]; + + @Override + public void setUp() throws IOException, WriteProcessException, MetadataException { + measurementNum = 1; + deviceNum = 2; + seqFileNum = 2; + // unseq files are manually prepared because they will have new time series + unseqFileNum = 0; + super.setUp(); + } + + @Override + protected void prepareSeries() throws MetadataException { + super.prepareSeries(); + unseqSchemas[0] = toMeasurementSchema(1); + createTimeseries(deviceIds, unseqSchemas); + } + + @Override + void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException { + // seq file1: root.MergeTest.d0.s0[0,99] root.MergeTest.d1.s0[0,99] + // seq file2: root.MergeTest.d0.s0[100,199] root.MergeTest.d1.s0[100,199] + super.prepareFiles(seqFileNum, unseqFileNum); + // unseq file1: root.MergeTest.d0.s1[0,99] + // unseq file2: root.MergeTest.d1.s1[100,199] + // unseq1 should be written into seq1 while unseq2 should be written into seq2 + TsFileResource unseq1 = prepareResource(2); + unseqResources.add(unseq1); + TsFileResource unseq2 = prepareResource(3); + unseqResources.add(unseq2); + + prepareFile(unseq1, 0, ptNum, 0, Arrays.copyOfRange(deviceIds, 0, 1), unseqSchemas); + prepareFile(unseq2, ptNum, ptNum, 0, Arrays.copyOfRange(deviceIds, 1, 2), unseqSchemas); + } + + @Test + public void testFullMerge() throws Exception { + MergeTask mergeTask = + new MergeTask( + new MergeResource(seqResources, unseqResources), + TestConstant.OUTPUT_DATA_DIR, + (k, v, l) -> {}, + "test", + true, + 1, + MERGE_TEST_SG); + mergeTask.call(); + + // query root.MergeTest.d0.s1 from seq1 + List<TsFileResource> resources = Collections.singletonList(seqResources.get(0)); + queryAndCheck( + deviceIds[0], unseqSchemas[0], resources, checkResultFunc(0), checkResultCntFunc(ptNum)); + + // query root.MergeTest.d1.s1 from seq2 + resources = Collections.singletonList(seqResources.get(1)); + queryAndCheck( + deviceIds[1], unseqSchemas[0], resources, checkResultFunc(0), checkResultCntFunc(ptNum)); + + // query root.MergeTest.d0.s1 from seq2 + resources = Collections.singletonList(seqResources.get(1)); + queryAndCheck( + deviceIds[0], unseqSchemas[0], resources, checkResultFunc(0), checkResultCntFunc(0)); + + // query root.MergeTest.d1.s1 from seq1 + resources = Collections.singletonList(seqResources.get(0)); + queryAndCheck( + deviceIds[1], unseqSchemas[0], resources, checkResultFunc(0), checkResultCntFunc(0)); + } +} diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java index 003dafe2e2..2cc290549e 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.engine.merge; -import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.constant.TestConstant; import org.apache.iotdb.db.engine.merge.manage.MergeResource; import org.apache.iotdb.db.engine.merge.task.MergeTask; @@ -75,66 +74,7 @@ public class MergeOverLapTest extends MergeTest { FileUtils.deleteDirectory(tempSGDir); } - @Override - void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException { - for (int i = 0; i < seqFileNum; i++) { - File file = - new File( - TestConstant.OUTPUT_DATA_DIR.concat( - i - + IoTDBConstant.FILE_NAME_SEPARATOR - + i - + IoTDBConstant.FILE_NAME_SEPARATOR - + 0 - + IoTDBConstant.FILE_NAME_SEPARATOR - + 0 - + ".tsfile")); - TsFileResource tsFileResource = new TsFileResource(file); - tsFileResource.setClosed(true); - tsFileResource.setMinPlanIndex(i); - tsFileResource.setMaxPlanIndex(i); - seqResources.add(tsFileResource); - prepareFile(tsFileResource, i * ptNum, ptNum, 0); - } - for (int i = 0; i < unseqFileNum; i++) { - File file = - new File( - TestConstant.OUTPUT_DATA_DIR.concat( - (10000 + i) - + IoTDBConstant.FILE_NAME_SEPARATOR - + (10000 + i) - + IoTDBConstant.FILE_NAME_SEPARATOR - + 0 - + IoTDBConstant.FILE_NAME_SEPARATOR - + 0 - + ".tsfile")); - TsFileResource tsFileResource = new TsFileResource(file); - tsFileResource.setClosed(true); - tsFileResource.setMaxPlanIndex(i + seqFileNum); - tsFileResource.setMinPlanIndex(i + seqFileNum); - unseqResources.add(tsFileResource); - prepareUnseqFile(tsFileResource, i * ptNum, ptNum * (i + 1) / unseqFileNum, 10000); - } - File file = - new File( - TestConstant.OUTPUT_DATA_DIR.concat( - unseqFileNum - + IoTDBConstant.FILE_NAME_SEPARATOR - + unseqFileNum - + IoTDBConstant.FILE_NAME_SEPARATOR - + 0 - + IoTDBConstant.FILE_NAME_SEPARATOR - + 0 - + ".tsfile")); - TsFileResource tsFileResource = new TsFileResource(file); - tsFileResource.setClosed(true); - tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum); - tsFileResource.setMaxPlanIndex(seqFileNum + unseqFileNum); - unseqResources.add(tsFileResource); - prepareUnseqFile(tsFileResource, 0, ptNum * unseqFileNum, 20000); - } - - private void prepareUnseqFile( + protected void prepareUnseqFile( TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset) throws IOException, WriteProcessException { TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getTsFile()); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java index 6f058c4c9c..6c3bd4e1c6 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java @@ -125,14 +125,14 @@ public class MergeTaskTest extends MergeTest { @Test public void testMergeEndTime() throws Exception { - List<TsFileResource> testSeqResources = seqResources.subList(0, 3); + List<TsFileResource> testSeqResources = seqResources.subList(0, 5); List<TsFileResource> testUnseqResource = unseqResources.subList(5, 6); MergeTask mergeTask = new MergeTask( new MergeResource(testSeqResources, testUnseqResource), tempSGDir.getPath(), (k, v, l) -> { - assertEquals(499, k.get(2).getEndTime("root.mergeTest.device1")); + assertEquals(499, k.get(4).getEndTime("root.mergeTest.device1")); }, "test", false, @@ -646,7 +646,7 @@ public class MergeTaskTest extends MergeTest { tempSGDir.getPath(), (k, v, l) -> { try (TsFileSequenceReader reader = - new TsFileSequenceReader(k.get(2).getTsFilePath())) { + new TsFileSequenceReader(k.get(0).getTsFilePath())) { List<ChunkMetadata> chunkMetadataList = reader.getChunkMetadataList( new PartialPath(deviceIds[0], measurementSchemas[2].getMeasurementId())); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java index 4633178749..10a0842942 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java @@ -27,16 +27,21 @@ import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache; import org.apache.iotdb.db.engine.merge.manage.MergeManager; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.FileReaderManager; +import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.tsfile.exception.write.WriteProcessException; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.reader.IBatchReader; import org.apache.iotdb.tsfile.write.TsFileWriter; import org.apache.iotdb.tsfile.write.record.TSRecord; import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint; @@ -53,6 +58,7 @@ import java.util.Collections; import java.util.List; import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR; +import static org.junit.Assert.assertEquals; abstract class MergeTest { @@ -62,7 +68,9 @@ abstract class MergeTest { int unseqFileNum = 5; int measurementNum = 10; int deviceNum = 10; + // points of each series in a file long ptNum = 100; + // flush each time "flushInterval" points of all series are written long flushInterval = 20; TSEncoding encoding = TSEncoding.PLAIN; @@ -100,18 +108,30 @@ abstract class MergeTest { MergeManager.getINSTANCE().stop(); } + protected MeasurementSchema toMeasurementSchema(int measurementIndex) { + return new MeasurementSchema( + "sensor" + measurementIndex, TSDataType.DOUBLE, encoding, CompressionType.UNCOMPRESSED); + } + + protected String toDeviceId(int deviceIndex) { + return MERGE_TEST_SG + PATH_SEPARATOR + "device" + deviceIndex; + } + protected void prepareSeries() throws MetadataException { measurementSchemas = new MeasurementSchema[measurementNum]; for (int i = 0; i < measurementNum; i++) { - measurementSchemas[i] = - new MeasurementSchema( - "sensor" + i, TSDataType.DOUBLE, encoding, CompressionType.UNCOMPRESSED); + measurementSchemas[i] = toMeasurementSchema(i); } deviceIds = new String[deviceNum]; for (int i = 0; i < deviceNum; i++) { - deviceIds[i] = MERGE_TEST_SG + PATH_SEPARATOR + "device" + i; + deviceIds[i] = toDeviceId(i); } IoTDB.metaManager.setStorageGroup(new PartialPath(MERGE_TEST_SG)); + createTimeseries(deviceIds, measurementSchemas); + } + + protected void createTimeseries(String[] deviceIds, MeasurementSchema[] measurementSchemas) + throws MetadataException { for (String device : deviceIds) { for (MeasurementSchema measurementSchema : measurementSchemas) { PartialPath devicePath = new PartialPath(device); @@ -125,66 +145,60 @@ abstract class MergeTest { } } + protected String toFileName(int fileIndex) { + return TestConstant.OUTPUT_DATA_DIR.concat( + fileIndex + + IoTDBConstant.FILE_NAME_SEPARATOR + + fileIndex + + IoTDBConstant.FILE_NAME_SEPARATOR + + 0 + + IoTDBConstant.FILE_NAME_SEPARATOR + + 0 + + ".tsfile"); + } + + protected TsFileResource prepareResource(int fileIndex) { + File file = new File(toFileName(fileIndex)); + TsFileResource tsFileResource = new TsFileResource(file); + tsFileResource.setClosed(true); + tsFileResource.setMinPlanIndex(fileIndex); + tsFileResource.setMaxPlanIndex(fileIndex); + tsFileResource.setVersion(fileIndex); + return tsFileResource; + } + + protected void prepareSeqFile( + TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset) + throws IOException, WriteProcessException { + prepareFile(tsFileResource, timeOffset, ptNum, valueOffset); + } + + protected void prepareUnseqFile( + TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset) + throws IOException, WriteProcessException { + prepareFile(tsFileResource, timeOffset, ptNum, valueOffset); + } + void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException { + // seq files, each has "ptNum" points of all series for (int i = 0; i < seqFileNum; i++) { - File file = - new File( - TestConstant.OUTPUT_DATA_DIR.concat( - i - + IoTDBConstant.FILE_NAME_SEPARATOR - + i - + IoTDBConstant.FILE_NAME_SEPARATOR - + 0 - + IoTDBConstant.FILE_NAME_SEPARATOR - + 0 - + ".tsfile")); - TsFileResource tsFileResource = new TsFileResource(file); - tsFileResource.setClosed(true); - tsFileResource.setMinPlanIndex(i); - tsFileResource.setMaxPlanIndex(i); - tsFileResource.setVersion(i); + TsFileResource tsFileResource = prepareResource(i); seqResources.add(tsFileResource); - prepareFile(tsFileResource, i * ptNum, ptNum, 0); + prepareSeqFile(tsFileResource, i * ptNum, ptNum, 0); } + // unseq files, each overlaps ONE seq file and latter ones has more points for (int i = 0; i < unseqFileNum; i++) { - File file = - new File( - TestConstant.OUTPUT_DATA_DIR.concat( - (10000 + i) - + IoTDBConstant.FILE_NAME_SEPARATOR - + (10000 + i) - + IoTDBConstant.FILE_NAME_SEPARATOR - + 0 - + IoTDBConstant.FILE_NAME_SEPARATOR - + 0 - + ".tsfile")); - TsFileResource tsFileResource = new TsFileResource(file); - tsFileResource.setClosed(true); - tsFileResource.setMinPlanIndex(i + seqFileNum); - tsFileResource.setMaxPlanIndex(i + seqFileNum); - tsFileResource.setVersion(i + seqFileNum); + TsFileResource tsFileResource = prepareResource(seqFileNum + i); unseqResources.add(tsFileResource); - prepareFile(tsFileResource, i * ptNum, ptNum * (i + 1) / unseqFileNum, 10000); + prepareUnseqFile(tsFileResource, i * ptNum, ptNum * (i + 1) / unseqFileNum, 10000); } - File file = - new File( - TestConstant.OUTPUT_DATA_DIR.concat( - unseqFileNum - + IoTDBConstant.FILE_NAME_SEPARATOR - + unseqFileNum - + IoTDBConstant.FILE_NAME_SEPARATOR - + 0 - + IoTDBConstant.FILE_NAME_SEPARATOR - + 0 - + ".tsfile")); - TsFileResource tsFileResource = new TsFileResource(file); - tsFileResource.setClosed(true); - tsFileResource.setMinPlanIndex(seqFileNum + unseqFileNum); - tsFileResource.setMaxPlanIndex(seqFileNum + unseqFileNum); - tsFileResource.setVersion(seqFileNum + unseqFileNum); - unseqResources.add(tsFileResource); - prepareFile(tsFileResource, 0, ptNum * unseqFileNum, 20000); + if (unseqFileNum > 0) { + // an additional unseq file that overlaps ALL seq files + TsFileResource tsFileResource = prepareResource(seqFileNum + unseqFileNum); + unseqResources.add(tsFileResource); + prepareUnseqFile(tsFileResource, 0, ptNum * unseqFileNum, 20000); + } } void removeFiles(List<TsFileResource> seqResList, List<TsFileResource> unseqResList) @@ -203,6 +217,17 @@ abstract class MergeTest { void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum, long valueOffset) throws IOException, WriteProcessException { + prepareFile(tsFileResource, timeOffset, ptNum, valueOffset, deviceIds, measurementSchemas); + } + + void prepareFile( + TsFileResource tsFileResource, + long timeOffset, + long ptNum, + long valueOffset, + String[] deviceIds, + MeasurementSchema[] measurementSchemas) + throws IOException, WriteProcessException { File tsfile = tsFileResource.getTsFile(); if (!tsfile.getParentFile().exists()) { Assert.assertTrue(tsfile.getParentFile().mkdirs()); @@ -215,18 +240,18 @@ abstract class MergeTest { } } for (long i = timeOffset; i < timeOffset + ptNum; i++) { - for (int j = 0; j < deviceNum; j++) { - TSRecord record = new TSRecord(i, deviceIds[j]); - for (int k = 0; k < measurementNum; k++) { + for (String deviceId : deviceIds) { + TSRecord record = new TSRecord(i, deviceId); + for (MeasurementSchema measurementSchema : measurementSchemas) { record.addTuple( DataPoint.getDataPoint( - measurementSchemas[k].getType(), - measurementSchemas[k].getMeasurementId(), + measurementSchema.getType(), + measurementSchema.getMeasurementId(), String.valueOf(i + valueOffset))); } fileWriter.write(record); - tsFileResource.updateStartTime(deviceIds[j], i); - tsFileResource.updateEndTime(deviceIds[j], i); + tsFileResource.updateStartTime(deviceId, i); + tsFileResource.updateEndTime(deviceId, i); } if ((i + 1) % flushInterval == 0) { fileWriter.flushAllChunkGroups(); @@ -234,4 +259,69 @@ abstract class MergeTest { } fileWriter.close(); } + + @FunctionalInterface + protected interface QueryResultChecker { + void check(long time, Object value, int pointIndex); + } + + @FunctionalInterface + protected interface QueryCntChecker { + void check(int cnt); + } + + protected static QueryResultChecker checkResultFunc(long valueOffset) { + return (time, value, index) -> assertEquals(time + valueOffset, (double) value, 0.001); + } + + protected static QueryCntChecker checkResultCntFunc(long expected) { + return cnt -> assertEquals(expected, cnt); + } + + /** + * Query all points of a timeseries and check point by point. + * + * @param deviceId of the queried timeseries + * @param measurementSchema of the queried timeseries + * @param resources files to be queried + * @param resultChecker how should each point be examined + * @param cntChecker how should the final count be examined + * @throws IOException + * @throws IllegalPathException + */ + protected static void queryAndCheck( + String deviceId, + MeasurementSchema measurementSchema, + List<TsFileResource> resources, + QueryResultChecker resultChecker, + QueryCntChecker cntChecker) + throws IOException, IllegalPathException { + QueryContext context = new QueryContext(); + PartialPath path = new PartialPath(deviceId, measurementSchema.getMeasurementId()); + IBatchReader tsFilesReader = + new SeriesRawDataBatchReader( + path, + measurementSchema.getType(), + context, + resources, + new ArrayList<>(), + null, + null, + true); + int cnt = 0; + try { + while (tsFilesReader.hasNextBatch()) { + BatchData batchData = tsFilesReader.nextBatch(); + for (int i = 0; i < batchData.length(); i++) { + long time = batchData.getTimeByIndex(i); + Object value = batchData.getDoubleByIndex(i); + resultChecker.check(time, value, i); + cnt++; + } + } + cntChecker.check(cnt); + } finally { + tsFilesReader.close(); + } + } } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java index 4d92146d23..cc4116162f 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java @@ -47,6 +47,7 @@ import org.apache.iotdb.tsfile.read.reader.TsFileInput; import org.apache.iotdb.tsfile.read.reader.page.PageReader; import org.apache.iotdb.tsfile.utils.BloomFilter; import org.apache.iotdb.tsfile.utils.Pair; +import org.apache.iotdb.tsfile.utils.PreviewIterator; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -1482,6 +1483,88 @@ public class TsFileSequenceReader implements AutoCloseable { }; } + public class DeviceChunkMetaListIterator + implements PreviewIterator<Pair<String, List<ChunkMetadata>>> { + + private String device; + Queue<Pair<Long, Long>> leafMeasurementNodePositions; + Queue<Pair<String, List<ChunkMetadata>>> currentEntryQueue; + + public DeviceChunkMetaListIterator(String device) throws IOException { + this.device = device; + init(); + } + + private void init() throws IOException { + readFileMetadata(); + + MetadataIndexNode metadataIndexNode = tsFileMetaData.getMetadataIndex(); + Pair<MetadataIndexEntry, Long> metadataIndexPair = + getMetadataAndEndOffset(metadataIndexNode, device, true, true); + + if (metadataIndexPair == null) { + return; + } + + leafMeasurementNodePositions = new LinkedList<>(); + ByteBuffer buffer = readData(metadataIndexPair.left.getOffset(), metadataIndexPair.right); + collectEachLeafMeasurementNodeOffsetRange(buffer, leafMeasurementNodePositions); + currentEntryQueue = new LinkedList<>(); + } + + @Override + public boolean hasNext() { + if (currentEntryQueue != null && !currentEntryQueue.isEmpty()) { + return true; + } + if (leafMeasurementNodePositions == null || leafMeasurementNodePositions.isEmpty()) { + return false; + } + + Pair<Long, Long> startEndPair = leafMeasurementNodePositions.remove(); + try { + ByteBuffer nextBuffer = readData(startEndPair.left, startEndPair.right); + while (nextBuffer.hasRemaining()) { + TimeseriesMetadata timeseriesMetadata = + TimeseriesMetadata.deserializeFrom(nextBuffer, true); + currentEntryQueue.add( + new Pair<>( + timeseriesMetadata.getMeasurementId(), + timeseriesMetadata.getChunkMetadataList())); + } + } catch (IOException e) { + throw new TsFileRuntimeException( + "Error occurred while reading a time series metadata block."); + } + + return !currentEntryQueue.isEmpty(); + } + + public Pair<String, List<ChunkMetadata>> next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return currentEntryQueue.poll(); + } + + @Override + public Pair<String, List<ChunkMetadata>> previewNext() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return currentEntryQueue.peek(); + } + } + + /** + * @return An iterator of entry ( measurement -> chunk metadata list ). When traversing it, you + * will get chunk metadata lists according to the lexicographic order of the measurements. + */ + public PreviewIterator<Pair<String, List<ChunkMetadata>>> getMeasurementChunkMetadataListIterator( + String device) throws IOException { + return new DeviceChunkMetaListIterator(device); + } + private void collectEachLeafMeasurementNodeOffsetRange( ByteBuffer buffer, Queue<Pair<Long, Long>> queue) throws IOException { try { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PreviewIterator.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PreviewIterator.java new file mode 100644 index 0000000000..96c4a2c2a6 --- /dev/null +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/PreviewIterator.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.tsfile.utils; + +import java.util.Iterator; + +/** A PreviewIterator allows to see what the next value will be without moving the cursor. */ +public interface PreviewIterator<T> extends Iterator<T> { + T previewNext(); +}
