This is an automated email from the ASF dual-hosted git repository. ejttianyu pushed a commit to branch dev_new_merge_rebase in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 64063bde262b36a1f145548a0a088052ea1bb1ad Author: EJTTianyu <[email protected]> AuthorDate: Wed Jun 10 16:38:19 2020 +0800 fix merge bug --- .../seqMerge/inplace/task/MergeMultiChunkTask.java | 90 +++++++++++----------- .../seqMerge/squeeze/task/SqueezeMergeTask.java | 4 +- .../task/RegularizationMergeSeriesTask.java | 8 ++ .../task/RegularizationMergeTask.java | 4 +- .../engine/storagegroup/StorageGroupProcessor.java | 53 +++++-------- .../java/org/apache/iotdb/db/utils/MergeUtils.java | 4 +- .../iotdb/db/integration/IoTDBSensorUpdateIT.java | 3 + 7 files changed, 84 insertions(+), 82 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/inplace/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/inplace/task/MergeMultiChunkTask.java index c2bc33f..67381f4 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/inplace/task/MergeMultiChunkTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/inplace/task/MergeMultiChunkTask.java @@ -213,19 +213,21 @@ class MergeMultiChunkTask { int[] ptWrittens = new int[seqChunkMeta.length]; int mergeChunkSubTaskNum = IoTDBDescriptor.getInstance().getConfig() .getMergeChunkSubThreadNum(); - PriorityQueue<MetaListEntry>[] chunkMetaHeaps = new PriorityQueue[mergeChunkSubTaskNum]; + MetaListEntry[] metaListEntries = new MetaListEntry[currMergingPaths.size()]; + PriorityQueue<Integer>[] chunkIdxHeaps = new PriorityQueue[mergeChunkSubTaskNum]; for (int i = 0; i < mergeChunkSubTaskNum; i++) { - chunkMetaHeaps[i] = new PriorityQueue<>(); + chunkIdxHeaps[i] = new PriorityQueue<>(); } int idx = 0; for (int i = 0; i < currMergingPaths.size(); i++) { + chunkIdxHeaps[idx % mergeChunkSubTaskNum].add(i); if (seqChunkMeta[i].isEmpty()) { continue; } MetaListEntry entry = new MetaListEntry(i, seqChunkMeta[i]); entry.next(); - chunkMetaHeaps[idx % mergeChunkSubTaskNum].add(entry); + metaListEntries[i] = entry; idx++; ptWrittens[i] = 0; } @@ -237,9 +239,8 @@ class MergeMultiChunkTask { for (int i = 0; i < mergeChunkSubTaskNum; i++) { int finalI = i; futures.add(MergeManager.getINSTANCE().submitChunkSubTask(() -> { - mergeChunkHeap(chunkMetaHeaps[finalI], ptWrittens, reader, mergeFileWriter, unseqReaders, - currFile, - isLastFile); + mergeChunkHeap(chunkIdxHeaps[finalI], metaListEntries, ptWrittens, reader, mergeFileWriter, + unseqReaders, currFile, isLastFile); return null; })); } @@ -260,48 +261,49 @@ class MergeMultiChunkTask { return mergedChunkNum.get() > 0; } - private void mergeChunkHeap(PriorityQueue<MetaListEntry> chunkMetaHeap, int[] ptWrittens, - TsFileSequenceReader reader, - RestorableTsFileIOWriter mergeFileWriter, IPointReader[] unseqReaders, - TsFileResource currFile, - boolean isLastFile) throws IOException { - while (!chunkMetaHeap.isEmpty()) { - MetaListEntry metaListEntry = chunkMetaHeap.poll(); - ChunkMetadata currMeta = metaListEntry.current(); - int pathIdx = metaListEntry.getPathId(); - boolean isLastChunk = !metaListEntry.hasNext(); + private void mergeChunkHeap(PriorityQueue<Integer> chunkIdxHeap, MetaListEntry[] metaListEntries, + int[] ptWrittens, TsFileSequenceReader reader, RestorableTsFileIOWriter mergeFileWriter, + IPointReader[] unseqReaders, TsFileResource currFile, boolean isLastFile) throws IOException { + while (!chunkIdxHeap.isEmpty()) { + int pathIdx = chunkIdxHeap.poll(); Path path = currMergingPaths.get(pathIdx); IChunkWriter chunkWriter = resource.getChunkWriter(path); - boolean chunkOverflowed = MergeUtils.isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta); - boolean chunkTooSmall = MergeUtils - .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum); - - Chunk chunk; - synchronized (reader) { - chunk = reader.readMemChunk(currMeta); - } - ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, chunkTooSmall, chunk, - ptWrittens[pathIdx], pathIdx, mergeFileWriter, unseqReaders[pathIdx], chunkWriter, - currFile); - - if (!isLastChunk) { - metaListEntry.next(); - chunkMetaHeap.add(metaListEntry); - } else { - // this only happens when the seqFiles do not contain this series, otherwise the remaining - // data will be merged with the last chunk in the seqFiles - if (isLastFile && currTimeValuePairs[pathIdx] != null) { - ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx], - Long.MAX_VALUE, - pathIdx); - mergedChunkNum.incrementAndGet(); + if (metaListEntries[pathIdx] != null) { + MetaListEntry metaListEntry = metaListEntries[pathIdx]; + ChunkMetadata currMeta = metaListEntry.current(); + boolean isLastChunk = !metaListEntry.hasNext(); + boolean chunkOverflowed = MergeUtils + .isChunkOverflowed(currTimeValuePairs[pathIdx], currMeta); + boolean chunkTooSmall = MergeUtils + .isChunkTooSmall(ptWrittens[pathIdx], currMeta, isLastChunk, minChunkPointNum); + + Chunk chunk; + synchronized (reader) { + chunk = reader.readMemChunk(currMeta); + } + ptWrittens[pathIdx] = mergeChunkV2(currMeta, chunkOverflowed, chunkTooSmall, chunk, + ptWrittens[pathIdx], pathIdx, mergeFileWriter, unseqReaders[pathIdx], chunkWriter, + currFile); + + if (!isLastChunk) { + metaListEntry.next(); + chunkIdxHeap.add(pathIdx); + continue; } - // the last merged chunk may still be smaller than the threshold, flush it anyway - if (ptWrittens[pathIdx] > 0) { - synchronized (mergeFileWriter) { - chunkWriter.writeToFileWriter(mergeFileWriter); - } + } + // this only happens when the seqFiles do not contain this series, otherwise the remaining + // data will be merged with the last chunk in the seqFiles + if (isLastFile && currTimeValuePairs[pathIdx] != null) { + ptWrittens[pathIdx] += writeRemainingUnseq(chunkWriter, unseqReaders[pathIdx], + Long.MAX_VALUE, + pathIdx); + mergedChunkNum.incrementAndGet(); + } + // the last merged chunk may still be smaller than the threshold, flush it anyway + if (ptWrittens[pathIdx] > 0) { + synchronized (mergeFileWriter) { + chunkWriter.writeToFileWriter(mergeFileWriter); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/squeeze/task/SqueezeMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/squeeze/task/SqueezeMergeTask.java index 9303d12..13c1b11 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/squeeze/task/SqueezeMergeTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/seqMerge/squeeze/task/SqueezeMergeTask.java @@ -152,7 +152,7 @@ public class SqueezeMergeTask implements Callable<Void> { } private void deleteFile(TsFileResource seqFile) { - seqFile.getWriteQueryLock().writeLock().lock(); + seqFile.writeLock(); try { resource.removeFileReader(seqFile); ChunkMetadataCache.getInstance().remove(seqFile); @@ -163,7 +163,7 @@ public class SqueezeMergeTask implements Callable<Void> { } catch (Exception e) { logger.error(e.getMessage(), e); } finally { - seqFile.getWriteQueryLock().writeLock().unlock(); + seqFile.writeUnlock(); } } } \ No newline at end of file diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeSeriesTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeSeriesTask.java index 0ee3d00..51783bc 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeSeriesTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeSeriesTask.java @@ -53,6 +53,14 @@ public class RegularizationMergeSeriesTask extends BaseMergeSeriesTask { .split(IoTDBConstant.TSFILE_NAME_SEPARATOR); String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "") .split(IoTDBConstant.TSFILE_NAME_SEPARATOR); + + //TODO: for test only + try { + Long.parseLong(items1[0]); + } catch (NumberFormatException e) { + return items1[0].compareTo(items2[0]); + } + return Long.compare(Long.parseLong(items1[0]), Long.parseLong(items2[0])); })); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeTask.java index 5ff1a38..62f3ca9 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeTask.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/sizeMerge/regularization/task/RegularizationMergeTask.java @@ -161,7 +161,7 @@ public class RegularizationMergeTask implements Callable<Void> { } private void deleteFile(TsFileResource seqFile) { - seqFile.getWriteQueryLock().writeLock().lock(); + seqFile.writeLock(); try { resource.removeFileReader(seqFile); ChunkMetadataCache.getInstance().remove(seqFile); @@ -170,7 +170,7 @@ public class RegularizationMergeTask implements Callable<Void> { } catch (Exception e) { logger.error(e.getMessage(), e); } finally { - seqFile.getWriteQueryLock().writeLock().unlock(); + seqFile.writeUnlock(); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 90351c7..b2b1e54 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -1708,13 +1708,6 @@ public class StorageGroupProcessor { } private void removeSeqFiles(List<TsFileResource> seqFiles) { - mergeLock.writeLock().lock(); - try { - sequenceFileTreeSet.removeAll(seqFiles); - } finally { - mergeLock.writeLock().unlock(); - } - for (TsFileResource seqFile : seqFiles) { seqFile.writeLock(); try { @@ -1726,13 +1719,6 @@ public class StorageGroupProcessor { } private void removeUnseqFiles(List<TsFileResource> unseqFiles) { - mergeLock.writeLock().lock(); - try { - unSequenceFileList.removeAll(unseqFiles); - } finally { - mergeLock.writeLock().unlock(); - } - for (TsFileResource unseqFile : unseqFiles) { unseqFile.writeLock(); try { @@ -1808,29 +1794,34 @@ public class StorageGroupProcessor { private void handleInplaceMerge(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, File mergeLog) { - + mergeLock.writeLock().lock(); + try { + unSequenceFileList.removeAll(unseqFiles); + } finally { + mergeLock.writeLock().unlock(); + } removeUnseqFiles(unseqFiles); endMerge(mergeLog, seqFiles); } private void handleOtherMerge(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles, File mergeLog, List<TsFileResource> newFile) { - // make sure no queries are holding the seqFiles - for (TsFileResource seqFile : seqFiles) { - seqFile.writeLock(); - } - for (TsFileResource unseqFile : unseqFiles) { - unseqFile.writeLock(); - } // block new queries and insertions to prevent the seqFiles from changing + writeLock(); mergeLock.writeLock().lock(); try { - removeUnseqFiles(unseqFiles); - removeSeqFiles(seqFiles); + + unSequenceFileList.removeAll(unseqFiles); + sequenceFileTreeSet.removeAll(seqFiles); + // move modifications generated during merge into the new file for (TsFileResource tsFileResource : newFile) { - tsFileResource - .setProcessor(getOrCreateTsFileProcessor(tsFileResource.getTimePartition(), true)); + TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, + tsFileResource.getFile().getAbsoluteFile(), + getVersionControllerByTimePartitionId(tsFileResource.getTimePartition()), + this::closeUnsealedTsFileProcessorCallBack, + this::updateLatestFlushTimeCallback, true); + tsFileResource.setProcessor(tsFileProcessor); if (mergingModification != null) { logger.info("{} is updating the merged file's modification file", storageGroupName); for (Modification modification : mergingModification.getModifications()) { @@ -1847,15 +1838,13 @@ public class StorageGroupProcessor { logger.error("{} fails to do the after merge action,", storageGroupName, e); } finally { isMerging = false; + writeUnlock(); mergeLock.writeLock().unlock(); logger.info("{} a merge task ends", storageGroupName); - for (TsFileResource seqFile : seqFiles) { - seqFile.writeUnlock(); - } - for (TsFileResource unseqFile : unseqFiles) { - unseqFile.writeUnlock(); - } } + + removeUnseqFiles(unseqFiles); + removeSeqFiles(seqFiles); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java index 1e5e1e4..3350e24 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java @@ -35,8 +35,8 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.metadata.mnode.InternalMNode; -import org.apache.iotdb.db.metadata.mnode.LeafMNode; import org.apache.iotdb.db.metadata.mnode.MNode; +import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.TsFileSequenceReader; @@ -271,7 +271,7 @@ public class MergeUtils { for (String device : devices) { InternalMNode deviceNode = (InternalMNode) MManager.getInstance().getNodeByPath(device); for (Entry<String, MNode> entry : deviceNode.getChildren().entrySet()) { - MeasurementSchema measurementSchema = ((LeafMNode) entry.getValue()).getSchema(); + MeasurementSchema measurementSchema = ((MeasurementMNode) entry.getValue()).getSchema(); chunkWriterCacheMap .put(new Path(device, entry.getKey()), new ChunkWriterImpl(measurementSchema)); } diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java index c404297..9999f9b 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java @@ -25,6 +25,8 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.merge.seqMerge.SeqMergeFileStrategy; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; import org.junit.After; @@ -37,6 +39,7 @@ public class IoTDBSensorUpdateIT { public void setUp() throws Exception { EnvironmentUtils.closeStatMonitor(); EnvironmentUtils.envSetUp(); + IoTDBDescriptor.getInstance().getConfig().setSeqMergeFileStrategy(SeqMergeFileStrategy.INPLACE); Class.forName(Config.JDBC_DRIVER_NAME); }
