github-actions[bot] commented on code in PR #63893:
URL: https://github.com/apache/doris/pull/63893#discussion_r3417745771


##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -2873,10 +2879,9 @@ public static boolean isEagerAggregationOnJoin() {
     public static final String ENABLE_MC_LIMIT_SPLIT_OPTIMIZATION = 
"enable_mc_limit_split_optimization";
     @VarAttrDef.VarAttr(
             name = ENABLE_EXTERNAL_TABLE_BATCH_MODE,
-            fuzzy = true,
             description = {"使能外表的 batch mode 功能", "Enable the batch mode 
function of the external table."},
             needForward = true)

Review Comment:
   This silently flips `enable_external_table_batch_mode` from true to false 
and removes it from fuzzy testing. `IcebergScanNode.isBatchMode()` returns 
false as soon as this session variable is false, so existing Iceberg scans stop 
using batch mode by default. That is a user-visible planner/performance change 
while the PR release note says `None`; please either keep the previous default 
and fuzzy coverage, or document the behavior change with dedicated tests.



##########
fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java:
##########
@@ -1112,6 +1113,11 @@ public static double getHotValueThreshold() {
             "FileScanNode 扫描数据的最大并发,默认为 16", "The max threads to read data of 
FileScanNode, default 16"})
     public int maxFileScannersConcurrency = 16;
 
+    @VarAttrDef.VarAttr(name = ENABLE_FILE_SCANNER_V2, needForward = true, 
description = {
+            "开启后 FileScanNode 会在支持的查询场景使用 FileScannerV2,默认关闭",
+            "When enabled, FileScanNode uses FileScannerV2 for supported query 
scans. Disabled by default."})

Review Comment:
   The variable is documented as disabled by default and the thrift field 
default is also false, but this initializes every FE session to true. 
`toThrift()` always forwards that value, and 
`FileScanLocalState::_init_scanners` selects `FileScannerV2` whenever it is 
true and all splits are supported, so this makes the new reader stack the 
default for supported query scans. For a refactor of this size, that removes 
the intended rollback-by-default path. Please default this to false, or update 
the release/compatibility plan if default enablement is intentional.
   
   ```suggestion
       public boolean enableFileScannerV2 = false;
   ```



##########
be/src/format_v2/table_reader.h:
##########
@@ -0,0 +1,1506 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <bvar/status.h>
+
+#include <algorithm>
+#include <exception>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <string_view>
+#include <utility>
+#include <vector>
+
+#include "common/cast_set.h"
+#include "common/exception.h"
+#include "common/status.h"
+#include "core/assert_cast.h"
+#include "core/block/block.h"
+#include "core/column/column_array.h"
+#include "core/column/column_const.h"
+#include "core/column/column_map.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_struct.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type.h"
+#include "core/data_type/data_type_array.h"
+#include "core/data_type/data_type_map.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/data_type/data_type_struct.h"
+#include "core/field.h"
+#include "exec/common/stringop_substring.h"
+#include "exprs/vexpr.h"
+#include "exprs/vexpr_context.h"
+#include "exprs/vexpr_fwd.h"
+#include "exprs/vslot_ref.h"
+#include "format_v2/column_data.h"
+#include "format_v2/column_mapper.h"
+#include "format_v2/expr/cast.h"
+#include "format_v2/expr/delete_predicate.h"
+#include "format_v2/file_reader.h"
+#include "format_v2/parquet/reader/column_reader.h"
+#include "format_v2/schema_projection.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "runtime/descriptors.h"
+
+namespace doris {
+class Block;
+class ColumnPredicate;
+struct DeleteFileDesc;
+class RuntimeState;
+} // namespace doris
+
+namespace doris::format {
+
+using DeleteRows = std::vector<int64_t>;
+
+// Row-level predicates on table/global schema. They are rewritten to 
file-local expressions when
+// possible, and remain the source of row-level filtering after localization.
+struct TableFilter {
+    VExprContextSPtr conjunct;
+    std::vector<GlobalIndex> global_indices;
+};
+
+struct ScanTask {
+    virtual ~ScanTask() = default;
+
+    std::unique_ptr<io::FileDescription> data_file;
+};
+
+struct ProjectedColumnBuildContext {
+    const TFileScanRangeParams* scan_params = nullptr;
+    const TFileRangeDesc* range = nullptr;
+    RuntimeState* runtime_state = nullptr;
+    std::optional<ColumnDefinition> schema_column = std::nullopt;
+    size_t next_file_column_idx = 0;
+};
+
+struct ReadProfile {
+    RuntimeProfile::Counter* num_delete_files = nullptr;
+    RuntimeProfile::Counter* num_delete_rows = nullptr;
+    RuntimeProfile::Counter* parse_delete_file_time = nullptr;
+    RuntimeProfile::Counter* exec_timer = nullptr;
+    RuntimeProfile::Counter* prepare_split_timer = nullptr;
+    RuntimeProfile::Counter* finalize_timer = nullptr;
+    RuntimeProfile::Counter* create_reader_timer = nullptr;
+    RuntimeProfile::Counter* pushdown_agg_timer = nullptr;
+    RuntimeProfile::Counter* open_reader_timer = nullptr;
+};
+
+struct TableReadOptions {
+    // Columns need to be read from file and output by table reader. They are 
all in table/global
+    // schema semantics.
+    const std::vector<ColumnDefinition> projected_columns;
+    // Simple predicates for a single column, which is parsed on scan operator.
+    const TableColumnPredicates column_predicates;
+    // All complex conjuncts from scan operator
+    const VExprContextSPtrs conjuncts;
+    // File format of the underlying data files, needed for reader 
initialization and reader-level
+    // filter pushdown.
+    const FileFormat format;
+    TFileScanRangeParams* scan_params;
+    std::shared_ptr<io::IOContext> io_ctx;
+    RuntimeState* runtime_state;
+    RuntimeProfile* scanner_profile;
+    // Push-down aggregate type.
+    const TPushAggOp::type push_down_agg_type = TPushAggOp::type::NONE;
+};
+
+struct SplitReadOptions {
+    // Split-level information for reader initialization, which may include 
file path, partition values, delete file info, etc. The content is table format 
specific and opaque to table reader base class; it's the responsibility of the 
concrete table reader implementation to parse necessary information for reader 
initialization and filter pushdown.
+    std::map<std::string, Field> partition_values;
+    ShardedKVCache* cache;
+    TFileRangeDesc current_range;
+    std::optional<GlobalRowIdContext> global_rowid_context;
+};
+
+// table-level reader 基类。
+// 该层负责多文件编排和动态分区裁剪等通用 table-level 逻辑,对外输出 table block。
+// 子类只需要实现“如何打开下一个具体 reader”和“如何读取当前 reader”的表格式语义。
+class TableReader {
+public:
+    virtual ~TableReader() = default;
+
+    // 初始化 table reader 的通用运行参数。
+    // 子类可以在自己的 init(options) 中调用该方法;这里不接收具体表格式 schema/task。
+    virtual Status init(TableReadOptions&& options);
+
+    // Prepare for reading a new split/task.
+    // 1. Pass a new split/task to reader, which will be used in subsequent 
open_reader() to initialize the underlying file reader.
+    // 2. Parse delete predicates from split/task information, which will be 
used for later dynamic filtering and delete handling.
+    virtual Status prepare_split(const SplitReadOptions& options);
+
+    // 对外读取 table block 的统一入口。
+    // 基类负责 current reader 的打开、EOF 后切换和关闭;子类只实现 protected hook。
+    // table_block 的列必须已经是 table/global schema 语义。
+    virtual Status get_block(Block* block, bool* eos) {
+        SCOPED_TIMER(_profile.exec_timer);
+        DORIS_CHECK(block->columns() == _projected_columns.size());
+        block->clear_column_data(_projected_columns.size());
+
+        while (true) {
+            if (*eos) {
+                return Status::OK();
+            }
+            if (!_data_reader.reader) {
+                if (_is_table_level_count_active()) {
+                    RETURN_IF_ERROR(_read_table_level_count(block, eos));
+                    return Status::OK();
+                }
+                RETURN_IF_ERROR(create_next_reader(eos));
+                if (!_data_reader.reader) {
+                    DCHECK(*eos);
+                    return Status::OK();
+                }
+            }
+
+            // Materialize a reduced row set for upper aggregate operators 
when aggregate
+            // pushdown can be applied. This is not the final aggregate 
result: COUNT emits
+            // `count` default rows for the upper COUNT(*), and MIN/MAX emits 
two rows containing
+            // file-level min/max values for the upper MIN/MAX.
+            if (!_aggregate_pushdown_tried) {
+                SCOPED_TIMER(_profile.pushdown_agg_timer);
+                bool pushed_down = false;
+                
RETURN_IF_ERROR(_try_materialize_aggregate_pushdown_rows(block, &pushed_down));
+                if (pushed_down) {
+                    return Status::OK();
+                }
+            }
+
+            bool current_eof = false;
+            _data_reader.block_template.clear_column_data(
+                    cast_set<int64_t>(_data_reader.file_block_layout.size()));
+            size_t current_rows = 0;
+            
RETURN_IF_ERROR(_data_reader.reader->get_block(&_data_reader.block_template,
+                                                           &current_rows, 
&current_eof));
+            if (current_rows == 0) {
+                if (current_eof) {
+                    RETURN_IF_ERROR(close_current_reader());
+                }
+                continue;
+            }
+            DCHECK_EQ(_data_reader.block_template.columns(), 
_data_reader.file_block_layout.size())
+                    << _data_reader.block_template.dump_structure();
+#ifndef NDEBUG
+            RETURN_IF_ERROR(_check_file_block_columns("after file reader 
get_block", current_rows));
+#endif
+            DORIS_CHECK(block->columns() == 
_data_reader.column_mapper.mappings().size());
+            RETURN_IF_ERROR(finalize_chunk(block, current_rows));
+#ifndef NDEBUG
+            RETURN_IF_ERROR(
+                    _check_table_block_columns("after finalize_chunk", block, 
current_rows));
+#endif
+            if (current_eof) {
+                RETURN_IF_ERROR(close_current_reader());
+            }
+            return Status::OK();
+        }
+    }
+
+    // 关闭 table reader 及当前正在读取的底层 reader。
+    // 子类如果持有额外表格式资源,应 override 后先调用 TableReader::close()。
+    virtual Status close() {
+        if (_data_reader.reader) {
+            RETURN_IF_ERROR(close_current_reader());
+        }
+        _current_task.reset();
+        _remaining_table_level_count = -1;
+        return Status::OK();
+    }
+
+    virtual std::string debug_string() const;
+
+    virtual Status annotate_projected_column(const TFileScanSlotInfo& 
slot_info,
+                                             ProjectedColumnBuildContext* 
context,
+                                             ColumnDefinition* column) const;
+
+    virtual Status validate_projected_columns(const 
ProjectedColumnBuildContext& context) const {
+        (void)context;
+        return Status::OK();
+    }
+
+protected:
+    // Parse deletion vector information from table format specific file 
description.
+    virtual Status _parse_deletion_vector_file(const TTableFormatFileDesc& 
t_desc,
+                                               DeleteFileDesc* desc, bool* 
has_delete_file) {
+        *has_delete_file = false;
+        return Status::OK();
+    }
+
+    // 切换到下一个 reader 的通用流程。
+    // 该方法先关闭当前 reader,再打开下一个具体 reader;子类不应重复实现这个循环。
+    Status create_next_reader(bool* eos);
+    virtual Status create_file_reader(std::unique_ptr<FileReader>* reader);
+    virtual TableColumnMappingMode mapping_mode() const { return 
TableColumnMappingMode::BY_NAME; }
+    virtual Status annotate_file_schema(std::vector<ColumnDefinition>* 
file_schema) {
+        DORIS_CHECK(file_schema != nullptr);
+        return Status::OK();
+    }
+
+    // 打开当前具体 reader。
+    // 子类在这里基于当前 split/task 初始化底层 FileReader。
+    virtual Status open_reader() {
+        SCOPED_TIMER(_profile.open_reader_timer);
+        // 1. Get file schema and create column mapping.
+        std::vector<ColumnDefinition> file_schema;
+        RETURN_IF_ERROR(_data_reader.reader->get_schema(&file_schema));
+        // For Paimon/Hudi, field ID is set by `history_schema_info` from FE. 
So we need to annotate file schema with Field ID before creating column mapping 
when mapping by field ID.
+        RETURN_IF_ERROR(annotate_file_schema(&file_schema));
+        _data_reader.file_schema = file_schema;
+        _mapper_options.mode = mapping_mode();
+
+        _data_reader.column_mapper = TableColumnMapper(_mapper_options);
+        
RETURN_IF_ERROR(_data_reader.column_mapper.create_mapping(_projected_columns,
+                                                                  
_partition_values, file_schema));
+        DORIS_CHECK(_data_reader.column_mapper.mappings().size() == 
_projected_columns.size());
+
+        // 2. Build table filters based on conjuncts and column predicates.
+        RETURN_IF_ERROR(_build_table_filters_from_conjuncts());
+
+        // 3. Create file scan request based on column mapping and table 
filters, then open file
+        // reader with the request. File scan request carries row-level 
expression filters and
+        // file-level pruning hints. Only expression filters decide returned 
rows; column predicates
+        // are pruning hints.
+        auto file_request = std::make_shared<FileScanRequest>();
+        RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request(
+                _table_filters, _table_column_predicates, _projected_columns, 
file_request.get(),
+                _runtime_state));
+        bool constant_filter_pruned_split = false;
+        
RETURN_IF_ERROR(_evaluate_constant_filters(&constant_filter_pruned_split));
+        if (constant_filter_pruned_split) {
+            RETURN_IF_ERROR(close_current_reader());
+            return Status::OK();
+        }
+        RETURN_IF_ERROR(customize_file_scan_request(file_request.get()));
+        RETURN_IF_ERROR(_open_local_filter_exprs(*file_request));
+        _data_reader.file_block_layout.clear();
+        _data_reader.block_template.clear();
+        
_data_reader.file_block_layout.resize(file_request->local_positions.size());
+
+        // 4. Build file block layout from file schema and column mapping. The 
layout describes
+        // the block returned by file reader before table-column 
materialization.
+        for (const auto& [file_column_id, block_position] : 
file_request->local_positions) {
+            DORIS_CHECK(block_position.value() < 
_data_reader.file_block_layout.size());
+            const auto* field = 
_find_column_definition(_data_reader.file_schema, file_column_id);
+            DORIS_CHECK(field != nullptr);
+
+            ColumnDefinition projected_field;
+            {
+                auto it = std::find_if(
+                        file_request->non_predicate_columns.begin(),
+                        file_request->non_predicate_columns.end(),
+                        [&](const LocalColumnIndex& p) { return p.column_id() 
== file_column_id; });
+                if (it != file_request->non_predicate_columns.end()) {
+                    RETURN_IF_ERROR(project_column_definition(*field, *it, 
&projected_field));
+                }
+            }
+            {
+                auto it = std::find_if(
+                        file_request->predicate_columns.begin(),
+                        file_request->predicate_columns.end(),
+                        [&](const LocalColumnIndex& p) { return p.column_id() 
== file_column_id; });
+                if (it != file_request->predicate_columns.end()) {
+                    RETURN_IF_ERROR(project_column_definition(*field, *it, 
&projected_field));
+                }
+            }
+            _data_reader.file_block_layout[block_position.value()] = {
+                    .file_column_id = file_column_id,
+                    .name = projected_field.name,
+                    .type = projected_field.type,
+            };
+            
DORIS_CHECK(_data_reader.file_block_layout[block_position.value()].type != 
nullptr);
+        }
+
+        // 5. Prepare block template from file block layout. The block 
template stores the block
+        // returned by file reader before table-column materialization.
+        
_data_reader.block_template.reserve(_data_reader.file_block_layout.size());
+        for (const auto& column : _data_reader.file_block_layout) {
+            _data_reader.block_template.insert(
+                    {column.type->create_column(), column.type, column.name});
+        }
+        LOG(WARNING) << "TableReader debug: " << debug_string();

Review Comment:
   This warning is emitted every time a file reader is opened, and 
`debug_string()` includes the projected schema, table filters, column 
predicates, conjunct debug strings, and full column mapping state. With V2 
selected for supported scans, a normal external query over many files will 
write one warning per file and can flood BE logs. Please remove this or gate it 
behind debug/VLOG-level logging.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to