This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch databend-debug in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 26f4d19df04a24dcbba3dc485ce6ea2ec493dc84 Author: Xuanwo <[email protected]> AuthorDate: Fri Jan 19 16:03:55 2024 +0800 chore: Add log for every api call Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/complete.rs | 1 + core/src/raw/http_util/body.rs | 30 +++++++++++++++++++++++++++--- core/src/raw/http_util/client.rs | 10 ++++++++++ core/src/raw/oio/read/range_read.rs | 6 ++++++ core/src/services/s3/backend.rs | 2 ++ core/src/types/operator/operator.rs | 3 +++ core/src/types/reader.rs | 20 ++++++++++++++++++-- 7 files changed, 67 insertions(+), 5 deletions(-) diff --git a/core/src/layers/complete.rs b/core/src/layers/complete.rs index 60dd2d2673..aea29fb503 100644 --- a/core/src/layers/complete.rs +++ b/core/src/layers/complete.rs @@ -305,6 +305,7 @@ impl<A: Accessor> CompleteAccessor<A> { InnerCompleteReader::Two(r) } _ => { + log::debug!("opendal::RangeReader start read for {path}"); let r = RangeReader::new(self.inner.clone(), path, args); if streamable { diff --git a/core/src/raw/http_util/body.rs b/core/src/raw/http_util/body.rs index cee80ec156..6be1a3cb7a 100644 --- a/core/src/raw/http_util/body.rs +++ b/core/src/raw/http_util/body.rs @@ -25,6 +25,7 @@ use std::task::Poll; use bytes::Buf; use bytes::BufMut; use bytes::Bytes; +use log::debug; use crate::raw::*; use crate::*; @@ -160,6 +161,8 @@ impl IncomingAsyncBody { impl oio::Read for IncomingAsyncBody { fn poll_read(&mut self, cx: &mut Context<'_>, mut buf: &mut [u8]) -> Poll<Result<usize>> { + debug!("opendal::raw::IncomingAsyncBody polling read"); + if buf.is_empty() || self.size == Some(0) { return Poll::Ready(Ok(0)); } @@ -172,16 +175,27 @@ impl oio::Read for IncomingAsyncBody { match ready!(self.inner.poll_next(cx)) { // It's possible for underlying stream to return empty bytes, we should continue // to fetch next one. 
- Some(Ok(bs)) if bs.is_empty() => continue, + Some(Ok(bs)) if bs.is_empty() => { + debug!("opendal::raw::IncomingAsyncBody::poll_read got zero length of data, continue"); + continue; + } Some(Ok(bs)) => { + debug!( + "opendal::raw::IncomingAsyncBody::poll_read got {} bytes of data", + bs.len() + ); self.consumed += bs.len() as u64; break bs; } - Some(Err(err)) => return Poll::Ready(Err(err)), + Some(Err(err)) => { + debug!("opendal::raw::IncomingAsyncBody::poll_read got error: {err:?}"); + return Poll::Ready(Err(err)); + } None => { if let Some(size) = self.size { Self::check(size, self.consumed)?; } + debug!("opendal::raw::IncomingAsyncBody::poll_read finished read"); return Poll::Ready(Ok(0)); } } @@ -208,6 +222,8 @@ impl oio::Read for IncomingAsyncBody { } fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { + debug!("opendal::raw::IncomingAsyncBody polling next"); + if self.size == Some(0) { return Poll::Ready(None); } @@ -218,15 +234,23 @@ impl oio::Read for IncomingAsyncBody { let res = match ready!(self.inner.poll_next(cx)) { Some(Ok(bs)) => { + debug!( + "opendal::raw::IncomingAsyncBody::poll_next got {} bytes of data", + bs.len() + ); self.consumed += bs.len() as u64; Some(Ok(bs)) } - Some(Err(err)) => Some(Err(err)), + Some(Err(err)) => { + debug!("opendal::raw::IncomingAsyncBody::poll_next got error: {err:?}"); + Some(Err(err)) + } None => { if let Some(size) = self.size { Self::check(size, self.consumed)?; } + debug!("opendal::raw::IncomingAsyncBody::poll_next finished next"); None } }; diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index 2c5e4bc5c7..a1ac52a7ce 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -24,6 +24,7 @@ use std::time::Duration; use futures::TryStreamExt; use http::Request; use http::Response; +use log::debug; use super::body::IncomingAsyncBody; use super::parse_content_encoding; @@ -97,6 +98,11 @@ impl HttpClient { // Uri stores all 
string alike data in `Bytes` which means // the clone here is cheap. let uri = req.uri().clone(); + debug!( + "opendal::raw::HttpClient start sending request for {}", + uri.to_string() + ); + let is_head = req.method() == http::Method::HEAD; let (parts, body) = req.into_parts(); @@ -166,6 +172,10 @@ impl HttpClient { oerr })?; + debug!( + "opendal::raw::HttpClient response got for {}", + uri.to_string() + ); // Get content length from header so that we can check it. // diff --git a/core/src/raw/oio/read/range_read.rs b/core/src/raw/oio/read/range_read.rs index 5ffacf2ec8..a7cfce8a5c 100644 --- a/core/src/raw/oio/read/range_read.rs +++ b/core/src/raw/oio/read/range_read.rs @@ -208,6 +208,7 @@ where // Alter OpRead with correct calculated range. op = op.with_range(self.calculate_range()); + log::debug!("opendal::RangeReader send read request for {path}"); Box::pin(async move { acc.read(&path, op).await }) } @@ -317,6 +318,8 @@ where err })?; + log::debug!("opendal::RangeReader got response for {}", self.path); + self.ensure_size(rp.range().unwrap_or_default().size(), rp.size()); self.state = State::Read(r); @@ -324,15 +327,18 @@ where } State::Read(r) => match ready!(Pin::new(r).poll_read(cx, buf)) { Ok(0) => { + log::debug!("opendal::RangeReader finished read data for {}", self.path); // Reset state to Idle after all data has been consumed. 
self.state = State::Idle; Poll::Ready(Ok(0)) } Ok(n) => { + log::debug!("opendal::RangeReader got {n} bytes data for {}", self.path); self.cur += n as u64; Poll::Ready(Ok(n)) } Err(e) => { + log::debug!("opendal::RangeReader got error for {}", self.path); self.state = State::Idle; Poll::Ready(Err(e)) } diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index bb7b3949c3..a41be4ac06 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -1065,7 +1065,9 @@ impl Accessor for S3Backend { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + debug!("opendal::services::S3 start sending request for {path}"); let resp = self.core.s3_get_object(path, args).await?; + debug!("opendal::services::S3 got response from s3 for {path}"); let status = resp.status(); diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index 8ba9899b84..a7e173b17d 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -441,9 +441,12 @@ impl Operator { (range.size().unwrap(), range) }; + log::debug!("opendal::Operator start read {path} with range {range:?}"); let (_, mut s) = inner.read(&path, args.with_range(range)).await?; let mut buf = Vec::with_capacity(size_hint as usize); + log::debug!("opendal::Operator got reader for {path} with range {range:?}"); s.read_to_end(&mut buf).await?; + log::debug!("opendal::Operator finished read for {path} with range {range:?}"); Ok(buf) }; diff --git a/core/src/types/reader.rs b/core/src/types/reader.rs index 5c6d97b30d..a0834a1271 100644 --- a/core/src/types/reader.rs +++ b/core/src/types/reader.rs @@ -78,7 +78,10 @@ impl Reader { impl oio::Read for Reader { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> { - self.inner.poll_read(cx, buf) + log::debug!("opendal::Reader start polling read"); + let v = self.inner.poll_read(cx, buf); + log::debug!("opendal::Reader 
read polled: {v:?}"); + v } fn poll_seek(&mut self, cx: &mut Context<'_>, pos: io::SeekFrom) -> Poll<Result<u64>> { @@ -86,7 +89,20 @@ impl oio::Read for Reader { } fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> { - self.inner.poll_next(cx) + log::debug!("opendal::Reader start polling next"); + let v = self.inner.poll_next(cx); + match &v { + Poll::Ready(Some(Ok(v))) => { + log::debug!( + "opendal::Reader next polled: Poll::Ready(Some(Ok({} bytes))", + v.len() + ); + } + v => { + log::debug!("opendal::Reader next polled: {v:?}"); + } + } + v } }
