This is an automated email from the ASF dual-hosted git repository.
xikai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/dev by this push:
new b02bac2b feat: dist sql analyze (#1260)
b02bac2b is described below
commit b02bac2ba7475a76157487b102e36c85b0c2b5f9
Author: 鲍金日 <[email protected]>
AuthorDate: Thu Dec 28 09:59:41 2023 +0800
feat: dist sql analyze (#1260)
## Rationale
Currently, the analyze SQL cannot obtain detailed metrics of a partitioned
table.
## Detailed Changes
Return the metrics of the partitioned table to the remote client, and then
collect those metrics in the client.
## Test Plan
- Existing tests
- add new integration tests for explain analyze
---
Cargo.lock | 36 ++--
Cargo.toml | 2 +-
df_engine_extensions/src/dist_sql_query/mod.rs | 21 ++-
.../src/dist_sql_query/physical_plan.rs | 47 ++++-
.../src/dist_sql_query/resolver.rs | 11 +-
.../src/dist_sql_query/test_util.rs | 6 +-
.../cases/env/cluster/ddl/partition_table.result | 17 ++
.../cases/env/cluster/ddl/partition_table.sql | 9 +
query_engine/src/datafusion_impl/executor.rs | 4 +-
.../src/datafusion_impl/physical_planner.rs | 6 +-
query_engine/src/datafusion_impl/task_context.rs | 10 +-
query_engine/src/executor.rs | 4 +-
query_engine/src/physical_planner.rs | 4 +-
remote_engine_client/src/client.rs | 117 +++++++------
server/src/grpc/remote_engine_service/mod.rs | 189 ++++++++++++++-------
table_engine/src/partition/rule/key.rs | 8 +-
table_engine/src/provider.rs | 2 +-
table_engine/src/remote/model.rs | 6 +
18 files changed, 329 insertions(+), 170 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 5b41170a..e32493fe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -96,7 +96,7 @@ dependencies = [
"atomic_enum",
"base64 0.13.1",
"bytes_ext",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
"codec",
"common_types",
"datafusion",
@@ -1345,7 +1345,7 @@ dependencies = [
[[package]]
name = "ceresdbproto"
version = "1.0.23"
-source =
"git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05#2c60e0591b6066957c80e7d6ae97cf53ccd591e1"
+source =
"git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4#d849fa44e29ea04c7d99c082a38efb8ce5200d5e"
dependencies = [
"prost",
"protoc-bin-vendored",
@@ -1528,7 +1528,7 @@ dependencies = [
"async-trait",
"bytes_ext",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
"common_types",
"etcd-client",
"future_ext",
@@ -1606,7 +1606,7 @@ dependencies = [
"arrow 43.0.0",
"arrow_ext",
"bytes_ext",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
"chrono",
"datafusion",
"hash_ext",
@@ -2362,7 +2362,7 @@ dependencies = [
"async-recursion",
"async-trait",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
"common_types",
"datafusion",
"datafusion-proto",
@@ -3916,7 +3916,7 @@ name = "meta_client"
version = "1.2.6-alpha"
dependencies = [
"async-trait",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
"common_types",
"futures 0.3.28",
"generic_error",
@@ -4441,7 +4441,7 @@ version = "1.2.6-alpha"
dependencies = [
"async-trait",
"bytes",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
"chrono",
"clru",
"crc",
@@ -5318,7 +5318,7 @@ dependencies = [
"async-trait",
"bytes",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
"clru",
"cluster",
"common_types",
@@ -5445,7 +5445,7 @@ dependencies = [
"arrow 43.0.0",
"async-trait",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
"cluster",
"codec",
"common_types",
@@ -5756,7 +5756,7 @@ version = "1.2.6-alpha"
dependencies = [
"arrow_ext",
"async-trait",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
"common_types",
"futures 0.3.28",
"generic_error",
@@ -5885,7 +5885,7 @@ name = "router"
version = "1.2.6-alpha"
dependencies = [
"async-trait",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
"cluster",
"common_types",
"generic_error",
@@ -6260,7 +6260,7 @@ dependencies = [
"async-trait",
"bytes_ext",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
"clru",
"cluster",
"common_types",
@@ -6786,7 +6786,7 @@ dependencies = [
"async-trait",
"bytes_ext",
"catalog",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
"codec",
"common_types",
"futures 0.3.28",
@@ -6808,7 +6808,7 @@ dependencies = [
"arrow_ext",
"async-trait",
"bytes_ext",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
"common_types",
"datafusion",
"datafusion-proto",
@@ -7011,7 +7011,7 @@ dependencies = [
name = "time_ext"
version = "1.2.6-alpha"
dependencies = [
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
"chrono",
"common_types",
"macros",
@@ -7523,8 +7523,8 @@ version = "1.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [
- "cfg-if 1.0.0",
- "rand 0.8.5",
+ "cfg-if 1.0.0",
+ "rand 0.8.5",
"static_assertions",
]
@@ -7663,7 +7663,7 @@ version = "1.2.6-alpha"
dependencies = [
"async-trait",
"bytes_ext",
- "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)",
+ "ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
"chrono",
"codec",
"common_types",
diff --git a/Cargo.toml b/Cargo.toml
index 50875c3c..a01a7222 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -94,7 +94,7 @@ bytes = "1"
bytes_ext = { path = "components/bytes_ext" }
catalog = { path = "catalog" }
catalog_impls = { path = "catalog_impls" }
-ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git", rev =
"2c60e05" }
+ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git", rev =
"d849fa4" }
codec = { path = "components/codec" }
chrono = "0.4"
clap = "3.0"
diff --git a/df_engine_extensions/src/dist_sql_query/mod.rs
b/df_engine_extensions/src/dist_sql_query/mod.rs
index 80db7be0..4bbf6b36 100644
--- a/df_engine_extensions/src/dist_sql_query/mod.rs
+++ b/df_engine_extensions/src/dist_sql_query/mod.rs
@@ -12,7 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::{fmt, sync::Arc};
+use std::{
+ fmt,
+ sync::{Arc, Mutex},
+};
use async_trait::async_trait;
use common_types::projected_schema::ProjectedSchema;
@@ -35,8 +38,8 @@ pub mod test_util;
pub trait RemotePhysicalPlanExecutor: fmt::Debug + Send + Sync + 'static {
fn execute(
&self,
+ task_context: RemoteTaskContext,
table: TableIdentifier,
- task_context: &TaskContext,
plan: Arc<dyn ExecutionPlan>,
) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>>;
}
@@ -58,6 +61,20 @@ pub trait ExecutableScanBuilder: fmt::Debug + Send + Sync +
'static {
type ExecutableScanBuilderRef = Box<dyn ExecutableScanBuilder>;
+pub struct RemoteTaskContext {
+ pub task_ctx: Arc<TaskContext>,
+ pub remote_metrics: Arc<Mutex<Option<String>>>,
+}
+
+impl RemoteTaskContext {
+ pub fn new(task_ctx: Arc<TaskContext>, remote_metrics:
Arc<Mutex<Option<String>>>) -> Self {
+ Self {
+ task_ctx,
+ remote_metrics,
+ }
+ }
+}
+
#[derive(Clone)]
pub struct TableScanContext {
pub batch_size: usize,
diff --git a/df_engine_extensions/src/dist_sql_query/physical_plan.rs
b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
index 9825227c..0dbaf415 100644
--- a/df_engine_extensions/src/dist_sql_query/physical_plan.rs
+++ b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -18,7 +18,7 @@ use std::{
any::Any,
fmt,
pin::Pin,
- sync::Arc,
+ sync::{Arc, Mutex},
task::{Context, Poll},
time::{Duration, Instant},
};
@@ -46,7 +46,7 @@ use futures::{future::BoxFuture, FutureExt, Stream,
StreamExt};
use table_engine::{remote::model::TableIdentifier, table::ReadRequest};
use trace_metric::{collector::FormatCollectorVisitor, MetricsCollector,
TraceMetricWhenDrop};
-use crate::dist_sql_query::{RemotePhysicalPlanExecutor, TableScanContext};
+use crate::dist_sql_query::{RemotePhysicalPlanExecutor, RemoteTaskContext,
TableScanContext};
/// Placeholder of partitioned table's scan plan
/// It is inexecutable actually and just for carrying the necessary information
@@ -148,6 +148,7 @@ pub(crate) struct ResolvedPartitionedScan {
pub remote_exec_ctx: Arc<RemoteExecContext>,
pub pushdown_continue: bool,
pub metrics_collector: MetricsCollector,
+ pub is_analyze: bool,
}
impl ResolvedPartitionedScan {
@@ -155,24 +156,27 @@ impl ResolvedPartitionedScan {
remote_executor: Arc<dyn RemotePhysicalPlanExecutor>,
sub_table_plan_ctxs: Vec<SubTablePlanContext>,
metrics_collector: MetricsCollector,
+ is_analyze: bool,
) -> Self {
let remote_exec_ctx = Arc::new(RemoteExecContext {
executor: remote_executor,
plan_ctxs: sub_table_plan_ctxs,
});
- Self::new_with_details(remote_exec_ctx, true, metrics_collector)
+ Self::new_with_details(remote_exec_ctx, true, metrics_collector,
is_analyze)
}
pub fn new_with_details(
remote_exec_ctx: Arc<RemoteExecContext>,
pushdown_continue: bool,
metrics_collector: MetricsCollector,
+ is_analyze: bool,
) -> Self {
Self {
remote_exec_ctx,
pushdown_continue,
metrics_collector,
+ is_analyze,
}
}
@@ -181,6 +185,7 @@ impl ResolvedPartitionedScan {
remote_exec_ctx: self.remote_exec_ctx.clone(),
pushdown_continue: false,
metrics_collector: self.metrics_collector.clone(),
+ is_analyze: self.is_analyze,
})
}
@@ -216,6 +221,7 @@ impl ResolvedPartitionedScan {
table: plan_ctx.table.clone(),
plan: extended_plan,
metrics_collector: plan_ctx.metrics_collector.clone(),
+ remote_metrics: plan_ctx.remote_metrics.clone(),
})
})
.collect::<DfResult<Vec<_>>>()?;
@@ -228,6 +234,7 @@ impl ResolvedPartitionedScan {
remote_exec_ctx,
can_push_down_more,
self.metrics_collector.clone(),
+ self.is_analyze,
);
Ok(Arc::new(plan))
@@ -257,6 +264,7 @@ pub(crate) struct SubTablePlanContext {
table: TableIdentifier,
plan: Arc<dyn ExecutionPlan>,
metrics_collector: MetricsCollector,
+ remote_metrics: Arc<Mutex<Option<String>>>,
}
impl SubTablePlanContext {
@@ -269,6 +277,7 @@ impl SubTablePlanContext {
table,
plan,
metrics_collector,
+ remote_metrics: Arc::new(Mutex::new(None)),
}
}
}
@@ -296,6 +305,12 @@ impl ExecutionPlan for ResolvedPartitionedScan {
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ // If this is an analyze plan, we should not collect metrics of children
+ // which have been sent to remote, so we just return empty children.
+ if self.is_analyze {
+ return vec![];
+ }
+
self.remote_exec_ctx
.plan_ctxs
.iter()
@@ -328,13 +343,17 @@ impl ExecutionPlan for ResolvedPartitionedScan {
table: sub_table,
plan,
metrics_collector,
+ remote_metrics,
} = &self.remote_exec_ctx.plan_ctxs[partition];
+ let remote_task_ctx = RemoteTaskContext::new(context,
remote_metrics.clone());
+
// Send plan for remote execution.
- let stream_future =
- self.remote_exec_ctx
- .executor
- .execute(sub_table.clone(), &context, plan.clone())?;
+ let stream_future = self.remote_exec_ctx.executor.execute(
+ remote_task_ctx,
+ sub_table.clone(),
+ plan.clone(),
+ )?;
let record_stream =
PartitionedScanStream::new(stream_future, plan.schema(),
metrics_collector.clone());
@@ -350,7 +369,18 @@ impl ExecutionPlan for ResolvedPartitionedScan {
let mut format_visitor = FormatCollectorVisitor::default();
self.metrics_collector.visit(&mut format_visitor);
- let metrics_desc = format_visitor.into_string();
+ let mut metrics_desc = format_visitor.into_string();
+
+ // collect metrics from remote
+ for sub_table_ctx in &self.remote_exec_ctx.plan_ctxs {
+ if let Some(remote_metrics) =
sub_table_ctx.remote_metrics.lock().unwrap().take() {
+ metrics_desc.push_str(&format!(
+ "\n{}:\n{}",
+ sub_table_ctx.table.table, remote_metrics
+ ));
+ }
+ }
+
metric_set.push(Arc::new(Metric::new(
MetricValue::Count {
name: format!("\n{metrics_desc}").into(),
@@ -358,7 +388,6 @@ impl ExecutionPlan for ResolvedPartitionedScan {
},
None,
)));
-
Some(metric_set)
}
}
diff --git a/df_engine_extensions/src/dist_sql_query/resolver.rs
b/df_engine_extensions/src/dist_sql_query/resolver.rs
index 951ba88d..c48bfe53 100644
--- a/df_engine_extensions/src/dist_sql_query/resolver.rs
+++ b/df_engine_extensions/src/dist_sql_query/resolver.rs
@@ -18,7 +18,7 @@ use async_recursion::async_recursion;
use catalog::manager::ManagerRef as CatalogManagerRef;
use datafusion::{
error::{DataFusionError, Result as DfResult},
- physical_plan::ExecutionPlan,
+ physical_plan::{analyze::AnalyzeExec, ExecutionPlan},
};
use table_engine::{remote::model::TableIdentifier, table::TableRef};
@@ -99,7 +99,10 @@ impl Resolver {
&self,
plan: Arc<dyn ExecutionPlan>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
- let resolved_plan = self.resolve_partitioned_scan_internal(plan)?;
+ // Check whether this plan is `AnalyzeExec`; if it is, we should collect
metrics.
+ let is_analyze = plan.as_any().is::<AnalyzeExec>();
+
+ let resolved_plan = self.resolve_partitioned_scan_internal(plan,
is_analyze)?;
PUSH_DOWN_PLAN_COUNTER
.with_label_values(&["remote_scan"])
.inc();
@@ -117,6 +120,7 @@ impl Resolver {
pub fn resolve_partitioned_scan_internal(
&self,
plan: Arc<dyn ExecutionPlan>,
+ is_analyze: bool,
) -> DfResult<Arc<dyn ExecutionPlan>> {
// Leave node, let's resolve it and return.
if let Some(unresolved) =
plan.as_any().downcast_ref::<UnresolvedPartitionedScan>() {
@@ -139,6 +143,7 @@ impl Resolver {
self.remote_executor.clone(),
remote_plans,
metrics_collector,
+ is_analyze,
)));
}
@@ -151,7 +156,7 @@ impl Resolver {
// Resolve children if exist.
let mut new_children = Vec::with_capacity(children.len());
for child in children {
- let child = self.resolve_partitioned_scan_internal(child)?;
+ let child = self.resolve_partitioned_scan_internal(child,
is_analyze)?;
new_children.push(child);
}
diff --git a/df_engine_extensions/src/dist_sql_query/test_util.rs
b/df_engine_extensions/src/dist_sql_query/test_util.rs
index 77584a9f..d70f9ec2 100644
--- a/df_engine_extensions/src/dist_sql_query/test_util.rs
+++ b/df_engine_extensions/src/dist_sql_query/test_util.rs
@@ -29,7 +29,7 @@ use common_types::{
};
use datafusion::{
error::{DataFusionError, Result as DfResult},
- execution::{FunctionRegistry, TaskContext},
+ execution::FunctionRegistry,
logical_expr::{expr_fn, Literal, Operator},
physical_plan::{
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -56,7 +56,7 @@ use trace_metric::MetricsCollector;
use crate::dist_sql_query::{
physical_plan::{PartitionedScanStream, UnresolvedPartitionedScan,
UnresolvedSubTableScan},
resolver::Resolver,
- ExecutableScanBuilder, RemotePhysicalPlanExecutor, TableScanContext,
+ ExecutableScanBuilder, RemotePhysicalPlanExecutor, RemoteTaskContext,
TableScanContext,
};
// Test context
@@ -504,8 +504,8 @@ struct MockRemotePhysicalPlanExecutor;
impl RemotePhysicalPlanExecutor for MockRemotePhysicalPlanExecutor {
fn execute(
&self,
+ _task_context: RemoteTaskContext,
_table: TableIdentifier,
- _task_context: &TaskContext,
_plan: Arc<dyn ExecutionPlan>,
) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>> {
unimplemented!()
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result
b/integration_tests/cases/env/cluster/ddl/partition_table.result
index 81502d1c..18e023c0 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -78,6 +78,23 @@
UInt64(9923681778193615344),Timestamp(1651737067000),String("ceresdb8"),Int32(0)
UInt64(4860320137932382618),Timestamp(1651737067000),String("ceresdb9"),Int32(0),Double(109.0),
+-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
+EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
+
+plan_type,plan,
+String("Plan with Metrics"),String("ResolvedPartitionedScan:
pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n
__partition_table_t_1:\n poll_duration=xxs\n total_duration=xxs\n
wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec,
metrics=[output_rows=1, elapsed_compute=xxs]\n ScanTable:
table=__partition_table_t_1, parallelism=8, metrics=[\nPredicate { exprs:[name
= Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_st [...]
+
+
+-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
+-- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
+EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0",
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
+
+plan_type,plan,
+String("Plan with Metrics"),String("ResolvedPartitionedScan:
pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n
__partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n
wait_duration=xxs\n __partition_table_t_x:\n
poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n
__partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n
wait_duration=xxs\n\n__partition_table_t_x:\n [...]
+
+
ALTER TABLE partition_table_t ADD COLUMN (b string);
affected_rows: 0
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql
b/integration_tests/cases/env/cluster/ddl/partition_table.sql
index 59a1dd2a..46be8e1b 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.sql
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql
@@ -35,6 +35,15 @@ SELECT * from partition_table_t where name in ("ceresdb0",
"ceresdb1", "ceresdb2
SELECT * from partition_table_t where name in ("ceresdb5", "ceresdb6",
"ceresdb7","ceresdb8", "ceresdb9", "ceresdb10") order by name;
+-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
+EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
+
+-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
+-- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
+EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0",
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
+
ALTER TABLE partition_table_t ADD COLUMN (b string);
INSERT INTO partition_table_t (t, id, name, value) VALUES (1651737067000, 10,
"ceresdb0", 100);
diff --git a/query_engine/src/datafusion_impl/executor.rs
b/query_engine/src/datafusion_impl/executor.rs
index 0412c1d1..eb75c0da 100644
--- a/query_engine/src/datafusion_impl/executor.rs
+++ b/query_engine/src/datafusion_impl/executor.rs
@@ -29,7 +29,7 @@ use crate::{
},
error::*,
executor::Executor,
- physical_planner::{PhysicalPlanPtr, TaskExecContext},
+ physical_planner::{PhysicalPlanRef, TaskExecContext},
};
#[derive(Debug, Clone)]
@@ -68,7 +68,7 @@ impl Executor for DatafusionExecutorImpl {
async fn execute(
&self,
ctx: &Context,
- physical_plan: PhysicalPlanPtr,
+ physical_plan: PhysicalPlanRef,
) -> Result<SendableRecordBatchStream> {
debug!(
"DatafusionExecutorImpl begin to execute plan, request_id:{},
physical_plan:{:?}",
diff --git a/query_engine/src/datafusion_impl/physical_planner.rs
b/query_engine/src/datafusion_impl/physical_planner.rs
index 50b302e5..58287339 100644
--- a/query_engine/src/datafusion_impl/physical_planner.rs
+++ b/query_engine/src/datafusion_impl/physical_planner.rs
@@ -26,7 +26,7 @@ use crate::{
DfContextBuilder,
},
error::*,
- physical_planner::{PhysicalPlanPtr, PhysicalPlanner},
+ physical_planner::{PhysicalPlanRef, PhysicalPlanner},
};
/// Physical planner based on datafusion
@@ -58,7 +58,7 @@ impl DatafusionPhysicalPlannerImpl {
#[async_trait]
impl PhysicalPlanner for DatafusionPhysicalPlannerImpl {
// TODO: we should modify `QueryPlan` to support create remote plan here.
- async fn plan(&self, ctx: &Context, logical_plan: QueryPlan) ->
Result<PhysicalPlanPtr> {
+ async fn plan(&self, ctx: &Context, logical_plan: QueryPlan) ->
Result<PhysicalPlanRef> {
// Register catalogs to datafusion execution context.
let catalogs =
CatalogProviderAdapter::new_adapters(logical_plan.tables.clone());
// TODO: maybe we should not build `SessionContext` in each physical
plan's
@@ -88,6 +88,6 @@ impl PhysicalPlanner for DatafusionPhysicalPlannerImpl {
};
let physical_plan = DataFusionPhysicalPlanAdapter::new(typed_plan);
- Ok(Box::new(physical_plan))
+ Ok(Arc::new(physical_plan))
}
}
diff --git a/query_engine/src/datafusion_impl/task_context.rs
b/query_engine/src/datafusion_impl/task_context.rs
index 5946f3d2..f19c5dde 100644
--- a/query_engine/src/datafusion_impl/task_context.rs
+++ b/query_engine/src/datafusion_impl/task_context.rs
@@ -33,14 +33,14 @@ use datafusion_proto::{
};
use df_engine_extensions::dist_sql_query::{
resolver::Resolver, ExecutableScanBuilder, RemotePhysicalPlanExecutor,
- RemotePhysicalPlanExecutorRef, TableScanContext,
+ RemotePhysicalPlanExecutorRef, RemoteTaskContext, TableScanContext,
};
use futures::future::BoxFuture;
use generic_error::BoxError;
use prost::Message;
use snafu::ResultExt;
use table_engine::{
- provider::{CeresdbOptions, ScanTable},
+ provider::{CeresdbOptions, ScanTable, SCAN_TABLE_METRICS_COLLECTOR_NAME},
remote::{
model::{
ExecContext, ExecutePlanRequest, PhysicalPlan,
RemoteExecuteRequest, TableIdentifier,
@@ -172,12 +172,13 @@ struct RemotePhysicalPlanExecutorImpl {
impl RemotePhysicalPlanExecutor for RemotePhysicalPlanExecutorImpl {
fn execute(
&self,
+ task_context: RemoteTaskContext,
table: TableIdentifier,
- task_context: &TaskContext,
plan: Arc<dyn ExecutionPlan>,
) -> DfResult<BoxFuture<'static, DfResult<SendableRecordBatchStream>>> {
// Get the custom context to rebuild execution context.
let ceresdb_options = task_context
+ .task_ctx
.session_config()
.options()
.extensions
@@ -223,6 +224,7 @@ impl RemotePhysicalPlanExecutor for
RemotePhysicalPlanExecutorImpl {
let request = ExecutePlanRequest {
plan_schema,
remote_request,
+ remote_metrics: task_context.remote_metrics,
};
// Remote execute.
@@ -288,7 +290,7 @@ impl ExecutableScanBuilder for ExecutableScanBuilderImpl {
opts: read_opts,
projected_schema: ctx.projected_schema,
predicate: ctx.predicate,
- metrics_collector: MetricsCollector::default(),
+ metrics_collector:
MetricsCollector::new(SCAN_TABLE_METRICS_COLLECTOR_NAME.to_string()),
};
let mut scan = ScanTable::new(table, read_request);
diff --git a/query_engine/src/executor.rs b/query_engine/src/executor.rs
index bf32a24b..92b0a714 100644
--- a/query_engine/src/executor.rs
+++ b/query_engine/src/executor.rs
@@ -19,7 +19,7 @@ use std::{fmt, sync::Arc};
use async_trait::async_trait;
use table_engine::stream::SendableRecordBatchStream;
-use crate::{context::Context, error::*, physical_planner::PhysicalPlanPtr};
+use crate::{context::Context, error::*, physical_planner::PhysicalPlanRef};
/// Query executor
///
@@ -33,7 +33,7 @@ pub trait Executor: fmt::Debug + Send + Sync + 'static {
async fn execute(
&self,
ctx: &Context,
- physical_plan: PhysicalPlanPtr,
+ physical_plan: PhysicalPlanRef,
) -> Result<SendableRecordBatchStream>;
}
diff --git a/query_engine/src/physical_planner.rs
b/query_engine/src/physical_planner.rs
index c32975e1..9233362f 100644
--- a/query_engine/src/physical_planner.rs
+++ b/query_engine/src/physical_planner.rs
@@ -29,7 +29,7 @@ use crate::{context::Context,
datafusion_impl::task_context::DatafusionTaskExecC
#[async_trait]
pub trait PhysicalPlanner: fmt::Debug + Send + Sync + 'static {
/// Create a physical plan from a logical plan
- async fn plan(&self, ctx: &Context, logical_plan: QueryPlan) ->
Result<PhysicalPlanPtr>;
+ async fn plan(&self, ctx: &Context, logical_plan: QueryPlan) ->
Result<PhysicalPlanRef>;
}
pub type PhysicalPlannerRef = Arc<dyn PhysicalPlanner>;
@@ -45,7 +45,7 @@ pub trait PhysicalPlan: std::fmt::Debug + Sync + Send +
'static {
fn metrics_to_string(&self) -> String;
}
-pub type PhysicalPlanPtr = Box<dyn PhysicalPlan>;
+pub type PhysicalPlanRef = Arc<dyn PhysicalPlan>;
/// Task context, just a wrapper of datafusion task context now
#[derive(Default)]
diff --git a/remote_engine_client/src/client.rs
b/remote_engine_client/src/client.rs
index 80e47ad6..456de914 100644
--- a/remote_engine_client/src/client.rs
+++ b/remote_engine_client/src/client.rs
@@ -17,7 +17,7 @@
use std::{
collections::HashMap,
pin::Pin,
- sync::Arc,
+ sync::{Arc, Mutex},
task::{Context, Poll},
};
@@ -26,8 +26,12 @@ use arrow_ext::{
ipc::{CompressOptions, CompressionMethod},
};
use ceresdbproto::{
- remote_engine::{self, read_response::Output::Arrow,
remote_engine_service_client::*},
- storage::arrow_payload,
+ remote_engine::{
+ self,
+ read_response::Output::{Arrow, Metric},
+ remote_engine_service_client::*,
+ },
+ storage::{arrow_payload, ArrowPayload},
};
use common_types::{record_batch::RecordBatch, schema::RecordSchema};
use futures::{Stream, StreamExt};
@@ -115,8 +119,12 @@ impl Client {
// When success to get the stream, table has been found in remote, not
need to
// evict cache entry.
let response = response.into_inner();
- let remote_read_record_batch_stream =
- ClientReadRecordBatchStream::new(table_ident, response,
record_schema);
+ let remote_read_record_batch_stream = ClientReadRecordBatchStream::new(
+ table_ident,
+ response,
+ record_schema,
+ Default::default(),
+ );
Ok(remote_read_record_batch_stream)
}
@@ -481,8 +489,12 @@ impl Client {
// When success to get the stream, table has been found in remote, not
need to
// evict cache entry.
let response = response.into_inner();
- let remote_execute_plan_stream =
- ClientReadRecordBatchStream::new(table_ident, response,
plan_schema);
+ let remote_execute_plan_stream = ClientReadRecordBatchStream::new(
+ table_ident,
+ response,
+ plan_schema,
+ request.remote_metrics,
+ );
Ok(remote_execute_plan_stream)
}
@@ -500,6 +512,7 @@ pub struct ClientReadRecordBatchStream {
pub table_ident: TableIdentifier,
pub response_stream: Streaming<remote_engine::ReadResponse>,
pub record_schema: RecordSchema,
+ pub remote_metrics: Arc<Mutex<Option<String>>>,
}
impl ClientReadRecordBatchStream {
@@ -507,11 +520,13 @@ impl ClientReadRecordBatchStream {
table_ident: TableIdentifier,
response_stream: Streaming<remote_engine::ReadResponse>,
record_schema: RecordSchema,
+ remote_metrics: Arc<Mutex<Option<String>>>,
) -> Self {
Self {
table_ident,
response_stream,
record_schema,
+ remote_metrics,
}
}
}
@@ -534,52 +549,14 @@ impl Stream for ClientReadRecordBatchStream {
match response.output {
None => Poll::Ready(None),
- Some(v) => {
- let record_batch = match v {
- Arrow(mut v) => {
- if v.record_batches.len() != 1 {
- return Poll::Ready(Some(
- InvalidRecordBatchNumber {
- batch_num: v.record_batches.len(),
- }
- .fail(),
- ));
- }
-
- let compression = match v.compression() {
- arrow_payload::Compression::None =>
CompressionMethod::None,
- arrow_payload::Compression::Zstd =>
CompressionMethod::Zstd,
- };
-
- ipc::decode_record_batches(
- v.record_batches.swap_remove(0),
- compression,
- )
- .map_err(|e| Box::new(e) as _)
- .context(Convert {
- msg: "decode read record batch",
- })
- .and_then(
- |mut record_batch_vec| {
- ensure!(
- record_batch_vec.len() == 1,
- InvalidRecordBatchNumber {
- batch_num:
record_batch_vec.len()
- }
- );
- record_batch_vec
- .swap_remove(0)
- .try_into()
- .map_err(|e| Box::new(e) as _)
- .context(Convert {
- msg: "convert read record
batch",
- })
- },
- )
- }
- };
- Poll::Ready(Some(record_batch))
- }
+ Some(v) => match v {
+ Arrow(v) =>
Poll::Ready(Some(convert_arrow_payload(v))),
+ Metric(v) => {
+ let mut remote_metrics =
this.remote_metrics.lock().unwrap();
+ *remote_metrics = Some(v.metric);
+ Poll::Ready(None)
+ }
+ },
}
}
@@ -594,3 +571,37 @@ impl Stream for ClientReadRecordBatchStream {
}
}
}
+
+fn convert_arrow_payload(mut v: ArrowPayload) -> Result<RecordBatch> {
+ if v.record_batches.len() != 1 {
+ return InvalidRecordBatchNumber {
+ batch_num: v.record_batches.len(),
+ }
+ .fail();
+ }
+ let compression = match v.compression() {
+ arrow_payload::Compression::None => CompressionMethod::None,
+ arrow_payload::Compression::Zstd => CompressionMethod::Zstd,
+ };
+
+ ipc::decode_record_batches(v.record_batches.swap_remove(0), compression)
+ .map_err(|e| Box::new(e) as _)
+ .context(Convert {
+ msg: "decode read record batch",
+ })
+ .and_then(|mut record_batch_vec| {
+ ensure!(
+ record_batch_vec.len() == 1,
+ InvalidRecordBatchNumber {
+ batch_num: record_batch_vec.len()
+ }
+ );
+ record_batch_vec
+ .swap_remove(0)
+ .try_into()
+ .map_err(|e| Box::new(e) as _)
+ .context(Convert {
+ msg: "convert read record batch",
+ })
+ })
+}
diff --git a/server/src/grpc/remote_engine_service/mod.rs
b/server/src/grpc/remote_engine_service/mod.rs
index 3d2963a4..11359300 100644
--- a/server/src/grpc/remote_engine_service/mod.rs
+++ b/server/src/grpc/remote_engine_service/mod.rs
@@ -27,11 +27,13 @@ use async_trait::async_trait;
use catalog::{manager::ManagerRef, schema::SchemaRef};
use ceresdbproto::{
remote_engine::{
- execute_plan_request, read_response::Output::Arrow,
- remote_engine_service_server::RemoteEngineService, row_group,
AlterTableOptionsRequest,
- AlterTableOptionsResponse, AlterTableSchemaRequest,
AlterTableSchemaResponse, ExecContext,
- ExecutePlanRequest, GetTableInfoRequest, GetTableInfoResponse,
ReadRequest, ReadResponse,
- WriteBatchRequest, WriteRequest, WriteResponse,
+ execute_plan_request,
+ read_response::Output::{Arrow, Metric},
+ remote_engine_service_server::RemoteEngineService,
+ row_group, AlterTableOptionsRequest, AlterTableOptionsResponse,
AlterTableSchemaRequest,
+ AlterTableSchemaResponse, ExecContext, ExecutePlanRequest,
GetTableInfoRequest,
+ GetTableInfoResponse, MetricPayload, ReadRequest, ReadResponse,
WriteBatchRequest,
+ WriteRequest, WriteResponse,
},
storage::{arrow_payload, ArrowPayload},
};
@@ -50,6 +52,7 @@ use proxy::{
use query_engine::{
context::Context as QueryContext,
datafusion_impl::physical_plan::{DataFusionPhysicalPlanAdapter, TypedPlan},
+ physical_planner::PhysicalPlanRef,
QueryEngineRef, QueryEngineType,
};
use snafu::{OptionExt, ResultExt};
@@ -82,6 +85,12 @@ pub mod error;
const STREAM_QUERY_CHANNEL_LEN: usize = 200;
const DEFAULT_COMPRESS_MIN_LENGTH: usize = 80 * 1024;
+#[derive(Debug, Clone)]
+pub enum RecordBatchWithMetric {
+ RecordBatch(RecordBatch),
+ Metric(String),
+}
+
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct StreamReadReqKey {
table: String,
@@ -192,46 +201,94 @@ impl<M: MetricCollector> Drop for StreamWithMetric<M> {
}
}
/// Stream of record batches that can append one final metrics item after the
/// underlying stream is exhausted.
struct RemoteExecStream {
    // Underlying stream of query result batches.
    inner: BoxStream<'static, Result<RecordBatch>>,
    // When `Some`, the executed plan whose metrics are emitted as a trailing
    // `Metric` item once `inner` finishes (used for analyze support);
    // `None` means the stream ends with the data.
    physical_plan: Option<PhysicalPlanRef>,
}

impl RemoteExecStream {
    /// Wraps `inner`; pass `Some(plan)` to have the plan's metrics appended
    /// as the last stream item, or `None` for a plain data stream.
    fn new(
        inner: BoxStream<'static, Result<RecordBatch>>,
        physical_plan: Option<PhysicalPlanRef>,
    ) -> Self {
        Self {
            inner,
            physical_plan,
        }
    }
}
+
+impl Stream for RemoteExecStream {
+ type Item = Result<RecordBatchWithMetric>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
+ let this = self.get_mut();
+ match this.inner.poll_next_unpin(cx) {
+ Poll::Ready(Some(res)) => {
+ Poll::Ready(Some(res.map(RecordBatchWithMetric::RecordBatch)))
+ }
+ Poll::Ready(None) => match &this.physical_plan {
+ Some(physical_plan) => {
+ let metrics = physical_plan.metrics_to_string();
+ this.physical_plan = None;
+
Poll::Ready(Some(Ok(RecordBatchWithMetric::Metric(metrics))))
+ }
+ None => Poll::Ready(None),
+ },
+ Poll::Pending => Poll::Pending,
+ }
+ }
+}
+
macro_rules! record_stream_to_response_stream {
($record_stream_result:ident, $StreamType:ident) => {
match $record_stream_result {
Ok(stream) => {
let new_stream: Self::$StreamType = Box::pin(stream.map(|res|
match res {
- Ok(record_batch) => {
- let resp = match ipc::encode_record_batch(
- &record_batch.into_arrow_record_batch(),
- CompressOptions {
- compress_min_length:
DEFAULT_COMPRESS_MIN_LENGTH,
- method: CompressionMethod::Zstd,
- },
- )
- .box_err()
- .context(ErrWithCause {
- code: StatusCode::Internal,
- msg: "encode record batch failed",
- }) {
- Err(e) => ReadResponse {
- header: Some(error::build_err_header(e)),
- ..Default::default()
- },
- Ok(CompressOutput { payload, method }) => {
- let compression = match method {
- CompressionMethod::None =>
arrow_payload::Compression::None,
- CompressionMethod::Zstd =>
arrow_payload::Compression::Zstd,
- };
-
- ReadResponse {
- header: Some(error::build_ok_header()),
- output: Some(Arrow(ArrowPayload {
- record_batches: vec![payload],
- compression: compression as i32,
- })),
+ Ok(res) => match res {
+ RecordBatchWithMetric::Metric(metric) => {
+ let resp = ReadResponse {
+ header: Some(error::build_ok_header()),
+ output: Some(Metric(MetricPayload { metric })),
+ };
+ Ok(resp)
+ }
+ RecordBatchWithMetric::RecordBatch(record_batch) => {
+ let resp = match ipc::encode_record_batch(
+ &record_batch.into_arrow_record_batch(),
+ CompressOptions {
+ compress_min_length:
DEFAULT_COMPRESS_MIN_LENGTH,
+ method: CompressionMethod::Zstd,
+ },
+ )
+ .box_err()
+ .context(ErrWithCause {
+ code: StatusCode::Internal,
+ msg: "encode record batch failed",
+ }) {
+ Err(e) => ReadResponse {
+ header: Some(error::build_err_header(e)),
+ ..Default::default()
+ },
+ Ok(CompressOutput { payload, method }) => {
+ let compression = match method {
+ CompressionMethod::None =>
arrow_payload::Compression::None,
+ CompressionMethod::Zstd =>
arrow_payload::Compression::Zstd,
+ };
+
+ ReadResponse {
+ header: Some(error::build_ok_header()),
+ output: Some(Arrow(ArrowPayload {
+ record_batches: vec![payload],
+ compression: compression as i32,
+ })),
+ }
}
- }
- };
+ };
- Ok(resp)
- }
+ Ok(resp)
+ }
+ },
Err(e) => {
let resp = ReadResponse {
header: Some(error::build_err_header(e)),
@@ -240,7 +297,6 @@ macro_rules! record_stream_to_response_stream {
Ok(resp)
}
}));
-
Ok(Response::new(new_stream))
}
Err(e) => {
@@ -274,7 +330,7 @@ impl RemoteEngineServiceImpl {
async fn stream_read_internal(
&self,
request: Request<ReadRequest>,
- ) -> Result<StreamWithMetric<StreamReadMetricCollector>> {
+ ) -> Result<RemoteExecStream> {
let metric = StreamReadMetricCollector(Instant::now());
let ctx = self.handler_ctx();
@@ -316,17 +372,15 @@ impl RemoteEngineServiceImpl {
});
}
- Ok(StreamWithMetric::new(
- Box::pin(ReceiverStream::new(rx)),
- metric,
- ))
+ let stream = StreamWithMetric::new(Box::pin(ReceiverStream::new(rx)),
metric);
+ Ok(RemoteExecStream::new(Box::pin(stream), None))
}
async fn dedup_stream_read_internal(
&self,
query_dedup: QueryDedup,
request: Request<ReadRequest>,
- ) -> Result<StreamWithMetric<StreamReadMetricCollector>> {
+ ) -> Result<RemoteExecStream> {
let metric = StreamReadMetricCollector(Instant::now());
let request = request.into_inner();
@@ -371,10 +425,8 @@ impl RemoteEngineServiceImpl {
}
}
- Ok(StreamWithMetric::new(
- Box::pin(ReceiverStream::new(rx)),
- metric,
- ))
+ let stream = StreamWithMetric::new(Box::pin(ReceiverStream::new(rx)),
metric);
+ Ok(RemoteExecStream::new(Box::pin(stream), None))
}
async fn read_and_send_dedupped_resps<K, F>(
@@ -614,7 +666,7 @@ impl RemoteEngineServiceImpl {
async fn execute_physical_plan_internal(
&self,
request: Request<ExecutePlanRequest>,
- ) -> Result<StreamWithMetric<ExecutePlanMetricCollector>> {
+ ) -> Result<RemoteExecStream> {
let request = request.into_inner();
let query_engine = self.instance.query_engine.clone();
let (ctx, encoded_plan) = extract_plan_from_req(request)?;
@@ -636,10 +688,15 @@ impl RemoteEngineServiceImpl {
ctx.timeout_ms,
);
+ let physical_plan =
Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
+ encoded_plan,
+ )));
+ let physical_plan_clone = physical_plan.clone();
+
let stream = self
.runtimes
.read_runtime
- .spawn(async move { handle_execute_plan(query_ctx, encoded_plan,
query_engine).await })
+ .spawn(async move { handle_execute_plan(query_ctx, physical_plan,
query_engine).await })
.await
.box_err()
.with_context(|| ErrWithCause {
@@ -653,14 +710,18 @@ impl RemoteEngineServiceImpl {
})
});
- Ok(StreamWithMetric::new(Box::pin(stream), metric))
+ let stream = StreamWithMetric::new(Box::pin(stream), metric);
+ Ok(RemoteExecStream::new(
+ Box::pin(stream),
+ Some(physical_plan_clone),
+ ))
}
async fn dedup_execute_physical_plan_internal(
&self,
query_dedup: QueryDedup,
request: Request<ExecutePlanRequest>,
- ) -> Result<StreamWithMetric<ExecutePlanMetricCollector>> {
+ ) -> Result<RemoteExecStream> {
let request = request.into_inner();
let query_engine = self.instance.query_engine.clone();
let (ctx, encoded_plan) = extract_plan_from_req(request)?;
@@ -685,6 +746,12 @@ impl RemoteEngineServiceImpl {
encoded_plan: encoded_plan.clone(),
};
+ let physical_plan =
Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
+ encoded_plan,
+ )));
+
+ let physical_plan_clone = physical_plan.clone();
+
let QueryDedup {
config,
physical_plan_notifiers,
@@ -696,7 +763,7 @@ impl RemoteEngineServiceImpl {
// The first request, need to handle it, and then notify the other
requests.
RequestResult::First => {
let query = async move {
- handle_execute_plan(query_ctx, encoded_plan, query_engine)
+ handle_execute_plan(query_ctx, physical_plan, query_engine)
.await
.map(PartitionedStreams::one_stream)
};
@@ -715,9 +782,10 @@ impl RemoteEngineServiceImpl {
}
}
- Ok(StreamWithMetric::new(
- Box::pin(ReceiverStream::new(rx)),
- metric,
+ let stream = StreamWithMetric::new(Box::pin(ReceiverStream::new(rx)),
metric);
+ Ok(RemoteExecStream::new(
+ Box::pin(stream),
+ Some(physical_plan_clone),
))
}
@@ -1140,14 +1208,9 @@ fn create_query_ctx(
async fn handle_execute_plan(
ctx: QueryContext,
- encoded_plan: Vec<u8>,
+ physical_plan: PhysicalPlanRef,
query_engine: QueryEngineRef,
) -> Result<SendableRecordBatchStream> {
- // TODO: Build remote plan in physical planner.
- let physical_plan =
Box::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
- encoded_plan,
- )));
-
// Execute plan.
let executor = query_engine.executor();
executor
diff --git a/table_engine/src/partition/rule/key.rs
b/table_engine/src/partition/rule/key.rs
index d33fd88b..b8d9e87c 100644
--- a/table_engine/src/partition/rule/key.rs
+++ b/table_engine/src/partition/rule/key.rs
@@ -14,7 +14,7 @@
//! Key partition rule
-use std::collections::{HashMap, HashSet};
+use std::collections::{BTreeSet, HashMap};
use common_types::{
datum::{Datum, DatumView},
@@ -136,8 +136,8 @@ impl KeyRule {
&self,
group: &[usize],
filters: &[PartitionFilter],
- ) -> Result<HashSet<usize>> {
- let mut partitions = HashSet::new();
+ ) -> Result<BTreeSet<usize>> {
+ let mut partitions = BTreeSet::new();
let expanded_group = expand_partition_keys_group(group, filters)?;
for partition_keys in expanded_group {
let partition = compute_partition(partition_keys.into_iter(),
self.partition_num);
@@ -219,7 +219,7 @@ impl PartitionRule for KeyRule {
target_partitions = target_partitions
.intersection(&partitions)
.copied()
- .collect::<HashSet<_>>();
+ .collect::<BTreeSet<_>>();
}
Ok(target_partitions.into_iter().collect())
diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs
index 6b0c38a7..fb3cb41d 100644
--- a/table_engine/src/provider.rs
+++ b/table_engine/src/provider.rs
@@ -47,7 +47,7 @@ use crate::{
table::{ReadOptions, ReadRequest, TableRef},
};
-const SCAN_TABLE_METRICS_COLLECTOR_NAME: &str = "scan_table";
+pub const SCAN_TABLE_METRICS_COLLECTOR_NAME: &str = "scan_table";
#[derive(Clone, Debug)]
pub struct CeresdbOptions {
diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs
index 2fc6a297..bd996703 100644
--- a/table_engine/src/remote/model.rs
+++ b/table_engine/src/remote/model.rs
@@ -16,6 +16,7 @@
use std::{
collections::HashMap,
+ sync::{Arc, Mutex},
time::{Duration, Instant},
};
@@ -409,6 +410,9 @@ pub struct ExecutePlanRequest {
/// Remote plan execution request
pub remote_request: RemoteExecuteRequest,
+
+ /// Collect metrics of remote plan
+ pub remote_metrics: Arc<Mutex<Option<String>>>,
}
impl ExecutePlanRequest {
@@ -417,6 +421,7 @@ impl ExecutePlanRequest {
plan_schema: RecordSchema,
context: ExecContext,
physical_plan: PhysicalPlan,
+ remote_metrics: Arc<Mutex<Option<String>>>,
) -> Self {
let remote_request = RemoteExecuteRequest {
table,
@@ -427,6 +432,7 @@ impl ExecutePlanRequest {
Self {
plan_schema,
remote_request,
+ remote_metrics,
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]