This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch jira1306_012 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit daa44f7d4f45a25b0bbcfbc6cde33fcc61295370 Author: HTHou <[email protected]> AuthorDate: Thu Apr 15 17:08:30 2021 +0800 [To rel/0.12] [IOTDB-1306]Fix DeadLock in MemControl module --- .../engine/storagegroup/StorageGroupProcessor.java | 12 ++++++++++ .../db/engine/storagegroup/TsFileProcessor.java | 27 ++++++++++++++-------- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 5ddf1e3..5a7c2f6 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -106,6 +106,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -160,6 +161,8 @@ public class StorageGroupProcessor { * partitionLatestFlushedTimeForEachDevice) */ private final ReadWriteLock insertLock = new ReentrantReadWriteLock(); + + private final Condition rejectCondition = insertLock.writeLock().newCondition(); /** closeStorageGroupCondition is used to wait for all currently closing TsFiles to be done. */ private final Object closeStorageGroupCondition = new Object(); /** @@ -1100,6 +1103,7 @@ public class StorageGroupProcessor { } } finally { writeUnlock(); + rejectConditionSignal(); } } @@ -1585,6 +1589,14 @@ public class StorageGroupProcessor { insertLock.writeLock().unlock(); } + public void rejectConditionAwait() throws InterruptedException { + rejectCondition.await(config.getCheckPeriodWhenInsertBlocked(), TimeUnit.MILLISECONDS); + } + + public void rejectConditionSignal() { + rejectCondition.signal(); + } + /** * @param tsFileResources includes sealed and unsealed tsfile resources * @return fill unsealed tsfile resources with memory data and ChunkMetadataList of data in disk diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index c20ec7e..ff795f2 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -22,7 +22,6 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.adapter.CompressionRatio; -import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.flush.CloseFileListener; import org.apache.iotdb.db.engine.flush.FlushListener; import org.apache.iotdb.db.engine.flush.FlushManager; @@ -386,15 +385,25 @@ public class TsFileProcessor { storageGroupInfo.addStorageGroupMemCost(memTableIncrement); tsFileProcessorInfo.addTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement); if (storageGroupInfo.needToReportToSystem()) { - SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo); - try { - StorageEngine.blockInsertionIfReject(); - } catch (WriteProcessRejectException e) { - storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement); - tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement); - SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false); - throw e; + long startTime = System.currentTimeMillis(); + while (SystemInfo.getInstance().isRejected()) { + try { + storageGroupInfo.getStorageGroupProcessor().rejectConditionAwait(); + if (System.currentTimeMillis() - startTime + > config.getMaxWaitingTimeWhenInsertBlocked()) { + throw new WriteProcessRejectException( + "System rejected over " + config.getMaxWaitingTimeWhenInsertBlocked() + "ms"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (WriteProcessRejectException e) { + storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement); + tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement); + SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false); + throw e; + } } + logger.debug("Time for Waiting memory release is {}", System.currentTimeMillis() - startTime); } workMemTable.addTVListRamCost(memTableIncrement); workMemTable.addTextDataSize(textDataIncrement);
