This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a79dd88a5f [fix](iceberg)fix iceberg v3 row lineage count distinct 
error result (#63826)
7a79dd88a5f is described below

commit 7a79dd88a5f0b5c6e058f9c2f31d430174b83d07
Author: daidai <[email protected]>
AuthorDate: Mon Jun 1 11:12:53 2026 +0800

    [fix](iceberg)fix iceberg v3 row lineage count distinct error result 
(#63826)
    
    Critical checkpoint conclusions:
    
    Goal and proof: The PR fixes Parquet Iceberg v3 row-lineage-only queries 
where _row_id is synthesized without reading physical table columns. The code 
now keeps the row positions generated inside _read_empty_batch instead of 
recomputing them after _total_read_rows has advanced, and the regression test 
adds distinct, group by, count(distinct), and ndv coverage for _row_id-only 
reads.
    Scope: The code change is small and focused on Parquet row-position 
preparation. The test addition is directly tied to the bug.
    Concurrency and lifecycle: No new shared state, threads, locks, static 
initialization, or lifecycle ownership changes were introduced.
    Configuration and compatibility: No new config, persisted format, RPC, or 
FE/BE protocol compatibility changes.
    Parallel code paths: The fix applies to the Parquet path, where the bug is 
introduced by _read_empty_batch and _total_read_rows; the regression test 
intentionally restricts the aggregate-only check to Parquet.
    Conditional checks: The new _need_current_batch_row_positions() helper 
preserves the existing synthesized/generated handler condition and removes 
duplicated condition logic.
    Test coverage: Added external Iceberg regression coverage for 
row-lineage-only aggregate/distinct reads before and after an insert. I did not 
run the regression test locally in this GitHub Actions review session.
    Observability, transactions, persistence, data writes, metrics, and memory 
tracking: Not applicable to this change.
    User focus: No additional user-provided focus points were supplied.
---
 be/src/format/parquet/vparquet_group_reader.cpp    | 30 +++++------
 be/src/format/parquet/vparquet_group_reader.h      |  4 +-
 ...test_iceberg_v3_row_lineage_query_insert.groovy | 63 ++++++++++++++++++++--
 3 files changed, 74 insertions(+), 23 deletions(-)

diff --git a/be/src/format/parquet/vparquet_group_reader.cpp 
b/be/src/format/parquet/vparquet_group_reader.cpp
index 016fc4eb9d7..030793e2ca0 100644
--- a/be/src/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/format/parquet/vparquet_group_reader.cpp
@@ -326,17 +326,13 @@ Status RowGroupReader::next_batch(Block* block, size_t 
batch_size, size_t* read_
 
     // Process external table query task that select columns are all from path.
     if (_read_table_columns.empty()) {
-        bool modify_row_ids = false;
-        RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof, 
&modify_row_ids));
+        RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof));
 
         DCHECK(_table_format_reader);
         RETURN_IF_ERROR(_table_format_reader->on_fill_partition_columns(
                 block, *read_rows, _lazy_read_ctx.partition_col_names));
         RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
                 block, *read_rows, _lazy_read_ctx.missing_col_names));
-        if (_table_format_reader->has_synthesized_column_handlers()) {
-            RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
-        }
         RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, 
*read_rows));
         RETURN_IF_ERROR(_table_format_reader->fill_generated_columns(block, 
*read_rows));
         Status st = VExprContext::filter_block(_lazy_read_ctx.conjuncts, 
block, block->columns());
@@ -357,8 +353,7 @@ Status RowGroupReader::next_batch(Block* block, size_t 
batch_size, size_t* read_
         RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
                 block, *read_rows, _lazy_read_ctx.missing_col_names));
 
-        if (_table_format_reader->has_synthesized_column_handlers() ||
-            _table_format_reader->has_generated_column_handlers()) {
+        if (_need_current_batch_row_positions()) {
             RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
         }
         RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, 
*read_rows));
@@ -639,8 +634,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
                 block, pre_read_rows, 
