This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch pipe-flush in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7bbdc95f8442ff48337f7efe3ebc45861f651535 Author: Caideyipi <[email protected]> AuthorDate: Fri Feb 13 14:41:50 2026 +0800 Flush manager --- .../tsfile/PipeCompactedTsFileInsertionEvent.java | 8 +- .../common/tsfile/PipeTsFileInsertionEvent.java | 7 +- .../PipeRealtimeDataRegionHybridSource.java | 13 ++- .../realtime/PipeRealtimeDataRegionSource.java | 10 +- .../PipeRealtimeDataRegionTsFileSource.java | 16 ++- ...ipeTsFileEpochProgressIndexAndFlushManager.java | 128 +++++++++++++++++++++ .../PipeTsFileEpochProgressIndexKeeper.java | 89 -------------- .../listener/PipeTimePartitionListener.java | 50 ++++---- .../rescon/memory/TimePartitionManager.java | 2 +- .../apache/iotdb/commons/conf/CommonConfig.java | 15 +++ .../iotdb/commons/pipe/config/PipeConfig.java | 4 + .../iotdb/commons/pipe/config/PipeDescriptor.java | 5 + 12 files changed, 211 insertions(+), 136 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java index bf3f2c97acb..078ce718488 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java @@ -26,7 +26,7 @@ import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; -import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper; +import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -38,7 +38,7 @@ import java.util.stream.Collectors; public class PipeCompactedTsFileInsertionEvent extends PipeTsFileInsertionEvent { - private final String dataRegionId; + private final int dataRegionId; private final Set<String> originFilePaths; private final List<Long> commitIds; @@ -70,7 +70,7 @@ public class PipeCompactedTsFileInsertionEvent extends PipeTsFileInsertionEvent anyOfOriginalEvents.getStartTime(), anyOfOriginalEvents.getEndTime()); - this.dataRegionId = String.valueOf(committerKey.getRegionId()); + this.dataRegionId = committerKey.getRegionId(); this.originFilePaths = originalEvents.stream() .map(PipeTsFileInsertionEvent::getTsFile) @@ -186,7 +186,7 @@ public class PipeCompactedTsFileInsertionEvent extends PipeTsFileInsertionEvent public void eliminateProgressIndex() { if (Objects.isNull(overridingProgressIndex)) { for (final String originFilePath : originFilePaths) { - PipeTsFileEpochProgressIndexKeeper.getInstance() + PipeTsFileEpochProgressIndexAndFlushManager.getInstance() .eliminateProgressIndex(dataRegionId, pipeName, originFilePath); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 6b3e505d2ea..7b07e084a7a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -45,7 +45,7 @@ import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager; import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager; -import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper; +import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -399,8 +399,9 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent public void eliminateProgressIndex() { if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) { - PipeTsFileEpochProgressIndexKeeper.getInstance() - .eliminateProgressIndex(resource.getDataRegionId(), pipeName, resource.getTsFilePath()); + PipeTsFileEpochProgressIndexAndFlushManager.getInstance() + .eliminateProgressIndex( + Integer.parseInt(resource.getDataRegionId()), pipeName, resource.getTsFilePath()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java index 0721683f4d2..bb26b5e18da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java @@ -30,7 +30,7 @@ import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator; import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; -import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper; +import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager; import org.apache.iotdb.db.pipe.source.dataregion.realtime.epoch.TsFileEpoch; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -65,6 +65,7 @@ public class PipeRealtimeDataRegionHybridSource extends PipeRealtimeDataRegionSo "Unsupported event type %s for hybrid realtime extractor %s", eventToExtract.getClass(), this)); } + PipeTsFileEpochProgressIndexAndFlushManager.getInstance().flushAllTimeoutTsFiles(); } @Override @@ -82,8 +83,8 @@ public class PipeRealtimeDataRegionHybridSource extends PipeRealtimeDataRegionSo if (canNotUseTabletAnymore(event)) { event.getTsFileEpoch().migrateState(this, curState -> TsFileEpoch.State.USING_TSFILE); - PipeTsFileEpochProgressIndexKeeper.getInstance() - .registerProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getResource()); + PipeTsFileEpochProgressIndexAndFlushManager.getInstance() + .registerResource(dataRegionId, pipeName, event.getTsFileEpoch().getResource()); } else { event .getTsFileEpoch() @@ -169,13 +170,15 @@ public class PipeRealtimeDataRegionHybridSource extends PipeRealtimeDataRegionSo switch (state) { case USING_TABLET: // If the state is USING_TABLET, discard the event - PipeTsFileEpochProgressIndexKeeper.getInstance() + PipeTsFileEpochProgressIndexAndFlushManager.getInstance() .eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath()); event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(), false); return; case EMPTY: case USING_TSFILE: case USING_BOTH: + PipeTsFileEpochProgressIndexAndFlushManager.getInstance() + .markAsExtracted(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath()); if (!pendingQueue.waitedOffer(event)) { // This would not happen, but just in case. // pendingQueue is unbounded, so it should never reach capacity. @@ -310,7 +313,7 @@ public class PipeRealtimeDataRegionHybridSource extends PipeRealtimeDataRegionSo LOGGER.error(errorMessage); PipeDataNodeAgent.runtime() .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); - PipeTsFileEpochProgressIndexKeeper.getInstance() + PipeTsFileEpochProgressIndexAndFlushManager.getInstance() .eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath()); return null; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java index cd00e2975a0..3dec073f057 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java @@ -39,7 +39,7 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter; import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter; -import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper; +import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager; import org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener; import org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeTimePartitionListener; import org.apache.iotdb.db.storageengine.StorageEngine; @@ -91,7 +91,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor { protected String pipeName; protected long creationTime; - protected String dataRegionId; + protected int dataRegionId; protected PipeTaskMeta pipeTaskMeta; protected boolean shouldExtractInsertion; @@ -214,7 +214,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor { shouldExtractDeletion = insertionDeletionListeningOptionPair.getRight(); pipeName = environment.getPipeName(); - dataRegionId = String.valueOf(environment.getRegionId()); + dataRegionId = environment.getRegionId(); pipeTaskMeta = environment.getPipeTaskMeta(); // Metrics related to TsFileEpoch are managed in PipeExtractorMetrics. These metrics are @@ -555,7 +555,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor { return skipIfNoPrivileges; } - public final String getDataRegionId() { + public final int getDataRegionId() { return dataRegionId; } @@ -595,7 +595,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor { } private void maySkipProgressIndexForRealtimeEvent(final PipeRealtimeEvent event) { - if (PipeTsFileEpochProgressIndexKeeper.getInstance() + if (PipeTsFileEpochProgressIndexAndFlushManager.getInstance() .isProgressIndexAfterOrEquals( dataRegionId, pipeName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java index 8f74bc63fb3..be1f648d163 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java @@ -25,7 +25,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent; -import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper; +import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexAndFlushManager; import org.apache.iotdb.db.pipe.source.dataregion.realtime.epoch.TsFileEpoch; import org.apache.iotdb.pipe.api.event.Event; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -39,7 +39,9 @@ public class PipeRealtimeDataRegionTsFileSource extends PipeRealtimeDataRegionSo LoggerFactory.getLogger(PipeRealtimeDataRegionTsFileSource.class); @Override - protected void doExtract(PipeRealtimeEvent event) { + protected void doExtract(final PipeRealtimeEvent event) { + PipeTsFileEpochProgressIndexAndFlushManager.getInstance().flushAllTimeoutTsFiles(); + if (event.getEvent() instanceof PipeHeartbeatEvent) { extractHeartbeat(event); return; @@ -51,8 +53,8 @@ public class PipeRealtimeDataRegionTsFileSource extends PipeRealtimeDataRegionSo } event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE); - PipeTsFileEpochProgressIndexKeeper.getInstance() - .registerProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getResource()); + PipeTsFileEpochProgressIndexAndFlushManager.getInstance() + .registerResource(dataRegionId, pipeName, event.getTsFileEpoch().getResource()); if (!(event.getEvent() instanceof TsFileInsertionEvent)) { event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(), false); @@ -64,7 +66,7 @@ public class PipeRealtimeDataRegionTsFileSource extends PipeRealtimeDataRegionSo // Pending is unbounded, so it should never reach capacity. final String errorMessage = String.format( - "extract: pending queue of PipeRealtimeDataRegionTsFileExtractor %s " + "extract: pending queue of PipeRealtimeDataRegionTsFileSource %s " + "has reached capacity, discard TsFile event %s, current state %s", this, event, event.getTsFileEpoch().getState(this)); LOGGER.error(errorMessage); @@ -75,6 +77,8 @@ public class PipeRealtimeDataRegionTsFileSource extends PipeRealtimeDataRegionSo event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(), false); } + PipeTsFileEpochProgressIndexAndFlushManager.getInstance() + .markAsExtracted(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath()); event.getTsFileEpoch().clearState(this); } @@ -116,7 +120,7 @@ public class PipeRealtimeDataRegionTsFileSource extends PipeRealtimeDataRegionSo LOGGER.error(errorMessage); PipeDataNodeAgent.runtime() .report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage)); - PipeTsFileEpochProgressIndexKeeper.getInstance() + PipeTsFileEpochProgressIndexAndFlushManager.getInstance() .eliminateProgressIndex( dataRegionId, pipeName, realtimeEvent.getTsFileEpoch().getFilePath()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java new file mode 100644 index 00000000000..20b2dfe410f --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexAndFlushManager.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner; + +import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.apache.tsfile.utils.Pair; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +public class PipeTsFileEpochProgressIndexAndFlushManager { + + // data region id -> pipeName -> tsFile path -> max progress index + private final Map<Integer, Map<String, Map<String, Pair<TsFileResource, Long>>>> + progressIndexKeeper = new ConcurrentHashMap<>(); + + public synchronized void registerResource( + final int dataRegionId, final String pipeName, final TsFileResource resource) { + progressIndexKeeper + .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) + .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>()) + .putIfAbsent(resource.getTsFilePath(), new Pair<>(resource, System.currentTimeMillis())); + } + + public synchronized void markAsExtracted( + final int dataRegionId, final String pipeName, final String filePath) { + final Pair<TsFileResource, Long> pair = + progressIndexKeeper + .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) + .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>()) + .get(filePath); + if (Objects.nonNull(pair)) { + pair.setRight(Long.MAX_VALUE); + } + } + + public void flushAllTimeoutTsFiles() { + progressIndexKeeper.forEach( + (regionId, map) -> + map.values() + .forEach( + fileMap -> + fileMap.forEach( + (path, pair) -> { + if (System.currentTimeMillis() + - PipeConfig.getInstance().getPipeTsFileFlushIntervalSeconds() + * 1000L + >= pair.getRight()) { + StorageEngine.getInstance() + .getDataRegion(new DataRegionId(regionId)) + .asyncCloseOneTsFileProcessor( + pair.getLeft().isSeq(), pair.getLeft().getProcessor()); + pair.setRight(Long.MAX_VALUE); + } + }))); + } + + public synchronized void eliminateProgressIndex( + final int dataRegionId, final @Nonnull String pipeName, final String filePath) { + progressIndexKeeper + .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) + .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>()) + .remove(filePath); + } + + public synchronized boolean isProgressIndexAfterOrEquals( + final int dataRegionId, + final String pipeName, + final String tsFilePath, + final ProgressIndex progressIndex) { + return progressIndexKeeper + .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) + .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>()) + .entrySet() + .stream() + .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath)) + .map(Entry::getValue) + .filter(Objects::nonNull) + .anyMatch(resource -> !resource.getLeft().getMaxProgressIndex().isAfter(progressIndex)); + } + + //////////////////////////// singleton //////////////////////////// + + private static class PipeTimePartitionProgressIndexKeeperHolder { + + private static final PipeTsFileEpochProgressIndexAndFlushManager INSTANCE = + new PipeTsFileEpochProgressIndexAndFlushManager(); + + private PipeTimePartitionProgressIndexKeeperHolder() { + // empty constructor + } + } + + public static PipeTsFileEpochProgressIndexAndFlushManager getInstance() { + return PipeTsFileEpochProgressIndexAndFlushManager.PipeTimePartitionProgressIndexKeeperHolder + .INSTANCE; + } + + private PipeTsFileEpochProgressIndexAndFlushManager() { + // empty constructor + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java deleted file mode 100644 index ff7d90c377d..00000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner; - -import org.apache.iotdb.commons.consensus.index.ProgressIndex; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; - -import javax.annotation.Nonnull; - -import java.util.Map; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; - -public class PipeTsFileEpochProgressIndexKeeper { - - // data region id -> pipeName -> tsFile path -> max progress index - private final Map<String, Map<String, Map<String, TsFileResource>>> progressIndexKeeper = - new ConcurrentHashMap<>(); - - public synchronized void registerProgressIndex( - final String dataRegionId, final String pipeName, final TsFileResource resource) { - progressIndexKeeper - .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) - .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>()) - .putIfAbsent(resource.getTsFilePath(), resource); - } - - public synchronized void eliminateProgressIndex( - final String dataRegionId, final @Nonnull String pipeName, final String filePath) { - progressIndexKeeper - .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) - .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>()) - .remove(filePath); - } - - public synchronized boolean isProgressIndexAfterOrEquals( - final String dataRegionId, - final String pipeName, - final String tsFilePath, - final ProgressIndex progressIndex) { - return progressIndexKeeper - .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) - .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>()) - .entrySet() - .stream() - .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath)) - .map(Entry::getValue) - .filter(Objects::nonNull) - .anyMatch(resource -> !resource.getMaxProgressIndex().isAfter(progressIndex)); - } - - //////////////////////////// singleton //////////////////////////// - - private static class PipeTimePartitionProgressIndexKeeperHolder { - - private static final PipeTsFileEpochProgressIndexKeeper INSTANCE = - new PipeTsFileEpochProgressIndexKeeper(); - - private PipeTimePartitionProgressIndexKeeperHolder() { - // empty constructor - } - } - - public static PipeTsFileEpochProgressIndexKeeper getInstance() { - return PipeTsFileEpochProgressIndexKeeper.PipeTimePartitionProgressIndexKeeperHolder.INSTANCE; - } - - private PipeTsFileEpochProgressIndexKeeper() { - // empty constructor - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java index c5e98cd1b2c..d8fd295c9ca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java @@ -30,47 +30,50 @@ import java.util.concurrent.ConcurrentHashMap; public class PipeTimePartitionListener { - private final Map<String, Map<String, PipeRealtimeDataRegionSource>> dataRegionId2Extractors = + private final Map<Integer, Map<String, PipeRealtimeDataRegionSource>> dataRegionId2Sources = new ConcurrentHashMap<>(); // This variable is used to record the upper and lower bounds that each data region's time // partition ID has ever reached. - private final Map<String, Pair<Long, Long>> dataRegionId2TimePartitionIdBound = + private final Map<Integer, Pair<Long, Long>> dataRegionId2TimePartitionIdBound = new ConcurrentHashMap<>(); //////////////////////////// start & stop //////////////////////////// public synchronized void startListen( - String dataRegionId, PipeRealtimeDataRegionSource extractor) { - dataRegionId2Extractors + final int dataRegionId, final PipeRealtimeDataRegionSource source) { + dataRegionId2Sources .computeIfAbsent(dataRegionId, o -> new HashMap<>()) - .put(extractor.getTaskID(), extractor); - // Assign the previously recorded upper and lower bounds of time partition to the extractor that + .put(source.getTaskID(), source); + // Assign the previously recorded upper and lower bounds of time partition to the source that // has just started listening to the growth of time partition. - Pair<Long, Long> timePartitionIdBound = dataRegionId2TimePartitionIdBound.get(dataRegionId); + final Pair<Long, Long> timePartitionIdBound = + dataRegionId2TimePartitionIdBound.get(dataRegionId); if (Objects.nonNull(timePartitionIdBound)) { - extractor.setDataRegionTimePartitionIdBound(timePartitionIdBound); + source.setDataRegionTimePartitionIdBound(timePartitionIdBound); } } - public synchronized void stopListen(String dataRegionId, PipeRealtimeDataRegionSource extractor) { - Map<String, PipeRealtimeDataRegionSource> extractors = - dataRegionId2Extractors.get(dataRegionId); - if (Objects.isNull(extractors)) { + public synchronized void stopListen( + final int dataRegionId, final PipeRealtimeDataRegionSource source) { + final Map<String, PipeRealtimeDataRegionSource> sources = + dataRegionId2Sources.get(dataRegionId); + if (Objects.isNull(sources)) { return; } - extractors.remove(extractor.getTaskID()); - if (extractors.isEmpty()) { - dataRegionId2Extractors.remove(dataRegionId); + sources.remove(source.getTaskID()); + if (sources.isEmpty()) { + dataRegionId2Sources.remove(dataRegionId); } } //////////////////////////// listen to changes //////////////////////////// public synchronized void listenToTimePartitionGrow( - String dataRegionId, Pair<Long, Long> newTimePartitionIdBound) { + final int dataRegionId, final Pair<Long, Long> newTimePartitionIdBound) { boolean shouldBroadcastTimePartitionChange = false; - Pair<Long, Long> oldTimePartitionIdBound = dataRegionId2TimePartitionIdBound.get(dataRegionId); + final Pair<Long, Long> oldTimePartitionIdBound = + dataRegionId2TimePartitionIdBound.get(dataRegionId); if (Objects.isNull(oldTimePartitionIdBound)) { dataRegionId2TimePartitionIdBound.put(dataRegionId, newTimePartitionIdBound); @@ -86,14 +89,15 @@ public class PipeTimePartitionListener { } if (shouldBroadcastTimePartitionChange) { - Map<String, PipeRealtimeDataRegionSource> extractors = - dataRegionId2Extractors.get(dataRegionId); - if (Objects.isNull(extractors)) { + final Map<String, PipeRealtimeDataRegionSource> sources = + dataRegionId2Sources.get(dataRegionId); + if (Objects.isNull(sources)) { return; } - Pair<Long, Long> timePartitionIdBound = dataRegionId2TimePartitionIdBound.get(dataRegionId); - extractors.forEach( - (id, extractor) -> extractor.setDataRegionTimePartitionIdBound(timePartitionIdBound)); + final Pair<Long, Long> timePartitionIdBound = + dataRegionId2TimePartitionIdBound.get(dataRegionId); + sources.forEach( + (id, source) -> source.setDataRegionTimePartitionIdBound(timePartitionIdBound)); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java index 2e1161977f8..5ded82a36ba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java @@ -67,7 +67,7 @@ public class TimePartitionManager { // PipeInsertionDataNodeListener.listenToInsertNode. PipeTimePartitionListener.getInstance() .listenToTimePartitionGrow( - String.valueOf(timePartitionInfo.dataRegionId.getId()), + timePartitionInfo.dataRegionId.getId(), new Pair<>( timePartitionInfoMapForRegion.firstKey(), timePartitionInfoMapForRegion.lastKey())); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index cf68da89553..2e600029241 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -322,6 +322,9 @@ public class CommonConfig { private volatile int pipeTsFilePinMaxLogNumPerRound = 10; private volatile int pipeTsFilePinMaxLogIntervalRounds = 90; + // <= 0 means disabled + private volatile long pipeTsFileFlushIntervalSeconds = 5 * 60L; + private volatile boolean pipeMemoryManagementEnabled = true; private volatile long pipeMemoryAllocateRetryIntervalMs = 50; private volatile int pipeMemoryAllocateMaxRetries = 10; @@ -1783,6 +1786,18 @@ public class CommonConfig { "pipeTsFilePinMaxLogIntervalRounds is set to {}", pipeTsFilePinMaxLogIntervalRounds); } + public long getPipeTsFileFlushIntervalSeconds() { + return pipeTsFileFlushIntervalSeconds; + } + + public void setPipeTsFileFlushIntervalSeconds(long pipeTsFileFlushIntervalSeconds) { + if (this.pipeTsFileFlushIntervalSeconds == pipeTsFileFlushIntervalSeconds) { + return; + } + this.pipeTsFileFlushIntervalSeconds = pipeTsFileFlushIntervalSeconds; + logger.info("pipeTsFileFlushIntervalSeconds is set to {}", pipeTsFileFlushIntervalSeconds); + } + public boolean getPipeMemoryManagementEnabled() { return pipeMemoryManagementEnabled; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java index a49caa53368..16539d82d8a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java @@ -387,6 +387,10 @@ public class PipeConfig { return COMMON_CONFIG.getPipeTsFilePinMaxLogIntervalRounds(); } + public long getPipeTsFileFlushIntervalSeconds() { + return COMMON_CONFIG.getPipeTsFileFlushIntervalSeconds(); + } + public long getPipeLoggerCacheMaxSizeInBytes() { return COMMON_CONFIG.getPipeLoggerCacheMaxSizeInBytes(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java index 832517b9745..6e72b4919fb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java @@ -131,6 +131,11 @@ public class PipeDescriptor { properties.getProperty( "pipe_tsfile_pin_max_log_interval_rounds", String.valueOf(config.getPipeTsFilePinMaxLogIntervalRounds())))); + config.setPipeTsFileFlushIntervalSeconds( + Long.parseLong( + properties.getProperty( + "pipe_tsfile_flush_interval_seconds", + String.valueOf(config.getPipeTsFileFlushIntervalSeconds())))); config.setPipeMemoryManagementEnabled( Boolean.parseBoolean( properties.getProperty(
