This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git


The following commit(s) were added to refs/heads/dev by this push:
     new ce4044b8 feat: impl priority runtime for read (#1303)
ce4044b8 is described below

commit ce4044b8c2f02551ccd0c46a6d03cdf1e3032dd8
Author: Jiacai Liu <[email protected]>
AuthorDate: Thu Dec 28 15:35:46 2023 +0800

    feat: impl priority runtime for read (#1303)
    
    ## Rationale
    Close #1299
    
    ## Detailed Changes
    - Add PriorityRuntime component, and use in read API
    - In a normal query, the plan will be executed in the higher-priority
    runtime by default; when the executor decides a query is expensive, it
    will spawn `stream.poll` in the lower-priority runtime instead.
    - In a distributed query, a priority field is added to the remote query
    request, so the receiving node can decide which runtime to run it on.
    
    
    ## Test Plan
    Newly added UT
---
 Cargo.lock                                         |  29 ++-
 Cargo.toml                                         |   8 +-
 analytic_engine/src/instance/mod.rs                |   4 +-
 analytic_engine/src/instance/read.rs               |   6 +-
 analytic_engine/src/table/mod.rs                   |   2 +
 analytic_engine/src/tests/table.rs                 |   1 +
 analytic_engine/src/tests/util.rs                  |   5 +-
 components/logger/Cargo.toml                       |   1 +
 components/logger/src/lib.rs                       |  10 +-
 components/runtime/src/lib.rs                      |   4 +-
 components/runtime/src/priority_runtime.rs         |  98 ++++++++++
 df_engine_extensions/Cargo.toml                    |   1 +
 df_engine_extensions/src/dist_sql_query/mod.rs     |  18 +-
 .../src/dist_sql_query/physical_plan.rs            |   3 +
 .../src/dist_sql_query/resolver.rs                 |   9 +-
 .../src/dist_sql_query/test_util.rs                |   5 +
 docs/example-cluster-1.toml                        |   2 +-
 .../cases/common/dml/issue-1087.result             |   4 +-
 .../cases/common/dml/issue-341.result              |   8 +-
 integration_tests/cases/common/dml/issue-59.result |   2 +-
 .../cases/common/explain/explain.result            |   2 +-
 .../cases/common/optimizer/optimizer.result        |   2 +-
 .../cases/env/cluster/ddl/partition_table.result   |   4 +-
 .../cases/env/local/ddl/query-plan.result          |  21 ++-
 .../cases/env/local/ddl/query-plan.sql             |   5 +
 interpreters/Cargo.toml                            |   3 +
 interpreters/src/context.rs                        |  20 +-
 interpreters/src/factory.rs                        |  14 +-
 interpreters/src/lib.rs                            |   4 +-
 .../mod.rs => interpreters/src/metrics.rs          |  14 +-
 interpreters/src/select.rs                         | 108 +++++++----
 interpreters/src/tests.rs                          |  25 ++-
 proxy/src/instance.rs                              |   2 +
 proxy/src/lib.rs                                   |  81 +-------
 proxy/src/read.rs                                  |  37 +---
 query_engine/Cargo.toml                            |   1 +
 query_engine/src/config.rs                         |   3 +
 query_engine/src/context.rs                        |   2 +
 query_engine/src/datafusion_impl/mod.rs            |  75 +-------
 .../src/datafusion_impl/physical_planner.rs        |  39 ++--
 query_engine/src/datafusion_impl/task_context.rs   |   6 +
 query_frontend/Cargo.toml                          |   3 +
 query_frontend/src/frontend.rs                     |  75 ++++----
 query_frontend/src/influxql/planner.rs             |  13 +-
 query_frontend/src/lib.rs                          |   1 +
 query_frontend/src/logical_optimizer/mod.rs        |  49 +++++
 .../src}/logical_optimizer/type_conversion.rs      |   5 +-
 query_frontend/src/plan.rs                         | 209 ++++++++++++++++++++-
 query_frontend/src/planner.rs                      |  15 +-
 query_frontend/src/promql/convert.rs               |   3 +-
 query_frontend/src/promql/remote.rs                |  16 +-
 .../src/grpc/remote_engine_service/metrics.rs      |  14 +-
 server/src/grpc/remote_engine_service/mod.rs       |  87 ++++++---
 server/src/http.rs                                 |   6 +-
 server/src/server.rs                               |   3 +
 src/ceresdb/src/config.rs                          |   5 +-
 src/ceresdb/src/setup.rs                           |  22 ++-
 system_catalog/src/sys_catalog_table.rs            |   1 +
 table_engine/src/engine.rs                         |   4 +-
 table_engine/src/provider.rs                       |  48 ++++-
 table_engine/src/remote/model.rs                   |  11 +-
 table_engine/src/table.rs                          |   5 +
 62 files changed, 894 insertions(+), 389 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e32493fe..27104165 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -648,7 +648,7 @@ dependencies = [
 [[package]]
 name = "arrow_util"
 version = "0.1.0"
-source = 
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57";
+source = 
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c";
 dependencies = [
  "ahash 0.8.3",
  "arrow 43.0.0",
@@ -2271,7 +2271,7 @@ dependencies = [
 [[package]]
 name = "datafusion_util"
 version = "0.1.0"
-source = 
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57";
+source = 
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c";
 dependencies = [
  "async-trait",
  "datafusion",
@@ -2372,6 +2372,7 @@ dependencies = [
  "lazy_static",
  "prometheus 0.12.0",
  "prost",
+ "runtime",
  "snafu 0.6.10",
  "table_engine",
  "tokio",
@@ -2860,7 +2861,7 @@ checksum = 
"8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2"
 [[package]]
 name = "generated_types"
 version = "0.1.0"
-source = 
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57";
+source = 
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c";
 dependencies = [
  "pbjson",
  "pbjson-build",
@@ -3298,7 +3299,7 @@ dependencies = [
 [[package]]
 name = "influxdb_influxql_parser"
 version = "0.1.0"
-source = 
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57";
+source = 
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c";
 dependencies = [
  "chrono",
  "chrono-tz",
@@ -3352,12 +3353,15 @@ dependencies = [
  "futures 0.3.28",
  "generic_error",
  "hash_ext",
+ "lazy_static",
  "logger",
  "macros",
  "meta_client",
+ "prometheus 0.12.0",
  "query_engine",
  "query_frontend",
  "regex",
+ "runtime",
  "snafu 0.6.10",
  "table_engine",
  "test_util",
@@ -3388,7 +3392,7 @@ dependencies = [
 [[package]]
 name = "iox_query"
 version = "0.1.0"
-source = 
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57";
+source = 
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c";
 dependencies = [
  "arrow 43.0.0",
  "arrow_util",
@@ -3412,7 +3416,7 @@ dependencies = [
 [[package]]
 name = "iox_query_influxql"
 version = "0.1.0"
-source = 
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57";
+source = 
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c";
 dependencies = [
  "arrow 43.0.0",
  "chrono",
@@ -3738,6 +3742,7 @@ version = "1.2.6-alpha"
 dependencies = [
  "chrono",
  "log",
+ "runtime",
  "serde",
  "slog",
  "slog-async",
@@ -4511,7 +4516,7 @@ dependencies = [
 [[package]]
 name = "observability_deps"
 version = "0.1.0"
-source = 
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57";
+source = 
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c";
 dependencies = [
  "tracing",
 ]
@@ -5430,6 +5435,7 @@ dependencies = [
  "macros",
  "prost",
  "query_frontend",
+ "runtime",
  "serde",
  "snafu 0.6.10",
  "table_engine",
@@ -5446,6 +5452,7 @@ dependencies = [
  "async-trait",
  "catalog",
  "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=d849fa4)",
+ "chrono",
  "cluster",
  "codec",
  "common_types",
@@ -5455,6 +5462,7 @@ dependencies = [
  "generic_error",
  "hash_ext",
  "influxdb_influxql_parser",
+ "iox_query",
  "iox_query_influxql",
  "itertools 0.10.5",
  "lazy_static",
@@ -5465,6 +5473,7 @@ dependencies = [
  "prom-remote-api",
  "regex",
  "regex-syntax 0.6.29",
+ "runtime",
  "schema",
  "snafu 0.6.10",
  "sqlparser",
@@ -5475,7 +5484,7 @@ dependencies = [
 [[package]]
 name = "query_functions"
 version = "0.1.0"
-source = 
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57";
+source = 
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c";
 dependencies = [
  "arrow 43.0.0",
  "chrono",
@@ -6130,7 +6139,7 @@ dependencies = [
 [[package]]
 name = "schema"
 version = "0.1.0"
-source = 
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57";
+source = 
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c";
 dependencies = [
  "arrow 43.0.0",
  "hashbrown 0.13.2",
@@ -6901,7 +6910,7 @@ dependencies = [
 [[package]]
 name = "test_helpers"
 version = "0.1.0"
-source = 
"git+https://github.com/CeresDB/influxql.git?rev=acbd3ad7651f2deb74857155bea892f88926da57#acbd3ad7651f2deb74857155bea892f88926da57";
+source = 
"git+https://github.com/CeresDB/influxql.git?rev=a905863#a9058633c03f018607dc1e4f6ca090b82d46a30c";
 dependencies = [
  "dotenvy",
  "observability_deps",
diff --git a/Cargo.toml b/Cargo.toml
index a01a7222..c99bbaf5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -115,10 +115,10 @@ hash_ext = { path = "components/hash_ext" }
 hex = "0.4.3"
 hyperloglog = { git = "https://github.com/jedisct1/rust-hyperloglog.git";, rev 
= "425487ce910f26636fbde8c4d640b538431aad50" }
 id_allocator = { path = "components/id_allocator" }
-influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git";, 
rev = "acbd3ad7651f2deb74857155bea892f88926da57", package = 
"iox_query_influxql" }
-influxql-parser = { git = "https://github.com/CeresDB/influxql.git";, rev = 
"acbd3ad7651f2deb74857155bea892f88926da57", package = 
"influxdb_influxql_parser" }
-influxql-query = { git = "https://github.com/CeresDB/influxql.git";, rev = 
"acbd3ad7651f2deb74857155bea892f88926da57", package = "iox_query" }
-influxql-schema = { git = "https://github.com/CeresDB/influxql.git";, rev = 
"acbd3ad7651f2deb74857155bea892f88926da57", package = "schema" }
+influxql-logical-planner = { git = "https://github.com/CeresDB/influxql.git";, 
rev = "a905863", package = "iox_query_influxql" }
+influxql-parser = { git = "https://github.com/CeresDB/influxql.git";, rev = 
"a905863", package = "influxdb_influxql_parser" }
+influxql-query = { git = "https://github.com/CeresDB/influxql.git";, rev = 
"a905863", package = "iox_query" }
+influxql-schema = { git = "https://github.com/CeresDB/influxql.git";, rev = 
"a905863", package = "schema" }
 interpreters = { path = "interpreters" }
 itertools = "0.10.5"
 lz4_flex = { version = "0.11", default-features = false, features = ["frame"] }
diff --git a/analytic_engine/src/instance/mod.rs 
b/analytic_engine/src/instance/mod.rs
index 6eb257b8..7525f641 100644
--- a/analytic_engine/src/instance/mod.rs
+++ b/analytic_engine/src/instance/mod.rs
@@ -38,7 +38,7 @@ use generic_error::{BoxError, GenericError};
 use logger::{error, info};
 use macros::define_result;
 use mem_collector::MemUsageCollector;
-use runtime::Runtime;
+use runtime::{PriorityRuntime, Runtime};
 use snafu::{ResultExt, Snafu};
 use table_engine::{engine::EngineRuntimes, predicate::PredicateRef, 
table::FlushRequest};
 use time_ext::ReadableDuration;
@@ -291,7 +291,7 @@ impl Instance {
     }
 
     #[inline]
-    fn read_runtime(&self) -> &Arc<Runtime> {
+    fn read_runtime(&self) -> &PriorityRuntime {
         &self.runtimes.read_runtime
     }
 
diff --git a/analytic_engine/src/instance/read.rs 
b/analytic_engine/src/instance/read.rs
index 9624f4cf..341af2fc 100644
--- a/analytic_engine/src/instance/read.rs
+++ b/analytic_engine/src/instance/read.rs
@@ -122,6 +122,10 @@ impl Instance {
             None,
         ));
 
+        let runtime = self
+            .read_runtime()
+            .choose_runtime(&request.priority)
+            .clone();
         let sst_read_options_builder = SstReadOptionsBuilder::new(
             ScanType::Query,
             self.scan_options.clone(),
@@ -129,7 +133,7 @@ impl Instance {
             table_options.num_rows_per_row_group,
             request.predicate.clone(),
             self.meta_cache.clone(),
-            self.read_runtime().clone(),
+            runtime,
         );
 
         if need_merge_sort {
diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs
index 90097101..f6bc9cf4 100644
--- a/analytic_engine/src/table/mod.rs
+++ b/analytic_engine/src/table/mod.rs
@@ -541,6 +541,8 @@ impl Table for TableImpl {
             projected_schema: request.projected_schema,
             predicate,
             metrics_collector: 
MetricsCollector::new(GET_METRICS_COLLECTOR_NAME.to_string()),
+            // TODO: pass priority from request
+            priority: Default::default(),
         };
         let mut batch_stream = self
             .read(read_request)
diff --git a/analytic_engine/src/tests/table.rs 
b/analytic_engine/src/tests/table.rs
index 8524f9de..1704a051 100644
--- a/analytic_engine/src/tests/table.rs
+++ b/analytic_engine/src/tests/table.rs
@@ -192,6 +192,7 @@ pub fn new_read_all_request_with_order(schema: Schema, 
opts: ReadOptions) -> Rea
         projected_schema: ProjectedSchema::no_projection(schema),
         predicate: Arc::new(Predicate::empty()),
         metrics_collector: MetricsCollector::default(),
+        priority: Default::default(),
     }
 }
 
diff --git a/analytic_engine/src/tests/util.rs 
b/analytic_engine/src/tests/util.rs
index 0d9613b7..c7acad40 100644
--- a/analytic_engine/src/tests/util.rs
+++ b/analytic_engine/src/tests/util.rs
@@ -26,6 +26,7 @@ use common_types::{
 use futures::stream::StreamExt;
 use logger::info;
 use object_store::config::{LocalOptions, ObjectStoreOptions, StorageOptions};
+use runtime::PriorityRuntime;
 use size_ext::ReadableSize;
 use table_engine::{
     engine::{
@@ -124,7 +125,7 @@ impl<T: WalsOpener> TestContext<T> {
                 .open_wals(
                     &self.config.wal,
                     WalRuntimes {
-                        read_runtime: self.runtimes.read_runtime.clone(),
+                        read_runtime: 
self.runtimes.read_runtime.high().clone(),
                         write_runtime: self.runtimes.write_runtime.clone(),
                         default_runtime: self.runtimes.default_runtime.clone(),
                     },
@@ -528,7 +529,7 @@ impl Builder {
             _dir: dir,
             config,
             runtimes: Arc::new(EngineRuntimes {
-                read_runtime: runtime.clone(),
+                read_runtime: PriorityRuntime::new(runtime.clone(), 
runtime.clone()),
                 write_runtime: runtime.clone(),
                 meta_runtime: runtime.clone(),
                 compact_runtime: runtime.clone(),
diff --git a/components/logger/Cargo.toml b/components/logger/Cargo.toml
index 77db8f59..7ae97c1e 100644
--- a/components/logger/Cargo.toml
+++ b/components/logger/Cargo.toml
@@ -35,6 +35,7 @@ workspace = true
 [dependencies]
 chrono = { workspace = true }
 log = "0.4"
+runtime = { workspace = true }
 serde = { workspace = true }
 slog = { workspace = true }
 slog-async = "2.6"
diff --git a/components/logger/src/lib.rs b/components/logger/src/lib.rs
index 1dc78b0c..5d25e230 100644
--- a/components/logger/src/lib.rs
+++ b/components/logger/src/lib.rs
@@ -28,6 +28,7 @@ pub use log::{
     debug as log_debug, error as log_error, info as log_info, max_level, trace 
as log_trace,
     warn as log_warn, SetLoggerError,
 };
+use runtime::Priority;
 use serde::{Deserialize, Serialize};
 pub use slog::Level;
 use slog::{slog_o, Drain, Key, OwnedKVList, Record, KV};
@@ -471,6 +472,7 @@ pub struct SlowTimer<'a> {
     sql: &'a str,
     slow_threshold: Duration,
     start_time: Instant,
+    priority: Option<Priority>,
 }
 
 impl<'a> Drop for SlowTimer<'a> {
@@ -478,9 +480,10 @@ impl<'a> Drop for SlowTimer<'a> {
         let cost = self.elapsed();
         if cost > self.slow_threshold {
             slow_query!(
-                "Normal query elapsed:{:?}, id:{}, query:{}",
+                "Normal query elapsed:{:?}, id:{}, priority:{:?}, query:{}",
                 cost,
                 self.request_id,
+                self.priority,
                 self.sql,
             );
         }
@@ -494,6 +497,7 @@ impl<'a> SlowTimer<'a> {
             sql,
             slow_threshold: threshold,
             start_time: Instant::now(),
+            priority: None,
         }
     }
 
@@ -504,6 +508,10 @@ impl<'a> SlowTimer<'a> {
     pub fn start_time(&self) -> Instant {
         self.start_time
     }
+
+    pub fn priority(&mut self, priority: Priority) {
+        self.priority = Some(priority);
+    }
 }
 
 #[macro_export(local_inner_macros)]
diff --git a/components/runtime/src/lib.rs b/components/runtime/src/lib.rs
index 54f6e7be..b6453819 100644
--- a/components/runtime/src/lib.rs
+++ b/components/runtime/src/lib.rs
@@ -30,8 +30,10 @@ use tokio::{
 };
 
 mod metrics;
+mod priority_runtime;
+
+pub use priority_runtime::{Priority, PriorityRuntime};
 
-// TODO(yingwen): Use opaque error type
 #[derive(Debug, Snafu)]
 #[snafu(visibility(pub))]
 pub enum Error {
diff --git a/components/runtime/src/priority_runtime.rs 
b/components/runtime/src/priority_runtime.rs
new file mode 100644
index 00000000..1f69bd8a
--- /dev/null
+++ b/components/runtime/src/priority_runtime.rs
@@ -0,0 +1,98 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::future::Future;
+
+use crate::{JoinHandle, RuntimeRef};
+
+// TODO: maybe we could move this to common_types crate.
+#[derive(Copy, Clone, Debug, Default)]
+#[repr(u8)]
+pub enum Priority {
+    #[default]
+    High = 0,
+    Low = 1,
+}
+
+impl Priority {
+    pub fn as_u8(&self) -> u8 {
+        *self as u8
+    }
+
+    pub fn as_str(&self) -> &str {
+        match self {
+            Self::High => "high",
+            Self::Low => "low",
+        }
+    }
+}
+
+impl TryFrom<u8> for Priority {
+    type Error = String;
+
+    fn try_from(value: u8) -> Result<Self, Self::Error> {
+        match value {
+            0 => Ok(Priority::High),
+            1 => Ok(Priority::Low),
+            _ => Err(format!("Unknown priority, value:{value}")),
+        }
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct PriorityRuntime {
+    low: RuntimeRef,
+    high: RuntimeRef,
+}
+
+impl PriorityRuntime {
+    pub fn new(low: RuntimeRef, high: RuntimeRef) -> Self {
+        Self { low, high }
+    }
+
+    pub fn low(&self) -> &RuntimeRef {
+        &self.low
+    }
+
+    pub fn high(&self) -> &RuntimeRef {
+        &self.high
+    }
+
+    pub fn choose_runtime(&self, priority: &Priority) -> &RuntimeRef {
+        match priority {
+            Priority::Low => &self.low,
+            Priority::High => &self.high,
+        }
+    }
+
+    // By default we spawn the future to the higher priority runtime.
+    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+    where
+        F: Future + Send + 'static,
+        F::Output: Send + 'static,
+    {
+        self.high.spawn(future)
+    }
+
+    pub fn spawn_with_priority<F>(&self, future: F, priority: Priority) -> 
JoinHandle<F::Output>
+    where
+        F: Future + Send + 'static,
+        F::Output: Send + 'static,
+    {
+        match priority {
+            Priority::Low => self.low.spawn(future),
+            Priority::High => self.high.spawn(future),
+        }
+    }
+}
diff --git a/df_engine_extensions/Cargo.toml b/df_engine_extensions/Cargo.toml
index 54926369..4de30d40 100644
--- a/df_engine_extensions/Cargo.toml
+++ b/df_engine_extensions/Cargo.toml
@@ -41,6 +41,7 @@ generic_error = { workspace = true }
 lazy_static = { workspace = true }
 prometheus = { workspace = true }
 prost = { workspace = true }
+runtime = { workspace = true }
 snafu = { workspace = true }
 table_engine = { workspace = true }
 trace_metric = { workspace = true }
diff --git a/df_engine_extensions/src/dist_sql_query/mod.rs 
b/df_engine_extensions/src/dist_sql_query/mod.rs
index 4bbf6b36..abfc0cca 100644
--- a/df_engine_extensions/src/dist_sql_query/mod.rs
+++ b/df_engine_extensions/src/dist_sql_query/mod.rs
@@ -26,6 +26,7 @@ use datafusion::{
 };
 use futures::future::BoxFuture;
 use generic_error::BoxError;
+use runtime::Priority;
 use table_engine::{predicate::PredicateRef, remote::model::TableIdentifier, 
table::TableRef};
 
 pub mod codec;
@@ -56,6 +57,7 @@ pub trait ExecutableScanBuilder: fmt::Debug + Send + Sync + 
'static {
         &self,
         table: TableRef,
         ctx: TableScanContext,
+        priority: Priority,
     ) -> DfResult<Arc<dyn ExecutionPlan>>;
 }
 
@@ -91,22 +93,6 @@ pub struct TableScanContext {
     pub predicate: PredicateRef,
 }
 
-impl TableScanContext {
-    pub fn new(
-        batch_size: usize,
-        read_parallelism: usize,
-        projected_schema: ProjectedSchema,
-        predicate: PredicateRef,
-    ) -> Self {
-        Self {
-            batch_size,
-            read_parallelism,
-            projected_schema,
-            predicate,
-        }
-    }
-}
-
 impl TryFrom<TableScanContext> for 
ceresdbproto::remote_engine::TableScanContext {
     type Error = datafusion::error::DataFusionError;
 
diff --git a/df_engine_extensions/src/dist_sql_query/physical_plan.rs 
b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
index 0dbaf415..87cd18bd 100644
--- a/df_engine_extensions/src/dist_sql_query/physical_plan.rs
+++ b/df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -43,6 +43,7 @@ use datafusion::{
     },
 };
 use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
+use runtime::Priority;
 use table_engine::{remote::model::TableIdentifier, table::ReadRequest};
 use trace_metric::{collector::FormatCollectorVisitor, MetricsCollector, 
TraceMetricWhenDrop};
 
@@ -57,6 +58,7 @@ pub struct UnresolvedPartitionedScan {
     pub sub_tables: Vec<TableIdentifier>,
     pub table_scan_ctx: TableScanContext,
     pub metrics_collector: MetricsCollector,
+    pub priority: Priority,
 }
 
 impl UnresolvedPartitionedScan {
@@ -77,6 +79,7 @@ impl UnresolvedPartitionedScan {
             sub_tables,
             table_scan_ctx,
             metrics_collector,
+            priority: read_request.priority,
         }
     }
 }
diff --git a/df_engine_extensions/src/dist_sql_query/resolver.rs 
b/df_engine_extensions/src/dist_sql_query/resolver.rs
index c48bfe53..8f1d4e1c 100644
--- a/df_engine_extensions/src/dist_sql_query/resolver.rs
+++ b/df_engine_extensions/src/dist_sql_query/resolver.rs
@@ -20,6 +20,7 @@ use datafusion::{
     error::{DataFusionError, Result as DfResult},
     physical_plan::{analyze::AnalyzeExec, ExecutionPlan},
 };
+use runtime::Priority;
 use table_engine::{remote::model::TableIdentifier, table::TableRef};
 
 use crate::{
@@ -45,6 +46,7 @@ pub struct Resolver {
     remote_executor: RemotePhysicalPlanExecutorRef,
     catalog_manager: CatalogManagerRef,
     scan_builder: ExecutableScanBuilderRef,
+    priority: Priority,
 }
 
 impl Resolver {
@@ -52,11 +54,13 @@ impl Resolver {
         remote_executor: RemotePhysicalPlanExecutorRef,
         catalog_manager: CatalogManagerRef,
         scan_builder: ExecutableScanBuilderRef,
+        priority: Priority,
     ) -> Self {
         Self {
             remote_executor,
             catalog_manager,
             scan_builder,
+            priority,
         }
     }
 
@@ -214,7 +218,10 @@ impl Resolver {
             };
 
         if let Some((table, table_scan_ctx)) = build_scan_opt {
-            return self.scan_builder.build(table, table_scan_ctx).await;
+            return self
+                .scan_builder
+                .build(table, table_scan_ctx, self.priority)
+                .await;
         }
 
         let children = plan.children().clone();
diff --git a/df_engine_extensions/src/dist_sql_query/test_util.rs 
b/df_engine_extensions/src/dist_sql_query/test_util.rs
index d70f9ec2..9006a526 100644
--- a/df_engine_extensions/src/dist_sql_query/test_util.rs
+++ b/df_engine_extensions/src/dist_sql_query/test_util.rs
@@ -44,6 +44,7 @@ use datafusion::{
     scalar::ScalarValue,
 };
 use futures::{future::BoxFuture, Stream};
+use runtime::Priority;
 use table_engine::{
     memory::MemoryTable,
     predicate::PredicateBuilder,
@@ -204,6 +205,7 @@ impl TestContext {
             projected_schema,
             predicate,
             metrics_collector: MetricsCollector::default(),
+            priority: Default::default(),
         };
 
         // Build the test catalog
@@ -238,6 +240,7 @@ impl TestContext {
             Arc::new(MockRemotePhysicalPlanExecutor),
             self.catalog_manager.clone(),
             Box::new(MockScanBuilder),
+            Priority::High,
         )
     }
 
@@ -422,6 +425,7 @@ impl ExecutableScanBuilder for MockScanBuilder {
         &self,
         _table: TableRef,
         ctx: TableScanContext,
+        priority: Priority,
     ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
         let request = ReadRequest {
             request_id: RequestId::from("test"),
@@ -433,6 +437,7 @@ impl ExecutableScanBuilder for MockScanBuilder {
             projected_schema: ctx.projected_schema.clone(),
             predicate: ctx.predicate.clone(),
             metrics_collector: MetricsCollector::default(),
+            priority,
         };
 
         Ok(Arc::new(MockScan { request }))
diff --git a/docs/example-cluster-1.toml b/docs/example-cluster-1.toml
index b08d3dc7..8c221dc8 100644
--- a/docs/example-cluster-1.toml
+++ b/docs/example-cluster-1.toml
@@ -16,7 +16,7 @@
 addr = "127.0.0.1"
 
 [logger]
-level = "info"
+level = "debug"
 
 [server]
 bind_addr = "0.0.0.0"
diff --git a/integration_tests/cases/common/dml/issue-1087.result 
b/integration_tests/cases/common/dml/issue-1087.result
index 54265dae..ad8a0cc7 100644
--- a/integration_tests/cases/common/dml/issue-1087.result
+++ b/integration_tests/cases/common/dml/issue-1087.result
@@ -76,7 +76,7 @@ String("logical_plan after push_down_limit"),String("SAME 
TEXT AS ABOVE"),
 String("logical_plan after influx_regex_to_datafusion_regex"),String("SAME 
TEXT AS ABOVE"),
 String("logical_plan after handle_gap_fill"),String("SAME TEXT AS ABOVE"),
 String("logical_plan"),String("TableScan: issue_1087 projection=[tsid, t, 
name, value]"),
-String("initial_physical_plan"),String("ScanTable: table=issue_1087, 
parallelism=8\n"),
+String("initial_physical_plan"),String("ScanTable: table=issue_1087, 
parallelism=8, priority=Low\n"),
 String("physical_plan after aggregate_statistics"),String("SAME TEXT AS 
ABOVE"),
 String("physical_plan after join_selection"),String("SAME TEXT AS ABOVE"),
 String("physical_plan after PipelineFixer"),String("SAME TEXT AS ABOVE"),
@@ -86,7 +86,7 @@ String("physical_plan after 
CombinePartialFinalAggregate"),String("SAME TEXT AS
 String("physical_plan after EnforceSorting"),String("SAME TEXT AS ABOVE"),
 String("physical_plan after coalesce_batches"),String("SAME TEXT AS ABOVE"),
 String("physical_plan after PipelineChecker"),String("SAME TEXT AS ABOVE"),
-String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8\n"),
+String("physical_plan"),String("ScanTable: table=issue_1087, parallelism=8, 
priority=Low\n"),
 
 
 DROP TABLE `issue_1087`;
diff --git a/integration_tests/cases/common/dml/issue-341.result 
b/integration_tests/cases/common/dml/issue-341.result
index f9405db3..90222259 100644
--- a/integration_tests/cases/common/dml/issue-341.result
+++ b/integration_tests/cases/common/dml/issue-341.result
@@ -58,7 +58,7 @@ WHERE
 
 plan_type,plan,
 String("logical_plan"),String("TableScan: issue341_t1 projection=[timestamp, 
value], full_filters=[issue341_t1.value = Int32(3)]"),
-String("physical_plan"),String("ScanTable: table=issue341_t1, 
parallelism=8\n"),
+String("physical_plan"),String("ScanTable: table=issue341_t1, parallelism=8, 
priority=Low\n"),
 
 
 -- FilterExec node should not be in plan.
@@ -72,7 +72,7 @@ WHERE
 
 plan_type,plan,
 String("logical_plan"),String("Projection: issue341_t1.timestamp, 
issue341_t1.value\n  TableScan: issue341_t1 projection=[timestamp, value, 
tag1], full_filters=[issue341_t1.tag1 = Utf8(\"t3\")]"),
-String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as 
timestamp, value@1 as value]\n  ScanTable: table=issue341_t1, parallelism=8\n"),
+String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as 
timestamp, value@1 as value]\n  ScanTable: table=issue341_t1, parallelism=8, 
priority=Low\n"),
 
 
 -- Repeat operations above, but with overwrite table
@@ -116,7 +116,7 @@ WHERE
 
 plan_type,plan,
 String("logical_plan"),String("Filter: issue341_t2.value = Float64(3)\n  
TableScan: issue341_t2 projection=[timestamp, value], 
partial_filters=[issue341_t2.value = Float64(3)]"),
-String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n  
FilterExec: value@1 = 3\n    ScanTable: table=issue341_t2, parallelism=8\n"),
+String("physical_plan"),String("CoalesceBatchesExec: target_batch_size=8192\n  
FilterExec: value@1 = 3\n    ScanTable: table=issue341_t2, parallelism=8, 
priority=Low\n"),
 
 
 -- When using tag as filter, FilterExec node should not be in plan.
@@ -130,7 +130,7 @@ WHERE
 
 plan_type,plan,
 String("logical_plan"),String("Projection: issue341_t2.timestamp, 
issue341_t2.value\n  TableScan: issue341_t2 projection=[timestamp, value, 
tag1], full_filters=[issue341_t2.tag1 = Utf8(\"t3\")]"),
-String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as 
timestamp, value@1 as value]\n  ScanTable: table=issue341_t2, parallelism=8\n"),
+String("physical_plan"),String("ProjectionExec: expr=[timestamp@0 as 
timestamp, value@1 as value]\n  ScanTable: table=issue341_t2, parallelism=8, 
priority=Low\n"),
 
 
 DROP TABLE IF EXISTS `issue341_t1`;
diff --git a/integration_tests/cases/common/dml/issue-59.result 
b/integration_tests/cases/common/dml/issue-59.result
index d2bdb35f..549c7019 100644
--- a/integration_tests/cases/common/dml/issue-59.result
+++ b/integration_tests/cases/common/dml/issue-59.result
@@ -25,7 +25,7 @@ GROUP BY id+1;
 
 plan_type,plan,
 String("logical_plan"),String("Projection: group_alias_0 AS issue59.id + 
Int64(1), COUNT(alias1) AS COUNT(DISTINCT issue59.account)\n  Aggregate: 
groupBy=[[group_alias_0]], aggr=[[COUNT(alias1)]]\n    Projection: 
group_alias_0, alias1\n      Aggregate: groupBy=[[CAST(issue59.id AS Int64) + 
Int64(1) AS group_alias_0, issue59.account AS alias1]], aggr=[[]]\n        
TableScan: issue59 projection=[id, account]"),
-String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as 
issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n  
AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], 
aggr=[COUNT(alias1)]\n    CoalesceBatchesExec: target_batch_size=8192\n      
RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n  
      AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], 
aggr=[COUNT(alias1)]\n          [...]
+String("physical_plan"),String("ProjectionExec: expr=[group_alias_0@0 as 
issue59.id + Int64(1), COUNT(alias1)@1 as COUNT(DISTINCT issue59.account)]\n  
AggregateExec: mode=FinalPartitioned, gby=[group_alias_0@0 as group_alias_0], 
aggr=[COUNT(alias1)]\n    CoalesceBatchesExec: target_batch_size=8192\n      
RepartitionExec: partitioning=Hash([group_alias_0@0], 8), input_partitions=8\n  
      AggregateExec: mode=Partial, gby=[group_alias_0@0 as group_alias_0], 
aggr=[COUNT(alias1)]\n          [...]
 
 
 DROP TABLE IF EXISTS issue59;
diff --git a/integration_tests/cases/common/explain/explain.result 
b/integration_tests/cases/common/explain/explain.result
index ba651eca..0cd06380 100644
--- a/integration_tests/cases/common/explain/explain.result
+++ b/integration_tests/cases/common/explain/explain.result
@@ -10,7 +10,7 @@ EXPLAIN SELECT t FROM `04_explain_t`;
 
 plan_type,plan,
 String("logical_plan"),String("TableScan: 04_explain_t projection=[t]"),
-String("physical_plan"),String("ScanTable: table=04_explain_t, 
parallelism=8\n"),
+String("physical_plan"),String("ScanTable: table=04_explain_t, parallelism=8, 
priority=Low\n"),
 
 
 DROP TABLE `04_explain_t`;
diff --git a/integration_tests/cases/common/optimizer/optimizer.result 
b/integration_tests/cases/common/optimizer/optimizer.result
index 8551f96c..f9cfac2d 100644
--- a/integration_tests/cases/common/optimizer/optimizer.result
+++ b/integration_tests/cases/common/optimizer/optimizer.result
@@ -10,7 +10,7 @@ EXPLAIN SELECT max(value) AS c1, avg(value) AS c2 FROM 
`07_optimizer_t` GROUP BY
 
 plan_type,plan,
 String("logical_plan"),String("Projection: MAX(07_optimizer_t.value) AS c1, 
AVG(07_optimizer_t.value) AS c2\n  Aggregate: groupBy=[[07_optimizer_t.name]], 
aggr=[[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]]\n    TableScan: 
07_optimizer_t projection=[name, value]"),
-String("physical_plan"),String("ProjectionExec: 
expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n  
AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], 
aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n    
CoalesceBatchesExec: target_batch_size=8192\n      RepartitionExec: 
partitioning=Hash([name@0], 8), input_partitions=8\n        AggregateExec: 
mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), 
AVG(07_optimizer_t.value)]\n [...]
+String("physical_plan"),String("ProjectionExec: 
expr=[MAX(07_optimizer_t.value)@1 as c1, AVG(07_optimizer_t.value)@2 as c2]\n  
AggregateExec: mode=FinalPartitioned, gby=[name@0 as name], 
aggr=[MAX(07_optimizer_t.value), AVG(07_optimizer_t.value)]\n    
CoalesceBatchesExec: target_batch_size=8192\n      RepartitionExec: 
partitioning=Hash([name@0], 8), input_partitions=8\n        AggregateExec: 
mode=Partial, gby=[name@0 as name], aggr=[MAX(07_optimizer_t.value), 
AVG(07_optimizer_t.value)]\n [...]
 
 
 DROP TABLE `07_optimizer_t`;
diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result 
b/integration_tests/cases/env/cluster/ddl/partition_table.result
index 18e023c0..e8feacbc 100644
--- a/integration_tests/cases/env/cluster/ddl/partition_table.result
+++ b/integration_tests/cases/env/cluster/ddl/partition_table.result
@@ -83,7 +83,7 @@ 
UInt64(4860320137932382618),Timestamp(1651737067000),String("ceresdb9"),Int32(0)
 EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";
 
 plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n    
__partition_table_t_1:\n        poll_duration=xxs\n        total_duration=xxs\n 
       wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec, 
metrics=[output_rows=1, elapsed_compute=xxs]\n  ScanTable: 
table=__partition_table_t_1, parallelism=8, metrics=[\nPredicate { exprs:[name 
= Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_st [...]
+String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n    
__partition_table_t_1:\n        poll_duration=xxs\n        total_duration=xxs\n 
       wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec, 
metrics=[output_rows=1, elapsed_compute=xxs]\n  ScanTable: 
table=__partition_table_t_1, parallelism=8, priority=Low, metrics=[\nPredicate 
{ exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange  [...]
 
 
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
@@ -92,7 +92,7 @@ String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:f
 EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", 
"ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");
 
 plan_type,plan,
-String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n    
__partition_table_t_x:\n        poll_duration=xxs\n        total_duration=xxs\n 
       wait_duration=xxs\n    __partition_table_t_x:\n        
poll_duration=xxs\n        total_duration=xxs\n        wait_duration=xxs\n    
__partition_table_t_x:\n        poll_duration=xxs\n        total_duration=xxs\n 
       wait_duration=xxs\n\n__partition_table_t_x:\n [...]
+String("Plan with Metrics"),String("ResolvedPartitionedScan: 
pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n    
__partition_table_t_x:\n        poll_duration=xxs\n        total_duration=xxs\n 
       wait_duration=xxs\n    __partition_table_t_x:\n        
poll_duration=xxs\n        total_duration=xxs\n        wait_duration=xxs\n    
__partition_table_t_x:\n        poll_duration=xxs\n        total_duration=xxs\n 
       wait_duration=xxs\n\n__partition_table_t_x:\n [...]
 
 
 ALTER TABLE partition_table_t ADD COLUMN (b string);
diff --git a/integration_tests/cases/env/local/ddl/query-plan.result 
b/integration_tests/cases/env/local/ddl/query-plan.result
index 26dcf909..be858c15 100644
--- a/integration_tests/cases/env/local/ddl/query-plan.result
+++ b/integration_tests/cases/env/local/ddl/query-plan.result
@@ -31,7 +31,16 @@ explain analyze select t from `03_dml_select_real_time_range`
 where t > 1695348001000;
 
 plan_type,plan,
-String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, metrics=[\nPredicate { 
exprs:[t > TimestampMillisecond(1695348001000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001001), exclusive_end: 
Timestamp(9223372036854775807) } }\nscan_table:\n    do_merge_sort=true\n    
iter_num=1\n    merge_iter_0:\n        init_duration=xxs\n        
num_memtables=1\n        num_ssts=0\n        scan_count=2\n        
scan_duration=xxs\n      [...]
+String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=Low, 
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)], 
time_range:TimeRange { inclusive_start: Timestamp(1695348001001), 
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n    
do_merge_sort=true\n    iter_num=1\n    merge_iter_0:\n        
init_duration=xxs\n        num_memtables=1\n        num_ssts=0\n        
scan_count=2\n        scan_durat [...]
+
+
+-- This query should have higher priority
+-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+explain analyze select t from `03_dml_select_real_time_range`
+where t >= 1695348001000 and t < 1695348002000;
+
+plan_type,plan,
+String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=High, 
metrics=[\nPredicate { exprs:[t >= TimestampMillisecond(1695348001000, None), t 
< TimestampMillisecond(1695348002000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001000), exclusive_end: 
Timestamp(1695348002000) } }\nscan_table:\n    do_merge_sort=true\n    
iter_num=1\n    merge_iter_0:\n        init_duration=xxs\n        
num_memtables=1\n        num_ssts= [...]
 
 
 -- This query should not include memtable
@@ -40,7 +49,7 @@ explain analyze select t from `03_dml_select_real_time_range`
 where t > 1695348002000;
 
 plan_type,plan,
-String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, metrics=[\nPredicate { 
exprs:[t > TimestampMillisecond(1695348002000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348002001), exclusive_end: 
Timestamp(9223372036854775807) } }\nscan_table:\n    do_merge_sort=true\n    
iter_num=0\n=0]\n"),
+String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=Low, 
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348002000, None)], 
time_range:TimeRange { inclusive_start: Timestamp(1695348002001), 
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n    
do_merge_sort=true\n    iter_num=0\n=0]\n"),
 
 
 -- SQLNESS ARG pre_cmd=flush
@@ -51,7 +60,7 @@ explain analyze select t from `03_dml_select_real_time_range`
 where t > 1695348001000;
 
 plan_type,plan,
-String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, metrics=[\nPredicate { 
exprs:[t > TimestampMillisecond(1695348001000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348001001), exclusive_end: 
Timestamp(9223372036854775807) } }\nscan_table:\n    do_merge_sort=true\n    
iter_num=1\n    merge_iter_0:\n        init_duration=xxs\n        
num_memtables=0\n        num_ssts=1\n        scan_count=2\n        
scan_duration=xxs\n      [...]
+String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=Low, 
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348001000, None)], 
time_range:TimeRange { inclusive_start: Timestamp(1695348001001), 
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n    
do_merge_sort=true\n    iter_num=1\n    merge_iter_0:\n        
init_duration=xxs\n        num_memtables=0\n        num_ssts=1\n        
scan_count=2\n        scan_durat [...]
 
 
 -- This query should not include SST
@@ -59,7 +68,7 @@ explain analyze select t from `03_dml_select_real_time_range`
 where t > 1695348002000;
 
 plan_type,plan,
-String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, metrics=[\nPredicate { 
exprs:[t > TimestampMillisecond(1695348002000, None)], time_range:TimeRange { 
inclusive_start: Timestamp(1695348002001), exclusive_end: 
Timestamp(9223372036854775807) } }\nscan_table:\n    do_merge_sort=true\n    
iter_num=0\n=0]\n"),
+String("Plan with Metrics"),String("ScanTable: 
table=03_dml_select_real_time_range, parallelism=8, priority=Low, 
metrics=[\nPredicate { exprs:[t > TimestampMillisecond(1695348002000, None)], 
time_range:TimeRange { inclusive_start: Timestamp(1695348002001), 
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n    
do_merge_sort=true\n    iter_num=0\n=0]\n"),
 
 
 -- Table with an 'append' update mode
@@ -92,7 +101,7 @@ explain analyze select t from `03_append_mode_table`
 where t >= 1695348001000 and name = 'ceresdb';
 
 plan_type,plan,
-String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], 
metrics=[output_rows=2, elapsed_compute=xxs]\n  ScanTable: 
table=03_append_mode_table, parallelism=8, metrics=[\nPredicate { exprs:[t >= 
TimestampMillisecond(1695348001000, None), name = Utf8(\"ceresdb\")], 
time_range:TimeRange { inclusive_start: Timestamp(1695348001000), 
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n    
do_merge_sort=false\n    chain_iter_0:\n        num_memtables=1\n        
num_ssts= [...]
+String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], 
metrics=[output_rows=2, elapsed_compute=xxs]\n  ScanTable: 
table=03_append_mode_table, parallelism=8, priority=Low, metrics=[\nPredicate { 
exprs:[t >= TimestampMillisecond(1695348001000, None), name = 
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: 
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } 
}\nscan_table:\n    do_merge_sort=false\n    chain_iter_0:\n        
num_memtables=1\n    [...]
 
 
 -- Should just fetch projected columns from SST
@@ -106,7 +115,7 @@ explain analyze select t from `03_append_mode_table`
 where t >= 1695348001000 and name = 'ceresdb';
 
 plan_type,plan,
-String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], 
metrics=[output_rows=2, elapsed_compute=xxs]\n  ScanTable: 
table=03_append_mode_table, parallelism=8, metrics=[\nPredicate { exprs:[t >= 
TimestampMillisecond(1695348001000, None), name = Utf8(\"ceresdb\")], 
time_range:TimeRange { inclusive_start: Timestamp(1695348001000), 
exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n    
do_merge_sort=false\n    chain_iter_0:\n        num_memtables=0\n        
num_ssts= [...]
+String("Plan with Metrics"),String("ProjectionExec: expr=[t@0 as t], 
metrics=[output_rows=2, elapsed_compute=xxs]\n  ScanTable: 
table=03_append_mode_table, parallelism=8, priority=Low, metrics=[\nPredicate { 
exprs:[t >= TimestampMillisecond(1695348001000, None), name = 
Utf8(\"ceresdb\")], time_range:TimeRange { inclusive_start: 
Timestamp(1695348001000), exclusive_end: Timestamp(9223372036854775807) } 
}\nscan_table:\n    do_merge_sort=false\n    chain_iter_0:\n        
num_memtables=0\n    [...]
 
 
 DROP TABLE `03_dml_select_real_time_range`;
diff --git a/integration_tests/cases/env/local/ddl/query-plan.sql 
b/integration_tests/cases/env/local/ddl/query-plan.sql
index a0baff5b..d1dbdaf9 100644
--- a/integration_tests/cases/env/local/ddl/query-plan.sql
+++ b/integration_tests/cases/env/local/ddl/query-plan.sql
@@ -21,6 +21,11 @@ INSERT INTO `03_dml_select_real_time_range` (t, name, value)
 explain analyze select t from `03_dml_select_real_time_range`
 where t > 1695348001000;
 
+-- This query should have higher priority
+-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
+explain analyze select t from `03_dml_select_real_time_range`
+where t >= 1695348001000 and t < 1695348002000;
+
 -- This query should not include memtable
 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
 explain analyze select t from `03_dml_select_real_time_range`
diff --git a/interpreters/Cargo.toml b/interpreters/Cargo.toml
index 94b9f4ea..7687b97f 100644
--- a/interpreters/Cargo.toml
+++ b/interpreters/Cargo.toml
@@ -40,12 +40,15 @@ df_operator = { workspace = true }
 futures = { workspace = true }
 generic_error = { workspace = true }
 hash_ext = { workspace = true }
+lazy_static = { workspace = true }
 logger = { workspace = true }
 macros = { workspace = true }
 meta_client = { workspace = true }
+prometheus = { workspace = true }
 query_engine = { workspace = true }
 query_frontend = { workspace = true }
 regex = { workspace = true }
+runtime = { workspace = true }
 snafu = { workspace = true }
 table_engine = { workspace = true }
 
diff --git a/interpreters/src/context.rs b/interpreters/src/context.rs
index 34e76d44..ab72ca38 100644
--- a/interpreters/src/context.rs
+++ b/interpreters/src/context.rs
@@ -19,6 +19,7 @@ use std::{sync::Arc, time::Instant};
 use common_types::request_id::RequestId;
 use macros::define_result;
 use query_engine::context::{Context as QueryContext, ContextRef as 
QueryContextRef};
+use runtime::Priority;
 use snafu::Snafu;
 
 #[derive(Debug, Snafu)]
@@ -36,6 +37,9 @@ pub struct Context {
     default_catalog: String,
     default_schema: String,
     enable_partition_table_access: bool,
+    /// If time range exceeds this threshold, the query will be marked as
+    /// expensive
+    expensive_query_threshold: u64,
 }
 
 impl Context {
@@ -46,16 +50,18 @@ impl Context {
             default_catalog: String::new(),
             default_schema: String::new(),
             enable_partition_table_access: false,
+            expensive_query_threshold: 24 * 3600 * 1000, // default 24 hours
         }
     }
 
     /// Create a new context of query executor
-    pub fn new_query_context(&self) -> Result<QueryContextRef> {
+    pub fn new_query_context(&self, priority: Priority) -> 
Result<QueryContextRef> {
         let ctx = QueryContext {
             request_id: self.request_id.clone(),
             deadline: self.deadline,
             default_catalog: self.default_catalog.clone(),
             default_schema: self.default_schema.clone(),
+            priority,
         };
         Ok(Arc::new(ctx))
     }
@@ -79,6 +85,11 @@ impl Context {
     pub fn enable_partition_table_access(&self) -> bool {
         self.enable_partition_table_access
     }
+
+    #[inline]
+    pub fn expensive_query_threshold(&self) -> u64 {
+        self.expensive_query_threshold
+    }
 }
 
 #[must_use]
@@ -88,6 +99,7 @@ pub struct Builder {
     default_catalog: String,
     default_schema: String,
     enable_partition_table_access: bool,
+    expensive_query_threshold: u64,
 }
 
 impl Builder {
@@ -102,6 +114,11 @@ impl Builder {
         self
     }
 
+    pub fn expensive_query_threshold(mut self, threshold: u64) -> Self {
+        self.expensive_query_threshold = threshold;
+        self
+    }
+
     pub fn build(self) -> Context {
         Context {
             request_id: self.request_id,
@@ -109,6 +126,7 @@ impl Builder {
             default_catalog: self.default_catalog,
             default_schema: self.default_schema,
             enable_partition_table_access: self.enable_partition_table_access,
+            expensive_query_threshold: self.expensive_query_threshold,
         }
     }
 }
diff --git a/interpreters/src/factory.rs b/interpreters/src/factory.rs
index 6216b6c9..7217b5d1 100644
--- a/interpreters/src/factory.rs
+++ b/interpreters/src/factory.rs
@@ -17,6 +17,7 @@
 use catalog::manager::ManagerRef;
 use query_engine::{executor::ExecutorRef, 
physical_planner::PhysicalPlannerRef};
 use query_frontend::plan::Plan;
+use runtime::PriorityRuntime;
 use table_engine::engine::TableEngineRef;
 
 use crate::{
@@ -37,6 +38,7 @@ use crate::{
 /// A factory to create interpreters
 pub struct Factory {
     query_executor: ExecutorRef,
+    query_runtime: PriorityRuntime,
     physical_planner: PhysicalPlannerRef,
     catalog_manager: ManagerRef,
     table_engine: TableEngineRef,
@@ -50,9 +52,11 @@ impl Factory {
         catalog_manager: ManagerRef,
         table_engine: TableEngineRef,
         table_manipulator: TableManipulatorRef,
+        query_runtime: PriorityRuntime,
     ) -> Self {
         Self {
             query_executor,
+            query_runtime,
             physical_planner,
             catalog_manager,
             table_engine,
@@ -68,9 +72,13 @@ impl Factory {
         validator.validate(&plan)?;
 
         let interpreter = match plan {
-            Plan::Query(p) => {
-                SelectInterpreter::create(ctx, p, self.query_executor, 
self.physical_planner)
-            }
+            Plan::Query(p) => SelectInterpreter::create(
+                ctx,
+                p,
+                self.query_executor,
+                self.physical_planner,
+                self.query_runtime,
+            ),
             Plan::Insert(p) => InsertInterpreter::create(ctx, p),
             Plan::Create(p) => {
                 CreateInterpreter::create(ctx, p, self.table_engine, 
self.table_manipulator)
diff --git a/interpreters/src/lib.rs b/interpreters/src/lib.rs
index b7b8ce35..5304ab7d 100644
--- a/interpreters/src/lib.rs
+++ b/interpreters/src/lib.rs
@@ -29,13 +29,13 @@ pub mod exists;
 pub mod factory;
 pub mod insert;
 pub mod interpreter;
+mod metrics;
 pub mod select;
 pub mod show;
+mod show_create;
 pub mod table_manipulator;
 pub mod validator;
 
-mod show_create;
-
 #[cfg(test)]
 mod tests;
 
diff --git a/query_engine/src/datafusion_impl/logical_optimizer/mod.rs 
b/interpreters/src/metrics.rs
similarity index 66%
copy from query_engine/src/datafusion_impl/logical_optimizer/mod.rs
copy to interpreters/src/metrics.rs
index a3b5130f..5f9b3028 100644
--- a/query_engine/src/datafusion_impl/logical_optimizer/mod.rs
+++ b/interpreters/src/metrics.rs
@@ -12,8 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//! Logical optimizer
+use lazy_static::lazy_static;
+use prometheus::{register_int_counter_vec, IntCounterVec};
 
-#[cfg(test)]
-pub mod tests;
-pub mod type_conversion;
+lazy_static! {
+    pub static ref ENGINE_QUERY_COUNTER: IntCounterVec = 
register_int_counter_vec!(
+        "engine_query_counter",
+        "engine_query_counter",
+        &["priority"]
+    )
+    .unwrap();
+}
diff --git a/interpreters/src/select.rs b/interpreters/src/select.rs
index eb701663..c33c7fdf 100644
--- a/interpreters/src/select.rs
+++ b/interpreters/src/select.rs
@@ -19,15 +19,19 @@ use futures::TryStreamExt;
 use generic_error::{BoxError, GenericError};
 use logger::debug;
 use macros::define_result;
-use query_engine::{executor::ExecutorRef, 
physical_planner::PhysicalPlannerRef};
-use query_frontend::plan::QueryPlan;
+use query_engine::{
+    context::ContextRef as QueryContextRef,
+    executor::ExecutorRef,
+    physical_planner::{PhysicalPlanRef, PhysicalPlannerRef},
+};
+use query_frontend::plan::{PriorityContext, QueryPlan};
+use runtime::{Priority, PriorityRuntime};
 use snafu::{ResultExt, Snafu};
-use table_engine::stream::SendableRecordBatchStream;
 
 use crate::{
     context::Context,
     interpreter::{Interpreter, InterpreterPtr, Output, Result as 
InterpreterResult, Select},
-    RecordBatchVec,
+    metrics::ENGINE_QUERY_COUNTER,
 };
 
 #[derive(Debug, Snafu)]
@@ -37,6 +41,9 @@ pub enum Error {
 
     #[snafu(display("Failed to execute physical plan, msg:{}, err:{}", msg, 
source))]
     ExecutePlan { msg: String, source: GenericError },
+
+    #[snafu(display("Failed to spawn task, err:{}", source))]
+    Spawn { source: runtime::Error },
 }
 
 define_result!(Error);
@@ -47,6 +54,7 @@ pub struct SelectInterpreter {
     plan: QueryPlan,
     executor: ExecutorRef,
     physical_planner: PhysicalPlannerRef,
+    query_runtime: PriorityRuntime,
 }
 
 impl SelectInterpreter {
@@ -55,12 +63,14 @@ impl SelectInterpreter {
         plan: QueryPlan,
         executor: ExecutorRef,
         physical_planner: PhysicalPlannerRef,
+        query_runtime: PriorityRuntime,
     ) -> InterpreterPtr {
         Box::new(Self {
             ctx,
             plan,
             executor,
             physical_planner,
+            query_runtime,
         })
     }
 }
@@ -69,21 +79,37 @@ impl SelectInterpreter {
 impl Interpreter for SelectInterpreter {
     async fn execute(self: Box<Self>) -> InterpreterResult<Output> {
         let request_id = self.ctx.request_id();
-        debug!(
-            "Interpreter execute select begin, request_id:{}, plan:{:?}",
-            request_id, self.plan
-        );
+        let plan = self.plan;
+        let priority = match plan.decide_query_priority(PriorityContext {
+            time_range_threshold: self.ctx.expensive_query_threshold(),
+        }) {
+            Some(v) => v,
+            None => {
+                debug!(
+                    "Query has invalid query range, return empty result 
directly, id:{request_id}, plan:{plan:?}"
+                );
+                return Ok(Output::Records(Vec::new()));
+            }
+        };
+
+        ENGINE_QUERY_COUNTER
+            .with_label_values(&[priority.as_str()])
+            .inc();
 
         let query_ctx = self
             .ctx
-            .new_query_context()
+            .new_query_context(priority)
             .context(CreateQueryContext)
             .context(Select)?;
 
+        debug!(
+            "Interpreter execute select begin, request_id:{request_id}, 
plan:{plan:?}, priority:{priority:?}"
+        );
+
         // Create physical plan.
         let physical_plan = self
             .physical_planner
-            .plan(&query_ctx, self.plan)
+            .plan(&query_ctx, plan)
             .await
             .box_err()
             .context(ExecutePlan {
@@ -91,34 +117,50 @@ impl Interpreter for SelectInterpreter {
             })
             .context(Select)?;
 
-        let record_batch_stream = self
-            .executor
-            .execute(&query_ctx, physical_plan)
+        if matches!(priority, Priority::Low) {
+            let executor = self.executor;
+            return self
+                .query_runtime
+                .spawn_with_priority(
+                    async move {
+                        execute_and_collect(query_ctx, executor, physical_plan)
+                            .await
+                            .context(Select)
+                    },
+                    Priority::Low,
+                )
+                .await
+                .context(Spawn)
+                .context(Select)?;
+        }
+
+        execute_and_collect(query_ctx, self.executor, physical_plan)
             .await
-            .box_err()
-            .context(ExecutePlan {
-                msg: "failed to execute physical plan",
-            })
-            .context(Select)?;
-
-        debug!(
-            "Interpreter execute select finish, request_id:{}",
-            request_id
-        );
-
-        let record_batches = collect(record_batch_stream).await?;
-
-        Ok(Output::Records(record_batches))
+            .context(Select)
     }
 }
 
-async fn collect(stream: SendableRecordBatchStream) -> 
InterpreterResult<RecordBatchVec> {
-    stream
-        .try_collect()
+async fn execute_and_collect(
+    query_ctx: QueryContextRef,
+    executor: ExecutorRef,
+    physical_plan: PhysicalPlanRef,
+) -> Result<Output> {
+    let record_batch_stream = executor
+        .execute(&query_ctx, physical_plan)
         .await
         .box_err()
         .context(ExecutePlan {
-            msg: "failed to collect execution results",
-        })
-        .context(Select)
+            msg: "failed to execute physical plan",
+        })?;
+
+    let record_batches =
+        record_batch_stream
+            .try_collect()
+            .await
+            .box_err()
+            .context(ExecutePlan {
+                msg: "failed to collect execution results",
+            })?;
+
+    Ok(Output::Records(record_batches))
 }
diff --git a/interpreters/src/tests.rs b/interpreters/src/tests.rs
index ed5081d4..1b11f5c9 100644
--- a/interpreters/src/tests.rs
+++ b/interpreters/src/tests.rs
@@ -29,6 +29,7 @@ use query_frontend::{
     config::DynamicConfig, parser::Parser, plan::Plan, planner::Planner, 
provider::MetaProvider,
     tests::MockMetaProvider,
 };
+use runtime::{Builder, PriorityRuntime};
 use table_engine::{engine::TableEngineRef, memory::MockRemoteEngine};
 
 use crate::{
@@ -62,6 +63,7 @@ where
     pub catalog_manager: ManagerRef,
     pub table_manipulator: TableManipulatorRef,
     pub query_engine: QueryEngineRef,
+    pub read_runtime: PriorityRuntime,
 }
 
 impl<M> Env<M>
@@ -84,6 +86,7 @@ where
             self.catalog_manager.clone(),
             self.engine(),
             self.table_manipulator.clone(),
+            self.read_runtime.clone(),
         )
     }
 
@@ -236,6 +239,7 @@ where
             catalog_manager.clone(),
             self.engine(),
             table_manipulator.clone(),
+            self.read_runtime.clone(),
         );
         let insert_sql = "INSERT INTO test_missing_columns_table(key1, key2, 
field4) VALUES('tagk', 1638428434000, 1), ('tagk2', 1638428434000, 10);";
 
@@ -256,6 +260,7 @@ where
             catalog_manager,
             self.engine(),
             table_manipulator,
+            self.read_runtime.clone(),
         );
         let ctx = Context::builder(RequestId::next_id(), None)
             .default_catalog_and_schema(DEFAULT_CATALOG.to_string(), 
DEFAULT_SCHEMA.to_string())
@@ -356,14 +361,21 @@ where
     }
 }
 
-#[tokio::test]
-async fn test_interpreters_rocks() {
-    test_util::init_log_for_test();
-    let rocksdb_ctx = RocksDBEngineBuildContext::default();
-    test_interpreters(rocksdb_ctx).await;
+#[test]
+fn test_interpreters_rocks() {
+    let rt = Arc::new(Builder::default().build().unwrap());
+    let read_runtime = PriorityRuntime::new(rt.clone(), rt.clone());
+    rt.block_on(async {
+        test_util::init_log_for_test();
+        let rocksdb_ctx = RocksDBEngineBuildContext::default();
+        test_interpreters(rocksdb_ctx, read_runtime).await;
+    })
 }
 
-async fn test_interpreters<T: EngineBuildContext>(engine_context: T) {
+async fn test_interpreters<T: EngineBuildContext>(
+    engine_context: T,
+    read_runtime: PriorityRuntime,
+) {
     let env = TestEnv::builder().build();
     let mut test_ctx = env.new_context(engine_context);
     test_ctx.open().await;
@@ -391,6 +403,7 @@ async fn test_interpreters<T: 
EngineBuildContext>(engine_context: T) {
         catalog_manager,
         table_manipulator,
         query_engine,
+        read_runtime,
     };
 
     env.test_create_table().await;
diff --git a/proxy/src/instance.rs b/proxy/src/instance.rs
index ec0d2e48..00087d40 100644
--- a/proxy/src/instance.rs
+++ b/proxy/src/instance.rs
@@ -21,6 +21,7 @@ use df_operator::registry::FunctionRegistryRef;
 use interpreters::table_manipulator::TableManipulatorRef;
 use query_engine::QueryEngineRef;
 use query_frontend::config::DynamicConfig as FrontendDynamicConfig;
+use runtime::PriorityRuntime;
 use table_engine::{engine::TableEngineRef, remote::RemoteEngineRef};
 
 use crate::limiter::Limiter;
@@ -29,6 +30,7 @@ use crate::limiter::Limiter;
 pub struct Instance {
     pub catalog_manager: ManagerRef,
     pub query_engine: QueryEngineRef,
+    pub query_runtime: PriorityRuntime,
     pub table_engine: TableEngineRef,
     pub partition_table_engine: TableEngineRef,
     // User defined functions registry.
diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs
index 74d26813..93b53590 100644
--- a/proxy/src/lib.rs
+++ b/proxy/src/lib.rs
@@ -39,7 +39,6 @@ mod write;
 pub const FORWARDED_FROM: &str = "forwarded-from";
 
 use std::{
-    ops::Bound,
     sync::Arc,
     time::{Duration, Instant},
 };
@@ -55,14 +54,9 @@ use ceresdbproto::storage::{
     storage_service_client::StorageServiceClient, PrometheusRemoteQueryRequest,
     PrometheusRemoteQueryResponse, Route,
 };
-use common_types::{request_id::RequestId, table::DEFAULT_SHARD_ID, ENABLE_TTL, 
TTL};
-use datafusion::{
-    prelude::{Column, Expr},
-    scalar::ScalarValue,
-};
+use common_types::{request_id::RequestId, table::DEFAULT_SHARD_ID};
 use futures::FutureExt;
 use generic_error::BoxError;
-use influxql_query::logical_optimizer::range_predicate::find_time_range;
 use interpreters::{
     context::Context as InterpreterContext,
     factory::Factory,
@@ -80,7 +74,6 @@ use table_engine::{
     table::{TableId, TableRef},
     PARTITION_TABLE_ENGINE_TYPE,
 };
-use time_ext::{current_time_millis, parse_duration};
 use tonic::{transport::Channel, IntoRequest};
 
 use crate::{
@@ -92,9 +85,6 @@ use crate::{
     schema_config_provider::SchemaConfigProviderRef,
 };
 
-// Because the clock may have errors, choose 1 hour as the error buffer
-const QUERY_EXPIRED_BUFFER: Duration = Duration::from_secs(60 * 60);
-
 #[derive(Clone, Debug, Deserialize, Serialize)]
 #[serde(default)]
 pub struct SubTableAccessPerm {
@@ -123,6 +113,7 @@ pub struct Proxy {
     cluster_with_meta: bool,
     sub_table_access_perm: SubTableAccessPerm,
     request_notifiers: Option<ReadRequestNotifiers>,
+    expensive_query_threshold: u64,
 }
 
 impl Proxy {
@@ -140,6 +131,7 @@ impl Proxy {
         cluster_with_meta: bool,
         sub_table_access_perm: SubTableAccessPerm,
         request_notifiers: Option<ReadRequestNotifiers>,
+        expensive_query_threshold: u64,
     ) -> Self {
         let forwarder = Arc::new(Forwarder::new(
             forward_config,
@@ -159,6 +151,7 @@ impl Proxy {
             cluster_with_meta,
             sub_table_access_perm,
             request_notifiers,
+            expensive_query_threshold,
         }
     }
 
@@ -211,70 +204,6 @@ impl Proxy {
         })
     }
 
-    /// Returns true when query range maybe exceeding ttl,
-    /// Note: False positive is possible
-    // TODO(tanruixiang): Add integration testing when supported by the testing
-    // framework
-    fn is_plan_expired(
-        &self,
-        plan: &Plan,
-        catalog_name: &str,
-        schema_name: &str,
-        table_name: &str,
-    ) -> Result<bool> {
-        if let Plan::Query(query) = &plan {
-            let catalog = self.get_catalog(catalog_name)?;
-            let schema = self.get_schema(&catalog, schema_name)?;
-            let table_ref = match self.get_table(&schema, table_name) {
-                Ok(Some(v)) => v,
-                _ => return Ok(false),
-            };
-            if let Some(value) = table_ref.options().get(ENABLE_TTL) {
-                if value == "false" {
-                    return Ok(false);
-                }
-            }
-            let ttl_duration = if let Some(ttl) = table_ref.options().get(TTL) 
{
-                if let Ok(ttl) = parse_duration(ttl) {
-                    ttl
-                } else {
-                    return Ok(false);
-                }
-            } else {
-                return Ok(false);
-            };
-
-            let timestamp_name = &table_ref
-                .schema()
-                .column(table_ref.schema().timestamp_index())
-                .name
-                .clone();
-            let ts_col = Column::from_name(timestamp_name);
-            let range = find_time_range(&query.df_plan, &ts_col)
-                .box_err()
-                .context(Internal {
-                    msg: "Failed to find time range",
-                })?;
-            match range.end {
-                Bound::Included(x) | Bound::Excluded(x) => {
-                    if let Expr::Literal(ScalarValue::Int64(Some(x))) = x {
-                        let now = current_time_millis() as i64;
-                        let deadline = now
-                            - ttl_duration.as_millis() as i64
-                            - QUERY_EXPIRED_BUFFER.as_millis() as i64;
-
-                        if x * 1_000 <= deadline {
-                            return Ok(true);
-                        }
-                    }
-                }
-                Bound::Unbounded => (),
-            }
-        }
-
-        Ok(false)
-    }
-
     fn get_catalog(&self, catalog_name: &str) -> Result<CatalogRef> {
         let catalog = self
             .instance
@@ -554,6 +483,7 @@ impl Proxy {
             // Use current ctx's catalog and schema as default catalog and 
schema
             .default_catalog_and_schema(catalog.to_string(), 
schema.to_string())
             .enable_partition_table_access(enable_partition_table_access)
+            .expensive_query_threshold(self.expensive_query_threshold)
             .build();
         let interpreter_factory = Factory::new(
             self.instance.query_engine.executor(),
@@ -561,6 +491,7 @@ impl Proxy {
             self.instance.catalog_manager.clone(),
             self.instance.table_engine.clone(),
             self.instance.table_manipulator.clone(),
+            self.instance.query_runtime.clone(),
         );
         interpreter_factory
             .create(interpreter_ctx, plan)
diff --git a/proxy/src/read.rs b/proxy/src/read.rs
index 9d93221c..effb88c0 100644
--- a/proxy/src/read.rs
+++ b/proxy/src/read.rs
@@ -28,6 +28,7 @@ use notifier::notifier::{ExecutionGuard, RequestNotifiers, 
RequestResult};
 use query_frontend::{
     frontend,
     frontend::{Context as SqlContext, Frontend},
+    plan::{Plan, PriorityContext},
     provider::CatalogMetaProvider,
 };
 use router::endpoint::Endpoint;
@@ -176,7 +177,7 @@ impl Proxy {
             .slow_threshold
             .load(std::sync::atomic::Ordering::Relaxed);
         let slow_threshold = Duration::from_secs(slow_threshold_secs);
-        let slow_timer = SlowTimer::new(request_id.as_str(), sql, 
slow_threshold);
+        let mut slow_timer = SlowTimer::new(request_id.as_str(), sql, 
slow_threshold);
         let deadline = ctx.timeout.map(|t| slow_timer.start_time() + t);
         let catalog = self.instance.catalog_manager.default_catalog_name();
 
@@ -243,13 +244,11 @@ impl Proxy {
                 })?;
         }
 
-        let mut plan_maybe_expired = false;
-        if let Some(table_name) = &table_name {
-            match self.is_plan_expired(&plan, catalog, schema, table_name) {
-                Ok(v) => plan_maybe_expired = v,
-                Err(err) => {
-                    warn!("Plan expire check failed, err:{err}");
-                }
+        if let Plan::Query(plan) = &plan {
+            if let Some(priority) = plan.decide_query_priority(PriorityContext 
{
+                time_range_threshold: self.expensive_query_threshold,
+            }) {
+                slow_timer.priority(priority);
             }
         }
 
@@ -276,27 +275,7 @@ impl Proxy {
             "Handle sql query finished, sql:{sql}, elapsed:{cost:?}, 
catalog:{catalog}, schema:{schema}, ctx:{ctx:?}",
         );
 
-        match &output {
-            Output::AffectedRows(_) => Ok(output),
-            Output::Records(v) => {
-                if plan_maybe_expired {
-                    let num_rows = v
-                        .iter()
-                        .fold(0_usize, |acc, record_batch| acc + 
record_batch.num_rows());
-                    if num_rows == 0 {
-                        warn!("Query time range maybe exceed TTL, sql:{sql}");
-
-                        // TODO: Cannot return this error directly, empty query
-                        // should return 200, not 4xx/5xx
-                        // All protocols should recognize this error.
-                        // return Err(Error::QueryMaybeExceedTTL {
-                        //     msg: format!("Query time range maybe exceed TTL,
-                        // sql:{sql}"), });
-                    }
-                }
-                Ok(output)
-            }
-        }
+        Ok(output)
     }
 
     async fn maybe_forward_sql_query(
diff --git a/query_engine/Cargo.toml b/query_engine/Cargo.toml
index df2e3240..4d9388fa 100644
--- a/query_engine/Cargo.toml
+++ b/query_engine/Cargo.toml
@@ -46,6 +46,7 @@ logger = { workspace = true }
 macros = { workspace = true }
 prost = { workspace = true }
 query_frontend = { workspace = true }
+runtime = { workspace = true }
 serde = { workspace = true }
 snafu = { workspace = true }
 table_engine = { workspace = true }
diff --git a/query_engine/src/config.rs b/query_engine/src/config.rs
index 1e6350ba..61462f3c 100644
--- a/query_engine/src/config.rs
+++ b/query_engine/src/config.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use serde::{Deserialize, Serialize};
+use time_ext::ReadableDuration;
 
 // FIXME: Use cpu number as the default parallelism
 const DEFAULT_READ_PARALLELISM: usize = 8;
@@ -21,12 +22,14 @@ const DEFAULT_READ_PARALLELISM: usize = 8;
 #[serde(default)]
 pub struct Config {
     pub read_parallelism: usize,
+    pub expensive_query_threshold: ReadableDuration,
 }
 
 impl Default for Config {
     fn default() -> Self {
         Self {
             read_parallelism: DEFAULT_READ_PARALLELISM,
+            expensive_query_threshold: ReadableDuration::hours(24),
         }
     }
 }
diff --git a/query_engine/src/context.rs b/query_engine/src/context.rs
index 2bf6aebf..55493aec 100644
--- a/query_engine/src/context.rs
+++ b/query_engine/src/context.rs
@@ -17,6 +17,7 @@
 use std::{sync::Arc, time::Instant};
 
 use common_types::request_id::RequestId;
+use runtime::Priority;
 
 pub type ContextRef = Arc<Context>;
 
@@ -27,4 +28,5 @@ pub struct Context {
     pub deadline: Option<Instant>,
     pub default_catalog: String,
     pub default_schema: String,
+    pub priority: Priority,
 }
diff --git a/query_engine/src/datafusion_impl/mod.rs 
b/query_engine/src/datafusion_impl/mod.rs
index ed8d963f..dfab1fd0 100644
--- a/query_engine/src/datafusion_impl/mod.rs
+++ b/query_engine/src/datafusion_impl/mod.rs
@@ -12,17 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::{fmt, sync::Arc, time::Instant};
+use std::{sync::Arc, time::Instant};
 
 use catalog::manager::ManagerRef as CatalogManager;
 use datafusion::{
     execution::{
-        context::{QueryPlanner, SessionState},
+        context::SessionState,
         runtime_env::{RuntimeConfig, RuntimeEnv},
         FunctionRegistry,
     },
-    optimizer::analyzer::Analyzer,
-    physical_optimizer::PhysicalOptimizerRule,
     prelude::{SessionConfig, SessionContext},
 };
 use df_engine_extensions::codec::PhysicalExtensionCodecImpl;
@@ -31,8 +29,7 @@ use table_engine::{provider::CeresdbOptions, 
remote::RemoteEngineRef};
 use crate::{
     context::Context,
     datafusion_impl::{
-        executor::DatafusionExecutorImpl, 
logical_optimizer::type_conversion::TypeConversion,
-        physical_planner::DatafusionPhysicalPlannerImpl,
+        executor::DatafusionExecutorImpl, 
physical_planner::DatafusionPhysicalPlannerImpl,
         physical_planner_extension::QueryPlannerAdapter, 
task_context::Preprocessor,
     },
     executor::ExecutorRef,
@@ -41,7 +38,6 @@ use crate::{
 };
 
 pub mod executor;
-pub mod logical_optimizer;
 pub mod physical_optimizer;
 pub mod physical_plan;
 pub mod physical_plan_extension;
@@ -67,15 +63,12 @@ impl DatafusionQueryEngineImpl {
     ) -> Result<Self> {
         let runtime_env = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
         let df_physical_planner = Arc::new(QueryPlannerAdapter);
-        let df_ctx_builder = Arc::new(DfContextBuilder::new(
-            config,
-            runtime_env.clone(),
+        let df_ctx_builder = Arc::new(DfContextBuilder::new(config, 
runtime_env.clone()));
+        let physical_planner = Arc::new(DatafusionPhysicalPlannerImpl::new(
+            df_ctx_builder.clone(),
             df_physical_planner,
         ));
 
-        // Physical planner
-        let physical_planner = 
Arc::new(DatafusionPhysicalPlannerImpl::new(df_ctx_builder.clone()));
-
         // Executor
         let extension_codec = Arc::new(PhysicalExtensionCodecImpl::new());
         let preprocessor = Arc::new(Preprocessor::new(
@@ -105,33 +98,17 @@ impl QueryEngine for DatafusionQueryEngineImpl {
 }
 
 /// Datafusion context builder
-#[derive(Clone)]
+#[derive(Debug, Clone)]
 pub struct DfContextBuilder {
     config: Config,
     runtime_env: Arc<RuntimeEnv>,
-    physical_planner: Arc<dyn QueryPlanner + Send + Sync>,
-}
-
-impl fmt::Debug for DfContextBuilder {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        f.debug_struct("DfContextBuilder")
-            .field("config", &self.config)
-            .field("runtime_env", &self.runtime_env)
-            .field("physical_planner", &"QueryPlannerAdapter")
-            .finish()
-    }
 }
 
 impl DfContextBuilder {
-    pub fn new(
-        config: Config,
-        runtime_env: Arc<RuntimeEnv>,
-        physical_planner: Arc<dyn QueryPlanner + Send + Sync>,
-    ) -> Self {
+    pub fn new(config: Config, runtime_env: Arc<RuntimeEnv>) -> Self {
         Self {
             config,
             runtime_env,
-            physical_planner,
         }
     }
 
@@ -144,6 +121,7 @@ impl DfContextBuilder {
             request_timeout: timeout,
             default_catalog: ctx.default_catalog.clone(),
             default_schema: ctx.default_schema.clone(),
+            priority: ctx.priority,
         };
         let mut df_session_config = SessionConfig::new()
             .with_default_catalog_and_schema(
@@ -159,40 +137,7 @@ impl DfContextBuilder {
 
         // Using default logcial optimizer, if want to add more custom rule, 
using
         // `add_optimizer_rule` to add.
-        let state = SessionState::with_config_rt(df_session_config, 
self.runtime_env.clone())
-            .with_query_planner(self.physical_planner.clone());
-
-        // Register analyzer rules
-        let state = Self::register_analyzer_rules(state);
-
-        // Register iox optimizers, used by influxql.
-        let state = 
influxql_query::logical_optimizer::register_iox_logical_optimizers(state);
-
+        let state = SessionState::with_config_rt(df_session_config, 
self.runtime_env.clone());
         SessionContext::with_state(state)
     }
-
-    // TODO: this is not used now, bug of RepartitionAdapter is already fixed 
in
-    // datafusion itself. Remove this code in future.
-    #[allow(dead_code)]
-    fn apply_adapters_for_physical_optimize_rules(
-        default_rules: &[Arc<dyn PhysicalOptimizerRule + Send + Sync>],
-    ) -> Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> {
-        let mut new_rules = Vec::with_capacity(default_rules.len());
-        for rule in default_rules {
-            
new_rules.push(physical_optimizer::may_adapt_optimize_rule(rule.clone()))
-        }
-
-        new_rules
-    }
-
-    fn register_analyzer_rules(mut state: SessionState) -> SessionState {
-        // Our analyzer has high priority, so first add we custom rules, then 
add the
-        // default ones.
-        state = state.with_analyzer_rules(vec![Arc::new(TypeConversion)]);
-        for rule in Analyzer::new().rules {
-            state = state.add_analyzer_rule(rule);
-        }
-
-        state
-    }
 }
diff --git a/query_engine/src/datafusion_impl/physical_planner.rs 
b/query_engine/src/datafusion_impl/physical_planner.rs
index 58287339..8ee4198f 100644
--- a/query_engine/src/datafusion_impl/physical_planner.rs
+++ b/query_engine/src/datafusion_impl/physical_planner.rs
@@ -12,11 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::sync::Arc;
+use std::{fmt, sync::Arc};
 
 use async_trait::async_trait;
+use datafusion::execution::context::QueryPlanner;
 use generic_error::BoxError;
-use query_frontend::{plan::QueryPlan, provider::CatalogProviderAdapter};
+use query_frontend::plan::QueryPlan;
 use snafu::ResultExt;
 
 use crate::{
@@ -30,14 +31,29 @@ use crate::{
 };
 
 /// Physical planner based on datafusion
-#[derive(Debug, Clone)]
+#[derive(Clone)]
 pub struct DatafusionPhysicalPlannerImpl {
     df_ctx_builder: Arc<DfContextBuilder>,
+    physical_planner: Arc<dyn QueryPlanner + Send + Sync>,
+}
+
+impl fmt::Debug for DatafusionPhysicalPlannerImpl {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("DfContextBuilder")
+            .field("df_ctx_builder", &self.df_ctx_builder)
+            .finish()
+    }
 }
 
 impl DatafusionPhysicalPlannerImpl {
-    pub fn new(df_ctx_builder: Arc<DfContextBuilder>) -> Self {
-        Self { df_ctx_builder }
+    pub fn new(
+        df_ctx_builder: Arc<DfContextBuilder>,
+        physical_planner: Arc<dyn QueryPlanner + Send + Sync>,
+    ) -> Self {
+        Self {
+            df_ctx_builder,
+            physical_planner,
+        }
     }
 
     fn has_partitioned_table(logical_plan: &QueryPlan) -> bool {
@@ -59,21 +75,16 @@ impl DatafusionPhysicalPlannerImpl {
 impl PhysicalPlanner for DatafusionPhysicalPlannerImpl {
     // TODO: we should modify `QueryPlan` to support create remote plan here.
     async fn plan(&self, ctx: &Context, logical_plan: QueryPlan) -> 
Result<PhysicalPlanRef> {
-        // Register catalogs to datafusion execution context.
-        let catalogs = 
CatalogProviderAdapter::new_adapters(logical_plan.tables.clone());
         // TODO: maybe we should not build `SessionContext` in each physical 
plan's
         // building. We need to do so because we place some dynamic
         // information(such as `timeout`) in `SessionConfig`, maybe it is 
better
         // to remove it to `TaskContext`.
         let df_ctx = self.df_ctx_builder.build(ctx);
-        for (name, catalog) in catalogs {
-            df_ctx.register_catalog(&name, Arc::new(catalog));
-        }
+        let state = df_ctx.state();
 
-        // Generate physical plan.
-        let exec_plan = df_ctx
-            .state()
-            .create_physical_plan(&logical_plan.df_plan)
+        let exec_plan = self
+            .physical_planner
+            .create_physical_plan(&logical_plan.df_plan, &state)
             .await
             .box_err()
             .context(PhysicalPlannerWithCause { msg: None })?;
diff --git a/query_engine/src/datafusion_impl/task_context.rs 
b/query_engine/src/datafusion_impl/task_context.rs
index f19c5dde..8aefd563 100644
--- a/query_engine/src/datafusion_impl/task_context.rs
+++ b/query_engine/src/datafusion_impl/task_context.rs
@@ -38,6 +38,7 @@ use df_engine_extensions::dist_sql_query::{
 use futures::future::BoxFuture;
 use generic_error::BoxError;
 use prost::Message;
+use runtime::Priority;
 use snafu::ResultExt;
 use table_engine::{
     provider::{CeresdbOptions, ScanTable, SCAN_TABLE_METRICS_COLLECTOR_NAME},
@@ -191,6 +192,7 @@ impl RemotePhysicalPlanExecutor for 
RemotePhysicalPlanExecutorImpl {
             .map(|n| Instant::now() + Duration::from_millis(n));
         let default_catalog = ceresdb_options.default_catalog.clone();
         let default_schema = ceresdb_options.default_schema.clone();
+        let priority = ceresdb_options.priority;
 
         let display_plan = DisplayableExecutionPlan::new(plan.as_ref());
         let exec_ctx = ExecContext {
@@ -199,6 +201,7 @@ impl RemotePhysicalPlanExecutor for 
RemotePhysicalPlanExecutorImpl {
             default_catalog,
             default_schema,
             query: display_plan.indent(true).to_string(),
+            priority,
         };
 
         // Encode plan and schema
@@ -261,6 +264,7 @@ impl DistQueryResolverBuilder {
             self.remote_executor.clone(),
             self.catalog_manager.clone(),
             scan_builder,
+            ctx.priority,
         )
     }
 }
@@ -278,6 +282,7 @@ impl ExecutableScanBuilder for ExecutableScanBuilderImpl {
         &self,
         table: TableRef,
         ctx: TableScanContext,
+        priority: Priority,
     ) -> DfResult<Arc<dyn ExecutionPlan>> {
         let read_opts = ReadOptions {
             batch_size: ctx.batch_size,
@@ -291,6 +296,7 @@ impl ExecutableScanBuilder for ExecutableScanBuilderImpl {
             projected_schema: ctx.projected_schema,
             predicate: ctx.predicate,
             metrics_collector: 
MetricsCollector::new(SCAN_TABLE_METRICS_COLLECTOR_NAME.to_string()),
+            priority,
         };
 
         let mut scan = ScanTable::new(table, read_request);
diff --git a/query_frontend/Cargo.toml b/query_frontend/Cargo.toml
index 998beb2b..51705ab2 100644
--- a/query_frontend/Cargo.toml
+++ b/query_frontend/Cargo.toml
@@ -36,6 +36,7 @@ arrow = { workspace = true }
 async-trait = { workspace = true }
 catalog = { workspace = true }
 ceresdbproto = { workspace = true }
+chrono = { workspace = true }
 cluster = { workspace = true }
 codec = { workspace = true }
 common_types = { workspace = true }
@@ -46,6 +47,7 @@ generic_error = { workspace = true }
 hash_ext = { workspace = true }
 influxql-logical-planner = { workspace = true }
 influxql-parser = { workspace = true }
+influxql-query = { workspace = true }
 influxql-schema = { workspace = true }
 itertools = { workspace = true }
 lazy_static = { workspace = true }
@@ -56,6 +58,7 @@ paste = { workspace = true }
 prom-remote-api = { workspace = true }
 regex = { workspace = true }
 regex-syntax = "0.6.28"
+runtime = { workspace = true }
 snafu = { workspace = true }
 sqlparser = { workspace = true }
 table_engine = { workspace = true }
diff --git a/query_frontend/src/frontend.rs b/query_frontend/src/frontend.rs
index 7208cfbe..aff6eabb 100644
--- a/query_frontend/src/frontend.rs
+++ b/query_frontend/src/frontend.rs
@@ -212,50 +212,53 @@ impl<P: MetaProvider> Frontend<P> {
     }
 }
 
-pub fn parse_table_name(statements: &StatementVec) -> Option<String> {
-    // maybe have empty sql
-    if statements.is_empty() {
-        return None;
-    }
-    match &statements[0] {
-        Statement::Standard(s) => match *s.clone() {
-            SqlStatement::Insert { table_name, .. } => {
-                Some(TableName::from(table_name).to_string())
-            }
-            SqlStatement::Explain { statement, .. } => {
-                if let SqlStatement::Query(q) = *statement {
-                    match *q.body {
-                        SetExpr::Select(select) => {
-                            if select.from.len() != 1 {
-                                None
-                            } else if let TableFactor::Table { name, .. } = 
&select.from[0].relation
-                            {
-                                Some(TableName::from(name.clone()).to_string())
-                            } else {
-                                None
-                            }
+pub fn parse_table_name_with_standard(sql_statement: &SqlStatement) -> 
Option<String> {
+    match sql_statement.clone() {
+        SqlStatement::Insert { table_name, .. } => {
+            Some(TableName::from(table_name.clone()).to_string())
+        }
+        SqlStatement::Explain { statement, .. } => {
+            if let SqlStatement::Query(q) = *statement {
+                match *q.body {
+                    SetExpr::Select(select) => {
+                        if select.from.len() != 1 {
+                            None
+                        } else if let TableFactor::Table { name, .. } = 
&select.from[0].relation {
+                            Some(TableName::from(name.clone()).to_string())
+                        } else {
+                            None
                         }
-                        // TODO: return unsupported error rather than none.
-                        _ => None,
                     }
+                    // TODO: return unsupported error rather than none.
+                    _ => None,
+                }
+            } else {
+                None
+            }
+        }
+        SqlStatement::Query(q) => match *q.body {
+            SetExpr::Select(select) => {
+                if select.from.len() != 1 {
+                    None
+                } else if let TableFactor::Table { name, .. } = 
&select.from[0].relation {
+                    Some(TableName::from(name.clone()).to_string())
                 } else {
                     None
                 }
             }
-            SqlStatement::Query(q) => match *q.body {
-                SetExpr::Select(select) => {
-                    if select.from.len() != 1 {
-                        None
-                    } else if let TableFactor::Table { name, .. } = 
&select.from[0].relation {
-                        Some(TableName::from(name.clone()).to_string())
-                    } else {
-                        None
-                    }
-                }
-                _ => None,
-            },
             _ => None,
         },
+        _ => None,
+    }
+}
+
+pub fn parse_table_name(statements: &StatementVec) -> Option<String> {
+    // maybe have empty sql
+    if statements.is_empty() {
+        return None;
+    }
+    match &statements[0] {
+        Statement::Standard(s) => parse_table_name_with_standard(s),
         Statement::Create(s) => Some(s.table_name.to_string()),
         Statement::Drop(s) => Some(s.table_name.to_string()),
         Statement::Describe(s) => Some(s.table_name.to_string()),
diff --git a/query_frontend/src/influxql/planner.rs 
b/query_frontend/src/influxql/planner.rs
index dc3b3693..960da307 100644
--- a/query_frontend/src/influxql/planner.rs
+++ b/query_frontend/src/influxql/planner.rs
@@ -37,6 +37,7 @@ use table_engine::table::TableRef;
 
 use crate::{
     influxql::error::*,
+    logical_optimizer::optimize_plan,
     plan::{Plan, QueryPlan, QueryType, ShowPlan, ShowTablesPlan},
     provider::{ContextProviderAdapter, MetaProvider},
 };
@@ -171,6 +172,12 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
                         .context(BuildPlanWithCause {
                             msg: "planner stmt to plan",
                         })?;
+                let df_plan = optimize_plan(&df_plan)
+                    .box_err()
+                    .context(BuildPlanWithCause {
+                        msg: "optimize plan",
+                    })?;
+
                 let tables = Arc::new(
                     self.schema_provider
                         .context_provider
@@ -180,7 +187,11 @@ impl<'a, P: MetaProvider> Planner<'a, P> {
                             msg: "get tables from context_provider",
                         })?,
                 );
-                Ok(Plan::Query(QueryPlan { df_plan, tables }))
+                Ok(Plan::Query(QueryPlan {
+                    df_plan,
+                    tables,
+                    table_name: None,
+                }))
             }
         }
     }
diff --git a/query_frontend/src/lib.rs b/query_frontend/src/lib.rs
index ee96af79..c8a2617c 100644
--- a/query_frontend/src/lib.rs
+++ b/query_frontend/src/lib.rs
@@ -23,6 +23,7 @@ pub mod config;
 pub mod container;
 pub mod frontend;
 pub mod influxql;
+mod logical_optimizer;
 pub mod parser;
 mod partition;
 pub mod plan;
diff --git a/query_frontend/src/logical_optimizer/mod.rs 
b/query_frontend/src/logical_optimizer/mod.rs
new file mode 100644
index 00000000..9fcd46b3
--- /dev/null
+++ b/query_frontend/src/logical_optimizer/mod.rs
@@ -0,0 +1,49 @@
+// Copyright 2023 The HoraeDB Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Logical optimizer
+
+mod type_conversion;
+use std::sync::Arc;
+
+use datafusion::{
+    error::Result,
+    execution::{context::SessionState, runtime_env::RuntimeEnv},
+    logical_expr::LogicalPlan,
+    optimizer::analyzer::Analyzer,
+    prelude::SessionConfig,
+};
+use type_conversion::TypeConversion;
+
+pub fn optimize_plan(plan: &LogicalPlan) -> Result<LogicalPlan> {
+    let state = SessionState::with_config_rt(SessionConfig::new(), 
Arc::new(RuntimeEnv::default()));
+    let state = register_analyzer_rules(state);
+    // Register iox optimizers, used by influxql.
+    let state = 
influxql_query::logical_optimizer::register_iox_logical_optimizers(state);
+
+    let plan = state.optimize(plan)?;
+
+    Ok(plan)
+}
+
+fn register_analyzer_rules(mut state: SessionState) -> SessionState {
+    // Our analyzer has high priority, so first add our custom rules, then add 
the
+    // default ones.
+    state = 
state.with_analyzer_rules(vec![Arc::new(crate::logical_optimizer::TypeConversion)]);
+    for rule in Analyzer::new().rules {
+        state = state.add_analyzer_rule(rule);
+    }
+
+    state
+}
diff --git 
a/query_engine/src/datafusion_impl/logical_optimizer/type_conversion.rs 
b/query_frontend/src/logical_optimizer/type_conversion.rs
similarity index 98%
rename from 
query_engine/src/datafusion_impl/logical_optimizer/type_conversion.rs
rename to query_frontend/src/logical_optimizer/type_conversion.rs
index 29c1e956..f2b0992a 100644
--- a/query_engine/src/datafusion_impl/logical_optimizer/type_conversion.rs
+++ b/query_frontend/src/logical_optimizer/type_conversion.rs
@@ -372,7 +372,6 @@ mod tests {
     };
 
     use super::*;
-    use crate::datafusion_impl::logical_optimizer::type_conversion;
 
     fn expr_test_schema() -> DFSchemaRef {
         Arc::new(
@@ -591,7 +590,7 @@ mod tests {
             "2021-09-07 16:00:00Z",
         ];
         for string in date_string {
-            let result = 
type_conversion::string_to_timestamp_ms_workaround(string);
+            let result = string_to_timestamp_ms_workaround(string);
             assert!(result.is_err());
         }
 
@@ -600,7 +599,7 @@ mod tests {
         let t = NaiveTime::from_hms_milli_opt(16, 0, 0, 0).unwrap();
         let dt = NaiveDateTime::new(d, t);
         let expect = naive_datetime_to_timestamp(&date_string, dt).unwrap();
-        let result = 
type_conversion::string_to_timestamp_ms_workaround(&date_string);
+        let result = string_to_timestamp_ms_workaround(&date_string);
         if let Ok(ScalarValue::TimestampMillisecond(Some(mills), _)) = result {
             assert_eq!(mills, expect)
         }
diff --git a/query_frontend/src/plan.rs b/query_frontend/src/plan.rs
index f2dd864e..94c8bca6 100644
--- a/query_frontend/src/plan.rs
+++ b/query_frontend/src/plan.rs
@@ -18,14 +18,21 @@ use std::{
     collections::{BTreeMap, HashMap},
     fmt,
     fmt::{Debug, Formatter},
+    ops::Bound,
     sync::Arc,
 };
 
-use common_types::{column_schema::ColumnSchema, row::RowGroup, schema::Schema};
-use datafusion::logical_expr::{
-    expr::Expr as DfLogicalExpr, logical_plan::LogicalPlan as 
DataFusionLogicalPlan,
+use common_types::{column_schema::ColumnSchema, row::RowGroup, schema::Schema, 
time::TimeRange};
+use datafusion::{
+    logical_expr::{
+        expr::Expr as DfLogicalExpr, logical_plan::LogicalPlan as 
DataFusionLogicalPlan,
+    },
+    prelude::Column,
+    scalar::ScalarValue,
 };
+use logger::{debug, warn};
 use macros::define_result;
+use runtime::Priority;
 use snafu::Snafu;
 use table_engine::{partition::PartitionInfo, table::TableRef};
 
@@ -70,14 +77,132 @@ pub enum Plan {
     Exists(ExistsTablePlan),
 }
 
+pub struct PriorityContext {
+    pub time_range_threshold: u64,
+}
+
 pub struct QueryPlan {
     pub df_plan: DataFusionLogicalPlan,
+    pub table_name: Option<String>,
     // Contains the TableProviders so we can register the them to 
ExecutionContext later.
     // Use TableProviderAdapter here so we can get the underlying TableRef and 
also be
     // able to cast to Arc<dyn TableProvider + Send + Sync>
     pub tables: Arc<TableContainer>,
 }
 
+impl QueryPlan {
+    fn find_timestamp_column(&self) -> Option<Column> {
+        let table_name = self.table_name.as_ref()?;
+        let table_ref = self.tables.get(table_name.into())?;
+        let schema = table_ref.table.schema();
+        let timestamp_name = schema.timestamp_name();
+        Some(Column::from_name(timestamp_name))
+    }
+
+    /// Extract the time range from the query plan.
+    /// It returns the maximum possible time range. For example, if the query
+    /// contains no timestamp filter, it returns
+    /// `TimeRange::min_to_max()`.
+    ///
+    /// Note: When the timestamp filter evaluates to false (such as ts < 10 and ts >
+    /// 100), it will return None, which means there is no valid time range for this
+    /// query.
+    fn extract_time_range(&self) -> Option<TimeRange> {
+        let ts_column = if let Some(v) = self.find_timestamp_column() {
+            v
+        } else {
+            warn!(
+                "Couldn't find time column, plan:{:?}, table_name:{:?}",
+                self.df_plan, self.table_name
+            );
+            return Some(TimeRange::min_to_max());
+        };
+        let time_range = match 
influxql_query::logical_optimizer::range_predicate::find_time_range(
+            &self.df_plan,
+            &ts_column,
+        ) {
+            Ok(v) => v,
+            Err(e) => {
+                warn!(
+                    "Couldn't find time range, plan:{:?}, err:{}",
+                    self.df_plan, e
+                );
+                return Some(TimeRange::min_to_max());
+            }
+        };
+
+        debug!(
+            "Extract time range, value:{time_range:?}, plan:{:?}",
+            self.df_plan
+        );
+        let mut start = i64::MIN;
+        match time_range.start {
+            Bound::Included(inclusive_start) => {
+                if let 
DfLogicalExpr::Literal(ScalarValue::TimestampMillisecond(Some(x), _)) =
+                    inclusive_start
+                {
+                    start = start.max(x);
+                }
+            }
+            Bound::Excluded(exclusive_start) => {
+                if let 
DfLogicalExpr::Literal(ScalarValue::TimestampMillisecond(Some(x), _)) =
+                    exclusive_start
+                {
+                    start = start.max(x + 1);
+                }
+            }
+            Bound::Unbounded => {}
+        }
+        let mut end = i64::MAX;
+        match time_range.end {
+            Bound::Included(inclusive_end) => {
+                if let 
DfLogicalExpr::Literal(ScalarValue::TimestampMillisecond(Some(x), _)) =
+                    inclusive_end
+                {
+                    end = end.min(x + 1);
+                }
+            }
+            Bound::Excluded(exclusive_start) => {
+                if let 
DfLogicalExpr::Literal(ScalarValue::TimestampMillisecond(Some(x), _)) =
+                    exclusive_start
+                {
+                    end = end.min(x);
+                }
+            }
+            Bound::Unbounded => {}
+        }
+
+        TimeRange::new(start.into(), end.into())
+    }
+
+    /// Decide the query priority based on the query plan.
+    /// When query contains invalid time range, it will return None.
+    // TODO: Currently we only consider the time range; other factors, such
+    // as the number of series or slow log metrics, should also be considered.
+    pub fn decide_query_priority(&self, ctx: PriorityContext) -> 
Option<Priority> {
+        let threshold = ctx.time_range_threshold;
+        let time_range = self.extract_time_range()?;
+        let is_expensive = if let Some(v) = time_range
+            .exclusive_end()
+            .as_i64()
+            .checked_sub(time_range.inclusive_start().as_i64())
+        {
+            v as u64 >= threshold
+        } else {
+            // When overflow occurs, we treat it as an expensive query.
+            true
+        };
+
+        let priority = if is_expensive {
+            Priority::Low
+        } else {
+            Priority::High
+        };
+
+        Some(priority)
+    }
+}
+
 impl Debug for QueryPlan {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         f.debug_struct("QueryPlan")
@@ -200,3 +325,81 @@ pub enum ShowPlan {
 pub struct ExistsTablePlan {
     pub exists: bool,
 }
+
+#[cfg(test)]
+mod tests {
+
+    use super::*;
+    use crate::planner::tests::sql_to_logical_plan;
+
+    #[test]
+    fn test_extract_time_range() {
+        // key2 is timestamp column
+        let testcases = [
+            (
+                "select * from test_table where key2 > 1 and key2 < 10",
+                Some((2, 10)),
+            ),
+            (
+                "select field1 from test_table where key2 > 1 and key2 < 10",
+                Some((2, 10)),
+            ),
+            (
+                "select * from test_table where key2 >= 1 and key2 <= 10",
+                Some((1, 11)),
+            ),
+            (
+                "select * from test_table where key2 < 1 and key2 > 10",
+                None,
+            ),
+            (
+                "select * from test_table where key2 < 1 ",
+                Some((i64::MIN, 1))
+            ),
+            (
+                "select * from test_table where key2 > 1 ",
+                Some((2, i64::MAX))
+            ),
+            // date literals
+            (
+                r#"select * from test_table where key2 >= "2023-11-21 
14:12:00+08:00" and key2 < "2023-11-21 14:22:00+08:00" "#,
+                Some((1700547120000, 1700547720000))
+            ),
+             // no timestamp filter
+            ("select * from test_table", Some((i64::MIN, i64::MAX))),
+            // aggr
+            (
+                "select key2, sum(field1) from test_table where key2 > 1 and 
key2 < 10 group by key2",
+                Some((2, 10)),
+            ),
+            // aggr & sort
+            (
+                "select key2, sum(field1) from test_table where key2 > 1 and 
key2 < 10 group by key2 order by key2",     Some((2, 10)),
+            ),
+            // explain
+            (
+                "explain select * from test_table where key2 > 1 and key2 < 
10",
+                Some((2, 10)),
+            ),
+            // analyze
+            (
+                "explain analyze select * from test_table where key2 > 1 and 
key2 < 10",
+                Some((2, 10)),
+            ),
+        ];
+
+        for case in testcases {
+            let sql = case.0;
+            let plan = sql_to_logical_plan(sql).unwrap();
+            let plan = match plan {
+                Plan::Query(v) => v,
+                _ => unreachable!(),
+            };
+            let expected = case
+                .1
+                .map(|v| TimeRange::new_unchecked(v.0.into(), v.1.into()));
+
+            assert_eq!(plan.extract_time_range(), expected, "sql:{}", sql);
+        }
+    }
+}
diff --git a/query_frontend/src/planner.rs b/query_frontend/src/planner.rs
index 7b1fa82e..559fbcd8 100644
--- a/query_frontend/src/planner.rs
+++ b/query_frontend/src/planner.rs
@@ -67,6 +67,8 @@ use crate::{
     },
     config::DynamicConfig,
     container::TableReference,
+    frontend::parse_table_name_with_standard,
+    logical_optimizer::optimize_plan,
     parser,
     partition::PartitionParser,
     plan::{
@@ -613,18 +615,20 @@ impl<'a, P: MetaProvider> PlannerDelegate<'a, P> {
 
     fn sql_statement_to_datafusion_plan(self, sql_stmt: SqlStatement) -> 
Result<Plan> {
         let df_planner = SqlToRel::new_with_options(&self.meta_provider, 
DEFAULT_PARSER_OPTS);
+        let table_name = parse_table_name_with_standard(&sql_stmt);
 
         let df_plan = df_planner
             .sql_statement_to_plan(sql_stmt)
             .context(DatafusionPlan)?;
+        let df_plan = optimize_plan(&df_plan).context(DatafusionPlan)?;
 
         debug!("Sql statement to datafusion plan, df_plan:\n{:#?}", df_plan);
 
         // Get all tables needed in the plan
         let tables = 
self.meta_provider.try_into_container().context(FindMeta)?;
-
         Ok(Plan::Query(QueryPlan {
             df_plan,
+            table_name,
             tables: Arc::new(tables),
         }))
     }
@@ -1389,7 +1393,7 @@ pub fn get_table_ref(table_name: &str) -> TableReference {
 }
 
 #[cfg(test)]
-mod tests {
+pub mod tests {
 
     use ceresdbproto::storage::{
         value, Field, FieldGroup, Tag, Value as PbValue, WriteSeriesEntry,
@@ -1416,7 +1420,7 @@ mod tests {
         Ok(())
     }
 
-    fn sql_to_logical_plan(sql: &str) -> Result<Plan> {
+    pub fn sql_to_logical_plan(sql: &str) -> Result<Plan> {
         let dyn_config = DynamicConfig::default();
         sql_to_logical_plan_with_config(sql, &dyn_config)
     }
@@ -1644,10 +1648,9 @@ mod tests {
         let sql = "select * from test_table;";
         quick_test(
             sql,
-            "Query(
+            r"Query(
     QueryPlan {
-        df_plan: Projection: test_table.key1, test_table.key2, 
test_table.field1, test_table.field2, test_table.field3, test_table.field4
-          TableScan: test_table,
+        df_plan: TableScan: test_table projection=[key1, key2, field1, field2, 
field3, field4],
     },
 )",
         )
diff --git a/query_frontend/src/promql/convert.rs 
b/query_frontend/src/promql/convert.rs
index e8ba5851..0109f9ac 100644
--- a/query_frontend/src/promql/convert.rs
+++ b/query_frontend/src/promql/convert.rs
@@ -154,7 +154,7 @@ impl Expr {
         meta_provider: ContextProviderAdapter<'_, P>,
         read_parallelism: usize,
     ) -> Result<(Plan, Arc<ColumnNames>)> {
-        let (logic_plan, column_name, _) =
+        let (logic_plan, column_name, table_name) =
             self.build_plan_iter(&meta_provider, INIT_LEVEL, 
read_parallelism)?;
         let tables = Arc::new(
             meta_provider
@@ -167,6 +167,7 @@ impl Expr {
             Plan::Query(QueryPlan {
                 df_plan: logic_plan,
                 tables,
+                table_name: Some(table_name),
             }),
             column_name,
         ))
diff --git a/query_frontend/src/promql/remote.rs 
b/query_frontend/src/promql/remote.rs
index 6b8c8f8f..4343048c 100644
--- a/query_frontend/src/promql/remote.rs
+++ b/query_frontend/src/promql/remote.rs
@@ -27,6 +27,7 @@ use prom_remote_api::types::{label_matcher, LabelMatcher, 
Query};
 use snafu::{OptionExt, ResultExt};
 
 use crate::{
+    logical_optimizer::optimize_plan,
     plan::{Plan, QueryPlan},
     promql::{
         convert::Selector,
@@ -81,6 +82,7 @@ pub fn remote_query_to_plan<P: MetaProvider>(
         .sort(sort_exprs)?
         .build()
         .context(BuildPlanError)?;
+    let df_plan = optimize_plan(&df_plan).context(BuildPlanError)?;
 
     let tables = Arc::new(
         meta_provider
@@ -90,7 +92,11 @@ pub fn remote_query_to_plan<P: MetaProvider>(
             })?,
     );
     Ok(RemoteQueryPlan {
-        plan: Plan::Query(QueryPlan { df_plan, tables }),
+        plan: Plan::Query(QueryPlan {
+            df_plan,
+            tables,
+            table_name: Some(metric),
+        }),
         field_col_name: field,
         timestamp_col_name: timestamp_col_name.to_string(),
     })
@@ -185,8 +191,8 @@ mod tests {
                 r#"
 Query(QueryPlan { df_plan: Sort: cpu.tsid ASC NULLS FIRST, cpu.time ASC NULLS 
FIRST
   Projection: cpu.tag1, cpu.tag2, cpu.time, cpu.tsid, cpu.value
-    Filter: cpu.tag1 = Utf8("some-value") AND cpu.time BETWEEN Int64(1000) AND 
Int64(2000)
-      TableScan: cpu })"#
+    Filter: cpu.tag1 = Utf8("some-value") AND cpu.time >= 
TimestampMillisecond(1000, None) AND cpu.time <= TimestampMillisecond(2000, 
None)
+      TableScan: cpu projection=[tsid, time, tag1, tag2, value], 
partial_filters=[cpu.tag1 = Utf8("some-value"), cpu.time >= 
TimestampMillisecond(1000, None), cpu.time <= TimestampMillisecond(2000, None)] 
})"#
                     .to_string()
             );
             assert_eq!(&field_col_name, "value");
@@ -217,8 +223,8 @@ Query(QueryPlan { df_plan: Sort: cpu.tsid ASC NULLS FIRST, 
cpu.time ASC NULLS FI
                 r#"
 Query(QueryPlan { df_plan: Sort: cpu.tsid ASC NULLS FIRST, cpu.time ASC NULLS 
FIRST
   Projection: cpu.tag1, cpu.tag2, cpu.time, cpu.tsid, cpu.field2
-    Filter: cpu.tag1 = Utf8("some-value") AND cpu.time BETWEEN Int64(1000) AND 
Int64(2000)
-      TableScan: cpu })"#
+    Filter: cpu.tag1 = Utf8("some-value") AND cpu.time >= 
TimestampMillisecond(1000, None) AND cpu.time <= TimestampMillisecond(2000, 
None)
+      TableScan: cpu projection=[tsid, time, tag1, tag2, field2], 
partial_filters=[cpu.tag1 = Utf8("some-value"), cpu.time >= 
TimestampMillisecond(1000, None), cpu.time <= TimestampMillisecond(2000, None)] 
})"#
                     .to_string()
             );
             assert_eq!(&field_col_name, "field2");
diff --git a/query_engine/src/datafusion_impl/logical_optimizer/mod.rs 
b/server/src/grpc/remote_engine_service/metrics.rs
similarity index 65%
rename from query_engine/src/datafusion_impl/logical_optimizer/mod.rs
rename to server/src/grpc/remote_engine_service/metrics.rs
index a3b5130f..c6c8124a 100644
--- a/query_engine/src/datafusion_impl/logical_optimizer/mod.rs
+++ b/server/src/grpc/remote_engine_service/metrics.rs
@@ -12,8 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-//! Logical optimizer
+use lazy_static::lazy_static;
+use prometheus::{register_int_counter_vec, IntCounterVec};
 
-#[cfg(test)]
-pub mod tests;
-pub mod type_conversion;
+lazy_static! {
+    pub static ref REMOTE_ENGINE_QUERY_COUNTER: IntCounterVec = 
register_int_counter_vec!(
+        "remote_engine_query_counter",
+        "remote_engine_query_counter",
+        &["priority"]
+    )
+    .unwrap();
+}
diff --git a/server/src/grpc/remote_engine_service/mod.rs 
b/server/src/grpc/remote_engine_service/mod.rs
index 11359300..eda9a61a 100644
--- a/server/src/grpc/remote_engine_service/mod.rs
+++ b/server/src/grpc/remote_engine_service/mod.rs
@@ -32,8 +32,8 @@ use ceresdbproto::{
         remote_engine_service_server::RemoteEngineService,
         row_group, AlterTableOptionsRequest, AlterTableOptionsResponse, 
AlterTableSchemaRequest,
         AlterTableSchemaResponse, ExecContext, ExecutePlanRequest, 
GetTableInfoRequest,
-        GetTableInfoResponse, MetricPayload, ReadRequest, ReadResponse, 
WriteBatchRequest,
-        WriteRequest, WriteResponse,
+        GetTableInfoResponse, MetricPayload, QueryPriority, ReadRequest, 
ReadResponse,
+        WriteBatchRequest, WriteRequest, WriteResponse,
     },
     storage::{arrow_payload, ArrowPayload},
 };
@@ -43,7 +43,7 @@ use futures::{
     Future,
 };
 use generic_error::BoxError;
-use logger::{error, info, slow_query};
+use logger::{debug, error, info, slow_query};
 use notifier::notifier::{ExecutionGuard, RequestNotifiers, RequestResult};
 use proxy::{
     hotspot::{HotspotRecorder, Message},
@@ -55,6 +55,7 @@ use query_engine::{
     physical_planner::PhysicalPlanRef,
     QueryEngineRef, QueryEngineType,
 };
+use runtime::{Priority, RuntimeRef};
 use snafu::{OptionExt, ResultExt};
 use table_engine::{
     engine::EngineRuntimes,
@@ -76,11 +77,15 @@ use crate::{
             REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC,
             REMOTE_ENGINE_WRITE_BATCH_NUM_ROWS_HISTOGRAM,
         },
-        remote_engine_service::error::{ErrNoCause, ErrWithCause, Result, 
StatusCode},
+        remote_engine_service::{
+            error::{ErrNoCause, ErrWithCause, Result, StatusCode},
+            metrics::REMOTE_ENGINE_QUERY_COUNTER,
+        },
     },
 };
 
 pub mod error;
+mod metrics;
 
 const STREAM_QUERY_CHANNEL_LEN: usize = 200;
 const DEFAULT_COMPRESS_MIN_LENGTH: usize = 80 * 1024;
@@ -139,15 +144,22 @@ struct ExecutePlanMetricCollector {
     query: String,
     request_id: RequestId,
     slow_threshold: Duration,
+    priority: Priority,
 }
 
 impl ExecutePlanMetricCollector {
-    fn new(request_id: String, query: String, slow_threshold_secs: u64) -> 
Self {
+    fn new(
+        request_id: String,
+        query: String,
+        slow_threshold_secs: u64,
+        priority: Priority,
+    ) -> Self {
         Self {
             start: Instant::now(),
             query,
             request_id: request_id.into(),
             slow_threshold: Duration::from_secs(slow_threshold_secs),
+            priority,
         }
     }
 }
@@ -157,12 +169,17 @@ impl MetricCollector for ExecutePlanMetricCollector {
         let cost = self.start.elapsed();
         if cost > self.slow_threshold {
             slow_query!(
-                "Remote query elapsed:{:?}, id:{}, query:{}",
+                "Remote query elapsed:{:?}, id:{}, priority:{}, query:{}",
                 cost,
                 self.request_id,
+                self.priority.as_str(),
                 self.query
             );
         }
+
+        REMOTE_ENGINE_QUERY_COUNTER
+            .with_label_values(&[self.priority.as_str()])
+            .inc();
         REMOTE_ENGINE_GRPC_HANDLER_DURATION_HISTOGRAM_VEC
             .execute_physical_plan
             .observe(cost.as_secs_f64());
@@ -415,6 +432,8 @@ impl RemoteEngineServiceImpl {
                     query,
                     request_notifiers.clone(),
                     config.notify_timeout.0,
+                    // TODO: decide runtime from request priority.
+                    self.runtimes.read_runtime.high(),
                 )
                 .await?;
             }
@@ -435,6 +454,7 @@ impl RemoteEngineServiceImpl {
         query: F,
         notifiers: Arc<RequestNotifiers<K, mpsc::Sender<Result<RecordBatch>>>>,
         notify_timeout: Duration,
+        rt: &RuntimeRef,
     ) -> Result<()>
     where
         K: Hash + PartialEq + Eq,
@@ -444,7 +464,7 @@ impl RemoteEngineServiceImpl {
         let mut guard = ExecutionGuard::new(|| {
             notifiers.take_notifiers(&request_key);
         });
-        let handle = self.runtimes.read_runtime.spawn(query);
+        let handle = rt.spawn(query);
         let streams = handle.await.box_err().context(ErrWithCause {
             code: StatusCode::Internal,
             msg: "fail to join task",
@@ -459,7 +479,7 @@ impl RemoteEngineServiceImpl {
                 })
             });
 
-            let handle = self.runtimes.read_runtime.spawn(async move {
+            let handle = rt.spawn(async move {
                 let mut batches = Vec::new();
                 while let Some(batch) = stream.next().await {
                     batches.push(batch)
@@ -486,7 +506,7 @@ impl RemoteEngineServiceImpl {
         let notifiers = notifiers.take_notifiers(&request_key).unwrap();
 
         // Do send in background to avoid blocking the rpc procedure.
-        self.runtimes.read_runtime.spawn(async move {
+        rt.spawn(async move {
             Self::send_dedupped_resps(resps, notifiers, notify_timeout).await;
         });
 
@@ -676,26 +696,36 @@ impl RemoteEngineServiceImpl {
             .slow_threshold
             .load(std::sync::atomic::Ordering::Relaxed);
 
-        let metric = ExecutePlanMetricCollector::new(
-            ctx.request_id_str.clone(),
-            ctx.displayable_query,
-            slow_threshold_secs,
-        );
+        let priority = ctx.priority();
         let query_ctx = create_query_ctx(
             ctx.request_id_str,
             ctx.default_catalog,
             ctx.default_schema,
             ctx.timeout_ms,
+            priority,
         );
 
+        debug!(
+            "Execute remote query, ctx:{query_ctx:?}, query:{}",
+            &ctx.displayable_query
+        );
+        let metric = ExecutePlanMetricCollector::new(
+            ctx.request_id.to_string(),
+            ctx.displayable_query,
+            slow_threshold_secs,
+            query_ctx.priority,
+        );
         let physical_plan = 
Arc::new(DataFusionPhysicalPlanAdapter::new(TypedPlan::Remote(
             encoded_plan,
         )));
-        let physical_plan_clone = physical_plan.clone();
 
-        let stream = self
+        let rt = self
             .runtimes
             .read_runtime
+            .choose_runtime(&query_ctx.priority);
+        let physical_plan_clone = physical_plan.clone();
+
+        let stream = rt
             .spawn(async move { handle_execute_plan(query_ctx, physical_plan, 
query_engine).await })
             .await
             .box_err()
@@ -730,18 +760,20 @@ impl RemoteEngineServiceImpl {
             .dyn_config
             .slow_threshold
             .load(std::sync::atomic::Ordering::Relaxed);
-        let metric = ExecutePlanMetricCollector::new(
-            ctx.request_id_str.clone(),
-            ctx.displayable_query,
-            slow_threshold_secs,
-        );
+        let priority = ctx.priority();
         let query_ctx = create_query_ctx(
             ctx.request_id_str,
             ctx.default_catalog,
             ctx.default_schema,
             ctx.timeout_ms,
+            priority,
+        );
+        let metric = ExecutePlanMetricCollector::new(
+            ctx.request_id.to_string(),
+            ctx.displayable_query,
+            slow_threshold_secs,
+            query_ctx.priority,
         );
-
         let key = PhysicalPlanKey {
             encoded_plan: encoded_plan.clone(),
         };
@@ -758,6 +790,10 @@ impl RemoteEngineServiceImpl {
             ..
         } = query_dedup;
 
+        let rt = self
+            .runtimes
+            .read_runtime
+            .choose_runtime(&query_ctx.priority);
         let (tx, rx) = mpsc::channel(config.notify_queue_cap);
         match physical_plan_notifiers.insert_notifier(key.clone(), tx) {
             // The first request, need to handle it, and then notify the other 
requests.
@@ -772,6 +808,7 @@ impl RemoteEngineServiceImpl {
                     query,
                     physical_plan_notifiers,
                     config.notify_timeout.0,
+                    rt,
                 )
                 .await?;
             }
@@ -1190,6 +1227,7 @@ fn create_query_ctx(
     default_catalog: String,
     default_schema: String,
     timeout_ms: i64,
+    priority: QueryPriority,
 ) -> QueryContext {
     let request_id = RequestId::from(request_id);
     let deadline = if timeout_ms >= 0 {
@@ -1197,12 +1235,17 @@ fn create_query_ctx(
     } else {
         None
     };
+    let priority = match priority {
+        QueryPriority::Low => Priority::Low,
+        QueryPriority::High => Priority::High,
+    };
 
     QueryContext {
         request_id,
         deadline,
         default_catalog,
         default_schema,
+        priority,
     }
 }
 
diff --git a/server/src/http.rs b/server/src/http.rs
index 068f21fa..389e7712 100644
--- a/server/src/http.rs
+++ b/server/src/http.rs
@@ -43,7 +43,7 @@ use proxy::{
     Proxy,
 };
 use router::endpoint::Endpoint;
-use runtime::{Runtime, RuntimeRef};
+use runtime::{PriorityRuntime, Runtime};
 use serde::Serialize;
 use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
 use table_engine::{engine::EngineRuntimes, table::FlushRequest};
@@ -310,7 +310,7 @@ impl Service {
             .and(self.with_proxy())
             .and(self.with_read_runtime())
             .and_then(
-                |req, mut ctx: RequestContext, proxy: Arc<Proxy>, runtime: 
RuntimeRef| async move {
+                |req, mut ctx: RequestContext, proxy: Arc<Proxy>, runtime: 
PriorityRuntime| async move {
                     // We don't timeout http api since it's mainly used for 
debugging.
                     ctx.timeout = None;
 
@@ -774,7 +774,7 @@ impl Service {
 
     fn with_read_runtime(
         &self,
-    ) -> impl Filter<Extract = (Arc<Runtime>,), Error = Infallible> + Clone {
+    ) -> impl Filter<Extract = (PriorityRuntime,), Error = Infallible> + Clone 
{
         let runtime = self.engine_runtimes.read_runtime.clone();
         warp::any().map(move || runtime.clone())
     }
diff --git a/server/src/server.rs b/server/src/server.rs
index 27f59c18..9d418a32 100644
--- a/server/src/server.rs
+++ b/server/src/server.rs
@@ -378,6 +378,7 @@ impl Builder {
         let config_content = 
self.config_content.context(MissingConfigContent)?;
         let query_engine_config = 
self.query_engine_config.context(MissingQueryEngineConfig)?;
         let datafusion_context = 
self.datatfusion_context.context(MissingDatafusionContext)?;
+        let expensive_query_threshold = 
query_engine_config.expensive_query_threshold.as_millis();
 
         let hotspot_recorder = Arc::new(HotspotRecorder::new(
             self.server_config.hotspot,
@@ -412,6 +413,7 @@ impl Builder {
             let instance = Instance {
                 catalog_manager,
                 query_engine,
+                query_runtime: engine_runtimes.read_runtime.clone(),
                 table_engine,
                 partition_table_engine,
                 function_registry,
@@ -459,6 +461,7 @@ impl Builder {
             self.cluster.is_some(),
             self.server_config.sub_table_access_perm,
             request_notifiers,
+            expensive_query_threshold,
         ));
 
         let http_service = http::Builder::new(http_config)
diff --git a/src/ceresdb/src/config.rs b/src/ceresdb/src/config.rs
index b23476d4..676df8ef 100644
--- a/src/ceresdb/src/config.rs
+++ b/src/ceresdb/src/config.rs
@@ -91,7 +91,7 @@ pub enum ClusterDeployment {
 #[derive(Clone, Debug, Deserialize, Serialize)]
 #[serde(default)]
 pub struct RuntimeConfig {
-    /// Runtime for reading data
+    /// High priority runtime for reading data
     pub read_thread_num: usize,
     /// The size of the stack used by the read thread
     ///
@@ -99,6 +99,8 @@ pub struct RuntimeConfig {
     /// TODO: this config may be removed in the future when the complex query
     /// won't overflow the stack.
     pub read_thread_stack_size: ReadableSize,
+    /// Low priority runtime for reading data
+    pub low_read_thread_num: usize,
     /// Runtime for writing data
     pub write_thread_num: usize,
     /// Runtime for communicating with meta cluster
@@ -116,6 +118,7 @@ impl Default for RuntimeConfig {
         Self {
             read_thread_num: 8,
             read_thread_stack_size: ReadableSize::mb(16),
+            low_read_thread_num: 1,
             write_thread_num: 8,
             meta_thread_num: 2,
             compact_thread_num: 4,
diff --git a/src/ceresdb/src/setup.rs b/src/ceresdb/src/setup.rs
index de78aa3d..a7ae6a31 100644
--- a/src/ceresdb/src/setup.rs
+++ b/src/ceresdb/src/setup.rs
@@ -32,6 +32,7 @@ use proxy::{
     },
 };
 use router::{rule_based::ClusterView, ClusterBasedRouter, RuleBasedRouter};
+use runtime::PriorityRuntime;
 use server::{
     config::{StaticRouteConfig, StaticTopologyConfig},
     local_tables::LocalTablesRecoverer,
@@ -86,12 +87,20 @@ fn build_runtime(name: &str, threads_num: usize) -> 
runtime::Runtime {
 }
 
 fn build_engine_runtimes(config: &RuntimeConfig) -> EngineRuntimes {
+    let read_stack_size = config.read_thread_stack_size.as_byte() as usize;
     EngineRuntimes {
-        read_runtime: Arc::new(build_runtime_with_stack_size(
-            "ceres-read",
-            config.read_thread_num,
-            Some(config.read_thread_stack_size.as_byte() as usize),
-        )),
+        read_runtime: PriorityRuntime::new(
+            Arc::new(build_runtime_with_stack_size(
+                "read-low",
+                config.low_read_thread_num,
+                Some(read_stack_size),
+            )),
+            Arc::new(build_runtime_with_stack_size(
+                "read-high",
+                config.read_thread_num,
+                Some(read_stack_size),
+            )),
+        ),
         write_runtime: Arc::new(build_runtime("ceres-write", 
config.write_thread_num)),
         compact_runtime: Arc::new(build_runtime("ceres-compact", 
config.compact_thread_num)),
         meta_runtime: Arc::new(build_runtime("ceres-meta", 
config.meta_thread_num)),
@@ -266,7 +275,8 @@ async fn build_table_engine_proxy(engine_builder: 
EngineBuilder<'_>) -> Arc<Tabl
 fn make_wal_runtime(runtimes: Arc<EngineRuntimes>) -> WalRuntimes {
     WalRuntimes {
         write_runtime: runtimes.write_runtime.clone(),
-        read_runtime: runtimes.read_runtime.clone(),
+        // TODO: remove read_runtime from WalRuntimes
+        read_runtime: runtimes.read_runtime.high().clone(),
         default_runtime: runtimes.default_runtime.clone(),
     }
 }
diff --git a/system_catalog/src/sys_catalog_table.rs 
b/system_catalog/src/sys_catalog_table.rs
index 0daacb03..00f7b061 100644
--- a/system_catalog/src/sys_catalog_table.rs
+++ b/system_catalog/src/sys_catalog_table.rs
@@ -545,6 +545,7 @@ impl SysCatalogTable {
             projected_schema: 
ProjectedSchema::no_projection(self.table.schema()),
             predicate: PredicateBuilder::default().build(),
             metrics_collector: MetricsCollector::default(),
+            priority: Default::default(),
         };
         let mut batch_stream = 
self.table.read(read_request).await.context(ReadTable)?;
 
diff --git a/table_engine/src/engine.rs b/table_engine/src/engine.rs
index 9946d0cb..3411c767 100644
--- a/table_engine/src/engine.rs
+++ b/table_engine/src/engine.rs
@@ -24,7 +24,7 @@ use common_types::{
 };
 use generic_error::{GenericError, GenericResult};
 use macros::define_result;
-use runtime::RuntimeRef;
+use runtime::{PriorityRuntime, RuntimeRef};
 use snafu::{ensure, Backtrace, Snafu};
 
 use crate::{
@@ -346,7 +346,7 @@ pub type TableEngineRef = Arc<dyn TableEngine>;
 #[derive(Clone, Debug)]
 pub struct EngineRuntimes {
     /// Runtime for reading data
-    pub read_runtime: RuntimeRef,
+    pub read_runtime: PriorityRuntime,
     /// Runtime for writing data
     pub write_runtime: RuntimeRef,
     /// Runtime for compacting data
diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs
index fb3cb41d..cbc41372 100644
--- a/table_engine/src/provider.rs
+++ b/table_engine/src/provider.rs
@@ -39,6 +39,7 @@ use datafusion::{
 };
 use df_operator::visitor;
 use logger::debug;
+use runtime::Priority;
 use trace_metric::{collector::FormatCollectorVisitor, MetricsCollector};
 
 use crate::{
@@ -55,12 +56,19 @@ pub struct CeresdbOptions {
     pub request_timeout: Option<u64>,
     pub default_schema: String,
     pub default_catalog: String,
+    pub priority: Priority,
 }
 
 impl ConfigExtension for CeresdbOptions {
     const PREFIX: &'static str = "ceresdb";
 }
 
+impl CeresdbOptions {
+    const REQUEST_ID_KEY: &'static str = "request_id";
+    const REQUEST_PRIORITY_KEY: &'static str = "request_priority";
+    const REQUEST_TIMEOUT_KEY: &'static str = "request_timeout";
+}
+
 impl ExtensionOptions for CeresdbOptions {
     fn as_any(&self) -> &dyn Any {
         self
@@ -76,33 +84,57 @@ impl ExtensionOptions for CeresdbOptions {
 
     fn set(&mut self, key: &str, value: &str) -> Result<()> {
         match key {
-            "request_id" => self.request_id = value.to_string(),
-            "request_timeout" => {
+            Self::REQUEST_ID_KEY => self.request_id = value.to_string(),
+            Self::REQUEST_TIMEOUT_KEY => {
                 self.request_timeout = Some(value.parse::<u64>().map_err(|e| {
                     DataFusionError::External(
                         format!("could not parse request_timeout, 
input:{value}, err:{e:?}").into(),
                     )
                 })?)
             }
+            Self::REQUEST_PRIORITY_KEY => {
+                self.priority = value
+                    .parse::<u8>()
+                    .map_err(|e| {
+                        DataFusionError::External(
+                            format!("request_priority should be u8, 
input:{value}, err:{e:?}")
+                                .into(),
+                        )
+                    })
+                    .and_then(|value| {
+                        Priority::try_from(value).map_err(|e| {
+                            DataFusionError::External(
+                                format!("parse request_priority failed, 
input:{value}, err:{e:?}")
+                                    .into(),
+                            )
+                        })
+                    })?
+            }
             _ => Err(DataFusionError::External(
                 format!("could not find key, key:{key}").into(),
             ))?,
         }
+
         Ok(())
     }
 
     fn entries(&self) -> Vec<ConfigEntry> {
         vec![
             ConfigEntry {
-                key: "request_id".to_string(),
+                key: Self::REQUEST_ID_KEY.to_string(),
                 value: Some(self.request_id.to_string()),
                 description: "",
             },
             ConfigEntry {
-                key: "request_timeout".to_string(),
+                key: Self::REQUEST_TIMEOUT_KEY.to_string(),
                 value: self.request_timeout.map(|v| v.to_string()),
                 description: "",
             },
+            ConfigEntry {
+                key: Self::REQUEST_PRIORITY_KEY.to_string(),
+                value: Some(self.priority.as_u8().to_string()),
+                description: "",
+            },
         ]
     }
 }
@@ -182,8 +214,9 @@ impl<B: TableScanBuilder> TableProviderAdapter<B> {
             .request_timeout
             .map(|n| Instant::now() + Duration::from_millis(n));
         let read_parallelism = state.config().target_partitions();
+        let priority = ceresdb_options.priority;
         debug!(
-            "TableProvider scan table, table:{}, request_id:{}, 
projection:{:?}, filters:{:?}, limit:{:?}, deadline:{:?}, parallelism:{}",
+            "TableProvider scan table, table:{}, request_id:{}, 
projection:{:?}, filters:{:?}, limit:{:?}, deadline:{:?}, parallelism:{}, 
priority:{:?}",
             self.table.name(),
             request_id,
             projection,
@@ -191,6 +224,7 @@ impl<B: TableScanBuilder> TableProviderAdapter<B> {
             limit,
             deadline,
             read_parallelism,
+            priority,
         );
 
         let predicate = self.check_and_build_predicate_from_filters(filters);
@@ -216,6 +250,7 @@ impl<B: TableScanBuilder> TableProviderAdapter<B> {
             projected_schema,
             predicate,
             metrics_collector: 
MetricsCollector::new(SCAN_TABLE_METRICS_COLLECTOR_NAME.to_string()),
+            priority,
         };
 
         self.builder.build(request).await
@@ -439,9 +474,10 @@ impl DisplayAs for ScanTable {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> 
fmt::Result {
         write!(
             f,
-            "ScanTable: table={}, parallelism={}",
+            "ScanTable: table={}, parallelism={}, priority={:?}",
             self.table.name(),
             self.request.opts.read_parallelism,
+            self.request.priority
         )
     }
 }
diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs
index bd996703..9073e75e 100644
--- a/table_engine/src/remote/model.rs
+++ b/table_engine/src/remote/model.rs
@@ -22,7 +22,7 @@ use std::{
 
 use bytes_ext::{ByteVec, Bytes};
 use ceresdbproto::remote_engine::{
-    self, execute_plan_request, row_group::Rows::Contiguous, ColumnDesc,
+    self, execute_plan_request, row_group::Rows::Contiguous, ColumnDesc, 
QueryPriority,
 };
 use common_types::{
     request_id::RequestId,
@@ -35,6 +35,7 @@ use common_types::{
 use generic_error::{BoxError, GenericError, GenericResult};
 use itertools::Itertools;
 use macros::define_result;
+use runtime::Priority;
 use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};
 
 use crate::{
@@ -454,6 +455,7 @@ pub struct ExecContext {
     pub default_catalog: String,
     pub default_schema: String,
     pub query: String,
+    pub priority: Priority,
 }
 
 pub enum PhysicalPlan {
@@ -474,7 +476,7 @@ impl From<RemoteExecuteRequest> for 
ceresdbproto::remote_engine::ExecutePlanRequ
             default_catalog: value.context.default_catalog,
             default_schema: value.context.default_schema,
             timeout_ms: rest_duration_ms,
-            priority: 0, // not used now
+            priority: value.context.priority.as_u8() as i32,
             displayable_query: value.context.query,
         };
 
@@ -510,6 +512,10 @@ impl 
TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
         let pb_exec_ctx = value.context.context(ConvertRemoteExecuteRequest {
             msg: "missing exec ctx",
         })?;
+        let priority = match pb_exec_ctx.priority() {
+            QueryPriority::Low => Priority::Low,
+            QueryPriority::High => Priority::High,
+        };
         let ceresdbproto::remote_engine::ExecContext {
             request_id_str,
             default_catalog,
@@ -532,6 +538,7 @@ impl 
TryFrom<ceresdbproto::remote_engine::ExecutePlanRequest> for RemoteExecuteR
             default_catalog,
             default_schema,
             query: displayable_query,
+            priority,
         };
 
         // Plan
diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs
index 4f3e854f..6c380441 100644
--- a/table_engine/src/table.rs
+++ b/table_engine/src/table.rs
@@ -36,6 +36,7 @@ use common_types::{
 };
 use generic_error::{BoxError, GenericError};
 use macros::define_result;
+use runtime::Priority;
 use serde::Deserialize;
 use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
 use trace_metric::MetricsCollector;
@@ -387,6 +388,7 @@ pub struct ReadRequest {
     pub predicate: PredicateRef,
     /// Collector for metrics of this read request.
     pub metrics_collector: MetricsCollector,
+    pub priority: Priority,
 }
 
 impl fmt::Debug for ReadRequest {
@@ -415,6 +417,7 @@ impl fmt::Debug for ReadRequest {
             .field("opts", &self.opts)
             .field("projected", &projected)
             .field("predicate", &predicate)
+            .field("priority", &self.priority)
             .finish()
     }
 }
@@ -470,6 +473,8 @@ impl TryFrom<ceresdbproto::remote_engine::TableReadRequest> 
for ReadRequest {
             projected_schema,
             predicate,
             metrics_collector: MetricsCollector::default(),
+            // TODO: pass priority from request.
+            priority: Default::default(),
         })
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to