BlakeOrth commented on code in PR #17266:
URL: https://github.com/apache/datafusion/pull/17266#discussion_r2330773743


##########
datafusion-cli/src/object_storage.rs:
##########
@@ -563,6 +563,592 @@ pub(crate) async fn get_object_store(
     Ok(store)
 }
 
+pub mod instrumented {
+    use core::fmt;
+    use std::{
+        cmp, default,
+        ops::AddAssign,
+        str::FromStr,
+        sync::{
+            atomic::{AtomicU8, Ordering},
+            Arc,
+        },
+        time::Duration,
+    };
+
+    use async_trait::async_trait;
+    use chrono::Utc;
+    use datafusion::{
+        common::{instant::Instant, HashMap},
+        error::DataFusionError,
+        execution::object_store::ObjectStoreRegistry,
+    };
+    use futures::stream::BoxStream;
+    use object_store::{
+        path::Path, GetOptions, GetRange, GetResult, ListResult, 
MultipartUpload,
+        ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, 
PutResult,
+        Result,
+    };
+    use parking_lot::{Mutex, RwLock};
+    use url::Url;
+
+    /// Kind of object store call captured in a [`RequestDetails`] entry.
+    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+    enum Operation {
+        Copy,
+        Delete,
+        Get,
+        Head,
+        List,
+        Put,
+    }
+
+    /// A single recorded object store request.
+    #[derive(Debug)]
+    struct RequestDetails {
+        /// Which kind of operation this request was.
+        op: Operation,
+        /// Object path the request targeted.
+        path: Path,
+        /// UTC timestamp of the request; rendered as RFC 3339 in trace output.
+        timestamp: chrono::DateTime<Utc>,
+        /// Elapsed time of the request, when available.
+        duration: Option<Duration>,
+        /// Size associated with the request in bytes, when available.
+        size: Option<usize>,
+        /// Requested byte range, if the request carried one.
+        range: Option<GetRange>,
+        /// Free-form text appended verbatim to the end of the trace line.
+        extra_display: Option<String>,
+    }
+
+    /// Renders one request as a single space-separated trace line:
+    ///
+    /// `<rfc3339> operation=<op> [duration=<secs>s] [size=<n>] [range=<r>] path=<path> [<extra>]`
+    ///
+    /// Optional fields are omitted entirely when absent.
+    impl fmt::Display for RequestDetails {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            write!(f, "{} operation={:?}", self.timestamp.to_rfc3339(), self.op)?;
+
+            if let Some(d) = self.duration {
+                // f64: f32 cannot faithfully carry six decimal places for durations >~10s.
+                write!(f, " duration={:.6}s", d.as_secs_f64())?;
+            }
+            if let Some(s) = self.size {
+                write!(f, " size={s}")?;
+            }
+            if let Some(r) = &self.range {
+                // `key=value`, like every other field, so trace lines parse uniformly.
+                write!(f, " range={r}")?;
+            }
+            write!(f, " path={}", self.path)?;
+
+            if let Some(ed) = &self.extra_display {
+                write!(f, " {ed}")?;
+            }
+
+            Ok(())
+        }
+    }
+
+    /// Aggregated statistics over a set of recorded requests.
+    #[derive(Default)]
+    struct RequestSummary {
+        /// Total number of requests pushed into this summary.
+        count: usize,
+        /// Statistics over the requests that reported a duration, if any did.
+        duration_stats: Option<Stats<Duration>>,
+        /// Statistics over the requests that reported a size, if any did.
+        size_stats: Option<Stats<usize>>,
+    }
+
+    impl RequestSummary {
+        /// Folds one request into the summary.
+        ///
+        /// Every request increments `count`; only requests that carry a duration (or size)
+        /// contribute to the corresponding statistics, which track their own sample count.
+        fn push(&mut self, request: &RequestDetails) {
+            self.count += 1;
+            if let Some(dur) = request.duration {
+                self.duration_stats.get_or_insert_default().push(dur);
+            }
+            if let Some(size) = request.size {
+                self.size_stats.get_or_insert_default().push(size);
+            }
+        }
+    }
+
+    /// Multi-line, two-space-indented report: total count, then duration and size
+    /// min/max/avg (plus size sum) for whichever statistics were collected.
+    impl fmt::Display for RequestSummary {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            writeln!(f, "  count: {}", self.count)?;
+
+            if let Some(dur_stats) = &self.duration_stats {
+                writeln!(f, "  duration min: {:.6}s", dur_stats.min.as_secs_f64())?;
+                writeln!(f, "  duration max: {:.6}s", dur_stats.max.as_secs_f64())?;
+                // Average over the requests that actually reported a duration, not over
+                // `self.count`. `max(1)` only guards against a hand-built empty `Stats`.
+                let avg = dur_stats.sum.as_secs_f64() / dur_stats.count.max(1) as f64;
+                writeln!(f, "  duration avg: {:.6}s", avg)?;
+            }
+
+            if let Some(size_stats) = &self.size_stats {
+                writeln!(f, "  size min: {} B", size_stats.min)?;
+                writeln!(f, "  size max: {} B", size_stats.max)?;
+                // Same divisor rule as above; integer (truncating) byte average.
+                let avg = size_stats.sum / size_stats.count.max(1);
+                writeln!(f, "  size avg: {} B", avg)?;
+                writeln!(f, "  size sum: {} B", size_stats.sum)?;
+            }
+
+            Ok(())
+        }
+    }
+
+    /// Running minimum, maximum, sum and sample count of a series of values.
+    struct Stats<T> {
+        min: T,
+        max: T,
+        sum: T,
+        /// Number of values pushed; the divisor for averages.
+        count: usize,
+    }
+
+    impl<T: Copy + Ord + AddAssign<T>> Stats<T> {
+        /// Records one value.
+        fn push(&mut self, val: T) {
+            self.min = cmp::min(val, self.min);
+            self.max = cmp::max(val, self.max);
+            self.sum += val;
+            self.count += 1;
+        }
+    }
+
+    /// Empty duration stats: `min` starts at the maximum so the first push replaces it.
+    impl default::Default for Stats<Duration> {
+        fn default() -> Self {
+            Self {
+                min: Duration::MAX,
+                max: Duration::ZERO,
+                sum: Duration::ZERO,
+                count: 0,
+            }
+        }
+    }
+
+    /// Empty size stats: `min` starts at the maximum so the first push replaces it.
+    impl default::Default for Stats<usize> {
+        fn default() -> Self {
+            Self {
+                min: usize::MAX,
+                max: usize::MIN,
+                sum: 0,
+                count: 0,
+            }
+        }
+    }
+
+    /// Instrumentation mode; `Disabled` is the default.
+    #[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
+    pub enum InstrumentedObjectStoreMode {
+        #[default]
+        Disabled,
+        Summary,
+        Trace,
+    }
+
+    /// Prints the variant name exactly as its `Debug` form (e.g. `Summary`).
+    impl fmt::Display for InstrumentedObjectStoreMode {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            let name = match self {
+                Self::Disabled => "Disabled",
+                Self::Summary => "Summary",
+                Self::Trace => "Trace",
+            };
+            f.write_str(name)
+        }
+    }
+
+    /// Parses a mode name case-insensitively (`disabled`, `summary`, `trace`).
+    ///
+    /// Any other input yields a `DataFusionError::Execution` naming the rejected value.
+    impl FromStr for InstrumentedObjectStoreMode {
+        type Err = DataFusionError;
+
+        fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+            let normalized = s.to_lowercase();
+            let mode = match normalized.as_str() {
+                "disabled" => Self::Disabled,
+                "summary" => Self::Summary,
+                "trace" => Self::Trace,
+                _ => {
+                    return Err(DataFusionError::Execution(format!(
+                        "Unrecognized mode {s}"
+                    )))
+                }
+            };
+            Ok(mode)
+        }
+    }
+
+    /// Object store wrapper that holds an `inner` store alongside recorded
+    /// [`RequestDetails`]; the recording logic lives outside this definition.
+    #[derive(Debug)]
+    pub struct InstrumentedObjectStore {
+        /// Underlying store being instrumented.
+        inner: Arc<dyn ObjectStore>,
+        /// Mode held atomically so it can change through `&self`; presumably encodes an
+        /// `InstrumentedObjectStoreMode` as a `u8` — TODO confirm the mapping.
+        instrument_mode: AtomicU8,
+        /// Requests captured so far, guarded by a `parking_lot` mutex.
+        requests: Mutex<Vec<RequestDetails>>,
+    }
+
+    impl fmt::Display for InstrumentedObjectStore {

Review Comment:
   This is a really good point that I hadn't even considered when writing this.
Making the output of `Display` depend on the underlying object state, and on
when it happens to be called, is surprising implicit behavior.
   
   I'm open to implementation suggestions here. One option is to keep the
method's behavior as-is, but drop the `Display` impl and give the method a name
and signature that better communicate that it modifies the underlying state.
Another option is to make the `Display` impl leave the underlying state
untouched, and move the state modification into a separate method that a user
must call to clear the existing entries once they have been displayed. I'm sure
there are other paths as well. One thing I like about the existing
implementation is that it's harder for a user of the API to accidentally
accumulate object store operations from multiple queries, because removing them
does not require the user to call an additional method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to