This is an automated email from the ASF dual-hosted git repository.

xikai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/dev by this push:
     new b02bac2b feat: dist sql analyze (#1260)
b02bac2b is described below

commit b02bac2ba7475a76157487b102e36c85b0c2b5f9
Author: 鲍金日 <[email protected]>
AuthorDate: Thu Dec 28 09:59:41 2023 +0800

    feat: dist sql analyze (#1260)
    
    ## Rationale
    Currently, the analyze SQL cannot obtain detailed metrics of a partitioned
    table.
    
    ## Detailed Changes
    Return metrics of partitioned table to remote client and then collect
    metrics in client.
    
    ## Test Plan
    - Existing tests
    - add new integration tests for explain analyze
---
 Cargo.lock                                         |  36 ++--
 Cargo.toml                                         |   2 +-
 df_engine_extensions/src/dist_sql_query/mod.rs     |  21 ++-
 .../src/dist_sql_query/physical_plan.rs            |  47 ++++-
 .../src/dist_sql_query/resolver.rs                 |  11 +-
 .../src/dist_sql_query/test_util.rs                |   6 +-
 .../cases/env/cluster/ddl/partition_table.result   |  17 ++
 .../cases/env/cluster/ddl/partition_table.sql      |   9 +
 query_engine/src/datafusion_impl/executor.rs       |   4 +-
 .../src/datafusion_impl/physical_planner.rs        |   6 +-
 query_engine/src/datafusion_impl/task_context.rs   |  10 +-
 query_engine/src/executor.rs                       |   4 +-
 query_engine/src/physical_planner.rs               |   4 +-
 remote_engine_client/src/client.rs                 | 117 +++++++------
 server/src/grpc/remote_engine_service/mod.rs       | 189 ++++++++++++++-------
 table_engine/src/partition/rule/key.rs             |   8 +-
 table_engine/src/provider.rs                       |   2 +-
 table_engine/src/remote/model.rs                   |   6 +
 18 files changed, 329 insertions(+), 170 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 5b41170a..e32493fe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -96,7 +96,7 @@ dependencies = [
  "atomic_enum",
  "base64 0.13.1",
  "bytes_ext",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
  "codec",
  "common_types",
  "datafusion",
@@ -1345,7 +1345,7 @@ dependencies = [
 [[package]]
 name = "ceresdbproto"
 version = "1.0.23"
-source = 
"git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05#2c60e0591b6066957c80e7d6ae97cf53ccd591e1"
+source = 
"git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4#d849fa44e29ea04c7d99c082a38efb8ce5200d5e"
 dependencies = [
  "prost",
  "protoc-bin-vendored",
@@ -1528,7 +1528,7 @@ dependencies = [
  "async-trait",
  "bytes_ext",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
  "common_types",
  "etcd-client",
  "future_ext",
@@ -1606,7 +1606,7 @@ dependencies = [
  "arrow 43.0.0",
  "arrow_ext",
  "bytes_ext",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
  "chrono",
  "datafusion",
  "hash_ext",
@@ -2362,7 +2362,7 @@ dependencies = [
  "async-recursion",
  "async-trait",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
  "common_types",
  "datafusion",
  "datafusion-proto",
@@ -3916,7 +3916,7 @@ name = "meta_client"
 version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
  "common_types",
  "futures 0.3.28",
  "generic_error",
@@ -4441,7 +4441,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
  "bytes",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
  "chrono",
  "clru",
  "crc",
@@ -5318,7 +5318,7 @@ dependencies = [
  "async-trait",
  "bytes",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
  "clru",
  "cluster",
  "common_types",
@@ -5445,7 +5445,7 @@ dependencies = [
  "arrow 43.0.0",
  "async-trait",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
  "cluster",
  "codec",
  "common_types",
@@ -5756,7 +5756,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "arrow_ext",
  "async-trait",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
  "common_types",
  "futures 0.3.28",
  "generic_error",
@@ -5885,7 +5885,7 @@ name = "router"
 version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
  "cluster",
  "common_types",
  "generic_error",
@@ -6260,7 +6260,7 @@ dependencies = [
  "async-trait",
  "bytes_ext",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
  "clru",
  "cluster",
  "common_types",
@@ -6786,7 +6786,7 @@ dependencies = [
  "async-trait",
  "bytes_ext",
  "catalog",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
  "codec",
  "common_types",
  "futures 0.3.28",
@@ -6808,7 +6808,7 @@ dependencies = [
  "arrow_ext",
  "async-trait",
  "bytes_ext",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
  "common_types",
  "datafusion",
  "datafusion-proto",
@@ -7011,7 +7011,7 @@ dependencies = [
 name = "time_ext"
 version = "1.2.6-alpha"
 dependencies = [
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
  "chrono",
  "common_types",
  "macros",
@@ -7523,8 +7523,8 @@ version = "1.6.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
 dependencies = [
- "cfg-if 0.1.10",
- "rand 0.3.23",
+ "cfg-if 1.0.0",
+ "rand 0.8.5",
  "static_assertions",
 ]
 
@@ -7663,7 +7663,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "async-trait",
  "bytes_ext",
- "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
  "chrono",
  "codec",
  "common_types",
diff --git a/Cargo.toml b/Cargo.toml
index 50875c3c..a01a7222 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -94,7 +94,7 @@ bytes = "1"
 bytes_ext = { path = "components/bytes_ext" }
 catalog = { path = "catalog" }
 catalog_impls = { path = "catalog_impls" }
-ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git", rev = 
"2c60e05" }
+ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git", rev = 
"d849fa4" }
 codec = { path = "components/codec" }
 chrono = "0.4"
 clap = "3.0"
diff --git a/df_engine_extensions/src/dist_sql_query/mod.rs 
b/df_engine_extensions/src/dist_sql_query/mod.rs
index 80db7be0..4bbf6b36 100644
--- a/df_engine_extensions/src/dist_sql_query/mod.rs
+++ b/df_engine_extensions/src/dist_sql_query/mod.rs
@@ -12,7 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::{fmt, sync::Arc};
+use std::{
+    fmt,
+    sync::{Arc, Mutex},
+};
 
 use async_trait::async_trait;
 use common_types::projected_schema::ProjectedSchema;
@@ -35,8 +38,8 @@ pub mod test_util;
 pub trait RemotePhysicalPlanExecutor: fmt::Debug + Send + Sync + 'static {
     fn execute(
         &self,
+        task_context: RemoteTaskContext,
         table: TableIdentifier,
-        task_context: &TaskContext,
         plan: Arc<dyn ExecutionPlan>,
     ) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>>;
 }
@@ -58,6 +61,20 @@ pub trait ExecutableScanBuilder: fmt::Debug + Send + Sync + 
'static {
 
 type ExecutableScanBuilderRef = Box<dyn ExecutableScanBuilder>;
 
+pub struct RemoteTaskContext {
+    pub task_ctx: Arc<TaskContext>,
+    pub remote_metrics: Arc<Mutex<Option<String>>>,
+}
+
+impl RemoteTaskContext {
+    pub fn new(task_ctx: Arc<TaskContext>, remote_metrics: 
Arc<Mutex<Option<String>>>) -> Self {
+        Self {
+            task_ctx,
+            remote_metrics,
+        }
+    }
+}
+
 #[derive(Clone)]
 pub struct TableScanContext {
     pub batch_size: usize,
diff --git a/df_engine_extensions/src/dist_sql_query/physical_plan.rs 
b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
index 9825227c..0dbaf415 100644
--- a/df_engine_extensions/src/dist_sql_query/physical_plan.rs
+++ b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -18,7 +18,7 @@ use std::{
     any::Any,
     fmt,
     pin::Pin,
-    sync::Arc,
+    sync::{Arc, Mutex},
     task::{Context, Poll},
     time::{Duration, Instant},
 };
@@ -46,7 +46,7 @@ use futures::{future::BoxFuture, FutureExt, Stream, 
StreamExt};
 use table_engine::{remote::model::TableIdentifier, table::ReadRequest};
 use trace_metric::{collector::FormatCollectorVisitor, MetricsCollector, 
TraceMetricWhenDrop};
 
-use crate::dist_sql_query::{RemotePhysicalPlanExecutor, TableScanContext};
+use crate::dist_sql_query::{RemotePhysicalPlanExecutor, RemoteTaskContext, 
TableScanContext};
 
 /// Placeholder of partitioned table's scan plan
 /// It is inexecutable actually and just for carrying the necessary information
@@ -148,6 +148,7 @@ pub(crate) struct ResolvedPartitionedScan {
     pub remote_exec_ctx: Arc<RemoteExecContext>,
     pub pushdown_continue: bool,
     pub metrics_collector: MetricsCollector,
+    pub is_analyze: bool,
 }
 
 impl ResolvedPartitionedScan {
@@ -155,24 +156,27 @@ impl ResolvedPartitionedScan {
         remote_executor: Arc<dyn RemotePhysicalPlanExecutor>,
         sub_table_plan_ctxs: Vec<SubTablePlanContext>,
         metrics_collector: MetricsCollector,
+        is_analyze: bool,
     ) -> Self {
         let remote_exec_ctx = Arc::new(RemoteExecContext {
             executor: remote_executor,
             plan_ctxs: sub_table_plan_ctxs,
         });
 
-        Self::new_with_details(remote_exec_ctx, true, metrics_collector)
+        Self::new_with_details(remote_exec_ctx, true, metrics_collector, 
is_analyze)
     }
 
     pub fn new_with_details(
         remote_exec_ctx: Arc<RemoteExecContext>,
         pushdown_continue: bool,
         metrics_collector: MetricsCollector,
+        is_analyze: bool,
     ) -> Self {
         Self {
             remote_exec_ctx,
             pushdown_continue,
             metrics_collector,
+            is_analyze,
         }
     }
 
@@ -181,6 +185,7 @@ impl ResolvedPartitionedScan {
             remote_exec_ctx: self.remote_exec_ctx.clone(),
             pushdown_continue: false,
             metrics_collector: self.metrics_collector.clone(),
+            is_analyze: self.is_analyze,
         })
     }
 
@@ -216,6 +221,7 @@ impl ResolvedPartitionedScan {
                         table: plan_ctx.table.clone(),
                         plan: extended_plan,
                         metrics_collector: plan_ctx.metrics_collector.clone(),
+                        remote_metrics: plan_ctx.remote_metrics.clone(),
                     })
             })
             .collect::<DfResult<Vec<_>>>()?;
@@ -228,6 +234,7 @@ impl ResolvedPartitionedScan {
             remote_exec_ctx,
             can_push_down_more,
             self.metrics_collector.clone(),
+            self.is_analyze,
         );
 
         Ok(Arc::new(plan))
@@ -257,6 +264,7 @@ pub(crate) struct SubTablePlanContext {
     table: TableIdentifier,
     plan: Arc<dyn ExecutionPlan>,
     metrics_collector: MetricsCollector,
+    remote_metrics: Arc<Mutex<Option<String>>>,
 }
 
 impl SubTablePlanContext {
@@ -269,6 +277,7 @@ impl SubTablePlanContext {
             table,
             plan,
             metrics_collector,
+            remote_metrics: Arc::new(Mutex::new(None)),
         }
     }
 }
@@ -296,6 +305,12 @@ impl ExecutionPlan for ResolvedPartitionedScan {
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        // If this is an analyze plan, we should not collect metrics of children
+        // which have been sent to remote, so we just return empty children.
+        if self.is_analyze {
+            return vec![];
+        }
+
         self.remote_exec_ctx
             .plan_ctxs
             .iter()
@@ -328,13 +343,17 @@ impl ExecutionPlan for ResolvedPartitionedScan {
             table: sub_table,
             plan,
             metrics_collector,
+            remote_metrics,
         } = &self.remote_exec_ctx.plan_ctxs[partition];
 
+        let remote_task_ctx = RemoteTaskContext::new(context, 
remote_metrics.clone());
+
         // Send plan for remote execution.
-        let stream_future =
-            self.remote_exec_ctx
-                .executor
-                .execute(sub_table.clone(), &context, plan.clone())?;
+        let stream_future = self.remote_exec_ctx.executor.execute(
+            remote_task_ctx,
+            sub_table.clone(),
+            plan.clone(),
+        )?;
         let record_stream =
             PartitionedScanStream::new(stream_future, plan.schema(), 
metrics_collector.clone());
 
@@ -350,7 +369,18 @@ impl ExecutionPlan for ResolvedPartitionedScan {
 
         let mut format_visitor = FormatCollectorVisitor::default();
         self.metrics_collector.visit(&mut format_visitor);
-        let metrics_desc = format_visitor.into_string();
+        let mut metrics_desc = format_visitor.into_string();
+
+        // collect metrics from remote
+        for sub_table_ctx in &self.remote_exec_ctx.plan_ctxs {
+            if let Some(remote_metrics) = 
sub_table_ctx.remote_metrics.lock().unwrap().take() {
+                metrics_desc.push_str(&format!(
+                    "\n{}:\n{}",
+                    sub_table_ctx.table.table, remote_metrics
+                ));
+            }
+        }
+
         metric_set.push(Arc::new(Metric::new(
             MetricValue::Count {
                 name: format!("\n{metrics_desc}").into(),
@@ -358,7 +388,6 @@ impl ExecutionPlan for ResolvedPartitionedScan {
             },
             None,
         )));
-
         Some(metric_set)
     }
 }
diff --git a/df_engine_extensions/src/dist_sql_query/resolver.rs 
b/df_engine_extensions/src/dist_sql_query/resolver.rs
index 951ba88d..c48bfe53 100644
--- a/df_engine_extensions/src/dist_sql_query/resolver.rs
+++ b/df_engine_extensions/src/dist_sql_query/resolver.rs
@@ -18,7 +18,7 @@ use async_recursion::async_recursion;
 use catalog::manager::ManagerRef as CatalogManagerRef;
 use datafusion::{
     error::{DataFusionError, Result as DfResult},
-    physical_plan::ExecutionPlan,
+    physical_plan::{analyze::AnalyzeExec, ExecutionPlan},
 };
 use table_engine::{remote::model::TableIdentifier, table::TableRef};
 
@@ -99,7 +99,10 @@ impl Resolver {
         &self,
         plan: Arc<dyn ExecutionPlan>,
     ) -> DfResult<Arc<dyn ExecutionPlan>> {
-        let resolved_plan = self.resolve_partitioned_scan_internal(plan)?;
+        // Check whether this plan is `AnalyzeExec`; if so, we should collect 
metrics.
+        let is_analyze = plan.as_any().is::<AnalyzeExec>();
+
+        let resolved_plan = self.resolve_partitioned_scan_internal(plan, 
is_analyze)?;
         PUSH_DOWN_PLAN_COUNTER
             .with_label_values(&["remote_scan"])
             .inc();
@@ -117,6 +120,7 @@ impl Resolver {
     pub fn resolve_partitioned_scan_internal(
         &self,
         plan: Arc<dyn ExecutionPlan>,
+        is_analyze: bool,
     ) -> DfResult<Arc<dyn ExecutionPlan>> {
         // Leave node, let's resolve it and return.
         if let Some(unresolved) = 
plan.as_any().downcast_ref::<UnresolvedPartitionedScan>() {
@@ -139,6 +143,7 @@ impl Resolver {
                 self.remote_executor.clone(),
                 remote_plans,
                 metrics_collector,
+                is_analyze,
             )));
         }
 
@@ -151,7 +156,7 @@ impl Resolver {
         // Resolve children if exist.
         let mut new_children = Vec::with_capacity(children.len());
         for child in children {
-            let child = self.resolve_partitioned_scan_internal(child)?;
+            let child = self.resolve_partitioned_scan_internal(child, 
is_analyze)?;
 
             new_children.push(child);
         }
diff --git a/df_engine_extensions/src/dist_sql_query/test_util.rs 
b/df_engine_extensions/src/dist_sql_query/test_util.rs
index 77584a9f..d70f9ec2 100644
--- a/df_engine_extensions/src/dist_sql_query/test_util.rs
+++ b/df_engine_extensions/src/dist_sql_query/test_util.rs
@@ -29,7 +29,7 @@ use common_types::{
 };
 use datafusion::{
     error::{DataFusionError, Result as DfResult},
-    execution::{FunctionRegistry, TaskContext},
+    execution::FunctionRegistry,
     logical_expr::{expr_fn, Literal, Operator},
     physical_plan::{
         aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -56,7 +56,7 @@ use trace_metric::MetricsCollector;
 use crate::dist_sql_query::{
     physical_plan::{PartitionedScanStream, UnresolvedPartitionedScan, 
UnresolvedSubTableScan},
     resolver::Resolver,
-    ExecutableScanBuilder, RemotePhysicalPlanExecutor, TableScanContext,
+    ExecutableScanBuilder, RemotePhysicalPlanExecutor, RemoteTaskContext, 
TableScanContext,
 };
 
 // Test context
@@ -504,8 +504,8 @@ struct MockRemotePhysicalPlanExecutor;
 impl RemotePhysicalPlanExecutor for MockRemotePhysicalPlanExecutor {
     fn execute(
         &self,
+        _task_context: RemoteTaskContext,
         _table: TableIdentifier,
-        _task_context: &TaskContext,
         _plan: Arc<dyn ExecutionPlan>,
     ) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>> {
         unimplemented!()
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result 
b/integration_tests/cases/env/cluster/ddl/partition_table.result
index 81502d1c..18e023c0 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -78,6 +78,23 @@ 
UInt64(9923681778193615344),Timestamp(1651737067000),String("ceresdb8"),Int32(0)
 
UInt64(4860320137932382618),Timestamp(1651737067000),String("ceresdb9"),Int32(0),Double(109.0),
 
 
+-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
+EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
+
+plan_type,plan,
+String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n    
__partition_table_t_1:\n        poll_duration=xxs\n        total_duration=xxs\n 
       wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec, 
metrics=[output_rows=1, elapsed_compute=xxs]\n  ScanTable: 
table=__partition_table_t_1, parallelism=8, metrics=[\nPredicate { exprs:[name 
= Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_st [...]
+
+
+-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
+-- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
+EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", 
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
+
+plan_type,plan,
+String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n    
__partition_table_t_x:\n        poll_duration=xxs\n        total_duration=xxs\n 
       wait_duration=xxs\n    __partition_table_t_x:\n        
poll_duration=xxs\n        total_duration=xxs\n        wait_duration=xxs\n    
__partition_table_t_x:\n        poll_duration=xxs\n        total_duration=xxs\n 
       wait_duration=xxs\n\n__partition_table_t_x:\n [...]
+
+
 ALTER TABLE partition_table_t ADD COLUMN (b string);
 
 affected_rows: 0
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql 
b/integration_tests/cases/env/cluster/ddl/partition_table.sql
index 59a1dd2a..46be8e1b 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.sql
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql
@@ -35,6 +35,15 @@ SELECT * from partition_table_t where name in ("ceresdb0", 
"ceresdb1", "ceresdb2
 
 SELECT * from partition_table_t where name in ("ceresdb5", "ceresdb6", 
"ceresdb7","ceresdb8", "ceresdb9", "ceresdb10") order by name;
 
+-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
+EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
+
+-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
+-- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
+EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", 
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
+
 ALTER TABLE partition_table_t ADD COLUMN (b string);
 
 INSERT INTO partition_table_t (t, id, name, value) VALUES (1651737067000, 10, 
"ceresdb0", 100);
diff --git a/query_engine/src/datafusion_impl/executor.rs 
b/query_engine/src/datafusion_impl/executor.rs
index 0412c1d1..eb75c0da 100644
--- a/query_engine/src/datafusion_impl/executor.rs
+++ b/query_engine/src/datafusion_impl/executor.rs
@@ -29,7 +29,7 @@ use crate::{
     },
     error::*,
     executor::Executor,
-    physical_planner::{PhysicalPlanPtr, TaskExecContext},
+    physical_planner::{PhysicalPlanRef, TaskExecContext},
 };
 
 #[derive(Debug, Clone)]
@@ -68,7 +68,7 @@ impl Executor for DatafusionExecutorImpl {
     async fn execute(
         &self,
         ctx: &Context,
-        physical_plan: PhysicalPlanPtr,
+        physical_plan: PhysicalPlanRef,
     ) -> Result<SendableRecordBatchStream> {
         debug!(
             "DatafusionExecutorImpl begin to execute plan, request_id:{}, 
physical_plan:{:?}",
diff --git a/query_engine/src/datafusion_impl/physical_planner.rs 
b/query_engine/src/datafusion_impl/physical_planner.rs
index 50b302e5..58287339 100644
--- a/query_engine/src/datafusion_impl/physical_planner.rs
+++ b/query_engine/src/datafusion_impl/physical_planner.rs
@@ -26,7 +26,7 @@ use crate::{
         DfContextBuilder,
     },
     error::*,
-    physical_planner::{PhysicalPlanPtr, PhysicalPlanner},
+    physical_planner::{PhysicalPlanRef, PhysicalPlanner},
 };
 
 /// Physical planner based on datafusion
@@ -58,7 +58,7 @@ impl DatafusionPhysicalPlannerImpl {
 #[async_trait]
 impl PhysicalPlanner for DatafusionPhysicalPlannerImpl {
     // TODO: we should modify `QueryPlan` to support create remote plan here.
-    async fn plan(&self, ctx: &Context, logical_plan: QueryPlan) -> 
Result<PhysicalPlanPtr> {
+    async fn plan(&self, ctx: &Context, logical_plan: QueryPlan) -> 
Result<PhysicalPlanRef> {
         // Register catalogs to datafusion execution context.
         let catalogs = 
CatalogProviderAdapter::new_adapters(logical_plan.tables.clone());
         // TODO: maybe we should not build `SessionContext` in each physical 
plan's
@@ -88,6 +88,6 @@ impl PhysicalPlanner for DatafusionPhysicalPlannerImpl {
         };
         let physical_plan = DataFusionPhysicalPlanAdapter::new(typed_plan);
 
-        Ok(Box::new(physical_plan))
+        Ok(Arc::new(physical_plan))
     }
 }
diff --git a/query_engine/src/datafusion_impl/task_context.rs 
b/query_engine/src/datafusion_impl/task_context.rs
index 5946f3d2..f19c5dde 100644
--- a/query_engine/src/datafusion_impl/task_context.rs
+++ b/query_engine/src/datafusion_impl/task_context.rs
@@ -33,14 +33,14 @@ use datafusion_proto::{
 };
 use df_engine_extensions::dist_sql_query::{
     resolver::Resolver, ExecutableScanBuilder, RemotePhysicalPlanExecutor,
-    RemotePhysicalPlanExecutorRef, TableScanContext,
+    RemotePhysicalPlanExecutorRef, RemoteTaskContext, TableScanContext,
 };
 use futures::future::BoxFuture;
 use generic_error::BoxError;
 use prost::Message;
 use snafu::ResultExt;
 use table_engine::{
-    provider::{CeresdbOptions, ScanTable},
+    provider::{CeresdbOptions, ScanTable, SCAN_TABLE_METRICS_COLLECTOR_NAME},
     remote::{
         model::{
             ExecContext, ExecutePlanRequest, PhysicalPlan, 
RemoteExecuteRequest, TableIdentifier,
@@ -172,12 +172,13 @@ struct RemotePhysicalPlanExecutorImpl {
 impl RemotePhysicalPlanExecutor for RemotePhysicalPlanExecutorImpl {
     fn execute(
         &self,
+        task_context: RemoteTaskContext,
         table: TableIdentifier,
-        task_context: &TaskContext,
         plan: Arc<dyn ExecutionPlan>,
     ) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>> {
         // Get the custom context to rebuild execution context.
         let ceresdb_options = task_context
+            .task_ctx
             .session_config()
             .options()
             .extensions
@@ -223,6 +224,7 @@ impl RemotePhysicalPlanExecutor for 
RemotePhysicalPlanExecutorImpl {
             let request = ExecutePlanRequest {
                 plan_schema,
                 remote_request,
+                remote_metrics: task_context.remote_metrics,
             };
 
             // Remote execute.
@@ -288,7 +290,7 @@ impl ExecutableScanBuilder for ExecutableScanBuilderImpl {
             opts: read_opts,
             projected_schema: ctx.projected_schema,
             predicate: ctx.predicate,
-            metrics_collector: MetricsCollector::default(),
+            metrics_collector: 
MetricsCollector::new(SCAN_TABLE_METRICS_COLLECTOR_NAME.to_string()),
         };
 
         let mut scan = ScanTable::new(table, read_request);
diff --git a/query_engine/src/executor.rs b/query_engine/src/executor.rs
index bf32a24b..92b0a714 100644
--- a/query_engine/src/executor.rs
+++ b/query_engine/src/executor.rs
@@ -19,7 +19,7 @@ use std::{fmt, sync::Arc};
 use async_trait::async_trait;
 use table_engine::stream::SendableRecordBatchStream;
 
-use crate::{context::Context, error::*, physical_planner::PhysicalPlanPtr};
+use crate::{context::Context, error::*, physical_planner::PhysicalPlanRef};
 
 /// Query executor
 ///
@@ -33,7 +33,7 @@ pub trait Executor: fmt::Debug + Send + Sync + 'static {
     async fn execute(
         &self,
         ctx: &Context,
-        physical_plan: PhysicalPlanPtr,
+        physical_plan: PhysicalPlanRef,
     ) -> Result<SendableRecordBatchStream>;
 }
 
diff --git a/query_engine/src/physical_planner.rs 
b/query_engine/src/physical_planner.rs
index c32975e1..9233362f 100644
--- a/query_engine/src/physical_planner.rs
+++ b/query_engine/src/physical_planner.rs
@@ -29,7 +29,7 @@ use crate::{context::Context, 
datafusion_impl::task_context::DatafusionTaskExecC
 #[async_trait]
 pub trait PhysicalPlanner: fmt::Debug + Send + Sync + 'static {
     /// Create a physical plan from a logical plan
-    async fn plan(&self, ctx: &Context, logical_plan: QueryPlan) -> 
Result<PhysicalPlanPtr>;
+    async fn plan(&self, ctx: &Context, logical_plan: QueryPlan) -> 
Result<PhysicalPlanRef>;
 }
 
 pub type PhysicalPlannerRef = Arc<dyn PhysicalPlanner>;
@@ -45,7 +45,7 @@ pub trait PhysicalPlan: std::fmt::Debug + Sync + Send + 
'static {
     fn metrics_to_string(&self) -> String;
 }
 
-pub type PhysicalPlanPtr = Box<dyn PhysicalPlan>;
+pub type PhysicalPlanRef = Arc<dyn PhysicalPlan>;
 
 /// Task context, just a wrapper of datafusion task context now
 #[derive(Default)]
diff --git a/remote_engine_client/src/client.rs 
b/remote_engine_client/src/client.rs
index 80e47ad6..456de914 100644
--- a/remote_engine_client/src/client.rs
+++ b/remote_engine_client/src/client.rs
@@ -17,7 +17,7 @@
 use std::{
     collections::HashMap,
     pin::Pin,
-    sync::Arc,
+    sync::{Arc, Mutex},
     task::{Context, Poll},
 };
 
@@ -26,8 +26,12 @@ use arrow_ext::{
     ipc::{CompressOptions, CompressionMethod},
 };
 use ceresdbproto::{
-    remote_engine::{self, read_response::Output::Arrow, 
remote_engine_service_client::*},
-    storage::arrow_payload,
+    remote_engine::{
+        self,
+        read_response::Output::{Arrow, Metric},
+        remote_engine_service_client::*,
+    },
+    storage::{arrow_payload, ArrowPayload},
 };
 use common_types::{record_batch::RecordBatch, schema::RecordSchema};
 use futures::{Stream, StreamExt};
@@ -115,8 +119,12 @@ impl Client {
         // When success to get the stream, table has been found in remote, not 
need to
         // evict cache entry.
         let response = response.into_inner();
-        let remote_read_record_batch_stream =
-            ClientReadRecordBatchStream::new(table_ident, response, 
record_schema);
+        let remote_read_record_batch_stream = ClientReadRecordBatchStream::new(
+            table_ident,
+            response,
+            record_schema,
+            Default::default(),
+        );
 
         Ok(remote_read_record_batch_stream)
     }
@@ -481,8 +489,12 @@ impl Client {
         // When success to get the stream, table has been found in remote, not 
need to
         // evict cache entry.
         let response = response.into_inner();
-        let remote_execute_plan_stream =
-            ClientReadRecordBatchStream::new(table_ident, response, 
plan_schema);
+        let remote_execute_plan_stream = ClientReadRecordBatchStream::new(
+            table_ident,
+            response,
+            plan_schema,
+            request.remote_metrics,
+        );
 
         Ok(remote_execute_plan_stream)
     }
@@ -500,6 +512,7 @@ pub struct ClientReadRecordBatchStream {
     pub table_ident: TableIdentifier,
     pub response_stream: Streaming<remote_engine::ReadResponse>,
     pub record_schema: RecordSchema,
+    pub remote_metrics: Arc<Mutex<Option<String>>>,
 }
 
 impl ClientReadRecordBatchStream {
@@ -507,11 +520,13 @@ impl ClientReadRecordBatchStream {
         table_ident: TableIdentifier,
         response_stream: Streaming<remote_engine::ReadResponse>,
         record_schema: RecordSchema,
+        remote_metrics: Arc<Mutex<Option<String>>>,
     ) -> Self {
         Self {
             table_ident,
             response_stream,
             record_schema,
+            remote_metrics,
         }
     }
 }
@@ -534,52 +549,14 @@ impl Stream for ClientReadRecordBatchStream {
 
                 match response.output {
                     None => Poll::Ready(None),
-                    Some(v) => {
-                        let record_batch = match v {
-                            Arrow(mut v) => {
-                                if v.record_batches.len() != 1 {
-                                    return Poll::Ready(Some(
-                                        InvalidRecordBatchNumber {
-                                            batch_num: v.record_batches.len(),
-                                        }
-                                        .fail(),
-                                    ));
-                                }
-
-                                let compression = match v.compression() {
-                                    arrow_payload::Compression::None => 
CompressionMethod::None,
-                                    arrow_payload::Compression::Zstd => 
CompressionMethod::Zstd,
-                                };
-
-                                ipc::decode_record_batches(
-                                    v.record_batches.swap_remove(0),
-                                    compression,
-                                )
-                                .map_err(|e| Box::new(e) as _)
-                                .context(Convert {
-                                    msg: "decode read record batch",
-                                })
-                                .and_then(
-                                    |mut record_batch_vec| {
-                                        ensure!(
-                                            record_batch_vec.len() == 1,
-                                            InvalidRecordBatchNumber {
-                                                batch_num: 
record_batch_vec.len()
-                                            }
-                                        );
-                                        record_batch_vec
-                                            .swap_remove(0)
-                                            .try_into()
-                                            .map_err(|e| Box::new(e) as _)
-                                            .context(Convert {
-                                                msg: "convert read record 
batch",
-                                            })
-                                    },
-                                )
-                            }
-                        };
-                        Poll::Ready(Some(record_batch))
-                    }
+                    Some(v) => match v {
+                        Arrow(v) => 
Poll::Ready(Some(convert_arrow_payload(v))),
+                        Metric(v) => {
+                            let mut remote_metrics = 
this.remote_metrics.lock().unwrap();
+                            *remote_metrics = Some(v.metric);
+                            Poll::Ready(None)
+                        }
+                    },
                 }
             }
 
@@ -594,3 +571,37 @@ impl Stream for ClientReadRecordBatchStream {
         }
     }
 }
+
+fn convert_arrow_payload(mut v: ArrowPayload) -> Result<RecordBatch> {
+    if v.record_batches.len() != 1 {
+        return InvalidRecordBatchNumber {
+            batch_num: v.record_batches.len(),
+        }
+        .fail();
+    }
+    let compression = match v.compression() {
+        arrow_payload::Compression::None => CompressionMethod::None,
+        arrow_payload::Compression::Zstd => CompressionMethod::Zstd,
+    };
+
+    ipc::decode_record_batches(v.record_batches.swap_remove(0), compression)
+        .map_err(|e| Box::new(e) as _)
+        .context(Convert {
+            msg: "decode read record batch",
+        })
+        .and_then(|mut record_batch_vec| {
+            ensure!(
+                record_batch_vec.len() == 1,
+                InvalidRecordBatchNumber {
+                    batch_num: record_batch_vec.len()
+                }
+            );
+            record_batch_vec
+                .swap_remove(0)
+                .try_into()
+                .map_err(|e| Box::new(e) as _)
+                .context(Convert {
+                    msg: "convert read record batch",
+                })
+        })
+}
diff --git a/server/src/grpc/remote_engine_service/mod.rs 
b/server/src/grpc/remote_engine_service/mod.rs
index 3d2963a4..11359300 100644
--- a/server/src/grpc/remote_engine_service/mod.rs
+++ b/server/src/grpc/remote_engine_service/mod.rs
@@ -27,11 +27,13 @@ use async_trait::async_trait;
 use catalog::{manager::ManagerRef, schema::SchemaRef};
 use ceresdbproto::{
     remote_engine::{
-        execute_plan_request, read_response::Output::Arrow,
-        remote_engine_service_server::RemoteEngineService, row_group, 
AlterTableOptionsRequest,
-        AlterTableOptionsResponse, AlterTableSchemaRequest, 
AlterTableSchemaResponse, ExecContext,
-        ExecutePlanRequest, GetTableInfoRequest, GetTableInfoResponse, 
ReadRequest, ReadResponse,
-        WriteBatchRequest, WriteRequest, WriteResponse,
+        execute_plan_request,
+        read_response::Output::{Arrow, Metric},
+        remote_engine_service_server::RemoteEngineService,
+        row_group, AlterTableOptionsRequest, AlterTableOptionsResponse, 
AlterTableSchemaRequest,
+        AlterTableSchemaResponse, ExecContext, ExecutePlanRequest, 
GetTableInfoRequest,
+        GetTableInfoResponse, MetricPayload, ReadRequest, ReadResponse, 
WriteBatchRequest,
+        WriteRequest, WriteResponse,
     },
     storage::{arrow_payload, ArrowPayload},
 };
@@ -50,6 +52,7 @@ use proxy::{
 use query_engine::{
     context::Context as QueryContext,
     datafusion_impl::physical_plan::{DataFusionPhysicalPlanAdapter, TypedPlan},
+    physical_planner::PhysicalPlanRef,
     QueryEngineRef, QueryEngineType,
 };
 use snafu::{OptionExt, ResultExt};
@@ -82,6 +85,12 @@ pub mod error;
 const STREAM_QUERY_CHANNEL_LEN: usize = 200;
 const DEFAULT_COMPRESS_MIN_LENGTH: usize = 80 * 1024;
 
/// Item yielded by a remote plan-execution stream: either a data batch
/// produced by the plan, or the stringified metrics of the executed physical
/// plan emitted after all batches have been streamed.
#[derive(Debug, Clone)]
pub enum RecordBatchWithMetric {
    /// A record batch produced by plan execution.
    RecordBatch(RecordBatch),
    /// Metrics of the executed physical plan, rendered as a string.
    Metric(String),
}
+
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
 pub struct StreamReadReqKey {
     table: String,
@@ -192,46 +201,94 @@ impl<M: MetricCollector> Drop for StreamWithMetric<M> {
     }
 }
 
/// Stream adapter that forwards the record batches of `inner` and, when a
/// physical plan is attached, appends the plan's metrics as one final item
/// after `inner` is exhausted.
struct RemoteExecStream {
    // Underlying stream of record batches produced on the server side.
    inner: BoxStream<'static, Result<RecordBatch>>,
    // Plan whose metrics are reported once `inner` finishes. `None` for
    // streams that carry no plan (plain stream reads), or after the metrics
    // item has already been emitted.
    physical_plan: Option<PhysicalPlanRef>,
}

impl RemoteExecStream {
    /// Wraps `inner`, optionally attaching the physical plan whose metrics
    /// should be sent to the client when the stream completes.
    fn new(
        inner: BoxStream<'static, Result<RecordBatch>>,
        physical_plan: Option<PhysicalPlanRef>,
    ) -> Self {
        Self {
            inner,
            physical_plan,
        }
    }
}
+
+impl Stream for RemoteExecStream {
+    type Item = Result<RecordBatchWithMetric>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
+        let this = self.get_mut();
+        match this.inner.poll_next_unpin(cx) {
+            Poll::Ready(Some(res)) => {
+                Poll::Ready(Some(res.map(RecordBatchWithMetric::RecordBatch)))
+            }
+            Poll::Ready(None) => match &this.physical_plan {
+                Some(physical_plan) => {
+                    let metrics = physical_plan.metrics_to_string();
+                    this.physical_plan = None;
+                    
Poll::Ready(Some(Ok(RecordBatchWithMetric::Metric(metrics))))
+                }
+                None => Poll::Ready(None),
+            },
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
 macro_rules! record_stream_to_response_stream {
     ($record_stream_result:ident, $StreamType:ident) => {
         match $record_stream_result {
             Ok(stream) => {
                 let new_stream: Self::$StreamType = Box::pin(stream.map(|res| 
match res {
-                    Ok(record_batch) => {
-                        let resp = match ipc::encode_record_batch(
-                            &record_batch.into_arrow_record_batch(),
-                            CompressOptions {
-                                compress_min_length: 
DEFAULT_COMPRESS_MIN_LENGTH,
-                                method: CompressionMethod::Zstd,
-                            },
-                        )
-                        .box_err()
-                        .context(ErrWithCause {
-                            code: StatusCode::Internal,
-                            msg: "encode record batch failed",
-                        }) {
-                            Err(e) => ReadResponse {
-                                header: Some(error::build_err_header(e)),
-                                ..Default::default()
-                            },
-                            Ok(CompressOutput { payload, method }) => {
-                                let compression = match method {
-                                    CompressionMethod::None => 
arrow_payload::Compression::None,
-                                    CompressionMethod::Zstd => 
arrow_payload::Compression::Zstd,
-                                };
-
-                                ReadResponse {
-                                    header: Some(error::build_ok_header()),
-                                    output: Some(Arrow(ArrowPayload {
-                                        record_batches: vec![payload],
-                                        compression: compression as i32,
-                                    })),
+                    Ok(res) => match res {
+                        RecordBatchWithMetric::Metric(metric) => {
+                            let resp = ReadResponse {
+                                header: Some(error::build_ok_header()),
+                                output: Some(Metric(MetricPayload { metric })),
+                            };
+                            Ok(resp)
+                        }
+                        RecordBatchWithMetric::RecordBatch(record_batch) => {
+                            let resp = match ipc::encode_record_batch(
+                                &record_batch.into_arrow_record_batch(),
+                                CompressOptions {
+                                    compress_min_length: 
DEFAULT_COMPRESS_MIN_LENGTH,
+                                    method: CompressionMethod::Zstd,
+                                },
+                            )
+                            .box_err()
+                            .context(ErrWithCause {
+                                code: StatusCode::Internal,
+                                msg: "encode record batch failed",
+                            }) {
+                                Err(e) => ReadResponse {
+                                    header: Some(error::build_err_header(e)),
+                                    ..Default::default()
+                                },
+                                Ok(CompressOutput { payload, method }) => {
+                                    let compression = match method {
+                                        CompressionMethod::None => 
arrow_payload::Compression::None,
+                                        CompressionMethod::Zstd => 
arrow_payload::Compression::Zstd,
+                                    };
+
+                                    ReadResponse {
+                                        header: Some(error::build_ok_header()),
+                                        output: Some(Arrow(ArrowPayload {
+                                            record_batches: vec![payload],
+                                            compression: compression as i32,
+                                        })),
+                                    }
                                 }
-                            }
-                        };
+                            };
 
-                        Ok(resp)
-                    }
+                            Ok(resp)
+                        }
+                    },
                     Err(e) => {
                         let resp = ReadResponse {
                             header: Some(error::build_err_header(e)),
@@ -240,7 +297,6 @@ macro_rules! record_stream_to_response_stream {
                         Ok(resp)
                     }
                 }));
-
                 Ok(Response::new(new_stream))
             }
             Err(e) => {
@@ -274,7 +330,7 @@ impl RemoteEngineServiceImpl {
     async fn stream_read_internal(
         &self,
         request: Request<ReadRequest>,
-    ) -> Result<StreamWithMetric<StreamReadMetricCollector>> {
+    ) -> Result<RemoteExecStream> {
         let metric = StreamReadMetricCollector(Instant::now());
 
         let ctx = self.handler_ctx();
@@ -316,17 +372,15 @@ impl RemoteEngineServiceImpl {
             });
         }
 
-        Ok(StreamWithMetric::new(
-            Box::pin(ReceiverStream::new(rx)),
-            metric,
-        ))
+        let stream = StreamWithMetric::new(Box::pin(ReceiverStream::new(rx)), 
metric);
+        Ok(RemoteExecStream::new(Box::pin(stream), None))
     }
 
     async fn dedup_stream_read_internal(
         &self,
         query_dedup: QueryDedup,
         request: Request<ReadRequest>,
-    ) -> Result<StreamWithMetric<StreamReadMetricCollector>> {
+    ) -> Result<RemoteExecStream> {
         let metric = StreamReadMetricCollector(Instant::now());
 
         let request = request.into_inner();
@@ -371,10 +425,8 @@ impl RemoteEngineServiceImpl {
             }
         }
 
-        Ok(StreamWithMetric::new(
-            Box::pin(ReceiverStream::new(rx)),
-            metric,
-        ))
+        let stream = StreamWithMetric::new(Box::pin(ReceiverStream::new(rx)), 
metric);
+        Ok(RemoteExecStream::new(Box::pin(stream), None))
     }
 
     async fn read_and_send_dedupped_resps<K, F>(
@@ -614,7 +666,7 @@ impl RemoteEngineServiceImpl {
     async fn execute_physical_plan_internal(
         &self,
         request: Request<ExecutePlanRequest>,
-    ) -> Result<StreamWithMetric<ExecutePlanMetricCollector>> {
+    ) -> Result<RemoteExecStream> {
         let request = request.into_inner();
         let query_engine = self.instance.query_engine.clone();
         let (ctx, encoded_plan) = extract_plan_from_req(request)?;
@@ -636,10 +688,15 @@ impl RemoteEngineServiceImpl {
             ctx.timeout_ms,
         );
 
+        let physical_plan = 
Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
+            encoded_plan,
+        )));
+        let physical_plan_clone = physical_plan.clone();
+
         let stream = self
             .runtimes
             .read_runtime
-            .spawn(async move { handle_execute_plan(query_ctx, encoded_plan, 
query_engine).await })
+            .spawn(async move { handle_execute_plan(query_ctx, physical_plan, 
query_engine).await })
             .await
             .box_err()
             .with_context(|| ErrWithCause {
@@ -653,14 +710,18 @@ impl RemoteEngineServiceImpl {
                 })
             });
 
-        Ok(StreamWithMetric::new(Box::pin(stream), metric))
+        let stream = StreamWithMetric::new(Box::pin(stream), metric);
+        Ok(RemoteExecStream::new(
+            Box::pin(stream),
+            Some(physical_plan_clone),
+        ))
     }
 
     async fn dedup_execute_physical_plan_internal(
         &self,
         query_dedup: QueryDedup,
         request: Request<ExecutePlanRequest>,
-    ) -> Result<StreamWithMetric<ExecutePlanMetricCollector>> {
+    ) -> Result<RemoteExecStream> {
         let request = request.into_inner();
         let query_engine = self.instance.query_engine.clone();
         let (ctx, encoded_plan) = extract_plan_from_req(request)?;
@@ -685,6 +746,12 @@ impl RemoteEngineServiceImpl {
             encoded_plan: encoded_plan.clone(),
         };
 
+        let physical_plan = 
Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
+            encoded_plan,
+        )));
+
+        let physical_plan_clone = physical_plan.clone();
+
         let QueryDedup {
             config,
             physical_plan_notifiers,
@@ -696,7 +763,7 @@ impl RemoteEngineServiceImpl {
             // The first request, need to handle it, and then notify the other 
requests.
             RequestResult::First => {
                 let query = async move {
-                    handle_execute_plan(query_ctx, encoded_plan, query_engine)
+                    handle_execute_plan(query_ctx, physical_plan, query_engine)
                         .await
                         .map(PartitionedStreams::one_stream)
                 };
@@ -715,9 +782,10 @@ impl RemoteEngineServiceImpl {
             }
         }
 
-        Ok(StreamWithMetric::new(
-            Box::pin(ReceiverStream::new(rx)),
-            metric,
+        let stream = StreamWithMetric::new(Box::pin(ReceiverStream::new(rx)), 
metric);
+        Ok(RemoteExecStream::new(
+            Box::pin(stream),
+            Some(physical_plan_clone),
         ))
     }
 
@@ -1140,14 +1208,9 @@ fn create_query_ctx(
 
 async fn handle_execute_plan(
     ctx: QueryContext,
-    encoded_plan: Vec<u8>,
+    physical_plan: PhysicalPlanRef,
     query_engine: QueryEngineRef,
 ) -> Result<SendableRecordBatchStream> {
-    // TODO: Build remote plan in physical planner.
-    let physical_plan = 
Box::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
-        encoded_plan,
-    )));
-
     // Execute plan.
     let executor = query_engine.executor();
     executor
diff --git a/table_engine/src/partition/rule/key.rs 
b/table_engine/src/partition/rule/key.rs
index d33fd88b..b8d9e87c 100644
--- a/table_engine/src/partition/rule/key.rs
+++ b/table_engine/src/partition/rule/key.rs
@@ -14,7 +14,7 @@
 
 //! Key partition rule
 
-use std::collections::{HashMap, HashSet};
+use std::collections::{BTreeSet, HashMap};
 
 use common_types::{
     datum::{Datum, DatumView},
@@ -136,8 +136,8 @@ impl KeyRule {
         &self,
         group: &[usize],
         filters: &[PartitionFilter],
-    ) -> Result<HashSet<usize>> {
-        let mut partitions = HashSet::new();
+    ) -> Result<BTreeSet<usize>> {
+        let mut partitions = BTreeSet::new();
         let expanded_group = expand_partition_keys_group(group, filters)?;
         for partition_keys in expanded_group {
             let partition = compute_partition(partition_keys.into_iter(), 
self.partition_num);
@@ -219,7 +219,7 @@ impl PartitionRule for KeyRule {
             target_partitions = target_partitions
                 .intersection(&partitions)
                 .copied()
-                .collect::<HashSet<_>>();
+                .collect::<BTreeSet<_>>();
         }
 
         Ok(target_partitions.into_iter().collect())
diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs
index 6b0c38a7..fb3cb41d 100644
--- a/table_engine/src/provider.rs
+++ b/table_engine/src/provider.rs
@@ -47,7 +47,7 @@ use crate::{
     table::{ReadOptions, ReadRequest, TableRef},
 };
 
-const SCAN_TABLE_METRICS_COLLECTOR_NAME: &str = "scan_table";
+pub const SCAN_TABLE_METRICS_COLLECTOR_NAME: &str = "scan_table";
 
 #[derive(Clone, Debug)]
 pub struct CeresdbOptions {
diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs
index 2fc6a297..bd996703 100644
--- a/table_engine/src/remote/model.rs
+++ b/table_engine/src/remote/model.rs
@@ -16,6 +16,7 @@
 
 use std::{
     collections::HashMap,
+    sync::{Arc, Mutex},
     time::{Duration, Instant},
 };
 
@@ -409,6 +410,9 @@ pub struct ExecutePlanRequest {
 
     /// Remote plan execution request
     pub remote_request: RemoteExecuteRequest,
+
+    /// Collect metrics of remote plan
+    pub remote_metrics: Arc<Mutex<Option<String>>>,
 }
 
 impl ExecutePlanRequest {
@@ -417,6 +421,7 @@ impl ExecutePlanRequest {
         plan_schema: RecordSchema,
         context: ExecContext,
         physical_plan: PhysicalPlan,
+        remote_metrics: Arc<Mutex<Option<String>>>,
     ) -> Self {
         let remote_request = RemoteExecuteRequest {
             table,
@@ -427,6 +432,7 @@ impl ExecutePlanRequest {
         Self {
             plan_schema,
             remote_request,
+            remote_metrics,
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to