This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-wal-resource-management in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8bedf08b00be5bd826ae35996145b4b790875b3f Author: Steve Yurong Su <[email protected]> AuthorDate: Thu May 25 02:31:42 2023 +0800 wal resource manager --- .../iotdb/commons/concurrent/ThreadName.java | 1 + .../db/engine/storagegroup/TsFileProcessor.java | 27 ++-- .../db/pipe/resource/PipeResourceManager.java | 9 ++ .../{ => file}/PipeFileResourceManager.java | 2 +- .../db/pipe/resource/wal/PipeWALResource.java | 158 +++++++++++++++++++++ .../pipe/resource/wal/PipeWALResourceManager.java | 98 +++++++++++++ .../apache/iotdb/db/wal/utils/WALPipeHandler.java | 4 + .../pipe/resource/PipeFileResourceManagerTest.java | 1 + 8 files changed, 286 insertions(+), 14 deletions(-) diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index d156287e1f3..718c5a6bfc1 100644 --- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -72,6 +72,7 @@ public enum ThreadName { PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"), PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"), PIPE_META_SYNC_SERVICE("Pipe-Meta-Sync-Service"), + PIPE_WAL_RESOURCE_TTL_CHECKER_SERVICE("Pipe-WAL-Resource-TTL-Checker-Service"), ; private final String name; diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index f4f33c10473..a0a36e3a1c0 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -254,8 +254,9 @@ public class TsFileProcessor { } long startTime = System.nanoTime(); + WALFlushListener walFlushListener; try { - WALFlushListener walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowNode); + walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowNode); if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) { throw walFlushListener.getCause(); } @@ -273,17 +274,17 @@ public class TsFileProcessor { } startTime = System.nanoTime(); + + PipeInsertionDataNodeListener.getInstance() + .listenToInsertNode( + dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, tsFileResource); + if (insertRowNode.isAligned()) { workMemTable.insertAlignedRow(insertRowNode); } else { workMemTable.insert(insertRowNode); } - // collect plan node in pipe - PipeInsertionDataNodeListener.getInstance() - .listenToInsertNode( - dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, tsFileResource); - // update start time of this memtable tsFileResource.updateStartTime( insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime()); @@ -354,9 +355,9 @@ public class TsFileProcessor { } long startTime = System.nanoTime(); + WALFlushListener walFlushListener; try { - WALFlushListener walFlushListener = - walNode.log(workMemTable.getMemTableId(), insertTabletNode, start, end); + walFlushListener = walNode.log(workMemTable.getMemTableId(), insertTabletNode, start, end); if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) { throw walFlushListener.getCause(); } @@ -373,6 +374,11 @@ public class TsFileProcessor { } startTime = System.nanoTime(); + + PipeInsertionDataNodeListener.getInstance() + .listenToInsertNode( + dataRegionInfo.getDataRegion().getDataRegionId(), insertTabletNode, tsFileResource); + try { if (insertTabletNode.isAligned()) { workMemTable.insertAlignedTablet(insertTabletNode, start, end); @@ -389,11 +395,6 @@ public class TsFileProcessor { results[i] = RpcUtils.SUCCESS_STATUS; } - // collect plan node in pipe - PipeInsertionDataNodeListener.getInstance() - .listenToInsertNode( - dataRegionInfo.getDataRegion().getDataRegionId(), insertTabletNode, tsFileResource); - tsFileResource.updateStartTime( insertTabletNode.getDeviceID().toStringID(), insertTabletNode.getTimes()[start]); // for sequence tsfile, we update the endTime only when the file is prepared to be closed. diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java index 61b4e61a04e..43bddd872f6 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java @@ -19,18 +19,27 @@ package org.apache.iotdb.db.pipe.resource; +import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager; +import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager; + public class PipeResourceManager { private final PipeFileResourceManager pipeFileResourceManager; + private final PipeWALResourceManager pipeWALResourceManager; public static PipeFileResourceManager file() { return PipeResourceManagerHolder.INSTANCE.pipeFileResourceManager; } + public static PipeWALResourceManager wal() { + return PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager; + } + ///////////////////////////// SINGLETON ///////////////////////////// private PipeResourceManager() { pipeFileResourceManager = new PipeFileResourceManager(); + pipeWALResourceManager = new PipeWALResourceManager(); } private static class PipeResourceManagerHolder { diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java similarity index 99% rename from server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java rename to server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java index e7d961b3c9f..942ab600536 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.resource; +package org.apache.iotdb.db.pipe.resource.file; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.utils.FileUtils; diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java new file mode 100644 index 00000000000..844420272bd --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.resource.wal; + +import org.apache.iotdb.db.wal.exception.MemTablePinException; +import org.apache.iotdb.db.wal.utils.WALPipeHandler; +import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException; +import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class PipeWALResource implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeWALResource.class); + + private final WALPipeHandler walPipeHandler; + + private final AtomicInteger referenceCount; + + // TODO: make this configurable + public static final long MIN_TIME_TO_LIVE_IN_MS = 1000 * 60; + private final AtomicLong lastLogicalPinTime; + private final AtomicBoolean isPhysicallyPinned; + + public PipeWALResource(WALPipeHandler walPipeHandler) { + this.walPipeHandler = walPipeHandler; + + referenceCount = new AtomicInteger(0); + + lastLogicalPinTime = new AtomicLong(0); + isPhysicallyPinned = new AtomicBoolean(false); + } + + public void pin() throws PipeRuntimeNonCriticalException { + if (referenceCount.get() == 0) { + if (!isPhysicallyPinned.get()) { + try { + walPipeHandler.pinMemTable(); + } catch (MemTablePinException e) { + throw new PipeRuntimeNonCriticalException( + String.format( + "failed to pin wal %d, because %s", + walPipeHandler.getMemTableId(), e.getMessage())); + } + isPhysicallyPinned.set(true); + LOGGER.info("wal {} is pinned by pipe engine", walPipeHandler.getMemTableId()); + } // else means the wal is already pinned, do nothing + + // no matter the wal is pinned or not, update the last pin time + lastLogicalPinTime.set(System.currentTimeMillis()); + } + + referenceCount.incrementAndGet(); + } + + public void unpin() throws PipeRuntimeNonCriticalException { + final int finalReferenceCount = referenceCount.get(); + + if (finalReferenceCount == 1) { + unpinPhysicallyIfOutOfTimeToLive(); + } else if (finalReferenceCount < 1) { + throw new PipeRuntimeCriticalException( + String.format( + "wal %d is unpinned more than pinned, this should not happen", + walPipeHandler.getMemTableId())); + } + + referenceCount.decrementAndGet(); + } + + /** + * Invalidate the wal if it is unpinned and out of time to live. + * + * @return true if the wal is invalidated, false otherwise + */ + public boolean invalidateIfPossible() { + if (referenceCount.get() > 0) { + return false; + } + + // referenceCount.get() == 0 + return unpinPhysicallyIfOutOfTimeToLive(); + } + + /** + * Unpin the wal if it is out of time to live. + * + * @return true if the wal is unpinned physically (then it can be invalidated), false otherwise + */ + private boolean unpinPhysicallyIfOutOfTimeToLive() { + if (isPhysicallyPinned.get()) { + if (System.currentTimeMillis() - lastLogicalPinTime.get() > MIN_TIME_TO_LIVE_IN_MS) { + try { + walPipeHandler.unpinMemTable(); + } catch (MemTablePinException e) { + throw new PipeRuntimeNonCriticalException( + String.format( + "failed to unpin wal %d, because %s", + walPipeHandler.getMemTableId(), e.getMessage())); + } + isPhysicallyPinned.set(false); + LOGGER.info( + "wal {} is unpinned by pipe engine when checking time to live", + walPipeHandler.getMemTableId()); + return true; + } else { + return false; + } + } else { + LOGGER.info( + "wal {} is not pinned physically when checking time to live", + walPipeHandler.getMemTableId()); + return true; + } + } + + @Override + public void close() { + if (isPhysicallyPinned.get()) { + try { + walPipeHandler.unpinMemTable(); + } catch (MemTablePinException e) { + LOGGER.error( + "failed to unpin wal {} when closing pipe wal resource, because {}", + walPipeHandler.getMemTableId(), + e.getMessage()); + } + isPhysicallyPinned.set(false); + LOGGER.info( + "wal {} is unpinned by pipe engine when closing pipe wal resource", + walPipeHandler.getMemTableId()); + } + + referenceCount.set(0); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java new file mode 100644 index 00000000000..c187a29f781 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java @@ -0,0 +1,98 @@ +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless r [...] + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.ThreadName; +import org.apache.iotdb.db.wal.utils.WALPipeHandler; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +public class PipeWALResourceManager implements AutoCloseable { + + private final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap; + + private static final int SEGMENT_LOCK_COUNT = 32; + private final ReentrantLock[] memtableIdSegmentLocks; + + private static final ScheduledExecutorService PIPE_WAL_RESOURCE_TTL_CHECKER = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER_SERVICE.getName()); + private final ScheduledFuture<?> ttlCheckerFuture; + + public PipeWALResourceManager() { + memtableIdToPipeWALResourceMap = new HashMap<>(); + + memtableIdSegmentLocks = new ReentrantLock[SEGMENT_LOCK_COUNT]; + for (int i = 0; i < SEGMENT_LOCK_COUNT; i++) { + memtableIdSegmentLocks[i] = new ReentrantLock(); + } + + ttlCheckerFuture = + PIPE_WAL_RESOURCE_TTL_CHECKER.scheduleAtFixedRate( + () -> { + for (final long memtableId : memtableIdToPipeWALResourceMap.keySet()) { + final ReentrantLock lock = + memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)]; + + lock.lock(); + try { + if (memtableIdToPipeWALResourceMap.get(memtableId).invalidateIfPossible()) { + memtableIdToPipeWALResourceMap.remove(memtableId); + } + } finally { + lock.unlock(); + } + } + }, + PipeWALResource.MIN_TIME_TO_LIVE_IN_MS, + PipeWALResource.MIN_TIME_TO_LIVE_IN_MS, + TimeUnit.MILLISECONDS); + } + + public void pin(long memtableId, WALPipeHandler walPipeHandler) { + final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)]; + + lock.lock(); + try { + memtableIdToPipeWALResourceMap + .computeIfAbsent(memtableId, id -> new PipeWALResource(walPipeHandler)) + .pin(); + } finally { + lock.unlock(); + } + } + + public void unpin(long memtableId) { + final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)]; + + lock.lock(); + try { + memtableIdToPipeWALResourceMap.get(memtableId).unpin(); + } finally { + lock.unlock(); + } + } + + @Override + public void close() throws Exception { + if (ttlCheckerFuture != null) { + ttlCheckerFuture.cancel(true); + } + + for (final long memtableId : memtableIdToPipeWALResourceMap.keySet()) { + final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)]; + + lock.lock(); + try { + memtableIdToPipeWALResourceMap.get(memtableId).close(); + memtableIdToPipeWALResourceMap.remove(memtableId); + } finally { + lock.unlock(); + } + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java index abdb4771a93..80333cf8ffb 100644 --- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java +++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java @@ -101,6 +101,10 @@ public class WALPipeHandler { } } + public long getMemTableId() { + return memTableId; + } + public void setMemTableId(long memTableId) { this.memTableId = memTableId; } diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java index b2441aa9d9f..ef86b0db285 100644 --- a/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java +++ b/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.modification.ModificationFile; +import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; import org.apache.iotdb.tsfile.write.TsFileWriter;
