This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch IOTDB-5723 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8ff4acdf24727acacecda67b237f6d0ee9e48d0b Author: Steve Yurong Su <[email protected]> AuthorDate: Sun May 28 23:41:40 2023 +0800 assign simple progress index --- .../apache/iotdb/db/engine/storagegroup/TsFileProcessor.java | 5 +++-- .../apache/iotdb/db/engine/storagegroup/TsFileResource.java | 4 ++++ .../apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java | 6 +++--- .../agent/runtime/SimpleConsensusProgressIndexAssigner.java | 12 ++++++++---- .../realtime/listener/PipeInsertionDataNodeListener.java | 7 +++++++ 5 files changed, 25 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index a124a8aa2c8..f8d752df5f8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -833,13 +833,14 @@ public class TsFileProcessor { if (workMemTable != null) { logger.info( "{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile " - + "size: {}, plan index: [{}, {}]", + + "size: {}, plan index: [{}, {}], progress index: {}", storageGroupName, tsFileResource.getTsFile().getAbsolutePath(), workMemTable.memSize(), tsFileResource.getTsFileSize(), workMemTable.getMinPlanIndex(), - workMemTable.getMaxPlanIndex()); + workMemTable.getMaxPlanIndex(), + tsFileResource.getMaxProgressIndex()); } else { logger.info( "{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}", diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java index 774106dd593..858f36373f3 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java @@ -1178,6 +1178,10 @@ public class TsFileResource { throw new IllegalStateException( "Should not get progress index from a unclosing TsFileResource."); } + return getMaxProgressIndex(); + } + + public ProgressIndex getMaxProgressIndex() { return maxProgressIndex == null ? new MinimumProgressIndex() : maxProgressIndex; } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java index 486afe825e1..d03b732007a 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java @@ -19,10 +19,10 @@ package org.apache.iotdb.db.pipe.agent.runtime; -import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.IService; import org.apache.iotdb.commons.service.ServiceType; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask; import org.apache.iotdb.db.service.ResourcesInformationHolder; @@ -78,8 +78,8 @@ public class PipeRuntimeAgent implements IService { ////////////////////// SimpleConsensus ProgressIndex Assigner ////////////////////// - public SimpleProgressIndex assignSimpleProgressIndex() { - return simpleConsensusProgressIndexAssigner.assign(); + public void assignSimpleProgressIndexIfNeeded(TsFileResource tsFileResource) { + simpleConsensusProgressIndexAssigner.assignIfNeeded(tsFileResource); } //////////////////////////// Runtime Exception Handlers //////////////////////////// diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java index 195637a63de..648c4994f07 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.file.SystemFileFactory; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; @@ -100,9 +101,12 @@ public class SimpleConsensusProgressIndexAssigner { FileUtils.writeStringToFile(file, String.valueOf(rebootTimes + 1), "UTF-8"); } - public SimpleProgressIndex assign() { - return isEnable - ? new SimpleProgressIndex(rebootTimes, memtableFlushOrderId.getAndIncrement()) - : null; + public void assignIfNeeded(TsFileResource tsFileResource) { + if (!isEnable) { + return; + } + + tsFileResource.updateProgressIndex( + new SimpleProgressIndex(rebootTimes, memtableFlushOrderId.getAndIncrement())); } } diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java index d39a2fb9dc4..68b2c5ca010 100644 --- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java +++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.core.collector.realtime.listener; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector; import org.apache.iotdb.db.pipe.core.collector.realtime.assigner.PipeDataRegionAssigner; import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEventFactory; @@ -45,6 +46,8 @@ public class PipeInsertionDataNodeListener { private final ConcurrentMap<String, PipeDataRegionAssigner> dataRegionId2Assigner = new ConcurrentHashMap<>(); + //////////////////////////// start & stop //////////////////////////// + public synchronized void startListenAndAssign( String dataRegionId, PipeRealtimeDataRegionCollector collector) { dataRegionId2Assigner @@ -69,12 +72,16 @@ public class PipeInsertionDataNodeListener { } } + //////////////////////////// listen to events //////////////////////////// + // TODO: listen to the tsfile synced from the other cluster // TODO: check whether the method is called on the right place. what is the meaning of the // variable shouldClose before calling this method? // TODO: maximum the efficiency of the method when there is no pipe in the system, avoid // dataRegionId2Assigner.get(dataRegionId); public void listenToTsFile(String dataRegionId, TsFileResource tsFileResource) { + PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource); + final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); // only events from registered data region will be collected
