This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 6f3586a26aa [region migration] Fix tsfile error caused by target 
DataNode's fast rebooting #14031 (#14054)
6f3586a26aa is described below

commit 6f3586a26aa8f6e15219725afbb22272f64ce3f3
Author: Li Yu Heng <[email protected]>
AuthorDate: Tue Nov 12 14:18:55 2024 +0800

    [region migration] Fix tsfile error caused by target DataNode's fast 
rebooting #14031 (#14054)
---
 .../org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java    | 4 ++--
 .../consensus/iot/service/IoTConsensusRPCServiceProcessor.java    | 2 +-
 .../org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java | 1 +
 .../iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java      | 8 ++++----
 .../thrift-consensus/src/main/thrift/iotconsensus.thrift          | 5 +++--
 5 files changed, 11 insertions(+), 9 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index de3643bca6c..efa1a0306ef 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -373,7 +373,7 @@ public class IoTConsensusServerImpl {
   }
 
   public void receiveSnapshotFragment(
-      String snapshotId, String originalFilePath, ByteBuffer fileChunk)
+      String snapshotId, String originalFilePath, ByteBuffer fileChunk, long 
fileOffset)
       throws ConsensusGroupModifyPeerException {
     try {
       String targetFilePath = calculateSnapshotPath(snapshotId, 
originalFilePath);
@@ -384,7 +384,7 @@ public class IoTConsensusServerImpl {
       }
       try (FileOutputStream fos = new 
FileOutputStream(targetFile.getAbsolutePath(), true);
           FileChannel channel = fos.getChannel()) {
-        channel.write(fileChunk.slice());
+        channel.write(fileChunk.slice(), fileOffset);
       }
     } catch (IOException e) {
       throw new ConsensusGroupModifyPeerException(
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 07fa67ee539..a76b0e97b06 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -270,7 +270,7 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Ifa
     }
     TSStatus responseStatus;
     try {
-      impl.receiveSnapshotFragment(req.snapshotId, req.filePath, 
req.fileChunk);
+      impl.receiveSnapshotFragment(req.snapshotId, req.filePath, 
req.fileChunk, req.offset);
       responseStatus = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     } catch (ConsensusGroupModifyPeerException e) {
       responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java
index a249bad90ff..67fcb220c18 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java
@@ -50,6 +50,7 @@ public class SnapshotFragment {
     TSendSnapshotFragmentReq req = new TSendSnapshotFragmentReq();
     req.setSnapshotId(snapshotId);
     req.setFilePath(filePath);
+    req.setOffset(startOffset);
     req.setChunkLength(fragmentSize);
     req.setFileChunk(fileChunk);
     return req;
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
index ca79b8f3c95..3331829d243 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
@@ -46,12 +46,12 @@ public class SnapshotFragmentReader {
 
   public boolean hasNext() throws IOException {
     buf.clear();
-    int actualReadSize = fileChannel.read(buf);
+    int readSize = fileChannel.read(buf);
     buf.flip();
-    if (actualReadSize > 0) {
+    if (readSize > 0) {
       cachedSnapshotFragment =
-          new SnapshotFragment(snapshotId, filePath, fileSize, totalReadSize, 
actualReadSize, buf);
-      totalReadSize += actualReadSize;
+          new SnapshotFragment(snapshotId, filePath, fileSize, totalReadSize, 
readSize, buf);
+      totalReadSize += readSize;
       return true;
     }
     return false;
diff --git 
a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift 
b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
index 36dbd2dc35e..d0b4808977e 100644
--- a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
+++ b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
@@ -80,8 +80,9 @@ struct TSendSnapshotFragmentReq {
   1: required common.TConsensusGroupId consensusGroupId
   2: required string snapshotId
   3: required string filePath
-  4: required i64 chunkLength
-  5: required binary fileChunk
+  4: required i64 offset
+  5: required i64 chunkLength
+  6: required binary fileChunk
 }
 
 struct TWaitSyncLogCompleteReq {

Reply via email to