This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new accd9e1430c [IOTDB-5974] Pipe: Fix Recover ProgressIndex (#10090)
accd9e1430c is described below
commit accd9e1430c4b696e76f2e38105feed904b8e25a
Author: yschengzi <[email protected]>
AuthorDate: Fri Jun 9 00:30:25 2023 +0800
[IOTDB-5974] Pipe: Fix Recover ProgressIndex (#10090)
---
.../commons/consensus/index/impl/RecoverProgressIndex.java | 6 ++++++
.../apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java | 12 ++++++++++--
.../agent/runtime/SimpleConsensusProgressIndexAssigner.java | 5 ++---
.../db/wal/recover/file/UnsealedTsFileRecoverPerformer.java | 2 +-
4 files changed, 19 insertions(+), 6 deletions(-)
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
index d2742acdb71..97be1e4be58 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/RecoverProgressIndex.java
@@ -41,6 +41,12 @@ public class RecoverProgressIndex implements ProgressIndex {
this.dataNodeId2LocalIndex = new HashMap<>();
}
+ public RecoverProgressIndex(int dataNodeId, SimpleProgressIndex
simpleProgressIndex) {
+ this.dataNodeId2LocalIndex = new HashMap<>();
+
+ dataNodeId2LocalIndex.put(dataNodeId, simpleProgressIndex);
+ }
+
@Override
public void serialize(ByteBuffer byteBuffer) {
lock.readLock().lock();
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index b4e4077ea5c..7d1a6336a16 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -19,12 +19,14 @@
package org.apache.iotdb.db.pipe.agent.runtime;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import
org.apache.iotdb.db.pipe.resource.file.PipeHardlinkFileDirStartupCleaner;
@@ -38,6 +40,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class PipeRuntimeAgent implements IService {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeRuntimeAgent.class);
+ private static final int DATA_NODE_ID =
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
private static final AtomicBoolean isShutdown = new AtomicBoolean(false);
@@ -86,8 +89,13 @@ public class PipeRuntimeAgent implements IService {
simpleConsensusProgressIndexAssigner.assignIfNeeded(tsFileResource);
}
- public void assignSimpleProgressIndexForTsFileRecovery(TsFileResource
tsFileResource) {
-
simpleConsensusProgressIndexAssigner.assignSimpleProgressIndexForTsFileRecovery(tsFileResource);
+ ////////////////////// Recover ProgressIndex Assigner //////////////////////
+
+ public void assignRecoverProgressIndexForTsFileRecovery(TsFileResource
tsFileResource) {
+ tsFileResource.updateProgressIndex(
+ new RecoverProgressIndex(
+ DATA_NODE_ID,
+
simpleConsensusProgressIndexAssigner.getSimpleProgressIndexForTsFileRecovery()));
}
//////////////////////////// Runtime Exception Handlers
////////////////////////////
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
index e3c54c6eb94..7c81cd6cabd 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/SimpleConsensusProgressIndexAssigner.java
@@ -106,8 +106,7 @@ public class SimpleConsensusProgressIndexAssigner {
new SimpleProgressIndex(rebootTimes,
memtableFlushOrderId.getAndIncrement()));
}
- public void assignSimpleProgressIndexForTsFileRecovery(TsFileResource
tsFileResource) {
- tsFileResource.updateProgressIndex(
- new SimpleProgressIndex(rebootTimes,
memtableFlushOrderId.getAndIncrement()));
+ public SimpleProgressIndex getSimpleProgressIndexForTsFileRecovery() {
+ return new SimpleProgressIndex(rebootTimes,
memtableFlushOrderId.getAndIncrement());
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index e3e675d2457..76a6eee4e4a 100644
---
a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++
b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -244,7 +244,7 @@ public class UnsealedTsFileRecoverPerformer extends
AbstractTsFileRecoverPerform
}
// set recover progress index for pipe
-
PipeAgent.runtime().assignSimpleProgressIndexForTsFileRecovery(tsFileResource);
+
PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(tsFileResource);
// if we put following codes in if clause above, this file can be
continued writing into it
// currently, we close this file anyway