This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 6f3586a26aa [region migration] Fix tsfile error caused by target
DataNode's fast rebooting #14031 (#14054)
6f3586a26aa is described below
commit 6f3586a26aa8f6e15219725afbb22272f64ce3f3
Author: Li Yu Heng <[email protected]>
AuthorDate: Tue Nov 12 14:18:55 2024 +0800
[region migration] Fix tsfile error caused by target DataNode's fast
rebooting #14031 (#14054)
---
.../org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java | 4 ++--
.../consensus/iot/service/IoTConsensusRPCServiceProcessor.java | 2 +-
.../org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java | 1 +
.../iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java | 8 ++++----
.../thrift-consensus/src/main/thrift/iotconsensus.thrift | 5 +++--
5 files changed, 11 insertions(+), 9 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index de3643bca6c..efa1a0306ef 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -373,7 +373,7 @@ public class IoTConsensusServerImpl {
}
public void receiveSnapshotFragment(
- String snapshotId, String originalFilePath, ByteBuffer fileChunk)
+ String snapshotId, String originalFilePath, ByteBuffer fileChunk, long
fileOffset)
throws ConsensusGroupModifyPeerException {
try {
String targetFilePath = calculateSnapshotPath(snapshotId,
originalFilePath);
@@ -384,7 +384,7 @@ public class IoTConsensusServerImpl {
}
try (FileOutputStream fos = new
FileOutputStream(targetFile.getAbsolutePath(), true);
FileChannel channel = fos.getChannel()) {
- channel.write(fileChunk.slice());
+ channel.write(fileChunk.slice(), fileOffset);
}
} catch (IOException e) {
throw new ConsensusGroupModifyPeerException(
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 07fa67ee539..a76b0e97b06 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -270,7 +270,7 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Ifa
}
TSStatus responseStatus;
try {
- impl.receiveSnapshotFragment(req.snapshotId, req.filePath,
req.fileChunk);
+ impl.receiveSnapshotFragment(req.snapshotId, req.filePath,
req.fileChunk, req.offset);
responseStatus = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (ConsensusGroupModifyPeerException e) {
responseStatus = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java
index a249bad90ff..67fcb220c18 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragment.java
@@ -50,6 +50,7 @@ public class SnapshotFragment {
TSendSnapshotFragmentReq req = new TSendSnapshotFragmentReq();
req.setSnapshotId(snapshotId);
req.setFilePath(filePath);
+ req.setOffset(startOffset);
req.setChunkLength(fragmentSize);
req.setFileChunk(fileChunk);
return req;
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
index ca79b8f3c95..3331829d243 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/snapshot/SnapshotFragmentReader.java
@@ -46,12 +46,12 @@ public class SnapshotFragmentReader {
public boolean hasNext() throws IOException {
buf.clear();
- int actualReadSize = fileChannel.read(buf);
+ int readSize = fileChannel.read(buf);
buf.flip();
- if (actualReadSize > 0) {
+ if (readSize > 0) {
cachedSnapshotFragment =
- new SnapshotFragment(snapshotId, filePath, fileSize, totalReadSize,
actualReadSize, buf);
- totalReadSize += actualReadSize;
+ new SnapshotFragment(snapshotId, filePath, fileSize, totalReadSize,
readSize, buf);
+ totalReadSize += readSize;
return true;
}
return false;
diff --git
a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
index 36dbd2dc35e..d0b4808977e 100644
--- a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
+++ b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
@@ -80,8 +80,9 @@ struct TSendSnapshotFragmentReq {
1: required common.TConsensusGroupId consensusGroupId
2: required string snapshotId
3: required string filePath
- 4: required i64 chunkLength
- 5: required binary fileChunk
+ 4: required i64 offset
+ 5: required i64 chunkLength
+ 6: required binary fileChunk
}
struct TWaitSyncLogCompleteReq {