This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch feature_async_close_tsfile in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 8d69b5ea79c3b0c5e402f0d7d1b62049620d045e Author: qiaojialin <[email protected]> AuthorDate: Fri Jun 28 19:45:04 2019 +0800 fix query clone TVList bug --- .../db/engine/filenodeV2/FileNodeProcessorV2.java | 25 ++++--- .../db/engine/filenodeV2/TsFileResourceV2.java | 4 ++ .../filenodeV2/UnsealedTsFileProcessorV2.java | 21 +++--- .../iotdb/db/engine/memtable/AbstractMemTable.java | 44 +++++++++---- .../apache/iotdb/db/engine/memtable/IMemTable.java | 2 + .../db/engine/memtable/MemTableFlushTaskV2.java | 2 - .../db/engine/memtable/TimeValuePairSorter.java | 2 +- .../db/engine/memtable/WritableMemChunkV2.java | 76 +++++++++++----------- .../iotdb/db/utils/datastructure/TVList.java | 4 +- .../engine/filenodeV2/FileNodeProcessorV2Test.java | 1 + .../UnseqSeriesReaderByTimestampTest.java | 2 +- 11 files changed, 109 insertions(+), 74 deletions(-) diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java index a98a140..7615bc0 100755 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java @@ -89,12 +89,13 @@ public class FileNodeProcessorV2 { private String storageGroupName; - private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReadWriteLock insertLock = new ReentrantReadWriteLock(); private final Object closeFileNodeCondition = new Object(); private final ThreadLocal<Long> timerr = new ThreadLocal<>(); + private final ReadWriteLock closeQueryLock = new ReentrantReadWriteLock(); /** * Mark whether to close file node */ @@ -372,7 +373,7 @@ public class FileNodeProcessorV2 { // TODO need a read lock, please consider the concurrency with flush manager threads. public QueryDataSourceV2 query(String deviceId, String measurementId) throws FileNodeProcessorException { - lock.readLock().lock(); + insertLock.readLock().lock(); try { List<TsFileResourceV2> seqResources = getFileReSourceListForQuery(sequenceFileList, deviceId, measurementId); @@ -380,13 +381,13 @@ public class FileNodeProcessorV2 { deviceId, measurementId); return new QueryDataSourceV2(new Path(deviceId, measurementId), seqResources, unseqResources); } finally { - lock.readLock().unlock(); + insertLock.readLock().unlock(); } } private void writeLock() { long time = System.currentTimeMillis(); - lock.writeLock().lock(); + insertLock.writeLock().lock(); time = System.currentTimeMillis() - time; if (time > 1000) { LOGGER.info("storage group {} wait for write lock cost: {}", storageGroupName, time, new RuntimeException()); @@ -395,7 +396,7 @@ public class FileNodeProcessorV2 { } private void writeUnlock() { - lock.writeLock().unlock(); + insertLock.writeLock().unlock(); long time = System.currentTimeMillis() - timerr.get(); if (time > 1000) { LOGGER.info("storage group {} take lock for {}ms", storageGroupName, time, new RuntimeException()); @@ -418,8 +419,9 @@ public class FileNodeProcessorV2 { if (!tsFileResource.containsDevice(deviceId)) { continue; } - synchronized (tsFileResource) { - if (!tsFileResource.getStartTimeMap().isEmpty()) { + if (!tsFileResource.getStartTimeMap().isEmpty()) { + closeQueryLock.readLock().lock(); + try { if (tsFileResource.isClosed()) { tsfileResourcesForQuery.add(tsFileResource); } else { @@ -432,8 +434,12 @@ public class FileNodeProcessorV2 { throw new FileNodeProcessorException(e); } tsfileResourcesForQuery - .add(new TsFileResourceV2(tsFileResource.getFile(), pair.left, pair.right)); + .add(new TsFileResourceV2(tsFileResource.getFile(), + tsFileResource.getStartTimeMap(), + tsFileResource.getEndTimeMap(), pair.left, pair.right)); } + } finally { + closeQueryLock.readLock().unlock(); } } } @@ -633,10 +639,13 @@ public class FileNodeProcessorV2 { // TODO please consider concurrency with query and insert method. public void closeUnsealedTsFileProcessorCallback( UnsealedTsFileProcessorV2 unsealedTsFileProcessor) { + closeQueryLock.writeLock().lock(); try { unsealedTsFileProcessor.close(); } catch (IOException e) { LOGGER.error("storage group: {} close unsealedTsFileProcessor failed", storageGroupName, e); + } finally { + closeQueryLock.writeLock().unlock(); } if (closingSequenceTsFileProcessor.contains(unsealedTsFileProcessor)) { closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor); diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java index a60ccf5..e9366d0 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/TsFileResourceV2.java @@ -88,9 +88,13 @@ public class TsFileResourceV2 { } public TsFileResourceV2(File file, + Map<String, Long> startTimeMap, + Map<String, Long> endTimeMap, ReadOnlyMemChunk readOnlyMemChunk, List<ChunkMetaData> chunkMetaDatas) { this.file = file; + this.startTimeMap = startTimeMap; + this.endTimeMap = endTimeMap; this.chunkMetaDatas = chunkMetaDatas; this.readOnlyMemChunk = readOnlyMemChunk; } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java index 81c3cf0..b432f3e 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java @@ -23,11 +23,13 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import java.util.function.Supplier; +import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.memtable.EmptyMemTable; import org.apache.iotdb.db.engine.memtable.IMemTable; @@ -298,6 +300,7 @@ public class UnsealedTsFileProcessorV2 { try { writer.makeMetadataVisible(); flushingMemTables.remove(memTable); + memTable.release(); LOGGER.info("flush finished, remove a memtable from flushing list, " + "flushing memtable list size: {}", flushingMemTables.size()); } finally { @@ -359,14 +362,10 @@ public class UnsealedTsFileProcessorV2 { writer.endFile(fileSchema); - flushQueryLock.writeLock().lock(); - try { // remove this processor from Closing list in FileNodeProcessor, mark the TsFileResource closed, no need writer anymore - closeUnsealedFileCallback.accept(this); - writer = null; - } finally { - flushQueryLock.writeLock().unlock(); - } + closeUnsealedFileCallback.accept(this); + + writer = null; // delete the restore for this bufferwrite processor if (LOGGER.isInfoEnabled()) { @@ -440,12 +439,12 @@ public class UnsealedTsFileProcessorV2 { if (!flushingMemTable.isManagedByMemPool()) { continue; } - memSeriesLazyMerger - .addMemSeries(flushingMemTable.query(deviceId, measurementId, dataType, props)); + ReadOnlyMemChunk memChunk = flushingMemTable.query(deviceId, measurementId, dataType, props); + memSeriesLazyMerger.addMemSeries(memChunk); } if (workMemTable != null) { - memSeriesLazyMerger - .addMemSeries(workMemTable.query(deviceId, measurementId, dataType, props)); + ReadOnlyMemChunk memChunk = workMemTable.query(deviceId, measurementId, dataType, props); + memSeriesLazyMerger.addMemSeries(memChunk); } // memSeriesLazyMerger has handled the props, // so we do not need to handle it again in the following readOnlyMemChunk diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index fe3a5f5..8073704 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -24,13 +24,23 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.utils.MemUtils; import org.apache.iotdb.db.utils.TimeValuePair; +import org.apache.iotdb.db.utils.TsPrimitiveType.TsBinary; +import org.apache.iotdb.db.utils.TsPrimitiveType.TsBoolean; +import org.apache.iotdb.db.utils.TsPrimitiveType.TsDouble; +import org.apache.iotdb.db.utils.TsPrimitiveType.TsFloat; +import org.apache.iotdb.db.utils.TsPrimitiveType.TsInt; +import org.apache.iotdb.db.utils.TsPrimitiveType.TsLong; +import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.db.utils.datastructure.TVListAllocator; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; public abstract class AbstractMemTable implements IMemTable { @@ -143,11 +153,20 @@ public abstract class AbstractMemTable implements IMemTable { @Override public ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType, Map<String, String> props) { - - return new ReadOnlyMemChunk(dataType, getSeriesData(deviceId, - measurement, dataType), props); + TimeValuePairSorter sorter; + if (!checkPath(deviceId, measurement)) { + sorter = new WritableMemChunk(dataType); + } else { + long undeletedTime = findUndeletedTime(deviceId, measurement); + IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement); + IWritableMemChunk chunkCopy = new WritableMemChunkV2(dataType, memChunk.getTVList().clone()); + chunkCopy.setTimeOffset(undeletedTime); + sorter = chunkCopy; + } + return new ReadOnlyMemChunk(dataType, sorter, props); } + private long findUndeletedTime(String deviceId, String measurement) { String path = deviceId + PATH_SEPARATOR + measurement; long undeletedTime = 0; @@ -162,16 +181,6 @@ public abstract class AbstractMemTable implements IMemTable { return undeletedTime + 1; } - private TimeValuePairSorter getSeriesData(String deviceId, String measurement, TSDataType dataType) { - if (!checkPath(deviceId, measurement)) { - return new WritableMemChunk(dataType); - } - long undeletedTime = findUndeletedTime(deviceId, measurement); - IWritableMemChunk memChunk = memTableMap.get(deviceId).get(measurement); - memChunk.setTimeOffset(undeletedTime); - return memChunk; - } - @Override public boolean delete(String deviceId, String measurementId, long timestamp) { Map<String, IWritableMemChunk> deviceMap = memTableMap.get(deviceId); @@ -263,4 +272,13 @@ public abstract class AbstractMemTable implements IMemTable { public TVListAllocator getTVListAllocator() { return allocator; } + + @Override + public void release() { + for (Entry<String, Map<String, IWritableMemChunk>> entry: memTableMap.entrySet()) { + for (Entry<String, IWritableMemChunk> subEntry: entry.getValue().entrySet()) { + allocator.release(entry.getKey() + IoTDBConstant.PATH_SEPARATOR + subEntry.getKey(), subEntry.getValue().getTVList()); + } + } + } } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index 9f0a453..8408b54 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -102,4 +102,6 @@ public interface IMemTable { void setTVListAllocator(TVListAllocator allocator); TVListAllocator getTVListAllocator(); + + void release(); } diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java index a6f182d..6ef25a7 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTableFlushTaskV2.java @@ -143,8 +143,6 @@ public class MemTableFlushTaskV2 { try { writeOneSeries(encodingMessage.left, seriesWriter, encodingMessage.right.getType()); - memTable.getTVListAllocator().release(currDevice + IoTDBConstant.PATH_SEPARATOR - + encodingMessage.right.getMeasurementId(), encodingMessage.left); ioTaskQueue.add(seriesWriter); } catch (IOException e) { LOGGER.error("Storage group {} memtable {}, encoding task error.", storageGroup, diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java index ffbb7ec..12e36f7 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/TimeValuePairSorter.java @@ -26,7 +26,7 @@ import org.apache.iotdb.db.utils.TimeValuePair; public interface TimeValuePairSorter { /** - * get the distinct sorted startTime. + * get the distinct sorted startTime. Only for query. * * @return a List which contains all distinct {@link TimeValuePair}s in ascending order by * timestamp. diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java index 14a8335..8d34e67 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/WritableMemChunkV2.java @@ -39,6 +39,7 @@ public class WritableMemChunkV2 implements IWritableMemChunk { private static final Logger LOGGER = LoggerFactory.getLogger(WritableMemChunkV2.class); private TSDataType dataType; private TVList list; + private List<TimeValuePair> sortedList; public WritableMemChunkV2(TSDataType dataType, TVList list) { this.dataType = dataType; @@ -129,7 +130,7 @@ public class WritableMemChunkV2 implements IWritableMemChunk { } @Override - public TVList getSortedTVList() { + public synchronized TVList getSortedTVList() { list.sort(); return list; } @@ -155,42 +156,43 @@ public class WritableMemChunkV2 implements IWritableMemChunk { } @Override - public List<TimeValuePair> getSortedTimeValuePairList() { - List<TimeValuePair> result = new ArrayList<>(); - TVList cloneList = list.clone(); - cloneList.sort(); - for (int i = 0; i < cloneList.size(); i++) { - long time = cloneList.getTime(i); - if (time < cloneList.getTimeOffset() || - (i+1 < cloneList.size() && (time == cloneList.getTime(i+1)))) { - continue; - } - - switch (dataType) { - case BOOLEAN: - result.add(new TimeValuePair(time, new TsBoolean(cloneList.getBoolean(i)))); - break; - case INT32: - result.add(new TimeValuePair(time, new TsInt(cloneList.getInt(i)))); - break; - case INT64: - result.add(new TimeValuePair(time, new TsLong(cloneList.getLong(i)))); - break; - case FLOAT: - result.add(new TimeValuePair(time, new TsFloat(cloneList.getFloat(i)))); - break; - case DOUBLE: - result.add(new TimeValuePair(time, new TsDouble(cloneList.getDouble(i)))); - break; - case TEXT: - result.add(new TimeValuePair(time, new TsBinary(cloneList.getBinary(i)))); - break; - default: - LOGGER.error("don't support data type: {}", dataType); - break; - } - } - return result; + public synchronized List<TimeValuePair> getSortedTimeValuePairList() { + if (sortedList != null) { + return sortedList; + } + sortedList = new ArrayList<>(); + list.sort(); + for (int i = 0; i < list.size(); i++) { + long time = list.getTime(i); + if (time < list.getTimeOffset() || + (i + 1 < list.size() && (time == list.getTime(i + 1)))) { + continue; + } + switch (dataType) { + case BOOLEAN: + sortedList.add(new TimeValuePair(time, new TsBoolean(list.getBoolean(i)))); + break; + case INT32: + sortedList.add(new TimeValuePair(time, new TsInt(list.getInt(i)))); + break; + case INT64: + sortedList.add(new TimeValuePair(time, new TsLong(list.getLong(i)))); + break; + case FLOAT: + sortedList.add(new TimeValuePair(time, new TsFloat(list.getFloat(i)))); + break; + case DOUBLE: + sortedList.add(new TimeValuePair(time, new TsDouble(list.getDouble(i)))); + break; + case TEXT: + sortedList.add(new TimeValuePair(time, new TsBinary(list.getBinary(i)))); + break; + default: + LOGGER.error("don't support data type: {}", dataType); + break; + } + } + return this.sortedList; } @Override diff --git a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index 90779ff..35e9fd6 100644 --- a/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -144,7 +144,6 @@ public abstract class TVList { public void reset() { size = 0; - limit = 0; timeOffset = -1; sorted = false; } @@ -164,6 +163,9 @@ public abstract class TVList { } protected void sort(int lo, int hi) { + if (sorted) { + return; + } if (lo == hi) { return; } diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java index 6c6120f..508a411 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2Test.java @@ -55,6 +55,7 @@ public class FileNodeProcessorV2Test { @Test public void testSequenceSyncClose() { for (int j = 1; j <= 100; j++) { + System.out.println(j); TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); processor.insert(new InsertPlan(record)); diff --git a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/unsequence/UnseqSeriesReaderByTimestampTest.java b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/unsequence/UnseqSeriesReaderByTimestampTest.java index 78dfdbf..822a9a5 100644 --- a/iotdb/src/test/java/org/apache/iotdb/db/query/reader/unsequence/UnseqSeriesReaderByTimestampTest.java +++ b/iotdb/src/test/java/org/apache/iotdb/db/query/reader/unsequence/UnseqSeriesReaderByTimestampTest.java @@ -74,7 +74,7 @@ public class UnseqSeriesReaderByTimestampTest { TSRecord record = new TSRecord(2, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(100))); FileNodeManagerV2.getInstance().insert(new InsertPlan(record)); -// FileNodeManagerV2.getInstance().asyncFlushAndSealAllFiles(); + // FileNodeManagerV2.getInstance().asyncFlushAndSealAllFiles(); // query List<Path> paths = new ArrayList<>();
