This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 9d3342207f8 branch-4.1:[fix](iceberg)fix iceberg v3 row lineage count 
distinct error result(#63826) (#63999)
9d3342207f8 is described below

commit 9d3342207f812e695b46136c813e3d6a211e2ff0
Author: daidai <[email protected]>
AuthorDate: Tue Jun 2 18:59:36 2026 +0800

    branch-4.1:[fix](iceberg)fix iceberg v3 row lineage count distinct error 
result(#63826) (#63999)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    Cherry-pick of #63826 into branch-4.1.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here, e.g.
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/format/parquet/vparquet_group_reader.cpp    | 24 ++++-----
 be/src/format/parquet/vparquet_group_reader.h      |  4 +-
 ...test_iceberg_v3_row_lineage_query_insert.groovy | 63 ++++++++++++++++++++--
 3 files changed, 74 insertions(+), 17 deletions(-)

diff --git a/be/src/format/parquet/vparquet_group_reader.cpp 
b/be/src/format/parquet/vparquet_group_reader.cpp
index d7f822f77d0..bd954db6859 100644
--- a/be/src/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/format/parquet/vparquet_group_reader.cpp
@@ -384,15 +384,15 @@ Status RowGroupReader::next_batch(Block* block, size_t 
batch_size, size_t* read_
 
     // Process external table query task that select columns are all from path.
     if (_read_table_columns.empty()) {
-        bool modify_row_ids = false;
-        RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof, 
&modify_row_ids));
+        RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof));
 
         RETURN_IF_ERROR(
                 _fill_partition_columns(block, *read_rows, 
_lazy_read_ctx.partition_columns));
         RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, 
_lazy_read_ctx.missing_columns));
 
-        RETURN_IF_ERROR(_fill_row_id_columns(block, *read_rows, 
modify_row_ids));
-        RETURN_IF_ERROR(_append_iceberg_rowid_column(block, *read_rows, 
modify_row_ids));
+        bool row_positions_ready = _need_current_batch_row_positions();
+        RETURN_IF_ERROR(_fill_row_id_columns(block, *read_rows, 
row_positions_ready));
+        RETURN_IF_ERROR(_append_iceberg_rowid_column(block, *read_rows, 
row_positions_ready));
 
         Status st = VExprContext::filter_block(_lazy_read_ctx.conjuncts, 
block, block->columns());
         *read_rows = block->rows();
@@ -914,9 +914,12 @@ Status RowGroupReader::_get_block_column_pos(const Block& 
block, const std::stri
     return Status::OK();
 }
 
-Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, 
bool* batch_eof,
-                                         bool* modify_row_ids) {
-    *modify_row_ids = false;
+bool RowGroupReader::_need_current_batch_row_positions() const {
+    return _row_id_column_iterator_pair.first != nullptr || 
_iceberg_rowid_params.enabled ||
+           (_row_lineage_columns != nullptr && 
_row_lineage_columns->need_row_ids());
+}
+
+Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, 
bool* batch_eof) {
     if (_position_delete_ctx.has_filter) {
         int64_t start_row_id = _position_delete_ctx.current_row_id;
         int64_t end_row_id = std::min(_position_delete_ctx.current_row_id + 
(int64_t)batch_size,
@@ -940,9 +943,7 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size, 
size_t* read_rows, b
         _position_delete_ctx.current_row_id = end_row_id;
         *batch_eof = _position_delete_ctx.current_row_id == 
_position_delete_ctx.last_row_id;
 
-        if (_row_id_column_iterator_pair.first != nullptr || 
_iceberg_rowid_params.enabled ||
-            (_row_lineage_columns != nullptr && 
_row_lineage_columns->need_row_ids())) {
-            *modify_row_ids = true;
+        if (_need_current_batch_row_positions()) {
             _current_batch_row_ids.clear();
             _current_batch_row_ids.resize(*read_rows);
             size_t idx = 0;
@@ -965,8 +966,7 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size, 
size_t* read_rows, b
             _remaining_rows = 0;
             *batch_eof = true;
         }
-        if (_iceberg_rowid_params.enabled) {
-            *modify_row_ids = true;
+        if (_need_current_batch_row_positions()) {
             RETURN_IF_ERROR(_get_current_batch_row_id(*read_rows));
         }
     }
diff --git a/be/src/format/parquet/vparquet_group_reader.h 
b/be/src/format/parquet/vparquet_group_reader.h
index 208d3995b90..c103a8d8102 100644
--- a/be/src/format/parquet/vparquet_group_reader.h
+++ b/be/src/format/parquet/vparquet_group_reader.h
@@ -215,8 +215,7 @@ protected:
     }
 
 private:
-    Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* 
batch_eof,
-                             bool* modify_row_ids);
+    Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* 
batch_eof);
 
     Status _read_column_data(Block* block, const std::vector<std::string>& 
columns,
                              size_t batch_size, size_t* read_rows, bool* 
batch_eof,
@@ -242,6 +241,7 @@ private:
                                   const IColumn::Filter& filter);
 
     bool _can_filter_by_dict(int slot_id, const tparquet::ColumnMetaData& 
column_metadata);
+    bool _need_current_batch_row_positions() const;
     bool is_dictionary_encoded(const tparquet::ColumnMetaData& 
column_metadata);
     Status _rewrite_dict_predicates();
     Status _rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int 
slot_id, bool is_nullable);
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
index 7276fadba76..28e28bdd887 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_v3_row_lineage_query_insert.groovy
@@ -140,6 +140,56 @@ suite("test_iceberg_v3_row_lineage_query_insert", 
"p0,external,iceberg,external_
         assertEquals(expectedIds[2], 
combinedPredicate[1][0].toString().toInteger())
     }
 
