This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1a9a8b1969 [ISSUE #6624]Support mark() & reset() for 
TieredFileSegmentInputStream (#6625)
1a9a8b1969 is described below

commit 1a9a8b1969d8341654a136aab4c703c95024e53c
Author: TheR1sing3un <[email protected]>
AuthorDate: Wed May 31 14:58:46 2023 +0800

    [ISSUE #6624]Support mark() & reset() for TieredFileSegmentInputStream 
(#6625)
    
    * test(tieredstorage): add UT to verify TieredFileSegmentInputStream
    
    1. add UT to verify TieredFileSegmentInputStream
    
    * refactor(tieredstorage): refactor TieredFileSegmentInputStream
    
    1. refactor TieredFileSegmentInputStream
    
    * feat(tieredstorage): support mark&reset TieredFileSegmentInputStream
    
    1. support mark&reset TieredFileSegmentInputStream
    
    * style(tieredstorage): remove commented code in 
TieredFileSegmentInputStreamTest
    
    1. remove commented code in TieredFileSegmentInputStreamTest
    
    * refactor(tieredstorage): better code placement
    
    1. better code placement
    
    * refactor(tieredstorage): refactor TieredFileSegmentInputStream for better 
understandability
    
    1. refactor TieredFileSegmentInputStream for better understandability
    
    * refactor(tieredstorage): refactor some code in 
TieredFileSegmentInputStream
    
    1. refactor some code in TieredFileSegmentInputStream
    
    * refactor(tieredstorage): refactor TieredFileSegmentInputStream
    
    1. refactor TieredFileSegmentInputStream
    2. add a
    TieredFileSegmentInputStream.Factory to build instance
    
    * refactor(tieredstorage): refactor TieredFileSegmentInputStream related 
directory structure
    
    1. refactor TieredFileSegmentInputStream related directory structure
    
    * refactor(tieredstorage): delete `commitLogOffsetBuffer` in 
TieredCommitLogInputStream
    
    1. delete `commitLogOffsetBuffer` in TieredCommitLogInputStream
    
    * perf(tieredstorage): benchmark TieredFileSegmentInputStream performance
    
    1. benchmark TieredFileSegmentInputStream performance
    
    Closes https://github.com/apache/rocketmq/issues/6624
    
    * feat(tieredstorage): optimized `read(byte[], int, int)` for 
TieredFileSegmentInputStream
    
    1. optimized `read(byte[], int, int)` for TieredFileSegmentInputStream
    
    Closes https://github.com/apache/rocketmq/issues/6624
    
    * fix(tieredstorage): fix an infinite loop in TieredFileSegmentInputStream
    
    1. fix an infinite loop in TieredFileSegmentInputStream.java
    2. remove unused JMH-related dependency
    
    Closes https://github.com/apache/rocketmq/issues/6624
---
 .../tieredstore/provider/TieredFileSegment.java    | 109 +-------
 .../tieredstore/provider/TieredStoreProvider.java  |   6 +-
 .../inputstream/TieredCommitLogInputStream.java    | 183 +++++++++++++
 .../inputstream/TieredFileSegmentInputStream.java  | 172 +++++++++++++
 .../TieredFileSegmentInputStreamFactory.java       |  45 ++++
 .../provider/posix/PosixFileSegment.java           |   3 +-
 .../rocketmq/tieredstore/TieredDispatcherTest.java |  12 +-
 .../tieredstore/TieredMessageFetcherTest.java      |  16 +-
 .../container/TieredMessageQueueContainerTest.java |  12 +-
 .../tieredstore/mock/MemoryFileSegment.java        |   3 +-
 .../mock/MemoryFileSegmentWithoutCheck.java        |   3 +-
 .../provider/TieredFileSegmentInputStreamTest.java | 283 +++++++++++++++++++++
 .../provider/TieredFileSegmentTest.java            |  12 +-
 .../tieredstore/util/MessageBufferUtilTest.java    |  31 ++-
 14 files changed, 745 insertions(+), 145 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
index 2712e84c02..274f03e799 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
@@ -17,7 +17,7 @@
 package org.apache.rocketmq.tieredstore.provider;
 
 import com.google.common.base.Stopwatch;
-import java.io.InputStream;
+
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -35,6 +35,8 @@ import 
org.apache.rocketmq.tieredstore.container.TieredConsumeQueue;
 import org.apache.rocketmq.tieredstore.container.TieredIndexFile;
 import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
 import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
+import 
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import 
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
 
@@ -337,7 +339,7 @@ public abstract class TieredFileSegment implements 
Comparable<TieredFileSegment>
         if (bufferSize == 0) {
             return CompletableFuture.completedFuture(true);
         }
-        TieredFileSegmentInputStream inputStream = new 
TieredFileSegmentInputStream(fileType, baseOffset + commitPosition, bufferList, 
codaBuffer, bufferSize);
+        TieredFileSegmentInputStream inputStream = 
TieredFileSegmentInputStreamFactory.build(fileType, baseOffset + 
commitPosition, bufferList, codaBuffer, bufferSize);
         int finalBufferSize = bufferSize;
         try {
             inflightCommitRequest = commit0(inputStream, commitPosition, 
bufferSize, fileType != FileSegmentType.INDEX)
@@ -425,107 +427,4 @@ public abstract class TieredFileSegment implements 
Comparable<TieredFileSegment>
         }
     }
 
-    public static class TieredFileSegmentInputStream extends InputStream {
-
-        private final FileSegmentType fileType;
-        private final List<ByteBuffer> uploadBufferList;
-        private int bufferReadIndex = 0;
-        private int readOffset = 0;
-        // only used in commitLog
-        private long commitLogOffset;
-        private final ByteBuffer commitLogOffsetBuffer = 
ByteBuffer.allocate(8);
-        private final ByteBuffer codaBuffer;
-        private ByteBuffer curBuffer;
-        private final int contentLength;
-        private int readBytes = 0;
-
-        public TieredFileSegmentInputStream(FileSegmentType fileType, long 
startOffset,
-            List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int 
contentLength) {
-            this.fileType = fileType;
-            this.commitLogOffset = startOffset;
-            this.commitLogOffsetBuffer.putLong(0, startOffset);
-            this.uploadBufferList = uploadBufferList;
-            this.codaBuffer = codaBuffer;
-            this.contentLength = contentLength;
-            if (uploadBufferList.size() > 0) {
-                this.curBuffer = uploadBufferList.get(0);
-            }
-            if (fileType == FileSegmentType.INDEX && uploadBufferList.size() 
!= 1) {
-                logger.error("[Bug]TieredFileSegmentInputStream: index file 
must have only one buffer");
-            }
-        }
-
-        public List<ByteBuffer> getUploadBufferList() {
-            return uploadBufferList;
-        }
-
-        public ByteBuffer getCodaBuffer() {
-            return codaBuffer;
-        }
-
-        @Override
-        public int available() {
-            return contentLength - readBytes;
-        }
-
-        @Override
-        public int read() {
-            if (bufferReadIndex >= uploadBufferList.size()) {
-                return readCoda();
-            }
-
-            int res;
-            switch (fileType) {
-                case COMMIT_LOG:
-                    if (readOffset >= curBuffer.remaining()) {
-                        bufferReadIndex++;
-                        if (bufferReadIndex >= uploadBufferList.size()) {
-                            return readCoda();
-                        }
-                        curBuffer = uploadBufferList.get(bufferReadIndex);
-                        commitLogOffset += readOffset;
-                        commitLogOffsetBuffer.putLong(0, commitLogOffset);
-                        readOffset = 0;
-                    }
-                    if (readOffset >= 
MessageBufferUtil.PHYSICAL_OFFSET_POSITION && readOffset < 
MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
-                        res = commitLogOffsetBuffer.get(readOffset - 
MessageBufferUtil.PHYSICAL_OFFSET_POSITION) & 0xff;
-                        readOffset++;
-                    } else {
-                        res = curBuffer.get(readOffset++) & 0xff;
-                    }
-                    break;
-                case CONSUME_QUEUE:
-                    if (!curBuffer.hasRemaining()) {
-                        bufferReadIndex++;
-                        if (bufferReadIndex >= uploadBufferList.size()) {
-                            return -1;
-                        }
-                        curBuffer = uploadBufferList.get(bufferReadIndex);
-                    }
-                    res = curBuffer.get() & 0xff;
-                    break;
-                case INDEX:
-                    if (!curBuffer.hasRemaining()) {
-                        return -1;
-                    }
-                    res = curBuffer.get() & 0xff;
-                    break;
-                default:
-                    throw new IllegalStateException("unknown file type");
-            }
-            readBytes++;
-            return res;
-        }
-
-        private int readCoda() {
-            if (fileType != FileSegmentType.COMMIT_LOG || codaBuffer == null) {
-                return -1;
-            }
-            if (!codaBuffer.hasRemaining()) {
-                return -1;
-            }
-            readBytes++;
-            return codaBuffer.get() & 0xff;
-        }
-    }
 }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
index 081143ce8f..f043e07f39 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.tieredstore.provider;
 
+import 
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 
@@ -69,6 +71,6 @@ public interface TieredStoreProvider {
      * @param append try to append or create a new file
      * @return put result, <code>true</code> if data successfully write; 
<code>false</code> otherwise
      */
-    CompletableFuture<Boolean> 
commit0(TieredFileSegment.TieredFileSegmentInputStream inputStream,
-        long position, int length, boolean append);
+    CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream 
inputStream,
+                                       long position, int length, boolean 
append);
 }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
new file mode 100644
index 0000000000..f5be3812b6
--- /dev/null
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.inputstream;
+
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
+
+    /**
+     * commitLogOffset is the real physical offset of the commitLog buffer 
which is being read
+     */
+    private long commitLogOffset;
+
+    private final ByteBuffer codaBuffer;
+    
+    private long markCommitLogOffset = -1;
+
+    public TieredCommitLogInputStream(TieredFileSegment.FileSegmentType 
fileType, long startOffset,
+                                      List<ByteBuffer> uploadBufferList, 
ByteBuffer codaBuffer, int contentLength) {
+        super(fileType, uploadBufferList, contentLength);
+        this.commitLogOffset = startOffset;
+        this.codaBuffer = codaBuffer;
+    }
+
+    @Override
+    public synchronized void mark(int ignore) {
+        super.mark(ignore);
+        this.markCommitLogOffset = commitLogOffset;
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        super.reset();
+        this.commitLogOffset = markCommitLogOffset;
+    }
+
+    @Override
+    public ByteBuffer getCodaBuffer() {
+        return this.codaBuffer;
+    }
+
+    @Override
+    public int read() {
+        if (available() <= 0) {
+            return -1;
+        }
+        readPosition++;
+        if (curReadBufferIndex >= uploadBufferList.size()) {
+            return readCoda();
+        }
+        int res;
+        if (readPosInCurBuffer >= curBuffer.remaining()) {
+            curReadBufferIndex++;
+            if (curReadBufferIndex >= uploadBufferList.size()) {
+                readPosInCurBuffer = 0;
+                return readCoda();
+            }
+            curBuffer = uploadBufferList.get(curReadBufferIndex);
+            commitLogOffset += readPosInCurBuffer;
+            readPosInCurBuffer = 0;
+        }
+        if (readPosInCurBuffer >= MessageBufferUtil.PHYSICAL_OFFSET_POSITION 
&& readPosInCurBuffer < MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
+            res = (int) ((commitLogOffset >> (8 * 
(MessageBufferUtil.SYS_FLAG_OFFSET_POSITION - readPosInCurBuffer - 1))) & 0xff);
+            readPosInCurBuffer++;
+        } else {
+            res = curBuffer.get(readPosInCurBuffer++) & 0xff;
+        }
+        return res;
+    }
+
+    private int readCoda() {
+        if (codaBuffer == null || readPosInCurBuffer >= 
codaBuffer.remaining()) {
+            return -1;
+        }
+        return codaBuffer.get(readPosInCurBuffer++) & 0xff;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) {
+        if (b == null) {
+            throw new NullPointerException();
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off < 0 || len < 0 || len > 
b.length - off");
+        }
+        if (readPosition >= contentLength) {
+            return -1;
+        }
+
+        int available = available();
+        if (len > available) {
+            len = available;
+        }
+        if (len <= 0) {
+            return 0;
+        }
+        int needRead = len;
+        int pos = readPosition;
+        int bufIndex = curReadBufferIndex;
+        int posInCurBuffer = readPosInCurBuffer;
+        long curCommitLogOffset = commitLogOffset;
+        ByteBuffer curBuf = curBuffer;
+        while (needRead > 0 && bufIndex <= uploadBufferList.size()) {
+            int readLen, remaining, realReadLen = 0;
+            if (bufIndex == uploadBufferList.size()) {
+                // read from coda buffer
+                remaining = codaBuffer.remaining() - posInCurBuffer;
+                readLen = remaining < needRead ? remaining : needRead;
+                codaBuffer.position(posInCurBuffer);
+                codaBuffer.get(b, off, readLen);
+                codaBuffer.position(0);
+                // update flags
+                off += readLen;
+                needRead -= readLen;
+                pos += readLen;
+                posInCurBuffer += readLen;
+                continue;
+            }
+            remaining = curBuf.remaining() - posInCurBuffer;
+            readLen = remaining < needRead ? remaining : needRead;
+            curBuf = uploadBufferList.get(bufIndex);
+            if (posInCurBuffer < MessageBufferUtil.PHYSICAL_OFFSET_POSITION) {
+                realReadLen = MessageBufferUtil.PHYSICAL_OFFSET_POSITION - 
posInCurBuffer < readLen ?
+                        MessageBufferUtil.PHYSICAL_OFFSET_POSITION - 
posInCurBuffer : readLen;
+                // read from commitLog buffer
+                curBuf.position(posInCurBuffer);
+                curBuf.get(b, off, realReadLen);
+                curBuf.position(0);
+            } else if (posInCurBuffer < 
MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
+                realReadLen = MessageBufferUtil.SYS_FLAG_OFFSET_POSITION - 
posInCurBuffer < readLen ?
+                        MessageBufferUtil.SYS_FLAG_OFFSET_POSITION - 
posInCurBuffer : readLen;
+                // read from converted PHYSICAL_OFFSET_POSITION
+                byte[] physicalOffsetBytes = new byte[realReadLen];
+                for (int i = 0; i < realReadLen; i++) {
+                    physicalOffsetBytes[i] = (byte) ((curCommitLogOffset >> (8 
* (MessageBufferUtil.SYS_FLAG_OFFSET_POSITION - posInCurBuffer - i - 1))) & 
0xff);
+                }
+                System.arraycopy(physicalOffsetBytes, 0, b, off, realReadLen);
+            } else if (posInCurBuffer >= 
MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
+                realReadLen = readLen;
+                // read from commitLog buffer
+                curBuf.position(posInCurBuffer);
+                curBuf.get(b, off, readLen);
+                curBuf.position(0);
+            }
+            // update flags
+            off += realReadLen;
+            needRead -= realReadLen;
+            pos += realReadLen;
+            posInCurBuffer += realReadLen;
+            if (posInCurBuffer == curBuffer.remaining()) {
+                // read from next buf
+                bufIndex++;
+                curCommitLogOffset += posInCurBuffer;
+                posInCurBuffer = 0;
+            }
+        }
+        readPosition = pos;
+        curReadBufferIndex = bufIndex;
+        readPosInCurBuffer = posInCurBuffer;
+        commitLogOffset = curCommitLogOffset;
+        curBuffer = curBuf;
+        return len;
+    }
+}
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
new file mode 100644
index 0000000000..d5118c1464
--- /dev/null
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.inputstream;
+
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class TieredFileSegmentInputStream extends InputStream {
+
+    private final TieredFileSegment.FileSegmentType fileType;
+    protected final List<ByteBuffer> uploadBufferList;
+    protected final int contentLength;
+
+    /**
+     * readPosition is the now position in the stream
+     */
+    protected int readPosition = 0;
+
+    /**
+     * curReadBufferIndex is the index of the buffer in uploadBufferList which 
is being read
+     */
+    protected int curReadBufferIndex = 0;
+    /**
+     * readPosInCurBuffer is the position in the buffer which is being read
+     */
+    protected int readPosInCurBuffer = 0;
+
+    /**
+     * curBuffer is the buffer which is being read, it is the same as 
uploadBufferList.get(curReadBufferIndex)
+     */
+    protected ByteBuffer curBuffer;
+
+    private int markReadPosition = -1;
+
+    private int markCurReadBufferIndex = -1;
+
+    private int markReadPosInCurBuffer = -1;
+
+    public TieredFileSegmentInputStream(TieredFileSegment.FileSegmentType 
fileType, List<ByteBuffer> uploadBufferList,
+        int contentLength) {
+        this.fileType = fileType;
+        this.contentLength = contentLength;
+        this.uploadBufferList = uploadBufferList;
+        if (uploadBufferList.size() > 0) {
+            this.curBuffer = uploadBufferList.get(curReadBufferIndex);
+        }
+    }
+
+    @Override
+    public boolean markSupported() {
+        return true;
+    }
+
+    @Override
+    public synchronized void mark(int ignore) {
+        this.markReadPosition = readPosition;
+        this.markCurReadBufferIndex = curReadBufferIndex;
+        this.markReadPosInCurBuffer = readPosInCurBuffer;
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        if (this.markReadPosition == -1) {
+            throw new IOException("mark not set");
+        }
+        this.readPosition = markReadPosition;
+        this.curReadBufferIndex = markCurReadBufferIndex;
+        this.readPosInCurBuffer = markReadPosInCurBuffer;
+        if (this.curReadBufferIndex < uploadBufferList.size()) {
+            this.curBuffer = uploadBufferList.get(curReadBufferIndex);
+        }
+    }
+
+    @Override
+    public int available() {
+        return contentLength - readPosition;
+    }
+
+    public List<ByteBuffer> getUploadBufferList() {
+        return uploadBufferList;
+    }
+
+    public ByteBuffer getCodaBuffer() {
+        return null;
+    }
+
+    @Override
+    public int read() {
+        if (available() <= 0) {
+            return -1;
+        }
+        readPosition++;
+        if (readPosInCurBuffer >= curBuffer.remaining()) {
+            curReadBufferIndex++;
+            if (curReadBufferIndex >= uploadBufferList.size()) {
+                return -1;
+            }
+            curBuffer = uploadBufferList.get(curReadBufferIndex);
+            readPosInCurBuffer = 0;
+        }
+        return curBuffer.get(readPosInCurBuffer++) & 0xff;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) {
+        if (b == null) {
+            throw new NullPointerException();
+        } else if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException("off < 0 || len < 0 || len > 
b.length - off");
+        }
+        if (readPosition >= contentLength) {
+            return -1;
+        }
+
+        int available = available();
+        if (len > available) {
+            len = available;
+        }
+        if (len <= 0) {
+            return 0;
+        }
+        int needRead = len;
+        int pos = readPosition;
+        int bufIndex = curReadBufferIndex;
+        int posInCurBuffer = readPosInCurBuffer;
+        ByteBuffer curBuf = curBuffer;
+        while (needRead > 0 && bufIndex < uploadBufferList.size()) {
+            curBuf = uploadBufferList.get(bufIndex);
+            int remaining = curBuf.remaining() - posInCurBuffer;
+            int readLen = remaining < needRead ? remaining : needRead;
+            // read from curBuf
+            curBuf.position(posInCurBuffer);
+            curBuf.get(b, off, readLen);
+            curBuf.position(0);
+            // update flags
+            off += readLen;
+            needRead -= readLen;
+            pos += readLen;
+            posInCurBuffer += readLen;
+            if (posInCurBuffer == curBuffer.remaining()) {
+                // read from next buf
+                bufIndex++;
+                posInCurBuffer = 0;
+            }
+        }
+        readPosition = pos;
+        curReadBufferIndex = bufIndex;
+        readPosInCurBuffer = posInCurBuffer;
+        curBuffer = curBuf;
+        return len;
+    }
+}
+
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
new file mode 100644
index 0000000000..e6f7749eea
--- /dev/null
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider.inputstream;
+
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class TieredFileSegmentInputStreamFactory {
+
+    public static TieredFileSegmentInputStream 
build(TieredFileSegment.FileSegmentType fileType,
+                                                     long startOffset,
+                                                     List<ByteBuffer> 
uploadBufferList,
+                                                     ByteBuffer codaBuffer,
+                                                     int contentLength) {
+        if (fileType == TieredFileSegment.FileSegmentType.COMMIT_LOG) {
+            return new TieredCommitLogInputStream(fileType, startOffset, 
uploadBufferList, codaBuffer, contentLength);
+        } else if (fileType == 
TieredFileSegment.FileSegmentType.CONSUME_QUEUE) {
+            return new TieredFileSegmentInputStream(fileType, 
uploadBufferList, contentLength);
+        } else if (fileType == TieredFileSegment.FileSegmentType.INDEX) {
+            if (uploadBufferList.size() != 1) {
+                throw new IllegalArgumentException("uploadBufferList size in 
INDEX type input stream must be 1");
+            }
+            return new TieredFileSegmentInputStream(fileType, 
uploadBufferList, contentLength);
+        } else {
+            throw new IllegalArgumentException("fileType is not supported");
+        }
+    }
+}
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
index 9d9620faff..7032799eb2 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
@@ -35,6 +35,7 @@ import 
org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
 import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
 import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
 import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import 
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
 
 import static 
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE;
@@ -182,7 +183,7 @@ public class PosixFileSegment extends TieredFileSegment {
 
     @Override
     public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream 
inputStream, long position, int length,
-        boolean append) {
+                                              boolean append) {
         Stopwatch stopwatch = Stopwatch.createStarted();
         AttributesBuilder attributesBuilder = newAttributesBuilder()
             .put(LABEL_OPERATION, OPERATION_POSIX_WRITE);
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
index 860b1723eb..b5c4e9d06c 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredDispatcherTest.java
@@ -84,7 +84,7 @@ public class TieredDispatcherTest {
         DefaultMessageStore defaultMessageStore = 
Mockito.mock(DefaultMessageStore.class);
         TieredDispatcher dispatcher = new 
TieredDispatcher(defaultMessageStore, storeConfig);
 
-        SelectMappedBufferResult mockResult = new SelectMappedBufferResult(0, 
MessageBufferUtilTest.buildMessageBuffer(), MessageBufferUtilTest.MSG_LEN, 
null);
+        SelectMappedBufferResult mockResult = new SelectMappedBufferResult(0, 
MessageBufferUtilTest.buildMockedMessageBuffer(), 
MessageBufferUtilTest.MSG_LEN, null);
         Mockito.when(defaultMessageStore.selectOneMessageByOffset(7, 
MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult);
         DispatchRequest request = new DispatchRequest(mq.getTopic(), 
mq.getQueueId(), 6, 7, MessageBufferUtilTest.MSG_LEN, 1);
         dispatcher.dispatch(request);
@@ -98,13 +98,13 @@ public class TieredDispatcherTest {
         dispatcher.buildCQAndIndexFile();
         Assert.assertEquals(7, container.getConsumeQueueMaxOffset());
 
-        ByteBuffer buffer1 = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer buffer1 = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer1.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 7);
         container.appendCommitLog(buffer1);
-        ByteBuffer buffer2 = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer buffer2 = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer2.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 8);
         container.appendCommitLog(buffer2);
-        ByteBuffer buffer3 = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer buffer3 = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer3.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 9);
         container.appendCommitLog(buffer3);
         container.commitCommitLog();
@@ -152,10 +152,10 @@ public class TieredDispatcherTest {
 
         Mockito.when(((ConsumeQueue) 
defaultStore.getConsumeQueue(mq.getTopic(), 
mq.getQueueId())).getIndexBuffer(7)).thenReturn(mockResult);
 
-        mockResult = new SelectMappedBufferResult(0, 
MessageBufferUtilTest.buildMessageBuffer(), MessageBufferUtilTest.MSG_LEN, 
null);
+        mockResult = new SelectMappedBufferResult(0, 
MessageBufferUtilTest.buildMockedMessageBuffer(), 
MessageBufferUtilTest.MSG_LEN, null);
         Mockito.when(defaultStore.selectOneMessageByOffset(7, 
MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult);
 
-        ByteBuffer msg = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer msg = MessageBufferUtilTest.buildMockedMessageBuffer();
         msg.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 7);
         mockResult = new SelectMappedBufferResult(0, msg, 
MessageBufferUtilTest.MSG_LEN, null);
         Mockito.when(defaultStore.selectOneMessageByOffset(8, 
MessageBufferUtilTest.MSG_LEN)).thenReturn(mockResult);
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index d0a3e3f850..ddcc9fa6c1 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -89,13 +89,13 @@ public class TieredMessageFetcherTest {
         getMessageResult = fetcher.getMessageAsync("group", mq.getTopic(), 
mq.getQueueId(), 0, 32, null).join();
         Assert.assertEquals(GetMessageStatus.NO_MESSAGE_IN_QUEUE, 
getMessageResult.getStatus());
 
-        ByteBuffer msg1 = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer msg1 = MessageBufferUtilTest.buildMockedMessageBuffer();
         msg1.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 0);
         msg1.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, 0);
         AppendResult result = container.appendCommitLog(msg1);
         Assert.assertEquals(AppendResult.SUCCESS, result);
 
-        ByteBuffer msg2 = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer msg2 = MessageBufferUtilTest.buildMockedMessageBuffer();
         msg2.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 1);
         msg2.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, 
MessageBufferUtilTest.MSG_LEN);
         container.appendCommitLog(msg2);
@@ -199,7 +199,7 @@ public class TieredMessageFetcherTest {
         TieredMessageQueueContainer container = 
TieredContainerManager.getInstance(storeConfig).getOrCreateMQContainer(mq);
         container.initOffset(0);
 
-        ByteBuffer msg1 = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer msg1 = MessageBufferUtilTest.buildMockedMessageBuffer();
         msg1.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 0);
         msg1.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, 0);
         long currentTimeMillis1 = System.currentTimeMillis();
