BlakeOrth commented on code in PR #19127:
URL: https://github.com/apache/datafusion/pull/19127#discussion_r2620294904


##########
datafusion-cli/src/object_storage/instrumented.rs:
##########
@@ -35,14 +35,66 @@ use datafusion::{
     error::DataFusionError,
     execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
 };
-use futures::stream::BoxStream;
+use futures::stream::{BoxStream, Stream};
 use object_store::{
     path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload, ObjectMeta,
     ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
 };
 use parking_lot::{Mutex, RwLock};
 use url::Url;
 
+/// A stream wrapper that measures the time until the first response(item or end of stream) is yielded
+struct TimeToFirstItemStream<S> {
+    inner: S,
+    start: Instant,
+    request_index: usize,
+    requests: Arc<Mutex<Vec<RequestDetails>>>,
+    first_item_yielded: bool,
+}
+
+impl<S> TimeToFirstItemStream<S> {
+    fn new(
+        inner: S,
+        start: Instant,
+        request_index: usize,
+        requests: Arc<Mutex<Vec<RequestDetails>>>,
+    ) -> Self {
+        Self {
+            inner,
+            start,
+            request_index,
+            requests,
+            first_item_yielded: false,
+        }
+    }
+}
+
+impl<S> Stream for TimeToFirstItemStream<S>
+where
+    S: Stream<Item = Result<ObjectMeta>> + Unpin,
+{
+    type Item = Result<ObjectMeta>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let poll_result = std::pin::Pin::new(&mut self.inner).poll_next(cx);
+
+        if !self.first_item_yielded && poll_result.is_ready() {
+            self.first_item_yielded = true;
+            let elapsed = self.start.elapsed();
+
+            let mut requests = self.requests.lock();
+            if let Some(request) = requests.get_mut(self.request_index) {
+                request.duration = Some(elapsed);
+            }

Review Comment:
   Yes, I like this solution quite a bit better than indexing into an array!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to