Rachelint commented on code in PR #1303:
URL:
https://github.com/apache/incubator-horaedb/pull/1303#discussion_r1436770348
##########
query_frontend/src/plan.rs:
##########
@@ -72,12 +79,125 @@ pub enum Plan {
pub struct QueryPlan {
pub df_plan: DataFusionLogicalPlan,
+ pub table_name: Option<String>,
// Contains the TableProviders so we can register the them to
ExecutionContext later.
// Use TableProviderAdapter here so we can get the underlying TableRef and
also be
// able to cast to Arc<dyn TableProvider + Send + Sync>
pub tables: Arc<TableContainer>,
}
+impl QueryPlan {
+ fn find_timestamp_column(&self) -> Option<Column> {
+ let table_name = self.table_name.as_ref()?;
+ let table_ref = self.tables.get(table_name.into())?;
+ let schema = table_ref.table.schema();
+ let timestamp_name = schema.timestamp_name();
+ Some(Column::from_name(timestamp_name))
+ }
+
+ /// This function is used to extract time range from the query plan.
+ /// It will return max possible time range. For example, if the query
+ /// contains no timestmap filter, it will return
+ /// `TimeRange::min_to_max()`
+ ///
+ /// Note: When it timestamp filter evals to false(such as ts < 10 and ts >
+ /// 100), it will return None, which means no valid time range for this
+ /// query.
+ fn extract_time_range(&self) -> Option<TimeRange> {
+ let ts_column = if let Some(v) = self.find_timestamp_column() {
+ v
+ } else {
+ warn!(
+ "Couldn't find time column, plan:{:?}, table_name:{:?}",
+ self.df_plan, self.table_name
+ );
+ return Some(TimeRange::min_to_max());
+ };
+ let time_range = match
influxql_query::logical_optimizer::range_predicate::find_time_range(
+ &self.df_plan,
+ &ts_column,
+ ) {
+ Ok(v) => v,
+ Err(e) => {
+ warn!(
+ "Couldn't find time range, plan:{:?}, err:{}",
+ self.df_plan, e
+ );
+ return Some(TimeRange::min_to_max());
+ }
+ };
+
+ debug!(
+ "Extract time range, value:{time_range:?}, plan:{:?}",
+ self.df_plan
+ );
+ let mut start = i64::MIN;
+ match time_range.start {
+ Bound::Included(inclusive_start) => {
+ if let
DfLogicalExpr::Literal(ScalarValue::TimestampMillisecond(Some(x), _)) =
+ inclusive_start
+ {
+ start = start.max(x);
+ }
+ }
+ Bound::Excluded(exclusive_start) => {
+ if let
DfLogicalExpr::Literal(ScalarValue::TimestampMillisecond(Some(x), _)) =
+ exclusive_start
+ {
+ start = start.max(x + 1);
+ }
+ }
+ Bound::Unbounded => {}
+ }
+ let mut end = i64::MAX;
+ match time_range.end {
+ Bound::Included(inclusive_end) => {
+ if let
DfLogicalExpr::Literal(ScalarValue::TimestampMillisecond(Some(x), _)) =
+ inclusive_end
+ {
+ end = end.min(x + 1);
+ }
+ }
+ Bound::Excluded(exclusive_start) => {
+ if let
DfLogicalExpr::Literal(ScalarValue::TimestampMillisecond(Some(x), _)) =
+ exclusive_start
+ {
+ end = end.min(x);
+ }
+ }
+ Bound::Unbounded => {}
+ }
+
+ TimeRange::new(start.into(), end.into())
+ }
+
+ /// Decide the query priority based on the query plan.
+ /// When query contains invalid time range, it will return None.
+ // TODO: Currently we only consider the time range, consider other
factors, such
+ // as the number of series, or slow log metrics.
+ pub fn decide_query_priority(&self, threshold: u64) -> Option<Priority> {
Review Comment:
I think we may support deciding priority by more factors after, how about
decide a struct like `PriorityContext` rather than passing raw time threshold?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]