This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 52d7650b163 branch-3.1: [enhance](paimon) opt count pushdown for paimon and refactor be logic #46911 (#52060)
52d7650b163 is described below

commit 52d7650b163968f81dfdc4a54c12637c6e222efa
Author: Socrates <[email protected]>
AuthorDate: Tue Jun 24 11:10:36 2025 +0800

    branch-3.1: [enhance](paimon) opt count pushdown for paimon and refactor be logic #46911 (#52060)
    
    bp #46911
---
 be/src/vec/exec/format/table/iceberg_reader.cpp    |  45 ++----
 be/src/vec/exec/format/table/iceberg_reader.h      |  24 +---
 be/src/vec/exec/format/table/paimon_jni_reader.cpp |  23 ++-
 be/src/vec/exec/format/table/paimon_jni_reader.h   |   1 +
 be/src/vec/exec/format/table/paimon_reader.cpp     |  28 ++--
 be/src/vec/exec/format/table/paimon_reader.h       |  26 ++--
 .../vec/exec/format/table/table_format_reader.cpp  |  25 ----
 be/src/vec/exec/format/table/table_format_reader.h |  63 +++++++--
 .../format/table/transactional_hive_reader.cpp     |  16 +--
 .../exec/format/table/transactional_hive_reader.h  |  14 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             |  13 +-
 .../org/apache/doris/datasource/FileScanNode.java  |  46 +++---
 .../datasource/iceberg/source/IcebergScanNode.java |  12 +-
 .../datasource/paimon/source/PaimonScanNode.java   | 155 +++++++++++++--------
 .../datasource/paimon/source/PaimonSplit.java      |  15 +-
 .../paimon/source/PaimonScanNodeTest.java          | 127 +++++++++++++++++
 gensrc/thrift/PlanNodes.thrift                     |   3 +
 .../paimon/test_paimon_catalog.out                 | Bin 795479 -> 795039 bytes
 .../paimon/test_paimon_deletion_vector.out         | Bin 0 -> 525 bytes
 .../paimon/test_paimon_catalog.groovy              |  14 --
 .../paimon/test_paimon_deletion_vector.groovy      |  96 +++++++++++++
 21 files changed, 497 insertions(+), 249 deletions(-)

diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 7dea5d99617..b7666bda688 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -76,13 +76,8 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
                                        const TFileScanRangeParams& params,
                                        const TFileRangeDesc& range, ShardedKVCache* kv_cache,
                                        io::IOContext* io_ctx)
-        : TableFormatReader(std::move(file_format_reader)),
-          _profile(profile),
-          _state(state),
-          _params(params),
-          _range(range),
-          _kv_cache(kv_cache),
-          _io_ctx(io_ctx) {
+        : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx),
+          _kv_cache(kv_cache) {
     static const char* iceberg_profile = "IcebergProfile";
     ADD_TIMER(_profile, iceberg_profile);
     _iceberg_profile.num_delete_files =
@@ -93,31 +88,9 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
             ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
     _iceberg_profile.delete_rows_sort_time =
             ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
-    if (range.table_format_params.iceberg_params.__isset.row_count) {
-        _remaining_table_level_row_count = range.table_format_params.iceberg_params.row_count;
-    } else {
-        _remaining_table_level_row_count = -1;
-    }
 }
 
-Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
-    // already get rows from be
-    if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) {
-        auto rows = std::min(_remaining_table_level_row_count,
-                             (int64_t)_state->query_options().batch_size);
-        _remaining_table_level_row_count -= rows;
-        auto mutate_columns = block->mutate_columns();
-        for (auto& col : mutate_columns) {
-            col->resize(rows);
-        }
-        block->set_columns(std::move(mutate_columns));
-        *read_rows = rows;
-        if (_remaining_table_level_row_count == 0) {
-            *eof = true;
-        }
-
-        return Status::OK();
-    }
+Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
     RETURN_IF_ERROR(_expand_block_if_need(block));
 
     // To support iceberg schema evolution. We change the column name in block to
@@ -160,14 +133,14 @@ Status IcebergTableReader::get_columns(
     return _file_format_reader->get_columns(name_to_type, missing_cols);
 }
 
-Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) {
+Status IcebergTableReader::init_row_filters() {
     // We get the count value by doris's be, so we don't need to read the delete file
-    if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) {
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count > 0) {
         return Status::OK();
     }
 
-    auto& table_desc = range.table_format_params.iceberg_params;
-    auto& version = table_desc.format_version;
+    const auto& table_desc = _range.table_format_params.iceberg_params;
+    const auto& version = table_desc.format_version;
     if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
         return Status::OK();
     }
@@ -547,7 +520,7 @@ Status IcebergParquetReader::init_reader(
     _gen_new_colname_to_value_range();
     parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
     parquet_reader->iceberg_sanitize(_all_required_col_names);
-    RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
+    RETURN_IF_ERROR(init_row_filters());
     return parquet_reader->init_reader(
             _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
             conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
@@ -619,7 +592,7 @@ Status IcebergOrcReader::init_reader(
     _gen_file_col_names();
     _gen_new_colname_to_value_range();
     orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
-    RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
+    RETURN_IF_ERROR(init_row_filters());
     return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range,
                                    conjuncts, false, tuple_descriptor, row_descriptor,
                                    not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index b057cb0657a..48236021ce9 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -30,10 +30,8 @@
 #include "exec/olap_common.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/primitive_type.h"
-#include "runtime/runtime_state.h"
 #include "runtime/types.h"
 #include "table_format_reader.h"
-#include "util/runtime_profile.h"
 #include "vec/columns/column_dictionary.h"
 #include "vec/exec/format/orc/vorc_reader.h"
 #include "vec/exec/format/parquet/vparquet_reader.h"
@@ -80,9 +78,9 @@ public:
                        io::IOContext* io_ctx);
     ~IcebergTableReader() override = default;
 
-    Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final;
+    Status init_row_filters() final;
 
-    Status get_next_block(Block* block, size_t* read_rows, bool* eof) final;
+    Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;
 
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                        std::unordered_set<std::string>* missing_cols) final;
@@ -136,10 +134,6 @@ protected:
     // Remove the added delete columns
     Status _shrink_block_if_need(Block* block);
 
-    RuntimeProfile* _profile;
-    RuntimeState* _state;
-    const TFileScanRangeParams& _params;
-    const TFileRangeDesc& _range;
     // owned by scan node
     ShardedKVCache* _kv_cache;
     IcebergProfile _iceberg_profile;
@@ -163,13 +157,9 @@ protected:
     std::vector<std::string> _expand_col_names;
     std::vector<ColumnWithTypeAndName> _expand_columns;
 
-    io::IOContext* _io_ctx;
     bool _has_schema_change = false;
     bool _has_iceberg_schema = false;
 
-    // the table level row count for optimizing query like:
-    // select count(*) from table;
-    int64_t _remaining_table_level_row_count;
     Fileformat _file_format = Fileformat::NONE;
 
     const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
@@ -213,9 +203,9 @@ public:
             const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
 
     Status _read_position_delete_file(const TFileRangeDesc* delete_range,
-                                      DeleteFile* position_delete) override;
+                                      DeleteFile* position_delete) final;
 
-    void set_delete_rows() override {
+    void set_delete_rows() final {
         auto* parquet_reader = (ParquetReader*)(_file_format_reader.get());
         parquet_reader->set_delete_rows(&_iceberg_delete_rows);
     }
@@ -224,7 +214,7 @@ public:
 
 protected:
     std::unique_ptr<GenericReader> _create_equality_reader(
-            const TFileRangeDesc& delete_desc) override {
+            const TFileRangeDesc& delete_desc) final {
         return ParquetReader::create_unique(
                 _profile, _params, delete_desc, READ_DELETE_FILE_BATCH_SIZE,
                 const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx, _state);
@@ -235,7 +225,7 @@ public:
     ENABLE_FACTORY_CREATOR(IcebergOrcReader);
 
     Status _read_position_delete_file(const TFileRangeDesc* delete_range,
-                                      DeleteFile* position_delete) override;
+                                      DeleteFile* position_delete) final;
 
     IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
                      RuntimeState* state, const TFileScanRangeParams& params,
@@ -243,7 +233,7 @@ public:
             : IcebergTableReader(std::move(file_format_reader), profile, state, params, range,
                                  kv_cache, io_ctx) {}
 
-    void set_delete_rows() override {
+    void set_delete_rows() final {
         auto* orc_reader = (OrcReader*)_file_format_reader.get();
         orc_reader->set_position_delete_rowids(&_iceberg_delete_rows);
     }
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index 71bb496d301..e5d997e281a 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -21,9 +21,9 @@
 #include <ostream>
 
 #include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
 #include "runtime/types.h"
 #include "vec/core/types.h"
-
 namespace doris {
 class RuntimeProfile;
 class RuntimeState;
@@ -65,6 +65,11 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
     if (range_params->__isset.serialized_table) {
         params["serialized_table"] = range_params->serialized_table;
     }
+    if (range.table_format_params.__isset.table_level_row_count) {
+        _remaining_table_level_row_count = range.table_format_params.table_level_row_count;
+    } else {
+        _remaining_table_level_row_count = -1;
+    }
 
     // Used to create paimon option
     for (auto& kv : range.table_format_params.paimon_params.paimon_options) {
@@ -81,6 +86,22 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
 }
 
 Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count >= 0) {
+        auto rows = std::min(_remaining_table_level_row_count,
+                             (int64_t)_state->query_options().batch_size);
+        _remaining_table_level_row_count -= rows;
+        auto mutate_columns = block->mutate_columns();
+        for (auto& col : mutate_columns) {
+            col->resize(rows);
+        }
+        block->set_columns(std::move(mutate_columns));
+        *read_rows = rows;
+        if (_remaining_table_level_row_count == 0) {
+            *eof = true;
+        }
+
+        return Status::OK();
+    }
     return _jni_connector->get_next_block(block, read_rows, eof);
 }
 
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h
index 3ac2229e655..6ed9a57f62e 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.h
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.h
@@ -68,6 +68,7 @@ public:
 
 private:
     const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
+    int64_t _remaining_table_level_row_count;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp
index 055d6179b2c..cd8b7c0060f 100644
--- a/be/src/vec/exec/format/table/paimon_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_reader.cpp
@@ -20,12 +20,15 @@
 #include <vector>
 
 #include "common/status.h"
+#include "runtime/runtime_state.h"
 #include "util/deletion_vector.h"
 
 namespace doris::vectorized {
 PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
-                           RuntimeProfile* profile, const TFileScanRangeParams& params)
-        : TableFormatReader(std::move(file_format_reader)), _profile(profile), _params(params) {
+                           RuntimeProfile* profile, RuntimeState* state,
+                           const TFileScanRangeParams& params, const TFileRangeDesc& range,
+                           io::IOContext* io_ctx)
+        : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx) {
     static const char* paimon_profile = "PaimonProfile";
     ADD_TIMER(_profile, paimon_profile);
     _paimon_profile.num_delete_rows =
@@ -34,15 +37,18 @@ PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
             ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile);
 }
 
-Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) {
-    const auto& table_desc = range.table_format_params.paimon_params;
+Status PaimonReader::init_row_filters() {
+    const auto& table_desc = _range.table_format_params.paimon_params;
     if (!table_desc.__isset.deletion_file) {
         return Status::OK();
     }
 
     // set push down agg type to NONE because we can not do count push down opt
     // if there are delete files.
-    _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
+    if (!_range.table_format_params.paimon_params.__isset.row_count) {
+        _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
+    }
+
     const auto& deletion_file = table_desc.deletion_file;
     io::FileSystemProperties properties = {
             .system_type = _params.file_type,
@@ -50,9 +56,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext
             .hdfs_params = _params.hdfs_params,
             .broker_addresses {},
     };
-    if (range.__isset.file_type) {
+    if (_range.__isset.file_type) {
         // for compatibility
-        properties.system_type = range.file_type;
+        properties.system_type = _range.file_type;
     }
     if (_params.__isset.broker_addresses) {
         properties.broker_addresses.assign(_params.broker_addresses.begin(),
@@ -63,7 +69,7 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext
             .path = deletion_file.path,
             .file_size = -1,
             .mtime = 0,
-            .fs_name = range.fs_name,
+            .fs_name = _range.fs_name,
     };
 
     // TODO: cache the file in local
@@ -77,7 +83,7 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext
     {
         SCOPED_TIMER(_paimon_profile.delete_files_read_time);
         RETURN_IF_ERROR(
-                delete_file_reader->read_at(deletion_file.offset, result, &bytes_read, io_ctx));
+                delete_file_reader->read_at(deletion_file.offset, result, &bytes_read, _io_ctx));
     }
     if (bytes_read != deletion_file.length + 4) {
         return Status::IOError(
@@ -98,4 +104,8 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext
     }
     return Status::OK();
 }
+
+Status PaimonReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
+    return _file_format_reader->get_next_block(block, read_rows, eof);
+}
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_reader.h b/be/src/vec/exec/format/table/paimon_reader.h
index 3d82b7a3b5c..dcc7bf9d700 100644
--- a/be/src/vec/exec/format/table/paimon_reader.h
+++ b/be/src/vec/exec/format/table/paimon_reader.h
@@ -28,10 +28,13 @@ namespace doris::vectorized {
 class PaimonReader : public TableFormatReader {
 public:
     PaimonReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
-                 const TFileScanRangeParams& params);
+                 RuntimeState* state, const TFileScanRangeParams& params,
+                 const TFileRangeDesc& range, io::IOContext* io_ctx);
     ~PaimonReader() override = default;
 
-    Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final;
+    Status init_row_filters() final;
+
+    Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;
 
 protected:
     struct PaimonProfile {
@@ -39,23 +42,21 @@ protected:
         RuntimeProfile::Counter* delete_files_read_time;
     };
     std::vector<int64_t> _delete_rows;
-    RuntimeProfile* _profile;
     PaimonProfile _paimon_profile;
-    virtual void set_delete_rows() = 0;
 
-private:
-    const TFileScanRangeParams& _params;
+    virtual void set_delete_rows() = 0;
 };
 
 class PaimonOrcReader final : public PaimonReader {
 public:
     ENABLE_FACTORY_CREATOR(PaimonOrcReader);
     PaimonOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
-                    const TFileScanRangeParams& params)
-            : PaimonReader(std::move(file_format_reader), profile, params) {};
+                    RuntimeState* state, const TFileScanRangeParams& params,
+                    const TFileRangeDesc& range, io::IOContext* io_ctx)
+            : PaimonReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {};
     ~PaimonOrcReader() final = default;
 
-    void set_delete_rows() override {
+    void set_delete_rows() final {
         (reinterpret_cast<OrcReader*>(_file_format_reader.get()))
                 ->set_position_delete_rowids(&_delete_rows);
     }
@@ -65,11 +66,12 @@ class PaimonParquetReader final : public PaimonReader {
 public:
     ENABLE_FACTORY_CREATOR(PaimonParquetReader);
     PaimonParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
-                        const TFileScanRangeParams& params)
-            : PaimonReader(std::move(file_format_reader), profile, params) {};
+                        RuntimeState* state, const TFileScanRangeParams& params,
+                        const TFileRangeDesc& range, io::IOContext* io_ctx)
+            : PaimonReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {};
     ~PaimonParquetReader() final = default;
 
-    void set_delete_rows() override {
+    void set_delete_rows() final {
         (reinterpret_cast<ParquetReader*>(_file_format_reader.get()))
                 ->set_delete_rows(&_delete_rows);
     }
diff --git a/be/src/vec/exec/format/table/table_format_reader.cpp b/be/src/vec/exec/format/table/table_format_reader.cpp
deleted file mode 100644
index ea8111d81b3..00000000000
--- a/be/src/vec/exec/format/table/table_format_reader.cpp
+++ /dev/null
@@ -1,25 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "table_format_reader.h"
-
-namespace doris::vectorized {
-
-TableFormatReader::TableFormatReader(std::unique_ptr<GenericReader> file_format_reader)
-        : _file_format_reader(std::move(file_format_reader)) {}
-
-} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h
index 5a102a7665e..0257a94a09b 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -17,14 +17,14 @@
 
 #pragma once
 
-#include <stddef.h>
-
-#include <memory>
+#include <algorithm>
+#include <cstddef>
 #include <string>
-#include <unordered_map>
-#include <unordered_set>
 
 #include "common/status.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
 #include "vec/exec/format/generic_reader.h"
 
 namespace doris {
@@ -40,11 +40,44 @@ namespace doris::vectorized {
 
 class TableFormatReader : public GenericReader {
 public:
-    TableFormatReader(std::unique_ptr<GenericReader> file_format_reader);
+    TableFormatReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeState* state,
+                      RuntimeProfile* profile, const TFileScanRangeParams& params,
+                      const TFileRangeDesc& range, io::IOContext* io_ctx)
+            : _file_format_reader(std::move(file_format_reader)),
+              _state(state),
+              _profile(profile),
+              _params(params),
+              _range(range),
+              _io_ctx(io_ctx) {
+        if (range.table_format_params.__isset.table_level_row_count) {
+            _table_level_row_count = range.table_format_params.table_level_row_count;
+        } else {
+            _table_level_row_count = -1;
+        }
+    }
     ~TableFormatReader() override = default;
-    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override {
-        return _file_format_reader->get_next_block(block, read_rows, eof);
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) final {
+        if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count >= 0) {
+            auto rows =
+                    std::min(_table_level_row_count, (int64_t)_state->query_options().batch_size);
+            _table_level_row_count -= rows;
+            auto mutate_columns = block->mutate_columns();
+            for (auto& col : mutate_columns) {
+                col->resize(rows);
+            }
+            block->set_columns(std::move(mutate_columns));
+            *read_rows = rows;
+            if (_table_level_row_count == 0) {
+                *eof = true;
+            }
+
+            return Status::OK();
+        }
+        return get_next_block_inner(block, read_rows, eof);
     }
+
+    virtual Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) = 0;
+
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                        std::unordered_set<std::string>* missing_cols) override {
         return _file_format_reader->get_columns(name_to_type, missing_cols);
@@ -64,18 +97,22 @@ public:
 
     bool fill_all_columns() const override { return _file_format_reader->fill_all_columns(); }
 
-    virtual Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) = 0;
+    virtual Status init_row_filters() = 0;
 
 protected:
+    std::string _table_format;                          // hudi, iceberg, paimon
+    std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc
+    RuntimeState* _state = nullptr;                     // for query options
+    RuntimeProfile* _profile = nullptr;
+    const TFileScanRangeParams& _params;
+    const TFileRangeDesc& _range;
+    io::IOContext* _io_ctx = nullptr;
+    int64_t _table_level_row_count = -1; // for optimization of count(*) push down
     void _collect_profile_before_close() override {
         if (_file_format_reader != nullptr) {
             _file_format_reader->collect_profile_before_close();
         }
     }
-
-protected:
-    std::string _table_format;                          // hudi, iceberg
-    std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
index 8be11f6773a..f1d02c36399 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -17,7 +17,6 @@
 
 #include "transactional_hive_reader.h"
 
-#include "runtime/runtime_state.h"
 #include "transactional_hive_common.h"
 #include "vec/data_types/data_type_factory.hpp"
 #include "vec/exec/format/orc/vorc_reader.h"
@@ -38,12 +37,7 @@ TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr<GenericReader>
                                                  RuntimeProfile* profile, RuntimeState* state,
                                                  const TFileScanRangeParams& params,
                                                  const TFileRangeDesc& range, io::IOContext* io_ctx)
-        : TableFormatReader(std::move(file_format_reader)),
-          _profile(profile),
-          _state(state),
-          _params(params),
-          _range(range),
-          _io_ctx(io_ctx) {
+        : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx) {
     static const char* transactional_hive_profile = "TransactionalHiveProfile";
     ADD_TIMER(_profile, transactional_hive_profile);
     _transactional_orc_profile.num_delete_files =
@@ -71,7 +65,7 @@ Status TransactionalHiveReader::init_reader(
     return status;
 }
 
-Status TransactionalHiveReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+Status TransactionalHiveReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
     for (int i = 0; i < TransactionalHive::READ_PARAMS.size(); ++i) {
         DataTypePtr data_type = DataTypeFactory::instance().create_data_type(
                 TypeDescriptor(TransactionalHive::READ_PARAMS[i].type), false);
@@ -90,8 +84,7 @@ Status TransactionalHiveReader::get_columns(
     return _file_format_reader->get_columns(name_to_type, missing_cols);
 }
 
-Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range,
-                                                 io::IOContext* io_ctx) {
+Status TransactionalHiveReader::init_row_filters() {
     std::string data_file_path = _range.path;
     // the path in _range is remove the namenode prefix,
     // and the file_path in delete file is full path, so we should add it back.
@@ -109,7 +102,8 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range,
     std::filesystem::path file_path(data_file_path);
 
     SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time);
-    for (auto& delete_delta : range.table_format_params.transactional_hive_params.delete_deltas) {
+    for (const auto& delete_delta :
+         _range.table_format_params.transactional_hive_params.delete_deltas) {
         const std::string file_name = file_path.filename().string();
         auto iter = std::find(delete_delta.file_names.begin(), delete_delta.file_names.end(),
                               file_name);
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h b/be/src/vec/exec/format/table/transactional_hive_reader.h
index 23a691d037b..f27f33f4563 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.h
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.h
@@ -30,8 +30,6 @@
 #include "common/status.h"
 #include "exec/olap_common.h"
 #include "table_format_reader.h"
-#include "util/runtime_profile.h"
-#include "vec/columns/column_dictionary.h"
 #include "vec/common/hash_table/phmap_fwd_decl.h"
 
 namespace doris {
@@ -89,12 +87,12 @@ public:
                             io::IOContext* io_ctx);
     ~TransactionalHiveReader() override = default;
 
-    Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) override;
+    Status init_row_filters() final;
 
-    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+    Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;
 
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
-                       std::unordered_set<std::string>* missing_cols) override;
+                       std::unordered_set<std::string>* missing_cols) final;
 
     Status init_reader(
             const std::vector<std::string>& column_names,
@@ -111,16 +109,10 @@ private:
         RuntimeProfile::Counter* delete_files_read_time = nullptr;
     };
 
-    RuntimeProfile* _profile = nullptr;
-    RuntimeState* _state = nullptr;
-    const TFileScanRangeParams& _params;
-    const TFileRangeDesc& _range;
     TransactionalHiveProfile _transactional_orc_profile;
     AcidRowIDSet _delete_rows;
     std::unique_ptr<IColumn::Filter> _delete_rows_filter_ptr;
     std::vector<std::string> _col_names;
-
-    io::IOContext* _io_ctx = nullptr;
 };
 
 inline bool operator<(const TransactionalHiveReader::AcidRowID& lhs,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 8a1f64a55b4..985111cd040 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -27,7 +27,6 @@
 
 #include <algorithm>
 #include <boost/iterator/iterator_facade.hpp>
-#include <iterator>
 #include <map>
 #include <ranges>
 #include <tuple>
@@ -970,8 +969,8 @@ Status VFileScanner::_get_next_reader() {
                         &_slot_id_to_filter_conjuncts);
                 std::unique_ptr<PaimonParquetReader> paimon_reader =
                         PaimonParquetReader::create_unique(std::move(parquet_reader), _profile,
-                                                           *_params);
-                RETURN_IF_ERROR(paimon_reader->init_row_filters(range, _io_ctx.get()));
+                                                           _state, *_params, range, _io_ctx.get());
+                RETURN_IF_ERROR(paimon_reader->init_row_filters());
                 _cur_reader = std::move(paimon_reader);
             } else {
                 bool hive_parquet_use_column_names = true;
@@ -1012,7 +1011,7 @@ Status VFileScanner::_get_next_reader() {
                        _file_col_names, _colname_to_value_range, _push_down_conjuncts,
                        _real_tuple_desc, _default_val_row_desc.get(),
                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
-                RETURN_IF_ERROR(tran_orc_reader->init_row_filters(range, _io_ctx.get()));
+                RETURN_IF_ERROR(tran_orc_reader->init_row_filters());
                 _cur_reader = std::move(tran_orc_reader);
             } else if (range.__isset.table_format_params &&
                       range.table_format_params.table_format_type == "iceberg") {
@@ -1032,9 +1031,9 @@ Status VFileScanner::_get_next_reader() {
                        &_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
                        _real_tuple_desc, _default_val_row_desc.get(),
                        &_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
-                std::unique_ptr<PaimonOrcReader> paimon_reader =
-                        PaimonOrcReader::create_unique(std::move(orc_reader), _profile, *_params);
-                RETURN_IF_ERROR(paimon_reader->init_row_filters(range, _io_ctx.get()));
+                std::unique_ptr<PaimonOrcReader> paimon_reader = PaimonOrcReader::create_unique(
+                        std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get());
+                RETURN_IF_ERROR(paimon_reader->init_row_filters());
                 _cur_reader = std::move(paimon_reader);
             } else {
                 bool hive_orc_use_column_names = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index b7d34312313..8d3aeaa6a26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -61,6 +61,8 @@ public abstract class FileScanNode extends ExternalScanNode {
     // For explain
     protected long totalFileSize = 0;
     protected long totalPartitionNum = 0;
+    // For display pushdown agg result
+    protected long tableLevelRowCount = -1;
 
     public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
             boolean needCheckColumnPriv) {
@@ -82,11 +84,12 @@ public abstract class FileScanNode extends ExternalScanNode {
         super.toThrift(planNode);
     }
 
-    public long getPushDownCount() {
-        // 1. Do not use `0`: If the number of entries in the table is 0,
-        //                    it is unclear whether optimization has been performed.
-        // 2. Do not use `null` or `-`: This makes it easier for the program to parse the `explain` data.
-        return -1;
+    protected void setPushDownCount(long count) {
+        tableLevelRowCount = count;
+    }
+
+    private long getPushDownCount() {
+        return tableLevelRowCount;
     }
 
     @Override
@@ -106,9 +109,9 @@ public abstract class FileScanNode extends ExternalScanNode {
             output.append("(approximate)");
         }
         output.append("inputSplitNum=").append(selectedSplitNum).append(", totalFileSize=")
-            .append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n");
+                .append(totalFileSize).append(", scanRanges=").append(scanRangeLocations.size()).append("\n");
         output.append(prefix).append("partition=").append(selectedPartitionNum).append("/").append(totalPartitionNum)
-            .append("\n");
+                .append("\n");
 
         if (detailLevel == TExplainLevel.VERBOSE && !isBatchMode()) {
             output.append(prefix).append("backends:").append("\n");
@@ -133,25 +136,25 @@ public abstract class FileScanNode extends ExternalScanNode {
                 if (size <= 4) {
                     for (TFileRangeDesc file : fileRangeDescs) {
                        output.append(prefix).append("    ").append(file.getPath())
-                            .append(" start: ").append(file.getStartOffset())
-                            .append(" length: ").append(file.getSize())
-                            .append("\n");
+                                .append(" start: ").append(file.getStartOffset())
+                                .append(" length: ").append(file.getSize())
+                                .append("\n");
                     }
                 } else {
                     for (int i = 0; i < 3; i++) {
                         TFileRangeDesc file = fileRangeDescs.get(i);
                        output.append(prefix).append("    ").append(file.getPath())
-                            .append(" start: ").append(file.getStartOffset())
-                            .append(" length: ").append(file.getSize())
-                            .append("\n");
+                                .append(" start: ").append(file.getStartOffset())
+                                .append(" length: ").append(file.getSize())
+                                .append("\n");
                     }
                     int other = size - 4;
                    output.append(prefix).append("    ... other ").append(other).append(" files ...\n");
                     TFileRangeDesc file = fileRangeDescs.get(size - 1);
                     output.append(prefix).append("    ").append(file.getPath())
-                        .append(" start: ").append(file.getStartOffset())
-                        .append(" length: ").append(file.getSize())
-                        .append("\n");
+                            .append(" start: ").append(file.getStartOffset())
+                            .append(" length: ").append(file.getSize())
+                            .append("\n");
                 }
             }
         }
@@ -182,10 +185,10 @@ public abstract class FileScanNode extends ExternalScanNode {
     }
 
     protected void setDefaultValueExprs(TableIf tbl,
-                                        Map<String, SlotDescriptor> slotDescByName,
-                                        Map<String, Expr> exprByName,
-                                        TFileScanRangeParams params,
-                                        boolean useVarcharAsNull) throws UserException {
+            Map<String, SlotDescriptor> slotDescByName,
+            Map<String, Expr> exprByName,
+            TFileScanRangeParams params,
+            boolean useVarcharAsNull) throws UserException {
         Preconditions.checkNotNull(tbl);
         TExpr tExpr = new TExpr();
         tExpr.setNodes(Lists.newArrayList());
@@ -222,7 +225,8 @@ public abstract class FileScanNode extends ExternalScanNode {
             // if slot desc is null, which mean it is an unrelated slot, just skip.
             // eg:
             // (a, b, c) set (x=a, y=b, z=c)
-            // c does not exist in file, the z will be filled with null, even if z has default value.
+            // c does not exist in file, the z will be filled with null, even if z has
+            // default value.
             // and if z is not nullable, the load will fail.
             if (slotDesc != null) {
                 if (expr != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 9551ea20388..34efa59a459 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -166,12 +166,12 @@ public class IcebergScanNode extends FileQueryScanNode {
     private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) {
         TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
         tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
+        if (tableLevelPushDownCount) {
+            tableFormatFileDesc.setTableLevelRowCount(icebergSplit.getTableLevelRowCount());
+        }
         TIcebergFileDesc fileDesc = new TIcebergFileDesc();
         fileDesc.setFormatVersion(formatVersion);
         fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
-        if (tableLevelPushDownCount) {
-            fileDesc.setRowCount(icebergSplit.getTableLevelRowCount());
-        }
         if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
             fileDesc.setContent(FileContent.DATA.id());
         } else {
@@ -382,6 +382,7 @@ public class IcebergScanNode extends FileQueryScanNode {
                         break;
                     }
                 }
+                setPushDownCount(countFromSnapshot);
                 assignCountToSplits(splits, countFromSnapshot);
                 return splits;
             } else {
@@ -563,11 +564,6 @@ public class IcebergScanNode extends FileQueryScanNode {
         super.toThrift(planNode);
     }
 
-    @Override
-    public long getPushDownCount() {
-        return getCountFromSnapshot();
-    }
-
     @Override
     public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
         if (pushdownIcebergPredicates.isEmpty()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index a856139ed18..511c49a8525 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -56,6 +56,7 @@ import org.apache.paimon.utils.InstantiationUtil;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Base64;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -105,8 +106,6 @@ public class PaimonScanNode extends FileQueryScanNode {
     private int paimonSplitNum = 0;
     private List<SplitStat> splitStats = new ArrayList<>();
     private String serializedTable;
-
-    private boolean pushDownCount = false;
     private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
 
     public PaimonScanNode(PlanNodeId id,
@@ -193,7 +192,8 @@ public class PaimonScanNode extends FileQueryScanNode {
         fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId());
         fileDesc.setTblId(source.getTargetTable().getId());
         fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
-        // The hadoop conf should be same with PaimonExternalCatalog.createCatalog()#getConfiguration()
+        // The hadoop conf should be same with
+        // PaimonExternalCatalog.createCatalog()#getConfiguration()
         fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties());
         Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile();
         if (optDeletionFile.isPresent()) {
@@ -208,10 +208,41 @@ public class PaimonScanNode extends FileQueryScanNode {
             tDeletionFile.setLength(deletionFile.length());
             fileDesc.setDeletionFile(tDeletionFile);
         }
+        if (paimonSplit.getRowCount().isPresent()) {
+            tableFormatFileDesc.setTableLevelRowCount(paimonSplit.getRowCount().get());
+        }
         tableFormatFileDesc.setPaimonParams(fileDesc);
         rangeDesc.setTableFormatParams(tableFormatFileDesc);
     }
 
+    @VisibleForTesting
+    public static Optional<Long> calcuteTableLevelCount(List<org.apache.paimon.table.source.Split> paimonSplits) {
+        // check if all splits don't have deletion vector or cardinality of every
+        // deletion vector is not null
+        long totalCount = 0;
+        long deletionVectorCount = 0;
+
+        for (org.apache.paimon.table.source.Split s : paimonSplits) {
+            totalCount += s.rowCount();
+
+            Optional<List<DeletionFile>> deletionFiles = s.deletionFiles();
+            if (deletionFiles.isPresent()) {
+                for (DeletionFile dv : deletionFiles.get()) {
+                    if (dv != null) {
+                        Long cardinality = dv.cardinality();
+                        if (cardinality == null) {
+                            // if there is a null deletion vector, we can't calculate the table level count
+                            return Optional.empty();
+                        } else {
+                            deletionVectorCount += cardinality;
+                        }
+                    }
+                }
+            }
+        }
+        return Optional.of(totalCount - deletionVectorCount);
+    }
+
     @Override
     public List<Split> getSplits(int numBackends) throws UserException {
         boolean forceJniScanner = sessionVariable.isForceJniScanner();
@@ -223,7 +254,9 @@ public class PaimonScanNode extends FileQueryScanNode {
         boolean applyCountPushdown = getPushDownAggNoGroupingOp() == TPushAggOp.COUNT;
         // Just for counting the number of selected partitions for this paimon table
         Set<BinaryRow> selectedPartitionValues = Sets.newHashSet();
-        long realFileSplitSize = getRealFileSplitSize(0);
+        // if applyCountPushdown is true, we cannot to split the file
+        // because the raw file and deletion vector is one-to-one mapping
+        long realFileSplitSize = getRealFileSplitSize(applyCountPushdown ? Long.MAX_VALUE : 0);
         for (org.apache.paimon.table.source.Split split : paimonSplits) {
             SplitStat splitStat = new SplitStat();
             splitStat.setRowCount(split.rowCount());
@@ -241,38 +274,32 @@ public class PaimonScanNode extends FileQueryScanNode {
                     splitStat.setType(SplitReadType.NATIVE);
                     splitStat.setRawFileConvertable(true);
                     List<RawFile> rawFiles = optRawFiles.get();
-                    if (optDeletionFiles.isPresent()) {
-                        List<DeletionFile> deletionFiles = optDeletionFiles.get();
-                        for (int i = 0; i < rawFiles.size(); i++) {
-                            RawFile file = rawFiles.get(i);
-                            DeletionFile deletionFile = deletionFiles.get(i);
-                            LocationPath locationPath = new LocationPath(file.path(),
-                                    source.getCatalog().getProperties());
-                            try {
-                                List<Split> dorisSplits = FileSplitter.splitFile(
-                                        locationPath,
-                                        realFileSplitSize,
-                                        null,
-                                        file.length(),
-                                        -1,
-                                        true,
-                                        null,
-                                        PaimonSplit.PaimonSplitCreator.DEFAULT);
-                                for (Split dorisSplit : dorisSplits) {
-                                    // the element in DeletionFiles might be null
-                                    if (deletionFile != null) {
-                                        splitStat.setHasDeletionVector(true);
-                                        ((PaimonSplit) dorisSplit).setDeletionFile(deletionFile);
-                                    }
-                                    splits.add(dorisSplit);
+                    for (int i = 0; i < rawFiles.size(); i++) {
+                        RawFile file = rawFiles.get(i);
+                        LocationPath locationPath = new LocationPath(file.path(),
+                                source.getCatalog().getProperties());
+                        try {
+                            List<Split> dorisSplits = FileSplitter.splitFile(
+                                    locationPath,
+                                    realFileSplitSize,
+                                    null,
+                                    file.length(),
+                                    -1,
+                                    true,
+                                    null,
+                                    PaimonSplit.PaimonSplitCreator.DEFAULT);
+                            for (Split dorisSplit : dorisSplits) {
+                                // try to set deletion file
+                                if (optDeletionFiles.isPresent() && optDeletionFiles.get().get(i) != null) {
+                                    ((PaimonSplit) dorisSplit).setDeletionFile(optDeletionFiles.get().get(i));
+                                    splitStat.setHasDeletionVector(true);
                                 }
-                                ++rawFileSplitNum;
-                            } catch (IOException e) {
-                                throw new UserException("Paimon error to split file: " + e.getMessage(), e);
                             }
+                            splits.addAll(dorisSplits);
+                            ++rawFileSplitNum;
+                        } catch (IOException e) {
+                            throw new UserException("Paimon error to split file: " + e.getMessage(), e);
                         }
-                    } else {
-                        createRawFileSplits(rawFiles, splits, applyCountPushdown ? Long.MAX_VALUE : 0);
                     }
                 } else {
                     if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) {
@@ -294,47 +321,46 @@ public class PaimonScanNode extends FileQueryScanNode {
         // We need to set the target size for all splits so that we can calculate the proportion of each split later.
         splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize));
 
+        // if applyCountPushdown is true, calcute row count for count pushdown
+        if (applyCountPushdown) {
+            // we can create a special empty split and skip the plan process
+            if (splits.isEmpty()) {
+                return splits;
+            }
+            Optional<Long> optTableLevelCount = calcuteTableLevelCount(paimonSplits);
+            if (optTableLevelCount.isPresent()) {
+                long tableLevelRowCount = optTableLevelCount.get();
+                List<Split> pushDownCountSplits;
+                if (tableLevelRowCount > COUNT_WITH_PARALLEL_SPLITS) {
+                    int minSplits = sessionVariable.getParallelExecInstanceNum() * numBackends;
+                    pushDownCountSplits = splits.subList(0, Math.min(splits.size(), minSplits));
+                } else {
+                    pushDownCountSplits = Collections.singletonList(splits.get(0));
+                }
+                setPushDownCount(tableLevelRowCount);
+                assignCountToSplits(pushDownCountSplits, tableLevelRowCount);
+                return pushDownCountSplits;
+            }
+        }
+
         this.selectedPartitionNum = selectedPartitionValues.size();
-        // TODO: get total partition number
         return splits;
     }
 
     @VisibleForTesting
     public List<org.apache.paimon.table.source.Split> getPaimonSplitFromAPI() {
         int[] projected = desc.getSlots().stream().mapToInt(
-                slot -> source.getPaimonTable().rowType()
+            slot -> source.getPaimonTable().rowType()
                     .getFieldNames()
                     .stream()
                     .map(String::toLowerCase)
                     .collect(Collectors.toList())
                     .indexOf(slot.getColumn().getName()))
-            .toArray();
+                    .toArray();
         ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
         return readBuilder.withFilter(predicates)
-            .withProjection(projected)
-            .newScan().plan().splits();
-    }
-
-    private void createRawFileSplits(List<RawFile> rawFiles, List<Split> splits, long blockSize) throws UserException {
-        for (RawFile file : rawFiles) {
-            LocationPath locationPath = new LocationPath(file.path(),
-                    source.getCatalog().getProperties());
-            try {
-                splits.addAll(
-                        FileSplitter.splitFile(
-                                locationPath,
-                                getRealFileSplitSize(blockSize),
-                                null,
-                                file.length(),
-                                -1,
-                                true,
-                                null,
-                                PaimonSplit.PaimonSplitCreator.DEFAULT));
-                ++rawFileSplitNum;
-            } catch (IOException e) {
-                throw new UserException("Paimon error to split file: " + e.getMessage(), e);
-            }
-        }
+                .withProjection(projected)
+                .newScan().plan().splits();
     }
 
     private String getFileFormat(String path) {
@@ -421,4 +447,13 @@ public class PaimonScanNode extends FileQueryScanNode {
         }
         return sb.toString();
     }
+
+    private void assignCountToSplits(List<Split> splits, long totalCount) {
+        int size = splits.size();
+        long countPerSplit = totalCount / size;
+        for (int i = 0; i < size - 1; i++) {
+            ((PaimonSplit) splits.get(i)).setRowCount(countPerSplit);
+        }
+        ((PaimonSplit) splits.get(size - 1)).setRowCount(countPerSplit + totalCount % size);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
index 988f043ad0e..f4d3d724089 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
@@ -36,13 +36,13 @@ public class PaimonSplit extends FileSplit {
     private static final LocationPath DUMMY_PATH = new LocationPath("/dummyPath", Maps.newHashMap());
     private Split split;
     private TableFormatType tableFormatType;
-    private Optional<DeletionFile> optDeletionFile;
+    private Optional<DeletionFile> optDeletionFile = Optional.empty();
+    private Optional<Long> optRowCount = Optional.empty();
 
     public PaimonSplit(Split split) {
         super(DUMMY_PATH, 0, 0, 0, 0, null, null);
         this.split = split;
         this.tableFormatType = TableFormatType.PAIMON;
-        this.optDeletionFile = Optional.empty();
 
         if (split instanceof DataSplit) {
             List<DataFileMeta> dataFileMetas = ((DataSplit) split).dataFiles();
@@ -57,7 +57,6 @@ public class PaimonSplit extends FileSplit {
             String[] hosts, List<String> partitionList) {
         super(file, start, length, fileLength, modificationTime, hosts, partitionList);
         this.tableFormatType = TableFormatType.PAIMON;
-        this.optDeletionFile = Optional.empty();
         this.selfSplitWeight = length;
     }
 
@@ -90,6 +89,14 @@ public class PaimonSplit extends FileSplit {
         this.optDeletionFile = Optional.of(deletionFile);
     }
 
+    public Optional<Long> getRowCount() {
+        return optRowCount;
+    }
+
+    public void setRowCount(long rowCount) {
+        this.optRowCount = Optional.of(rowCount);
+    }
+
     public static class PaimonSplitCreator implements SplitCreator {
 
         static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator();
@@ -103,7 +110,7 @@ public class PaimonSplit extends FileSplit {
                 long modificationTime,
                 String[] hosts,
                 List<String> partitionValues) {
-            PaimonSplit split =  new PaimonSplit(path, start, length, fileLength,
+            PaimonSplit split = new PaimonSplit(path, start, length, fileLength,
                     modificationTime, hosts, partitionValues);
             split.setTargetSplitSize(fileSplitSize);
             return split;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 4a2d61f2c7d..bc253680eb0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -33,7 +33,9 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.RawFile;
+import org.apache.paimon.table.source.Split;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -45,6 +47,131 @@ import java.util.Optional;
 
 public class PaimonScanNodeTest {
 
+    @Test
+    public void testCalcuteTableLevelCount() {
+        List<Split> splits = new ArrayList<>();
+
+        // Create mock splits with row count and deletion files
+        Split split1 = new Split() {
+            @Override
+            public long rowCount() {
+                return 100;
+            }
+
+            @Override
+            public Optional<List<DeletionFile>> deletionFiles() {
+                List<DeletionFile> deletionFiles = new ArrayList<>();
+                deletionFiles.add(new DeletionFile("path1", 0, 10, 10L));
+                deletionFiles.add(new DeletionFile("path2", 0, 20, 20L));
+                return Optional.of(deletionFiles);
+            }
+        };
+
+        Split split2 = new Split() {
+            @Override
+            public long rowCount() {
+                return 200;
+            }
+
+            @Override
+            public Optional<List<DeletionFile>> deletionFiles() {
+                List<DeletionFile> deletionFiles = new ArrayList<>();
+                deletionFiles.add(new DeletionFile("path3", 0, 30, 30L));
+                deletionFiles.add(new DeletionFile("path4", 0, 40, 40L));
+                return Optional.of(deletionFiles);
+            }
+        };
+
+        splits.add(split1);
+        splits.add(split2);
+
+        Optional<Long> result = PaimonScanNode.calcuteTableLevelCount(splits);
+        Assert.assertTrue(result.isPresent());
+        Assert.assertEquals(200, result.get().longValue());
+    }
+
+    @Test
+    public void testCalcuteTableLevelCountWithNullDeletionFile() {
+        List<Split> splits = new ArrayList<>();
+
+        // Create mock splits with row count and null deletion files
+        Split split1 = new Split() {
+            @Override
+            public long rowCount() {
+                return 100;
+            }
+
+            @Override
+            public Optional<List<DeletionFile>> deletionFiles() {
+                List<DeletionFile> deletionFiles = new ArrayList<>();
+                deletionFiles.add(null);
+                deletionFiles.add(new DeletionFile("path2", 0, 20, 20L));
+                return Optional.of(deletionFiles);
+            }
+        };
+
+        Split split2 = new Split() {
+            @Override
+            public long rowCount() {
+                return 200;
+            }
+
+            @Override
+            public Optional<List<DeletionFile>> deletionFiles() {
+                return Optional.empty();
+            }
+        };
+
+        splits.add(split1);
+        splits.add(split2);
+
+        Optional<Long> result = PaimonScanNode.calcuteTableLevelCount(splits);
+        Assert.assertTrue(result.isPresent());
+        Assert.assertEquals(280, result.get().longValue());
+    }
+
+    @Test
+    public void testCalcuteTableLevelCountWithNullCardinality() {
+        List<Split> splits = new ArrayList<>();
+
+        // Create mock splits with row count and deletion files with null cardinality
+        Split split1 = new Split() {
+            @Override
+            public long rowCount() {
+                return 100;
+            }
+
+            @Override
+            public Optional<List<DeletionFile>> deletionFiles() {
+                List<DeletionFile> deletionFiles = new ArrayList<>();
+                deletionFiles.add(new DeletionFile("path1", 0, 10, null));
+                deletionFiles.add(new DeletionFile("path2", 0, 20, 20L));
+                return Optional.of(deletionFiles);
+            }
+        };
+
+        Split split2 = new Split() {
+            @Override
+            public long rowCount() {
+                return 200;
+            }
+
+            @Override
+            public Optional<List<DeletionFile>> deletionFiles() {
+                List<DeletionFile> deletionFiles = new ArrayList<>();
+                deletionFiles.add(new DeletionFile("path3", 0, 30, 30L));
+                deletionFiles.add(null);
+                return Optional.of(deletionFiles);
+            }
+        };
+
+        splits.add(split1);
+        splits.add(split2);
+
+        Optional<Long> result = PaimonScanNode.calcuteTableLevelCount(splits);
+        Assert.assertFalse(result.isPresent());
+    }
+
     @Mocked
     private SessionVariable sv;
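
    The three tests above pin down the intended behavior of PaimonScanNode.calcuteTableLevelCount:
    sum rowCount() over all splits, subtract the cardinality of each deletion file, skip null
    deletion-file entries, and return an empty Optional (disabling the count pushdown) as soon as a
    deletion file has an unknown cardinality. A minimal sketch consistent with those tests, not the
    committed implementation, could look like this:

        // Hedged sketch, derived from the unit tests above rather than from the committed code.
        public static Optional<Long> calcuteTableLevelCount(List<Split> splits) {
            long totalCount = 0;
            for (Split split : splits) {
                totalCount += split.rowCount();
                Optional<List<DeletionFile>> deletionFiles = split.deletionFiles();
                if (deletionFiles.isPresent()) {
                    for (DeletionFile deletionFile : deletionFiles.get()) {
                        if (deletionFile == null) {
                            continue; // this data file has no deletion vector
                        }
                        if (deletionFile.cardinality() == null) {
                            // unknown number of deleted rows: the exact table-level count
                            // cannot be derived, so skip the pushdown
                            return Optional.empty();
                        }
                        totalCount -= deletionFile.cardinality();
                    }
                }
            }
            return Optional.of(totalCount);
        }
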
 
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 611c58d2bfd..7411383670f 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -314,6 +314,7 @@ struct TIcebergFileDesc {
     // Deprecated
     5: optional Exprs.TExpr file_select_conjunct;
     6: optional string original_file_path;
+    // Deprecated
     7: optional i64 row_count;
 }
 
@@ -338,6 +339,7 @@ struct TPaimonFileDesc {
     12: optional TPaimonDeletionFileDesc deletion_file;
     13: optional map<string, string> hadoop_conf // deprecated
     14: optional string paimon_table  // deprecated
+    15: optional i64 row_count // deprecated
 }
 
 struct TTrinoConnectorFileDesc {
@@ -405,6 +407,7 @@ struct TTableFormatFileDesc {
     6: optional TMaxComputeFileDesc max_compute_params
     7: optional TTrinoConnectorFileDesc trino_connector_params
     8: optional TLakeSoulFileDesc lakesoul_params
+    9: optional i64 table_level_row_count
 }
 
 enum TTextSerdeType {
diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
index a394836625d..f3b44964915 100644
Binary files a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out and b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out differ
diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_deletion_vector.out b/regression-test/data/external_table_p0/paimon/test_paimon_deletion_vector.out
new file mode 100644
index 00000000000..f0b1e92a088
Binary files /dev/null and b/regression-test/data/external_table_p0/paimon/test_paimon_deletion_vector.out differ
diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
index 9668cbb0950..41afb02e0f9 100644
--- a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
@@ -181,13 +181,6 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
             def c108= """ select id from tb_with_upper_case where id = 1 """
             def c109= """ select id from tb_with_upper_case where id < 1 """
 
-            def c110 = """select count(*) from deletion_vector_orc;"""
-            def c111 = """select count(*) from deletion_vector_parquet;"""
-            def c112 = """select count(*) from deletion_vector_orc where id > 2;"""
-            def c113 = """select count(*) from deletion_vector_parquet where id > 2;"""
-            def c114 = """select * from deletion_vector_orc where id > 2;"""
-            def c115 = """select * from deletion_vector_parquet where id > 2;"""
-
             String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort")
             String catalog_name = "ctl_test_paimon_catalog"
             String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
@@ -296,13 +289,6 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
                 qt_c107 c107
                 qt_c108 c108
                 qt_c109 c109
-
-                qt_c110 c110
-                qt_c111 c111
-                qt_c112 c112
-                qt_c113 c113
-                qt_c114 c114
-                qt_c115 c115
             }
 
             test_cases("false", "false")
diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_deletion_vector.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_deletion_vector.groovy
new file mode 100644
index 00000000000..fade251ed56
--- /dev/null
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_deletion_vector.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_deletion_vector", 
"p0,external,doris,external_docker,external_docker_doris") {
+
+    logger.info("start paimon test")
+    String enabled = context.config.otherConfigs.get("enablePaimonTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disabled paimon test")
+        return
+    }
+
+    try {
+        String catalog_name = "test_paimon_deletion_vector"
+        String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """create catalog if not exists ${catalog_name} properties (
+            "type" = "paimon",
+            "paimon.catalog.type"="filesystem",
+            "warehouse" = 
"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1"
+        );"""
+        sql """use `${catalog_name}`.`db1`"""
+
+        def test_cases = { String force ->
+            sql """ set force_jni_scanner=${force} """
+            qt_1 """select count(*) from deletion_vector_orc;"""
+            qt_2 """select count(*) from deletion_vector_parquet;"""
+            qt_3 """select count(*) from deletion_vector_orc where id > 2;"""
+            qt_4 """select count(*) from deletion_vector_parquet where id > 2;"""
+            qt_5 """select * from deletion_vector_orc where id > 2 order by id;"""
+            qt_6 """select * from deletion_vector_parquet where id > 2 order by id;"""
+            qt_7 """select * from deletion_vector_table_1_0 order by id;"""
+            qt_8 """select count(*) from deletion_vector_table_1_0;"""
+            qt_9 """select count(*) from deletion_vector_table_1_0 where id > 2;"""
+        }
+
+        def test_table_count_push_down = { String force ->
+            sql """ set force_jni_scanner=${force} """
+            explain {
+                sql("select count(*) from deletion_vector_orc;")
+                contains "pushdown agg=COUNT (-1)"
+            }
+            explain {
+                sql("select count(*) from deletion_vector_parquet;")
+                contains "pushdown agg=COUNT (-1)"
+            }
+            explain {
+                sql("select count(*) from deletion_vector_table_1_0;")
+                contains "pushdown agg=COUNT (8)"
+            }
+        }
+
+        def test_not_table_count_push_down = { String force ->
+            sql """ set enable_count_push_down_for_external_table=false; """
+            sql """ set force_jni_scanner=${force} """
+            explain {
+                sql("select count(*) from deletion_vector_orc;")
+                contains "pushdown agg=NONE"
+            }
+            explain {
+                sql("select count(*) from deletion_vector_parquet;")
+                contains "pushdown agg=NONE"
+            }
+            explain {
+                sql("select count(*) from deletion_vector_table_1_0;")
+                contains "pushdown agg=NONE"
+            }
+        }
+
+        test_cases("false")
+        test_cases("true")
+        test_table_count_push_down("false")
+        test_table_count_push_down("true")
+        test_not_table_count_push_down("false")
+        test_not_table_count_push_down("true")
+    } finally {
+        sql """ set enable_count_push_down_for_external_table=true; """
+        sql """set force_jni_scanner=false"""
+    }
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