+    def assertRowLineageOnlyAggregatesReadable = { tableName, expectedRowCount 
->
+        def rowIdsWithPhysicalColumn = sql("""
+            select _row_id, id
+            from ${tableName}
+            order by _row_id
+        """).collect { row -> row[0].toString().toLong() }
+        log.info("Checking row lineage only baseline for ${tableName}: 
rowIds=${rowIdsWithPhysicalColumn}")
+        assertEquals(expectedRowCount, rowIdsWithPhysicalColumn.size())
+        assertEquals(expectedRowCount, rowIdsWithPhysicalColumn.toSet().size())
+
+        def distinctRowIdsWithPhysicalColumn = sql("""
+            select distinct _row_id, id
+            from ${tableName}
+            order by _row_id
+        """).collect { row -> row[0].toString().toLong() }
+        log.info("""Checking distinct _row_id with physical column for 
${tableName}: distinctRowIds=${distinctRowIdsWithPhysicalColumn}""")
+        assertEquals(rowIdsWithPhysicalColumn, 
distinctRowIdsWithPhysicalColumn)
+
+        def distinctRowIds = sql("""
+            select distinct _row_id
+            from ${tableName}
+            order by _row_id
+        """).collect { row -> row[0].toString().toLong() }
+        log.info("Checking distinct _row_id only for ${tableName}: 
distinctRowIds=${distinctRowIds}")
+        assertEquals(rowIdsWithPhysicalColumn, distinctRowIds)
+
+        def groupRows = sql("""
+            select _row_id, count(*)
+            from ${tableName}
+            group by _row_id
+            order by _row_id
+        """)
+        log.info("Checking group by _row_id only for ${tableName}: 
groupRows=${groupRows}")
+        assertEquals(expectedRowCount, groupRows.size())
+        assertEquals(rowIdsWithPhysicalColumn, groupRows.collect { row -> 
row[0].toString().toLong() })
+        groupRows.each { row ->
+            assertEquals(1, row[1].toString().toInteger())
+        }
+
+        def distinctAggRows = sql("""
+            select count(*), count(distinct _row_id), ndv(_row_id)
+            from ${tableName}
+        """)
+        log.info("Checking distinct aggregate on _row_id only for 
${tableName}: result=${distinctAggRows}")
+        assertEquals(1, distinctAggRows.size())
+        assertEquals(expectedRowCount, 
distinctAggRows[0][0].toString().toInteger())
+        assertEquals(expectedRowCount, 
distinctAggRows[0][1].toString().toInteger())
+        assertEquals(expectedRowCount, 
distinctAggRows[0][2].toString().toInteger())
+    }
+
     sql """drop catalog if exists ${catalogName}"""
     sql """
         create catalog if not exists ${catalogName} properties (
@@ -180,10 +230,11 @@ suite("test_iceberg_v3_row_lineage_query_insert", 
"p0,external,iceberg,external_
                 """
 
                 sql """
-                    insert into ${unpartitionedTable} values(1, 'Alice', 25);
+                    insert into ${unpartitionedTable} values
+                    (1, 'Alice', 25),
+                    (2, 'Bob', 30),
+                    (3, 'Charlie', 35)
                 """
-                sql """ insert into ${unpartitionedTable} values(2, 'Bob', 30) 
"""
-                sql """ insert into ${unpartitionedTable} values(3, 'Charlie', 
35) """
 
                 log.info("Inserted initial rows into ${unpartitionedTable}")
 
@@ -193,6 +244,9 @@ suite("test_iceberg_v3_row_lineage_query_insert", 
"p0,external,iceberg,external_
                 // 3. Explicit SELECT on row lineage columns returns non-null 
values.
                 assertRowLineageHiddenColumns(unpartitionedTable, 3)
                 assertExplicitRowLineageReadable(unpartitionedTable, [1, 2, 3])
+                if (format == "parquet") {
+                    assertRowLineageOnlyAggregatesReadable(unpartitionedTable, 
3)
+                }
 
                 test {
                     sql """insert into ${unpartitionedTable}(_row_id, id, 
name, age) values (1, 9, 'BadRow', 99)"""
@@ -216,6 +270,9 @@ suite("test_iceberg_v3_row_lineage_query_insert", 
"p0,external,iceberg,external_
                         unpartitionedTable,
                         format,
                         "Unpartitioned normal INSERT")
+                if (format == "parquet") {
+                    assertRowLineageOnlyAggregatesReadable(unpartitionedTable, 
4)
+                }
 
                 sql """drop table if exists ${partitionedTable}"""
                 sql """


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to