This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch lazy-reader
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 08d757a4d3895ef084edc302f0b1079828e5928c
Author: Xuanwo <[email protected]>
AuthorDate: Wed Oct 25 23:14:37 2023 +0800

    Implement tokio_read
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/oio/read/mod.rs        |  3 ++
 core/src/raw/oio/read/tokio_read.rs | 88 +++++++++++++++++++++++++++++++++++++
 2 files changed, 91 insertions(+)

diff --git a/core/src/raw/oio/read/mod.rs b/core/src/raw/oio/read/mod.rs
index 9ccbebdc3..e5f66a538 100644
--- a/core/src/raw/oio/read/mod.rs
+++ b/core/src/raw/oio/read/mod.rs
@@ -39,3 +39,6 @@ pub use into_read_from_stream::FromStreamReader;
 
 mod futures_read;
 pub use futures_read::FuturesReader;
+
+mod tokio_read;
+pub use tokio_read::TokioReader;
diff --git a/core/src/raw/oio/read/tokio_read.rs b/core/src/raw/oio/read/tokio_read.rs
new file mode 100644
index 000000000..966973bdc
--- /dev/null
+++ b/core/src/raw/oio/read/tokio_read.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::*;
+use crate::*;
+use bytes::Bytes;
+use std::io::SeekFrom;
+use std::pin::Pin;
+use std::task::{ready, Context, Poll};
+use tokio::io::AsyncSeek;
+use tokio::io::{AsyncRead, ReadBuf};
+
+/// TokioReader implements [`oio::Read`] via tokio's [`AsyncRead`] + [`AsyncSeek`].
+pub struct TokioReader<R: AsyncRead + AsyncSeek> {
+    inner: R, // the wrapped tokio reader that every read and seek is forwarded to
+
+    seek_pos: Option<SeekFrom>, // target of a seek that was started but has not completed yet; `None` when idle
+}
+
+impl<R: AsyncRead + AsyncSeek> TokioReader<R> {
+    /// Create a new [`TokioReader`] that wraps `inner`, with no seek in progress.
+    pub fn new(inner: R) -> Self {
+        Self {
+            inner,
+            seek_pos: None, // no seek has been started yet
+        }
+    }
+}
+
+impl<R> oio::Read for TokioReader<R> // forwards `oio::Read` calls to the wrapped tokio reader
+where
+    R: AsyncRead + AsyncSeek + Unpin + Send + Sync,
+{
+    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> 
Poll<Result<usize>> {
+        let mut buf = ReadBuf::new(buf); // tokio's `poll_read` fills a `ReadBuf`, so wrap the caller's slice
+
+        ready!(Pin::new(&mut self.inner).poll_read(cx, &mut 
buf)).map_err(|err| {
+            new_std_io_error(err)
+                .with_operation(oio::ReadOperation::Read)
+                .with_context("source", "TokioReader")
+        })?;
+
+        Poll::Ready(Ok(buf.filled().len())) // length of the filled part of `buf` after the read
+    }
+
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> 
Poll<Result<u64>> {
+        if self.seek_pos != Some(pos) { // skip `start_seek` when this exact `pos` is already in flight; NOTE(review): a different `pos` during a pending seek calls `start_seek` again - confirm the inner reader tolerates that
+            Pin::new(&mut self.inner).start_seek(pos).map_err(|err| {
+                new_std_io_error(err)
+                    .with_operation(oio::ReadOperation::Seek)
+                    .with_context("source", "TokioReader")
+            })?;
+            self.seek_pos = Some(pos) // remember the target so a re-poll after `Pending` does not start the seek twice
+        }
+
+        // NOTE: don't return the error via `?` here; `seek_pos` must be reset below whether the seek succeeded or failed.
+        let pos = ready!(Pin::new(&mut 
self.inner).poll_complete(cx).map_err(|err| {
+            new_std_io_error(err)
+                .with_operation(oio::ReadOperation::Seek)
+                .with_context("source", "TokioReader")
+        }));
+        self.seek_pos = None; // the seek has finished (ok or err), so the next call starts a fresh one
+        Poll::Ready(pos)
+    }
+
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<Bytes>>> {
+        let _ = cx; // `cx` is deliberately unused: this method never returns `Pending`
+
+        Poll::Ready(Some(Err(Error::new( // always reports `Unsupported`
+            ErrorKind::Unsupported,
+            "TokioReader doesn't support poll_next",
+        ))))
+    }
+}

Reply via email to