This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-wal-resource-management
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8bedf08b00be5bd826ae35996145b4b790875b3f
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu May 25 02:31:42 2023 +0800

    wal resource manager
---
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 .../db/engine/storagegroup/TsFileProcessor.java    |  27 ++--
 .../db/pipe/resource/PipeResourceManager.java      |   9 ++
 .../{ => file}/PipeFileResourceManager.java        |   2 +-
 .../db/pipe/resource/wal/PipeWALResource.java      | 158 +++++++++++++++++++++
 .../pipe/resource/wal/PipeWALResourceManager.java  |  98 +++++++++++++
 .../apache/iotdb/db/wal/utils/WALPipeHandler.java  |   4 +
 .../pipe/resource/PipeFileResourceManagerTest.java |   1 +
 8 files changed, 286 insertions(+), 14 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index d156287e1f3..718c5a6bfc1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -72,6 +72,7 @@ public enum ThreadName {
   PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
   PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
   PIPE_META_SYNC_SERVICE("Pipe-Meta-Sync-Service"),
+  PIPE_WAL_RESOURCE_TTL_CHECKER_SERVICE("Pipe-WAL-Resource-TTL-Checker-Service"),
   ;
 
   private final String name;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index f4f33c10473..a0a36e3a1c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -254,8 +254,9 @@ public class TsFileProcessor {
     }
 
     long startTime = System.nanoTime();
+    WALFlushListener walFlushListener;
     try {
-      WALFlushListener walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowNode);
+      walFlushListener = walNode.log(workMemTable.getMemTableId(), insertRowNode);
       if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
         throw walFlushListener.getCause();
       }
@@ -273,17 +274,17 @@ public class TsFileProcessor {
     }
 
     startTime = System.nanoTime();
+
+    PipeInsertionDataNodeListener.getInstance()
+        .listenToInsertNode(
+            dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, tsFileResource);
+
     if (insertRowNode.isAligned()) {
       workMemTable.insertAlignedRow(insertRowNode);
     } else {
       workMemTable.insert(insertRowNode);
     }
 
-    // collect plan node in pipe
-    PipeInsertionDataNodeListener.getInstance()
-        .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionId(), insertRowNode, tsFileResource);
-
     // update start time of this memtable
     tsFileResource.updateStartTime(
         insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime());
@@ -354,9 +355,9 @@ public class TsFileProcessor {
     }
 
     long startTime = System.nanoTime();
