This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a5b668ba67c Pipe: introduce global cache for TsFile metadatas and
support TTL of Pipe TsFile (#11240)
a5b668ba67c is described below
commit a5b668ba67c665a2efe7faaa0b5519460ab8192d
Author: Zikun Ma <[email protected]>
AuthorDate: Fri Nov 24 01:20:29 2023 +0800
Pipe: introduce global cache for TsFile metadatas and support TTL of Pipe
TsFile (#11240)
Currently, if a TsFile is processed by N pipes, its metadata will be
constructed N times, the overhead will be high when N is very big. This commit
added a global cache to reduce repeated creation and redundant IO.
This commit also introduces TTL of TsFiles pinned by pipe engine, just like
WALs.
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../agent/runtime/PipePeriodicalJobExecutor.java | 65 +++---
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 22 +++
.../tsfile/TsFileInsertionDataContainer.java | 50 +++--
.../pipe/extractor/IoTDBDataRegionExtractor.java | 4 +-
.../realtime/PipeRealtimeDataRegionExtractor.java | 5 +-
.../listener/PipeInsertionDataNodeListener.java | 10 +-
.../db/pipe/resource/memory/PipeMemoryBlock.java | 2 +-
.../db/pipe/resource/memory/PipeMemoryManager.java | 37 ++++
.../pipe/resource/memory/PipeMemoryWeighUtil.java | 58 ++++++
.../pipe/resource/tsfile/PipeTsFileResource.java | 217 +++++++++++++++++++++
.../resource/tsfile/PipeTsFileResourceManager.java | 210 ++++++++++++++++----
.../db/pipe/resource/wal/PipeWALResource.java | 4 +-
.../pipe/resource/wal/PipeWALResourceManager.java | 67 +++----
.../resource/PipeTsFileResourceManagerTest.java | 18 +-
.../iotdb/commons/concurrent/ThreadName.java | 2 -
.../apache/iotdb/commons/conf/CommonConfig.java | 2 +-
.../iotdb/tsfile/read/TsFileSequenceReader.java | 4 +
17 files changed, 648 insertions(+), 129 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
index df9240cabe4..c3770201371 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipePeriodicalJobExecutor.java
@@ -21,14 +21,17 @@ package org.apache.iotdb.db.pipe.agent.runtime;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.WrappedRunnable;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import
org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -45,34 +48,44 @@ public class PipePeriodicalJobExecutor {
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR.getName());
- private static final long CRON_EVENT_INJECTOR_INTERVAL_SECONDS =
+ private static final long MIN_INTERVAL_SECONDS =
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
- private long cronEventInjectRoundsInterval;
-
- private static final long MEMORY_EXPANDER_INTERVAL_SECONDS =
- PipeConfig.getInstance().getPipeMemoryExpanderIntervalSeconds();
- private long memoryExpandRoundsInterval;
-
- // Currently we use the CRON_EVENT_INJECTOR_INTERVAL_SECONDS as minimum
interval
- private static final long EXECUTOR_INTERVAL_SECONDS =
CRON_EVENT_INJECTOR_INTERVAL_SECONDS;
private long rounds;
-
private Future<?> executorFuture;
+ // <Periodical job, Interval in rounds>
+ private static final List<Pair<WrappedRunnable, Long>> periodicalJobs = new
ArrayList<>();
+
+ public synchronized void register(String id, Runnable periodicalJob, long
intervalInSeconds) {
+ periodicalJobs.add(
+ new Pair<>(
+ new WrappedRunnable() {
+ @Override
+ public void runMayThrow() {
+ try {
+ periodicalJob.run();
+ } catch (Exception e) {
+ LOGGER.warn("Periodical job {} failed.", id, e);
+ }
+ }
+ },
+ Math.max(intervalInSeconds / MIN_INTERVAL_SECONDS, 1)));
+ LOGGER.info(
+ "Pipe periodical job {} is registered successfully. Interval: {}
seconds.",
+ id,
+ Math.max(intervalInSeconds / MIN_INTERVAL_SECONDS, 1) *
MIN_INTERVAL_SECONDS);
+ }
+
public synchronized void start() {
if (executorFuture == null) {
rounds = 0;
- cronEventInjectRoundsInterval =
- Math.max(CRON_EVENT_INJECTOR_INTERVAL_SECONDS /
EXECUTOR_INTERVAL_SECONDS, 1);
- memoryExpandRoundsInterval =
- Math.max(MEMORY_EXPANDER_INTERVAL_SECONDS /
EXECUTOR_INTERVAL_SECONDS, 1);
executorFuture =
ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
PERIODICAL_JOB_EXECUTOR,
this::execute,
- EXECUTOR_INTERVAL_SECONDS,
- EXECUTOR_INTERVAL_SECONDS,
+ MIN_INTERVAL_SECONDS,
+ MIN_INTERVAL_SECONDS,
TimeUnit.SECONDS);
LOGGER.info("Pipe periodical job executor is started successfully.");
}
@@ -81,12 +94,10 @@ public class PipePeriodicalJobExecutor {
private synchronized void execute() {
++rounds;
- if (rounds % cronEventInjectRoundsInterval == 0) {
- PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(false);
- }
-
- if (rounds % memoryExpandRoundsInterval == 0) {
- PipeResourceManager.memory().tryExpandAll();
+ for (final Pair<WrappedRunnable, Long> periodicalJob : periodicalJobs) {
+ if (rounds % periodicalJob.right == 0) {
+ periodicalJob.left.run();
+ }
}
}
@@ -97,4 +108,10 @@ public class PipePeriodicalJobExecutor {
LOGGER.info("Pipe periodical job executor is stopped successfully.");
}
}
+
+ @TestOnly
+ public synchronized void clear() {
+ periodicalJobs.clear();
+ LOGGER.info("All pipe periodical jobs are cleared successfully.");
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index e9c6a50920f..9d85ec0d5ee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.resource.PipeHardlinkFileDirStartupCleaner;
@@ -134,4 +135,25 @@ public class PipeRuntimeAgent implements IService {
PipeAgent.task().stopAllPipesWithCriticalException();
}
}
+
+ /////////////////////////// Periodical Job Executor
///////////////////////////
+
+ public void registerPeriodicalJob(String id, Runnable periodicalJob, long
intervalInSeconds) {
+ pipePeriodicalJobExecutor.register(id, periodicalJob, intervalInSeconds);
+ }
+
+ @TestOnly
+ public void startPeriodicalJobExecutor() {
+ pipePeriodicalJobExecutor.start();
+ }
+
+ @TestOnly
+ public void stopPeriodicalJobExecutor() {
+ pipePeriodicalJobExecutor.stop();
+ }
+
+ @TestOnly
+ public void clearPeriodicalJobExecutor() {
+ pipePeriodicalJobExecutor.clear();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
index 10b746b3f36..72faf0d83b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/TsFileInsertionDataContainer.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeighUtil;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -95,28 +97,48 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
this.sourceEvent = sourceEvent;
try {
- allocatedMemoryBlock =
- PipeResourceManager.memory()
- .forceAllocate(
-
PipeConfig.getInstance().getPipeMemoryAllocateForTsFileSequenceReaderInBytes());
+ final PipeTsFileResourceManager tsFileResourceManager =
PipeResourceManager.tsfile();
+ final Map<String, List<String>> deviceMeasurementsMap;
- tsFileSequenceReader = new
TsFileSequenceReader(tsFile.getAbsolutePath(), true, true);
+ // TsFileReader is not thread-safe, so we need to create it here and
close it later.
+ long memoryRequiredInBytes =
+
PipeConfig.getInstance().getPipeMemoryAllocateForTsFileSequenceReaderInBytes();
+ tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true,
true);
tsFileReader = new TsFileReader(tsFileSequenceReader);
- deviceMeasurementsMapIterator =
filterDeviceMeasurementsMapByPattern().entrySet().iterator();
- deviceIsAlignedMap = readDeviceIsAlignedMap();
- measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
+ if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) {
+ // These read-only objects can be found in cache.
+ deviceIsAlignedMap =
tsFileResourceManager.getDeviceIsAlignedMapFromCache(tsFile);
+ measurementDataTypeMap =
tsFileResourceManager.getMeasurementDataTypeMapFromCache(tsFile);
+ deviceMeasurementsMap =
tsFileResourceManager.getDeviceMeasurementsMapFromCache(tsFile);
+ } else {
+ // We need to create these objects here and remove them later.
+ deviceIsAlignedMap = readDeviceIsAlignedMap();
+ memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfStr2Bool(deviceIsAlignedMap);
+
+ measurementDataTypeMap = tsFileSequenceReader.getFullPathDataTypeMap();
+ memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
+
+ deviceMeasurementsMap =
tsFileSequenceReader.getDeviceMeasurementsMap();
+ memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfStr2StrList(deviceMeasurementsMap);
+ }
+ allocatedMemoryBlock =
PipeResourceManager.memory().forceAllocate(memoryRequiredInBytes);
+
+ deviceMeasurementsMapIterator =
+
filterDeviceMeasurementsMapByPattern(deviceMeasurementsMap).entrySet().iterator();
+
+ // No longer need this. Help GC.
+ tsFileSequenceReader.clearCachedDeviceMetadata();
} catch (Exception e) {
close();
throw e;
}
}
- private Map<String, List<String>> filterDeviceMeasurementsMapByPattern()
throws IOException {
+ private Map<String, List<String>> filterDeviceMeasurementsMapByPattern(
+ Map<String, List<String>> originalDeviceMeasurementsMap) {
final Map<String, List<String>> filteredDeviceMeasurementsMap = new
HashMap<>();
-
- for (Map.Entry<String, List<String>> entry :
- tsFileSequenceReader.getDeviceMeasurementsMap().entrySet()) {
+ for (Map.Entry<String, List<String>> entry :
originalDeviceMeasurementsMap.entrySet()) {
final String deviceId = entry.getKey();
// case 1: for example, pattern is root.a.b or pattern is null and
device is root.a.b.c
@@ -250,6 +272,8 @@ public class TsFileInsertionDataContainer implements
AutoCloseable {
LOGGER.warn("Failed to close TsFileSequenceReader", e);
}
- allocatedMemoryBlock.close();
+ if (allocatedMemoryBlock != null) {
+ allocatedMemoryBlock.close();
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
index b5c6aea4f15..c1a1b440579 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/IoTDBDataRegionExtractor.java
@@ -282,7 +282,9 @@ public class IoTDBDataRegionExtractor implements
PipeExtractor {
public void close() throws Exception {
historicalExtractor.close();
realtimeExtractor.close();
- PipeExtractorMetrics.getInstance().deregister(taskID);
+ if (Objects.nonNull(taskID)) {
+ PipeExtractorMetrics.getInstance().deregister(taskID);
+ }
}
//////////////////////////// APIs provided for metric framework
////////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
index fb4e6e521c9..d7c80082bfb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionExtractor.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.pipe.api.event.Event;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.iotdb.db.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_PATTERN_KEY;
@@ -119,7 +120,9 @@ public abstract class PipeRealtimeDataRegionExtractor
implements PipeExtractor {
@Override
public void close() throws Exception {
-
PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId,
this);
+ if (Objects.nonNull(dataRegionId)) {
+
PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId,
this);
+ }
synchronized (isClosed) {
clearPendingQueue();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
index f65cd12a6ef..a2e2b76b8bf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/listener/PipeInsertionDataNodeListener.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.extractor.realtime.listener;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
import
org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.realtime.assigner.PipeDataRegionAssigner;
@@ -144,7 +146,13 @@ public class PipeInsertionDataNodeListener {
/////////////////////////////// singleton ///////////////////////////////
- private PipeInsertionDataNodeListener() {}
+ private PipeInsertionDataNodeListener() {
+ PipeAgent.runtime()
+ .registerPeriodicalJob(
+ "PipeInsertionDataNodeListener#listenToHeartbeat(false)",
+ () -> listenToHeartbeat(false),
+
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
+ }
public static PipeInsertionDataNodeListener getInstance() {
return PipeChangeDataCaptureListenerHolder.INSTANCE;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index 7523965aa91..642683fb703 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -165,7 +165,7 @@ public class PipeMemoryBlock implements AutoCloseable {
if (lock.tryLock(50, TimeUnit.MICROSECONDS)) {
try {
pipeMemoryManager.release(this);
- return;
+ break;
} finally {
lock.unlock();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 7d8ead94814..ad4e1951a1e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.resource.memory;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.write.record.Tablet;
@@ -58,6 +59,14 @@ public class PipeMemoryManager {
private final Set<PipeMemoryBlock> allocatedBlocks = new HashSet<>();
+ public PipeMemoryManager() {
+ PipeAgent.runtime()
+ .registerPeriodicalJob(
+ "PipeMemoryManager#tryExpandAll()",
+ this::tryExpandAll,
+ PipeConfig.getInstance().getPipeMemoryExpanderIntervalSeconds());
+ }
+
public synchronized PipeMemoryBlock forceAllocate(long sizeInBytes)
throws PipeRuntimeOutOfMemoryCriticalException {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED) {
@@ -94,6 +103,34 @@ public class PipeMemoryManager {
return forceAllocate(calculateTabletSizeInBytes(tablet));
}
+ /**
+ * Allocate a memory block for pipe only if memory already used is less than
specified threshold.
+ *
+ * @param sizeInBytes size of memory needed to allocate
+ * @param usedThreshold proportion of memory used, ranged from 0.0 to 1.0
+ * @return {@code null} if the proportion of memory already used exceeds
{@code usedThreshold}.
+ * Will return a memory block otherwise.
+ */
+ public synchronized PipeMemoryBlock forceAllocateIfSufficient(
+ long sizeInBytes, float usedThreshold) {
+ if (usedThreshold < 0.0f || usedThreshold > 1.0f) {
+ return null;
+ }
+ if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes
+ && (float) usedMemorySizeInBytes / TOTAL_MEMORY_SIZE_IN_BYTES <
usedThreshold) {
+ return forceAllocate(sizeInBytes);
+ } else {
+ long memoryToShrink =
+ Math.max(
+ usedMemorySizeInBytes - (long) (TOTAL_MEMORY_SIZE_IN_BYTES *
usedThreshold),
+ sizeInBytes);
+ if (tryShrink4Allocate(memoryToShrink)) {
+ return forceAllocate(sizeInBytes);
+ }
+ }
+ return null;
+ }
+
private long calculateTabletSizeInBytes(Tablet tablet) {
long totalSizeInBytes = 0;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeighUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeighUtil.java
new file mode 100644
index 00000000000..2445bc510e9
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeighUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.memory;
+
+import org.apache.iotdb.db.utils.MemUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.List;
+import java.util.Map;
+
+public class PipeMemoryWeighUtil {
+ /** Estimates memory usage of a {@link Map}<{@link String}, {@link
Boolean}>. */
+ public static long memoryOfStr2Bool(Map<String, Boolean> map) {
+ long usageInBytes = 0L;
+ for (Map.Entry<String, Boolean> entry : map.entrySet()) {
+ usageInBytes = usageInBytes + MemUtils.getStringMem(entry.getKey()) + 1L;
+ }
+ return usageInBytes + 16L; // add the overhead of map
+ }
+
+ /** Estimates memory usage of a {@link Map}<{@link String}, {@link
TSDataType}>. */
+ public static long memoryOfStr2TSDataType(Map<String, TSDataType> map) {
+ long usageInBytes = 0L;
+ for (Map.Entry<String, TSDataType> entry : map.entrySet()) {
+ usageInBytes = usageInBytes + MemUtils.getStringMem(entry.getKey()) + 4L;
+ }
+ return usageInBytes + 16L; // add the overhead of map
+ }
+
+ /** Estimates memory usage of a {@link Map}<{@link String}, {@link
List}<{@link String}>>. */
+ public static long memoryOfStr2StrList(Map<String, List<String>> map) {
+ long usageInBytes = 0L;
+ for (Map.Entry<String, List<String>> entry : map.entrySet()) {
+ usageInBytes += MemUtils.getStringMem(entry.getKey());
+ for (String str : entry.getValue()) {
+ usageInBytes += MemUtils.getStringMem(str);
+ }
+ }
+ return usageInBytes + 16L; // add the overhead of map
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
new file mode 100644
index 00000000000..bbca18a4ca1
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.resource.tsfile;
+
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
+import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeighUtil;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.TsFileDeviceIterator;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PipeTsFileResource implements AutoCloseable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTsFileResource.class);
+
+ private final File hardlinkOrCopiedFile;
+ private final boolean isTsFile;
+
+ public static final long TSFILE_MIN_TIME_TO_LIVE_IN_MS = 1000L * 20;
+ private final AtomicInteger referenceCount;
+ private final AtomicLong lastUnpinToZeroTime;
+
+ private static final float MEMORY_SUFFICIENT_THRESHOLD = 0.5f;
+ private PipeMemoryBlock allocatedMemoryBlock;
+ private Map<String, List<String>> deviceMeasurementsMap = null;
+ private Map<String, Boolean> deviceIsAlignedMap = null;
+ private Map<String, TSDataType> measurementDataTypeMap = null;
+
+ public PipeTsFileResource(File hardlinkOrCopiedFile, boolean isTsFile) {
+ this.hardlinkOrCopiedFile = hardlinkOrCopiedFile;
+ this.isTsFile = isTsFile;
+
+ referenceCount = new AtomicInteger(1);
+ lastUnpinToZeroTime = new AtomicLong(Long.MAX_VALUE);
+ }
+
+ public File getFile() {
+ return hardlinkOrCopiedFile;
+ }
+
+ ///////////////////// Reference Count /////////////////////
+
+ public int getReferenceCount() {
+ return referenceCount.get();
+ }
+
+ public int increaseAndGetReference() {
+ return referenceCount.addAndGet(1);
+ }
+
+ public int decreaseAndGetReference() {
+ final int finalReferenceCount = referenceCount.addAndGet(-1);
+ if (finalReferenceCount == 0) {
+ lastUnpinToZeroTime.set(System.currentTimeMillis());
+ }
+ if (finalReferenceCount < 0) {
+ LOGGER.warn("PipeTsFileResource's reference count is decreased to below
0.");
+ }
+ return finalReferenceCount;
+ }
+
+ public synchronized boolean closeIfOutOfTimeToLive() throws IOException {
+ if (referenceCount.get() <= 0
+ && (deviceMeasurementsMap == null // Not cached yet.
+ || System.currentTimeMillis() - lastUnpinToZeroTime.get()
+ > TSFILE_MIN_TIME_TO_LIVE_IN_MS)) {
+ close();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (deviceMeasurementsMap != null) {
+ deviceMeasurementsMap = null;
+ }
+
+ if (deviceIsAlignedMap != null) {
+ deviceIsAlignedMap = null;
+ }
+
+ if (measurementDataTypeMap != null) {
+ measurementDataTypeMap = null;
+ }
+
+ if (allocatedMemoryBlock != null) {
+ allocatedMemoryBlock.close();
+ allocatedMemoryBlock = null;
+ }
+
+ Files.deleteIfExists(hardlinkOrCopiedFile.toPath());
+
+ LOGGER.info("PipeTsFileResource: Closed tsfile {} and cleaned up.",
hardlinkOrCopiedFile);
+ }
+
+ //////////////////////////// Cache Getter ////////////////////////////
+
+ public synchronized Map<String, List<String>> tryGetDeviceMeasurementsMap()
throws IOException {
+ if (deviceMeasurementsMap == null && isTsFile) {
+ cacheObjectsIfAbsent();
+ }
+ return deviceMeasurementsMap;
+ }
+
+ public synchronized Map<String, Boolean> tryGetDeviceIsAlignedMap() throws
IOException {
+ if (deviceIsAlignedMap == null && isTsFile) {
+ cacheObjectsIfAbsent();
+ }
+ return deviceIsAlignedMap;
+ }
+
+ public synchronized Map<String, TSDataType> tryGetMeasurementDataTypeMap()
throws IOException {
+ if (measurementDataTypeMap == null && isTsFile) {
+ cacheObjectsIfAbsent();
+ }
+ return measurementDataTypeMap;
+ }
+
+ synchronized boolean cacheObjectsIfAbsent() throws IOException {
+ if (!isTsFile) {
+ return false;
+ }
+
+ if (allocatedMemoryBlock != null) {
+ // This means objects are already cached.
+ return true;
+ }
+
+ // See if pipe memory is sufficient to be allocated for
TsFileSequenceReader.
+ // Only allocate when pipe memory used is less than 50%, because memory
here
+ // is hard to shrink and may consume too much memory.
+ allocatedMemoryBlock =
+ PipeResourceManager.memory()
+ .forceAllocateIfSufficient(
+
PipeConfig.getInstance().getPipeMemoryAllocateForTsFileSequenceReaderInBytes(),
+ MEMORY_SUFFICIENT_THRESHOLD);
+ if (allocatedMemoryBlock == null) {
+ LOGGER.info(
+ "PipeTsFileResource: Failed to create TsFileSequenceReader for
tsfile {} in cache, because memory usage is high",
+ hardlinkOrCopiedFile.getPath());
+ return false;
+ }
+
+ long memoryRequiredInBytes = 0L;
+ try (TsFileSequenceReader sequenceReader =
+ new TsFileSequenceReader(hardlinkOrCopiedFile.getPath(), true, true)) {
+ deviceMeasurementsMap = sequenceReader.getDeviceMeasurementsMap();
+ memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfStr2StrList(deviceMeasurementsMap);
+
+ deviceIsAlignedMap = new HashMap<>();
+ final TsFileDeviceIterator deviceIsAlignedIterator =
+ sequenceReader.getAllDevicesIteratorWithIsAligned();
+ while (deviceIsAlignedIterator.hasNext()) {
+ final Pair<String, Boolean> deviceIsAlignedPair =
deviceIsAlignedIterator.next();
+ deviceIsAlignedMap.put(deviceIsAlignedPair.getLeft(),
deviceIsAlignedPair.getRight());
+ }
+ memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfStr2Bool(deviceIsAlignedMap);
+
+ measurementDataTypeMap = sequenceReader.getFullPathDataTypeMap();
+ memoryRequiredInBytes +=
PipeMemoryWeighUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
+ }
+ // Release memory of TsFileSequenceReader.
+ allocatedMemoryBlock.close();
+ allocatedMemoryBlock = null;
+
+ // Allocate again for the cached objects.
+ allocatedMemoryBlock =
+ PipeResourceManager.memory()
+ .forceAllocateIfSufficient(memoryRequiredInBytes,
MEMORY_SUFFICIENT_THRESHOLD);
+ if (allocatedMemoryBlock == null) {
+ LOGGER.info(
+ "PipeTsFileResource: Failed to cache objects for tsfile {} in cache,
because memory usage is high",
+ hardlinkOrCopiedFile.getPath());
+ deviceIsAlignedMap = null;
+ deviceMeasurementsMap = null;
+ measurementDataTypeMap = null;
+ return false;
+ }
+
+ LOGGER.info(
+ "PipeTsFileResource: Cached objects for tsfile {}.",
hardlinkOrCopiedFile.getPath());
+ return true;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index af92a0fa553..315c2a7be60 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -21,7 +21,12 @@ package org.apache.iotdb.db.pipe.resource.tsfile;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -29,14 +34,50 @@ import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
public class PipeTsFileResourceManager {
- private final Map<String, Integer> hardlinkOrCopiedFileToReferenceMap = new
HashMap<>();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTsFileResourceManager.class);
+
+ private final Map<String, PipeTsFileResource>
hardlinkOrCopiedFileToPipeTsFileResourceMap =
+ new HashMap<>();
+ private final ReentrantLock lock = new ReentrantLock();
+
+ public PipeTsFileResourceManager() {
+ PipeAgent.runtime()
+ .registerPeriodicalJob(
+ "PipeTsFileResourceManager#ttlCheck()",
+ this::ttlCheck,
+ Math.max(PipeTsFileResource.TSFILE_MIN_TIME_TO_LIVE_IN_MS / 1000,
1));
+ }
+
+ private void ttlCheck() {
+ final Iterator<Map.Entry<String, PipeTsFileResource>> iterator =
+ hardlinkOrCopiedFileToPipeTsFileResourceMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ final Map.Entry<String, PipeTsFileResource> entry = iterator.next();
- /** Cache the File objects here to avoid redundancy */
- private final Map<String, File> fileNameToFileMap = new HashMap<>();
+ lock.lock();
+ try {
+ if (entry.getValue().closeIfOutOfTimeToLive()) {
+ iterator.remove();
+ } else {
+ LOGGER.info(
+ "Pipe file (file name: {}) is still referenced {} times",
+ entry.getKey(),
+ entry.getValue().getReferenceCount());
+ }
+ } catch (IOException e) {
+ LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ",
e);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
/**
* given a file, create a hardlink or copy it to pipe dir, maintain a
reference count for the
@@ -57,34 +98,48 @@ public class PipeTsFileResourceManager {
* @return the hardlink or copied file
* @throws IOException when create hardlink or copy file failed
*/
- public synchronized File increaseFileReference(File file, boolean isTsFile)
throws IOException {
- // if the file is already a hardlink or copied file, just increase
reference count and return it
- if (increaseReferenceIfExists(file.getPath())) {
- return file;
- }
+ public File increaseFileReference(File file, boolean isTsFile) throws
IOException {
+ lock.lock();
+ try {
+ // if the file is already a hardlink or copied file,
+ // just increase reference count and return it
+ if (increaseReferenceIfExists(file.getPath())) {
+ return file;
+ }
- // if the file is not a hardlink or copied file, check if there is a
related hardlink or copied
- // file in pipe dir. if so, increase reference count and return it
- final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file);
- if (increaseReferenceIfExists(hardlinkOrCopiedFile.getPath())) {
- return fileNameToFileMap.get(hardlinkOrCopiedFile.getPath());
- }
+ // if the file is not a hardlink or copied file, check if there is a
related hardlink or
+ // copied file in pipe dir. if so, increase reference count and return it
+ final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file);
+ if (increaseReferenceIfExists(hardlinkOrCopiedFile.getPath())) {
+ return hardlinkOrCopiedFileToPipeTsFileResourceMap
+ .get(hardlinkOrCopiedFile.getPath())
+ .getFile();
+ }
- // if the file is not a hardlink or copied file, and there is no related
hardlink or copied
- // file in pipe dir, create a hardlink or copy it to pipe dir, maintain a
reference count for
- // the hardlink or copied file, and return the hardlink or copied file.
- hardlinkOrCopiedFileToReferenceMap.put(hardlinkOrCopiedFile.getPath(), 1);
- fileNameToFileMap.put(hardlinkOrCopiedFile.getPath(),
hardlinkOrCopiedFile);
- // if the file is a tsfile, create a hardlink in pipe dir and return it.
- // otherwise, copy the file (.mod or .resource) to pipe dir and return it.
- return isTsFile
- ? createHardLink(file, hardlinkOrCopiedFile)
- : copyFile(file, hardlinkOrCopiedFile);
+ // if the file is a tsfile, create a hardlink in pipe dir and will
return it.
+ // otherwise, copy the file (.mod or .resource) to pipe dir and will
return it.
+ final File resultFile =
+ isTsFile
+ ? createHardLink(file, hardlinkOrCopiedFile)
+ : copyFile(file, hardlinkOrCopiedFile);
+ // if the file is not a hardlink or copied file, and there is no related
hardlink or copied
+ // file in pipe dir, create a hardlink or copy it to pipe dir, maintain
a reference count for
+ // the hardlink or copied file, and return the hardlink or copied file.
+ hardlinkOrCopiedFileToPipeTsFileResourceMap.put(
+ resultFile.getPath(), new PipeTsFileResource(resultFile, isTsFile));
+ return resultFile;
+ } finally {
+ lock.unlock();
+ }
}
private boolean increaseReferenceIfExists(String path) {
- hardlinkOrCopiedFileToReferenceMap.computeIfPresent(path, (key, value) ->
value + 1);
- return hardlinkOrCopiedFileToReferenceMap.containsKey(path);
+ final PipeTsFileResource resource =
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(path);
+ if (resource != null) {
+ resource.increaseAndGetReference();
+ return true;
+ }
+ return false;
}
private static File getHardlinkOrCopiedFileInPipeDir(File file) throws
IOException {
@@ -158,15 +213,16 @@ public class PipeTsFileResourceManager {
* @param hardlinkOrCopiedFile the copied or hardlinked file
* @throws IOException when delete file failed
*/
- public synchronized void decreaseFileReference(File hardlinkOrCopiedFile)
throws IOException {
- final Integer updatedReference =
- hardlinkOrCopiedFileToReferenceMap.computeIfPresent(
- hardlinkOrCopiedFile.getPath(), (file, reference) -> reference -
1);
-
- if (updatedReference != null && updatedReference == 0) {
- Files.deleteIfExists(hardlinkOrCopiedFile.toPath());
-
hardlinkOrCopiedFileToReferenceMap.remove(hardlinkOrCopiedFile.getPath());
- fileNameToFileMap.remove(hardlinkOrCopiedFile.getPath());
+ public void decreaseFileReference(File hardlinkOrCopiedFile) {
+ lock.lock();
+ try {
+ final String filePath = hardlinkOrCopiedFile.getPath();
+ final PipeTsFileResource resource =
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath);
+ if (resource != null) {
+ resource.decreaseAndGetReference();
+ }
+ } finally {
+ lock.unlock();
}
}
@@ -176,19 +232,89 @@ public class PipeTsFileResourceManager {
* @param hardlinkOrCopiedFile the copied or hardlinked file
* @return the reference count of the file
*/
- public synchronized int getFileReferenceCount(File hardlinkOrCopiedFile) {
- return
hardlinkOrCopiedFileToReferenceMap.getOrDefault(hardlinkOrCopiedFile.getPath(),
0);
+ public int getFileReferenceCount(File hardlinkOrCopiedFile) {
+ lock.lock();
+ try {
+ final String filePath = hardlinkOrCopiedFile.getPath();
+ final PipeTsFileResource resource =
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(filePath);
+ return resource != null ? resource.getReferenceCount() : 0;
+ } finally {
+ lock.unlock();
+ }
}
- public synchronized void pinTsFileResource(TsFileResource resource) throws
IOException {
- increaseFileReference(resource.getTsFile(), true);
+ /**
+ * Cache maps of the TsFile for further use.
+ *
+ * @return {@code true} if the maps are successfully put into cache or
already cached. {@code
+ * false} if they can not be cached.
+ */
+ public boolean cacheObjectsIfAbsent(File hardlinkOrCopiedTsFile) throws
IOException {
+ lock.lock();
+ try {
+ final PipeTsFileResource resource =
+
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
+ return resource != null && resource.cacheObjectsIfAbsent();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public Map<String, List<String>> getDeviceMeasurementsMapFromCache(File
hardlinkOrCopiedTsFile)
+ throws IOException {
+ lock.lock();
+ try {
+ final PipeTsFileResource resource =
+
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
+ return resource == null ? null : resource.tryGetDeviceMeasurementsMap();
+ } finally {
+ lock.unlock();
+ }
}
- public synchronized void unpinTsFileResource(TsFileResource resource) throws
IOException {
-
decreaseFileReference(getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
+ public Map<String, Boolean> getDeviceIsAlignedMapFromCache(File
hardlinkOrCopiedTsFile)
+ throws IOException {
+ lock.lock();
+ try {
+ final PipeTsFileResource resource =
+
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
+ return resource == null ? null : resource.tryGetDeviceIsAlignedMap();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public Map<String, TSDataType> getMeasurementDataTypeMapFromCache(File
hardlinkOrCopiedTsFile)
+ throws IOException {
+ lock.lock();
+ try {
+ final PipeTsFileResource resource =
+
hardlinkOrCopiedFileToPipeTsFileResourceMap.get(hardlinkOrCopiedTsFile.getPath());
+ return resource == null ? null : resource.tryGetMeasurementDataTypeMap();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void pinTsFileResource(TsFileResource resource) throws IOException {
+ lock.lock();
+ try {
+ increaseFileReference(resource.getTsFile(), true);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void unpinTsFileResource(TsFileResource resource) throws IOException {
+ lock.lock();
+ try {
+
decreaseFileReference(getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
+ } finally {
+ lock.unlock();
+ }
}
public int getLinkedTsfileCount() {
- return hardlinkOrCopiedFileToReferenceMap.size();
+ return hardlinkOrCopiedFileToPipeTsFileResourceMap.size();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
index 951ad738460..9d1e530a19d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResource.java
@@ -40,7 +40,7 @@ public abstract class PipeWALResource implements Closeable {
private final AtomicInteger referenceCount;
- public static final long MIN_TIME_TO_LIVE_IN_MS = 1000L * 20;
+ public static final long WAL_MIN_TIME_TO_LIVE_IN_MS = 1000L * 20;
private final AtomicLong lastLogicalPinTime;
private final AtomicBoolean isPhysicallyPinned;
@@ -118,7 +118,7 @@ public abstract class PipeWALResource implements Closeable {
*/
private boolean unpinPhysicallyIfOutOfTimeToLive() {
if (isPhysicallyPinned.get()) {
- if (System.currentTimeMillis() - lastLogicalPinTime.get() >
MIN_TIME_TO_LIVE_IN_MS) {
+ if (System.currentTimeMillis() - lastLogicalPinTime.get() >
WAL_MIN_TIME_TO_LIVE_IN_MS) {
try {
unpinInternal();
} catch (MemTablePinException e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index 1b3dff90fd8..ad1baf1b8a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -19,9 +19,7 @@
package org.apache.iotdb.db.pipe.resource.wal;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
import org.slf4j.Logger;
@@ -32,8 +30,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public abstract class PipeWALResourceManager {
@@ -45,10 +41,6 @@ public abstract class PipeWALResourceManager {
private static final int SEGMENT_LOCK_COUNT = 32;
private final ReentrantLock[] memtableIdSegmentLocks;
- private static final ScheduledExecutorService PIPE_WAL_RESOURCE_TTL_CHECKER =
- IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
- ThreadName.PIPE_WAL_RESOURCE_TTL_CHECKER.getName());
-
protected PipeWALResourceManager() {
// memtableIdToPipeWALResourceMap can be concurrently accessed by multiple
threads
memtableIdToPipeWALResourceMap = new ConcurrentHashMap<>();
@@ -58,38 +50,35 @@ public abstract class PipeWALResourceManager {
memtableIdSegmentLocks[i] = new ReentrantLock();
}
- ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
- PIPE_WAL_RESOURCE_TTL_CHECKER,
- () -> {
- final Iterator<Map.Entry<Long, PipeWALResource>> iterator =
- memtableIdToPipeWALResourceMap.entrySet().iterator();
- while (iterator.hasNext()) {
- final Map.Entry<Long, PipeWALResource> entry = iterator.next();
- final ReentrantLock lock =
- memtableIdSegmentLocks[(int) (entry.getKey() %
SEGMENT_LOCK_COUNT)];
-
- lock.lock();
- try {
- if (entry.getValue().invalidateIfPossible()) {
- iterator.remove();
- } else {
- LOGGER.info(
- "WAL (memtableId {}) is still referenced {} times",
- entry.getKey(),
- entry.getValue().getReferenceCount());
- }
- } finally {
- lock.unlock();
- }
- }
- },
- PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
- PipeWALResource.MIN_TIME_TO_LIVE_IN_MS,
- TimeUnit.MILLISECONDS);
+ PipeAgent.runtime()
+ .registerPeriodicalJob(
+ "PipeWALResourceManager#ttlCheck()",
+ this::ttlCheck,
+ Math.max(PipeWALResource.WAL_MIN_TIME_TO_LIVE_IN_MS / 1000, 1));
}
- public int getApproximatePinnedWALCount() {
- return memtableIdToPipeWALResourceMap.size();
+ private void ttlCheck() {
+ final Iterator<Map.Entry<Long, PipeWALResource>> iterator =
+ memtableIdToPipeWALResourceMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ final Map.Entry<Long, PipeWALResource> entry = iterator.next();
+ final ReentrantLock lock =
+ memtableIdSegmentLocks[(int) (entry.getKey() % SEGMENT_LOCK_COUNT)];
+
+ lock.lock();
+ try {
+ if (entry.getValue().invalidateIfPossible()) {
+ iterator.remove();
+ } else {
+ LOGGER.info(
+ "WAL (memtableId {}) is still referenced {} times",
+ entry.getKey(),
+ entry.getValue().getReferenceCount());
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
}
public final void pin(final WALEntryHandler walEntryHandler) throws
IOException {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
index 04308d09cab..03d408b7743 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/PipeTsFileResourceManagerTest.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
+import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResource;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
@@ -45,7 +47,9 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.concurrent.TimeUnit;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.fail;
public class PipeTsFileResourceManagerTest {
@@ -61,6 +65,7 @@ public class PipeTsFileResourceManagerTest {
@Before
public void setUp() throws Exception {
pipeTsFileResourceManager = new PipeTsFileResourceManager();
+ PipeAgent.runtime().startPeriodicalJobExecutor();
createTsfile(TS_FILE_NAME);
creatModsFile(MODS_FILE_NAME);
@@ -145,6 +150,9 @@ public class PipeTsFileResourceManagerTest {
if (pipeFolder.exists()) {
FileUtils.deleteDirectory(pipeFolder);
}
+
+ PipeAgent.runtime().stopPeriodicalJobExecutor();
+ PipeAgent.runtime().clearPeriodicalJobExecutor();
}
@Test
@@ -218,7 +226,13 @@ public class PipeTsFileResourceManagerTest {
Assert.assertEquals(0,
pipeTsFileResourceManager.getFileReferenceCount(pipeModFile));
Assert.assertFalse(Files.exists(originFile.toPath()));
Assert.assertFalse(Files.exists(originModFile.toPath()));
- Assert.assertFalse(Files.exists(pipeTsfile.toPath()));
- Assert.assertFalse(Files.exists(pipeModFile.toPath()));
+ // Pipe TsFile will be cleaned by a timed thread, so we wait some time
here.
+ await()
+ .atMost(3 * PipeTsFileResource.TSFILE_MIN_TIME_TO_LIVE_IN_MS,
TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assert.assertFalse(Files.exists(pipeTsfile.toPath()));
+ Assert.assertFalse(Files.exists(pipeModFile.toPath()));
+ });
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index f8cd6f46277..bd6c8097f1e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -133,7 +133,6 @@ public enum ThreadName {
PIPE_RUNTIME_PROCEDURE_SUBMITTER("Pipe-Runtime-Procedure-Submitter"),
PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR("Pipe-Runtime-Periodical-Job-Executor"),
PIPE_ASYNC_CONNECTOR_CLIENT_POOL("Pipe-Async-Connector-Client-Pool"),
- PIPE_WAL_RESOURCE_TTL_CHECKER("Pipe-WAL-Resource-TTL-Checker"),
PIPE_RECEIVER_AIR_GAP_AGENT("Pipe-Receiver-Air-Gap-Agent"),
WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
STATEFUL_TRIGGER_INFORMATION_UPDATER("Stateful-Trigger-Information-Updater"),
@@ -269,7 +268,6 @@ public enum ThreadName {
PIPE_RUNTIME_PROCEDURE_SUBMITTER,
PIPE_RUNTIME_PERIODICAL_JOB_EXECUTOR,
PIPE_ASYNC_CONNECTOR_CLIENT_POOL,
- PIPE_WAL_RESOURCE_TTL_CHECKER,
PIPE_RECEIVER_AIR_GAP_AGENT,
WINDOW_EVALUATION_SERVICE,
STATEFUL_TRIGGER_INFORMATION_UPDATER));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 5ca93c3bd07..579d59e3720 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -156,7 +156,7 @@ public class CommonConfig {
private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount =
10_000;
private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 *
1000L;
private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000;
- private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 30;
+ private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50;
// 50B
diff --git
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 0e7fd60f91f..64c747ecae3 100644
---
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@ -350,6 +350,10 @@ public class TsFileSequenceReader implements AutoCloseable
{
}
}
+ public void clearCachedDeviceMetadata() {
+ cachedDeviceMetadata.clear();
+ }
+
private Map<String, TimeseriesMetadata> readDeviceMetadataFromDisk(String
device)
throws IOException {
readFileMetadata();