@@ -207,7 +207,7 @@ public class TieredMessageFetcherTest {
         AppendResult result = container.appendCommitLog(msg1);
         Assert.assertEquals(AppendResult.SUCCESS, result);
 
-        ByteBuffer msg2 = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer msg2 = MessageBufferUtilTest.buildMockedMessageBuffer();
         msg2.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 1);
         msg2.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, 
MessageBufferUtilTest.MSG_LEN);
         long currentTimeMillis2 = System.currentTimeMillis();
@@ -244,7 +244,7 @@ public class TieredMessageFetcherTest {
 
 
         long timestamp = System.currentTimeMillis();
-        ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 50);
         buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp);
         container.initOffset(50);
@@ -266,13 +266,13 @@ public class TieredMessageFetcherTest {
         Assert.assertEquals(0, fetcher.queryMessageAsync(mq.getTopic(), "key", 
32, 0, Long.MAX_VALUE).join().getMessageMapedList().size());
 
         container.initOffset(0);
-        ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 0);
         container.appendCommitLog(buffer);
-        buffer = MessageBufferUtilTest.buildMessageBuffer();
+        buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 1);
         container.appendCommitLog(buffer);
-        buffer = MessageBufferUtilTest.buildMessageBuffer();
+        buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 2);
         container.appendCommitLog(buffer);
 
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
index 11afa362b2..ccfe18bd3f 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredMessageQueueContainerTest.java
@@ -68,7 +68,7 @@ public class TieredMessageQueueContainerTest {
     @Test
     public void testAppendCommitLog() throws ClassNotFoundException, 
NoSuchMethodException {
         TieredMessageQueueContainer container = new 
TieredMessageQueueContainer(mq, storeConfig);
-        ByteBuffer message = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer message = MessageBufferUtilTest.buildMockedMessageBuffer();
         AppendResult result = container.appendCommitLog(message);
         Assert.assertEquals(AppendResult.OFFSET_INCORRECT, result);
 
@@ -143,27 +143,27 @@ public class TieredMessageQueueContainerTest {
         TieredMessageQueueContainer container = new 
TieredMessageQueueContainer(mq, storeConfig);
         container.initOffset(50);
         long timestamp1 = System.currentTimeMillis();
-        ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 50);
         buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp1);
         container.appendCommitLog(buffer, true);
 
         long timestamp2 = timestamp1 + 100;
-        buffer = MessageBufferUtilTest.buildMessageBuffer();
+        buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 51);
         buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp2);
         container.appendCommitLog(buffer, true);
