featzhang created FLINK-39096:
---------------------------------
Summary: [SQL] Enhance EXPLAIN output to display detailed estimated
rowcount and cost information
Key: FLINK-39096
URL: https://issues.apache.org/jira/browse/FLINK-39096
Project: Flink
Issue Type: Improvement
Reporter: featzhang
h2. What is the issue?
Currently, EXPLAIN statements in Flink SQL display detailed cost information
(rowcount and cumulative cost breakdown) only when
{{ExplainDetail.ESTIMATED_COST}} is specified, which sets the explain level to
{{ALL_ATTRIBUTES}}. In the default {{EXPPLAN_ATTRIBUTES}} mode this information
is not shown, making it harder for users to understand query optimizer
decisions and identify performance bottlenecks.
h3. Current Behavior
{code}
== Optimized Physical Plan ==
HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, EXPR$1, d, EXPR$10], build=[left])
:- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS EXPR$1])
:  +- Exchange(distribution=[hash[a]])
:     +- LocalHashAggregate(groupBy=[a], select=[a, Partial_SUM(b) AS sum$0])
{code}
Without the {{ESTIMATED_COST}} detail flag, operators show only their names and
attributes, providing no visibility into the cost model's estimations.
h3. Expected Behavior
{code}
== Optimized Physical Plan ==
HashJoin(joinType=[InnerJoin], where=[=(a, d)], select=[a, EXPR$1, d, EXPR$10], build=[left]): rowcount = 10000, cumulative cost = {10000 rows, 20000 cpu, 0 io, 0 network, 0 memory}
:- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS EXPR$1]): rowcount = 5000, cumulative cost = {5000 rows, 10000 cpu, 0 io, 0 network, 0 memory}
:  +- Exchange(distribution=[hash[a]]): rowcount = 5000, cumulative cost = {5000 rows, 10000 cpu, 0 io, 0 network, 0 memory}
{code}
Cost information should be visible at the default {{EXPPLAN_ATTRIBUTES}} level
to improve query plan readability and performance analysis.
h2. Why is this valuable?
This enhancement provides significant value for query optimization and
performance tuning:
* *Better Visibility*: Users can see cost estimations without requiring
additional explain flags
* *Performance Analysis*: Easier identification of expensive operations in the
query plan
* *Optimizer Understanding*: Better insight into how the cost-based optimizer
makes decisions
* *Debugging Support*: Cost details help diagnose unexpected plan choices
* *Simplified Workflow*: No need to remember to add {{ESTIMATED_COST}} flag for
basic analysis
h2. Proposed Solution
Enhance {{RelTreeWriterImpl.scala}} to display rowcount and cumulative cost
information at both {{ALL_ATTRIBUTES}} and {{EXPPLAN_ATTRIBUTES}} explain
levels:
# Extend the condition that controls cost display from {{ALL_ATTRIBUTES}} only
to include {{EXPPLAN_ATTRIBUTES}}
# Add null safety checks to handle cases where cost or rowcount may be
unavailable
# Maintain the existing {{FlinkCost}} format: {{\{rows, cpu, io, network, memory\}}}
h2. Implementation Details
*Modified Components:*
* {{flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala}}
*Key Classes Involved:*
* {{org.apache.flink.table.planner.plan.utils.RelTreeWriterImpl}} - Generates
execution plan text output
* {{org.apache.flink.table.planner.plan.cost.FlinkCost}} - Multi-dimensional
cost model
* {{org.apache.calcite.rel.metadata.RelMetadataQuery}} - Provides rowcount and
cost metadata
*Code Changes:*
{code:scala}
// Before: cost is appended only at the ALL_ATTRIBUTES level.
if (explainLevel == SqlExplainLevel.ALL_ATTRIBUTES) {
  s.append(": rowcount = ")
    .append(mq.getRowCount(rel))
    .append(", cumulative cost = ")
    .append(mq.getCumulativeCost(rel))
}

// After: cost is also appended at the default EXPPLAN_ATTRIBUTES level,
// falling back to "unknown" when the metadata is unavailable.
if (explainLevel == SqlExplainLevel.ALL_ATTRIBUTES ||
    explainLevel == SqlExplainLevel.EXPPLAN_ATTRIBUTES) {
  val rowCount = mq.getRowCount(rel)
  val cost = mq.getCumulativeCost(rel)
  s.append(": rowcount = ")
    .append(if (rowCount != null) rowCount else "unknown")
    .append(", cumulative cost = ")
    .append(if (cost != null) cost else "unknown")
}
{code}
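The null handling in the "After" snippet can be exercised without a Flink build. The following is a minimal sketch, not the actual {{RelTreeWriterImpl}} code: the {{Level}} trait, its case objects, and {{costSuffix}} are hypothetical stand-ins for {{SqlExplainLevel}} and the writer's append logic, and {{Option}} is used in place of the explicit null comparisons.

```scala
object CostSuffixSketch {
  // Hypothetical stand-in for Calcite's SqlExplainLevel (not the real enum).
  sealed trait Level
  case object AllAttributes extends Level
  case object ExpplanAttributes extends Level
  case object NoAttributes extends Level

  // Builds the text appended after an operator's attributes.
  // rowCount is a boxed Double and cost is any object, so either may be null.
  def costSuffix(level: Level, rowCount: java.lang.Double, cost: AnyRef): String =
    level match {
      case AllAttributes | ExpplanAttributes =>
        // Option(null) is None, so missing metadata falls back to "unknown".
        val rows = Option(rowCount).map(_.toString).getOrElse("unknown")
        val c = Option(cost).map(_.toString).getOrElse("unknown")
        s": rowcount = $rows, cumulative cost = $c"
      case _ =>
        // Other levels keep the operator line unchanged.
        ""
    }
}
```

With this sketch, {{costSuffix(ExpplanAttributes, null, null)}} yields {{: rowcount = unknown, cumulative cost = unknown}}, and any level other than the two matched ones yields an empty suffix.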
h2. FlinkCost Dimensions
The {{FlinkCost}} class provides five dimensions of cost information:
* *rows*: Number of rows processed by the operator
* *cpu*: CPU resource usage (base unit)
* *io*: I/O resource usage (2.0x CPU conversion ratio)
* *network*: Network resource usage (4.0x CPU conversion ratio)
* *memory*: Memory resource usage (1.0x CPU conversion ratio)
Output format: {{\{<rows> rows, <cpu> cpu, <io> io, <network> network, <memory> memory\}}}
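To make the five dimensions and the conversion ratios concrete, here is a small sketch. It is a simplified model, not the real {{FlinkCost}} class: the {{Cost}} case class and {{inCpuUnits}} are hypothetical names, and folding everything into CPU units is only one assumed reading of the ratios listed above, not necessarily how the planner compares costs.

```scala
object CostModelSketch {
  // Conversion ratios as listed in this issue, relative to one CPU unit.
  val IoToCpu = 2.0
  val NetworkToCpu = 4.0
  val MemoryToCpu = 1.0

  // Hypothetical stand-in for FlinkCost; only the five dimensions are modelled.
  final case class Cost(rows: Double, cpu: Double, io: Double, network: Double, memory: Double) {
    // Follows the documented layout: {rows, cpu, io, network, memory}.
    override def toString: String =
      s"{$rows rows, $cpu cpu, $io io, $network network, $memory memory}"

    // Assumption: express io, network and memory in CPU units and add them to cpu.
    def inCpuUnits: Double =
      cpu + io * IoToCpu + network * NetworkToCpu + memory * MemoryToCpu
  }

  def main(args: Array[String]): Unit = {
    val cost = Cost(5000, 10000, 100, 50, 20)
    println(cost)            // the five dimensions in the documented order
    println(cost.inCpuUnits) // a single CPU-equivalent figure
  }
}
```

The sketch prints values using Scala's default {{Double}} rendering, so the numbers carry a decimal part that the plan excerpts in this issue do not show.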
h2. Example Usage
h3. Simple Query
{code:sql}
EXPLAIN SELECT * FROM orders WHERE amount > 100;
{code}
*Output (After Enhancement):*
{code}
Calc(select=[order_id, customer_id, amount, order_date], where=[>(amount, 100)]): rowcount = 5000, cumulative cost = {5000 rows, 10000 cpu, 0 io, 0 network, 0 memory}
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[order_id, customer_id, amount, order_date]): rowcount = 10000, cumulative cost = {10000 rows, 5000 cpu, 0 io, 0 network, 0 memory}
{code}
h3. Complex Join Query
{code:sql}
EXPLAIN
SELECT o.order_id, c.customer_name, o.amount
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id;
{code}
Each operator in the output will now display detailed cost information, making
it easy to identify expensive operations.
h2. Compatibility
* *Backward Compatible*: Yes - only appends rowcount and cost to each operator
line; existing plan content is not removed or altered
* *Public API Impact*: None - only changes internal explain output format
* *Runtime Impact*: Expected to be negligible - {{getRowCount()}} and
{{getCumulativeCost()}} are the same metadata calls already made in
{{ALL_ATTRIBUTES}} mode; they would now also run for default EXPLAIN statements
* *Test Impact*: Test expectations may need updates due to changed output format
* *Serialization Impact*: None - only affects text output
h2. Testing
The enhancement can be verified by:
h3. Build Verification
{code:bash}
./mvnw clean spotless:apply install -DskipTests -Pfast -pl flink-table/flink-table-planner
{code}
h3. Manual Testing
{code:sql}
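-- Create test tables. A connector is needed for planning (datagen is used
-- here as an example); `value` is a reserved keyword and must be quoted.
CREATE TABLE t1 (id INT, name STRING) WITH ('connector' = 'datagen');
CREATE TABLE t2 (id INT, `value` BIGINT) WITH ('connector' = 'datagen');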
-- Execute EXPLAIN and verify cost information is displayed
EXPLAIN SELECT * FROM t1 JOIN t2 ON t1.id = t2.id;
{code}
h3. Existing Tests
{code:bash}
mvn test -pl flink-table/flink-table-planner -Dtest=ExplainTest
{code}
Note: Some test XML files may need updates to reflect the new output format.
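When reviewing updated plan expectations, a quick check that every operator line carries the new suffix can help. The helper below is a sketch under stated assumptions: {{PlanCostCheck}} and {{linesMissingCost}} are hypothetical names and not part of Flink's test utilities, and the plan text is assumed to have one operator per line (unlike the hard-wrapped excerpts in this message).

```scala
object PlanCostCheck {
  // Matches the suffix shown in the expected output of this issue.
  private val CostSuffix = """: rowcount = \S+, cumulative cost = .+$""".r

  // Returns the operator lines that lack the cost suffix.
  // Blank lines and "== ... ==" section headers are skipped.
  def linesMissingCost(plan: String): Seq[String] =
    plan.split("\\r?\\n").toSeq
      .map(_.trim)
      .filter(line => line.nonEmpty && !line.startsWith("=="))
      .filter(line => CostSuffix.findFirstIn(line).isEmpty)
}
```

An empty result means every operator line in the given plan text ends with a rowcount and cumulative cost; any returned lines are the operators still missing them.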
h2. Priority
*Impact*: Medium-High - Improves user experience for query analysis and
performance tuning
*Effort*: Low - Minimal code changes (6 additions, 3 deletions)
*Risk*: Low - Display-only enhancement, no runtime or API changes
*Category*: Improvement
h2. Affected Versions
* Flink 2.3-SNAPSHOT (development)
* Potentially backportable to earlier versions if desired
h2. Related Work
This enhancement aligns with other recent improvements to EXPLAIN plan
readability:
* FLINK-39042: Display watermark specifications in execution plans
* Display ChangelogMode in execution plans for streaming queries
* Structured display of join conditions
* Structured display of filter conditions
* Enhanced projection field alias display
All of these improvements share the goal of making EXPLAIN output more
informative and user-friendly without requiring additional configuration.
h2. Additional Notes
* The {{FlinkCost}} class already provides a comprehensive {{toString()}}
method with all five cost dimensions
* This change simply exposes existing cost information more prominently
* Users who need even more detailed cost analysis can still use
{{ExplainDetail.ESTIMATED_COST}} for {{ALL_ATTRIBUTES}} level
* The null checks ensure robustness when cost information is unavailable
h2. Benefits Summary
By implementing this enhancement, Flink users will benefit from:
# *Immediate visibility* into query optimizer cost estimations
# *Faster performance analysis* without needing to remember special flags
# *Better understanding* of why the optimizer chooses certain execution
strategies
# *Easier identification* of performance bottlenecks in complex queries
# *Consistent experience* across different EXPLAIN modes