This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new ac694375da feat: implement OperatorInputStream and 
OperatorOutputStream (#4626)
ac694375da is described below

commit ac694375da955ea6d3461aee0bfafd5a5d774541
Author: tison <wander4...@gmail.com>
AuthorDate: Sun May 19 22:52:26 2024 +0800

    feat: implement OperatorInputStream and OperatorOutputStream (#4626)
    
    Signed-off-by: tison <wander4...@gmail.com>
---
 bindings/java/pom.xml                              | 12 +++
 bindings/java/src/blocking_operator.rs             |  1 +
 bindings/java/src/lib.rs                           |  2 +
 .../java/org/apache/opendal/BlockingOperator.java  | 28 ++++---
 .../org/apache/opendal/OperatorInputStream.java    | 71 ++++++++++++++++
 .../org/apache/opendal/OperatorOutputStream.java   | 82 +++++++++++++++++++
 bindings/java/src/operator_input_stream.rs         | 92 +++++++++++++++++++++
 bindings/java/src/operator_output_stream.rs        | 95 ++++++++++++++++++++++
 .../test/OperatorInputOutputStreamTest.java        | 66 +++++++++++++++
 core/src/raw/ops.rs                                |  2 +-
 core/src/types/blocking_read/blocking_reader.rs    |  2 +-
 core/src/types/blocking_read/std_bytes_iterator.rs | 26 ++++--
 12 files changed, 461 insertions(+), 18 deletions(-)

diff --git a/bindings/java/pom.xml b/bindings/java/pom.xml
index aec26bab55..52d77ee94f 100644
--- a/bindings/java/pom.xml
+++ b/bindings/java/pom.xml
@@ -67,6 +67,7 @@
 
         <!-- library dependencies -->
         <assertj.version>3.23.1</assertj.version>
+        <commons-io.version>2.16.1</commons-io.version>
         <dotenv.version>2.3.2</dotenv.version>
         <lombok.version>1.18.30</lombok.version>
         <slf4j.version>2.0.7</slf4j.version>
@@ -110,6 +111,12 @@
                 <artifactId>dotenv-java</artifactId>
                 <version>${dotenv.version}</version>
             </dependency>
+            <dependency>
+                <groupId>commons-io</groupId>
+                <artifactId>commons-io</artifactId>
+                <version>${commons-io.version}</version>
+                <scope>test</scope>
+            </dependency>
             <dependency>
                 <groupId>org.apache.httpcomponents.client5</groupId>
                 <artifactId>httpclient5</artifactId>
@@ -151,6 +158,11 @@
             <artifactId>dotenv-java</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents.client5</groupId>
             <artifactId>httpclient5</artifactId>
diff --git a/bindings/java/src/blocking_operator.rs 
b/bindings/java/src/blocking_operator.rs
index 3234cf84fc..88d92d1a40 100644
--- a/bindings/java/src/blocking_operator.rs
+++ b/bindings/java/src/blocking_operator.rs
@@ -25,6 +25,7 @@ use jni::sys::jobject;
 use jni::sys::jobjectArray;
 use jni::sys::jsize;
 use jni::JNIEnv;
+
 use opendal::BlockingOperator;
 
 use crate::convert::jstring_to_string;
diff --git a/bindings/java/src/lib.rs b/bindings/java/src/lib.rs
index c9950a4796..3dde906222 100644
--- a/bindings/java/src/lib.rs
+++ b/bindings/java/src/lib.rs
@@ -38,6 +38,8 @@ mod error;
 mod executor;
 mod layer;
 mod operator;
+mod operator_input_stream;
+mod operator_output_stream;
 mod utility;
 
 pub(crate) type Result<T> = std::result::Result<T, error::Error>;
diff --git 
a/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java 
b/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java
index ffb0b60420..833e6ac04b 100644
--- a/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java
+++ b/bindings/java/src/main/java/org/apache/opendal/BlockingOperator.java
@@ -69,10 +69,18 @@ public class BlockingOperator extends NativeObject {
         write(nativeHandle, path, content);
     }
 
+    public OperatorOutputStream createOutputStream(String path) {
+        return new OperatorOutputStream(this, path);
+    }
+
     public byte[] read(String path) {
         return read(nativeHandle, path);
     }
 
+    public OperatorInputStream createInputStream(String path) {
+        return new OperatorInputStream(this, path);
+    }
+
     public void delete(String path) {
         delete(nativeHandle, path);
     }
@@ -104,23 +112,23 @@ public class BlockingOperator extends NativeObject {
     @Override
     protected native void disposeInternal(long handle);
 
-    private static native long duplicate(long nativeHandle);
+    private static native long duplicate(long op);
 
-    private static native void write(long nativeHandle, String path, byte[] 
content);
+    private static native void write(long op, String path, byte[] content);
 
-    private static native byte[] read(long nativeHandle, String path);
+    private static native byte[] read(long op, String path);
 
-    private static native void delete(long nativeHandle, String path);
+    private static native void delete(long op, String path);
 
-    private static native Metadata stat(long nativeHandle, String path);
+    private static native Metadata stat(long op, String path);
 
-    private static native long createDir(long nativeHandle, String path);
+    private static native long createDir(long op, String path);
 
-    private static native long copy(long nativeHandle, String sourcePath, 
String targetPath);
+    private static native long copy(long op, String sourcePath, String 
targetPath);
 
-    private static native long rename(long nativeHandle, String sourcePath, 
String targetPath);
+    private static native long rename(long op, String sourcePath, String 
targetPath);
 
-    private static native void removeAll(long nativeHandle, String path);
+    private static native void removeAll(long op, String path);
 
-    private static native Entry[] list(long nativeHandle, String path);
+    private static native Entry[] list(long op, String path);
 }
diff --git 
a/bindings/java/src/main/java/org/apache/opendal/OperatorInputStream.java 
b/bindings/java/src/main/java/org/apache/opendal/OperatorInputStream.java
new file mode 100644
index 0000000000..a9eb6731d8
--- /dev/null
+++ b/bindings/java/src/main/java/org/apache/opendal/OperatorInputStream.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.opendal;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class OperatorInputStream extends InputStream {
+    private static class Reader extends NativeObject {
+        private Reader(long nativeHandle) {
+            super(nativeHandle);
+        }
+
+        @Override
+        protected void disposeInternal(long handle) {
+            disposeReader(handle);
+        }
+    }
+
+    private final Reader reader;
+
+    private int offset = 0;
+    private byte[] bytes = new byte[0];
+
+    public OperatorInputStream(BlockingOperator operator, String path) {
+        final long op = operator.nativeHandle;
+        this.reader = new Reader(constructReader(op, path));
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (bytes != null && offset >= bytes.length) {
+            bytes = readNextBytes(reader.nativeHandle);
+            offset = 0;
+        }
+
+        if (bytes != null) {
+            return bytes[offset++] & 0xFF;
+        }
+
+        return -1;
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    private static native long constructReader(long op, String path);
+
+    private static native long disposeReader(long reader);
+
+    private static native byte[] readNextBytes(long reader);
+}
diff --git 
a/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java 
b/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java
new file mode 100644
index 0000000000..119346595e
--- /dev/null
+++ b/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.opendal;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+public class OperatorOutputStream extends OutputStream {
+    private static class Writer extends NativeObject {
+        private Writer(long nativeHandle) {
+            super(nativeHandle);
+        }
+
+        @Override
+        protected void disposeInternal(long handle) {
+            disposeWriter(handle);
+        }
+    }
+
+    private static final int MAX_BYTES = 16384;
+
+    private final Writer writer;
+    private final byte[] bytes = new byte[MAX_BYTES];
+
+    private int offset = 0;
+
+    public OperatorOutputStream(BlockingOperator operator, String path) {
+        final long op = operator.nativeHandle;
+        this.writer = new Writer(constructWriter(op, path));
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        bytes[offset++] = (byte) b;
+        if (offset >= MAX_BYTES) {
+            flush();
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (offset > MAX_BYTES) {
+            throw new IOException("INTERNAL ERROR: " + offset + " > " + 
MAX_BYTES);
+        } else if (offset < MAX_BYTES) {
+            final byte[] bytes = Arrays.copyOf(this.bytes, offset);
+            writeBytes(writer.nativeHandle, bytes);
+        } else {
+            writeBytes(writer.nativeHandle, bytes);
+        }
+        offset = 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+        flush();
+        writer.close();
+    }
+
+    private static native long constructWriter(long op, String path);
+
+    private static native long disposeWriter(long writer);
+
+    private static native byte[] writeBytes(long writer, byte[] bytes);
+}
diff --git a/bindings/java/src/operator_input_stream.rs 
b/bindings/java/src/operator_input_stream.rs
new file mode 100644
index 0000000000..fd53237bf3
--- /dev/null
+++ b/bindings/java/src/operator_input_stream.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::convert::jstring_to_string;
+use jni::objects::{JByteArray, JClass, JObject, JString};
+use jni::sys::{jbyteArray, jlong};
+use jni::JNIEnv;
+use opendal::{BlockingOperator, StdBytesIterator};
+
+/// # Safety
+///
+/// This function should not be called before the Operator is ready.
+#[no_mangle]
+pub unsafe extern "system" fn 
Java_org_apache_opendal_OperatorInputStream_constructReader(
+    mut env: JNIEnv,
+    _: JClass,
+    op: *mut BlockingOperator,
+    path: JString,
+) -> jlong {
+    intern_construct_reader(&mut env, &mut *op, path).unwrap_or_else(|e| {
+        e.throw(&mut env);
+        0
+    })
+}
+
+fn intern_construct_reader(
+    env: &mut JNIEnv,
+    op: &mut BlockingOperator,
+    path: JString,
+) -> crate::Result<jlong> {
+    let path = jstring_to_string(env, &path)?;
+    let reader = op.reader(&path)?.into_bytes_iterator(..);
+    Ok(Box::into_raw(Box::new(reader)) as jlong)
+}
+
+/// # Safety
+///
+/// This function should not be called before the Operator is ready.
+#[no_mangle]
+pub unsafe extern "system" fn 
Java_org_apache_opendal_OperatorInputStream_disposeReader(
+    _: JNIEnv,
+    _: JClass,
+    reader: *mut StdBytesIterator,
+) {
+    drop(Box::from_raw(reader));
+}
+
+/// # Safety
+///
+/// This function should not be called before the Operator is ready.
+#[no_mangle]
+pub unsafe extern "system" fn 
Java_org_apache_opendal_OperatorInputStream_readNextBytes(
+    mut env: JNIEnv,
+    _: JClass,
+    reader: *mut StdBytesIterator,
+) -> jbyteArray {
+    intern_read_next_bytes(&mut env, &mut *reader).unwrap_or_else(|e| {
+        e.throw(&mut env);
+        JByteArray::default().into_raw()
+    })
+}
+
+fn intern_read_next_bytes(
+    env: &mut JNIEnv,
+    reader: &mut StdBytesIterator,
+) -> crate::Result<jbyteArray> {
+    match reader
+        .next()
+        .transpose()
+        .map_err(|err| opendal::Error::new(opendal::ErrorKind::Unexpected, 
&err.to_string()))?
+    {
+        None => Ok(JObject::null().into_raw()),
+        Some(content) => {
+            let result = env.byte_array_from_slice(&content)?;
+            Ok(result.into_raw())
+        }
+    }
+}
diff --git a/bindings/java/src/operator_output_stream.rs 
b/bindings/java/src/operator_output_stream.rs
new file mode 100644
index 0000000000..7478fe67af
--- /dev/null
+++ b/bindings/java/src/operator_output_stream.rs
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use jni::objects::{JByteArray, JClass, JString};
+use jni::sys::jlong;
+use jni::JNIEnv;
+
+use opendal::{BlockingOperator, BlockingWriter};
+
+use crate::convert::jstring_to_string;
+
+/// # Safety
+///
+/// This function should not be called before the Operator is ready.
+#[no_mangle]
+pub unsafe extern "system" fn 
Java_org_apache_opendal_OperatorOutputStream_constructWriter(
+    mut env: JNIEnv,
+    _: JClass,
+    op: *mut BlockingOperator,
+    path: JString,
+) -> jlong {
+    intern_construct_write(&mut env, &mut *op, path).unwrap_or_else(|e| {
+        e.throw(&mut env);
+        0
+    })
+}
+
+fn intern_construct_write(
+    env: &mut JNIEnv,
+    op: &mut BlockingOperator,
+    path: JString,
+) -> crate::Result<jlong> {
+    let path = jstring_to_string(env, &path)?;
+    let writer = op.writer(&path)?;
+    Ok(Box::into_raw(Box::new(writer)) as jlong)
+}
+
+/// # Safety
+///
+/// This function should not be called before the Operator is ready.
+#[no_mangle]
+pub unsafe extern "system" fn 
Java_org_apache_opendal_OperatorOutputStream_disposeWriter(
+    mut env: JNIEnv,
+    _: JClass,
+    writer: *mut BlockingWriter,
+) {
+    let mut writer = Box::from_raw(writer);
+    intern_dispose_write(&mut writer).unwrap_or_else(|e| {
+        e.throw(&mut env);
+    })
+}
+
+fn intern_dispose_write(writer: &mut BlockingWriter) -> crate::Result<()> {
+    writer.close()?;
+    Ok(())
+}
+
+/// # Safety
+///
+/// This function should not be called before the Operator is ready.
+#[no_mangle]
+pub unsafe extern "system" fn 
Java_org_apache_opendal_OperatorOutputStream_writeBytes(
+    mut env: JNIEnv,
+    _: JClass,
+    writer: *mut BlockingWriter,
+    content: JByteArray,
+) {
+    intern_write_bytes(&mut env, &mut *writer, content).unwrap_or_else(|e| {
+        e.throw(&mut env);
+    })
+}
+
+fn intern_write_bytes(
+    env: &mut JNIEnv,
+    writer: &mut BlockingWriter,
+    content: JByteArray,
+) -> crate::Result<()> {
+    let content = env.convert_byte_array(content)?;
+    writer.write(content)?;
+    Ok(())
+}
diff --git 
a/bindings/java/src/test/java/org/apache/opendal/test/OperatorInputOutputStreamTest.java
 
b/bindings/java/src/test/java/org/apache/opendal/test/OperatorInputOutputStreamTest.java
new file mode 100644
index 0000000000..8cacbe57ff
--- /dev/null
+++ 
b/bindings/java/src/test/java/org/apache/opendal/test/OperatorInputOutputStreamTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.opendal.test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.opendal.BlockingOperator;
+import org.apache.opendal.OperatorInputStream;
+import org.apache.opendal.OperatorOutputStream;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class OperatorInputOutputStreamTest {
+    @TempDir
+    private static Path tempDir;
+
+    @Test
+    void testReadWriteWithStream() throws Exception {
+        final Map<String, String> conf = new HashMap<>();
+        conf.put("root", tempDir.toString());
+
+        try (final BlockingOperator op = BlockingOperator.of("fs", conf)) {
+            final String path = "OperatorInputOutputStreamTest.txt";
+            final long multi = 1024 * 1024;
+
+            try (final OperatorOutputStream os = op.createOutputStream(path)) {
+                for (long i = 0; i < multi; i++) {
+                    os.write("[content] OperatorInputStreamTest\n".getBytes());
+                }
+            }
+
+            try (final OperatorInputStream is = op.createInputStream(path)) {
+                final Stream<String> lines = new BufferedReader(new 
InputStreamReader(is)).lines();
+                final AtomicLong count = new AtomicLong();
+                lines.forEach((line) -> {
+                    assertThat(line).isEqualTo("[content] 
OperatorInputStreamTest");
+                    count.incrementAndGet();
+                });
+                assertThat(count.get()).isEqualTo(multi);
+            }
+        }
+    }
+}
diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs
index f21c2769fb..2c4b494931 100644
--- a/core/src/raw/ops.rs
+++ b/core/src/raw/ops.rs
@@ -387,7 +387,7 @@ impl OpRead {
 pub struct OpReader {
     /// The range of the read request.
     ///
-    /// Not available for `reader``.
+    /// Not available for `reader`.
     range: BytesRange,
     /// The concurrent requests that reader can send.
     concurrent: usize,
diff --git a/core/src/types/blocking_read/blocking_reader.rs 
b/core/src/types/blocking_read/blocking_reader.rs
index e87528fcea..6dfedf7b1d 100644
--- a/core/src/types/blocking_read/blocking_reader.rs
+++ b/core/src/types/blocking_read/blocking_reader.rs
@@ -151,7 +151,7 @@ impl BlockingReader {
 
     /// Convert reader into [`StdBytesIterator`] which implements [`Iterator`].
     #[inline]
-    pub fn into_bytes_iterator(self, range: Range<u64>) -> StdBytesIterator {
+    pub fn into_bytes_iterator(self, range: impl RangeBounds<u64>) -> 
StdBytesIterator {
         StdBytesIterator::new(self.inner, range)
     }
 }
diff --git a/core/src/types/blocking_read/std_bytes_iterator.rs 
b/core/src/types/blocking_read/std_bytes_iterator.rs
index dba05ae734..7f8752a1c6 100644
--- a/core/src/types/blocking_read/std_bytes_iterator.rs
+++ b/core/src/types/blocking_read/std_bytes_iterator.rs
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::Bound;
 use std::io;
+use std::ops::RangeBounds;
 
 use bytes::Buf;
 use bytes::Bytes;
@@ -30,7 +32,7 @@ use crate::raw::*;
 pub struct StdBytesIterator {
     inner: oio::BlockingReader,
     offset: u64,
-    size: u64,
+    end: u64,
     cap: usize,
 
     cur: u64,
@@ -39,11 +41,23 @@ pub struct StdBytesIterator {
 impl StdBytesIterator {
     /// NOTE: don't allow users to create StdIterator directly.
     #[inline]
-    pub(crate) fn new(r: oio::BlockingReader, range: std::ops::Range<u64>) -> 
Self {
+    pub(crate) fn new(r: oio::BlockingReader, range: impl RangeBounds<u64>) -> 
Self {
+        let start = match range.start_bound().cloned() {
+            Bound::Included(start) => start,
+            Bound::Excluded(start) => start + 1,
+            Bound::Unbounded => 0,
+        };
+
+        let end = match range.end_bound().cloned() {
+            Bound::Included(end) => Some(end + 1),
+            Bound::Excluded(end) => Some(end),
+            Bound::Unbounded => None,
+        };
+
         StdBytesIterator {
             inner: r,
-            offset: range.start,
-            size: range.end - range.start,
+            offset: start,
+            end: end.unwrap_or(u64::MAX),
             // TODO: should use services preferred io size.
             cap: 4 * 1024 * 1024,
             cur: 0,
@@ -61,12 +75,12 @@ impl Iterator for StdBytesIterator {
     type Item = io::Result<Bytes>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        if self.cur >= self.size {
+        if self.offset + self.cur >= self.end {
             return None;
         }
 
         let next_offset = self.offset + self.cur;
-        let next_size = (self.size - self.cur).min(self.cap as u64) as usize;
+        let next_size = (self.end - self.offset).min(self.cap as u64) as usize;
         match self.inner.read_at(next_offset, next_size) {
             Ok(buf) if !buf.has_remaining() => None,
             Ok(mut buf) => {

Reply via email to