ShadowySpirits commented on code in PR #6625:
URL: https://github.com/apache/rocketmq/pull/6625#discussion_r1173253571
##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java:
##########
@@ -429,15 +431,29 @@ public static class TieredFileSegmentInputStream extends InputStream {
private final FileSegmentType fileType;
private final List<ByteBuffer> uploadBufferList;
Review Comment:
This stream is more complicated now. Should we move it into a separate file?
##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java:
##########
@@ -429,15 +431,29 @@ public static class TieredFileSegmentInputStream extends InputStream {
private final FileSegmentType fileType;
private final List<ByteBuffer> uploadBufferList;
- private int bufferReadIndex = 0;
- private int readOffset = 0;
- // only used in commitLog
+ /**
+ * curReadBufferIndex is the index of the buffer in uploadBufferList which is being read
+ */
+ private int curReadBufferIndex = 0;
+ /**
+ * readPosInCurBuffer is the position in the buffer which is being read
+ */
+ private int readPosInCurBuffer = 0;
+ /**
+ * commitLogOffset is the real physical offset of the commitLog buffer which is being read<br>
+ * <i>(only used for commitLog)</i>
+ */
private long commitLogOffset;
+ /**
+ * readPos is the now position in the stream
+ */
+ private int readPos = 0;
Review Comment:
Please rename readPos to readPosition.
##########
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java:
##########
@@ -463,69 +479,114 @@ public ByteBuffer getCodaBuffer() {
return codaBuffer;
}
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ @Override
+ public synchronized void mark(int ignore) {
+ markPosition = new MarkPosition(curReadBufferIndex, readPosInCurBuffer, commitLogOffset, readPos);
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ if (markPosition == null) {
+ throw new IOException("mark is not set");
+ }
+ curReadBufferIndex = markPosition.curReadBufferIndex;
+ readPosInCurBuffer = markPosition.readPosInCurBuffer;
+ commitLogOffset = markPosition.commitLogOffset;
+ readPos = markPosition.readPos;
+ if (curReadBufferIndex < uploadBufferList.size()) {
+ curBuffer = uploadBufferList.get(curReadBufferIndex);
+ }
+ commitLogOffsetBuffer.putLong(0, commitLogOffset);
+ }
+
@Override
public int available() {
- return contentLength - readBytes;
+ return contentLength - readPos;
}
@Override
public int read() {
Review Comment:
Could you add some comments to explain this logic?
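To illustrate the level of commenting that would help, here is a minimal sketch of a stream that reads across a list of buffers and supports mark/reset. It is not the code from this PR: the class name `MultiBufferInputStream` and the `marked*` fields are hypothetical, and the commitLog offset rewriting and coda buffer are left out on purpose.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical illustration only; not TieredFileSegmentInputStream.
final class MultiBufferInputStream extends InputStream {
    private final List<ByteBuffer> buffers;
    private final int contentLength;

    private int curReadBufferIndex = 0; // index of the buffer being read
    private int readPosInCurBuffer = 0; // offset inside that buffer
    private int readPosition = 0;       // offset in the whole stream

    // Snapshot taken by mark(); -1 means no mark has been set yet.
    private int markedBufferIndex = -1;
    private int markedPosInBuffer = 0;
    private int markedReadPosition = 0;

    MultiBufferInputStream(List<ByteBuffer> buffers) {
        this.buffers = buffers;
        int total = 0;
        for (ByteBuffer buffer : buffers) {
            total += buffer.remaining();
        }
        this.contentLength = total;
    }

    @Override
    public boolean markSupported() {
        return true;
    }

    @Override
    public synchronized void mark(int ignore) {
        // Save all three cursors so reset() can restore them together.
        markedBufferIndex = curReadBufferIndex;
        markedPosInBuffer = readPosInCurBuffer;
        markedReadPosition = readPosition;
    }

    @Override
    public synchronized void reset() throws IOException {
        if (markedBufferIndex < 0) {
            throw new IOException("mark is not set");
        }
        curReadBufferIndex = markedBufferIndex;
        readPosInCurBuffer = markedPosInBuffer;
        readPosition = markedReadPosition;
    }

    @Override
    public int available() {
        return contentLength - readPosition;
    }

    @Override
    public synchronized int read() {
        while (curReadBufferIndex < buffers.size()) {
            ByteBuffer cur = buffers.get(curReadBufferIndex);
            if (readPosInCurBuffer < cur.remaining()) {
                // Absolute get: the buffer's own position is never changed,
                // so reset() only has to restore our cursors.
                int b = cur.get(cur.position() + readPosInCurBuffer) & 0xff;
                readPosInCurBuffer++;
                readPosition++;
                return b;
            }
            // Current buffer is exhausted (or empty): move to the next one.
            curReadBufferIndex++;
            readPosInCurBuffer = 0;
        }
        return -1; // end of stream
    }
}
```

Using absolute reads in the sketch keeps the underlying buffers untouched, which is one way to make reset() cheap. Whether that fits the real stream depends on how the commitLog offset is patched in.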