This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5d343711115 Pipe: Fix ConcurrentModificationException occured by ttl
check in PipeTsFileResourceManager (#11876)
5d343711115 is described below
commit 5d3437111157006528d1a2a695cd6572cec2cf20
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Jan 15 11:07:12 2024 +0800
Pipe: Fix ConcurrentModificationException occured by ttl check in
PipeTsFileResourceManager (#11876)
---
.../agent/runtime/PipePeriodicalJobExecutor.java | 4 +++
.../resource/tsfile/PipeTsFileResourceManager.java | 33 +++++++++++------
.../pipe/resource/wal/PipeWALResourceManager.java | 41 ++++++++++++----------
3 files changed, 49 insertions(+), 29 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
index c3770201371..3202745533d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
@@ -114,4 +114,8 @@ public class PipePeriodicalJobExecutor {
periodicalJobs.clear();
LOGGER.info("All pipe periodical jobs are cleared successfully.");
}
+
+ public static long getMinIntervalSeconds() {
+ return MIN_INTERVAL_SECONDS;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 65d4ea60e18..9aadda86ae3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.resource.tsfile;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.agent.runtime.PipePeriodicalJobExecutor;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -33,11 +34,11 @@ import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.ConcurrentModificationException;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class PipeTsFileResourceManager {
@@ -45,24 +46,41 @@ public class PipeTsFileResourceManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTsFileResourceManager.class);
private final Map<String, PipeTsFileResource>
hardlinkOrCopiedFileToPipeTsFileResourceMap =
- new HashMap<>();
+ new ConcurrentHashMap<>();
private final ReentrantLock lock = new ReentrantLock();
public PipeTsFileResourceManager() {
PipeAgent.runtime()
.registerPeriodicalJob(
"PipeTsFileResourceManager#ttlCheck()",
- this::ttlCheck,
+ this::tryTtlCheck,
Math.max(PipeTsFileResource.TSFILE_MIN_TIME_TO_LIVE_IN_MS / 1000,
1));
}
+ private void tryTtlCheck() {
+ try {
+ final long timeout = PipePeriodicalJobExecutor.getMinIntervalSeconds()
>> 1;
+ if (lock.tryLock(timeout, TimeUnit.SECONDS)) {
+ try {
+ ttlCheck();
+ } finally {
+ lock.unlock();
+ }
+ } else {
+ LOGGER.warn("failed to try lock when checking TTL because of timeout
({}s)", timeout);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("failed to try lock when checking TTL because of
interruption", e);
+ }
+ }
+
private void ttlCheck() {
final Iterator<Map.Entry<String, PipeTsFileResource>> iterator =
hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<String, PipeTsFileResource> entry = iterator.next();
- lock.lock();
try {
if (entry.getValue().closeIfOutOfTimeToLive()) {
iterator.remove();
@@ -72,13 +90,8 @@ public class PipeTsFileResourceManager {
entry.getKey(),
entry.getValue().getReferenceCount());
}
- } catch (ConcurrentModificationException e) {
- LOGGER.info(
- "Concurrent modification issues happened, skipping the file in
this round of ttl check");
} catch (IOException e) {
LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ",
e);
- } finally {
- lock.unlock();
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index 0c34edda8a3..aa8f4e71623 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -61,27 +61,30 @@ public abstract class PipeWALResourceManager {
private void ttlCheck() {
final Iterator<Map.Entry<Long, PipeWALResource>> iterator =
memtableIdToPipeWALResourceMap.entrySet().iterator();
- while (iterator.hasNext()) {
- final Map.Entry<Long, PipeWALResource> entry = iterator.next();
- final ReentrantLock lock =
- memtableIdSegmentLocks[(int) (entry.getKey() % SEGMENT_LOCK_COUNT)];
-
- lock.lock();
- try {
- if (entry.getValue().invalidateIfPossible()) {
- iterator.remove();
- } else if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(
- "WAL (memtableId {}) is still referenced {} times",
- entry.getKey(),
- entry.getValue().getReferenceCount());
+ try {
+ while (iterator.hasNext()) {
+ final Map.Entry<Long, PipeWALResource> entry = iterator.next();
+ final ReentrantLock lock =
+ memtableIdSegmentLocks[(int) (entry.getKey() %
SEGMENT_LOCK_COUNT)];
+
+ lock.lock();
+ try {
+ if (entry.getValue().invalidateIfPossible()) {
+ iterator.remove();
+ } else if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "WAL (memtableId {}) is still referenced {} times",
+ entry.getKey(),
+ entry.getValue().getReferenceCount());
+ }
+ } finally {
+ lock.unlock();
}
- } catch (ConcurrentModificationException e) {
- LOGGER.info(
- "Concurrent modification issues happened, skipping the WAL in this
round of ttl check");
- } finally {
- lock.unlock();
}
+ } catch (ConcurrentModificationException e) {
+ LOGGER.error(
+ "Concurrent modification issues happened, skipping the WAL in this
round of ttl check",
+ e);
}
}