This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch feature_async_close_tsfile in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit faa9e8ec4a99217b4d5550bd15aa3598c4212267 Author: lta <[email protected]> AuthorDate: Fri Jun 28 17:23:24 2019 +0800 fix a bug of don't update endTimeMap when normal close UFP --- .../db/engine/filenodeV2/FileNodeProcessorV2.java | 14 ++++++----- .../db/engine/filenodeV2/TsFileResourceV2.java | 27 ++++++++++++---------- .../filenodeV2/UnsealedTsFileProcessorV2.java | 24 ++++++++++++------- .../engine/filenodeV2/FileNodeProcessorV2Test.java | 2 +- .../filenodeV2/UnsealedTsFileProcessorV2Test.java | 9 ++++---- 5 files changed, 45 insertions(+), 31 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java index a98a140..0c64be3 100755 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java @@ -47,7 +47,6 @@ import org.apache.iotdb.db.exception.UnsealedTsFileProcessorException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; -import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.db.utils.datastructure.TVListAllocator; import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer; import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; @@ -288,10 +287,11 @@ public class FileNodeProcessorV2 { // insert BufferWrite long start2 = System.currentTimeMillis(); - result = unsealedTsFileProcessor.insert(insertPlan); + result = unsealedTsFileProcessor.insert(insertPlan, sequence); start2 = System.currentTimeMillis() - start2; if (start2 > 1000) { - LOGGER.info("FNP {} insert a record into unsealed file processor cost: {}", storageGroupName, start2); + LOGGER.info("FNP {} insert a record into unsealed file processor cost: {}", storageGroupName, + start2); } // try to update the latest time of the device of this tsRecord @@ -355,6 +355,7 @@ public class FileNodeProcessorV2 { if (sequence) { closingSequenceTsFileProcessor.add(unsealedTsFileProcessor); workSequenceTsFileProcessor = null; + updateEndTimeMap(unsealedTsFileProcessor); } else { closingUnSequenceTsFileProcessor.add(unsealedTsFileProcessor); workUnSequenceTsFileProcessor = null; @@ -389,7 +390,8 @@ public class FileNodeProcessorV2 { lock.writeLock().lock(); time = System.currentTimeMillis() - time; if (time > 1000) { - LOGGER.info("storage group {} wait for write lock cost: {}", storageGroupName, time, new RuntimeException()); + LOGGER.info("storage group {} wait for write lock cost: {}", storageGroupName, time, + new RuntimeException()); } timerr.set(System.currentTimeMillis()); } @@ -398,7 +400,8 @@ public class FileNodeProcessorV2 { lock.writeLock().unlock(); long time = System.currentTimeMillis() - timerr.get(); if (time > 1000) { - LOGGER.info("storage group {} take lock for {}ms", storageGroupName, time, new RuntimeException()); + LOGGER.info("storage group {} take lock for {}ms", storageGroupName, time, + new RuntimeException()); } } @@ -535,7 +538,6 @@ public class FileNodeProcessorV2 { if (workUnSequenceTsFileProcessor != null) { closingUnSequenceTsFileProcessor.add(workUnSequenceTsFileProcessor); workUnSequenceTsFileProcessor.asyncClose(); - updateEndTimeMap(workUnSequenceTsFileProcessor); workUnSequenceTsFileProcessor = null; } } finally { diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java index a60ccf5..d226517 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java @@ -22,18 +22,14 @@ import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; -import org.apache.iotdb.db.engine.filenode.TsFileResource; import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData; @@ -51,8 +47,7 @@ public class TsFileResourceV2 { private Map<String, Long> startTimeMap; /** - * device -> end time - * null if it's an unsealed tsfile + * device -> end time null if it's an unsealed tsfile */ private Map<String, Long> endTimeMap; @@ -96,7 +91,8 @@ public class TsFileResourceV2 { } public void serialize() throws IOException { - try (OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(file + RESOURCE_SUFFIX))){ + try (OutputStream outputStream = new BufferedOutputStream( + new FileOutputStream(file + RESOURCE_SUFFIX))) { ReadWriteIOUtils.write(this.startTimeMap.size(), outputStream); for (Entry<String, Long> entry : this.startTimeMap.entrySet()) { ReadWriteIOUtils.write(entry.getKey(), outputStream); @@ -111,7 +107,8 @@ public class TsFileResourceV2 { } public void deSerialize() throws IOException { - try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file + RESOURCE_SUFFIX))) { + try (InputStream inputStream = new BufferedInputStream( + new FileInputStream(file + RESOURCE_SUFFIX))) { int size = ReadWriteIOUtils.readInt(inputStream); Map<String, Long> startTimes = new HashMap<>(); for (int i = 0; i < size; i++) { @@ -132,10 +129,16 @@ public class TsFileResourceV2 { } public void updateStartTime(String device, long time) { - startTimeMap.putIfAbsent(device, time); - long startTime = startTimeMap.get(device); - if (time < startTimeMap.get(device)) { - startTimeMap.put(device, startTime); + long startTime = startTimeMap.getOrDefault(device, Long.MAX_VALUE); + if (time < startTime) { + startTimeMap.put(device, time); + } + } + + public void updateEndTime(String device, long time) { + long endTime = endTimeMap.getOrDefault(device, Long.MIN_VALUE); + if (time > endTime) { + endTimeMap.put(device, time); } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java index 81c3cf0..c01c33c 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java @@ -116,7 +116,7 @@ public class UnsealedTsFileProcessorV2 { * @param insertPlan physical plan of insertion * @return succeed or fail */ - public boolean insert(InsertPlan insertPlan) { + public boolean insert(InsertPlan insertPlan, boolean sequence) { long start1 = System.currentTimeMillis(); if (workMemTable == null) { @@ -144,13 +144,17 @@ public class UnsealedTsFileProcessorV2 { } // update start time of this memtable tsFileResource.updateStartTime(insertPlan.getDeviceId(), insertPlan.getTime()); + if (!sequence) { + tsFileResource.updateEndTime(insertPlan.getDeviceId(), insertPlan.getTime()); + } long start2 = System.currentTimeMillis(); // insert tsRecord to work memtable workMemTable.insert(insertPlan); start2 = System.currentTimeMillis() - start2; if (start2 > 1000) { - LOGGER.info("UFP {} insert into memtable cost: {}, insertPlan: {}, current data points in memtable: {}", + LOGGER.info( + "UFP {} insert into memtable cost: {}, insertPlan: {}, current data points in memtable: {}", storageGroupName, start2, insertPlan, workMemTable.size()); } @@ -212,8 +216,8 @@ public class UnsealedTsFileProcessorV2 { } /** - * Ensure there must be a flush thread submitted after setCloseMark() is called, - * therefore the close task will be executed by a flush thread. + * Ensure there must be a flush thread submitted after setCloseMark() is called, therefore the + * close task will be executed by a flush thread. */ public void asyncClose() { flushQueryLock.writeLock().lock(); @@ -221,7 +225,9 @@ public class UnsealedTsFileProcessorV2 { try { IMemTable tmpMemTable = workMemTable == null ? new EmptyMemTable() : workMemTable; if (!tmpMemTable.isManagedByMemPool()) { - LOGGER.info("storage group {} add an empty memtable into flushing memtable list when async close", storageGroupName); + LOGGER.info( + "storage group {} add an empty memtable into flushing memtable list when async close", + storageGroupName); } else { LOGGER.info("storage group {} async flush a memtable when async close", storageGroupName); } @@ -299,7 +305,7 @@ public class UnsealedTsFileProcessorV2 { writer.makeMetadataVisible(); flushingMemTables.remove(memTable); LOGGER.info("flush finished, remove a memtable from flushing list, " - + "flushing memtable list size: {}", flushingMemTables.size()); + + "flushing memtable list size: {}", flushingMemTables.size()); } finally { flushQueryLock.writeLock().unlock(); } @@ -317,7 +323,8 @@ public class UnsealedTsFileProcessorV2 { // null memtable only appears when calling asyncClose() if (memTableToFlush.isManagedByMemPool()) { - MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(memTableToFlush, fileSchema, writer, storageGroupName, + MemTableFlushTaskV2 flushTask = new MemTableFlushTaskV2(memTableToFlush, fileSchema, writer, + storageGroupName, this::releaseFlushedMemTableCallback); flushTask.flushMemTable(); long start = System.currentTimeMillis(); @@ -397,7 +404,8 @@ public class UnsealedTsFileProcessorV2 { public void close() throws IOException { tsFileResource.close(); - MultiFileLogNodeManager.getInstance().deleteNode(storageGroupName + "-" + tsFileResource.getFile().getName()); + MultiFileLogNodeManager.getInstance() + .deleteNode(storageGroupName + "-" + tsFileResource.getFile().getName()); } public void setManagedByFlushManager(boolean managedByFlushManager) { diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java index 6c6120f..e53c351 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java @@ -60,7 +60,7 @@ public class FileNodeProcessorV2Test { processor.insert(new InsertPlan(record)); processor.asyncForceClose(); } - + processor.syncCloseFileNode(); QueryDataSourceV2 queryDataSource = null; try { diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java index 94f5f1b..2331391 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2Test.java @@ -19,7 +19,8 @@ package org.apache.iotdb.db.engine.filenodeV2; import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import java.io.File; import java.io.IOException; @@ -86,7 +87,7 @@ public class UnsealedTsFileProcessorV2Test { for (int i = 1; i <= 100; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - processor.insert(new InsertPlan(record)); + processor.insert(new InsertPlan(record), true); } // query data in memory @@ -134,7 +135,7 @@ public class UnsealedTsFileProcessorV2Test { for (int i = 1; i <= 10; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - processor.insert(new InsertPlan(record)); + processor.insert(new InsertPlan(record), true); } processor.asyncFlush(); } @@ -177,7 +178,7 @@ public class UnsealedTsFileProcessorV2Test { for (int i = 1; i <= 100; i++) { TSRecord record = new TSRecord(i, deviceId); record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(i))); - processor.insert(new InsertPlan(record)); + processor.insert(new InsertPlan(record), true); } // query data in memory
