kosiew commented on code in PR #17021:
URL: https://github.com/apache/datafusion/pull/17021#discussion_r2284283586


##########
datafusion-cli/src/cli_context.rs:
##########
@@ -93,6 +156,128 @@ impl CliSessionContext for SessionContext {
         &self,
         plan: LogicalPlan,
     ) -> Result<DataFrame, DataFusionError> {
-        self.execute_logical_plan(plan).await
+        SessionContext::execute_logical_plan(self, plan).await
+    }
+}
+
+/// Session context used by the CLI with memory profiling support.
+pub struct ReplSessionContext {
+    ctx: SessionContext,
+    memory_profiling: AtomicBool,
+    base_memory_pool: Arc<dyn MemoryPool>,
+    tracked_memory_pool: RwLock<Option<Arc<dyn TrackedPool>>>,
+    top_memory_consumers: usize,
+}
+
+impl ReplSessionContext {
+    pub fn new(
+        ctx: SessionContext,
+        base_memory_pool: Arc<dyn MemoryPool>,
+        top_memory_consumers: usize,
+    ) -> Self {
+        Self {
+            ctx,
+            memory_profiling: AtomicBool::new(false),
+            base_memory_pool,
+            tracked_memory_pool: RwLock::new(None),
+            top_memory_consumers,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl CliSessionContext for ReplSessionContext {
+    fn task_ctx(&self) -> Arc<TaskContext> {
+        self.ctx.task_ctx()
+    }
+
+    fn session_state(&self) -> SessionState {
+        self.ctx.state()
+    }
+
+    fn register_object_store(
+        &self,
+        url: &url::Url,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore + 'static>> {
+        self.ctx.register_object_store(url, object_store)
+    }
+
+    fn register_table_options_extension_from_scheme(&self, scheme: &str) {
+        match scheme {
+            // For Amazon S3 or Alibaba Cloud OSS
+            "s3" | "oss" | "cos" => self
+                .ctx
+                .register_table_options_extension(AwsOptions::default()),
+            // For Google Cloud Storage
+            "gs" | "gcs" => self
+                .ctx
+                .register_table_options_extension(GcpOptions::default()),
+            // For unsupported schemes, do nothing:
+            _ => {}
+        }
+    }
+
+    async fn execute_logical_plan(
+        &self,
+        plan: LogicalPlan,
+    ) -> Result<DataFrame, DataFusionError> {
+        self.ctx.execute_logical_plan(plan).await
+    }
+
+    fn memory_profiling(&self) -> bool {
+        self.memory_profiling.load(Ordering::Relaxed)
+    }
+
+    fn set_memory_profiling(&self, enable: bool) {
+        if enable {
+            if self.top_memory_consumers == 0 {
+                return;
+            }
+            if self.memory_profiling.swap(true, Ordering::Relaxed) {
+                return;
+            }
+            let tracked = Arc::new(TrackConsumersPool::new(

Review Comment:
   Simplified as suggested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to