+    WALFlushListener walFlushListener;
     try {
-      WALFlushListener walFlushListener =
-          walNode.log(workMemTable.getMemTableId(), insertTabletNode, start, end);
+      walFlushListener = walNode.log(workMemTable.getMemTableId(), insertTabletNode, start, end);
       if (walFlushListener.waitForResult() == WALFlushListener.Status.FAILURE) {
         throw walFlushListener.getCause();
       }
@@ -373,6 +374,11 @@ public class TsFileProcessor {
     }
 
     startTime = System.nanoTime();
+
+    PipeInsertionDataNodeListener.getInstance()
+        .listenToInsertNode(
+            dataRegionInfo.getDataRegion().getDataRegionId(), insertTabletNode, tsFileResource);
+
     try {
       if (insertTabletNode.isAligned()) {
         workMemTable.insertAlignedTablet(insertTabletNode, start, end);
@@ -389,11 +395,6 @@ public class TsFileProcessor {
       results[i] = RpcUtils.SUCCESS_STATUS;
     }
 
-    // collect plan node in pipe
-    PipeInsertionDataNodeListener.getInstance()
-        .listenToInsertNode(
-            dataRegionInfo.getDataRegion().getDataRegionId(), insertTabletNode, tsFileResource);
-
     tsFileResource.updateStartTime(
         insertTabletNode.getDeviceID().toStringID(), insertTabletNode.getTimes()[start]);
     // for sequence tsfile, we update the endTime only when the file is prepared to be closed.
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
index 61b4e61a04e..43bddd872f6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeResourceManager.java
@@ -19,18 +19,27 @@
 
 package org.apache.iotdb.db.pipe.resource;
 
+import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager;
+import org.apache.iotdb.db.pipe.resource.wal.PipeWALResourceManager;
+
 public class PipeResourceManager {
 
   private final PipeFileResourceManager pipeFileResourceManager;
+  private final PipeWALResourceManager pipeWALResourceManager;
 
   public static PipeFileResourceManager file() {
     return PipeResourceManagerHolder.INSTANCE.pipeFileResourceManager;
   }
 
+  public static PipeWALResourceManager wal() {
+    return PipeResourceManagerHolder.INSTANCE.pipeWALResourceManager;
+  }
+
   ///////////////////////////// SINGLETON /////////////////////////////
 
   private PipeResourceManager() {
     pipeFileResourceManager = new PipeFileResourceManager();
+    pipeWALResourceManager = new PipeWALResourceManager();
   }
 
   private static class PipeResourceManagerHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java
rename to server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
index e7d961b3c9f..942ab600536 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.pipe.resource;
+package org.apache.iotdb.db.pipe.resource.file;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.utils.FileUtils;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
new file mode 100644
index 00000000000..844420272bd
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.wal;
+
+import org.apache.iotdb.db.wal.exception.MemTablePinException;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
+import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PipeWALResource implements AutoCloseable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(PipeWALResource.class);
+
+  private final WALPipeHandler walPipeHandler;
+
+  private final AtomicInteger referenceCount;
+
+  // TODO: make this configurable
+  public static final long MIN_TIME_TO_LIVE_IN_MS = 1000 * 60;
+  private final AtomicLong lastLogicalPinTime;
+  private final AtomicBoolean isPhysicallyPinned;
+
+  public PipeWALResource(WALPipeHandler walPipeHandler) {
+    this.walPipeHandler = walPipeHandler;
+
+    referenceCount = new AtomicInteger(0);
+
+    lastLogicalPinTime = new AtomicLong(0);
+    isPhysicallyPinned = new AtomicBoolean(false);
+  }
+
+  public void pin() throws PipeRuntimeNonCriticalException {
+    if (referenceCount.get() == 0) {
+      if (!isPhysicallyPinned.get()) {
+        try {
+          walPipeHandler.pinMemTable();
+        } catch (MemTablePinException e) {
+          throw new PipeRuntimeNonCriticalException(
+              String.format(
+                  "failed to pin wal %d, because %s",
+                  walPipeHandler.getMemTableId(), e.getMessage()));
+        }
+        isPhysicallyPinned.set(true);
+        LOGGER.info("wal {} is pinned by pipe engine", walPipeHandler.getMemTableId());
+      } // else means the wal is already pinned, do nothing
+
+      // no matter the wal is pinned or not, update the last pin time
+      lastLogicalPinTime.set(System.currentTimeMillis());
+    }
+
+    referenceCount.incrementAndGet();
+  }
+
+  public void unpin() throws PipeRuntimeNonCriticalException {
+    final int finalReferenceCount = referenceCount.get();
+
+    if (finalReferenceCount == 1) {
+      unpinPhysicallyIfOutOfTimeToLive();
+    } else if (finalReferenceCount < 1) {
+      throw new PipeRuntimeCriticalException(
+          String.format(
+              "wal %d is unpinned more than pinned, this should not happen",
+              walPipeHandler.getMemTableId()));
+    }
+
+    referenceCount.decrementAndGet();
+  }
+
+  /**
+   * Invalidate the wal if it is unpinned and out of time to live.
+   *
+   * @return true if the wal is invalidated, false otherwise
+   */
+  public boolean invalidateIfPossible() {
+    if (referenceCount.get() > 0) {
+      return false;
+    }
+
+    // referenceCount.get() == 0
+    return unpinPhysicallyIfOutOfTimeToLive();
+  }
+
+  /**
+   * Unpin the wal if it is out of time to live.
+   *
+   * @return true if the wal is unpinned physically (then it can be invalidated), false otherwise
+   */
+  private boolean unpinPhysicallyIfOutOfTimeToLive() {
+    if (isPhysicallyPinned.get()) {
+      if (System.currentTimeMillis() - lastLogicalPinTime.get() > MIN_TIME_TO_LIVE_IN_MS) {
+        try {
+          walPipeHandler.unpinMemTable();
+        } catch (MemTablePinException e) {
+          throw new PipeRuntimeNonCriticalException(
+              String.format(
+                  "failed to unpin wal %d, because %s",
+                  walPipeHandler.getMemTableId(), e.getMessage()));
+        }
+        isPhysicallyPinned.set(false);
+        LOGGER.info(
+            "wal {} is unpinned by pipe engine when checking time to live",
+            walPipeHandler.getMemTableId());
+        return true;
+      } else {
+        return false;
+      }
+    } else {
+      LOGGER.info(
+          "wal {} is not pinned physically when checking time to live",
+          walPipeHandler.getMemTableId());
+      return true;
+    }
+  }
+
+  @Override
+  public void close() {
+    if (isPhysicallyPinned.get()) {
+      try {
+        walPipeHandler.unpinMemTable();
+      } catch (MemTablePinException e) {
+        LOGGER.error(
+            "failed to unpin wal {} when closing pipe wal resource, because {}",
+            walPipeHandler.getMemTableId(),
+            e.getMessage());
+      }
+      isPhysicallyPinned.set(false);
+      LOGGER.info(
+          "wal {} is unpinned by pipe engine when closing pipe wal resource",
+          walPipeHandler.getMemTableId());
+    }
+
+    referenceCount.set(0);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
new file mode 100644
index 00000000000..c187a29f781
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -0,0 +1,98 @@
+/*  * Licensed to the Apache Software Foundation (ASF) under one  * or more contributor license agreements.  See the NOTICE file  * distributed with this work for additional information  * regarding copyright ownership.  The ASF licenses this file  * to you under the Apache License, Version 2.0 (the  * "License"); you may not use this file except in compliance  * with the License.  You may obtain a copy of the License at  *  *     http://www.apache.org/licenses/LICENSE-2.0  *  * Unless r [...]
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.db.wal.utils.WALPipeHandler;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class PipeWALResourceManager implements AutoCloseable {
+
+  private final Map<Long, PipeWALResource> memtableIdToPipeWALResourceMap;
+
+  private static final int SEGMENT_LOCK_COUNT = 32;
+  private final ReentrantLock[] memtableIdSegmentLocks;
+
+  private static final ScheduledExecutorService PIPE_WAL_RESOURCE_TTL_CHECKER =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER_SERVICE.getName());
+  private final ScheduledFuture<?> ttlCheckerFuture;
+
+  public PipeWALResourceManager() {
+    memtableIdToPipeWALResourceMap = new HashMap<>();
+
+    memtableIdSegmentLocks = new ReentrantLock[SEGMENT_LOCK_COUNT];
+    for (int i = 0; i < SEGMENT_LOCK_COUNT; i++) {
+      memtableIdSegmentLocks[i] = new ReentrantLock();
+    }
+
+    ttlCheckerFuture =
+        PIPE_WAL_RESOURCE_TTL_CHECKER.scheduleAtFixedRate(
+            () -> {
+              for (final long memtableId : memtableIdToPipeWALResourceMap.keySet()) {
+                final ReentrantLock lock =
+                    memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+                lock.lock();
+                try {
+                  if (memtableIdToPipeWALResourceMap.get(memtableId).invalidateIfPossible()) {
+                    memtableIdToPipeWALResourceMap.remove(memtableId);
+                  }
+                } finally {
+                  lock.unlock();
+                }
+              }
+            },
+            PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
+            PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
+            TimeUnit.MILLISECONDS);
+  }
+
+  public void pin(long memtableId, WALPipeHandler walPipeHandler) {
+    final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+    lock.lock();
+    try {
+      memtableIdToPipeWALResourceMap
+          .computeIfAbsent(memtableId, id -> new PipeWALResource(walPipeHandler))
+          .pin();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public void unpin(long memtableId) {
+    final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+    lock.lock();
+    try {
+      memtableIdToPipeWALResourceMap.get(memtableId).unpin();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (ttlCheckerFuture != null) {
+      ttlCheckerFuture.cancel(true);
+    }
+
+    for (final long memtableId : memtableIdToPipeWALResourceMap.keySet()) {
+      final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId % SEGMENT_LOCK_COUNT)];
+
+      lock.lock();
+      try {
+        memtableIdToPipeWALResourceMap.get(memtableId).close();
+        memtableIdToPipeWALResourceMap.remove(memtableId);
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
index abdb4771a93..80333cf8ffb 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/utils/WALPipeHandler.java
@@ -101,6 +101,10 @@ public class WALPipeHandler {
     }
   }
 
+  public long getMemTableId() {
+    return memTableId;
+  }
+
   public void setMemTableId(long memTableId) {
     this.memTableId = memTableId;
   }
diff --git a/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java b/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
index b2441aa9d9f..ef86b0db285 100644
--- a/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/pipe/resource/PipeFileResourceManagerTest.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.engine.modification.Deletion;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.pipe.resource.file.PipeFileResourceManager;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.write.TsFileWriter;

Reply via email to