[ https://issues.apache.org/jira/browse/IMPALA-12894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831892#comment-17831892 ]
ASF subversion and git services commented on IMPALA-12894:
----------------------------------------------------------
Commit b03cfcf2ade6dea7fed10f4a3db5c58ddf2c6bd2 in impala's branch
refs/heads/master from Zoltan Borok-Nagy
[ https://gitbox.apache.org/repos/asf?p=impala.git;h=b03cfcf2a ]
IMPALA-12894: (part 2) Fix optimized count(*) for Iceberg tables with dangling
delete files
Impala can return incorrect results if a table has dangling delete
files. Dangling delete files are delete files that are part of the
snapshot but are not applicable to any of the data files. Such delete
files can remain after Spark's rewrite_data_files action.
During analysis we check the existence of delete files based on the
snapshot summary. If there are no delete files in the table, we just
replace the count(*) expression with NumericLiteral($record_count).
If there are delete files in the table (based on the summary), we set
optimize_count_star_for_iceberg_v2 in the query context.
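As a concrete illustration, here is a minimal sketch of such a
summary-based check (not the actual Impala code; the class and helper
names are hypothetical), assuming the plain Iceberg Java API, where
"total-delete-files" is a standard snapshot summary property:
{code:java}
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

public class SummaryDeleteCheck {
  // True if the current snapshot's summary reports any delete files.
  // Note: this also counts dangling delete files, since the summary
  // cannot tell whether a delete file still applies to any data file.
  static boolean summaryReportsDeleteFiles(Table table) {
    Snapshot snapshot = table.currentSnapshot();
    if (snapshot == null) return false;  // empty table, no snapshot yet
    String total = snapshot.summary().getOrDefault("total-delete-files", "0");
    return Long.parseLong(total) > 0;
  }
}
{code}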
Without optimize_count_star_for_iceberg_v2 in the query context, the
IcebergScanPlanner would create the following plan:

             AGGREGATE
              COUNT(*)
                 |
             UNION ALL
              /     \
             /       \
            /         \
    SCAN all        ANTI JOIN
    datafiles        /    \
    without         /      \
    deletes      SCAN      SCAN
               datafiles  deletes
              with deletes
With optimize_count_star_for_iceberg_v2 the final plan looks like
the following:
    ArithmeticExpr(ADD)
      /        \
     /          \
    /            \
 record_count   AGGREGATE
 of all          COUNT(*)
 datafiles          |
 without        ANTI JOIN
 deletes         /    \
                /      \
             SCAN      SCAN
           datafiles  deletes
          with deletes
The ArithmeticExpr(ADD) and its left child (record_count) are created
by the analyzer; the IcebergScanPlanner is responsible for creating the
plan under AGGREGATE COUNT(*). If the table has delete files and
optimize_count_star_for_iceberg_v2 is true, the planner knows it can
omit the original UNION ALL and its left child.
However, the IcebergScanPlanner checks delete file existence based on
the result of planFiles(), from which dangling delete files have
already been eliminated. When it sees no delete files, the
IcebergScanPlanner assumes the analyzer has already handled that case
(i.e. it replaced count(*) with NumericLiteral($record_count)). So it
incorrectly creates a normal SCAN plan of the table under COUNT(*),
i.e. we end up with this:
    ArithmeticExpr(ADD)
      /        \
     /          \
    /            \
 record_count   AGGREGATE
 of all          COUNT(*)
 datafiles          |
 without          SCAN
 deletes        datafiles
                 without
                 deletes
This means Impala yields 2 * $record_count as the result: every row is
counted once via the record_count literal and once via the scan (in the
repro below, 8 instead of 4).
This patch fixes the FeIcebergTable.hasDeleteFiles() method so that it
also ignores dangling delete files. Therefore the analyzer simply
substitutes count(*) with NumericLiteral($record_count) when all
deletes are dangling, i.e. there is no need to involve the
IcebergScanPlanner at all.
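For illustration, here is a minimal sketch of a planFiles()-based check
(not the actual patch; the class and helper names are hypothetical),
assuming the plain Iceberg Java API: planFiles() only attaches delete
files that apply to some data file, so dangling deletes are ignored
automatically:
{code:java}
import java.io.IOException;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class ApplicableDeleteCheck {
  static boolean hasApplicableDeleteFiles(Table table) throws IOException {
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        // A non-empty deletes() list means a delete file applies to this data file.
        if (!task.deletes().isEmpty()) return true;
      }
    }
    return false;  // every delete file in the snapshot (if any) is dangling
  }
}
{code}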
The patch also introduces a new query option,
"iceberg_disable_count_star_optimization", so users can completely
disable the statistics-based count(*) optimization if necessary.
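For example, a usage sketch over JDBC (the driver URL, host, and port
are assumptions; SET is the standard way to change Impala query options
per session):
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DisableCountStarOpt {
  public static void main(String[] args) throws Exception {
    try (Connection conn =
             DriverManager.getConnection("jdbc:impala://localhost:21050");
         Statement stmt = conn.createStatement()) {
      // Force a plain scan-based count(*), bypassing the statistics shortcut.
      stmt.execute("SET ICEBERG_DISABLE_COUNT_STAR_OPTIMIZATION=true");
      try (ResultSet rs =
               stmt.executeQuery("SELECT count(*) FROM default.iceberg_testing")) {
        while (rs.next()) System.out.println(rs.getLong(1));
      }
    }
  }
}
{code}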
Testing:
* e2e tests
* planner tests
Change-Id: Ie3aca0b0a104f9ca4589cde9643f3f341d4ff99f
Reviewed-on: http://gerrit.cloudera.org:8080/21190
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
> Optimized count(*) for Iceberg gives wrong results after a Spark
> rewrite_data_files
> -----------------------------------------------------------------------------------
>
> Key: IMPALA-12894
> URL: https://issues.apache.org/jira/browse/IMPALA-12894
> Project: IMPALA
> Issue Type: Bug
> Components: Frontend
> Affects Versions: Impala 4.3.0
> Reporter: Gabor Kaszab
> Assignee: Zoltán Borók-Nagy
> Priority: Critical
> Labels: correctness, impala-iceberg
> Attachments: count_star_correctness_repro.tar.gz
>
>
> The issue was introduced by https://issues.apache.org/jira/browse/IMPALA-11802,
> which implemented an optimized way to get results for count(*). However, if
> the table was compacted by Spark, this optimization can give incorrect results.
> The reason is that Spark can [skip dropping delete
> files|https://iceberg.apache.org/docs/latest/spark-procedures/#rewrite_position_delete_files]
> that point to compacted data files; as a result, there might be delete
> files after compaction that no longer apply to any data file.
> Repro:
> With Impala
> {code:java}
> create table default.iceberg_testing (id int, j bigint) STORED AS ICEBERG
> TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
> 'iceberg.catalog_location'='/tmp/spark_iceberg_catalog/',
> 'iceberg.table_identifier'='iceberg_testing',
> 'format-version'='2');
> insert into iceberg_testing values
> (1, 1), (2, 4), (3, 9), (4, 16), (5, 25);
> update iceberg_testing set j = -100 where id = 4;
> delete from iceberg_testing where id = 4;{code}
> count(*) returns 4 at this point.
> Run compaction in Spark:
> {code:java}
> spark.sql(s"CALL local.system.rewrite_data_files(table =>
> 'default.iceberg_testing', options => map('min-input-files','2') )").show()
> {code}
> Now count(*) in Impala returns 8 (might require an INVALIDATE METADATA when
> using the HadoopCatalog).
> Hive returns correct results, and SELECT * also returns correct results.
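> A hedged diagnostic sketch using the Iceberg Java API (class and method
> names are hypothetical) to confirm the dangling state: after the rewrite,
> the snapshot summary still reports delete files while planFiles() attaches
> none of them to any data file.
> {code:java}
> import org.apache.iceberg.FileScanTask;
> import org.apache.iceberg.Table;
> import org.apache.iceberg.io.CloseableIterable;
>
> public class DanglingDeleteProbe {
>   static void report(Table table) throws java.io.IOException {
>     // Delete files recorded in the snapshot summary (includes dangling ones).
>     String inSummary = table.currentSnapshot().summary()
>         .getOrDefault("total-delete-files", "0");
>     // Delete files that planFiles() actually attaches to scan tasks.
>     long applied = 0;
>     try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
>       for (FileScanTask task : tasks) applied += task.deletes().size();
>     }
>     // After the compaction above: inSummary > 0 while applied == 0.
>     System.out.println("summary=" + inSummary + ", applied=" + applied);
>   }
> }
> {code}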