This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d343711115 Pipe: Fix ConcurrentModificationException occured by ttl 
check in PipeTsFileResourceManager (#11876)
5d343711115 is described below

commit 5d3437111157006528d1a2a695cd6572cec2cf20
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Jan 15 11:07:12 2024 +0800

    Pipe: Fix ConcurrentModificationException occured by ttl check in 
PipeTsFileResourceManager (#11876)
---
 .../agent/runtime/PipePeriodicalJobExecutor.java   |  4 +++
 .../resource/tsfile/PipeTsFileResourceManager.java | 33 +++++++++++------
 .../pipe/resource/wal/PipeWALResourceManager.java  | 41 ++++++++++++----------
 3 files changed, 49 insertions(+), 29 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
index c3770201371..3202745533d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
@@ -114,4 +114,8 @@ public class PipePeriodicalJobExecutor {
     periodicalJobs.clear();
     LOGGER.info("All pipe periodical jobs are cleared successfully.");
   }
+
+  public static long getMinIntervalSeconds() {
+    return MIN_INTERVAL_SECONDS;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 65d4ea60e18..9aadda86ae3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.resource.tsfile;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.agent.runtime.PipePeriodicalJobExecutor;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -33,11 +34,11 @@ import java.io.IOException;
 import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.ConcurrentModificationException;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 public class PipeTsFileResourceManager {
@@ -45,24 +46,41 @@ public class PipeTsFileResourceManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileResourceManager.class);
 
   private final Map<String, PipeTsFileResource> 
hardlinkOrCopiedFileToPipeTsFileResourceMap =
-      new HashMap<>();
+      new ConcurrentHashMap<>();
   private final ReentrantLock lock = new ReentrantLock();
 
   public PipeTsFileResourceManager() {
     PipeAgent.runtime()
         .registerPeriodicalJob(
             "PipeTsFileResourceManager#ttlCheck()",
-            this::ttlCheck,
+            this::tryTtlCheck,
             Math.max(PipeTsFileResource.TSFILE_MIN_TIME_TO_LIVE_IN_MS / 1000, 
1));
   }
 
+  private void tryTtlCheck() {
+    try {
+      final long timeout = PipePeriodicalJobExecutor.getMinIntervalSeconds() 
>> 1;
+      if (lock.tryLock(timeout, TimeUnit.SECONDS)) {
+        try {
+          ttlCheck();
+        } finally {
+          lock.unlock();
+        }
+      } else {
+        LOGGER.warn("failed to try lock when checking TTL because of timeout 
({}s)", timeout);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn("failed to try lock when checking TTL because of 
interruption", e);
+    }
+  }
+
   private void ttlCheck() {
     final Iterator<Map.Entry<String, PipeTsFileResource>> iterator =
         hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet().iterator();
     while (iterator.hasNext()) {
       final Map.Entry<String, PipeTsFileResource> entry = iterator.next();
 
-      lock.lock();
       try {
         if (entry.getValue().closeIfOutOfTimeToLive()) {
           iterator.remove();
@@ -72,13 +90,8 @@ public class PipeTsFileResourceManager {
               entry.getKey(),
               entry.getValue().getReferenceCount());
         }
-      } catch (ConcurrentModificationException e) {
-        LOGGER.info(
-            "Concurrent modification issues happened, skipping the file in 
this round of ttl check");
       } catch (IOException e) {
         LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ", 
e);
-      } finally {
-        lock.unlock();
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index 0c34edda8a3..aa8f4e71623 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -61,27 +61,30 @@ public abstract class PipeWALResourceManager {
   private void ttlCheck() {
     final Iterator<Map.Entry<Long, PipeWALResource>> iterator =
         memtableIdToPipeWALResourceMap.entrySet().iterator();
-    while (iterator.hasNext()) {
-      final Map.Entry<Long, PipeWALResource> entry = iterator.next();
-      final ReentrantLock lock =
-          memtableIdSegmentLocks[(int) (entry.getKey() % SEGMENT_LOCK_COUNT)];
-
-      lock.lock();
-      try {
-        if (entry.getValue().invalidateIfPossible()) {
-          iterator.remove();
-        } else if (LOGGER.isDebugEnabled()) {
-          LOGGER.debug(
-              "WAL (memtableId {}) is still referenced {} times",
-              entry.getKey(),
-              entry.getValue().getReferenceCount());
+    try {
+      while (iterator.hasNext()) {
+        final Map.Entry<Long, PipeWALResource> entry = iterator.next();
+        final ReentrantLock lock =
+            memtableIdSegmentLocks[(int) (entry.getKey() % 
SEGMENT_LOCK_COUNT)];
+
+        lock.lock();
+        try {
+          if (entry.getValue().invalidateIfPossible()) {
+            iterator.remove();
+          } else if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug(
+                "WAL (memtableId {}) is still referenced {} times",
+                entry.getKey(),
+                entry.getValue().getReferenceCount());
+          }
+        } finally {
+          lock.unlock();
         }
-      } catch (ConcurrentModificationException e) {
-        LOGGER.info(
-            "Concurrent modification issues happened, skipping the WAL in this 
round of ttl check");
-      } finally {
-        lock.unlock();
       }
+    } catch (ConcurrentModificationException e) {
+      LOGGER.error(
+          "Concurrent modification issues happened, skipping the WAL in this 
round of ttl check",
+          e);
     }
   }
 

Reply via email to