alamb commented on code in PR #19127:
URL: https://github.com/apache/datafusion/pull/19127#discussion_r2617033035
##########
datafusion-cli/src/object_storage/instrumented.rs:
##########
@@ -896,13 +961,15 @@ mod tests {
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
assert!(instrumented.requests.lock().is_empty());
- let _ = instrumented.list(Some(&path));
+ let mut stream = instrumented.list(Some(&path));
+ // Consume at least one item from the stream to trigger duration measurement
Review Comment:
I recommend also testing the duration value, perhaps with something like:
```rust
let start = Instant::now();
let mut stream = instrumented.list(Some(&path));
// Consume at least one item from the stream to trigger duration measurement
let time_before_start = start.elapsed();
let _ = stream.next().await;
...
assert!(request.duration.unwrap() > time_before_start);
```
##########
datafusion-cli/src/object_storage/instrumented.rs:
##########
@@ -35,14 +35,66 @@ use datafusion::{
error::DataFusionError,
execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
};
-use futures::stream::BoxStream;
+use futures::stream::{BoxStream, Stream};
use object_store::{
path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload,
ObjectMeta,
ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult,
Result,
};
use parking_lot::{Mutex, RwLock};
use url::Url;
+/// A stream wrapper that measures the time until the first response (item or
+/// end of stream) is yielded.
+struct TimeToFirstItemStream<S> {
+ inner: S,
+ start: Instant,
+ request_index: usize,
+ requests: Arc<Mutex<Vec<RequestDetails>>>,
+ first_item_yielded: bool,
+}
+
+impl<S> TimeToFirstItemStream<S> {
+ fn new(
+ inner: S,
+ start: Instant,
+ request_index: usize,
+ requests: Arc<Mutex<Vec<RequestDetails>>>,
+ ) -> Self {
+ Self {
+ inner,
+ start,
+ request_index,
+ requests,
+ first_item_yielded: false,
+ }
+ }
+}
+
+impl<S> Stream for TimeToFirstItemStream<S>
+where
+ S: Stream<Item = Result<ObjectMeta>> + Unpin,
+{
+ type Item = Result<ObjectMeta>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ let poll_result = std::pin::Pin::new(&mut self.inner).poll_next(cx);
+
+ if !self.first_item_yielded && poll_result.is_ready() {
+ self.first_item_yielded = true;
+ let elapsed = self.start.elapsed();
+
+ let mut requests = self.requests.lock();
+ if let Some(request) = requests.get_mut(self.request_index) {
+ request.duration = Some(elapsed);
+ }
Review Comment:
How about, instead of relying on a potentially tricky race condition, we use
an `Arc<AtomicU64>` for the duration? Then instead of
```rust
request_index: usize,
requests: Arc<Mutex<Vec<RequestDetails>>>,
```
we could just pass in:
```rust
request_duration: Arc<AtomicU64>,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]