This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch IOTDB-5723
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8ff4acdf24727acacecda67b237f6d0ee9e48d0b
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sun May 28 23:41:40 2023 +0800

    assign simple progress index
---
 .../apache/iotdb/db/engine/storagegroup/TsFileProcessor.java |  5 +++--
 .../apache/iotdb/db/engine/storagegroup/TsFileResource.java  |  4 ++++
 .../apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java |  6 +++---
 .../agent/runtime/SimpleConsensusProgressIndexAssigner.java  | 12 ++++++++----
 .../realtime/listener/PipeInsertionDataNodeListener.java     |  7 +++++++
 5 files changed, 25 insertions(+), 9 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index a124a8aa2c8..f8d752df5f8 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -833,13 +833,14 @@ public class TsFileProcessor {
         if (workMemTable != null) {
           logger.info(
               "{}: flush a working memtable in async close tsfile {}, memtable 
size: {}, tsfile "
-                  + "size: {}, plan index: [{}, {}]",
+                  + "size: {}, plan index: [{}, {}], progress index: {}",
               storageGroupName,
               tsFileResource.getTsFile().getAbsolutePath(),
               workMemTable.memSize(),
               tsFileResource.getTsFileSize(),
               workMemTable.getMinPlanIndex(),
-              workMemTable.getMaxPlanIndex());
+              workMemTable.getMaxPlanIndex(),
+              tsFileResource.getMaxProgressIndex());
         } else {
           logger.info(
               "{}: flush a NotifyFlushMemTable in async close tsfile {}, 
tsfile size: {}",
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 774106dd593..858f36373f3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -1178,6 +1178,10 @@ public class TsFileResource {
       throw new IllegalStateException(
           "Should not get progress index from a unclosing TsFileResource.");
     }
+    return getMaxProgressIndex();
+  }
+
+  public ProgressIndex getMaxProgressIndex() {
     return maxProgressIndex == null ? new MinimumProgressIndex() : 
maxProgressIndex;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 486afe825e1..d03b732007a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -19,10 +19,10 @@
 
 package org.apache.iotdb.db.pipe.agent.runtime;
 
-import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
 import org.apache.iotdb.db.service.ResourcesInformationHolder;
@@ -78,8 +78,8 @@ public class PipeRuntimeAgent implements IService {
 
   ////////////////////// SimpleConsensus ProgressIndex Assigner 
//////////////////////
 
-  public SimpleProgressIndex assignSimpleProgressIndex() {
-    return simpleConsensusProgressIndexAssigner.assign();
+  public void assignSimpleProgressIndexIfNeeded(TsFileResource tsFileResource) 
{
+    simpleConsensusProgressIndexAssigner.assignIfNeeded(tsFileResource);
   }
 
   //////////////////////////// Runtime Exception Handlers 
////////////////////////////
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
index 195637a63de..648c4994f07 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -100,9 +101,12 @@ public class SimpleConsensusProgressIndexAssigner {
     FileUtils.writeStringToFile(file, String.valueOf(rebootTimes + 1), 
"UTF-8");
   }
 
-  public SimpleProgressIndex assign() {
-    return isEnable
-        ? new SimpleProgressIndex(rebootTimes, 
memtableFlushOrderId.getAndIncrement())
-        : null;
+  public void assignIfNeeded(TsFileResource tsFileResource) {
+    if (!isEnable) {
+      return;
+    }
+
+    tsFileResource.updateProgressIndex(
+        new SimpleProgressIndex(rebootTimes, 
memtableFlushOrderId.getAndIncrement()));
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
index d39a2fb9dc4..68b2c5ca010 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/listener/PipeInsertionDataNodeListener.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.db.pipe.core.collector.realtime.listener;
 
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import 
org.apache.iotdb.db.pipe.core.collector.realtime.assigner.PipeDataRegionAssigner;
 import 
org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEventFactory;
@@ -45,6 +46,8 @@ public class PipeInsertionDataNodeListener {
   private final ConcurrentMap<String, PipeDataRegionAssigner> 
dataRegionId2Assigner =
       new ConcurrentHashMap<>();
 
+  //////////////////////////// start & stop ////////////////////////////
+
   public synchronized void startListenAndAssign(
       String dataRegionId, PipeRealtimeDataRegionCollector collector) {
     dataRegionId2Assigner
@@ -69,12 +72,16 @@ public class PipeInsertionDataNodeListener {
     }
   }
 
+  //////////////////////////// listen to events ////////////////////////////
+
   // TODO: listen to the tsfile synced from the other cluster
   // TODO: check whether the method is called on the right place. what is the 
meaning of the
   // variable shouldClose before calling this method?
   // TODO: maximum the efficiency of the method when there is no pipe in the 
system, avoid
   // dataRegionId2Assigner.get(dataRegionId);
   public void listenToTsFile(String dataRegionId, TsFileResource 
tsFileResource) {
+    PipeAgent.runtime().assignSimpleProgressIndexIfNeeded(tsFileResource);
+
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(dataRegionId);
 
     // only events from registered data region will be collected

Reply via email to