This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 61d12e5ceab branch-4.1: [fix](iceberg) Fix NPE in COUNT(*) pushdown
when snapshot summary omits total-* counters (#64648) (#65061)
61d12e5ceab is described below
commit 61d12e5ceabb8b5c012dd40bd4ccee01ef7eb77c
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Wed Jul 1 17:40:09 2026 +0800
branch-4.1: [fix](iceberg) Fix NPE in COUNT(*) pushdown when snapshot
summary omits total-* counters (#64648) (#65061)
## Proposed changes
Backport of #64648 to branch-4.1.
`IcebergScanNode.getCountFromSnapshot()` read `total-equality-deletes` /
`total-position-deletes` / `total-records` from the Iceberg snapshot summary
and called `.equals()` / `Long.parseLong()` directly on the `Map.get()` results.
An Iceberg snapshot summary is **not guaranteed** to carry these `total-*`
counters (snapshots written by compaction/replace, or by some writers, may omit
them), so `SELECT COUNT(*)` threw a `NullPointerException` on such tables while
`SELECT *` worked fine.
### Fix
- Extract the summary parsing into a pure static
  `IcebergUtils.getCountFromSummary(summary, ignoreDanglingDelete)` that
  null-checks the counters and falls back to a normal scan (`return -1`) when
  any required counter is absent. Reuse it from both `IcebergScanNode` and
  `IcebergUtils.getIcebergRowCount()`.
- BE: `IcebergTableReader::init_row_filters()` now accepts a table-level row
  count of `0` (`>= 0` instead of `> 0`), so a genuine pushed-down count of 0
  takes the CountReader fast path. On this branch the change is applied to
  `be/src/format/table/iceberg_reader.cpp`, because master's
  `iceberg_reader_mixin.h` does not exist here.
Unit test `IcebergCountPushDownTest` covers the missing-counter, no-delete,
equality-delete, position-delete, and zero-count cases.
Co-authored-by: Raghvendra Singh <[email protected]>
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
be/src/format/table/iceberg_reader.cpp | 8 +-
.../doris/datasource/iceberg/IcebergUtils.java | 35 ++++++++-
.../datasource/iceberg/source/IcebergScanNode.java | 19 +----
.../iceberg/source/IcebergCountPushDownTest.java | 86 ++++++++++++++++++++++
4 files changed, 127 insertions(+), 21 deletions(-)
diff --git a/be/src/format/table/iceberg_reader.cpp
b/be/src/format/table/iceberg_reader.cpp
index aaf80dc9a2f..a6647770aad 100644
--- a/be/src/format/table/iceberg_reader.cpp
+++ b/be/src/format/table/iceberg_reader.cpp
@@ -199,8 +199,12 @@ Status IcebergTableReader::get_next_block_inner(Block*
block, size_t* read_rows,
}
Status IcebergTableReader::init_row_filters() {
- // We get the count value by doris's be, so we don't need to read the
delete file
- if (_push_down_agg_type == TPushAggOp::type::COUNT &&
_table_level_row_count > 0) {
+ // We get the count value by doris's be, so we don't need to read the
delete file.
+ // A table-level row count of 0 (e.g. an all-deleted table read with
ignore_iceberg_dangling_delete,
+ // where total-records == total-position-deletes) is still a valid
pushed-down count, so accept >= 0.
+ // FE sends -1 when there is no table-level count; using > 0 here would
drop a genuine 0 into the
+ // delete-applying path below and never produce the intended
CountReader(0).
+ if (_push_down_agg_type == TPushAggOp::type::COUNT &&
_table_level_row_count >= 0) {
return Status::OK();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index df4adead31b..676dfd258f1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -1063,6 +1063,34 @@ public class IcebergUtils {
return resSchema;
}
+    /**
+     * Decide whether a row count can be read from an Iceberg snapshot summary.
+     *
+     * <p>The count is derived from the {@code total-equality-deletes}, {@code total-position-deletes}
+     * and {@code total-records} counters. These counters are not guaranteed to be present (or
+     * well-formed) in every snapshot summary, so each one is validated before use and any problem
+     * falls back to {@link TableIf#UNKNOWN_ROW_COUNT}, letting the caller do a normal scan.
+     *
+     * @param summary the snapshot summary map
+     * @param ignoreDanglingDelete when true, the caller accepts {@code total-records} minus
+     *        {@code total-position-deletes} as the row count even though position deletes exist
+     * @return the row count, or {@link TableIf#UNKNOWN_ROW_COUNT} when a required counter is absent or
+     *         not a valid number, when equality deletes exist, when position deletes exist and
+     *         {@code ignoreDanglingDelete} is false, or when the derived count would be negative
+     */
+    @VisibleForTesting
+    public static long getCountFromSummary(Map<String, String> summary, boolean ignoreDanglingDelete) {
+        String equalityDeletes = summary.get(TOTAL_EQUALITY_DELETES);
+        String positionDeletes = summary.get(TOTAL_POSITION_DELETES);
+        String totalRecords = summary.get(TOTAL_RECORDS);
+        if (equalityDeletes == null || positionDeletes == null || totalRecords == null) {
+            return TableIf.UNKNOWN_ROW_COUNT;
+        }
+        if (!equalityDeletes.equals("0")) {
+            // Equality deletes cannot be resolved from summary counters alone.
+            return TableIf.UNKNOWN_ROW_COUNT;
+        }
+
+        long deleteCount;
+        long recordCount;
+        try {
+            deleteCount = Long.parseLong(positionDeletes);
+            recordCount = Long.parseLong(totalRecords);
+        } catch (NumberFormatException ignored) {
+            // A malformed counter must not fail the query; fall back to a normal scan instead.
+            return TableIf.UNKNOWN_ROW_COUNT;
+        }
+        if (deleteCount != 0 && !ignoreDanglingDelete) {
+            // Position deletes exist and the caller did not opt in to subtracting them blindly.
+            return TableIf.UNKNOWN_ROW_COUNT;
+        }
+        long rows = recordCount - deleteCount;
+        // A negative result means the counters are inconsistent with each other (e.g. position
+        // deletes that do not match live rows); it can never be a real row count.
+        return rows < 0 ? TableIf.UNKNOWN_ROW_COUNT : rows;
+    }
+
/**
* Estimate iceberg table row count.
* Get the row count by adding all task file recordCount.
@@ -1082,7 +1110,12 @@ public class IcebergUtils {
return TableIf.UNKNOWN_ROW_COUNT;
}
Map<String, String> summary = snapshot.summary();
- long rows = Long.parseLong(summary.get(TOTAL_RECORDS)) -
Long.parseLong(summary.get(TOTAL_POSITION_DELETES));
+ long rows = getCountFromSummary(summary, true);
+ if (rows == TableIf.UNKNOWN_ROW_COUNT) {
+ LOG.info("Iceberg table {}.{}.{} row count in summary is unknown,
return -1.",
+ tbl.getCatalog().getName(), tbl.getDbName(),
tbl.getName());
+ return TableIf.UNKNOWN_ROW_COUNT;
+ }
LOG.info("Iceberg table {}.{}.{} row count in summary is {}",
tbl.getCatalog().getName(), tbl.getDbName(), tbl.getName(),
rows);
return rows;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index e8763a09ca0..bb1110687fc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -1067,24 +1067,7 @@ public class IcebergScanNode extends FileQueryScanNode {
return 0;
}
- Map<String, String> summary = snapshot.summary();
- if (!summary.get(IcebergUtils.TOTAL_EQUALITY_DELETES).equals("0")) {
- // has equality delete files, can not push down count
- return -1;
- }
-
- long deleteCount =
Long.parseLong(summary.get(IcebergUtils.TOTAL_POSITION_DELETES));
- if (deleteCount == 0) {
- // no delete files, can push down count directly
- return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS));
- }
- if (sessionVariable.ignoreIcebergDanglingDelete) {
- // has position delete files, if we ignore dangling delete, can
push down count
- return Long.parseLong(summary.get(IcebergUtils.TOTAL_RECORDS)) -
deleteCount;
- } else {
- // otherwise, can not push down count
- return -1;
- }
+ return IcebergUtils.getCountFromSummary(snapshot.summary(),
sessionVariable.ignoreIcebergDanglingDelete);
}
@Override
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergCountPushDownTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergCountPushDownTest.java
new file mode 100644
index 00000000000..19ad6bece44
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergCountPushDownTest.java
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.source;
+
+import org.apache.doris.datasource.iceberg.IcebergUtils;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class IcebergCountPushDownTest {
+
+    /**
+     * Builds a snapshot summary that contains only the non-null counters, so tests can model
+     * writers that omit one or more {@code total-*} counters.
+     */
+    private static Map<String, String> summary(
+            String equalityDeletes, String positionDeletes, String totalRecords) {
+        ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+        if (equalityDeletes != null) {
+            builder.put(IcebergUtils.TOTAL_EQUALITY_DELETES, equalityDeletes);
+        }
+        if (positionDeletes != null) {
+            builder.put(IcebergUtils.TOTAL_POSITION_DELETES, positionDeletes);
+        }
+        if (totalRecords != null) {
+            builder.put(IcebergUtils.TOTAL_RECORDS, totalRecords);
+        }
+        return builder.build();
+    }
+
+    @Test
+    public void testMissingCounterFallsBackToScan() {
+        // Snapshots written by compaction/replace (and some writers) may omit
+        // total-* counters. The pushdown previously NPE'd on the missing key;
+        // it must now fall back to a normal scan (return -1).
+        Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary(null, "0", "100"), false));
+        Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary("0", null, "100"), false));
+        Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary("0", "0", null), false));
+        Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(Collections.emptyMap(), false));
+    }
+
+    @Test
+    public void testUtilityMissingCounterReturnsUnknownCount() {
+        // IcebergUtils.getIcebergRowCount() calls the helper with ignoreDanglingDelete = true;
+        // every missing counter must still yield -1 on that path instead of an NPE.
+        Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary(null, "0", "100"), true));
+        Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary("0", null, "100"), true));
+        Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary("0", "0", null), true));
+        Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(Collections.emptyMap(), true));
+    }
+
+    @Test
+    public void testNoDeletesPushesDownTotalRecords() {
+        // Without any deletes the summary count is exact regardless of ignoreDanglingDelete.
+        Assertions.assertEquals(100L, IcebergUtils.getCountFromSummary(summary("0", "0", "100"), false));
+        Assertions.assertEquals(100L, IcebergUtils.getCountFromSummary(summary("0", "0", "100"), true));
+    }
+
+    @Test
+    public void testEqualityDeletesCannotPushDown() {
+        // Equality deletes block pushdown even when dangling position deletes are ignored.
+        Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary("3", "0", "100"), false));
+        Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary("3", "0", "100"), true));
+        Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary("3", "10", "100"), true));
+    }
+
+    @Test
+    public void testPositionDeletesRespectIgnoreDangling() {
+        // ignoreDanglingDelete = true -> total-records minus position-deletes
+        Assertions.assertEquals(90L, IcebergUtils.getCountFromSummary(summary("0", "10", "100"), true));
+        // ignoreDanglingDelete = false -> cannot push down (fall back to scan)
+        Assertions.assertEquals(-1L, IcebergUtils.getCountFromSummary(summary("0", "10", "100"), false));
+    }
+
+    @Test
+    public void testZeroCountWithPositionDeletesIsPushedDown() {
+        // total-records == position-deletes -> count is 0. With ignore_iceberg_dangling_delete this
+        // is a valid pushed-down count; FE returns 0 and BE honors it via CountReader(0) (the BE
+        // table-level guard accepts table_level_row_count >= 0). It must NOT fall back to -1.
+        Assertions.assertEquals(0L, IcebergUtils.getCountFromSummary(summary("0", "100", "100"), true));
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]