This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new accd9e1430c [IOTDB-5974] Pipe: Fix Recover ProgressIndex (#10090)
accd9e1430c is described below

commit accd9e1430c4b696e76f2e38105feed904b8e25a
Author: yschengzi <[email protected]>
AuthorDate: Fri Jun 9 00:30:25 2023 +0800

    [IOTDB-5974] Pipe: Fix Recover ProgressIndex (#10090)
---
 .../commons/consensus/index/impl/RecoverProgressIndex.java   |  6 ++++++
 .../apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java | 12 ++++++++++--
 .../agent/runtime/SimpleConsensusProgressIndexAssigner.java  |  5 ++---
 .../db/wal/recover/file/UnsealedTsFileRecoverPerformer.java  |  2 +-
 4 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
index d2742acdb71..97be1e4be58 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
@@ -41,6 +41,12 @@ public class RecoverProgressIndex implements ProgressIndex {
     this.dataNodeId2LocalIndex = new HashMap<>();
   }
 
+  public RecoverProgressIndex(int dataNodeId, SimpleProgressIndex simpleProgressIndex) {
+    this.dataNodeId2LocalIndex = new HashMap<>();
+
+    dataNodeId2LocalIndex.put(dataNodeId, simpleProgressIndex);
+  }
+
   @Override
   public void serialize(ByteBuffer byteBuffer) {
     lock.readLock().lock();
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index b4e4077ea5c..7d1a6336a16 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -19,12 +19,14 @@
 
 package org.apache.iotdb.db.pipe.agent.runtime;
 
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.resource.file.PipeHardlinkFileDirStartupCleaner;
@@ -38,6 +40,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class PipeRuntimeAgent implements IService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(PipeRuntimeAgent.class);
+  private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
 
   private static final AtomicBoolean isShutdown = new AtomicBoolean(false);
 
@@ -86,8 +89,13 @@ public class PipeRuntimeAgent implements IService {
     simpleConsensusProgressIndexAssigner.assignIfNeeded(tsFileResource);
   }
 
-  public void assignSimpleProgressIndexForTsFileRecovery(TsFileResource tsFileResource) {
-    simpleConsensusProgressIndexAssigner.assignSimpleProgressIndexForTsFileRecovery(tsFileResource);
+  ////////////////////// Recover ProgressIndex Assigner //////////////////////
+
+  public void assignRecoverProgressIndexForTsFileRecovery(TsFileResource tsFileResource) {
+    tsFileResource.updateProgressIndex(
+        new RecoverProgressIndex(
+            DATA_NODE_ID,
+            simpleConsensusProgressIndexAssigner.getSimpleProgressIndexForTsFileRecovery()));
   }
 
   //////////////////////////// Runtime Exception Handlers ////////////////////////////
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
index e3c54c6eb94..7c81cd6cabd 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
@@ -106,8 +106,7 @@ public class SimpleConsensusProgressIndexAssigner {
         new SimpleProgressIndex(rebootTimes, memtableFlushOrderId.getAndIncrement()));
   }
 
-  public void assignSimpleProgressIndexForTsFileRecovery(TsFileResource tsFileResource) {
-    tsFileResource.updateProgressIndex(
-        new SimpleProgressIndex(rebootTimes, memtableFlushOrderId.getAndIncrement()));
+  public SimpleProgressIndex getSimpleProgressIndexForTsFileRecovery() {
+    return new SimpleProgressIndex(rebootTimes, memtableFlushOrderId.getAndIncrement());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index e3e675d2457..76a6eee4e4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -244,7 +244,7 @@ public class UnsealedTsFileRecoverPerformer extends AbstractTsFileRecoverPerform
         }
 
         // set recover progress index for pipe
-        PipeAgent.runtime().assignSimpleProgressIndexForTsFileRecovery(tsFileResource);
+        PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(tsFileResource);
 
         // if we put following codes in if clause above, this file can be continued writing into it
         // currently, we close this file anyway

Reply via email to