This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 7e0feb4a8e436ddbf618bd0e090dcc24c3563fc4 Author: Gabor Kaszab <[email protected]> AuthorDate: Thu Feb 16 16:57:44 2023 +0100 IMPALA-11701 Part1: Don't push down predicates to scanner if already applied by Iceberg We push down predicates to Iceberg, which uses them to filter out files when getting the results of planFiles(). Using the FileScanTask.residual() function we can find out if we have to use the predicates to further filter the rows of the given files or if Iceberg has already performed all the filtering. Basically if we only filter on IDENTITY-partition columns then Iceberg can filter the files and using these filters in Impala wouldn't filter any more rows from the output (assuming that no partition evolution was performed on the table). An additional benefit of not pushing down no-op predicates to the scanner is that we can potentially materialize fewer slots. For example: SELECT count(1) from iceberg_tbl where part_col = 10; Another additional benefit comes with count(*) queries. If all the predicates are skipped from being pushed to Impala's scanner for a count(*) query then the Parquet scanner can go to an optimized path where it uses stats instead of reading actual data to answer the query. In the above query Iceberg filters the files using the predicate on a partition column and then there won't be any need to materialize 'part_col' in Impala, nor to push down the 'part_col = 10' predicate. Note, this is an all or nothing approach, meaning that assuming N number of predicates we either push down all predicates to the scanner or none of them. There is room for improvement to identify a subset of the predicates that we still have to push down to the scanner. However, for this we'd need a mapping between Impala predicates and the predicates returned by Iceberg's FileScanTask.residual() function that would significantly increase the complexity of the relevant code. 
Testing: - Some existing tests needed some extra care as they were checking for predicates being pushed down to the scanner, but with this patch not all of them are pushed down. For these tests I added some extra predicates to achieve that all of the predicates are pushed down to the scanner. - Added a new planner test suite for checking how predicate push down works with Iceberg tables. Change-Id: Icfa80ce469cecfcfbcd0dcb595a6b04b7027285b Reviewed-on: http://gerrit.cloudera.org:8080/19534 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- .../org/apache/impala/planner/HdfsScanNode.java | 17 ++- .../org/apache/impala/planner/IcebergScanNode.java | 20 +++- .../apache/impala/planner/IcebergScanPlanner.java | 51 +++++++-- .../org/apache/impala/planner/PlannerTest.java | 9 ++ .../functional/functional_schema_template.sql | 26 +++++ .../datasets/functional/schema_constraints.csv | 1 + .../queries/PlannerTest/iceberg-predicates.test | 77 ++++++++++++++ .../queries/PlannerTest/iceberg-v2-tables.test | 114 ++++++++++++++------- .../queries/PlannerTest/tablesample.test | 14 +-- .../QueryTest/iceberg-in-predicate-push-down.test | 43 ++++++-- .../QueryTest/iceberg-partitioned-insert.test | 27 ++++- .../iceberg-plain-count-star-optimization.test | 7 +- 12 files changed, 331 insertions(+), 75 deletions(-) diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 626c7c41c..71eab659c 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -1203,7 +1203,9 @@ public class HdfsScanNode extends ScanNode { long partitionNumRows = partition.getNumRows(); analyzer.getDescTbl().addReferencedPartition(tbl_, partition.getId()); - fileFormats_.add(partition.getFileFormat()); + if (partition.getFileFormat() != HdfsFileFormat.ICEBERG) { + 
fileFormats_.add(partition.getFileFormat()); + } if (!partition.getFileFormat().isParquetBased()) { allParquet = false; } @@ -1387,7 +1389,7 @@ public class HdfsScanNode extends ScanNode { || partition.getFileFormat() == HdfsFileFormat.ORC)) { // IMPALA-8834 introduced the optimization for partition key scan by generating // one scan range for each HDFS file. With Parquet and ORC, we start with the last - // block is to get a scan range that contains a file footer for short-circuiting. + // block to get a scan range that contains a file footer for short-circuiting. i = fileDesc.getNumFileBlocks() - 1; } for (; i < fileDesc.getNumFileBlocks(); ++i) { @@ -1973,6 +1975,9 @@ public class HdfsScanNode extends ScanNode { if (isPartitionKeyScan_) { output.append(detailPrefix + "partition key scan\n"); } + + String derivedExplain = getDerivedExplainString(detailPrefix, detailLevel); + if (!derivedExplain.isEmpty()) output.append(derivedExplain); } if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) { output.append(getStatsExplainString(detailPrefix)).append("\n"); @@ -2016,6 +2021,14 @@ public class HdfsScanNode extends ScanNode { return output.toString(); } + // Overriding this function can be used to add extra information to the explain string + // of the HDFS Scan node from the derived classes (e.g. IcebergScanNode). Each new line + // in the output should be appended to 'explainLevel' to have the correct indentation. + protected String getDerivedExplainString( + String indentPrefix, TExplainLevel detailLevel) { + return ""; + } + // Helper method that prints min max original conjuncts by tuple descriptor. 
private String getMinMaxOriginalConjunctsExplainString( String prefix, TExplainLevel detailLevel) { diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java index 4551355d5..c2a028aac 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java @@ -36,9 +36,9 @@ import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.HdfsFileFormat; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.catalog.Type; -import org.apache.impala.common.ImpalaException; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.fb.FbIcebergDataFileFormat; +import org.apache.impala.thrift.TExplainLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,15 +52,20 @@ public class IcebergScanNode extends HdfsScanNode { private final static Logger LOG = LoggerFactory.getLogger(IcebergScanNode.class); private List<FileDescriptor> fileDescs_; + // Conjuncts on columns not involved in IDENTITY-partitioning. Subset of 'conjuncts_', // but this does not include conjuncts on IDENTITY-partitioned columns, because such // conjuncts have already been pushed to Iceberg to filter out partitions/files, so // they don't have further selectivity on the surviving files. private List<Expr> nonIdentityConjuncts_; + // Conjuncts that will be skipped from pushing down to the scan node because Iceberg + // already applied them and they won't filter any further rows. 
+ private List<Expr> skippedConjuncts_; + public IcebergScanNode(PlanNodeId id, TableRef tblRef, List<Expr> conjuncts, MultiAggregateInfo aggInfo, List<FileDescriptor> fileDescs, - List<Expr> nonIdentityConjuncts) + List<Expr> nonIdentityConjuncts, List<Expr> skippedConjuncts) throws ImpalaRuntimeException { super(id, tblRef.getDesc(), conjuncts, getIcebergPartition(((FeIcebergTable)tblRef.getTable()).getFeFsTable()), tblRef, @@ -90,6 +95,7 @@ public class IcebergScanNode extends HdfsScanNode { if (hasParquet) fileFormats_.add(HdfsFileFormat.PARQUET); if (hasOrc) fileFormats_.add(HdfsFileFormat.ORC); if (hasAvro) fileFormats_.add(HdfsFileFormat.AVRO); + this.skippedConjuncts_ = skippedConjuncts; } /** @@ -216,4 +222,14 @@ public class IcebergScanNode extends HdfsScanNode { result.put(sampledPartitionMetadata, sampleFiles); return result; } + + @Override + protected String getDerivedExplainString( + String indentPrefix, TExplainLevel detailLevel) { + if (!skippedConjuncts_.isEmpty()) { + return indentPrefix + String.format("skipped Iceberg predicates: %s\n", + Expr.getExplainString(skippedConjuncts_, detailLevel)); + } + return ""; + } } diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java index 29accb498..806cdf62e 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java @@ -39,6 +39,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.Expression.Operation; +import org.apache.iceberg.expressions.True; import org.apache.iceberg.expressions.UnboundPredicate; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Types; @@ -106,8 +107,14 @@ public class IcebergScanPlanner { private List<Expr> conjuncts_; private MultiAggregateInfo 
aggInfo_; - // Exprs in icebergConjuncts_ converted to Expression. + // Iceberg compatible expressions that are pushed down to Iceberg for query planning. private final List<Expression> icebergPredicates_ = new ArrayList<>(); + // The Impala representation of the expressions in 'icebergPredicates_' + private final List<Expr> impalaIcebergPredicates_ = new ArrayList<>(); + // Indicates whether we have to push down 'impalaIcebergPredicates' to Impala's scan + // node or has Iceberg already done the partition pruning and no further rows could be + // skipped using these filters. + private boolean canSkipPushingDownIcebergPredicates_ = false; private List<FileDescriptor> dataFilesWithoutDeletes_ = new ArrayList<>(); private List<FileDescriptor> dataFilesWithDeletes_ = new ArrayList<>(); @@ -135,14 +142,15 @@ public class IcebergScanPlanner { } public PlanNode createIcebergScanPlan() throws ImpalaException { - analyzer_.materializeSlots(conjuncts_); - if (!needIcebergForPlanning()) { + analyzer_.materializeSlots(conjuncts_); setFileDescriptorsBasedOnFileStore(); return createIcebergScanPlanImpl(); } filterFileDescriptors(); + filterConjuncts(); + analyzer_.materializeSlots(conjuncts_); return createIcebergScanPlanImpl(); } @@ -180,7 +188,8 @@ public class IcebergScanPlanner { // If there are no delete files we can just create a single SCAN node. Preconditions.checkState(dataFilesWithDeletes_.isEmpty()); PlanNode ret = new IcebergScanNode(ctx_.getNextNodeId(), tblRef_, conjuncts_, - aggInfo_, dataFilesWithoutDeletes_, nonIdentityConjuncts_); + aggInfo_, dataFilesWithoutDeletes_, nonIdentityConjuncts_, + getSkippedConjuncts()); ret.init(analyzer_); return ret; } @@ -199,7 +208,7 @@ public class IcebergScanPlanner { // can just create a SCAN node for these and do a UNION ALL with the ANTI JOIN. 
IcebergScanNode dataScanNode = new IcebergScanNode( ctx_.getNextNodeId(), tblRef_, conjuncts_, aggInfo_, dataFilesWithoutDeletes_, - nonIdentityConjuncts_); + nonIdentityConjuncts_, getSkippedConjuncts()); dataScanNode.init(analyzer_); List<Expr> outputExprs = tblRef_.getDesc().getSlots().stream().map( entry -> new SlotRef(entry)).collect(Collectors.toList()); @@ -233,12 +242,16 @@ public class IcebergScanPlanner { addDeletePositionSlots(deleteDeltaRef); IcebergScanNode dataScanNode = new IcebergScanNode( dataScanNodeId, tblRef_, conjuncts_, aggInfo_, dataFilesWithDeletes_, - nonIdentityConjuncts_); + nonIdentityConjuncts_, getSkippedConjuncts()); dataScanNode.init(analyzer_); IcebergScanNode deleteScanNode = new IcebergScanNode( - deleteScanNodeId, deleteDeltaRef, /*conjuncts=*/Collections.emptyList(), - aggInfo_, Lists.newArrayList(deleteFiles_), - /*nonIdentityConjuncts=*/Collections.emptyList()); + deleteScanNodeId, + deleteDeltaRef, + Collections.emptyList(), /*conjuncts*/ + aggInfo_, + Lists.newArrayList(deleteFiles_), + Collections.emptyList(), /*nonIdentityConjuncts*/ + Collections.emptyList()); /*skippedConjuncts*/ deleteScanNode.init(analyzer_); // Now let's create the JOIN node @@ -336,10 +349,15 @@ public class IcebergScanPlanner { private void filterFileDescriptors() throws ImpalaException { TimeTravelSpec timeTravelSpec = tblRef_.getTimeTravelSpec(); + canSkipPushingDownIcebergPredicates_ = true; try (CloseableIterable<FileScanTask> fileScanTasks = IcebergUtil.planFiles( - getIceTable(), icebergPredicates_, timeTravelSpec)) { + getIceTable(), new ArrayList<Expression>(icebergPredicates_), timeTravelSpec)) { long dataFilesCacheMisses = 0; for (FileScanTask fileScanTask : fileScanTasks) { + Expression residualExpr = fileScanTask.residual(); + if (residualExpr != null && !(residualExpr instanceof True)) { + canSkipPushingDownIcebergPredicates_ = false; + } Pair<FileDescriptor, Boolean> fileDesc = getFileDescriptor(fileScanTask.file()); if 
(!fileDesc.second) ++dataFilesCacheMisses; if (fileScanTask.deletes().isEmpty()) { @@ -360,7 +378,6 @@ public class IcebergScanPlanner { } } } - if (dataFilesCacheMisses > 0) { Preconditions.checkState(timeTravelSpec != null); LOG.info("File descriptors had to be loaded on demand during time travel: " + @@ -374,6 +391,17 @@ public class IcebergScanPlanner { updateDeleteStatistics(); } + private void filterConjuncts() { + if (canSkipPushingDownIcebergPredicates_) { + conjuncts_.removeAll(impalaIcebergPredicates_); + } + } + + private List<Expr> getSkippedConjuncts() { + if (!canSkipPushingDownIcebergPredicates_) return Collections.emptyList(); + return impalaIcebergPredicates_; + } + private void updateDeleteStatistics() { for (FileDescriptor fd : dataFilesWithDeletes_) { updateDataFilesWithDeletesStatistics(fd); @@ -584,6 +612,7 @@ public class IcebergScanPlanner { Expression predicate = convertIcebergPredicate(expr); if (predicate != null) { icebergPredicates_.add(predicate); + impalaIcebergPredicates_.add(expr); LOG.debug("Push down the predicate: " + predicate + " to iceberg"); return true; } diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index 54eb40393..6b4eba927 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -1264,6 +1264,15 @@ public class PlannerTest extends PlannerTestBase { PlannerTestOption.DO_NOT_VALIDATE_ROWCOUNT_ESTIMATION_FOR_PARTITIONS)); } + /** + * Checks exercising predicate pushdown with Iceberg tables. + */ + @Test + public void testIcebergPredicates() { + runPlannerTestFile("iceberg-predicates", "functional_parquet", + ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY)); + } + /** * Check that Iceberg V2 table scans work as expected. 
*/ diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql index 507600762..90e842ea8 100644 --- a/testdata/datasets/functional/functional_schema_template.sql +++ b/testdata/datasets/functional/functional_schema_template.sql @@ -3256,6 +3256,32 @@ hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/i ---- DATASET functional ---- BASE_TABLE_NAME +iceberg_partition_evolution +---- CREATE +CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} +(id int, int_col int, string_col string, date_string_col string, year int, month int) +PARTITIONED BY SPEC (year, truncate(4, date_string_col)) +STORED AS ICEBERG; +---- DEPENDENT_LOAD +# We can use 'date_string_col' as it is once IMPALA-11954 is done. +INSERT INTO {db_name}{db_suffix}.iceberg_partition_evolution + SELECT id, int_col, string_col, regexp_replace(date_string_col, '/', ''), year, month + FROM {db_name}{db_suffix}.alltypes; +ALTER TABLE {db_name}{db_suffix}.iceberg_partition_evolution + SET PARTITION SPEC (year, truncate(4, date_string_col), month); +INSERT INTO {db_name}{db_suffix}.iceberg_partition_evolution + SELECT + cast(id + 7300 as int), + int_col, + string_col, + regexp_replace(date_string_col, '/', ''), + year, + month + FROM {db_name}{db_suffix}.alltypes; +==== +---- DATASET +functional +---- BASE_TABLE_NAME airports_orc ---- CREATE CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv index 4c23e23dd..ee8bee76b 100644 --- a/testdata/datasets/functional/schema_constraints.csv +++ b/testdata/datasets/functional/schema_constraints.csv @@ -78,6 +78,7 @@ table_name:iceberg_alltypes_part, constraint:restrict_to, table_format:parquet/n table_name:iceberg_alltypes_part_orc, constraint:restrict_to, table_format:parquet/none/none 
table_name:iceberg_legacy_partition_schema_evolution, constraint:restrict_to, table_format:parquet/none/none table_name:iceberg_legacy_partition_schema_evolution_orc, constraint:restrict_to, table_format:parquet/none/none +table_name:iceberg_partition_evolution, constraint:restrict_to, table_format:parquet/none/none table_name:iceberg_timestamp_part, constraint:restrict_to, table_format:parquet/none/none table_name:iceberg_timestamptz_part, constraint:restrict_to, table_format:parquet/none/none table_name:iceberg_uppercase_col, constraint:restrict_to, table_format:parquet/none/none diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-predicates.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-predicates.test new file mode 100644 index 000000000..a682813a0 --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-predicates.test @@ -0,0 +1,77 @@ +# A predicate on a partition that is present both before and after partition evolution is +# not pushed down to scan node because Iceberg already filtered out the associated rows. +# Additionally, the slot associated with this predicate is not materialized. +SELECT id, int_col, string_col from iceberg_partition_evolution where year = 2010; +---- PLAN +PLAN-ROOT SINK +| +00:SCAN HDFS [functional_parquet.iceberg_partition_evolution] + HDFS partitions=1/1 files=730 size=1.25MB + skipped Iceberg predicates: `year` = 2010 + row-size=20B cardinality=7.30K +==== +# A predicate on a partition that is introduced by partition evolution is pushed down to +# the scan node. Also the associated slot is materialized. 
+SELECT id, int_col, string_col from iceberg_partition_evolution where month = 1; +---- PLAN +PLAN-ROOT SINK +| +00:SCAN HDFS [functional_parquet.iceberg_partition_evolution] + HDFS partitions=1/1 files=124 size=216.63KB + predicates: `month` = 1 + row-size=24B cardinality=1.24K +==== +# The predicates that couldn't be pushed to Iceberg are pushed down to the scan node, +# while the ones that are pushed to Iceberg could be skipped from pushing down to +# Impala's scan node if they won't filter any further rows. +SELECT id, int_col, string_col from iceberg_partition_evolution where year = 2010 and power(id, 3) > 1000; +---- PLAN +PLAN-ROOT SINK +| +00:SCAN HDFS [functional_parquet.iceberg_partition_evolution] + HDFS partitions=1/1 files=730 size=1.25MB + predicates: power(id, 3) > 1000 + skipped Iceberg predicates: `year` = 2010 + row-size=20B cardinality=730 +==== +# Here both predicates are pushed to Iceberg and also to Impala's scan node. However, +# here is a room for optimisation as we could skip pushing down 'year' to the scan node +# as it won't filter further rows. +SELECT id, int_col, string_col from iceberg_partition_evolution where year = 2010 and id > 1000; +---- PLAN +PLAN-ROOT SINK +| +00:SCAN HDFS [functional_parquet.iceberg_partition_evolution] + HDFS partitions=1/1 files=730 size=1.25MB + predicates: `year` = 2010, id > 1000 + row-size=24B cardinality=730 +==== +# If we have predicates on partition columns with non-identity transform that could not +# be pushed to Iceberg then all the predicates are also pushed to Impala's scan node. +# However, here is a room for optimisation as we could skip pushing down 'year' to the +# scan node as it won't filter further rows. 
+SELECT * FROM iceberg_partition_evolution +WHERE year = 2010 AND date_string_col='061610'; +---- PLAN +PLAN-ROOT SINK +| +00:SCAN HDFS [functional_parquet.iceberg_partition_evolution] + HDFS partitions=1/1 files=2 size=3.49KB + predicates: `year` = 2010, date_string_col = '061610' + row-size=40B cardinality=2 +==== +# Checks when all the predicates are skipped in a count(*) query then the relevant +# optimization kicks in for Parquet scanners. +SELECT COUNT(*) FROM functional_parquet.iceberg_partitioned WHERE action = 'click'; +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_parquet.iceberg_partitioned.stats: num_rows) +| row-size=8B cardinality=1 +| +00:SCAN HDFS [functional_parquet.iceberg_partitioned] + HDFS partitions=1/1 files=6 size=6.85KB + skipped Iceberg predicates: action = 'click' + row-size=8B cardinality=6 +==== diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test index 74315a408..30c45b779 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-tables.test @@ -809,12 +809,12 @@ PLAN-ROOT SINK runtime filters: RF000 -> i row-size=36B cardinality=4 ==== -select * from iceberg_v2_partitioned_position_deletes where action = 'download'; +select * from iceberg_v2_partitioned_position_deletes where action = 'download' and id > 0; ---- PLAN PLAN-ROOT SINK | 02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN] -| row-size=64B cardinality=6 +| row-size=64B cardinality=1 | |--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete] | HDFS partitions=1/1 files=1 size=3.18KB @@ -822,15 +822,15 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes] 
HDFS partitions=1/1 files=1 size=1.17KB - predicates: action = 'download' - row-size=64B cardinality=6 + predicates: id > 0, action = 'download' + row-size=64B cardinality=1 ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | 04:EXCHANGE [UNPARTITIONED] | 02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST] -| row-size=64B cardinality=6 +| row-size=64B cardinality=1 | |--03:EXCHANGE [BROADCAST] | | @@ -840,8 +840,8 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes] HDFS partitions=1/1 files=1 size=1.17KB - predicates: action = 'download' - row-size=64B cardinality=6 + predicates: id > 0, action = 'download' + row-size=64B cardinality=1 ==== select * from iceberg_v2_partitioned_position_deletes where action = 'download' and user = 'Lisa'; @@ -878,14 +878,14 @@ PLAN-ROOT SINK predicates: `user` = 'Lisa', action = 'download' row-size=64B cardinality=1 ==== -select event_time, action from iceberg_partitioned where action = 'click' or action = 'view'; +select event_time, action from iceberg_partitioned where (action = 'click' or action = 'view') and id > 0; ---- PLAN PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_partitioned] HDFS partitions=1/1 files=14 size=15.93KB - predicates: action IN ('click', 'view') - row-size=28B cardinality=14 + predicates: id > 0, action IN ('click', 'view') + row-size=32B cardinality=1 ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -893,17 +893,17 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_partitioned] HDFS partitions=1/1 files=14 size=15.93KB - predicates: action IN ('click', 'view') - row-size=28B cardinality=14 + predicates: id > 0, action IN ('click', 'view') + row-size=32B cardinality=1 ==== -select event_time, action from iceberg_partitioned where action in ('click', 'view'); +select event_time, action from iceberg_partitioned where action in ('click', 'view') and id > 0; ---- PLAN PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_partitioned] HDFS partitions=1/1 files=14 
size=15.93KB - predicates: action IN ('click', 'view') - row-size=28B cardinality=14 + predicates: id > 0, action IN ('click', 'view') + row-size=32B cardinality=1 ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -911,17 +911,17 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_partitioned] HDFS partitions=1/1 files=14 size=15.93KB - predicates: action IN ('click', 'view') - row-size=28B cardinality=14 + predicates: id > 0, action IN ('click', 'view') + row-size=32B cardinality=1 ==== -select event_time, action from iceberg_partitioned where event_time='2020-01-01 11:00:00' or action = 'click'; +select event_time, action from iceberg_partitioned where (event_time='2020-01-01 11:00:00' or action = 'click') and id > 0; ---- PLAN PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_partitioned] HDFS partitions=1/1 files=6 size=6.85KB - predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' - row-size=28B cardinality=6 + predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click') + row-size=32B cardinality=1 ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -929,17 +929,17 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_partitioned] HDFS partitions=1/1 files=6 size=6.85KB - predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' - row-size=28B cardinality=6 + predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click') + row-size=32B cardinality=1 ==== -select event_time, action from iceberg_partitioned where event_time='2020-01-01 11:00:00' or action = 'click' or action = 'view'; +select event_time, action from iceberg_partitioned where (event_time='2020-01-01 11:00:00' or action = 'click' or action = 'view') and id > 0; ---- PLAN PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_partitioned] HDFS partitions=1/1 files=14 size=15.93KB - predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view' - row-size=28B 
cardinality=14 + predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view') + row-size=32B cardinality=1 ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -947,17 +947,17 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_partitioned] HDFS partitions=1/1 files=14 size=15.93KB - predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view' - row-size=28B cardinality=14 + predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action = 'click' OR action = 'view') + row-size=32B cardinality=1 ==== -select event_time, action from iceberg_partitioned where event_time='2020-01-01 11:00:00' or action in ('click', 'view'); +select event_time, action from iceberg_partitioned where (event_time='2020-01-01 11:00:00' or action in ('click', 'view')) and id > 0; ---- PLAN PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_partitioned] HDFS partitions=1/1 files=14 size=15.93KB - predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view') - row-size=28B cardinality=14 + predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view')) + row-size=32B cardinality=1 ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -965,17 +965,17 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_partitioned] HDFS partitions=1/1 files=14 size=15.93KB - predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view') - row-size=28B cardinality=14 + predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action IN ('click', 'view')) + row-size=32B cardinality=1 ==== -select event_time, action from iceberg_partitioned where event_time='2020-01-01 11:00:00' or action > 'a'; +select event_time, action from iceberg_partitioned where (event_time='2020-01-01 11:00:00' or action > 'a') and id > 0; ---- PLAN PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_partitioned] HDFS partitions=1/1 files=20 
size=22.90KB - predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a' - row-size=28B cardinality=20 + predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a') + row-size=32B cardinality=2 ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -983,16 +983,17 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_partitioned] HDFS partitions=1/1 files=20 size=22.90KB - predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a' - row-size=28B cardinality=20 + predicates: id > 0, (event_time = TIMESTAMP '2020-01-01 11:00:00' OR action > 'a') + row-size=32B cardinality=2 ==== +# All predicates are pushed down to Iceberg and won't filter any further rows. Skip pushing it to Scan node. select event_time, action from iceberg_partitioned where event_time='2020-01-01 11:00:00'; ---- PLAN PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_partitioned] HDFS partitions=1/1 files=0 size=0B - predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' + skipped Iceberg predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' row-size=28B cardinality=0 ---- DISTRIBUTEDPLAN PLAN-ROOT SINK @@ -1001,6 +1002,41 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_partitioned] HDFS partitions=1/1 files=0 size=0B - predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' + skipped Iceberg predicates: event_time = TIMESTAMP '2020-01-01 11:00:00' row-size=28B cardinality=0 ==== +# Similar as above but on a table with positional deletes on all data files. 
+select * from iceberg_v2_partitioned_position_deletes where action = 'download'; +---- PLAN +PLAN-ROOT SINK +| +02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN] +| row-size=64B cardinality=6 +| +|--01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete] +| HDFS partitions=1/1 files=1 size=3.18KB +| row-size=207B cardinality=2 +| +00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes] + HDFS partitions=1/1 files=1 size=1.17KB + skipped Iceberg predicates: action = 'download' + row-size=64B cardinality=6 +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +04:EXCHANGE [UNPARTITIONED] +| +02:DELETE EVENTS HASH JOIN [LEFT ANTI JOIN, BROADCAST] +| row-size=64B cardinality=6 +| +|--03:EXCHANGE [BROADCAST] +| | +| 01:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE-01 functional_parquet.iceberg_v2_partitioned_position_deletes-position-delete] +| HDFS partitions=1/1 files=1 size=3.18KB +| row-size=207B cardinality=2 +| +00:SCAN HDFS [functional_parquet.iceberg_v2_partitioned_position_deletes] + HDFS partitions=1/1 files=1 size=1.17KB + skipped Iceberg predicates: action = 'download' + row-size=64B cardinality=6 +==== diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test index 201f72371..d4c347c76 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test @@ -266,7 +266,7 @@ PLAN-ROOT SINK tuple-ids=0 row-size=44B cardinality=3 in pipelines: 00(GETNEXT) ==== -# Sampling Iceberg tables. Count(*) is not optimized. +# Sampling Iceberg tables. Count(*) is optimized. 
select count(*) from functional_parquet.iceberg_non_partitioned tablesample system(10) repeatable(1234) ---- PLAN F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 @@ -276,7 +276,7 @@ PLAN-ROOT SINK | mem-estimate=0B mem-reservation=0B thread-reservation=0 | 01:AGGREGATE [FINALIZE] -| output: count(*) +| output: sum_init_zero(functional_parquet.iceberg_non_partitioned.stats: num_rows) | mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0 | tuple-ids=1 row-size=8B cardinality=1 | in pipelines: 01(GETNEXT), 00(OPEN) @@ -288,7 +288,7 @@ PLAN-ROOT SINK columns: all extrapolated-rows=disabled max-scan-range-rows=6 mem-estimate=32.00MB mem-reservation=8.00KB thread-reservation=1 - tuple-ids=0 row-size=0B cardinality=3 + tuple-ids=0 row-size=8B cardinality=20 in pipelines: 00(GETNEXT) ==== # Sampling partitioned Iceberg tables. @@ -313,7 +313,7 @@ PLAN-ROOT SINK # Sampling Iceberg tables with predicates. Predicate pushdown to Iceberg happens # before sampling (similarly to static partition pruning). 
select * from functional_parquet.iceberg_partitioned tablesample system(50) repeatable(1234) -where action = 'click' +where action = 'click' and id > 0 ---- PLAN F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 | Per-Host Resources: mem-estimate=68.00MB mem-reservation=4.03MB thread-reservation=2 @@ -323,13 +323,13 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.iceberg_partitioned] HDFS partitions=1/1 files=4 size=4.57KB - predicates: action = 'click' + predicates: id > CAST(0 AS INT), action = 'click' stored statistics: table: rows=20 size=22.90KB columns: unavailable extrapolated-rows=disabled max-scan-range-rows=5 - parquet statistics predicates: action = 'click' - parquet dictionary predicates: action = 'click' + parquet statistics predicates: id > CAST(0 AS INT), action = 'click' + parquet dictionary predicates: id > CAST(0 AS INT), action = 'click' mem-estimate=64.00MB mem-reservation=32.00KB thread-reservation=1 tuple-ids=0 row-size=44B cardinality=4 in pipelines: 00(GETNEXT) diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test index 5e1bee9ea..fcec4cb69 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test @@ -47,26 +47,43 @@ STRING, STRING, STRING, STRING ---- QUERY # Start testing predicate push-down for int column # The IN predicate matches all row group +# Note, the filter on a non-partition col is needed in the below tests because without it Iceberg +# could do the filtering for us and predicate pushdown to the scanners wouldn't be needed. 
select count(1) from ice_pred_pd1 where - col_i in (0, 1, 2); + col_i in (0, 1, 2) and col_bi > 0; ---- RESULTS 9 ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 3 ==== ---- QUERY -# The IN predicate matches two row group +# Filtering only on a partition column is done by Iceberg and in Impala we can get the results +# simply using file metadata. +select + count(1) +from + ice_pred_pd1 +where + col_i in (0, 1, 2); +---- RESULTS +9 +---- RUNTIME_PROFILE +aggregation(SUM, NumRowGroups): 0 +aggregation(SUM, NumFileMetadataRead): 3 +==== +---- QUERY +# The IN predicate matches two row groups # InList: constant expr select count(1) from ice_pred_pd1 where - col_i in (ceil(-0.1), 1 * 2); + col_i in (ceil(-0.1), 1 * 2) and col_bi > 0; ---- RESULTS 6 ---- RUNTIME_PROFILE @@ -79,7 +96,7 @@ select from ice_pred_pd1 where - col_i in (0); + col_i in (0) and col_bi > 0; ---- RESULTS 3 ---- RUNTIME_PROFILE @@ -92,7 +109,7 @@ select from ice_pred_pd1 where - col_i in (-1, 3); + col_i in (-1, 3) and col_bi > 0; ---- RESULTS 0 ---- RUNTIME_PROFILE @@ -422,13 +439,27 @@ select from ice_pred_pd1 where - col_i not in (0, 1) and col_i >= 0; + col_i not in (0, 1) and col_i >= 0 and col_bi > 0; ---- RESULTS 3 ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 1 ==== ---- QUERY +# NOT_IN could be answered using file metadata if only partition cols are included +select + count(1) +from + ice_pred_pd1 +where + col_i not in (0, 1) and col_i >= 0; +---- RESULTS +3 +---- RUNTIME_PROFILE +aggregation(SUM, NumRowGroups): 0 +aggregation(SUM, NumFileMetadataRead): 1 +==== +---- QUERY # NOT_IN does not work because col_dt is not the partition column select count(1) diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test index faeef6e4f..a1b955a94 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test +++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test @@ -173,11 +173,23 @@ aggregation(SUM, NumRowGroups): 0 aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY +# When filtering only by a partition column, Iceberg can do the filtering and there is no need to read data in Impala. select count(*) from ice_bigints where i = 0 and j = 0; ---- RESULTS 1217 ---- RUNTIME_PROFILE +aggregation(SUM, NumRowGroups): 0 +aggregation(SUM, RowsRead): 0 +aggregation(SUM, NumFileMetadataRead): 1 +==== +---- QUERY +# When columns other than partition columns are involved in the filtering, Impala has to read data to answer the query. +select count(*) from ice_bigints +where i = 0 and j = 0 and k >= 0; +---- RESULTS +1217 +---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 1 aggregation(SUM, RowsRead): 1217 ==== @@ -276,7 +288,8 @@ where bool_col = true; ---- TYPES BIGINT ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 4 +aggregation(SUM, NumRowGroups): 0 +aggregation(SUM, NumFileMetadataRead): 4 ==== ---- QUERY select count(*) from alltypes_part @@ -286,7 +299,8 @@ where float_col = 0; ---- TYPES BIGINT ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 4 +aggregation(SUM, NumRowGroups): 0 +aggregation(SUM, NumFileMetadataRead): 4 ==== ---- QUERY select count(*) from alltypes_part @@ -296,7 +310,8 @@ where double_col = 0; ---- TYPES BIGINT ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 4 +aggregation(SUM, NumRowGroups): 0 +aggregation(SUM, NumFileMetadataRead): 4 ==== ---- QUERY select count(*) from alltypes_part @@ -306,7 +321,8 @@ where date_col = '2009-01-01'; ---- TYPES BIGINT ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 2 +aggregation(SUM, NumRowGroups): 0 +aggregation(SUM, NumFileMetadataRead): 2 ==== ---- QUERY select count(*) from alltypes_part @@ -316,7 +332,8 @@ where string_col = '0'; ---- TYPES BIGINT ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 4 +aggregation(SUM, NumRowGroups): 0 +aggregation(SUM, 
NumFileMetadataRead): 4 ==== ---- QUERY # 'timestamp_col' is not a partitioning column, so min/max stats will not be used to diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test index 5a3a19656..daeb1e440 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test @@ -144,6 +144,7 @@ explain select 123, count(*), 321 from ice_tbl; 'SMALLINT)' ==== ---- QUERY +# Filtering by a partition column results in Iceberg doing the filtering instead of Impala. select count(*) from @@ -153,8 +154,8 @@ where ---- RESULTS 4 ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 2 -aggregation(SUM, NumFileMetadataRead): 0 +aggregation(SUM, NumRowGroups): 0 +aggregation(SUM, NumFileMetadataRead): 2 ==== ---- QUERY select @@ -232,4 +233,4 @@ BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 aggregation(SUM, NumFileMetadataRead): 0 -==== \ No newline at end of file +====
