lollipopjin commented on code in PR #7281:
URL: https://github.com/apache/rocketmq/pull/7281#discussion_r1309538119
##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java:
##########
@@ -236,23 +211,34 @@ public AppendResult append(ByteBuffer byteBuf, long
timeStamp) {
setFull();
return AppendResult.FILE_FULL;
}
- if (uploadBufferList.size() >
storeConfig.getTieredStoreGroupCommitCount()
+
+ if (bufferList.size() >
storeConfig.getTieredStoreGroupCommitCount()
|| appendPosition - commitPosition >
storeConfig.getTieredStoreGroupCommitSize()) {
commitAsync();
}
- if (uploadBufferList.size() >
storeConfig.getTieredStoreMaxGroupCommitCount()) {
- logger.debug("TieredFileSegment#append: buffer full: file: {},
upload buffer size: {}",
- getPath(), uploadBufferList.size());
+
+ if (bufferList.size() >
storeConfig.getTieredStoreMaxGroupCommitCount()) {
+ logger.debug("File segment append buffer full, file: {},
buffer size: {}, pending bytes: {}",
+ getPath(), bufferList.size(), appendPosition -
commitPosition);
return AppendResult.BUFFER_FULL;
}
- if (timeStamp != Long.MAX_VALUE) {
- maxTimestamp = timeStamp;
+
+ if (timestamp != Long.MAX_VALUE) {
+ maxTimestamp = timestamp;
if (minTimestamp == Long.MAX_VALUE) {
- minTimestamp = timeStamp;
+ minTimestamp = timestamp;
}
}
+
appendPosition += byteBuf.remaining();
- uploadBufferList.add(byteBuf);
+
+ // deep copy buffer
+ ByteBuffer byteBuffer =
ByteBuffer.allocateDirect(byteBuf.remaining());
Review Comment:
How about renaming `byteBuffer` to `byteBufferCopied`?
##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java:
##########
@@ -95,18 +96,34 @@ public synchronized void reset() throws IOException {
this.readPosition = markReadPosition;
this.curReadBufferIndex = markCurReadBufferIndex;
this.readPosInCurBuffer = markReadPosInCurBuffer;
- if (this.curReadBufferIndex < uploadBufferList.size()) {
- this.curBuffer = uploadBufferList.get(curReadBufferIndex);
+ if (this.curReadBufferIndex < bufferList.size()) {
+ this.curBuffer = bufferList.get(curReadBufferIndex);
}
}
+ public synchronized void rewind() {
+ this.readPosition = 0;
+ this.curReadBufferIndex = 0;
Review Comment:
Is the file segment stream reused after rewind? If so, does the position
need to be corrected here?
##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java:
##########
@@ -95,18 +96,34 @@ public synchronized void reset() throws IOException {
this.readPosition = markReadPosition;
this.curReadBufferIndex = markCurReadBufferIndex;
this.readPosInCurBuffer = markReadPosInCurBuffer;
- if (this.curReadBufferIndex < uploadBufferList.size()) {
- this.curBuffer = uploadBufferList.get(curReadBufferIndex);
+ if (this.curReadBufferIndex < bufferList.size()) {
+ this.curBuffer = bufferList.get(curReadBufferIndex);
}
}
+ public synchronized void rewind() {
+ this.readPosition = 0;
+ this.curReadBufferIndex = 0;
Review Comment:
Is the file segment stream reused after rewind? If so, does the position
need to be corrected here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]