-        buffer = MessageBufferUtilTest.buildMessageBuffer();
+        buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 52);
         buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp2);
         container.appendCommitLog(buffer, true);
-        buffer = MessageBufferUtilTest.buildMessageBuffer();
+        buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 53);
         buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp2);
         container.appendCommitLog(buffer, true);
 
         long timestamp3 = timestamp2 + 100;
-        buffer = MessageBufferUtilTest.buildMessageBuffer();
+        buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 54);
         buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, timestamp3);
         container.appendCommitLog(buffer, true);
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
index 254b151e64..3c47d1cb8d 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegment.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutionException;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
 import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import 
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
 import org.junit.Assert;
 
 public class MemoryFileSegment extends TieredFileSegment {
@@ -81,7 +82,7 @@ public class MemoryFileSegment extends TieredFileSegment {
 
     @Override
     public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream 
inputStream, long position, int length,
-        boolean append) {
+                                              boolean append) {
         try {
             if (blocker != null && !blocker.get()) {
                 throw new IllegalStateException();
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java
index f7e5488da8..741a38c81c 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/mock/MemoryFileSegmentWithoutCheck.java
@@ -20,6 +20,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import 
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
 import org.junit.Assert;
 
 public class MemoryFileSegmentWithoutCheck extends MemoryFileSegment {
@@ -37,7 +38,7 @@ public class MemoryFileSegmentWithoutCheck extends 
MemoryFileSegment {
 
     @Override
     public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream 
inputStream, long position, int length,
-        boolean append) {
+                                              boolean append) {
         try {
             if (blocker != null && !blocker.get()) {
                 throw new IllegalStateException();
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
new file mode 100644
index 0000000000..3d9fdba9bd
--- /dev/null
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tieredstore.provider;
+
+import com.google.common.base.Supplier;
+import org.apache.rocketmq.tieredstore.container.TieredCommitLog;
+import org.apache.rocketmq.tieredstore.container.TieredConsumeQueue;
+import 
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import 
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory;
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
+import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+public class TieredFileSegmentInputStreamTest {
+
+    private final static long COMMIT_LOG_START_OFFSET = 13131313;
+
+    private final static int MSG_LEN = MessageBufferUtilTest.MSG_LEN;
+
+    private final static int MSG_NUM = 10;
+
+    private final static int RESET_TIMES = 10;
+
+    private final static Random RANDOM = new Random();
+
+    @Test
+    public void testCommitLogTypeInputStream() {
+        List<ByteBuffer> uploadBufferList = new ArrayList<>();
+        int bufferSize = 0;
+        for (int i = 0; i < MSG_NUM; i++) {
+            ByteBuffer byteBuffer = 
MessageBufferUtilTest.buildMockedMessageBuffer();
+            uploadBufferList.add(byteBuffer);
+            bufferSize += byteBuffer.remaining();
+        }
+
+        // build expected byte buffer for verifying the TieredFileSegmentInputStream
+        ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
+        for (ByteBuffer byteBuffer : uploadBufferList) {
+            expectedByteBuffer.put(byteBuffer);
+            byteBuffer.rewind();
+        }
+        // set real physical offset
+        for (int i = 0; i < MSG_NUM; i++) {
+            long physicalOffset = COMMIT_LOG_START_OFFSET + i * MSG_LEN;
+            int position = i * MSG_LEN + 
MessageBufferUtil.PHYSICAL_OFFSET_POSITION;
+            expectedByteBuffer.putLong(position, physicalOffset);
+        }
+
+        int finalBufferSize = bufferSize;
+        int[] batchReadSizeTestSet = {
+            MessageBufferUtil.PHYSICAL_OFFSET_POSITION - 1, 
MessageBufferUtil.PHYSICAL_OFFSET_POSITION, 
MessageBufferUtil.PHYSICAL_OFFSET_POSITION + 1, MSG_LEN - 1, MSG_LEN, MSG_LEN + 
1
+        };
+        verifyReadAndReset(expectedByteBuffer, () -> 
TieredFileSegmentInputStreamFactory.build(
+            TieredFileSegment.FileSegmentType.COMMIT_LOG, 
COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), 
finalBufferSize, batchReadSizeTestSet);
+
+    }
+
+    @Test
+    public void testCommitLogTypeInputStreamWithCoda() {
+        List<ByteBuffer> uploadBufferList = new ArrayList<>();
+        int bufferSize = 0;
+        for (int i = 0; i < MSG_NUM; i++) {
+            ByteBuffer byteBuffer = 
MessageBufferUtilTest.buildMockedMessageBuffer();
+            uploadBufferList.add(byteBuffer);
+            bufferSize += byteBuffer.remaining();
+        }
+
+        ByteBuffer codaBuffer = ByteBuffer.allocate(TieredCommitLog.CODA_SIZE);
+        codaBuffer.putInt(TieredCommitLog.CODA_SIZE);
+        codaBuffer.putInt(TieredCommitLog.BLANK_MAGIC_CODE);
+        long timeMillis = System.currentTimeMillis();
+        codaBuffer.putLong(timeMillis);
+        codaBuffer.flip();
+        int codaBufferSize = codaBuffer.remaining();
+        bufferSize += codaBufferSize;
+
+        // build expected byte buffer for verifying the TieredFileSegmentInputStream
+        ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
+        for (ByteBuffer byteBuffer : uploadBufferList) {
+            expectedByteBuffer.put(byteBuffer);
+            byteBuffer.rewind();
+        }
+        expectedByteBuffer.put(codaBuffer);
+        codaBuffer.rewind();
+        // set real physical offset
+        for (int i = 0; i < MSG_NUM; i++) {
+            long physicalOffset = COMMIT_LOG_START_OFFSET + i * MSG_LEN;
+            int position = i * MSG_LEN + 
MessageBufferUtil.PHYSICAL_OFFSET_POSITION;
+            expectedByteBuffer.putLong(position, physicalOffset);
+        }
+
+        int finalBufferSize = bufferSize;
+        int[] batchReadSizeTestSet = {
+            MessageBufferUtil.PHYSICAL_OFFSET_POSITION - 1, 
MessageBufferUtil.PHYSICAL_OFFSET_POSITION, 
MessageBufferUtil.PHYSICAL_OFFSET_POSITION + 1,
+            MSG_LEN - 1, MSG_LEN, MSG_LEN + 1,
+            bufferSize - 1, bufferSize, bufferSize + 1
+        };
+        verifyReadAndReset(expectedByteBuffer, () -> 
TieredFileSegmentInputStreamFactory.build(
+            TieredFileSegment.FileSegmentType.COMMIT_LOG, 
COMMIT_LOG_START_OFFSET, uploadBufferList, codaBuffer, finalBufferSize), 
finalBufferSize, batchReadSizeTestSet);
+
+    }
+
+    @Test
+    public void testConsumeQueueTypeInputStream() {
+        List<ByteBuffer> uploadBufferList = new ArrayList<>();
+        int bufferSize = 0;
+        for (int i = 0; i < MSG_NUM; i++) {
+            ByteBuffer byteBuffer = 
MessageBufferUtilTest.buildMockedConsumeQueueBuffer();
+            uploadBufferList.add(byteBuffer);
+            bufferSize += byteBuffer.remaining();
+        }
+
+        // build expected byte buffer for verifying the TieredFileSegmentInputStream
+        ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
+        for (ByteBuffer byteBuffer : uploadBufferList) {
+            expectedByteBuffer.put(byteBuffer);
+            byteBuffer.rewind();
+        }
+
+        int finalBufferSize = bufferSize;
+        int[] batchReadSizeTestSet = 
{TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE - 1, 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE, 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE + 1};
+        verifyReadAndReset(expectedByteBuffer, () -> 
TieredFileSegmentInputStreamFactory.build(
+            TieredFileSegment.FileSegmentType.CONSUME_QUEUE, 
COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), bufferSize, 
batchReadSizeTestSet);
+
+    }
+
+    @Test
+    public void testIndexTypeInputStream() {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(24);
+        byteBuffer.putLong(1);
+        byteBuffer.putLong(2);
+        byteBuffer.putLong(3);
+        byteBuffer.flip();
+        List<ByteBuffer> uploadBufferList = Arrays.asList(byteBuffer);
+
+        // build expected byte buffer for verifying the TieredFileSegmentInputStream
+        ByteBuffer expectedByteBuffer = byteBuffer.slice();
+
+        verifyReadAndReset(expectedByteBuffer, () -> 
TieredFileSegmentInputStreamFactory.build(
+            TieredFileSegment.FileSegmentType.INDEX, COMMIT_LOG_START_OFFSET, 
uploadBufferList, null, byteBuffer.limit()), byteBuffer.limit(), new int[] {23, 
24, 25});
+    }
+
+    private void verifyReadAndReset(ByteBuffer expectedByteBuffer, 
Supplier<TieredFileSegmentInputStream> constructor,
+        int bufferSize, int[] readBatchSizeTestSet) {
+        TieredFileSegmentInputStream inputStream = constructor.get();
+
+        // verify
+        verifyInputStream(inputStream, expectedByteBuffer);
+
+        // verify reset with method InputStream#mark() hasn't been called
+        try {
+            inputStream.reset();
+            Assert.fail("Should throw IOException");
+        } catch (IOException e) {
+            Assert.assertTrue(e instanceof IOException);
+        }
+
+        // verify reset with method InputStream#mark() has been called
+        int resetPosition = RANDOM.nextInt(bufferSize);
+        int expectedResetPosition = 0;
+        inputStream = constructor.get();
+        // verify and mark with resetPosition, use read() to read a byte each time
+        for (int i = 0; i < RESET_TIMES; i++) {
+            verifyInputStream(inputStream, expectedByteBuffer, 
expectedResetPosition, resetPosition);
+
+            try {
+                inputStream.reset();
+            } catch (IOException e) {
+                Assert.fail("Should not throw IOException");
+            }
+
+            expectedResetPosition = resetPosition;
+            resetPosition += RANDOM.nextInt(bufferSize - resetPosition);
+        }
+        for (int i = 0; i < readBatchSizeTestSet.length; i++) {
+            inputStream = constructor.get();
+            int readBatchSize = readBatchSizeTestSet[i];
+            expectedResetPosition = 0;
+            resetPosition = readBatchSize * RANDOM.nextInt(1 + bufferSize / 
readBatchSize);
+            // verify and mark with resetPosition, use read(byte[]) to read a byte array each time
+            for (int j = 0; j < RESET_TIMES; j++) {
+                verifyInputStreamViaBatchRead(inputStream, expectedByteBuffer, 
expectedResetPosition, resetPosition, readBatchSize);
+                try {
+                    inputStream.reset();
+                } catch (IOException e) {
+                    Assert.fail("Should not throw IOException");
+                }
+
+                expectedResetPosition = resetPosition;
+                resetPosition += readBatchSize * RANDOM.nextInt(1 + 
(bufferSize - resetPosition) / readBatchSize);
+            }
+        }
+    }
+
+    private void verifyInputStream(InputStream inputStream, ByteBuffer 
expectedBuffer) {
+        verifyInputStream(inputStream, expectedBuffer, 0, -1);
+    }
+
+    /**
+     * verify the input stream
+     *
+     * @param inputStream           the input stream to be verified
+     * @param expectedBuffer        the expected byte buffer
+     * @param expectedBufferReadPos the expected start position of the expected byte buffer
+     * @param expectedMarkCalledPos the expected position when the method InputStream#mark() is called. <i>(-1 means ignored)</i>
+     */
+    private void verifyInputStream(InputStream inputStream, ByteBuffer 
expectedBuffer, int expectedBufferReadPos,
+        int expectedMarkCalledPos) {
+        try {
+            expectedBuffer.position(expectedBufferReadPos);
+            while (true) {
+                if (expectedMarkCalledPos == expectedBuffer.position()) {
+                    inputStream.mark(0);
+                }
+                int b = inputStream.read();
+                if (b == -1)
+                    break;
+                Assert.assertEquals(expectedBuffer.get(), (byte) b);
+            }
+            Assert.assertFalse(expectedBuffer.hasRemaining());
+        } catch (IOException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    /**
+     * verify the input stream
+     *
+     * @param inputStream           the input stream to be verified
+     * @param expectedBuffer        the expected byte buffer
+     * @param expectedBufferReadPos the expected start position of the expected byte buffer
+     * @param expectedMarkCalledPos the expected position when the method InputStream#mark() is called. <i>(-1 means ignored)</i>
+     * @param readBatchSize         the batch size of each read(byte[]) operation
+     */
+    private void verifyInputStreamViaBatchRead(InputStream inputStream, 
ByteBuffer expectedBuffer,
+        int expectedBufferReadPos, int expectedMarkCalledPos, int 
readBatchSize) {
+        try {
+            expectedBuffer.position(expectedBufferReadPos);
+            byte[] buf = new byte[readBatchSize];
+            while (true) {
+                if (expectedMarkCalledPos == expectedBuffer.position()) {
+                    inputStream.mark(0);
+                }
+                int len = inputStream.read(buf, 0, readBatchSize);
+                if (len == -1)
+                    break;
+                byte[] expected = new byte[len];
+                expectedBuffer.get(expected, 0, len);
+                for (int i = 0; i < len; i++) {
+                    Assert.assertEquals(expected[i], buf[i]);
+                }
+            }
+            Assert.assertFalse(expectedBuffer.hasRemaining());
+        } catch (IOException e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+}
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
index f55f7481cd..79b1883ad8 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
@@ -41,11 +41,11 @@ public class TieredFileSegmentTest {
         TieredFileSegment segment = 
createFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG);
         segment.initPosition(segment.getSize());
         long lastSize = segment.getSize();
-        segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
-        segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
+        segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
+        segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
         Assert.assertTrue(segment.needCommit());
 
-        ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+        ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
         long msg3StoreTime = System.currentTimeMillis();
         buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, 
msg3StoreTime);
         long queueOffset = baseOffset * 1000L;
@@ -117,8 +117,8 @@ public class TieredFileSegmentTest {
         long startTime = System.currentTimeMillis();
         MemoryFileSegment segment = (MemoryFileSegment) 
createFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG);
         long lastSize = segment.getSize();
-        segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
-        segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
+        segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
+        segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
 
         segment.blocker = new CompletableFuture<>();
         new Thread(() -> {
@@ -127,7 +127,7 @@ public class TieredFileSegmentTest {
             } catch (InterruptedException e) {
                 Assert.fail(e.getMessage());
             }
-            ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+            ByteBuffer buffer = 
MessageBufferUtilTest.buildMockedMessageBuffer();
             buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, 
startTime);
             segment.append(buffer, 0);
             segment.blocker.complete(false);
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
index 268ea2d464..befd401ffe 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/util/MessageBufferUtilTest.java
@@ -50,7 +50,7 @@ public class MessageBufferUtilTest {
         + 2 + 30 //properties
         + 0;
 
-    public static ByteBuffer buildMessageBuffer() {
+    public static ByteBuffer buildMockedMessageBuffer() {
         // Initialization of storage space
         ByteBuffer buffer = ByteBuffer.allocate(MSG_LEN);
         // 1 TOTALSIZE
@@ -99,23 +99,36 @@ public class MessageBufferUtilTest {
         return buffer;
     }
 
+    public static ByteBuffer buildMockedConsumeQueueBuffer() {
+        ByteBuffer byteBuffer = 
ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+        // 1 COMMIT_LOG_OFFSET
+        byteBuffer.putLong(1);
+        // 2 MESSAGE_SIZE
+        byteBuffer.putInt(2);
+        // 3 TAG_HASH_CODE
+        byteBuffer.putLong(3);
+        byteBuffer.flip();
+        return byteBuffer;
+    }
+
+
     @Test
     public void testGetTotalSize() {
-        ByteBuffer buffer = buildMessageBuffer();
+        ByteBuffer buffer = buildMockedMessageBuffer();
         int totalSize = MessageBufferUtil.getTotalSize(buffer);
         Assert.assertEquals(MSG_LEN, totalSize);
     }
 
     @Test
     public void testGetMagicCode() {
-        ByteBuffer buffer = buildMessageBuffer();
+        ByteBuffer buffer = buildMockedMessageBuffer();
         int magicCode = MessageBufferUtil.getMagicCode(buffer);
         Assert.assertEquals(MessageDecoder.MESSAGE_MAGIC_CODE_V2, magicCode);
     }
 
     @Test
     public void testSplitMessages() {
-        ByteBuffer msgBuffer1 = buildMessageBuffer();
+        ByteBuffer msgBuffer1 = buildMockedMessageBuffer();
         msgBuffer1.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 10);
         ByteBuffer msgBuffer2 = ByteBuffer.allocate(TieredCommitLog.CODA_SIZE);
 
@@ -124,7 +137,7 @@ public class MessageBufferUtilTest {
         msgBuffer2.putLong(System.currentTimeMillis());
         msgBuffer2.flip();
 
-        ByteBuffer msgBuffer3 = buildMessageBuffer();
+        ByteBuffer msgBuffer3 = buildMockedMessageBuffer();
         msgBuffer3.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 11);
 
         ByteBuffer msgBuffer = ByteBuffer.allocate(msgBuffer1.remaining() + 
msgBuffer2.remaining() + msgBuffer3.remaining());
@@ -202,21 +215,21 @@ public class MessageBufferUtilTest {
 
     @Test
     public void testGetQueueOffset() {
-        ByteBuffer buffer = buildMessageBuffer();
+        ByteBuffer buffer = buildMockedMessageBuffer();
         long queueOffset = MessageBufferUtil.getQueueOffset(buffer);
         Assert.assertEquals(6, queueOffset);
     }
 
     @Test
     public void testGetStoreTimeStamp() {
-        ByteBuffer buffer = buildMessageBuffer();
+        ByteBuffer buffer = buildMockedMessageBuffer();
         long storeTimeStamp = MessageBufferUtil.getStoreTimeStamp(buffer);
         Assert.assertEquals(11, storeTimeStamp);
     }
 
     @Test
     public void testGetOffsetId() {
-        ByteBuffer buffer = buildMessageBuffer();
+        ByteBuffer buffer = buildMockedMessageBuffer();
         InetSocketAddress inetSocketAddress = new 
InetSocketAddress("255.255.255.255", 65535);
         ByteBuffer addr = ByteBuffer.allocate(Long.BYTES);
         addr.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
@@ -232,7 +245,7 @@ public class MessageBufferUtilTest {
 
     @Test
     public void testGetProperties() {
-        ByteBuffer buffer = buildMessageBuffer();
+        ByteBuffer buffer = buildMockedMessageBuffer();
         Map<String, String> properties = 
MessageBufferUtil.getProperties(buffer);
         Assert.assertEquals(2, properties.size());
         
Assert.assertTrue(properties.containsKey(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));

Reply via email to