This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch fix_mem_control_211 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2deebdbf2a63f337f95cb2a9fb1e36655dd228f9 Author: HTHou <[email protected]> AuthorDate: Fri Nov 6 17:29:14 2020 +0800 fix cannot close file --- .../iotdb/db/engine/storagegroup/StorageGroupInfo.java | 14 +++++++------- .../db/engine/storagegroup/StorageGroupProcessor.java | 13 ++++++++----- .../iotdb/db/engine/storagegroup/TsFileProcessor.java | 15 ++++++++++----- .../main/java/org/apache/iotdb/db/rescon/SystemInfo.java | 6 ++++++ 4 files changed, 31 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java index e696a60..a31d41a 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java @@ -18,8 +18,8 @@ */ package org.apache.iotdb.db.engine.storagegroup; -import java.util.HashSet; -import java.util.Set; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -44,12 +44,12 @@ public class StorageGroupInfo { private long storageGroupSizeReportThreshold = IoTDBDescriptor.getInstance().getConfig().getStorageGroupSizeReportThreshold(); - private long lastReportedSize = 0L; + private AtomicLong lastReportedSize = new AtomicLong(); /** * A set of all unclosed TsFileProcessors in this SG */ - private Set<TsFileProcessor> reportedTsps = new HashSet<>(); + private List<TsFileProcessor> reportedTsps = new CopyOnWriteArrayList<>(); public StorageGroupInfo(StorageGroupProcessor storageGroupProcessor) { this.storageGroupProcessor = storageGroupProcessor; @@ -81,16 +81,16 @@ public class StorageGroupInfo { return memoryCost.get(); } - public Set<TsFileProcessor> getAllReportedTsp() { + public List<TsFileProcessor> getAllReportedTsp() { return reportedTsps; } public boolean needToReportToSystem() { - return memoryCost.get() - lastReportedSize > storageGroupSizeReportThreshold; + return memoryCost.get() - lastReportedSize.get() > storageGroupSizeReportThreshold; } public void setLastReportedSize(long size) { - lastReportedSize = size; + lastReportedSize.set(size); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 17d09d6..be56a73 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -829,6 +829,10 @@ public class StorageGroupProcessor { .put(insertTabletPlan.getDeviceId().getFullPath(), insertTabletPlan.getTimes()[end - 1]); } + if (closingSequenceTsFileProcessor.contains(tsFileProcessor) || + closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) { + return true; + } // check memtable size and may async try to flush the work memtable if (tsFileProcessor.shouldFlush()) { fileFlushPolicy.apply(this, tsFileProcessor, sequence); @@ -914,13 +918,12 @@ public class StorageGroupProcessor { } public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) { - if (closingSequenceTsFileProcessor.contains(tsFileProcessor) || - closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) { - return; - } writeLock(); try { - fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) && + !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) { + fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + } } finally { writeUnlock(); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 5e5f083..7ba02eb 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -493,15 +493,15 @@ public class TsFileProcessor { } try { - if (logger.isInfoEnabled()) { + if (logger.isDebugEnabled()) { if (workMemTable != null) { - logger.info( + logger.debug( "{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile size: {}", storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), workMemTable.memSize(), tsFileResource.getTsFileSize()); } else { - logger.info("{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}", + logger.debug("{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}", storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), tsFileResource.getTsFileSize()); } @@ -676,13 +676,18 @@ public class TsFileProcessor { } memTable.release(); if (enableMemControl) { - // For text type data, reset the mem cost in tsFileProcessorInfo + // reset the mem cost in StorageGroupProcessorInfo storageGroupInfo.releaseStorageGroupMemCost(memTable.getTVListsRamCost()); + if (logger.isDebugEnabled()) { + logger.debug("[mem control] {}: {} flush finished, try to reset system memcost, " + + "flushing memtable list size: {}", storageGroupName, + tsFileResource.getTsFile().getName(), flushingMemTables.size()); + } // report to System SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, true); } if (logger.isDebugEnabled()) { - logger.debug("{}: {} flush finished, remove a memtable from flushing list, " + logger.debug("[mem_control] {}: {} flush finished, remove a memtable from flushing list, " + "flushing memtable list size: {}", storageGroupName, tsFileResource.getTsFile().getName(), flushingMemTables.size()); } diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java index aa32467..74c00f0 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java @@ -147,7 +147,13 @@ public class SystemInfo { * Be Careful!! This method can only be called by flush thread! */ private void forceAsyncFlush() { + if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) { + return; + } List<TsFileProcessor> processors = getTsFileProcessorsToFlush(); + if (logger.isDebugEnabled()) { + logger.debug("[mem control] get {} tsp to flush", processors.size()); + } for (TsFileProcessor processor : processors) { if (processor != null) { processor.startAsyncFlush();