_lazy_read_ctx.predicate_partition_col_names));
         RETURN_IF_ERROR(_table_format_reader->on_fill_missing_columns(
                 block, pre_read_rows, 
_lazy_read_ctx.predicate_missing_col_names));
-        if (_table_format_reader->has_synthesized_column_handlers() ||
-            _table_format_reader->has_generated_column_handlers()) {
+        if (_need_current_batch_row_positions()) {
             RETURN_IF_ERROR(_get_current_batch_row_id(pre_read_rows));
         }
         RETURN_IF_ERROR(_table_format_reader->fill_synthesized_columns(block, 
pre_read_rows));
@@ -949,9 +943,13 @@ Status RowGroupReader::_get_block_column_pos(const Block& 
block, const std::stri
     return Status::OK();
 }
 
-Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, 
bool* batch_eof,
-                                         bool* modify_row_ids) {
-    *modify_row_ids = false;
+bool RowGroupReader::_need_current_batch_row_positions() const {
+    DCHECK(_table_format_reader);
+    return _table_format_reader->has_synthesized_column_handlers() ||
+           _table_format_reader->has_generated_column_handlers();
+}
+
+Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, 
bool* batch_eof) {
     if (_position_delete_ctx.has_filter) {
         int64_t start_row_id = _position_delete_ctx.current_row_id;
         int64_t end_row_id = std::min(_position_delete_ctx.current_row_id + 
(int64_t)batch_size,
@@ -975,9 +973,7 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size, 
size_t* read_rows, b
         _position_delete_ctx.current_row_id = end_row_id;
         *batch_eof = _position_delete_ctx.current_row_id == 
_position_delete_ctx.last_row_id;
 
-        if (_table_format_reader->has_synthesized_column_handlers() ||
-            _table_format_reader->has_generated_column_handlers()) {
-            *modify_row_ids = true;
+        if (_need_current_batch_row_positions()) {
             _current_batch_row_ids.clear();
             _current_batch_row_ids.resize(*read_rows);
             size_t idx = 0;
@@ -1000,9 +996,7 @@ Status RowGroupReader::_read_empty_batch(size_t 
batch_size, size_t* read_rows, b
             _remaining_rows = 0;
             *batch_eof = true;
         }
-        if (_table_format_reader->has_synthesized_column_handlers() ||
-            _table_format_reader->has_generated_column_handlers()) {
-            *modify_row_ids = true;
+        if (_need_current_batch_row_positions()) {
             RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
         }
     }
diff --git a/be/src/format/parquet/vparquet_group_reader.h 
b/be/src/format/parquet/vparquet_group_reader.h
index e9eb5370e02..79b171ca064 100644
--- a/be/src/format/parquet/vparquet_group_reader.h
+++ b/be/src/format/parquet/vparquet_group_reader.h
@@ -227,8 +227,7 @@ protected:
     }
 
 private:
-    Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* 
batch_eof,
-                             bool* modify_row_ids);
+    Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* 
batch_eof);
 
     Status _read_column_data(Block* block, const std::vector<std::string>& 
columns,
                              size_t batch_size, size_t* read_rows, bool* 
batch_eof,
@@ -255,6 +254,7 @@ private:
                                   const IColumn::Filter& filter);
 
     bool _can_filter_by_dict(int slot_id, const tparquet::ColumnMetaData& 
column_metadata);
+    bool _need_current_batch_row_positions() const;
     bool is_dictionary_encoded(const tparquet::ColumnMetaData& 
column_metadata);
     Status _rewrite_dict_predicates();
     Status _rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int 
slot_id, bool is_nullable);
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
index 7276fadba76..28e28bdd887 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
@@ -140,6 +140,56 @@ suite("test_iceberg_v3_row_lineage_query_insert", 
"p0,external,iceberg,external_
         assertEquals(expectedIds[2], 
combinedPredicate[1][0].toString().toInteger())
     }
 
+    def assertRowLineageOnlyAggregatesReadable = { tableName, expectedRowCount 
->
+        def rowIdsWithPhysicalColumn = sql("""
+            select _row_id, id
+            from ${tableName}
+            order by _row_id
+        """).collect { row -> row[0].toString().toLong() }
+        log.info("Checking row lineage only baseline for ${tableName}: 
rowIds=${rowIdsWithPhysicalColumn}")
+        assertEquals(expectedRowCount, rowIdsWithPhysicalColumn.size())
+        assertEquals(expectedRowCount, rowIdsWithPhysicalColumn.toSet().size())
+
+        def distinctRowIdsWithPhysicalColumn = sql("""
+            select distinct _row_id, id
+            from ${tableName}
+            order by _row_id
+        """).collect { row -> row[0].toString().toLong() }
+        log.info("""Checking distinct _row_id with physical column for 
${tableName}: distinctRowIds=${distinctRowIdsWithPhysicalColumn}""")
+        assertEquals(rowIdsWithPhysicalColumn, 
distinctRowIdsWithPhysicalColumn)
+
+        def distinctRowIds = sql("""
+            select distinct _row_id
+            from ${tableName}
+            order by _row_id
+        """).collect { row -> row[0].toString().toLong() }
+        log.info("Checking distinct _row_id only for ${tableName}: 
distinctRowIds=${distinctRowIds}")
+        assertEquals(rowIdsWithPhysicalColumn, distinctRowIds)
+
+        def groupRows = sql("""
+            select _row_id, count(*)
+            from ${tableName}
+            group by _row_id
+            order by _row_id
+        """)
+        log.info("Checking group by _row_id only for ${tableName}: 
groupRows=${groupRows}")
+        assertEquals(expectedRowCount, groupRows.size())
+        assertEquals(rowIdsWithPhysicalColumn, groupRows.collect { row -> 
row[0].toString().toLong() })
+        groupRows.each { row ->
+            assertEquals(1, row[1].toString().toInteger())
+        }
+
+        def distinctAggRows = sql("""
+            select count(*), count(distinct _row_id), ndv(_row_id)
+            from ${tableName}
+        """)
+        log.info("Checking distinct aggregate on _row_id only for 
${tableName}: result=${distinctAggRows}")
+        assertEquals(1, distinctAggRows.size())
+        assertEquals(expectedRowCount, 
distinctAggRows[0][0].toString().toInteger())
+        assertEquals(expectedRowCount, 
distinctAggRows[0][1].toString().toInteger())
+        assertEquals(expectedRowCount, 
distinctAggRows[0][2].toString().toInteger())
+    }
+
     sql """drop catalog if exists ${catalogName}"""
     sql """
         create catalog if not exists ${catalogName} properties (
@@ -180,10 +230,11 @@ suite("test_iceberg_v3_row_lineage_query_insert", 
"p0,external,iceberg,external_
                 """
 
                 sql """
-                    insert into ${unpartitionedTable} values(1, 'Alice', 25);
+                    insert into ${unpartitionedTable} values
+                    (1, 'Alice', 25),
+                    (2, 'Bob', 30),
+                    (3, 'Charlie', 35)
                 """
-                sql """ insert into ${unpartitionedTable} values(2, 'Bob', 30) 
"""
-                sql """ insert into ${unpartitionedTable} values(3, 'Charlie', 
35) """
 
                 log.info("Inserted initial rows into ${unpartitionedTable}")
 
@@ -193,6 +244,9 @@ suite("test_iceberg_v3_row_lineage_query_insert", 
"p0,external,iceberg,external_
                 // 3. Explicit SELECT on row lineage columns returns non-null 
values.
                 assertRowLineageHiddenColumns(unpartitionedTable, 3)
                 assertExplicitRowLineageReadable(unpartitionedTable, [1, 2, 3])
+                if (format == "parquet") {
+                    assertRowLineageOnlyAggregatesReadable(unpartitionedTable, 
3)
+                }
 
                 test {
                     sql """insert into ${unpartitionedTable}(_row_id, id, 
name, age) values (1, 9, 'BadRow', 99)"""
@@ -216,6 +270,9 @@ suite("test_iceberg_v3_row_lineage_query_insert", 
"p0,external,iceberg,external_
                         unpartitionedTable,
                         format,
                         "Unpartitioned normal INSERT")
+                if (format == "parquet") {
+                    assertRowLineageOnlyAggregatesReadable(unpartitionedTable, 
4)
+                }
 
                 sql """drop table if exists ${partitionedTable}"""
                 sql """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to