asolimando commented on code in PR #19957:
URL: https://github.com/apache/datafusion/pull/19957#discussion_r2911488857


##########
datafusion/common/src/stats.rs:
##########
@@ -660,7 +637,14 @@ impl Statistics {
             col_stats.max_value = col_stats.max_value.max(&item_col_stats.max_value);
             col_stats.min_value = col_stats.min_value.min(&item_col_stats.min_value);
             col_stats.sum_value = col_stats.sum_value.add(&item_col_stats.sum_value);
-            col_stats.distinct_count = Precision::Absent;
+            // Use max as a conservative lower bound for distinct count
+            // (can't accurately merge NDV since duplicates may exist across partitions)

Review Comment:
   >I'm concerned that the conservative lower-bound estimation could cause huge 
inaccuracy.
   >
   > For example, we have 100 parts, and each part has 100 rows, and they are 
linearly increasing. The reality is that the ndv should be 100 * 100, but now 
we evaluate it as 100. The result could cause inaccuracies to propagate 
throughout subsequent cost estimation algorithms.
   
   You raise a good point. The true NDV across partitions lies in `[max(partition NDVs), sum(partition NDVs)]`: `max` is exact when values fully overlap (e.g., a `status` column), `sum` when they are completely disjoint (e.g., `order_id`). No single strategy is optimal.
   
   This is how different major OSS systems deal with the problem:
   
   - Trino: `max` across partitions 
([source](https://github.com/trinodb/trino/blob/9c1abc44e162ae8c63fcc07d072c56a16def1ed6/plugin/trino-hive/src/main/java/io/trino/plugin/hive/statistics/AbstractHiveStatisticsProvider.java#L678-L688)).
 Known to be problematic with disjoint domains: 
[trinodb/trino#50](https://github.com/trinodb/trino/issues/50) shows 62K 
estimated vs 150M actual. This is what I considered in the current proposal.
   
   - Spark: `ANALYZE TABLE` runs a full-table `HyperLogLogPlusPlus` aggregation 
([source](https://github.com/apache/spark/blob/5932f11fb91606606234f486f289be1358b1d829/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala#L392)),
 so there are no per-partition NDVs to merge. But this strategy doesn't seem to fit here.
   
   - Hive: stores HLL bitvector sketches per partition in the metastore, merges 
them at planning time ([HLL 
merge](https://github.com/apache/hive/blob/925df927248c1be11b84df3e8a0c3e43955e133e/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java#L443-L485),
 [aggregator 
merge](https://github.com/apache/hive/blob/925df927248c1be11b84df3e8a0c3e43955e133e/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java#L88-L100)),
 and 
[interpolates](https://github.com/apache/hive/blob/925df927248c1be11b84df3e8a0c3e43955e133e/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java#L200-L201)
 for missing partitions by merging adjacent bitvectors and extrapolating. Note that this is exactly what [trinodb/trino#50](https://github.com/trinodb/trino/issues/50) suggests, but in this first PR I aimed for simplicity.
     
   - DuckDB: HLL merge with Good-Turing sampling correction 
([merge](https://github.com/duckdb/duckdb/blob/19a12e9b74386a9e212811ef0c04014b04baede5/src/storage/statistics/distinct_statistics.cpp#L19-L23),
 
[Good-Turing](https://github.com/duckdb/duckdb/blob/19a12e9b74386a9e212811ef0c04014b04baede5/src/storage/statistics/distinct_statistics.cpp#L53-L68)),
 but only for its native format. No HLL data available when reading Parquet. 
Good-Turing won't help here but it could be used for 
https://github.com/apache/datafusion/pull/19957#discussion_r2897575666, to 
extrapolate NDVs from a sample of the whole population, even if sampling should 
be random (cc: @jonathanc-n, as a complement to the threshold strategy?).
   
   > HyperLogLog is a classic way to process this, but for a large dataset, it 
takes a long time to process.
   
   The core constraint is that Parquet stores NDV as a single `optional i64` 
([spec](https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L301)),
 not a sketch, and most writers don't even set it (see 
[apache/arrow-rs#8608](https://github.com/apache/arrow-rs/issues/8608)). So 
when merging row groups, we're interpolating from scalar values.
   
   Regarding HLL cost: sketches are compact (~1.6KB for 2% error), merge is 
`O(registers)`, so it's cheap at planning time. The cost is at ingestion. Data 
sketches are standard practice at scale: Hive uses HLL for NDV and KLL for 
histograms/quantiles (e.g., 
[HIVE-26243](https://issues.apache.org/jira/browse/HIVE-26243), 
[HIVE-26221](https://issues.apache.org/jira/browse/HIVE-26221)). In extreme 
cases sketch size might be a concern, but it's not the general case in my 
experience.
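   For reference, the reason the merge is cheap: combining two HLL sketches is just a register-wise max, regardless of how many rows each sketch summarized. A minimal sketch over raw register arrays (illustrative, not a real HLL library):

   ```rust
   /// Merge two HyperLogLog sketches of the same precision by taking the
   /// register-wise max, so planning-time cost is O(registers).
   /// (Illustrative sketch over raw register arrays, not a real HLL library.)
   fn merge_hll_registers(a: &[u8], b: &[u8]) -> Vec<u8> {
       assert_eq!(a.len(), b.len(), "sketches must use the same precision");
       a.iter().zip(b.iter()).map(|(&x, &y)| x.max(y)).collect()
   }

   fn main() {
       // Two tiny 8-register "sketches"; a ~2%-error HLL uses a few thousand.
       let a = [3u8, 0, 5, 1, 0, 2, 4, 1];
       let b = [1u8, 2, 4, 1, 6, 2, 0, 3];
       assert_eq!(merge_hll_registers(&a, &b), vec![3, 2, 5, 1, 6, 2, 4, 3]);
   }
   ```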
   
   > IIRC, the merge method may be called during query execution, so using 
HyperLogLog here isn't ideal.
   
   To clarify: `try_merge` is only called at planning time (by 
`partition_statistics(None)` in optimizer rules like `JoinSelection`, and 
during file group metadata collection), not during query execution. This is 
orthogonal to statistics-based pruning work like 
[#19609](https://github.com/apache/datafusion/pull/19609).
   
   I mentioned `JoinSelection` as a concrete use of NDV, but NDV can be used to 
drive other planning decisions 
(https://github.com/apache/datafusion/issues/20766 covers many of them). Some 
examples:
   - Aggregation pushdown: NDV estimates the grouping reduction ratio; overestimating NDV makes the optimizer skip beneficial pushdowns, while underestimating makes it push down aggregation expecting a compression that doesn't materialize.
   - Hash table sizing via NDV: underestimate causes resizing/spilling, 
overestimate wastes memory.
   - Filters (NDV for IN/equality): underestimating makes a filter push-down 
look more appealing than it is, overestimating might make us not push the 
filter down.
    
   Bottom line: there's no universally "safe" direction; the impact depends on how the statistic is consumed. Since the consumers of NDV aren't fully defined yet, there is no strong guidance available.
   
   Longer term, if we invest in CBO and statistics (as 
[#8227](https://github.com/apache/datafusion/issues/8227) and 
[#20766](https://github.com/apache/datafusion/issues/20766) suggest), the "one 
size fits all" problem will be exacerbated for downstream projects. Systems 
like Hive, Spark, or DuckDB, being end-user databases/warehouses, can make hard 
choices (e.g., Hive mandating HLL in the metastore). DataFusion is a framework, 
so we can't take one single route with no way to override. Decision points need to be tunable: which stats are read and how, how they propagate through operators and expressions, and how they drive planning decisions, for both built-in and custom stats, expressions, and operators.
   
   I'm working on this via a `StatisticsProvider`/`StatisticsRegistry` pattern 
([WIP 
branch](https://github.com/asolimando/datafusion/tree/asolimando/statistics-planner-prototype)),
 including `ExtendedStatistics` for custom data (histograms, sketches, etc.). 
For reading custom stats from Parquet, [user-defined Parquet 
indexes](https://datafusion.apache.org/blog/2025/07/14/user-defined-parquet-indexes/)
 could provide the ingestion mechanism: embed HLL sketches alongside Parquet 
data and read them at planning time through the provider chain.
   
   Until we have that, maybe we should gate this new propagation mechanism behind a configuration property, so we are free to experiment without causing unexpected changes in existing planning. For NDV I haven't done it because, as discussed, standard writers don't usually set it, so I don't expect any impact unless you make a conscious effort to write and use these statistics. But if we feel unsure, I can add that safeguard.
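   The safeguard could be as simple as a boolean knob consulted at merge time; a minimal sketch (the option name and plumbing are hypothetical, not DataFusion's actual config API):

   ```rust
   /// Hypothetical gate for the new NDV propagation (the name is illustrative;
   /// in DataFusion this would live in the session configuration).
   struct StatsOptions {
       merge_distinct_counts: bool,
   }

   /// When the gate is off, fall back to the previous behavior of dropping
   /// the distinct count (i.e., `Precision::Absent`), modeled here as `None`.
   fn merged_ndv(opts: &StatsOptions, lower_bound: u64) -> Option<u64> {
       opts.merge_distinct_counts.then_some(lower_bound)
   }

   fn main() {
       let on = StatsOptions { merge_distinct_counts: true };
       let off = StatsOptions { merge_distinct_counts: false };
       assert_eq!(merged_ndv(&on, 100), Some(100));
       assert_eq!(merged_ndv(&off, 100), None);
   }
   ```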
   
   Apologies for the wall of text! This discussion deserves its own space, but I wanted to address your concern here. Happy to take the discussion to an RFC or issue, as you see fit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

