NGA-TRAN opened a new issue, #8099: URL: https://github.com/apache/arrow-datafusion/issues/8099
### Is your feature request related to a problem or challenge?

These are examples and use cases for the design of https://github.com/apache/arrow-datafusion/issues/8078

InfluxDB IOx uses `row statistics` to optimize some queries, and we found that for many queries the statistics are either lost (become `absent`) or become `inexact` while being propagated upwards. Our request is to keep the statistics `conservative`, so that they still cover the exact bound, rather than losing them or making them `inexact`.

### Table for the reproducer

```SQL
create table t1(state string, city string, min_temp float, area int, time timestamp) as values
    ('MA', 'Boston', 70.4, 1, 50),
    ('MA', 'Bedford', 71.59, 2, 150),
    ('CA', 'SF', 79.0, 1, 300),
    ('MA', 'Boston', 75.4, 3, 250),
    ('MA', 'Andover', 69.5, 4, 250),
    ('MA', 'Bedford', 78.2, 2, 150),
    ('MA', 'Boston', 65.0, 2, 250),
    ('CA', 'SJ', 78.4, 1, 300),
    ('MA', 'Reading', 53.0, 4, 250),
    ('CA', 'SJ', 75.4, 5, 350);
```

### stats of timestamp filter is lost

```SQL
explain select * from t1 where time <= to_timestamp(350);
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                              |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan  | Filter: t1.time <= TimestampNanosecond(350000000000, None)                                        |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                    |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent]               |
|               |   FilterExec: time@4 <= 350000000000, statistics=[Rows=Absent, Bytes=Absent]                      |
|               |     MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |                                                                                                   |
+---------------+---------------------------------------------------------------------------------------------------+
```

### stats of string filter is lost

```SQL
explain select * from t1 where state = 'MA';
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                              |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan  | Filter: t1.state = Utf8("MA")                                                                     |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                    |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent]               |
|               |   FilterExec: state@0 = MA, statistics=[Rows=Absent, Bytes=Absent]                                |
|               |     MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |                                                                                                   |
+---------------+---------------------------------------------------------------------------------------------------+
```

### stats of integer filter becomes inexact

```SQL
explain select * from t1 where area <= 100;
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                              |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan  | Filter: t1.area <= Int32(100)                                                                     |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                    |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Inexact(10), Bytes=Inexact(2960)]   |
|               |   FilterExec: area@3 <= 100, statistics=[Rows=Inexact(10), Bytes=Inexact(2960)]                   |
|               |     MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |                                                                                                   |
+---------------+---------------------------------------------------------------------------------------------------+
```

### stats of float filter becomes inexact

```SQL
explain select * from t1 where min_temp = 10.0;
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                              |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan  | Filter: t1.area <= Int32(100)                                                                     |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                    |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Inexact(10), Bytes=Inexact(2960)]   |
|               |   FilterExec: area@3 <= 100, statistics=[Rows=Inexact(10), Bytes=Inexact(2960)]                   |
|               |     MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |                                                                                                   |
+---------------+---------------------------------------------------------------------------------------------------+
```

### stats of filter IN is lost for any data type & subquery

```SQL
explain select * from t1 where area in (1, 2);
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                              |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan  | Filter: t1.area = Int32(1) OR t1.area = Int32(2)                                                  |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                    |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent]               |
|               |   FilterExec: area@3 = 1 OR area@3 = 2, statistics=[Rows=Absent, Bytes=Absent]                    |
|               |     MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |                                                                                                   |
+---------------+---------------------------------------------------------------------------------------------------+

explain select * from t1 where city in ('Boston', 'Reading');
+---------------+---------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                              |
+---------------+---------------------------------------------------------------------------------------------------+
| logical_plan  | Filter: t1.city = Utf8("Boston") OR t1.city = Utf8("Reading")                                     |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                    |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent]               |
|               |   FilterExec: city@1 = Boston OR city@1 = Reading, statistics=[Rows=Absent, Bytes=Absent]         |
|               |     MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |                                                                                                   |
+---------------+---------------------------------------------------------------------------------------------------+

explain select * from t1 where city in (select city from t1);
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                       |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | LeftSemi Join: t1.city = __correlated_sq_1.city                                                                            |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                                             |
|               |   SubqueryAlias: __correlated_sq_1                                                                                         |
|               |     TableScan: t1 projection=[city]                                                                                        |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent]                                        |
|               |   HashJoinExec: mode=Partitioned, join_type=LeftSemi, on=[(city@1, city@0)], statistics=[Rows=Absent, Bytes=Absent]        |
|               |     CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Exact(10), Bytes=Exact(2960)]                            |
|               |       RepartitionExec: partitioning=Hash([city@1], 10), input_partitions=1, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |         MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)]                      |
|               |     CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Exact(10), Bytes=Exact(2960)]                            |
|               |       RepartitionExec: partitioning=Hash([city@0], 10), input_partitions=1, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |         MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)]                      |
|               |                                                                                                                            |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
```

### stats of join is lost

```SQL
explain select * from t1, t1 as t2 where t1.city = t2.city;
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                       |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Inner Join: t1.city = t2.city                                                                                              |
|               |   TableScan: t1 projection=[state, city, min_temp, area, time]                                                             |
|               |   SubqueryAlias: t2                                                                                                        |
|               |     TableScan: t1 projection=[state, city, min_temp, area, time]                                                           |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent]                                        |
|               |   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(city@1, city@1)], statistics=[Rows=Absent, Bytes=Absent]           |
|               |     CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Exact(10), Bytes=Exact(2960)]                            |
|               |       RepartitionExec: partitioning=Hash([city@1], 10), input_partitions=1, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |         MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)]                      |
|               |     CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Exact(10), Bytes=Exact(2960)]                            |
|               |       RepartitionExec: partitioning=Hash([city@1], 10), input_partitions=1, statistics=[Rows=Exact(10), Bytes=Exact(2960)] |
|               |         MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)]                      |
|               |                                                                                                                            |
+---------------+----------------------------------------------------------------------------------------------------------------------------+
```

### stats of aggregation becomes inexact

```SQL
explain select city, max(min_temp) as max_min_temp from t1 group by city order by max_min_temp DESC limit 5;
+---------------+------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                     |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Limit: skip=0, fetch=5                                                                                                                   |
|               |   Sort: max_min_temp DESC NULLS FIRST, fetch=5                                                                                           |
|               |     Projection: t1.city, MAX(t1.min_temp) AS max_min_temp                                                                                |
|               |       Aggregate: groupBy=[[t1.city]], aggr=[[MAX(t1.min_temp)]]                                                                          |
|               |         TableScan: t1 projection=[city, min_temp]                                                                                        |
| physical_plan | GlobalLimitExec: skip=0, fetch=5, statistics=[Rows=Inexact(5), Bytes=Absent]                                                             |
|               |   SortPreservingMergeExec: [max_min_temp@1 DESC], fetch=5, statistics=[Rows=Inexact(10), Bytes=Absent]                                   |
|               |     SortExec: TopK(fetch=5), expr=[max_min_temp@1 DESC], statistics=[Rows=Inexact(10), Bytes=Absent]                                     |
|               |       ProjectionExec: expr=[city@0 as city, MAX(t1.min_temp)@1 as max_min_temp], statistics=[Rows=Inexact(10), Bytes=Absent]             |
|               |         AggregateExec: mode=FinalPartitioned, gby=[city@0 as city], aggr=[MAX(t1.min_temp)], statistics=[Rows=Inexact(10), Bytes=Absent] |
|               |           CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Inexact(10), Bytes=Absent]                                       |
|               |             RepartitionExec: partitioning=Hash([city@0], 10), input_partitions=10, statistics=[Rows=Inexact(10), Bytes=Absent]           |
|               |               RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, statistics=[Rows=Inexact(10), Bytes=Absent]         |
|               |                 AggregateExec: mode=Partial, gby=[city@0 as city], aggr=[MAX(t1.min_temp)], statistics=[Rows=Inexact(10), Bytes=Absent]  |
|               |                   MemoryExec: partitions=1, partition_sizes=[1], statistics=[Rows=Exact(10), Bytes=Exact(2960)]                          |
|               |                                                                                                                                          |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------+
```

### Describe the solution you'd like

Add another enum value, `conservative`, that keeps the stats instead of dropping them or marking them `inexact`.
A few examples of row statistics under `conservative`:

- Filter: same value as the input without the filter
- Join: Cartesian product of the two inputs

### Describe alternatives you've considered

_No response_

### Additional context

_No response_
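
The two rules in the proposed solution (a filter keeps its input's row count, a join uses the Cartesian product of its inputs) could be sketched roughly as follows. This is only an illustration: `RowEstimate`, `filter_rows` and `join_rows` are hypothetical names, not DataFusion's statistics API, and reading `conservative` as "a guaranteed upper bound on the row count" is an assumption about what the request means.

```rust
/// Hypothetical row-count statistic (NOT DataFusion's actual type).
/// `Conservative` is the proposed extra value, assumed here to mean
/// "the true row count is guaranteed to be at most this number".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RowEstimate {
    Exact(usize),
    Conservative(usize),
    Inexact(usize),
    Absent,
}

impl RowEstimate {
    /// Returns a guaranteed upper bound when one is known.
    pub fn upper_bound(self) -> Option<usize> {
        match self {
            RowEstimate::Exact(n) | RowEstimate::Conservative(n) => Some(n),
            RowEstimate::Inexact(_) | RowEstimate::Absent => None,
        }
    }
}

/// Filter: a filter can only remove rows, so the input's bound still holds.
/// Inputs without a guaranteed bound are passed through unchanged.
pub fn filter_rows(input: RowEstimate) -> RowEstimate {
    match input.upper_bound() {
        Some(n) => RowEstimate::Conservative(n),
        None => input,
    }
}

/// Join: the output can never exceed the Cartesian product of the inputs.
/// If either side has no bound, or the product overflows, nothing is claimed.
pub fn join_rows(left: RowEstimate, right: RowEstimate) -> RowEstimate {
    match (left.upper_bound(), right.upper_bound()) {
        (Some(l), Some(r)) => match l.checked_mul(r) {
            Some(product) => RowEstimate::Conservative(product),
            None => RowEstimate::Absent,
        },
        _ => RowEstimate::Absent,
    }
}
```

With the 10-row table from the reproducer, `filter_rows(RowEstimate::Exact(10))` would give `Conservative(10)` rather than `Absent`, and `join_rows` on two `Exact(10)` inputs would give `Conservative(100)`.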
