alamb commented on code in PR #17266:
URL: https://github.com/apache/datafusion/pull/17266#discussion_r2327043729
##########
datafusion-cli/src/object_storage.rs:
##########
@@ -563,6 +563,592 @@ pub(crate) async fn get_object_store(
Ok(store)
}
+pub mod instrumented {
+ use core::fmt;
+ use std::{
+ cmp, default,
+ ops::AddAssign,
+ str::FromStr,
+ sync::{
+ atomic::{AtomicU8, Ordering},
+ Arc,
+ },
+ time::Duration,
+ };
+
+ use async_trait::async_trait;
+ use chrono::Utc;
+ use datafusion::{
+ common::{instant::Instant, HashMap},
+ error::DataFusionError,
+ execution::object_store::ObjectStoreRegistry,
+ };
+ use futures::stream::BoxStream;
+ use object_store::{
+ path::Path, GetOptions, GetRange, GetResult, ListResult, MultipartUpload,
+ ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult,
+ Result,
+ };
+ use parking_lot::{Mutex, RwLock};
+ use url::Url;
+
+ #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
+ enum Operation {
+ Copy,
+ Delete,
+ Get,
+ Head,
+ List,
+ Put,
+ }
+
+ #[derive(Debug)]
+ struct RequestDetails {
+ op: Operation,
+ path: Path,
+ timestamp: chrono::DateTime<Utc>,
+ duration: Option<Duration>,
+ size: Option<usize>,
+ range: Option<GetRange>,
+ extra_display: Option<String>,
+ }
+
+ impl fmt::Display for RequestDetails {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let mut output_parts = vec![format!(
+ "{} operation={:?}",
+ self.timestamp.to_rfc3339(),
+ self.op
+ )];
+
+ if let Some(d) = self.duration {
+ output_parts.push(format!("duration={:.6}s", d.as_secs_f32()));
+ }
+ if let Some(s) = self.size {
+ output_parts.push(format!("size={}", s));
+ }
+ if let Some(r) = &self.range {
+ output_parts.push(format!("range: {}", r));
+ }
+ output_parts.push(format!("path={}", self.path));
+
+ if let Some(ed) = &self.extra_display {
+ output_parts.push(ed.clone());
+ }
+
+ write!(f, "{}", output_parts.join(" "))
+ }
+ }
+
+ #[derive(Default)]
+ struct RequestSummary {
+ count: usize,
+ duration_stats: Option<Stats<Duration>>,
+ size_stats: Option<Stats<usize>>,
+ }
+
+ impl RequestSummary {
+ fn push(&mut self, request: &RequestDetails) {
+ self.count += 1;
+ if let Some(dur) = request.duration {
+ self.duration_stats.get_or_insert_default().push(dur)
+ }
+ if let Some(size) = request.size {
+ self.size_stats.get_or_insert_default().push(size)
+ }
+ }
+ }
+
+ impl fmt::Display for RequestSummary {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ writeln!(f, " count: {}", self.count)?;
+
+ if let Some(dur_stats) = &self.duration_stats {
+ writeln!(f, " duration min: {:.6}s", dur_stats.min.as_secs_f32())?;
+ writeln!(f, " duration max: {:.6}s", dur_stats.max.as_secs_f32())?;
+ let avg = dur_stats.sum.as_secs_f32() / (self.count as f32);
+ writeln!(f, " duration avg: {:.6}s", avg)?;
+ }
+
+ if let Some(size_stats) = &self.size_stats {
+ writeln!(f, " size min: {} B", size_stats.min)?;
+ writeln!(f, " size max: {} B", size_stats.max)?;
+ let avg = size_stats.sum / self.count;
+ writeln!(f, " size avg: {} B", avg)?;
+ writeln!(f, " size sum: {} B", size_stats.sum)?;
+ }
+
+ Ok(())
+ }
+ }
+
+ struct Stats<T: Copy + Ord + AddAssign<T>> {
+ min: T,
+ max: T,
+ sum: T,
+ }
+
+ impl<T: Copy + Ord + AddAssign<T>> Stats<T> {
+ fn push(&mut self, val: T) {
+ self.min = cmp::min(val, self.min);
+ self.max = cmp::max(val, self.max);
+ self.sum += val;
+ }
+ }
+
+ impl default::Default for Stats<Duration> {
+ fn default() -> Self {
+ Self {
+ min: Duration::MAX,
+ max: Duration::ZERO,
+ sum: Duration::ZERO,
+ }
+ }
+ }
+
+ impl default::Default for Stats<usize> {
+ fn default() -> Self {
+ Self {
+ min: usize::MAX,
+ max: usize::MIN,
+ sum: 0,
+ }
+ }
+ }
+
+ #[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
+ pub enum InstrumentedObjectStoreMode {
+ #[default]
+ Disabled,
+ Summary,
+ Trace,
+ }
+
+ impl fmt::Display for InstrumentedObjectStoreMode {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{:?}", self)
+ }
+ }
+
+ impl FromStr for InstrumentedObjectStoreMode {
+ type Err = DataFusionError;
+
+ fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+ match s.to_lowercase().as_str() {
+ "disabled" => Ok(Self::Disabled),
+ "summary" => Ok(Self::Summary),
+ "trace" => Ok(Self::Trace),
+ _ => Err(DataFusionError::Execution(format!("Unrecognized mode {s}"))),
+ }
+ }
+ }
+
+ #[derive(Debug)]
+ pub struct InstrumentedObjectStore {
+ inner: Arc<dyn ObjectStore>,
+ instrument_mode: AtomicU8,
+ requests: Mutex<Vec<RequestDetails>>,
+ }
+
+ impl fmt::Display for InstrumentedObjectStore {
Review Comment:
I found it a bit confusing that `Display` also modifies the object store --
maybe we could have an explicit `take/display` function instead 🤔
##########
datafusion-cli/src/print_options.rs:
##########
@@ -73,6 +77,8 @@ pub struct PrintOptions {
pub quiet: bool,
pub maxrows: MaxRows,
pub color: bool,
+ pub object_store_profile_mode: InstrumentedObjectStoreMode,
+ pub instrumented_registry: Arc<InstrumentedObjectStoreRegistry>,
Review Comment:
Yes, I agree it is time to encapsulate it
##########
datafusion-cli/src/object_storage.rs:
##########
@@ -563,6 +563,592 @@ pub(crate) async fn get_object_store(
Ok(store)
}
+pub mod instrumented {
Review Comment:
Yes, I agree that putting the instrumented object store into
`object_storage/instrumented.rs` is a good idea.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]