This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/dev by this push:
new ce4044b8 feat: impl priority runtime for read (#1303)
ce4044b8 is described below
commit ce4044b8c2f02551ccd0c46a6d03cdf1e3032dd8
Author: Jiacai Liu <[email protected]>
AuthorDate: Thu Dec 28 15:35:46 2023 +0800
feat: impl priority runtime for read (#1303)
## Rationale
Close #1299
## Detailed Changes
- Add a PriorityRuntime component, and use it in the read API
- In a normal query, the plan will be executed in the higher-priority
runtime by default; when the executor decides a query is expensive, it will
spawn `stream.poll` in the lower-priority runtime instead.
- In a distributed query, a priority field is added to the remote query
request, so the remote node can decide which runtime to run on.
## Test Plan
Newly added UT
---
Cargo.lock | 29 ++-
Cargo.toml | 8 +-
analytic_engine/src/instance/mod.rs | 4 +-
analytic_engine/src/instance/read.rs | 6 +-
analytic_engine/src/table/mod.rs | 2 +
analytic_engine/src/tests/table.rs | 1 +
analytic_engine/src/tests/util.rs | 5 +-
components/logger/Cargo.toml | 1 +
components/logger/src/lib.rs | 10 +-
components/runtime/src/lib.rs | 4 +-
components/runtime/src/priority_runtime.rs | 98 ++++++++++
df_engine_extensions/Cargo.toml | 1 +
df_engine_extensions/src/dist_sql_query/mod.rs | 18 +-
.../src/dist_sql_query/physical_plan.rs | 3 +
.../src/dist_sql_query/resolver.rs | 9 +-
.../src/dist_sql_query/test_util.rs | 5 +
docs/example-cluster-1.toml | 2 +-
.../cases/common/dml/issue-1087.result | 4 +-
.../cases/common/dml/issue-341.result | 8 +-
integration_tests/cases/common/dml/issue-59.result | 2 +-
.../cases/common/explain/explain.result | 2 +-
.../cases/common/optimizer/optimizer.result | 2 +-
.../cases/env/cluster/ddl/partition_table.result | 4 +-
.../cases/env/local/ddl/query-plan.result | 21 ++-
.../cases/env/local/ddl/query-plan.sql | 5 +
interpreters/Cargo.toml | 3 +
interpreters/src/context.rs | 20 +-
interpreters/src/factory.rs | 14 +-
interpreters/src/lib.rs | 4 +-
.../mod.rs => interpreters/src/metrics.rs | 14 +-
interpreters/src/select.rs | 108 +++++++----
interpreters/src/tests.rs | 25 ++-
proxy/src/instance.rs | 2 +
proxy/src/lib.rs | 81 +-------
proxy/src/read.rs | 37 +---
query_engine/Cargo.toml | 1 +
query_engine/src/config.rs | 3 +
query_engine/src/context.rs | 2 +
query_engine/src/datafusion_impl/mod.rs | 75 +-------
.../src/datafusion_impl/physical_planner.rs | 39 ++--
query_engine/src/datafusion_impl/task_context.rs | 6 +
query_frontend/Cargo.toml | 3 +
query_frontend/src/frontend.rs | 75 ++++----
query_frontend/src/influxql/planner.rs | 13 +-
query_frontend/src/lib.rs | 1 +
query_frontend/src/logical_optimizer/mod.rs | 49 +++++
.../src}/logical_optimizer/type_conversion.rs | 5 +-
query_frontend/src/plan.rs | 209 ++++++++++++++++++++-
query_frontend/src/planner.rs | 15 +-
query_frontend/src/promql/convert.rs | 3 +-
query_frontend/src/promql/remote.rs | 16 +-
.../src/grpc/remote_engine_service/metrics.rs | 14 +-
server/src/grpc/remote_engine_service/mod.rs | 87 ++++++---
server/src/http.rs | 6 +-
server/src/server.rs | 3 +
src/ceresdb/src/config.rs | 5 +-
src/ceresdb/src/setup.rs | 22 ++-
system_catalog/src/sys_catalog_table.rs | 1 +
table_engine/src/engine.rs | 4 +-
table_engine/src/provider.rs | 48 ++++-
table_engine/src/remote/model.rs | 11 +-
table_engine/src/table.rs | 5 +
62 files changed, 894 insertions(+), 389 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index e32493fe..27104165 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -648,7 +648,7 @@ dependencies = [
[[package]]
name = "arrow_util"
version = "0.1.0"
-source =
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57"
+source =
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
dependencies = [
"ahash 0.8.3",
"arrow 43.0.0",
@@ -2271,7 +2271,7 @@ dependencies = [
[[package]]
name = "datafusion_util"
version = "0.1.0"
-source =
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57"
+source =
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
dependencies = [
"async-trait",
"datafusion",
@@ -2372,6 +2372,7 @@ dependencies = [
"lazy_static",
"prometheus 0.12.0",
"prost",
+ "runtime",
"snafu 0.6.10",
"table_engine",
"tokio",
@@ -2860,7 +2861,7 @@ checksum =
"8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2"
[[package]]
name = "generated_types"
version = "0.1.0"
-source =
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57"
+source =
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
dependencies = [
"pbjson",
"pbjson-build",
@@ -3298,7 +3299,7 @@ dependencies = [
[[package]]
name = "influxdb_influxql_parser"
version = "0.1.0"
-source =
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57"
+source =
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
dependencies = [
"chrono",
"chrono-tz",
@@ -3352,12 +3353,15 @@ dependencies = [
"futures 0.3.28",
"generic_error",
"hash_ext",
+ "lazy_static",
"logger",
"macros",
"meta_client",
+ "prometheus 0.12.0",
"query_engine",
"query_frontend",
"regex",
+ "runtime",
"snafu 0.6.10",
"table_engine",
"test_util",
@@ -3388,7 +3392,7 @@ dependencies = [
[[package]]
name = "iox_query"
version = "0.1.0"
-source =
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57"
+source =
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
dependencies = [
"arrow 43.0.0",
"arrow_util",
@@ -3412,7 +3416,7 @@ dependencies = [
[[package]]
name = "iox_query_influxql"
version = "0.1.0"
-source =
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57"
+source =
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
dependencies = [
"arrow 43.0.0",
"chrono",
@@ -3738,6 +3742,7 @@ version = "1.2.6-alpha"
dependencies = [
"chrono",
"log",
+ "runtime",
"serde",
"slog",
"slog-async",
@@ -4511,7 +4516,7 @@ dependencies = [
[[package]]
name = "observability_deps"
version = "0.1.0"
-source =
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57"
+source =
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
dependencies = [
"tracing",
]
@@ -5430,6 +5435,7 @@ dependencies = [
"macros",
"prost",
"query_frontend",
+ "runtime",
"serde",
"snafu 0.6.10",
"table_engine",
@@ -5446,6 +5452,7 @@ dependencies = [
"async-trait",
"catalog",
"ceresdbproto 1.0.23
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "chrono",
"cluster",
"codec",
"common_types",
@@ -5455,6 +5462,7 @@ dependencies = [
"generic_error",
"hash_ext",
"influxdb_influxql_parser",
+ "iox_query",
"iox_query_influxql",
"itertools 0.10.5",
"lazy_static",
@@ -5465,6 +5473,7 @@ dependencies = [
"prom-remote-api",
"regex",
"regex-syntax 0.6.29",
+ "runtime",
"schema",
"snafu 0.6.10",
"sqlparser",
@@ -5475,7 +5484,7 @@ dependencies = [
[[package]]
name = "query_functions"
version = "0.1.0"
-source =
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57"
+source =
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
dependencies = [
"arrow 43.0.0",
"chrono",
@@ -6130,7 +6139,7 @@ dependencies = [
[[package]]
name = "schema"
version = "0.1.0"
-source =
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57"
+source =
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
dependencies = [
"arrow 43.0.0",
"hashbrown 0.13.2",
@@ -6901,7 +6910,7 @@ dependencies = [
[[package]]
name = "test_helpers"
version = "0.1.0"
-source =
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57"
+source =
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c"
dependencies = [
"dotenvy",
"observability_deps",
diff --git a/Cargo.toml b/Cargo.toml
index a01a7222..c99bbaf5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -115,10 +115,10 @@ hash_ext = { path = "components/hash_ext" }
hex = "0.4.3"
hyperloglog = { git = "https://github.com/jedisct1/rust-hyperloglog.git", rev
= "425487ce910f26636fbde8c4d640b538431aad50" }
id_allocator = { path = "components/id_allocator" }
-influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git",
rev = "acbd3ad7651f2deb74857155bea892f88926da57", package =
"iox_query_influxql" }
-influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev =
"acbd3ad7651f2deb74857155bea892f88926da57", package =
"influxdb_influxql_parser" }
-influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev =
"acbd3ad7651f2deb74857155bea892f88926da57", package = "iox_query" }
-influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev =
"acbd3ad7651f2deb74857155bea892f88926da57", package = "schema" }
+influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git",
rev = "a905863", package = "iox_query_influxql" }
+influxql-parser = { git = "https://github.com/CeresDB/influxql.git", rev =
"a905863", package = "influxdb_influxql_parser" }
+influxql-query = { git = "https://github.com/CeresDB/influxql.git", rev =
"a905863", package = "iox_query" }
+influxql-schema = { git = "https://github.com/CeresDB/influxql.git", rev =
"a905863", package = "schema" }
interpreters = { path = "interpreters" }
itertools = "0.10.5"
lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
diff --git a/analytic_engine/src/instance/mod.rs
b/analytic_engine/src/instance/mod.rs
index 6eb257b8..7525f641 100644
--- a/analytic_engine/src/instance/mod.rs
+++ b/analytic_engine/src/instance/mod.rs
@@ -38,7 +38,7 @@ use generic_error::{BoxError, GenericError};
use logger::{error, info};
use macros::define_result;
use mem_collector::MemUsageCollector;
-use runtime::Runtime;
+use runtime::{PriorityRuntime, Runtime};
use snafu::{ResultExt, Snafu};
use table_engine::{engine::EngineRuntimes, predicate::PredicateRef,
table::FlushRequest};
use time_ext::ReadableDuration;
@@ -291,7 +291,7 @@ impl Instance {
}
#[inline]
- fn read_runtime(&self) -> &Arc<Runtime> {
+ fn read_runtime(&self) -> &PriorityRuntime {
&self.runtimes.read_runtime
}
diff --git a/analytic_engine/src/instance/read.rs
b/analytic_engine/src/instance/read.rs
index 9624f4cf..341af2fc 100644
--- a/analytic_engine/src/instance/read.rs
+++ b/analytic_engine/src/instance/read.rs
@@ -122,6 +122,10 @@ impl Instance {
None,
));
+ let runtime = self
+ .read_runtime()
+ .choose_runtime(&request.priority)
+ .clone();
let sst_read_options_builder = SstReadOptionsBuilder::new(
ScanType::Query,
self.scan_options.clone(),
@@ -129,7 +133,7 @@ impl Instance {
table_options.num_rows_per_row_group,
request.predicate.clone(),
self.meta_cache.clone(),
- self.read_runtime().clone(),
+ runtime,
);
if need_merge_sort {
diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs
index 90097101..f6bc9cf4 100644
--- a/analytic_engine/src/table/mod.rs
+++ b/analytic_engine/src/table/mod.rs
@@ -541,6 +541,8 @@ impl Table for TableImpl {
projected_schema: request.projected_schema,
predicate,
metrics_collector:
MetricsCollector::new(GET_METRICS_COLLECTOR_NAME.to_string()),
+ // TODO: pass priority from request
+ priority: Default::default(),
};
let mut batch_stream = self
.read(read_request)
diff --git a/analytic_engine/src/tests/table.rs
b/analytic_engine/src/tests/table.rs
index 8524f9de..1704a051 100644
--- a/analytic_engine/src/tests/table.rs
+++ b/analytic_engine/src/tests/table.rs
@@ -192,6 +192,7 @@ pub fn new_read_all_request_with_order(schema: Schema,
opts: ReadOptions) -> Rea
projected_schema: ProjectedSchema::no_projection(schema),
predicate: Arc::new(Predicate::empty()),
metrics_collector: MetricsCollector::default(),
+ priority: Default::default(),
}
}
diff --git a/analytic_engine/src/tests/util.rs
b/analytic_engine/src/tests/util.rs
index 0d9613b7..c7acad40 100644
--- a/analytic_engine/src/tests/util.rs
+++ b/analytic_engine/src/tests/util.rs
@@ -26,6 +26,7 @@ use common_types::{
use futures::stream::StreamExt;
use logger::info;
use object_store::config::{LocalOptions, ObjectStoreOptions, StorageOptions};
+use runtime::PriorityRuntime;
use size_ext::ReadableSize;
use table_engine::{
engine::{
@@ -124,7 +125,7 @@ impl<T: WalsOpener> TestContext<T> {
.open_wals(
&self.config.wal,
WalRuntimes {
- read_runtime: self.runtimes.read_runtime.clone(),
+ read_runtime:
self.runtimes.read_runtime.high().clone(),
write_runtime: self.runtimes.write_runtime.clone(),
default_runtime: self.runtimes.default_runtime.clone(),
},
@@ -528,7 +529,7 @@ impl Builder {
_dir: dir,
config,
runtimes: Arc::new(EngineRuntimes {
- read_runtime: runtime.clone(),
+ read_runtime: PriorityRuntime::new(runtime.clone(),
runtime.clone()),
write_runtime: runtime.clone(),
meta_runtime: runtime.clone(),
compact_runtime: runtime.clone(),
diff --git a/components/logger/Cargo.toml b/components/logger/Cargo.toml
index 77db8f59..7ae97c1e 100644
--- a/components/logger/Cargo.toml
+++ b/components/logger/Cargo.toml
@@ -35,6 +35,7 @@ workspace = true
[dependencies]
chrono = { workspace = true }
log = "0.4"
+runtime = { workspace = true }
serde = { workspace = true }
slog = { workspace = true }
slog-async = "2.6"
diff --git a/components/logger/src/lib.rs b/components/logger/src/lib.rs
index 1dc78b0c..5d25e230 100644
--- a/components/logger/src/lib.rs
+++ b/components/logger/src/lib.rs
@@ -28,6 +28,7 @@ pub use log::{
debug as log_debug, error as log_error, info as log_info, max_level, trace
as log_trace,
warn as log_warn, SetLoggerError,
};
+use runtime::Priority;
use serde::{Deserialize, Serialize};
pub use slog::Level;
use slog::{slog_o, Drain, Key, OwnedKVList, Record, KV};
@@ -471,6 +472,7 @@ pub struct SlowTimer<'a> {
sql: &'a str,
slow_threshold: Duration,
start_time: Instant,
+ priority: Option<Priority>,
}
impl<'a> Drop for SlowTimer<'a> {
@@ -478,9 +480,10 @@ impl<'a> Drop for SlowTimer<'a> {
let cost = self.elapsed();
if cost > self.slow_threshold {
slow_query!(
- "Normal query elapsed:{:?}, id:{}, query:{}",
+ "Normal query elapsed:{:?}, id:{}, priority:{:?}, query:{}",
cost,
self.request_id,
+ self.priority,
self.sql,
);
}
@@ -494,6 +497,7 @@ impl<'a> SlowTimer<'a> {
sql,
slow_threshold: threshold,
start_time: Instant::now(),
+ priority: None,
}
}
@@ -504,6 +508,10 @@ impl<'a> SlowTimer<'a> {
pub fn start_time(&self) -> Instant {
self.start_time
}
+
+ pub fn priority(&mut self, priority: Priority) {
+ self.priority = Some(priority);
+ }
}
#[macro_export(local_inner_macros)]
diff --git a/components/runtime/src/lib.rs b/components/runtime/src/lib.rs
index 54f6e7be..b6453819 100644
--- a/components/runtime/src/lib.rs
+++ b/components/runtime/src/lib.rs
@@ -30,8 +30,10 @@ use tokio::{
};
mod metrics;
+mod priority_runtime;
+
+pub use priority_runtime::{Priority, PriorityRuntime};
-// TODO(yingwen): Use opaque error type
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
diff --git a/components/runtime/src/priority_runtime.rs
b/components/runtime/src/priority_runtime.rs
new file mode 100644
index 00000000..1f69bd8a
--- /dev/null
+++ b/components/runtime/src/priority_runtime.rs
@@ -0,0 +1,98 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::future::Future;
+
+use crate::{JoinHandle, RuntimeRef};
+
+// TODO: maybe we could move this to common_types crate.
+#[derive(Copy, Clone, Debug, Default)]
+#[repr(u8)]
+pub enum Priority {
+ #[default]
+ High = 0,
+ Low = 1,
+}
+
+impl Priority {
+ pub fn as_u8(&self) -> u8 {
+ *self as u8
+ }
+
+ pub fn as_str(&self) -> &str {
+ match self {
+ Self::High => "high",
+ Self::Low => "low",
+ }
+ }
+}
+
+impl TryFrom<u8> for Priority {
+ type Error = String;
+
+ fn try_from(value: u8) -> Result<Self, Self::Error> {
+ match value {
+ 0 => Ok(Priority::High),
+ 1 => Ok(Priority::Low),
+ _ => Err(format!("Unknown priority, value:{value}")),
+ }
+ }
+}
+
+#[derive(Clone, Debug)]
+pub struct PriorityRuntime {
+ low: RuntimeRef,
+ high: RuntimeRef,
+}
+
+impl PriorityRuntime {
+ pub fn new(low: RuntimeRef, high: RuntimeRef) -> Self {
+ Self { low, high }
+ }
+
+ pub fn low(&self) -> &RuntimeRef {
+ &self.low
+ }
+
+ pub fn high(&self) -> &RuntimeRef {
+ &self.high
+ }
+
+ pub fn choose_runtime(&self, priority: &Priority) -> &RuntimeRef {
+ match priority {
+ Priority::Low => &self.low,
+ Priority::High => &self.high,
+ }
+ }
+
+ // By default we spawn the future to the higher priority runtime.
+ pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ self.high.spawn(future)
+ }
+
+ pub fn spawn_with_priority<F>(&self, future: F, priority: Priority) ->
JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ match priority {
+ Priority::Low => self.low.spawn(future),
+ Priority::High => self.high.spawn(future),
+ }
+ }
+}
diff --git a/df_engine_extensions/Cargo.toml b/df_engine_extensions/Cargo.toml
index 54926369..4de30d40 100644
--- a/df_engine_extensions/Cargo.toml
+++ b/df_engine_extensions/Cargo.toml
@@ -41,6 +41,7 @@ generic_error = { workspace = true }
lazy_static = { workspace = true }
prometheus = { workspace = true }
prost = { workspace = true }
+runtime = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }
trace_metric = { workspace = true }
diff --git a/df_engine_extensions/src/dist_sql_query/mod.rs
b/df_engine_extensions/src/dist_sql_query/mod.rs
index 4bbf6b36..abfc0cca 100644
--- a/df_engine_extensions/src/dist_sql_query/mod.rs
+++ b/df_engine_extensions/src/dist_sql_query/mod.rs
@@ -26,6 +26,7 @@ use datafusion::{
};
use futures::future::BoxFuture;
use generic_error::BoxError;
+use runtime::Priority;
use table_engine::{predicate::PredicateRef, remote::model::TableIdentifier,
table::TableRef};
pub mod codec;
@@ -56,6 +57,7 @@ pub trait ExecutableScanBuilder: fmt::Debug + Send + Sync +
'static {
&self,
table: TableRef,
ctx: TableScanContext,
+ priority: Priority,
) -> DfResult<Arc<dyn ExecutionPlan>>;
}
@@ -91,22 +93,6 @@ pub struct TableScanContext {
pub predicate: PredicateRef,
}
-impl TableScanContext {
- pub fn new(
- batch_size: usize,
- read_parallelism: usize,
- projected_schema: ProjectedSchema,
- predicate: PredicateRef,
- ) -> Self {
- Self {
- batch_size,
- read_parallelism,
- projected_schema,
- predicate,
- }
- }
-}
-
impl TryFrom<TableScanContext> for
ceresdbproto::remote_engine::TableScanContext {
type Error = datafusion::error::DataFusionError;
diff --git a/df_engine_extensions/src/dist_sql_query/physical_plan.rs
b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
index 0dbaf415..87cd18bd 100644
--- a/df_engine_extensions/src/dist_sql_query/physical_plan.rs
+++ b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -43,6 +43,7 @@ use datafusion::{
},
};
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
+use runtime::Priority;
use table_engine::{remote::model::TableIdentifier, table::ReadRequest};
use trace_metric::{collector::FormatCollectorVisitor, MetricsCollector,
TraceMetricWhenDrop};
@@ -57,6 +58,7 @@ pub struct UnresolvedPartitionedScan {
pub sub_tables: Vec<TableIdentifier>,
pub table_scan_ctx: TableScanContext,
pub metrics_collector: MetricsCollector,
+ pub priority: Priority,
}
impl UnresolvedPartitionedScan {
@@ -77,6 +79,7 @@ impl UnresolvedPartitionedScan {
sub_tables,
table_scan_ctx,
metrics_collector,
+ priority: read_request.priority,
}
}
}
diff --git a/df_engine_extensions/src/dist_sql_query/resolver.rs
b/df_engine_extensions/src/dist_sql_query/resolver.rs
index c48bfe53..8f1d4e1c 100644
--- a/df_engine_extensions/src/dist_sql_query/resolver.rs
+++ b/df_engine_extensions/src/dist_sql_query/resolver.rs
@@ -20,6 +20,7 @@ use datafusion::{
error::{DataFusionError, Result as DfResult},
physical_plan::{analyze::AnalyzeExec, ExecutionPlan},
};
+use runtime::Priority;
use table_engine::{remote::model::TableIdentifier, table::TableRef};
use crate::{
@@ -45,6 +46,7 @@ pub struct Resolver {
remote_executor: RemotePhysicalPlanExecutorRef,
catalog_manager: CatalogManagerRef,
scan_builder: ExecutableScanBuilderRef,
+ priority: Priority,
}
impl Resolver {
@@ -52,11 +54,13 @@ impl Resolver {
remote_executor: RemotePhysicalPlanExecutorRef,
catalog_manager: CatalogManagerRef,
scan_builder: ExecutableScanBuilderRef,
+ priority: Priority,
) -> Self {
Self {
remote_executor,
catalog_manager,
scan_builder,
+ priority,
}
}
@@ -214,7 +218,10 @@ impl Resolver {
};
if let Some((table, table_scan_ctx)) = build_scan_opt {
- return self.scan_builder.build(table, table_scan_ctx).await;
+ return self
+ .scan_builder
+ .build(table, table_scan_ctx, self.priority)
+ .await;
}
let children = plan.children().clone();
diff --git a/df_engine_extensions/src/dist_sql_query/test_util.rs
b/df_engine_extensions/src/dist_sql_query/test_util.rs
index d70f9ec2..9006a526 100644
--- a/df_engine_extensions/src/dist_sql_query/test_util.rs
+++ b/df_engine_extensions/src/dist_sql_query/test_util.rs
@@ -44,6 +44,7 @@ use datafusion::{
scalar::ScalarValue,
};
use futures::{future::BoxFuture, Stream};
+use runtime::Priority;
use table_engine::{
memory::MemoryTable,
predicate::PredicateBuilder,
@@ -204,6 +205,7 @@ impl TestContext {
projected_schema,
predicate,
metrics_collector: MetricsCollector::default(),
+ priority: Default::default(),
};
// Build the test catalog
@@ -238,6 +240,7 @@ impl TestContext {
Arc::new(MockRemotePhysicalPlanExecutor),
self.catalog_manager.clone(),
Box::new(MockScanBuilder),
+ Priority::High,
)
}
@@ -422,6 +425,7 @@ impl ExecutableScanBuilder for MockScanBuilder {
&self,
_table: TableRef,
ctx: TableScanContext,
+ priority: Priority,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
let request = ReadRequest {
request_id: RequestId::from("test"),
@@ -433,6 +437,7 @@ impl ExecutableScanBuilder for MockScanBuilder {
projected_schema: ctx.projected_schema.clone(),
predicate: ctx.predicate.clone(),
metrics_collector: MetricsCollector::default(),
+ priority,
};
Ok(Arc::new(MockScan { request }))
diff --git a/docs/example-cluster-1.toml b/docs/example-cluster-1.toml
index b08d3dc7..8c221dc8 100644
--- a/docs/example-cluster-1.toml
+++ b/docs/example-cluster-1.toml
@@ -16,7 +16,7 @@
addr = "127.0.0.1"
[logger]
-level = "info"
+level = "debug"
[server]
bind_addr = "0.0.0.0"
diff --git a/integration_tests/cases/common/dml/issue-1087.result
b/integration_tests/cases/common/dml/issue-1087.result
index 54265dae..ad8a0cc7 100644
--- a/integration_tests/cases/common/dml/issue-1087.result
+++ b/integration_tests/cases/common/dml/issue-1087.result
@@ -76,7 +76,7 @@ String("logical_plan after push_down_limit"),String("SAME
TEXT AS ABOVE"),
String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME
TEXT AS ABOVE"),
String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"),
String("logical_plan"),String("TableScan: issue_1087 projection=[tsid, t,
name, value]"),
-String("initial_physical_plan"),String("ScanTable: table=issue_1087,
parallelism=8\n"),
+String("initial_physical_plan"),String("ScanTable: table=issue_1087,
parallelism=8, priority=Low\n"),
String("physical_plan after aggregate_statistics"),String("SAME TEXT AS
ABOVE"),
String("physical_plan after join_selection"),String("SAME TEXT AS ABOVE"),
String("physical_plan after PipelineFixer"),String("SAME TEXT AS ABOVE"),
@@ -86,7 +86,7 @@ String("physical_plan after
CombinePartialFinalAggregate"),String("SAME TEXT AS
String("physical_plan after EnforceSorting"),String("SAME TEXT AS ABOVE"),
String("physical_plan after coalesce_batches"),String("SAME TEXT AS ABOVE"),
String("physical_plan after PipelineChecker"),String("SAME TEXT AS ABOVE"),
-String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8\n"),
+String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8,
priority=Low\n"),
DROP TABLE `issue_1087`;
diff --git a/integration_tests/cases/common/dml/issue-341.result
b/integration_tests/cases/common/dml/issue-341.result
index f9405db3..90222259 100644
--- a/integration_tests/cases/common/dml/issue-341.result
+++ b/integration_tests/cases/common/dml/issue-341.result
@@ -58,7 +58,7 @@ WHERE
plan_type,plan,
String("logical_plan"),String("TableScan: issue341_t1 projection=[timestamp,
value], full_filters=[issue341_t1.value = Int32(3)]"),
-String("physical_plan"),String("ScanTable: table=issue341_t1,
parallelism=8\n"),
+String("physical_plan"),String("ScanTable: table=issue341_t1, parallelism=8,
priority=Low\n"),
-- FilterExec node should not be in plan.
@@ -72,7 +72,7 @@ WHERE
plan_type,plan,
String("logical_plan"),String("Projection: issue341_t1.timestamp,
issue341_t1.value\n TableScan: issue341_t1 projection=[timestamp, value,
tag1], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"),
-String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as
timestamp, value@1 as value]\n ScanTable: table=issue341_t1, parallelism=8\n"),
+String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as
timestamp, value@1 as value]\n ScanTable: table=issue341_t1, parallelism=8,
priority=Low\n"),
-- Repeat operations above, but with overwrite table
@@ -116,7 +116,7 @@ WHERE
plan_type,plan,
String("logical_plan"),String("Filter: issue341_t2.value = Float64(3)\n
TableScan: issue341_t2 projection=[timestamp, value],
partial_filters=[issue341_t2.value = Float64(3)]"),
-String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n
FilterExec: value@1 = 3\n ScanTable: table=issue341_t2, parallelism=8\n"),
+String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n
FilterExec: value@1 = 3\n ScanTable: table=issue341_t2, parallelism=8,
priority=Low\n"),
-- When using tag as filter, FilterExec node should not be in plan.
@@ -130,7 +130,7 @@ WHERE
plan_type,plan,
String("logical_plan"),String("Projection: issue341_t2.timestamp,
issue341_t2.value\n TableScan: issue341_t2 projection=[timestamp, value,
tag1], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"),
-String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as
timestamp, value@1 as value]\n ScanTable: table=issue341_t2, parallelism=8\n"),
+String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as
timestamp, value@1 as value]\n ScanTable: table=issue341_t2, parallelism=8,
priority=Low\n"),
DROP TABLE IF EXISTS `issue341_t1`;
diff --git a/integration_tests/cases/common/dml/issue-59.result
b/integration_tests/cases/common/dml/issue-59.result
index d2bdb35f..549c7019 100644
--- a/integration_tests/cases/common/dml/issue-59.result
+++ b/integration_tests/cases/common/dml/issue-59.result
@@ -25,7 +25,7 @@ GROUP BY id+1;
plan_type,plan,
String("logical_plan"),String("Projection: group_alias_0 AS issue59.id +
Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n Aggregate:
groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n Projection:
group_alias_0, alias1\n Aggregate: groupBy=[[CAST(issue59.id AS Int64) +
Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n
TableScan: issue59 projection=[id, account]"),
-String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as
issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n
AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0],
aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n
RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n
AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0],
aggr=[COUNT(alias1)]\n [...]
+String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as
issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n
AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0],
aggr=[COUNT(alias1)]\n CoalesceBatchesExec: target_batch_size=8192\n
RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n
AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0],
aggr=[COUNT(alias1)]\n [...]
DROP TABLE IF EXISTS issue59;
diff --git a/integration_tests/cases/common/explain/explain.result
b/integration_tests/cases/common/explain/explain.result
index ba651eca..0cd06380 100644
--- a/integration_tests/cases/common/explain/explain.result
+++ b/integration_tests/cases/common/explain/explain.result
@@ -10,7 +10,7 @@ EXPLAIN SELECT t FROM `04_explain_t`;
plan_type,plan,
String("logical_plan"),String("TableScan: 04_explain_t projection=[t]"),
-String("physical_plan"),String("ScanTable: table=04_explain_t,
parallelism=8\n"),
+String("physical_plan"),String("ScanTable: table=04_explain_t, parallelism=8,
priority=Low\n"),
DROP TABLE `04_explain_t`;
diff --git a/integration_tests/cases/common/optimizer/optimizer.result
b/integration_tests/cases/common/optimizer/optimizer.result
index 8551f96c..f9cfac2d 100644
--- a/integration_tests/cases/common/optimizer/optimizer.result
+++ b/integration_tests/cases/common/optimizer/optimizer.result
@@ -10,7 +10,7 @@ EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM
`07_optimizer_t` GROUP BY
plan_type,plan,
String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1,
AVG(07_optimizer_t.value) AS c2\n Aggregate: groupBy=[[07_optimizer_t.name]],
aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n TableScan:
07_optimizer_t projection=[name, value]"),
-String("physical_plan"),String("ProjectionExec:
expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n
AggregateExec: mode=FinalPartitioned, gby=[name@0 as name],
aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n
CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec:
partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec:
mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value),
AVG(07_optimizer_t.value)]\n [...]
+String("physical_plan"),String("ProjectionExec:
expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n
AggregateExec: mode=FinalPartitioned, gby=[name@0 as name],
aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n
CoalesceBatchesExec: target_batch_size=8192\n RepartitionExec:
partitioning=Hash([name@0], 8), input_partitions=8\n AggregateExec:
mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value),
AVG(07_optimizer_t.value)]\n [...]
DROP TABLE `07_optimizer_t`;
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result
b/integration_tests/cases/env/cluster/ddl/partition_table.result
index 18e023c0..e8feacbc 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -83,7 +83,7 @@
UInt64(4860320137932382618),Timestamp(1651737067000),String("ceresdb9"),Int32(0)
EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan:
pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n
__partition_table_t_1:\n poll_duration=xxs\n total_duration=xxs\n
wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec,
metrics=[output_rows=1, elapsed_compute=xxs]\n ScanTable:
table=__partition_table_t_1, parallelism=8, metrics=[\nPredicate { exprs:[name
= Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_st [...]
+String("Plan with Metrics"),String("ResolvedPartitionedScan:
pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n
__partition_table_t_1:\n poll_duration=xxs\n total_duration=xxs\n
wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec,
metrics=[output_rows=1, elapsed_compute=xxs]\n ScanTable:
table=__partition_table_t_1, parallelism=8, priority=Low, metrics=[\nPredicate
{ exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange [...]
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
@@ -92,7 +92,7 @@ String("Plan with Metrics"),String("ResolvedPartitionedScan:
pushdown_continue:f
EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0",
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan:
pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n
__partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n
wait_duration=xxs\n __partition_table_t_x:\n
poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n
__partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n
wait_duration=xxs\n\n__partition_table_t_x:\n [...]
+String("Plan with Metrics"),String("ResolvedPartitionedScan:
pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n
__partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n
wait_duration=xxs\n __partition_table_t_x:\n
poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n
__partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n
wait_duration=xxs\n\n__partition_table_t_x:\n [...]
ALTER TABLE partition_table_t ADD COLUMN (b string);
diff --git a/integration_tests/cases/env/local/ddl/query-plan.result
b/integration_tests/cases/env/local/ddl/query-plan.result
index 26dcf909..be858c15 100644
--- a/integration_tests/cases/env/local/ddl/query-plan.result
+++ b/integration_tests/cases/env/local/ddl/query-plan.result
@@ -31,7 +31,16 @@ explain analyze select t from `03_dml_select_real_time_range`
where t > 1695348001000;
plan_type,plan,
-String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, metrics=[\nPredicate {
exprs:[t > TimestampMillisecond(1695348001000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001001), exclusive_end:
Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n
iter_num=1\n merge_iter_0:\n init_duration=xxs\n
num_memtables=1\n num_ssts=0\n scan_count=2\n
scan_duration=xxs\n [...]
+String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=Low,
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)],
time_range:TimeRange { inclusive_start: Timestamp(1695348001001),
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n
do_merge_sort=true\n iter_num=1\n merge_iter_0:\n
init_duration=xxs\n num_memtables=1\n num_ssts=0\n
scan_count=2\n scan_durat [...]
+
+
+-- This query should have higher priority
+-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+explain analyze select t from `03_dml_select_real_time_range`
+where t >= 1695348001000 and t < 1695348002000;
+
+plan_type,plan,
+String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=High,
metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), t
< TimestampMillisecond(1695348002000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001000), exclusive_end:
Timestamp(1695348002000) } }\nscan_table:\n do_merge_sort=true\n
iter_num=1\n merge_iter_0:\n init_duration=xxs\n
num_memtables=1\n num_ssts= [...]
-- This query should not include memtable
@@ -40,7 +49,7 @@ explain analyze select t from `03_dml_select_real_time_range`
where t > 1695348002000;
plan_type,plan,
-String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, metrics=[\nPredicate {
exprs:[t > TimestampMillisecond(1695348002000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348002001), exclusive_end:
Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n
iter_num=0\n=0]\n"),
+String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=Low,
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348002000, None)],
time_range:TimeRange { inclusive_start: Timestamp(1695348002001),
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n
do_merge_sort=true\n iter_num=0\n=0]\n"),
-- SQLNESS ARG pre_cmd=flush
@@ -51,7 +60,7 @@ explain analyze select t from `03_dml_select_real_time_range`
where t > 1695348001000;
plan_type,plan,
-String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, metrics=[\nPredicate {
exprs:[t > TimestampMillisecond(1695348001000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348001001), exclusive_end:
Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n
iter_num=1\n merge_iter_0:\n init_duration=xxs\n
num_memtables=0\n num_ssts=1\n scan_count=2\n
scan_duration=xxs\n [...]
+String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=Low,
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)],
time_range:TimeRange { inclusive_start: Timestamp(1695348001001),
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n
do_merge_sort=true\n iter_num=1\n merge_iter_0:\n
init_duration=xxs\n num_memtables=0\n num_ssts=1\n
scan_count=2\n scan_durat [...]
-- This query should not include SST
@@ -59,7 +68,7 @@ explain analyze select t from `03_dml_select_real_time_range`
where t > 1695348002000;
plan_type,plan,
-String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, metrics=[\nPredicate {
exprs:[t > TimestampMillisecond(1695348002000, None)], time_range:TimeRange {
inclusive_start: Timestamp(1695348002001), exclusive_end:
Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n
iter_num=0\n=0]\n"),
+String("Plan with Metrics"),String("ScanTable:
table=03_dml_select_real_time_range, parallelism=8, priority=Low,
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348002000, None)],
time_range:TimeRange { inclusive_start: Timestamp(1695348002001),
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n
do_merge_sort=true\n iter_num=0\n=0]\n"),
-- Table with an 'append' update mode
@@ -92,7 +101,7 @@ explain analyze select t from `03_append_mode_table`
where t >= 1695348001000 and name = 'ceresdb';
plan_type,plan,
-String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t],
metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable:
table=03_append_mode_table, parallelism=8, metrics=[\nPredicate { exprs:[t >=
TimestampMillisecond(1695348001000, None), name = Utf8(\"ceresdb\")],
time_range:TimeRange { inclusive_start: Timestamp(1695348001000),
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n
do_merge_sort=false\n chain_iter_0:\n num_memtables=1\n
num_ssts= [...]
+String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t],
metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable:
table=03_append_mode_table, parallelism=8, priority=Low, metrics=[\nPredicate {
exprs:[t >= TimestampMillisecond(1695348001000, None), name =
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start:
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) }
}\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n
num_memtables=1\n [...]
-- Should just fetch projected columns from SST
@@ -106,7 +115,7 @@ explain analyze select t from `03_append_mode_table`
where t >= 1695348001000 and name = 'ceresdb';
plan_type,plan,
-String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t],
metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable:
table=03_append_mode_table, parallelism=8, metrics=[\nPredicate { exprs:[t >=
TimestampMillisecond(1695348001000, None), name = Utf8(\"ceresdb\")],
time_range:TimeRange { inclusive_start: Timestamp(1695348001000),
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n
do_merge_sort=false\n chain_iter_0:\n num_memtables=0\n
num_ssts= [...]
+String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t],
metrics=[output_rows=2, elapsed_compute=xxs]\n ScanTable:
table=03_append_mode_table, parallelism=8, priority=Low, metrics=[\nPredicate {
exprs:[t >= TimestampMillisecond(1695348001000, None), name =
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start:
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) }
}\nscan_table:\n do_merge_sort=false\n chain_iter_0:\n
num_memtables=0\n [...]
DROP TABLE `03_dml_select_real_time_range`;
diff --git a/integration_tests/cases/env/local/ddl/query-plan.sql
b/integration_tests/cases/env/local/ddl/query-plan.sql
index a0baff5b..d1dbdaf9 100644
--- a/integration_tests/cases/env/local/ddl/query-plan.sql
+++ b/integration_tests/cases/env/local/ddl/query-plan.sql
@@ -21,6 +21,11 @@ INSERT INTO `03_dml_select_real_time_range` (t, name, value)
explain analyze select t from `03_dml_select_real_time_range`
where t > 1695348001000;
+-- This query should have higher priority
+-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+explain analyze select t from `03_dml_select_real_time_range`
+where t >= 1695348001000 and t < 1695348002000;
+
-- This query should not include memtable
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
explain analyze select t from `03_dml_select_real_time_range`
diff --git a/interpreters/Cargo.toml b/interpreters/Cargo.toml
index 94b9f4ea..7687b97f 100644
--- a/interpreters/Cargo.toml
+++ b/interpreters/Cargo.toml
@@ -40,12 +40,15 @@ df_operator = { workspace = true }
futures = { workspace = true }
generic_error = { workspace = true }
hash_ext = { workspace = true }
+lazy_static = { workspace = true }
logger = { workspace = true }
macros = { workspace = true }
meta_client = { workspace = true }
+prometheus = { workspace = true }
query_engine = { workspace = true }
query_frontend = { workspace = true }
regex = { workspace = true }
+runtime = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }
diff --git a/interpreters/src/context.rs b/interpreters/src/context.rs
index 34e76d44..ab72ca38 100644
--- a/interpreters/src/context.rs
+++ b/interpreters/src/context.rs
@@ -19,6 +19,7 @@ use std::{sync::Arc, time::Instant};
use common_types::request_id::RequestId;
use macros::define_result;
use query_engine::context::{Context as QueryContext, ContextRef as
QueryContextRef};
+use runtime::Priority;
use snafu::Snafu;
#[derive(Debug, Snafu)]
@@ -36,6 +37,9 @@ pub struct Context {
default_catalog: String,
default_schema: String,
enable_partition_table_access: bool,
+ /// If time range exceeds this threshold, the query will be marked as
+ /// expensive
+ expensive_query_threshold: u64,
}
impl Context {
@@ -46,16 +50,18 @@ impl Context {
default_catalog: String::new(),
default_schema: String::new(),
enable_partition_table_access: false,
+ expensive_query_threshold: 24 * 3600 * 1000, // default 24 hours
}
}
/// Create a new context of query executor
- pub fn new_query_context(&self) -> Result<QueryContextRef> {
+ pub fn new_query_context(&self, priority: Priority) ->
Result<QueryContextRef> {
let ctx = QueryContext {
request_id: self.request_id.clone(),
deadline: self.deadline,
default_catalog: self.default_catalog.clone(),
default_schema: self.default_schema.clone(),
+ priority,
};
Ok(Arc::new(ctx))
}
@@ -79,6 +85,11 @@ impl Context {
pub fn enable_partition_table_access(&self) -> bool {
self.enable_partition_table_access
}
+
+ #[inline]
+ pub fn expensive_query_threshold(&self) -> u64 {
+ self.expensive_query_threshold
+ }
}
#[must_use]
@@ -88,6 +99,7 @@ pub struct Builder {
default_catalog: String,
default_schema: String,
enable_partition_table_access: bool,
+ expensive_query_threshold: u64,
}
impl Builder {
@@ -102,6 +114,11 @@ impl Builder {
self
}
+ pub fn expensive_query_threshold(mut self, threshold: u64) -> Self {
+ self.expensive_query_threshold = threshold;
+ self
+ }
+
pub fn build(self) -> Context {
Context {
request_id: self.request_id,
@@ -109,6 +126,7 @@ impl Builder {
default_catalog: self.default_catalog,
default_schema: self.default_schema,
enable_partition_table_access: self.enable_partition_table_access,
+ expensive_query_threshold: self.expensive_query_threshold,
}
}
}
diff --git a/interpreters/src/factory.rs b/interpreters/src/factory.rs
index 6216b6c9..7217b5d1 100644
--- a/interpreters/src/factory.rs
+++ b/interpreters/src/factory.rs
@@ -17,6 +17,7 @@
use catalog::manager::ManagerRef;
use query_engine::{executor::ExecutorRef,
physical_planner::PhysicalPlannerRef};
use query_frontend::plan::Plan;
+use runtime::PriorityRuntime;
use table_engine::engine::TableEngineRef;
use crate::{
@@ -37,6 +38,7 @@ use crate::{
/// A factory to create interpreters
pub struct Factory {
query_executor: ExecutorRef,
+ query_runtime: PriorityRuntime,
physical_planner: PhysicalPlannerRef,
catalog_manager: ManagerRef,
table_engine: TableEngineRef,
@@ -50,9 +52,11 @@ impl Factory {
catalog_manager: ManagerRef,
table_engine: TableEngineRef,
table_manipulator: TableManipulatorRef,
+ query_runtime: PriorityRuntime,
) -> Self {
Self {
query_executor,
+ query_runtime,
physical_planner,
catalog_manager,
table_engine,
@@ -68,9 +72,13 @@ impl Factory {
validator.validate(&plan)?;
let interpreter = match plan {
- Plan::Query(p) => {
- SelectInterpreter::create(ctx, p, self.query_executor,
self.physical_planner)
- }
+ Plan::Query(p) => SelectInterpreter::create(
+ ctx,
+ p,
+ self.query_executor,
+ self.physical_planner,
+ self.query_runtime,
+ ),
Plan::Insert(p) => InsertInterpreter::create(ctx, p),
Plan::Create(p) => {
CreateInterpreter::create(ctx, p, self.table_engine,
self.table_manipulator)
diff --git a/interpreters/src/lib.rs b/interpreters/src/lib.rs
index b7b8ce35..5304ab7d 100644
--- a/interpreters/src/lib.rs
+++ b/interpreters/src/lib.rs
@@ -29,13 +29,13 @@ pub mod exists;
pub mod factory;
pub mod insert;
pub mod interpreter;
+mod metrics;
pub mod select;
pub mod show;
+mod show_create;
pub mod table_manipulator;
pub mod validator;
-mod show_create;
-
#[cfg(test)]
mod tests;
diff --git a/query_engine/src/datafusion_impl/logical_optimizer/mod.rs
b/interpreters/src/metrics.rs
similarity index 66%
copy from query_engine/src/datafusion_impl/logical_optimizer/mod.rs
copy to interpreters/src/metrics.rs
index a3b5130f..5f9b3028 100644
--- a/query_engine/src/datafusion_impl/logical_optimizer/mod.rs
+++ b/interpreters/src/metrics.rs
@@ -12,8 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-//! Logical optimizer
+use lazy_static::lazy_static;
+use prometheus::{register_int_counter_vec, IntCounterVec};
-#[cfg(test)]
-pub mod tests;
-pub mod type_conversion;
+lazy_static! {
+ pub static ref ENGINE_QUERY_COUNTER: IntCounterVec =
register_int_counter_vec!(
+ "engine_query_counter",
+ "engine_query_counter",
+ &["priority"]
+ )
+ .unwrap();
+}
diff --git a/interpreters/src/select.rs b/interpreters/src/select.rs
index eb701663..c33c7fdf 100644
--- a/interpreters/src/select.rs
+++ b/interpreters/src/select.rs
@@ -19,15 +19,19 @@ use futures::TryStreamExt;
use generic_error::{BoxError, GenericError};
use logger::debug;
use macros::define_result;
-use query_engine::{executor::ExecutorRef,
physical_planner::PhysicalPlannerRef};
-use query_frontend::plan::QueryPlan;
+use query_engine::{
+ context::ContextRef as QueryContextRef,
+ executor::ExecutorRef,
+ physical_planner::{PhysicalPlanRef, PhysicalPlannerRef},
+};
+use query_frontend::plan::{PriorityContext, QueryPlan};
+use runtime::{Priority, PriorityRuntime};
use snafu::{ResultExt, Snafu};
-use table_engine::stream::SendableRecordBatchStream;
use crate::{
context::Context,
interpreter::{Interpreter, InterpreterPtr, Output, Result as
InterpreterResult, Select},
- RecordBatchVec,
+ metrics::ENGINE_QUERY_COUNTER,
};
#[derive(Debug, Snafu)]
@@ -37,6 +41,9 @@ pub enum Error {
#[snafu(display("Failed to execute physical plan, msg:{}, err:{}", msg,
source))]
ExecutePlan { msg: String, source: GenericError },
+
+ #[snafu(display("Failed to spawn task, err:{}", source))]
+ Spawn { source: runtime::Error },
}
define_result!(Error);
@@ -47,6 +54,7 @@ pub struct SelectInterpreter {
plan: QueryPlan,
executor: ExecutorRef,
physical_planner: PhysicalPlannerRef,
+ query_runtime: PriorityRuntime,
}
impl SelectInterpreter {
@@ -55,12 +63,14 @@ impl SelectInterpreter {
plan: QueryPlan,
executor: ExecutorRef,
physical_planner: PhysicalPlannerRef,
+ query_runtime: PriorityRuntime,
) -> InterpreterPtr {
Box::new(Self {
ctx,
plan,
executor,
physical_planner,
+ query_runtime,
})
}
}
@@ -69,21 +79,37 @@ impl SelectInterpreter {
impl Interpreter for SelectInterpreter {
async fn execute(self: Box<Self>) -> InterpreterResult<Output> {
let request_id = self.ctx.request_id();
- debug!(
- "Interpreter execute select begin, request_id:{}, plan:{:?}",
- request_id, self.plan
- );
+ let plan = self.plan;
+ let priority = match plan.decide_query_priority(PriorityContext {
+ time_range_threshold: self.ctx.expensive_query_threshold(),
+ }) {
+ Some(v) => v,
+ None => {
+ debug!(
+ "Query has invalid query range, return empty result
directly, id:{request_id}, plan:{plan:?}"
+ );
+ return Ok(Output::Records(Vec::new()));
+ }
+ };
+
+ ENGINE_QUERY_COUNTER
+ .with_label_values(&[priority.as_str()])
+ .inc();
let query_ctx = self
.ctx
- .new_query_context()
+ .new_query_context(priority)
.context(CreateQueryContext)
.context(Select)?;
+ debug!(
+ "Interpreter execute select begin, request_id:{request_id},
plan:{plan:?}, priority:{priority:?}"
+ );
+
// Create physical plan.
let physical_plan = self
.physical_planner
- .plan(&query_ctx, self.plan)
+ .plan(&query_ctx, plan)
.await
.box_err()
.context(ExecutePlan {
@@ -91,34 +117,50 @@ impl Interpreter for SelectInterpreter {
})
.context(Select)?;
- let record_batch_stream = self
- .executor
- .execute(&query_ctx, physical_plan)
+ if matches!(priority, Priority::Low) {
+ let executor = self.executor;
+ return self
+ .query_runtime
+ .spawn_with_priority(
+ async move {
+ execute_and_collect(query_ctx, executor, physical_plan)
+ .await
+ .context(Select)
+ },
+ Priority::Low,
+ )
+ .await
+ .context(Spawn)
+ .context(Select)?;
+ }
+
+ execute_and_collect(query_ctx, self.executor, physical_plan)
.await
- .box_err()
- .context(ExecutePlan {
- msg: "failed to execute physical plan",
- })
- .context(Select)?;
-
- debug!(
- "Interpreter execute select finish, request_id:{}",
- request_id
- );
-
- let record_batches = collect(record_batch_stream).await?;
-
- Ok(Output::Records(record_batches))
+ .context(Select)
}
}
-async fn collect(stream: SendableRecordBatchStream) ->
InterpreterResult<RecordBatchVec> {
- stream
- .try_collect()
+async fn execute_and_collect(
+ query_ctx: QueryContextRef,
+ executor: ExecutorRef,
+ physical_plan: PhysicalPlanRef,
+) -> Result<Output> {
+ let record_batch_stream = executor
+ .execute(&query_ctx, physical_plan)
.await
.box_err()
.context(ExecutePlan {
- msg: "failed to collect execution results",
- })
- .context(Select)
+ msg: "failed to execute physical plan",
+ })?;
+
+ let record_batches =
+ record_batch_stream
+ .try_collect()
+ .await
+ .box_err()
+ .context(ExecutePlan {
+ msg: "failed to collect execution results",
+ })?;
+
+ Ok(Output::Records(record_batches))
}
diff --git a/interpreters/src/tests.rs b/interpreters/src/tests.rs
index ed5081d4..1b11f5c9 100644
--- a/interpreters/src/tests.rs
+++ b/interpreters/src/tests.rs
@@ -29,6 +29,7 @@ use query_frontend::{
config::DynamicConfig, parser::Parser, plan::Plan, planner::Planner,
provider::MetaProvider,
tests::MockMetaProvider,
};
+use runtime::{Builder, PriorityRuntime};
use table_engine::{engine::TableEngineRef, memory::MockRemoteEngine};
use crate::{
@@ -62,6 +63,7 @@ where
pub catalog_manager: ManagerRef,
pub table_manipulator: TableManipulatorRef,
pub query_engine: QueryEngineRef,
+ pub read_runtime: PriorityRuntime,
}
impl<M> Env<M>
@@ -84,6 +86,7 @@ where
self.catalog_manager.clone(),
self.engine(),
self.table_manipulator.clone(),
+ self.read_runtime.clone(),
)
}
@@ -236,6 +239,7 @@ where
catalog_manager.clone(),
self.engine(),
table_manipulator.clone(),
+ self.read_runtime.clone(),
);
let insert_sql = "INSERT INTO test_missing_columns_table(key1, key2,
field4) VALUES('tagk', 1638428434000, 1), ('tagk2', 1638428434000, 10);";
@@ -256,6 +260,7 @@ where
catalog_manager,
self.engine(),
table_manipulator,
+ self.read_runtime.clone(),
);
let ctx = Context::builder(RequestId::next_id(), None)
.default_catalog_and_schema(DEFAULT_CATALOG.to_string(),
DEFAULT_SCHEMA.to_string())
@@ -356,14 +361,21 @@ where
}
}
-#[tokio::test]
-async fn test_interpreters_rocks() {
- test_util::init_log_for_test();
- let rocksdb_ctx = RocksDBEngineBuildContext::default();
- test_interpreters(rocksdb_ctx).await;
+#[test]
+fn test_interpreters_rocks() {
+ let rt = Arc::new(Builder::default().build().unwrap());
+ let read_runtime = PriorityRuntime::new(rt.clone(), rt.clone());
+ rt.block_on(async {
+ test_util::init_log_for_test();
+ let rocksdb_ctx = RocksDBEngineBuildContext::default();
+ test_interpreters(rocksdb_ctx, read_runtime).await;
+ })
}
-async fn test_interpreters<T: EngineBuildContext>(engine_context: T) {
+async fn test_interpreters<T: EngineBuildContext>(
+ engine_context: T,
+ read_runtime: PriorityRuntime,
+) {
let env = TestEnv::builder().build();
let mut test_ctx = env.new_context(engine_context);
test_ctx.open().await;
@@ -391,6 +403,7 @@ async fn test_interpreters<T:
EngineBuildContext>(engine_context: T) {
catalog_manager,
table_manipulator,
query_engine,
+ read_runtime,
};
env.test_create_table().await;
diff --git a/proxy/src/instance.rs b/proxy/src/instance.rs
index ec0d2e48..00087d40 100644
--- a/proxy/src/instance.rs
+++ b/proxy/src/instance.rs
@@ -21,6 +21,7 @@ use df_operator::registry::FunctionRegistryRef;
use interpreters::table_manipulator::TableManipulatorRef;
use query_engine::QueryEngineRef;
use query_frontend::config::DynamicConfig as FrontendDynamicConfig;
+use runtime::PriorityRuntime;
use table_engine::{engine::TableEngineRef, remote::RemoteEngineRef};
use crate::limiter::Limiter;
@@ -29,6 +30,7 @@ use crate::limiter::Limiter;
pub struct Instance {
pub catalog_manager: ManagerRef,
pub query_engine: QueryEngineRef,
+ pub query_runtime: PriorityRuntime,
pub table_engine: TableEngineRef,
pub partition_table_engine: TableEngineRef,
// User defined functions registry.
diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs
index 74d26813..93b53590 100644
--- a/proxy/src/lib.rs
+++ b/proxy/src/lib.rs
@@ -39,7 +39,6 @@ mod write;
pub const FORWARDED_FROM: &str = "forwarded-from";
use std::{
- ops::Bound,
sync::Arc,
time::{Duration, Instant},
};
@@ -55,14 +54,9 @@ use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, PrometheusRemoteQueryRequest,
PrometheusRemoteQueryResponse, Route,
};
-use common_types::{request_id::RequestId, table::DEFAULT_SHARD_ID, ENABLE_TTL,
TTL};
-use datafusion::{
- prelude::{Column, Expr},
- scalar::ScalarValue,
-};
+use common_types::{request_id::RequestId, table::DEFAULT_SHARD_ID};
use futures::FutureExt;
use generic_error::BoxError;
-use influxql_query::logical_optimizer::range_predicate::find_time_range;
use interpreters::{
context::Context as InterpreterContext,
factory::Factory,
@@ -80,7 +74,6 @@ use table_engine::{
table::{TableId, TableRef},
PARTITION_TABLE_ENGINE_TYPE,
};
-use time_ext::{current_time_millis, parse_duration};
use tonic::{transport::Channel, IntoRequest};
use crate::{
@@ -92,9 +85,6 @@ use crate::{
schema_config_provider::SchemaConfigProviderRef,
};
-// Because the clock may have errors, choose 1 hour as the error buffer
-const QUERY_EXPIRED_BUFFER: Duration = Duration::from_secs(60 * 60);
-
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
pub struct SubTableAccessPerm {
@@ -123,6 +113,7 @@ pub struct Proxy {
cluster_with_meta: bool,
sub_table_access_perm: SubTableAccessPerm,
request_notifiers: Option<ReadRequestNotifiers>,
+ expensive_query_threshold: u64,
}
impl Proxy {
@@ -140,6 +131,7 @@ impl Proxy {
cluster_with_meta: bool,
sub_table_access_perm: SubTableAccessPerm,
request_notifiers: Option<ReadRequestNotifiers>,
+ expensive_query_threshold: u64,
) -> Self {
let forwarder = Arc::new(Forwarder::new(
forward_config,
@@ -159,6 +151,7 @@ impl Proxy {
cluster_with_meta,
sub_table_access_perm,
request_notifiers,
+ expensive_query_threshold,
}
}
@@ -211,70 +204,6 @@ impl Proxy {
})
}
- /// Returns true when query range maybe exceeding ttl,
- /// Note: False positive is possible
- // TODO(tanruixiang): Add integration testing when supported by the testing
- // framework
- fn is_plan_expired(
- &self,
- plan: &Plan,
- catalog_name: &str,
- schema_name: &str,
- table_name: &str,
- ) -> Result<bool> {
- if let Plan::Query(query) = &plan {
- let catalog = self.get_catalog(catalog_name)?;
- let schema = self.get_schema(&catalog, schema_name)?;
- let table_ref = match self.get_table(&schema, table_name) {
- Ok(Some(v)) => v,
- _ => return Ok(false),
- };
- if let Some(value) = table_ref.options().get(ENABLE_TTL) {
- if value == "false" {
- return Ok(false);
- }
- }
- let ttl_duration = if let Some(ttl) = table_ref.options().get(TTL)
{
- if let Ok(ttl) = parse_duration(ttl) {
- ttl
- } else {
- return Ok(false);
- }
- } else {
- return Ok(false);
- };
-
- let timestamp_name = &table_ref
- .schema()
- .column(table_ref.schema().timestamp_index())
- .name
- .clone();
- let ts_col = Column::from_name(timestamp_name);
- let range = find_time_range(&query.df_plan, &ts_col)
- .box_err()
- .context(Internal {
- msg: "Failed to find time range",
- })?;
- match range.end {
- Bound::Included(x) | Bound::Excluded(x) => {
- if let Expr::Literal(ScalarValue::Int64(Some(x))) = x {
- let now = current_time_millis() as i64;
- let deadline = now
- - ttl_duration.as_millis() as i64
- - QUERY_EXPIRED_BUFFER.as_millis() as i64;
-
- if x * 1_000 <= deadline {
- return Ok(true);
- }
- }
- }
- Bound::Unbounded => (),
- }
- }
-
- Ok(false)
- }
-
fn get_catalog(&self, catalog_name: &str) -> Result<CatalogRef> {
let catalog = self
.instance
@@ -554,6 +483,7 @@ impl Proxy {
// Use current ctx's catalog and schema as default catalog and
schema
.default_catalog_and_schema(catalog.to_string(),
schema.to_string())
.enable_partition_table_access(enable_partition_table_access)
+ .expensive_query_threshold(self.expensive_query_threshold)
.build();
let interpreter_factory = Factory::new(
self.instance.query_engine.executor(),
@@ -561,6 +491,7 @@ impl Proxy {
self.instance.catalog_manager.clone(),
self.instance.table_engine.clone(),
self.instance.table_manipulator.clone(),
+ self.instance.query_runtime.clone(),
);
interpreter_factory
.create(interpreter_ctx, plan)
diff --git a/proxy/src/read.rs b/proxy/src/read.rs
index 9d93221c..effb88c0 100644
--- a/proxy/src/read.rs
+++ b/proxy/src/read.rs
@@ -28,6 +28,7 @@ use notifier::notifier::{ExecutionGuard, RequestNotifiers,
RequestResult};
use query_frontend::{
frontend,
frontend::{Context as SqlContext, Frontend},
+ plan::{Plan, PriorityContext},
provider::CatalogMetaProvider,
};
use router::endpoint::Endpoint;
@@ -176,7 +177,7 @@ impl Proxy {
.slow_threshold
.load(std::sync::atomic::Ordering::Relaxed);
let slow_threshold = Duration::from_secs(slow_threshold_secs);
- let slow_timer = SlowTimer::new(request_id.as_str(), sql,
slow_threshold);
+ let mut slow_timer = SlowTimer::new(request_id.as_str(), sql,
slow_threshold);
let deadline = ctx.timeout.map(|t| slow_timer.start_time() + t);
let catalog = self.instance.catalog_manager.default_catalog_name();
@@ -243,13 +244,11 @@ impl Proxy {
})?;
}
- let mut plan_maybe_expired = false;
- if let Some(table_name) = &table_name {
- match self.is_plan_expired(&plan, catalog, schema, table_name) {
- Ok(v) => plan_maybe_expired = v,
- Err(err) => {
- warn!("Plan expire check failed, err:{err}");
- }
+ if let Plan::Query(plan) = &plan {
+ if let Some(priority) = plan.decide_query_priority(PriorityContext
{
+ time_range_threshold: self.expensive_query_threshold,
+ }) {
+ slow_timer.priority(priority);
}
}
@@ -276,27 +275,7 @@ impl Proxy {
"Handle sql query finished, sql:{sql}, elapsed:{cost:?},
catalog:{catalog}, schema:{schema}, ctx:{ctx:?}",
);
- match &output {
- Output::AffectedRows(_) => Ok(output),
- Output::Records(v) => {
- if plan_maybe_expired {
- let num_rows = v
- .iter()
- .fold(0_usize, |acc, record_batch| acc +
record_batch.num_rows());
- if num_rows == 0 {
- warn!("Query time range maybe exceed TTL, sql:{sql}");
-
- // TODO: Cannot return this error directly, empty query
- // should return 200, not 4xx/5xx
- // All protocols should recognize this error.
- // return Err(Error::QueryMaybeExceedTTL {
- // msg: format!("Query time range maybe exceed TTL,
- // sql:{sql}"), });
- }
- }
- Ok(output)
- }
- }
+ Ok(output)
}
async fn maybe_forward_sql_query(
diff --git a/query_engine/Cargo.toml b/query_engine/Cargo.toml
index df2e3240..4d9388fa 100644
--- a/query_engine/Cargo.toml
+++ b/query_engine/Cargo.toml
@@ -46,6 +46,7 @@ logger = { workspace = true }
macros = { workspace = true }
prost = { workspace = true }
query_frontend = { workspace = true }
+runtime = { workspace = true }
serde = { workspace = true }
snafu = { workspace = true }
table_engine = { workspace = true }
diff --git a/query_engine/src/config.rs b/query_engine/src/config.rs
index 1e6350ba..61462f3c 100644
--- a/query_engine/src/config.rs
+++ b/query_engine/src/config.rs
@@ -13,6 +13,7 @@
// limitations under the License.
use serde::{Deserialize, Serialize};
+use time_ext::ReadableDuration;
// FIXME: Use cpu number as the default parallelism
const DEFAULT_READ_PARALLELISM: usize = 8;
@@ -21,12 +22,14 @@ const DEFAULT_READ_PARALLELISM: usize = 8;
#[serde(default)]
pub struct Config {
pub read_parallelism: usize,
+ pub expensive_query_threshold: ReadableDuration,
}
impl Default for Config {
fn default() -> Self {
Self {
read_parallelism: DEFAULT_READ_PARALLELISM,
+ expensive_query_threshold: ReadableDuration::hours(24),
}
}
}
diff --git a/query_engine/src/context.rs b/query_engine/src/context.rs
index 2bf6aebf..55493aec 100644
--- a/query_engine/src/context.rs
+++ b/query_engine/src/context.rs
@@ -17,6 +17,7 @@
use std::{sync::Arc, time::Instant};
use common_types::request_id::RequestId;
+use runtime::Priority;
pub type ContextRef = Arc<Context>;
@@ -27,4 +28,5 @@ pub struct Context {
pub deadline: Option<Instant>,
pub default_catalog: String,
pub default_schema: String,
+ pub priority: Priority,
}
diff --git a/query_engine/src/datafusion_impl/mod.rs
b/query_engine/src/datafusion_impl/mod.rs
index ed8d963f..dfab1fd0 100644
--- a/query_engine/src/datafusion_impl/mod.rs
+++ b/query_engine/src/datafusion_impl/mod.rs
@@ -12,17 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::{fmt, sync::Arc, time::Instant};
+use std::{sync::Arc, time::Instant};
use catalog::manager::ManagerRef as CatalogManager;
use datafusion::{
execution::{
- context::{QueryPlanner, SessionState},
+ context::SessionState,
runtime_env::{RuntimeConfig, RuntimeEnv},
FunctionRegistry,
},
- optimizer::analyzer::Analyzer,
- physical_optimizer::PhysicalOptimizerRule,
prelude::{SessionConfig, SessionContext},
};
use df_engine_extensions::codec::PhysicalExtensionCodecImpl;
@@ -31,8 +29,7 @@ use table_engine::{provider::CeresdbOptions,
remote::RemoteEngineRef};
use crate::{
context::Context,
datafusion_impl::{
- executor::DatafusionExecutorImpl,
logical_optimizer::type_conversion::TypeConversion,
- physical_planner::DatafusionPhysicalPlannerImpl,
+ executor::DatafusionExecutorImpl,
physical_planner::DatafusionPhysicalPlannerImpl,
physical_planner_extension::QueryPlannerAdapter,
task_context::Preprocessor,
},
executor::ExecutorRef,
@@ -41,7 +38,6 @@ use crate::{
};
pub mod executor;
-pub mod logical_optimizer;
pub mod physical_optimizer;
pub mod physical_plan;
pub mod physical_plan_extension;
@@ -67,15 +63,12 @@ impl DatafusionQueryEngineImpl {
) -> Result<Self> {
let runtime_env = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
let df_physical_planner = Arc::new(QueryPlannerAdapter);
- let df_ctx_builder = Arc::new(DfContextBuilder::new(
- config,
- runtime_env.clone(),
+ let df_ctx_builder = Arc::new(DfContextBuilder::new(config,
runtime_env.clone()));
+ let physical_planner = Arc::new(DatafusionPhysicalPlannerImpl::new(
+ df_ctx_builder.clone(),
df_physical_planner,
));
- // Physical planner
- let physical_planner =
Arc::new(DatafusionPhysicalPlannerImpl::new(df_ctx_builder.clone()));
-
// Executor
let extension_codec = Arc::new(PhysicalExtensionCodecImpl::new());
let preprocessor = Arc::new(Preprocessor::new(
@@ -105,33 +98,17 @@ impl QueryEngine for DatafusionQueryEngineImpl {
}
/// Datafusion context builder
-#[derive(Clone)]
+#[derive(Debug, Clone)]
pub struct DfContextBuilder {
config: Config,
runtime_env: Arc<RuntimeEnv>,
- physical_planner: Arc<dyn QueryPlanner + Send + Sync>,
-}
-
-impl fmt::Debug for DfContextBuilder {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("DfContextBuilder")
- .field("config", &self.config)
- .field("runtime_env", &self.runtime_env)
- .field("physical_planner", &"QueryPlannerAdapter")
- .finish()
- }
}
impl DfContextBuilder {
- pub fn new(
- config: Config,
- runtime_env: Arc<RuntimeEnv>,
- physical_planner: Arc<dyn QueryPlanner + Send + Sync>,
- ) -> Self {
+ pub fn new(config: Config, runtime_env: Arc<RuntimeEnv>) -> Self {
Self {
config,
runtime_env,
- physical_planner,
}
}
@@ -144,6 +121,7 @@ impl DfContextBuilder {
request_timeout: timeout,
default_catalog: ctx.default_catalog.clone(),
default_schema: ctx.default_schema.clone(),
+ priority: ctx.priority,
};
let mut df_session_config = SessionConfig::new()
.with_default_catalog_and_schema(
@@ -159,40 +137,7 @@ impl DfContextBuilder {
// Using the default logical optimizer; if you want to add more custom rules,
// use `add_optimizer_rule` to add them.
- let state = SessionState::with_config_rt(df_session_config,
self.runtime_env.clone())
- .with_query_planner(self.physical_planner.clone());
-
- // Register analyzer rules
- let state = Self::register_analyzer_rules(state);
-
- // Register iox optimizers, used by influxql.
- let state =
influxql_query::logical_optimizer::register_iox_logical_optimizers(state);
-
+ let state = SessionState::with_config_rt(df_session_config,
self.runtime_env.clone());
SessionContext::with_state(state)
}
-
- // TODO: this is not used now, bug of RepartitionAdapter is already fixed
in
- // datafusion itself. Remove this code in future.
- #[allow(dead_code)]
- fn apply_adapters_for_physical_optimize_rules(
- default_rules: &[Arc<dyn PhysicalOptimizerRule + Send + Sync>],
- ) -> Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
- let mut new_rules = Vec::with_capacity(default_rules.len());
- for rule in default_rules {
-
new_rules.push(physical_optimizer::may_adapt_optimize_rule(rule.clone()))
- }
-
- new_rules
- }
-
- fn register_analyzer_rules(mut state: SessionState) -> SessionState {
- // Our analyzer has high priority, so first add we custom rules, then
add the
- // default ones.
- state = state.with_analyzer_rules(vec![Arc::new(TypeConversion)]);
- for rule in Analyzer::new().rules {
- state = state.add_analyzer_rule(rule);
- }
-
- state
- }
}
diff --git a/query_engine/src/datafusion_impl/physical_planner.rs
b/query_engine/src/datafusion_impl/physical_planner.rs
index 58287339..8ee4198f 100644
--- a/query_engine/src/datafusion_impl/physical_planner.rs
+++ b/query_engine/src/datafusion_impl/physical_planner.rs
@@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use std::sync::Arc;
+use std::{fmt, sync::Arc};
use async_trait::async_trait;
+use datafusion::execution::context::QueryPlanner;
use generic_error::BoxError;
-use query_frontend::{plan::QueryPlan, provider::CatalogProviderAdapter};
+use query_frontend::plan::QueryPlan;
use snafu::ResultExt;
use crate::{
@@ -30,14 +31,29 @@ use crate::{
};
/// Physical planner based on datafusion
-#[derive(Debug, Clone)]
+#[derive(Clone)]
pub struct DatafusionPhysicalPlannerImpl {
df_ctx_builder: Arc<DfContextBuilder>,
+ physical_planner: Arc<dyn QueryPlanner + Send + Sync>,
+}
+
+impl fmt::Debug for DatafusionPhysicalPlannerImpl {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("DfContextBuilder")
+ .field("df_ctx_builder", &self.df_ctx_builder)
+ .finish()
+ }
}
impl DatafusionPhysicalPlannerImpl {
- pub fn new(df_ctx_builder: Arc<DfContextBuilder>) -> Self {
- Self { df_ctx_builder }
+ pub fn new(
+ df_ctx_builder: Arc<DfContextBuilder>,
+ physical_planner: Arc<dyn QueryPlanner + Send + Sync>,
+ ) -> Self {
+ Self {
+ df_ctx_builder,
+ physical_planner,
+ }
}
fn has_partitioned_table(logical_plan: &QueryPlan) -> bool {
@@ -59,21 +75,16 @@ impl DatafusionPhysicalPlannerImpl {
impl PhysicalPlanner for DatafusionPhysicalPlannerImpl {
// TODO: we should modify `QueryPlan` to support create remote plan here.
async fn plan(&self, ctx: &Context, logical_plan: QueryPlan) ->
Result<PhysicalPlanRef> {
- // Register catalogs to datafusion execution context.
- let catalogs =
CatalogProviderAdapter::new_adapters(logical_plan.tables.clone());
// TODO: maybe we should not build `SessionContext` in each physical
plan's
// building. We need to do so because we place some dynamic
// information (such as `timeout`) in `SessionConfig`; maybe it is
better
// to move it into `TaskContext`.
let df_ctx = self.df_ctx_builder.build(ctx);
- for (name, catalog) in catalogs {
- df_ctx.register_catalog(&name, Arc::new(catalog));
- }
+ let state = df_ctx.state();
- // Generate physical plan.
- let exec_plan = df_ctx
- .state()
- .create_physical_plan(&logical_plan.df_plan)
+ let exec_plan = self
+ .physical_planner
+ .create_physical_plan(&logical_plan.df_plan, &state)
.await
.box_err()
.context(PhysicalPlannerWithCause { msg: None })?;
diff --git a/query_engine/src/datafusion_impl/task_context.rs
b/query_engine/src/datafusion_impl/task_context.rs
index f19c5dde..8aefd563 100644
--- a/query_engine/src/datafusion_impl/task_context.rs
+++ b/query_engine/src/datafusion_impl/task_context.rs
@@ -38,6 +38,7 @@ use df_engine_extensions::dist_sql_query::{
use futures::future::BoxFuture;
use generic_error::BoxError;
use prost::Message;
+use runtime::Priority;
use snafu::ResultExt;
use table_engine::{
provider::{CeresdbOptions, ScanTable, SCAN_TABLE_METRICS_COLLECTOR_NAME},
@@ -191,6 +192,7 @@ impl RemotePhysicalPlanExecutor for
RemotePhysicalPlanExecutorImpl {
.map(|n| Instant::now() + Duration::from_millis(n));
let default_catalog = ceresdb_options.default_catalog.clone();
let default_schema = ceresdb_options.default_schema.clone();
+ let priority = ceresdb_options.priority;
let display_plan = DisplayableExecutionPlan::new(plan.as_ref());
let exec_ctx = ExecContext {
@@ -199,6 +201,7 @@ impl RemotePhysicalPlanExecutor for
RemotePhysicalPlanExecutorImpl {
default_catalog,
default_schema,
query: display_plan.indent(true).to_string(),
+ priority,
};
// Encode plan and schema
@@ -261,6 +264,7 @@ impl DistQueryResolverBuilder {
self.remote_executor.clone(),
self.catalog_manager.clone(),
scan_builder,
+ ctx.priority,
)
}
}
@@ -278,6 +282,7 @@ impl ExecutableScanBuilder for ExecutableScanBuilderImpl {
&self,
table: TableRef,
ctx: TableScanContext,
+ priority: Priority,
) -> DfResult<Arc<dyn ExecutionPlan>> {
let read_opts = ReadOptions {
batch_size: ctx.batch_size,
@@ -291,6 +296,7 @@ impl ExecutableScanBuilder for ExecutableScanBuilderImpl {
projected_schema: ctx.projected_schema,
predicate: ctx.predicate,
metrics_collector:
MetricsCollector::new(SCAN_TABLE_METRICS_COLLECTOR_NAME.to_string()),
+ priority,
};
let mut scan = ScanTable::new(table, read_request);
diff --git a/query_frontend/Cargo.toml b/query_frontend/Cargo.toml
index 998beb2b..51705ab2 100644
--- a/query_frontend/Cargo.toml
+++ b/query_frontend/Cargo.toml
@@ -36,6 +36,7 @@ arrow = { workspace = true }
async-trait = { workspace = true }
catalog = { workspace = true }
ceresdbproto = { workspace = true }
+chrono = { workspace = true }
cluster = { workspace = true }
codec = { workspace = true }
common_types = { workspace = true }
@@ -46,6 +47,7 @@ generic_error = { workspace = true }
hash_ext = { workspace = true }
influxql-logical-planner = { workspace = true }
influxql-parser = { workspace = true }
+influxql-query = { workspace = true }
influxql-schema = { workspace = true }
itertools = { workspace = true }
lazy_static = { workspace = true }
@@ -56,6 +58,7 @@ paste = { workspace = true }
prom-remote-api = { workspace = true }
regex = { workspace = true }
regex-syntax = "0.6.28"
+runtime = { workspace = true }
snafu = { workspace = true }
sqlparser = { workspace = true }
table_engine = { workspace = true }
diff --git a/query_frontend/src/frontend.rs b/query_frontend/src/frontend.rs
index 7208cfbe..aff6eabb 100644
--- a/query_frontend/src/frontend.rs
+++ b/query_frontend/src/frontend.rs
@@ -212,50 +212,53 @@ impl<P: MetaProvider> Frontend<P> {
}
}
-pub fn parse_table_name(statements: &StatementVec) -> Option<String> {
- // maybe have empty sql
- if statements.is_empty() {
- return None;
- }
- match &statements[0] {
- Statement::Standard(s) => match *s.clone() {
- SqlStatement::Insert { table_name, .. } => {
- Some(TableName::from(table_name).to_string())
- }
- SqlStatement::Explain { statement, .. } => {
- if let SqlStatement::Query(q) = *statement {
- match *q.body {
- SetExpr::Select(select) => {
- if select.from.len() != 1 {
- None
- } else if let TableFactor::Table { name, .. } =
&select.from[0].relation
- {
- Some(TableName::from(name.clone()).to_string())
- } else {
- None
- }
+pub fn parse_table_name_with_standard(sql_statement: &SqlStatement) ->
Option<String> {
+ match sql_statement.clone() {
+ SqlStatement::Insert { table_name, .. } => {
+ Some(TableName::from(table_name.clone()).to_string())
+ }
+ SqlStatement::Explain { statement, .. } => {
+ if let SqlStatement::Query(q) = *statement {
+ match *q.body {
+ SetExpr::Select(select) => {
+ if select.from.len() != 1 {
+ None
+ } else if let TableFactor::Table { name, .. } =
&select.from[0].relation {
+ Some(TableName::from(name.clone()).to_string())
+ } else {
+ None
}
- // TODO: return unsupported error rather than none.
- _ => None,
}
+ // TODO: return unsupported error rather than none.
+ _ => None,
+ }
+ } else {
+ None
+ }
+ }
+ SqlStatement::Query(q) => match *q.body {
+ SetExpr::Select(select) => {
+ if select.from.len() != 1 {
+ None
+ } else if let TableFactor::Table { name, .. } =
&select.from[0].relation {
+ Some(TableName::from(name.clone()).to_string())
} else {
None
}
}
- SqlStatement::Query(q) => match *q.body {
- SetExpr::Select(select) => {
- if select.from.len() != 1 {
- None
- } else if let TableFactor::Table { name, .. } =
&select.from[0].relation {
- Some(TableName::from(name.clone()).to_string())
- } else {
- None
- }
- }
- _ => None,
- },
_ => None,
},
+ _ => None,
+ }
+}
+
+pub fn parse_table_name(statements: &StatementVec) -> Option<String> {
+ // maybe have empty sql
+ if statements.is_empty() {
+ return None;
+ }
+ match &statements[0] {
+ Statement::Standard(s) => parse_table_name_with_standard(s),
Statement::Create(s) => Some(s.table_name.to_string()),
Statement::Drop(s) => Some(s.table_name.to_string()),
Statement::Describe(s) => Some(s.table_name.to_string()),
diff --git a/query_frontend/src/influxql/planner.rs
b/query_frontend/src/influxql/planner.rs
index dc3b3693..960da307 100644
--- a/query_frontend/src/influxql/planner.rs
+++ b/query_frontend/src/influxql/planner.rs
@@ -37,6 +37,7 @@ use table_engine::table::TableRef;
use crate::{
influxql::error::*,
+ logical_optimizer::optimize_plan,
plan::{Plan, QueryPlan, QueryType, ShowPlan, ShowTablesPlan},
provider::{ContextProviderAdapter, MetaProvider},
};
@@ -171,6 +172,12 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
.context(BuildPlanWithCause {
msg: "planner stmt to plan",
})?;
+ let df_plan = optimize_plan(&df_plan)
+ .box_err()
+ .context(BuildPlanWithCause {
+ msg: "optimize plan",
+ })?;
+
let tables = Arc::new(
self.schema_provider
.context_provider
@@ -180,7 +187,11 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
msg: "get tables from context_provider",
})?,
);
- Ok(Plan::Query(QueryPlan { df_plan, tables }))
+ Ok(Plan::Query(QueryPlan {
+ df_plan,
+ tables,
+ table_name: None,
+ }))
}
}
}
diff --git a/query_frontend/src/lib.rs b/query_frontend/src/lib.rs
index ee96af79..c8a2617c 100644
--- a/query_frontend/src/lib.rs
+++ b/query_frontend/src/lib.rs
@@ -23,6 +23,7 @@ pub mod config;
pub mod container;
pub mod frontend;
pub mod influxql;
+mod logical_optimizer;
pub mod parser;
mod partition;
pub mod plan;
diff --git a/query_frontend/src/logical_optimizer/mod.rs
b/query_frontend/src/logical_optimizer/mod.rs
new file mode 100644
index 00000000..9fcd46b3
--- /dev/null
+++ b/query_frontend/src/logical_optimizer/mod.rs
@@ -0,0 +1,49 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Logical optimizer
+
+mod type_conversion;
+use std::sync::Arc;
+
+use datafusion::{
+ error::Result,
+ execution::{context::SessionState, runtime_env::RuntimeEnv},
+ logical_expr::LogicalPlan,
+ optimizer::analyzer::Analyzer,
+ prelude::SessionConfig,
+};
+use type_conversion::TypeConversion;
+
+pub fn optimize_plan(plan: &LogicalPlan) -> Result<LogicalPlan> {
+ let state = SessionState::with_config_rt(SessionConfig::new(),
Arc::new(RuntimeEnv::default()));
+ let state = register_analyzer_rules(state);
+ // Register iox optimizers, used by influxql.
+ let state =
influxql_query::logical_optimizer::register_iox_logical_optimizers(state);
+
+ let plan = state.optimize(plan)?;
+
+ Ok(plan)
+}
+
+fn register_analyzer_rules(mut state: SessionState) -> SessionState {
+ // Our analyzer has high priority, so we first add our custom rules, then add
the
+ // default ones.
+ state =
state.with_analyzer_rules(vec![Arc::new(crate::logical_optimizer::TypeConversion)]);
+ for rule in Analyzer::new().rules {
+ state = state.add_analyzer_rule(rule);
+ }
+
+ state
+}
diff --git
a/query_engine/src/datafusion_impl/logical_optimizer/type_conversion.rs
b/query_frontend/src/logical_optimizer/type_conversion.rs
similarity index 98%
rename from
query_engine/src/datafusion_impl/logical_optimizer/type_conversion.rs
rename to query_frontend/src/logical_optimizer/type_conversion.rs
index 29c1e956..f2b0992a 100644
--- a/query_engine/src/datafusion_impl/logical_optimizer/type_conversion.rs
+++ b/query_frontend/src/logical_optimizer/type_conversion.rs
@@ -372,7 +372,6 @@ mod tests {
};
use super::*;
- use crate::datafusion_impl::logical_optimizer::type_conversion;
fn expr_test_schema() -> DFSchemaRef {
Arc::new(
@@ -591,7 +590,7 @@ mod tests {
"2021-09-07 16:00:00Z",
];
for string in date_string {
- let result =
type_conversion::string_to_timestamp_ms_workaround(string);
+ let result = string_to_timestamp_ms_workaround(string);
assert!(result.is_err());
}
@@ -600,7 +599,7 @@ mod tests {
let t = NaiveTime::from_hms_milli_opt(16, 0, 0, 0).unwrap();
let dt = NaiveDateTime::new(d, t);
let expect = naive_datetime_to_timestamp(&date_string, dt).unwrap();
- let result =
type_conversion::string_to_timestamp_ms_workaround(&date_string);
+ let result = string_to_timestamp_ms_workaround(&date_string);
if let Ok(ScalarValue::TimestampMillisecond(Some(mills), _)) = result {
assert_eq!(mills, expect)
}
diff --git a/query_frontend/src/plan.rs b/query_frontend/src/plan.rs
index f2dd864e..94c8bca6 100644
--- a/query_frontend/src/plan.rs
+++ b/query_frontend/src/plan.rs
@@ -18,14 +18,21 @@ use std::{
collections::{BTreeMap, HashMap},
fmt,
fmt::{Debug, Formatter},
+ ops::Bound,
sync::Arc,
};
-use common_types::{column_schema::ColumnSchema, row::RowGroup, schema::Schema};
-use datafusion::logical_expr::{
- expr::Expr as DfLogicalExpr, logical_plan::LogicalPlan as
DataFusionLogicalPlan,
+use common_types::{column_schema::ColumnSchema, row::RowGroup, schema::Schema,
time::TimeRange};
+use datafusion::{
+ logical_expr::{
+ expr::Expr as DfLogicalExpr, logical_plan::LogicalPlan as
DataFusionLogicalPlan,
+ },
+ prelude::Column,
+ scalar::ScalarValue,
};
+use logger::{debug, warn};
use macros::define_result;
+use runtime::Priority;
use snafu::Snafu;
use table_engine::{partition::PartitionInfo, table::TableRef};
@@ -70,14 +77,132 @@ pub enum Plan {
Exists(ExistsTablePlan),
}
+pub struct PriorityContext {
+ pub time_range_threshold: u64,
+}
+
pub struct QueryPlan {
pub df_plan: DataFusionLogicalPlan,
+ pub table_name: Option<String>,
// Contains the TableProviders so we can register the them to
ExecutionContext later.
// Use TableProviderAdapter here so we can get the underlying TableRef and
also be
// able to cast to Arc<dyn TableProvider + Send + Sync>
pub tables: Arc<TableContainer>,
}
+impl QueryPlan {
+ fn find_timestamp_column(&self) -> Option<Column> {
+ let table_name = self.table_name.as_ref()?;
+ let table_ref = self.tables.get(table_name.into())?;
+ let schema = table_ref.table.schema();
+ let timestamp_name = schema.timestamp_name();
+ Some(Column::from_name(timestamp_name))
+ }
+
+ /// This function is used to extract time range from the query plan.
+ /// It will return max possible time range. For example, if the query
+ /// contains no timestamp filter, it will return
+ /// `TimeRange::min_to_max()`
+ ///
+ /// Note: When the timestamp filter evaluates to false (such as ts < 10 and ts >
+ /// 100), it will return None, which means no valid time range for this
+ /// query.
+ fn extract_time_range(&self) -> Option<TimeRange> {
+ let ts_column = if let Some(v) = self.find_timestamp_column() {
+ v
+ } else {
+ warn!(
+ "Couldn't find time column, plan:{:?}, table_name:{:?}",
+ self.df_plan, self.table_name
+ );
+ return Some(TimeRange::min_to_max());
+ };
+ let time_range = match
influxql_query::logical_optimizer::range_predicate::find_time_range(
+ &self.df_plan,
+ &ts_column,
+ ) {
+ Ok(v) => v,
+ Err(e) => {
+ warn!(
+ "Couldn't find time range, plan:{:?}, err:{}",
+ self.df_plan, e
+ );
+ return Some(TimeRange::min_to_max());
+ }
+ };
+
+ debug!(
+ "Extract time range, value:{time_range:?}, plan:{:?}",
+ self.df_plan
+ );
+ let mut start = i64::MIN;
+ match time_range.start {
+ Bound::Included(inclusive_start) => {
+ if let
DfLogicalExpr::Literal(ScalarValue::TimestampMillisecond(Some(x), _)) =
+ inclusive_start
+ {
+ start = start.max(x);
+ }
+ }
+ Bound::Excluded(exclusive_start) => {
+ if let
DfLogicalExpr::Literal(ScalarValue::TimestampMillisecond(Some(x), _)) =
+ exclusive_start
+ {
+ start = start.max(x + 1);
+ }
+ }
+ Bound::Unbounded => {}
+ }
+ let mut end = i64::MAX;
+ match time_range.end {
+ Bound::Included(inclusive_end) => {
+ if let
DfLogicalExpr::Literal(ScalarValue::TimestampMillisecond(Some(x), _)) =
+ inclusive_end
+ {
+ end = end.min(x + 1);
+ }
+ }
+ Bound::Excluded(exclusive_start) => {
+ if let
DfLogicalExpr::Literal(ScalarValue::TimestampMillisecond(Some(x), _)) =
+ exclusive_start
+ {
+ end = end.min(x);
+ }
+ }
+ Bound::Unbounded => {}
+ }
+
+ TimeRange::new(start.into(), end.into())
+ }
+
+ /// Decide the query priority based on the query plan.
+ /// When query contains invalid time range, it will return None.
+ // TODO: Currently we only consider the time range, consider other
factors, such
+ // as the number of series, or slow log metrics.
+ pub fn decide_query_priority(&self, ctx: PriorityContext) ->
Option<Priority> {
+ let threshold = ctx.time_range_threshold;
+ let time_range = self.extract_time_range()?;
+ let is_expensive = if let Some(v) = time_range
+ .exclusive_end()
+ .as_i64()
+ .checked_sub(time_range.inclusive_start().as_i64())
+ {
+ v as u64 >= threshold
+ } else {
+ // When overflow, we treat it as expensive query.
+ true
+ };
+
+ let priority = if is_expensive {
+ Priority::Low
+ } else {
+ Priority::High
+ };
+
+ Some(priority)
+ }
+}
+
impl Debug for QueryPlan {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("QueryPlan")
@@ -200,3 +325,81 @@ pub enum ShowPlan {
pub struct ExistsTablePlan {
pub exists: bool,
}
+
+#[cfg(test)]
+mod tests {
+
+ use super::*;
+ use crate::planner::tests::sql_to_logical_plan;
+
+ #[test]
+ fn test_extract_time_range() {
+ // key2 is timestamp column
+ let testcases = [
+ (
+ "select * from test_table where key2 > 1 and key2 < 10",
+ Some((2, 10)),
+ ),
+ (
+ "select field1 from test_table where key2 > 1 and key2 < 10",
+ Some((2, 10)),
+ ),
+ (
+ "select * from test_table where key2 >= 1 and key2 <= 10",
+ Some((1, 11)),
+ ),
+ (
+ "select * from test_table where key2 < 1 and key2 > 10",
+ None,
+ ),
+ (
+ "select * from test_table where key2 < 1 ",
+ Some((i64::MIN, 1))
+ ),
+ (
+ "select * from test_table where key2 > 1 ",
+ Some((2, i64::MAX))
+ ),
+ // date literals
+ (
+ r#"select * from test_table where key2 >= "2023-11-21
14:12:00+08:00" and key2 < "2023-11-21 14:22:00+08:00" "#,
+ Some((1700547120000, 1700547720000))
+ ),
+ // no timestamp filter
+ ("select * from test_table", Some((i64::MIN, i64::MAX))),
+ // aggr
+ (
+ "select key2, sum(field1) from test_table where key2 > 1 and
key2 < 10 group by key2",
+ Some((2, 10)),
+ ),
+ // aggr & sort
+ (
+ "select key2, sum(field1) from test_table where key2 > 1 and
key2 < 10 group by key2 order by key2", Some((2, 10)),
+ ),
+ // explain
+ (
+ "explain select * from test_table where key2 > 1 and key2 <
10",
+ Some((2, 10)),
+ ),
+ // analyze
+ (
+ "explain analyze select * from test_table where key2 > 1 and
key2 < 10",
+ Some((2, 10)),
+ ),
+ ];
+
+ for case in testcases {
+ let sql = case.0;
+ let plan = sql_to_logical_plan(sql).unwrap();
+ let plan = match plan {
+ Plan::Query(v) => v,
+ _ => unreachable!(),
+ };
+ let expected = case
+ .1
+ .map(|v| TimeRange::new_unchecked(v.0.into(), v.1.into()));
+
+ assert_eq!(plan.extract_time_range(), expected, "sql:{}", sql);
+ }
+ }
+}
diff --git a/query_frontend/src/planner.rs b/query_frontend/src/planner.rs
index 7b1fa82e..559fbcd8 100644
--- a/query_frontend/src/planner.rs
+++ b/query_frontend/src/planner.rs
@@ -67,6 +67,8 @@ use crate::{
},
config::DynamicConfig,
container::TableReference,
+ frontend::parse_table_name_with_standard,
+ logical_optimizer::optimize_plan,
parser,
partition::PartitionParser,
plan::{
@@ -613,18 +615,20 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> {
fn sql_statement_to_datafusion_plan(self, sql_stmt: SqlStatement) ->
Result<Plan> {
let df_planner = SqlToRel::new_with_options(&self.meta_provider,
DEFAULT_PARSER_OPTS);
+ let table_name = parse_table_name_with_standard(&sql_stmt);
let df_plan = df_planner
.sql_statement_to_plan(sql_stmt)
.context(DatafusionPlan)?;
+ let df_plan = optimize_plan(&df_plan).context(DatafusionPlan)?;
debug!("Sql statement to datafusion plan, df_plan:\n{:#?}", df_plan);
// Get all tables needed in the plan
let tables =
self.meta_provider.try_into_container().context(FindMeta)?;
-
Ok(Plan::Query(QueryPlan {
df_plan,
+ table_name,
tables: Arc::new(tables),
}))
}
@@ -1389,7 +1393,7 @@ pub fn get_table_ref(table_name: &str) -> TableReference {
}
#[cfg(test)]
-mod tests {
+pub mod tests {
use ceresdbproto::storage::{
value, Field, FieldGroup, Tag, Value as PbValue, WriteSeriesEntry,
@@ -1416,7 +1420,7 @@ mod tests {
Ok(())
}
- fn sql_to_logical_plan(sql: &str) -> Result<Plan> {
+ pub fn sql_to_logical_plan(sql: &str) -> Result<Plan> {
let dyn_config = DynamicConfig::default();
sql_to_logical_plan_with_config(sql, &dyn_config)
}
@@ -1644,10 +1648,9 @@ mod tests {
let sql = "select * from test_table;";
quick_test(
sql,
- "Query(
+ r"Query(
QueryPlan {
- df_plan: Projection: test_table.key1, test_table.key2,
test_table.field1, test_table.field2, test_table.field3, test_table.field4
- TableScan: test_table,
+ df_plan: TableScan: test_table projection=[key1, key2, field1, field2,
field3, field4],
},
)",
)
diff --git a/query_frontend/src/promql/convert.rs
b/query_frontend/src/promql/convert.rs
index e8ba5851..0109f9ac 100644
--- a/query_frontend/src/promql/convert.rs
+++ b/query_frontend/src/promql/convert.rs
@@ -154,7 +154,7 @@ impl Expr {
meta_provider: ContextProviderAdapter<'_, P>,
read_parallelism: usize,
) -> Result<(Plan, Arc<ColumnNames>)> {
- let (logic_plan, column_name, _) =
+ let (logic_plan, column_name, table_name) =
self.build_plan_iter(&meta_provider, INIT_LEVEL,
read_parallelism)?;
let tables = Arc::new(
meta_provider
@@ -167,6 +167,7 @@ impl Expr {
Plan::Query(QueryPlan {
df_plan: logic_plan,
tables,
+ table_name: Some(table_name),
}),
column_name,
))
diff --git a/query_frontend/src/promql/remote.rs
b/query_frontend/src/promql/remote.rs
index 6b8c8f8f..4343048c 100644
--- a/query_frontend/src/promql/remote.rs
+++ b/query_frontend/src/promql/remote.rs
@@ -27,6 +27,7 @@ use prom_remote_api::types::{label_matcher, LabelMatcher,
Query};
use snafu::{OptionExt, ResultExt};
use crate::{
+ logical_optimizer::optimize_plan,
plan::{Plan, QueryPlan},
promql::{
convert::Selector,
@@ -81,6 +82,7 @@ pub fn remote_query_to_plan<P: MetaProvider>(
.sort(sort_exprs)?
.build()
.context(BuildPlanError)?;
+ let df_plan = optimize_plan(&df_plan).context(BuildPlanError)?;
let tables = Arc::new(
meta_provider
@@ -90,7 +92,11 @@ pub fn remote_query_to_plan<P: MetaProvider>(
})?,
);
Ok(RemoteQueryPlan {
- plan: Plan::Query(QueryPlan { df_plan, tables }),
+ plan: Plan::Query(QueryPlan {
+ df_plan,
+ tables,
+ table_name: Some(metric),
+ }),
field_col_name: field,
timestamp_col_name: timestamp_col_name.to_string(),
})
@@ -185,8 +191,8 @@ mod tests {
r#"
Query(QueryPlan { df_plan: Sort: cpu.tsid ASC NULLS FIRST, cpu.time ASC NULLS
FIRST
Projection: cpu.tag1, cpu.tag2, cpu.time, cpu.tsid, cpu.value
- Filter: cpu.tag1 = Utf8("some-value") AND cpu.time BETWEEN Int64(1000) AND
Int64(2000)
- TableScan: cpu })"#
+ Filter: cpu.tag1 = Utf8("some-value") AND cpu.time >=
TimestampMillisecond(1000, None) AND cpu.time <= TimestampMillisecond(2000,
None)
+ TableScan: cpu projection=[tsid, time, tag1, tag2, value],
partial_filters=[cpu.tag1 = Utf8("some-value"), cpu.time >=
TimestampMillisecond(1000, None), cpu.time <= TimestampMillisecond(2000, None)]
})"#
.to_string()
);
assert_eq!(&field_col_name, "value");
@@ -217,8 +223,8 @@ Query(QueryPlan { df_plan: Sort: cpu.tsid ASC NULLS FIRST,
cpu.time ASC NULLS FI
r#"
Query(QueryPlan { df_plan: Sort: cpu.tsid ASC NULLS FIRST, cpu.time ASC NULLS
FIRST
Projection: cpu.tag1, cpu.tag2, cpu.time, cpu.tsid, cpu.field2
- Filter: cpu.tag1 = Utf8("some-value") AND cpu.time BETWEEN Int64(1000) AND
Int64(2000)
- TableScan: cpu })"#
+ Filter: cpu.tag1 = Utf8("some-value") AND cpu.time >=
TimestampMillisecond(1000, None) AND cpu.time <= TimestampMillisecond(2000,
None)
+ TableScan: cpu projection=[tsid, time, tag1, tag2, field2],
partial_filters=[cpu.tag1 = Utf8("some-value"), cpu.time >=
TimestampMillisecond(1000, None), cpu.time <= TimestampMillisecond(2000, None)]
})"#
.to_string()
);
assert_eq!(&field_col_name, "field2");
diff --git a/query_engine/src/datafusion_impl/logical_optimizer/mod.rs
b/server/src/grpc/remote_engine_service/metrics.rs
similarity index 65%
rename from query_engine/src/datafusion_impl/logical_optimizer/mod.rs
rename to server/src/grpc/remote_engine_service/metrics.rs
index a3b5130f..c6c8124a 100644
--- a/query_engine/src/datafusion_impl/logical_optimizer/mod.rs
+++ b/server/src/grpc/remote_engine_service/metrics.rs
@@ -12,8 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-//! Logical optimizer
+use lazy_static::lazy_static;
+use prometheus::{register_int_counter_vec, IntCounterVec};
-#[cfg(test)]
-pub mod tests;
-pub mod type_conversion;
+lazy_static! {
+ pub static ref REMOTE_ENGINE_QUERY_COUNTER: IntCounterVec =
register_int_counter_vec!(
+ "remote_engine_query_counter",
+ "remote_engine_query_counter",
+ &["priority"]
+ )
+ .unwrap();
+}
diff --git a/server/src/grpc/remote_engine_service/mod.rs
b/server/src/grpc/remote_engine_service/mod.rs
index 11359300..eda9a61a 100644
--- a/server/src/grpc/remote_engine_service/mod.rs
+++ b/server/src/grpc/remote_engine_service/mod.rs
@@ -32,8 +32,8 @@ use ceresdbproto::{
remote_engine_service_server::RemoteEngineService,
row_group, AlterTableOptionsRequest, AlterTableOptionsResponse,
AlterTableSchemaRequest,
AlterTableSchemaResponse, ExecContext, ExecutePlanRequest,
GetTableInfoRequest,
- GetTableInfoResponse, MetricPayload, ReadRequest, ReadResponse,
WriteBatchRequest,
- WriteRequest, WriteResponse,
+ GetTableInfoResponse, MetricPayload, QueryPriority, ReadRequest,
ReadResponse,
+ WriteBatchRequest, WriteRequest, WriteResponse,
},
storage::{arrow_payload, ArrowPayload},
};
@@ -43,7 +43,7 @@ use futures::{
Future,
};
use generic_error::BoxError;
-use logger::{error, info, slow_query};
+use logger::{debug, error, info, slow_query};
use notifier::notifier::{ExecutionGuard, RequestNotifiers, RequestResult};
use proxy::{
hotspot::{HotspotRecorder, Message},
@@ -55,6 +55,7 @@ use query_engine::{
physical_planner::PhysicalPlanRef,
QueryEngineRef, QueryEngineType,
};
+use runtime::{Priority, RuntimeRef};
use snafu::{OptionExt, ResultExt};
use table_engine::{
engine::EngineRuntimes,
@@ -76,11 +77,15 @@ use crate::{
REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC,
REMOTE_ENGINE_WRITE_BATCH_NUM_ROWS_HISTOGRAM,
},
- remote_engine_service::error::{ErrNoCause, ErrWithCause, Result,
StatusCode},
+ remote_engine_service::{
+ error::{ErrNoCause, ErrWithCause, Result, StatusCode},
+ metrics::REMOTE_ENGINE_QUERY_COUNTER,
+ },
},
};
pub mod error;
+mod metrics;
const STREAM_QUERY_CHANNEL_LEN: usize = 200;
const DEFAULT_COMPRESS_MIN_LENGTH: usize = 80 * 1024;
@@ -139,15 +144,22 @@ struct ExecutePlanMetricCollector {
query: String,
request_id: RequestId,
slow_threshold: Duration,
+ priority: Priority,
}
impl ExecutePlanMetricCollector {
- fn new(request_id: String, query: String, slow_threshold_secs: u64) ->
Self {
+ fn new(
+ request_id: String,
+ query: String,
+ slow_threshold_secs: u64,
+ priority: Priority,
+ ) -> Self {
Self {
start: Instant::now(),
query,
request_id: request_id.into(),
slow_threshold: Duration::from_secs(slow_threshold_secs),
+ priority,
}
}
}
@@ -157,12 +169,17 @@ impl MetricCollector for ExecutePlanMetricCollector {
let cost = self.start.elapsed();
if cost > self.slow_threshold {
slow_query!(
- "Remote query elapsed:{:?}, id:{}, query:{}",
+ "Remote query elapsed:{:?}, id:{}, priority:{}, query:{}",
cost,
self.request_id,
+ self.priority.as_str(),
self.query
);
}
+
+ REMOTE_ENGINE_QUERY_COUNTER
+ .with_label_values(&[self.priority.as_str()])
+ .inc();
REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC
.execute_physical_plan
.observe(cost.as_secs_f64());
@@ -415,6 +432,8 @@ impl RemoteEngineServiceImpl {
query,
request_notifiers.clone(),
config.notify_timeout.0,
+ // TODO: decide runtime from request priority.
+ self.runtimes.read_runtime.high(),
)
.await?;
}
@@ -435,6 +454,7 @@ impl RemoteEngineServiceImpl {
query: F,
notifiers: Arc<RequestNotifiers<K, mpsc::Sender<Result<RecordBatch>>>>,
notify_timeout: Duration,
+ rt: &RuntimeRef,
) -> Result<()>
where
K: Hash + PartialEq + Eq,
@@ -444,7 +464,7 @@ impl RemoteEngineServiceImpl {
let mut guard = ExecutionGuard::new(|| {
notifiers.take_notifiers(&request_key);
});
- let handle = self.runtimes.read_runtime.spawn(query);
+ let handle = rt.spawn(query);
let streams = handle.await.box_err().context(ErrWithCause {
code: StatusCode::Internal,
msg: "fail to join task",
@@ -459,7 +479,7 @@ impl RemoteEngineServiceImpl {
})
});
- let handle = self.runtimes.read_runtime.spawn(async move {
+ let handle = rt.spawn(async move {
let mut batches = Vec::new();
while let Some(batch) = stream.next().await {
batches.push(batch)
@@ -486,7 +506,7 @@ impl RemoteEngineServiceImpl {
let notifiers = notifiers.take_notifiers(&request_key).unwrap();
// Do send in background to avoid blocking the rpc procedure.
- self.runtimes.read_runtime.spawn(async move {
+ rt.spawn(async move {
Self::send_dedupped_resps(resps, notifiers, notify_timeout).await;
});
@@ -676,26 +696,36 @@ impl RemoteEngineServiceImpl {
.slow_threshold
.load(std::sync::atomic::Ordering::Relaxed);
- let metric = ExecutePlanMetricCollector::new(
- ctx.request_id_str.clone(),
- ctx.displayable_query,
- slow_threshold_secs,
- );
+ let priority = ctx.priority();
let query_ctx = create_query_ctx(
ctx.request_id_str,
ctx.default_catalog,
ctx.default_schema,
ctx.timeout_ms,
+ priority,
);
+ debug!(
+ "Execute remote query, ctx:{query_ctx:?}, query:{}",
+ &ctx.displayable_query
+ );
+ let metric = ExecutePlanMetricCollector::new(
+ ctx.request_id.to_string(),
+ ctx.displayable_query,
+ slow_threshold_secs,
+ query_ctx.priority,
+ );
let physical_plan =
Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
encoded_plan,
)));
- let physical_plan_clone = physical_plan.clone();
- let stream = self
+ let rt = self
.runtimes
.read_runtime
+ .choose_runtime(&query_ctx.priority);
+ let physical_plan_clone = physical_plan.clone();
+
+ let stream = rt
.spawn(async move { handle_execute_plan(query_ctx, physical_plan,
query_engine).await })
.await
.box_err()
@@ -730,18 +760,20 @@ impl RemoteEngineServiceImpl {
.dyn_config
.slow_threshold
.load(std::sync::atomic::Ordering::Relaxed);
- let metric = ExecutePlanMetricCollector::new(
- ctx.request_id_str.clone(),
- ctx.displayable_query,
- slow_threshold_secs,
- );
+ let priority = ctx.priority();
let query_ctx = create_query_ctx(
ctx.request_id_str,
ctx.default_catalog,
ctx.default_schema,
ctx.timeout_ms,
+ priority,
+ );
+ let metric = ExecutePlanMetricCollector::new(
+ ctx.request_id.to_string(),
+ ctx.displayable_query,
+ slow_threshold_secs,
+ query_ctx.priority,
);
-
let key = PhysicalPlanKey {
encoded_plan: encoded_plan.clone(),
};
@@ -758,6 +790,10 @@ impl RemoteEngineServiceImpl {
..
} = query_dedup;
+ let rt = self
+ .runtimes
+ .read_runtime
+ .choose_runtime(&query_ctx.priority);
let (tx, rx) = mpsc::channel(config.notify_queue_cap);
match physical_plan_notifiers.insert_notifier(key.clone(), tx) {
// The first request, need to handle it, and then notify the other
requests.
@@ -772,6 +808,7 @@ impl RemoteEngineServiceImpl {
query,
physical_plan_notifiers,
config.notify_timeout.0,
+ rt,
)
.await?;
}
@@ -1190,6 +1227,7 @@ fn create_query_ctx(
default_catalog: String,
default_schema: String,
timeout_ms: i64,
+ priority: QueryPriority,
) -> QueryContext {
let request_id = RequestId::from(request_id);
let deadline = if timeout_ms >= 0 {
@@ -1197,12 +1235,17 @@ fn create_query_ctx(
} else {
None
};
+ let priority = match priority {
+ QueryPriority::Low => Priority::Low,
+ QueryPriority::High => Priority::High,
+ };
QueryContext {
request_id,
deadline,
default_catalog,
default_schema,
+ priority,
}
}
diff --git a/server/src/http.rs b/server/src/http.rs
index 068f21fa..389e7712 100644
--- a/server/src/http.rs
+++ b/server/src/http.rs
@@ -43,7 +43,7 @@ use proxy::{
Proxy,
};
use router::endpoint::Endpoint;
-use runtime::{Runtime, RuntimeRef};
+use runtime::{PriorityRuntime, Runtime};
use serde::Serialize;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::{engine::EngineRuntimes, table::FlushRequest};
@@ -310,7 +310,7 @@ impl Service {
.and(self.with_proxy())
.and(self.with_read_runtime())
.and_then(
- |req, mut ctx: RequestContext, proxy: Arc<Proxy>, runtime:
RuntimeRef| async move {
+ |req, mut ctx: RequestContext, proxy: Arc<Proxy>, runtime:
PriorityRuntime| async move {
// We don't timeout http api since it's mainly used for
debugging.
ctx.timeout = None;
@@ -774,7 +774,7 @@ impl Service {
fn with_read_runtime(
&self,
- ) -> impl Filter<Extract = (Arc<Runtime>,), Error = Infallible> + Clone {
+ ) -> impl Filter<Extract = (PriorityRuntime,), Error = Infallible> + Clone
{
let runtime = self.engine_runtimes.read_runtime.clone();
warp::any().map(move || runtime.clone())
}
diff --git a/server/src/server.rs b/server/src/server.rs
index 27f59c18..9d418a32 100644
--- a/server/src/server.rs
+++ b/server/src/server.rs
@@ -378,6 +378,7 @@ impl Builder {
let config_content =
self.config_content.context(MissingConfigContent)?;
let query_engine_config =
self.query_engine_config.context(MissingQueryEngineConfig)?;
let datafusion_context =
self.datatfusion_context.context(MissingDatafusionContext)?;
+ let expensive_query_threshold =
query_engine_config.expensive_query_threshold.as_millis();
let hotspot_recorder = Arc::new(HotspotRecorder::new(
self.server_config.hotspot,
@@ -412,6 +413,7 @@ impl Builder {
let instance = Instance {
catalog_manager,
query_engine,
+ query_runtime: engine_runtimes.read_runtime.clone(),
table_engine,
partition_table_engine,
function_registry,
@@ -459,6 +461,7 @@ impl Builder {
self.cluster.is_some(),
self.server_config.sub_table_access_perm,
request_notifiers,
+ expensive_query_threshold,
));
let http_service = http::Builder::new(http_config)
diff --git a/src/ceresdb/src/config.rs b/src/ceresdb/src/config.rs
index b23476d4..676df8ef 100644
--- a/src/ceresdb/src/config.rs
+++ b/src/ceresdb/src/config.rs
@@ -91,7 +91,7 @@ pub enum ClusterDeployment {
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(default)]
pub struct RuntimeConfig {
- /// Runtime for reading data
+ /// High priority runtime for reading data
pub read_thread_num: usize,
/// The size of the stack used by the read thread
///
@@ -99,6 +99,8 @@ pub struct RuntimeConfig {
/// TODO: this config may be removed in the future when the complex query
/// won't overflow the stack.
pub read_thread_stack_size: ReadableSize,
+ /// Low priority runtime for reading data
+ pub low_read_thread_num: usize,
/// Runtime for writing data
pub write_thread_num: usize,
/// Runtime for communicating with meta cluster
@@ -116,6 +118,7 @@ impl Default for RuntimeConfig {
Self {
read_thread_num: 8,
read_thread_stack_size: ReadableSize::mb(16),
+ low_read_thread_num: 1,
write_thread_num: 8,
meta_thread_num: 2,
compact_thread_num: 4,
diff --git a/src/ceresdb/src/setup.rs b/src/ceresdb/src/setup.rs
index de78aa3d..a7ae6a31 100644
--- a/src/ceresdb/src/setup.rs
+++ b/src/ceresdb/src/setup.rs
@@ -32,6 +32,7 @@ use proxy::{
},
};
use router::{rule_based::ClusterView, ClusterBasedRouter, RuleBasedRouter};
+use runtime::PriorityRuntime;
use server::{
config::{StaticRouteConfig, StaticTopologyConfig},
local_tables::LocalTablesRecoverer,
@@ -86,12 +87,20 @@ fn build_runtime(name: &str, threads_num: usize) ->
runtime::Runtime {
}
fn build_engine_runtimes(config: &RuntimeConfig) -> EngineRuntimes {
+ let read_stack_size = config.read_thread_stack_size.as_byte() as usize;
EngineRuntimes {
- read_runtime: Arc::new(build_runtime_with_stack_size(
- "ceres-read",
- config.read_thread_num,
- Some(config.read_thread_stack_size.as_byte() as usize),
- )),
+ read_runtime: PriorityRuntime::new(
+ Arc::new(build_runtime_with_stack_size(
+ "read-low",
+ config.low_read_thread_num,
+ Some(read_stack_size),
+ )),
+ Arc::new(build_runtime_with_stack_size(
+ "read-high",
+ config.read_thread_num,
+ Some(read_stack_size),
+ )),
+ ),
write_runtime: Arc::new(build_runtime("ceres-write",
config.write_thread_num)),
compact_runtime: Arc::new(build_runtime("ceres-compact",
config.compact_thread_num)),
meta_runtime: Arc::new(build_runtime("ceres-meta",
config.meta_thread_num)),
@@ -266,7 +275,8 @@ async fn build_table_engine_proxy(engine_builder:
EngineBuilder<'_>) -> Arc<Tabl
fn make_wal_runtime(runtimes: Arc<EngineRuntimes>) -> WalRuntimes {
WalRuntimes {
write_runtime: runtimes.write_runtime.clone(),
- read_runtime: runtimes.read_runtime.clone(),
+ // TODO: remove read_runtime from WalRuntimes
+ read_runtime: runtimes.read_runtime.high().clone(),
default_runtime: runtimes.default_runtime.clone(),
}
}
diff --git a/system_catalog/src/sys_catalog_table.rs
b/system_catalog/src/sys_catalog_table.rs
index 0daacb03..00f7b061 100644
--- a/system_catalog/src/sys_catalog_table.rs
+++ b/system_catalog/src/sys_catalog_table.rs
@@ -545,6 +545,7 @@ impl SysCatalogTable {
projected_schema:
ProjectedSchema::no_projection(self.table.schema()),
predicate: PredicateBuilder::default().build(),
metrics_collector: MetricsCollector::default(),
+ priority: Default::default(),
};
let mut batch_stream =
self.table.read(read_request).await.context(ReadTable)?;
diff --git a/table_engine/src/engine.rs b/table_engine/src/engine.rs
index 9946d0cb..3411c767 100644
--- a/table_engine/src/engine.rs
+++ b/table_engine/src/engine.rs
@@ -24,7 +24,7 @@ use common_types::{
};
use generic_error::{GenericError, GenericResult};
use macros::define_result;
-use runtime::RuntimeRef;
+use runtime::{PriorityRuntime, RuntimeRef};
use snafu::{ensure, Backtrace, Snafu};
use crate::{
@@ -346,7 +346,7 @@ pub type TableEngineRef = Arc<dyn TableEngine>;
#[derive(Clone, Debug)]
pub struct EngineRuntimes {
/// Runtime for reading data
- pub read_runtime: RuntimeRef,
+ pub read_runtime: PriorityRuntime,
/// Runtime for writing data
pub write_runtime: RuntimeRef,
/// Runtime for compacting data
diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs
index fb3cb41d..cbc41372 100644
--- a/table_engine/src/provider.rs
+++ b/table_engine/src/provider.rs
@@ -39,6 +39,7 @@ use datafusion::{
};
use df_operator::visitor;
use logger::debug;
+use runtime::Priority;
use trace_metric::{collector::FormatCollectorVisitor, MetricsCollector};
use crate::{
@@ -55,12 +56,19 @@ pub struct CeresdbOptions {
pub request_timeout: Option<u64>,
pub default_schema: String,
pub default_catalog: String,
+ pub priority: Priority,
}
impl ConfigExtension for CeresdbOptions {
const PREFIX: &'static str = "ceresdb";
}
+impl CeresdbOptions {
+ const REQUEST_ID_KEY: &'static str = "request_id";
+ const REQUEST_PRIORITY_KEY: &'static str = "request_priority";
+ const REQUEST_TIMEOUT_KEY: &'static str = "request_timeout";
+}
+
impl ExtensionOptions for CeresdbOptions {
fn as_any(&self) -> &dyn Any {
self
@@ -76,33 +84,57 @@ impl ExtensionOptions for CeresdbOptions {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
match key {
- "request_id" => self.request_id = value.to_string(),
- "request_timeout" => {
+ Self::REQUEST_ID_KEY => self.request_id = value.to_string(),
+ Self::REQUEST_TIMEOUT_KEY => {
self.request_timeout = Some(value.parse::<u64>().map_err(|e| {
DataFusionError::External(
format!("could not parse request_timeout,
input:{value}, err:{e:?}").into(),
)
})?)
}
+ Self::REQUEST_PRIORITY_KEY => {
+ self.priority = value
+ .parse::<u8>()
+ .map_err(|e| {
+ DataFusionError::External(
+ format!("request_priority should be u8,
input:{value}, err:{e:?}")
+ .into(),
+ )
+ })
+ .and_then(|value| {
+ Priority::try_from(value).map_err(|e| {
+ DataFusionError::External(
+ format!("parse request_priority failed,
input:{value}, err:{e:?}")
+ .into(),
+ )
+ })
+ })?
+ }
_ => Err(DataFusionError::External(
format!("could not find key, key:{key}").into(),
))?,
}
+
Ok(())
}
fn entries(&self) -> Vec<ConfigEntry> {
vec![
ConfigEntry {
- key: "request_id".to_string(),
+ key: Self::REQUEST_ID_KEY.to_string(),
value: Some(self.request_id.to_string()),
description: "",
},
ConfigEntry {
- key: "request_timeout".to_string(),
+ key: Self::REQUEST_TIMEOUT_KEY.to_string(),
value: self.request_timeout.map(|v| v.to_string()),
description: "",
},
+ ConfigEntry {
+ key: Self::REQUEST_PRIORITY_KEY.to_string(),
+ value: Some(self.priority.as_u8().to_string()),
+ description: "",
+ },
]
}
}
@@ -182,8 +214,9 @@ impl<B: TableScanBuilder> TableProviderAdapter<B> {
.request_timeout
.map(|n| Instant::now() + Duration::from_millis(n));
let read_parallelism = state.config().target_partitions();
+ let priority = ceresdb_options.priority;
debug!(
- "TableProvider scan table, table:{}, request_id:{},
projection:{:?}, filters:{:?}, limit:{:?}, deadline:{:?}, parallelism:{}",
+ "TableProvider scan table, table:{}, request_id:{},
projection:{:?}, filters:{:?}, limit:{:?}, deadline:{:?}, parallelism:{},
priority:{:?}",
self.table.name(),
request_id,
projection,
@@ -191,6 +224,7 @@ impl<B: TableScanBuilder> TableProviderAdapter<B> {
limit,
deadline,
read_parallelism,
+ priority,
);
let predicate = self.check_and_build_predicate_from_filters(filters);
@@ -216,6 +250,7 @@ impl<B: TableScanBuilder> TableProviderAdapter<B> {
projected_schema,
predicate,
metrics_collector:
MetricsCollector::new(SCAN_TABLE_METRICS_COLLECTOR_NAME.to_string()),
+ priority,
};
self.builder.build(request).await
@@ -439,9 +474,10 @@ impl DisplayAs for ScanTable {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) ->
fmt::Result {
write!(
f,
- "ScanTable: table={}, parallelism={}",
+ "ScanTable: table={}, parallelism={}, priority={:?}",
self.table.name(),
self.request.opts.read_parallelism,
+ self.request.priority
)
}
}
diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs
index bd996703..9073e75e 100644
--- a/table_engine/src/remote/model.rs
+++ b/table_engine/src/remote/model.rs
@@ -22,7 +22,7 @@ use std::{
use bytes_ext::{ByteVec, Bytes};
use ceresdbproto::remote_engine::{
- self, execute_plan_request, row_group::Rows::Contiguous, ColumnDesc,
+ self, execute_plan_request, row_group::Rows::Contiguous, ColumnDesc,
QueryPriority,
};
use common_types::{
request_id::RequestId,
@@ -35,6 +35,7 @@ use common_types::{
use generic_error::{BoxError, GenericError, GenericResult};
use itertools::Itertools;
use macros::define_result;
+use runtime::Priority;
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
use crate::{
@@ -454,6 +455,7 @@ pub struct ExecContext {
pub default_catalog: String,
pub default_schema: String,
pub query: String,
+ pub priority: Priority,
}
pub enum PhysicalPlan {
@@ -474,7 +476,7 @@ impl From<RemoteExecuteRequest> for
ceresdbproto::remote_engine::ExecutePlanRequ
default_catalog: value.context.default_catalog,
default_schema: value.context.default_schema,
timeout_ms: rest_duration_ms,
- priority: 0, // not used now
+ priority: value.context.priority.as_u8() as i32,
displayable_query: value.context.query,
};
@@ -510,6 +512,10 @@ impl
TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
let pb_exec_ctx = value.context.context(ConvertRemoteExecuteRequest {
msg: "missing exec ctx",
})?;
+ let priority = match pb_exec_ctx.priority() {
+ QueryPriority::Low => Priority::Low,
+ QueryPriority::High => Priority::High,
+ };
let ceresdbproto::remote_engine::ExecContext {
request_id_str,
default_catalog,
@@ -532,6 +538,7 @@ impl
TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
default_catalog,
default_schema,
query: displayable_query,
+ priority,
};
// Plan
diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs
index 4f3e854f..6c380441 100644
--- a/table_engine/src/table.rs
+++ b/table_engine/src/table.rs
@@ -36,6 +36,7 @@ use common_types::{
};
use generic_error::{BoxError, GenericError};
use macros::define_result;
+use runtime::Priority;
use serde::Deserialize;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use trace_metric::MetricsCollector;
@@ -387,6 +388,7 @@ pub struct ReadRequest {
pub predicate: PredicateRef,
/// Collector for metrics of this read request.
pub metrics_collector: MetricsCollector,
+ pub priority: Priority,
}
impl fmt::Debug for ReadRequest {
@@ -415,6 +417,7 @@ impl fmt::Debug for ReadRequest {
.field("opts", &self.opts)
.field("projected", &projected)
.field("predicate", &predicate)
+ .field("priority", &self.priority)
.finish()
}
}
@@ -470,6 +473,8 @@ impl TryFrom<ceresdbproto::remote_engine::TableReadRequest>
for ReadRequest {
projected_schema,
predicate,
metrics_collector: MetricsCollector::default(),
+ // TODO: pass priority from request.
+ priority: Default::default(),
})
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]