This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 52d7650b163 branch-3.1: [enhance](paimon) opt count pushdown for paimon and refactor be logic #46911 (#52060)
52d7650b163 is described below
commit 52d7650b163968f81dfdc4a54c12637c6e222efa
Author: Socrates <[email protected]>
AuthorDate: Tue Jun 24 11:10:36 2025 +0800
branch-3.1: [enhance](paimon) opt count pushdown for paimon and refactor be logic #46911 (#52060)
bp #46911
---
be/src/vec/exec/format/table/iceberg_reader.cpp | 45 ++----
be/src/vec/exec/format/table/iceberg_reader.h | 24 +---
be/src/vec/exec/format/table/paimon_jni_reader.cpp | 23 ++-
be/src/vec/exec/format/table/paimon_jni_reader.h | 1 +
be/src/vec/exec/format/table/paimon_reader.cpp | 28 ++--
be/src/vec/exec/format/table/paimon_reader.h | 26 ++--
.../vec/exec/format/table/table_format_reader.cpp | 25 ----
be/src/vec/exec/format/table/table_format_reader.h | 63 +++++++--
.../format/table/transactional_hive_reader.cpp | 16 +--
.../exec/format/table/transactional_hive_reader.h | 14 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 13 +-
.../org/apache/doris/datasource/FileScanNode.java | 46 +++---
.../datasource/iceberg/source/IcebergScanNode.java | 12 +-
.../datasource/paimon/source/PaimonScanNode.java | 155 +++++++++++++--------
.../datasource/paimon/source/PaimonSplit.java | 15 +-
.../paimon/source/PaimonScanNodeTest.java | 127 +++++++++++++++++
gensrc/thrift/PlanNodes.thrift | 3 +
.../paimon/test_paimon_catalog.out | Bin 795479 -> 795039 bytes
.../paimon/test_paimon_deletion_vector.out | Bin 0 -> 525 bytes
.../paimon/test_paimon_catalog.groovy | 14 --
.../paimon/test_paimon_deletion_vector.groovy | 96 +++++++++++++
21 files changed, 497 insertions(+), 249 deletions(-)
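At the core of this change is the COUNT(*) short-circuit that now lives in the shared TableFormatReader base class: when a scan range carries a table-level row count, the reader emits that many empty rows instead of opening data files. Below is a minimal sketch of that fast path, using hypothetical free-standing names rather than the real member fields (illustrative only; it assumes a Doris-style Block whose columns can be resized, and a `remaining` counter seeded from the new table_level_row_count field, where -1 means unset):

    // Sketch: serve COUNT(*) from a precomputed row count instead of reading files.
    // Callers only take this path when remaining >= 0 (COUNT pushdown enabled).
    Status fill_block_for_count_pushdown(Block* block, size_t* read_rows, bool* eof,
                                         int64_t& remaining, int64_t batch_size) {
        int64_t rows = std::min(remaining, batch_size);
        remaining -= rows;
        auto columns = block->mutate_columns();
        for (auto& col : columns) {
            col->resize(rows); // rows are only counted, never materialized from files
        }
        block->set_columns(std::move(columns));
        *read_rows = rows;
        *eof = (remaining == 0);
        return Status::OK();
    }

On the FE side, PaimonScanNode computes the total (raw-file row counts minus deletion-vector cardinalities), caps the number of splits used for such a query, and spreads the count across them via assignCountToSplits, as shown in the diff below.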
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 7dea5d99617..b7666bda688 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -76,13 +76,8 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
const TFileScanRangeParams& params,
const TFileRangeDesc& range,
ShardedKVCache* kv_cache,
io::IOContext* io_ctx)
- : TableFormatReader(std::move(file_format_reader)),
- _profile(profile),
- _state(state),
- _params(params),
- _range(range),
- _kv_cache(kv_cache),
- _io_ctx(io_ctx) {
+ : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx),
+ _kv_cache(kv_cache) {
static const char* iceberg_profile = "IcebergProfile";
ADD_TIMER(_profile, iceberg_profile);
_iceberg_profile.num_delete_files =
@@ -93,31 +88,9 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
_iceberg_profile.delete_rows_sort_time =
ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
- if (range.table_format_params.iceberg_params.__isset.row_count) {
- _remaining_table_level_row_count = range.table_format_params.iceberg_params.row_count;
- } else {
- _remaining_table_level_row_count = -1;
- }
}
-Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
- // already get rows from be
- if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) {
- auto rows = std::min(_remaining_table_level_row_count,
- (int64_t)_state->query_options().batch_size);
- _remaining_table_level_row_count -= rows;
- auto mutate_columns = block->mutate_columns();
- for (auto& col : mutate_columns) {
- col->resize(rows);
- }
- block->set_columns(std::move(mutate_columns));
- *read_rows = rows;
- if (_remaining_table_level_row_count == 0) {
- *eof = true;
- }
-
- return Status::OK();
- }
+Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
RETURN_IF_ERROR(_expand_block_if_need(block));
// To support iceberg schema evolution. We change the column name in block to
@@ -160,14 +133,14 @@ Status IcebergTableReader::get_columns(
return _file_format_reader->get_columns(name_to_type, missing_cols);
}
-Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) {
+Status IcebergTableReader::init_row_filters() {
// We get the count value by doris's be, so we don't need to read the delete file
- if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) {
+ if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count > 0) {
return Status::OK();
}
- auto& table_desc = range.table_format_params.iceberg_params;
- auto& version = table_desc.format_version;
+ const auto& table_desc = _range.table_format_params.iceberg_params;
+ const auto& version = table_desc.format_version;
if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
return Status::OK();
}
@@ -547,7 +520,7 @@ Status IcebergParquetReader::init_reader(
_gen_new_colname_to_value_range();
parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
parquet_reader->iceberg_sanitize(_all_required_col_names);
- RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
+ RETURN_IF_ERROR(init_row_filters());
return parquet_reader->init_reader(
_all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
@@ -619,7 +592,7 @@ Status IcebergOrcReader::init_reader(
_gen_file_col_names();
_gen_new_colname_to_value_range();
orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
- RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
+ RETURN_IF_ERROR(init_row_filters());
return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range,
conjuncts, false, tuple_descriptor, row_descriptor,
not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index b057cb0657a..48236021ce9 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -30,10 +30,8 @@
#include "exec/olap_common.h"
#include "runtime/define_primitive_type.h"
#include "runtime/primitive_type.h"
-#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "table_format_reader.h"
-#include "util/runtime_profile.h"
#include "vec/columns/column_dictionary.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
@@ -80,9 +78,9 @@ public:
io::IOContext* io_ctx);
~IcebergTableReader() override = default;
- Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final;
+ Status init_row_filters() final;
- Status get_next_block(Block* block, size_t* read_rows, bool* eof) final;
+ Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) final;
@@ -136,10 +134,6 @@ protected:
// Remove the added delete columns
Status _shrink_block_if_need(Block* block);
- RuntimeProfile* _profile;
- RuntimeState* _state;
- const TFileScanRangeParams& _params;
- const TFileRangeDesc& _range;
// owned by scan node
ShardedKVCache* _kv_cache;
IcebergProfile _iceberg_profile;
@@ -163,13 +157,9 @@ protected:
std::vector<std::string> _expand_col_names;
std::vector<ColumnWithTypeAndName> _expand_columns;
- io::IOContext* _io_ctx;
bool _has_schema_change = false;
bool _has_iceberg_schema = false;
- // the table level row count for optimizing query like:
- // select count(*) from table;
- int64_t _remaining_table_level_row_count;
Fileformat _file_format = Fileformat::NONE;
const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
@@ -213,9 +203,9 @@ public:
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);
Status _read_position_delete_file(const TFileRangeDesc* delete_range,
- DeleteFile* position_delete) override;
+ DeleteFile* position_delete) final;
- void set_delete_rows() override {
+ void set_delete_rows() final {
auto* parquet_reader = (ParquetReader*)(_file_format_reader.get());
parquet_reader->set_delete_rows(&_iceberg_delete_rows);
}
@@ -224,7 +214,7 @@ public:
protected:
std::unique_ptr<GenericReader> _create_equality_reader(
- const TFileRangeDesc& delete_desc) override {
+ const TFileRangeDesc& delete_desc) final {
return ParquetReader::create_unique(
_profile, _params, delete_desc, READ_DELETE_FILE_BATCH_SIZE,
const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx, _state);
@@ -235,7 +225,7 @@ public:
ENABLE_FACTORY_CREATOR(IcebergOrcReader);
Status _read_position_delete_file(const TFileRangeDesc* delete_range,
- DeleteFile* position_delete) override;
+ DeleteFile* position_delete) final;
IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
@@ -243,7 +233,7 @@ public:
: IcebergTableReader(std::move(file_format_reader), profile, state, params, range,
kv_cache, io_ctx) {}
- void set_delete_rows() override {
+ void set_delete_rows() final {
auto* orc_reader = (OrcReader*)_file_format_reader.get();
orc_reader->set_position_delete_rowids(&_iceberg_delete_rows);
}
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index 71bb496d301..e5d997e281a 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -21,9 +21,9 @@
#include <ostream>
#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "vec/core/types.h"
-
namespace doris {
class RuntimeProfile;
class RuntimeState;
@@ -65,6 +65,11 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
if (range_params->__isset.serialized_table) {
params["serialized_table"] = range_params->serialized_table;
}
+ if (range.table_format_params.__isset.table_level_row_count) {
+ _remaining_table_level_row_count = range.table_format_params.table_level_row_count;
+ } else {
+ _remaining_table_level_row_count = -1;
+ }
// Used to create paimon option
for (auto& kv : range.table_format_params.paimon_params.paimon_options) {
@@ -81,6 +86,22 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
}
Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+ if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count >= 0) {
+ auto rows = std::min(_remaining_table_level_row_count,
+ (int64_t)_state->query_options().batch_size);
+ _remaining_table_level_row_count -= rows;
+ auto mutate_columns = block->mutate_columns();
+ for (auto& col : mutate_columns) {
+ col->resize(rows);
+ }
+ block->set_columns(std::move(mutate_columns));
+ *read_rows = rows;
+ if (_remaining_table_level_row_count == 0) {
+ *eof = true;
+ }
+
+ return Status::OK();
+ }
return _jni_connector->get_next_block(block, read_rows, eof);
}
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h b/be/src/vec/exec/format/table/paimon_jni_reader.h
index 3ac2229e655..6ed9a57f62e 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.h
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.h
@@ -68,6 +68,7 @@ public:
private:
const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
+ int64_t _remaining_table_level_row_count;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp
index 055d6179b2c..cd8b7c0060f 100644
--- a/be/src/vec/exec/format/table/paimon_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_reader.cpp
@@ -20,12 +20,15 @@
#include <vector>
#include "common/status.h"
+#include "runtime/runtime_state.h"
#include "util/deletion_vector.h"
namespace doris::vectorized {
PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
- RuntimeProfile* profile, const TFileScanRangeParams& params)
- : TableFormatReader(std::move(file_format_reader)), _profile(profile), _params(params) {
+ RuntimeProfile* profile, RuntimeState* state,
+ const TFileScanRangeParams& params, const TFileRangeDesc& range,
+ io::IOContext* io_ctx)
+ : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx) {
static const char* paimon_profile = "PaimonProfile";
ADD_TIMER(_profile, paimon_profile);
_paimon_profile.num_delete_rows =
@@ -34,15 +37,18 @@ PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile);
}
-Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) {
- const auto& table_desc = range.table_format_params.paimon_params;
+Status PaimonReader::init_row_filters() {
+ const auto& table_desc = _range.table_format_params.paimon_params;
if (!table_desc.__isset.deletion_file) {
return Status::OK();
}
// set push down agg type to NONE because we can not do count push down opt
// if there are delete files.
- _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
+ if (!_range.table_format_params.paimon_params.__isset.row_count) {
+ _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
+ }
+
const auto& deletion_file = table_desc.deletion_file;
io::FileSystemProperties properties = {
.system_type = _params.file_type,
@@ -50,9 +56,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext
.hdfs_params = _params.hdfs_params,
.broker_addresses {},
};
- if (range.__isset.file_type) {
+ if (_range.__isset.file_type) {
// for compatibility
- properties.system_type = range.file_type;
+ properties.system_type = _range.file_type;
}
if (_params.__isset.broker_addresses) {
properties.broker_addresses.assign(_params.broker_addresses.begin(),
@@ -63,7 +69,7 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext
.path = deletion_file.path,
.file_size = -1,
.mtime = 0,
- .fs_name = range.fs_name,
+ .fs_name = _range.fs_name,
};
// TODO: cache the file in local
@@ -77,7 +83,7 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext
{
SCOPED_TIMER(_paimon_profile.delete_files_read_time);
RETURN_IF_ERROR(
- delete_file_reader->read_at(deletion_file.offset, result, &bytes_read, io_ctx));
+ delete_file_reader->read_at(deletion_file.offset, result, &bytes_read, _io_ctx));
}
if (bytes_read != deletion_file.length + 4) {
return Status::IOError(
@@ -98,4 +104,8 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext
}
return Status::OK();
}
+
+Status PaimonReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
+ return _file_format_reader->get_next_block(block, read_rows, eof);
+}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_reader.h b/be/src/vec/exec/format/table/paimon_reader.h
index 3d82b7a3b5c..dcc7bf9d700 100644
--- a/be/src/vec/exec/format/table/paimon_reader.h
+++ b/be/src/vec/exec/format/table/paimon_reader.h
@@ -28,10 +28,13 @@ namespace doris::vectorized {
class PaimonReader : public TableFormatReader {
public:
PaimonReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
- const TFileScanRangeParams& params);
+ RuntimeState* state, const TFileScanRangeParams& params,
+ const TFileRangeDesc& range, io::IOContext* io_ctx);
~PaimonReader() override = default;
- Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final;
+ Status init_row_filters() final;
+
+ Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;
protected:
struct PaimonProfile {
@@ -39,23 +42,21 @@ protected:
RuntimeProfile::Counter* delete_files_read_time;
};
std::vector<int64_t> _delete_rows;
- RuntimeProfile* _profile;
PaimonProfile _paimon_profile;
- virtual void set_delete_rows() = 0;
-private:
- const TFileScanRangeParams& _params;
+ virtual void set_delete_rows() = 0;
};
class PaimonOrcReader final : public PaimonReader {
public:
ENABLE_FACTORY_CREATOR(PaimonOrcReader);
PaimonOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
- const TFileScanRangeParams& params)
- : PaimonReader(std::move(file_format_reader), profile, params) {};
+ RuntimeState* state, const TFileScanRangeParams& params,
+ const TFileRangeDesc& range, io::IOContext* io_ctx)
+ : PaimonReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {};
~PaimonOrcReader() final = default;
- void set_delete_rows() override {
+ void set_delete_rows() final {
(reinterpret_cast<OrcReader*>(_file_format_reader.get()))
->set_position_delete_rowids(&_delete_rows);
}
@@ -65,11 +66,12 @@ class PaimonParquetReader final : public PaimonReader {
public:
ENABLE_FACTORY_CREATOR(PaimonParquetReader);
PaimonParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
- const TFileScanRangeParams& params)
- : PaimonReader(std::move(file_format_reader), profile, params) {};
+ RuntimeState* state, const TFileScanRangeParams& params,
+ const TFileRangeDesc& range, io::IOContext* io_ctx)
+ : PaimonReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {};
~PaimonParquetReader() final = default;
- void set_delete_rows() override {
+ void set_delete_rows() final {
(reinterpret_cast<ParquetReader*>(_file_format_reader.get()))
->set_delete_rows(&_delete_rows);
}
diff --git a/be/src/vec/exec/format/table/table_format_reader.cpp b/be/src/vec/exec/format/table/table_format_reader.cpp
deleted file mode 100644
index ea8111d81b3..00000000000
--- a/be/src/vec/exec/format/table/table_format_reader.cpp
+++ /dev/null
@@ -1,25 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "table_format_reader.h"
-
-namespace doris::vectorized {
-
-TableFormatReader::TableFormatReader(std::unique_ptr<GenericReader> file_format_reader)
- : _file_format_reader(std::move(file_format_reader)) {}
-
-} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h
index 5a102a7665e..0257a94a09b 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -17,14 +17,14 @@
#pragma once
-#include <stddef.h>
-
-#include <memory>
+#include <algorithm>
+#include <cstddef>
#include <string>
-#include <unordered_map>
-#include <unordered_set>
#include "common/status.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
#include "vec/exec/format/generic_reader.h"
namespace doris {
@@ -40,11 +40,44 @@ namespace doris::vectorized {
class TableFormatReader : public GenericReader {
public:
- TableFormatReader(std::unique_ptr<GenericReader> file_format_reader);
+ TableFormatReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeState* state,
+ RuntimeProfile* profile, const TFileScanRangeParams& params,
+ const TFileRangeDesc& range, io::IOContext* io_ctx)
+ : _file_format_reader(std::move(file_format_reader)),
+ _state(state),
+ _profile(profile),
+ _params(params),
+ _range(range),
+ _io_ctx(io_ctx) {
+ if (range.table_format_params.__isset.table_level_row_count) {
+ _table_level_row_count = range.table_format_params.table_level_row_count;
+ } else {
+ _table_level_row_count = -1;
+ }
+ }
~TableFormatReader() override = default;
- Status get_next_block(Block* block, size_t* read_rows, bool* eof) override {
- return _file_format_reader->get_next_block(block, read_rows, eof);
+ Status get_next_block(Block* block, size_t* read_rows, bool* eof) final {
+ if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count >= 0) {
+ auto rows =
+ std::min(_table_level_row_count, (int64_t)_state->query_options().batch_size);
+ _table_level_row_count -= rows;
+ auto mutate_columns = block->mutate_columns();
+ for (auto& col : mutate_columns) {
+ col->resize(rows);
+ }
+ block->set_columns(std::move(mutate_columns));
+ *read_rows = rows;
+ if (_table_level_row_count == 0) {
+ *eof = true;
+ }
+
+ return Status::OK();
+ }
+ return get_next_block_inner(block, read_rows, eof);
}
+
+ virtual Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) = 0;
+
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) override {
return _file_format_reader->get_columns(name_to_type, missing_cols);
@@ -64,18 +97,22 @@ public:
bool fill_all_columns() const override { return _file_format_reader->fill_all_columns(); }
- virtual Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) = 0;
+ virtual Status init_row_filters() = 0;
protected:
+ std::string _table_format; // hudi, iceberg, paimon
+ std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc
+ RuntimeState* _state = nullptr; // for query options
+ RuntimeProfile* _profile = nullptr;
+ const TFileScanRangeParams& _params;
+ const TFileRangeDesc& _range;
+ io::IOContext* _io_ctx = nullptr;
+ int64_t _table_level_row_count = -1; // for optimization of count(*) push down
void _collect_profile_before_close() override {
if (_file_format_reader != nullptr) {
_file_format_reader->collect_profile_before_close();
}
}
-
-protected:
- std::string _table_format; // hudi, iceberg
- std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
index 8be11f6773a..f1d02c36399 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.cpp
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -17,7 +17,6 @@
#include "transactional_hive_reader.h"
-#include "runtime/runtime_state.h"
#include "transactional_hive_common.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/exec/format/orc/vorc_reader.h"
@@ -38,12 +37,7 @@ TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr<GenericReader>
RuntimeProfile* profile, RuntimeState* state,
const TFileScanRangeParams& params,
const TFileRangeDesc& range, io::IOContext* io_ctx)
- : TableFormatReader(std::move(file_format_reader)),
- _profile(profile),
- _state(state),
- _params(params),
- _range(range),
- _io_ctx(io_ctx) {
+ : TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx) {
static const char* transactional_hive_profile = "TransactionalHiveProfile";
ADD_TIMER(_profile, transactional_hive_profile);
_transactional_orc_profile.num_delete_files =
@@ -71,7 +65,7 @@ Status TransactionalHiveReader::init_reader(
return status;
}
-Status TransactionalHiveReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+Status TransactionalHiveReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
for (int i = 0; i < TransactionalHive::READ_PARAMS.size(); ++i) {
DataTypePtr data_type = DataTypeFactory::instance().create_data_type(
TypeDescriptor(TransactionalHive::READ_PARAMS[i].type), false);
@@ -90,8 +84,7 @@ Status TransactionalHiveReader::get_columns(
return _file_format_reader->get_columns(name_to_type, missing_cols);
}
-Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range,
- io::IOContext* io_ctx) {
+Status TransactionalHiveReader::init_row_filters() {
std::string data_file_path = _range.path;
// the path in _range is remove the namenode prefix,
// and the file_path in delete file is full path, so we should add it back.
@@ -109,7 +102,8 @@ Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range,
std::filesystem::path file_path(data_file_path);
SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time);
- for (auto& delete_delta : range.table_format_params.transactional_hive_params.delete_deltas) {
+ for (const auto& delete_delta :
+ _range.table_format_params.transactional_hive_params.delete_deltas) {
const std::string file_name = file_path.filename().string();
auto iter = std::find(delete_delta.file_names.begin(), delete_delta.file_names.end(),
file_name);
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h b/be/src/vec/exec/format/table/transactional_hive_reader.h
index 23a691d037b..f27f33f4563 100644
--- a/be/src/vec/exec/format/table/transactional_hive_reader.h
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.h
@@ -30,8 +30,6 @@
#include "common/status.h"
#include "exec/olap_common.h"
#include "table_format_reader.h"
-#include "util/runtime_profile.h"
-#include "vec/columns/column_dictionary.h"
#include "vec/common/hash_table/phmap_fwd_decl.h"
namespace doris {
@@ -89,12 +87,12 @@ public:
io::IOContext* io_ctx);
~TransactionalHiveReader() override = default;
- Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) override;
+ Status init_row_filters() final;
- Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+ Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
- std::unordered_set<std::string>* missing_cols) override;
+ std::unordered_set<std::string>* missing_cols) final;
Status init_reader(
const std::vector<std::string>& column_names,
@@ -111,16 +109,10 @@ private:
RuntimeProfile::Counter* delete_files_read_time = nullptr;
};
- RuntimeProfile* _profile = nullptr;
- RuntimeState* _state = nullptr;
- const TFileScanRangeParams& _params;
- const TFileRangeDesc& _range;
TransactionalHiveProfile _transactional_orc_profile;
AcidRowIDSet _delete_rows;
std::unique_ptr<IColumn::Filter> _delete_rows_filter_ptr;
std::vector<std::string> _col_names;
-
- io::IOContext* _io_ctx = nullptr;
};
inline bool operator<(const TransactionalHiveReader::AcidRowID& lhs,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 8a1f64a55b4..985111cd040 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -27,7 +27,6 @@
#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
-#include <iterator>
#include <map>
#include <ranges>
#include <tuple>
@@ -970,8 +969,8 @@ Status VFileScanner::_get_next_reader() {
&_slot_id_to_filter_conjuncts);
std::unique_ptr<PaimonParquetReader> paimon_reader =
PaimonParquetReader::create_unique(std::move(parquet_reader), _profile,
- *_params);
- RETURN_IF_ERROR(paimon_reader->init_row_filters(range, _io_ctx.get()));
+ _state, *_params, range, _io_ctx.get());
+ RETURN_IF_ERROR(paimon_reader->init_row_filters());
_cur_reader = std::move(paimon_reader);
} else {
bool hive_parquet_use_column_names = true;
@@ -1012,7 +1011,7 @@ Status VFileScanner::_get_next_reader() {
_file_col_names, _colname_to_value_range, _push_down_conjuncts,
_real_tuple_desc, _default_val_row_desc.get(),
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
- RETURN_IF_ERROR(tran_orc_reader->init_row_filters(range, _io_ctx.get()));
+ RETURN_IF_ERROR(tran_orc_reader->init_row_filters());
_cur_reader = std::move(tran_orc_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "iceberg") {
@@ -1032,9 +1031,9 @@ Status VFileScanner::_get_next_reader() {
&_file_col_names, _colname_to_value_range, _push_down_conjuncts, false,
_real_tuple_desc, _default_val_row_desc.get(),
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
- std::unique_ptr<PaimonOrcReader> paimon_reader =
- PaimonOrcReader::create_unique(std::move(orc_reader), _profile, *_params);
- RETURN_IF_ERROR(paimon_reader->init_row_filters(range, _io_ctx.get()));
+ std::unique_ptr<PaimonOrcReader> paimon_reader = PaimonOrcReader::create_unique(
+ std::move(orc_reader), _profile, _state, *_params, range, _io_ctx.get());
+ RETURN_IF_ERROR(paimon_reader->init_row_filters());
_cur_reader = std::move(paimon_reader);
} else {
bool hive_orc_use_column_names = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index b7d34312313..8d3aeaa6a26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -61,6 +61,8 @@ public abstract class FileScanNode extends ExternalScanNode {
// For explain
protected long totalFileSize = 0;
protected long totalPartitionNum = 0;
+ // For display pushdown agg result
+ protected long tableLevelRowCount = -1;
public FileScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, StatisticalType statisticalType,
boolean needCheckColumnPriv) {
@@ -82,11 +84,12 @@ public abstract class FileScanNode extends ExternalScanNode {
super.toThrift(planNode);
}
- public long getPushDownCount() {
- // 1. Do not use `0`: If the number of entries in the table is 0,
- // it is unclear whether optimization has been performed.
- // 2. Do not use `null` or `-`: This makes it easier for the program to parse the `explain` data.
- return -1;
+ protected void setPushDownCount(long count) {
+ tableLevelRowCount = count;
+ }
+
+ private long getPushDownCount() {
+ return tableLevelRowCount;
}
@Override
@@ -106,9 +109,9 @@ public abstract class FileScanNode extends ExternalScanNode {
output.append("(approximate)");
}
output.append("inputSplitNum=").append(selectedSplitNum).append(",
totalFileSize=")
- .append(totalFileSize).append(",
scanRanges=").append(scanRangeLocations.size()).append("\n");
+ .append(totalFileSize).append(",
scanRanges=").append(scanRangeLocations.size()).append("\n");
output.append(prefix).append("partition=").append(selectedPartitionNum).append("/").append(totalPartitionNum)
- .append("\n");
+ .append("\n");
if (detailLevel == TExplainLevel.VERBOSE && !isBatchMode()) {
output.append(prefix).append("backends:").append("\n");
@@ -133,25 +136,25 @@ public abstract class FileScanNode extends ExternalScanNode {
if (size <= 4) {
for (TFileRangeDesc file : fileRangeDescs) {
output.append(prefix).append("
").append(file.getPath())
- .append(" start: ").append(file.getStartOffset())
- .append(" length: ").append(file.getSize())
- .append("\n");
+ .append(" start:
").append(file.getStartOffset())
+ .append(" length: ").append(file.getSize())
+ .append("\n");
}
} else {
for (int i = 0; i < 3; i++) {
TFileRangeDesc file = fileRangeDescs.get(i);
output.append(prefix).append("
").append(file.getPath())
- .append(" start: ").append(file.getStartOffset())
- .append(" length: ").append(file.getSize())
- .append("\n");
+ .append(" start:
").append(file.getStartOffset())
+ .append(" length: ").append(file.getSize())
+ .append("\n");
}
int other = size - 4;
output.append(prefix).append(" ... other
").append(other).append(" files ...\n");
TFileRangeDesc file = fileRangeDescs.get(size - 1);
output.append(prefix).append(" ").append(file.getPath())
- .append(" start: ").append(file.getStartOffset())
- .append(" length: ").append(file.getSize())
- .append("\n");
+ .append(" start: ").append(file.getStartOffset())
+ .append(" length: ").append(file.getSize())
+ .append("\n");
}
}
}
@@ -182,10 +185,10 @@ public abstract class FileScanNode extends ExternalScanNode {
}
protected void setDefaultValueExprs(TableIf tbl,
- Map<String, SlotDescriptor> slotDescByName,
- Map<String, Expr> exprByName,
- TFileScanRangeParams params,
- boolean useVarcharAsNull) throws UserException {
+ Map<String, SlotDescriptor> slotDescByName,
+ Map<String, Expr> exprByName,
+ TFileScanRangeParams params,
+ boolean useVarcharAsNull) throws UserException {
Preconditions.checkNotNull(tbl);
TExpr tExpr = new TExpr();
tExpr.setNodes(Lists.newArrayList());
@@ -222,7 +225,8 @@ public abstract class FileScanNode extends ExternalScanNode {
// if slot desc is null, which mean it is an unrelated slot, just skip.
// eg:
// (a, b, c) set (x=a, y=b, z=c)
- // c does not exist in file, the z will be filled with null, even if z has default value.
+ // c does not exist in file, the z will be filled with null, even if z has
+ // default value.
// and if z is not nullable, the load will fail.
if (slotDesc != null) {
if (expr != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 9551ea20388..34efa59a459 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -166,12 +166,12 @@ public class IcebergScanNode extends FileQueryScanNode {
private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
+ if (tableLevelPushDownCount) {
+ tableFormatFileDesc.setTableLevelRowCount(icebergSplit.getTableLevelRowCount());
+ }
TIcebergFileDesc fileDesc = new TIcebergFileDesc();
fileDesc.setFormatVersion(formatVersion);
fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
- if (tableLevelPushDownCount) {
- fileDesc.setRowCount(icebergSplit.getTableLevelRowCount());
- }
if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
fileDesc.setContent(FileContent.DATA.id());
} else {
@@ -382,6 +382,7 @@ public class IcebergScanNode extends FileQueryScanNode {
break;
}
}
+ setPushDownCount(countFromSnapshot);
assignCountToSplits(splits, countFromSnapshot);
return splits;
} else {
@@ -563,11 +564,6 @@ public class IcebergScanNode extends FileQueryScanNode {
super.toThrift(planNode);
}
- @Override
- public long getPushDownCount() {
- return getCountFromSnapshot();
- }
-
@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
if (pushdownIcebergPredicates.isEmpty()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index a856139ed18..511c49a8525 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -56,6 +56,7 @@ import org.apache.paimon.utils.InstantiationUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -105,8 +106,6 @@ public class PaimonScanNode extends FileQueryScanNode {
private int paimonSplitNum = 0;
private List<SplitStat> splitStats = new ArrayList<>();
private String serializedTable;
-
- private boolean pushDownCount = false;
private static final long COUNT_WITH_PARALLEL_SPLITS = 10000;
public PaimonScanNode(PlanNodeId id,
@@ -193,7 +192,8 @@ public class PaimonScanNode extends FileQueryScanNode {
fileDesc.setDbId(((PaimonExternalTable) source.getTargetTable()).getDbId());
fileDesc.setTblId(source.getTargetTable().getId());
fileDesc.setLastUpdateTime(source.getTargetTable().getUpdateTime());
- // The hadoop conf should be same with PaimonExternalCatalog.createCatalog()#getConfiguration()
+ // The hadoop conf should be same with
+ // PaimonExternalCatalog.createCatalog()#getConfiguration()
fileDesc.setHadoopConf(source.getCatalog().getCatalogProperty().getHadoopProperties());
Optional<DeletionFile> optDeletionFile = paimonSplit.getDeletionFile();
if (optDeletionFile.isPresent()) {
@@ -208,10 +208,41 @@ public class PaimonScanNode extends FileQueryScanNode {
tDeletionFile.setLength(deletionFile.length());
fileDesc.setDeletionFile(tDeletionFile);
}
+ if (paimonSplit.getRowCount().isPresent()) {
+ tableFormatFileDesc.setTableLevelRowCount(paimonSplit.getRowCount().get());
+ }
tableFormatFileDesc.setPaimonParams(fileDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
+ @VisibleForTesting
+ public static Optional<Long> calcuteTableLevelCount(List<org.apache.paimon.table.source.Split> paimonSplits) {
+ // check if all splits don't have deletion vector or cardinality of every
+ // deletion vector is not null
+ long totalCount = 0;
+ long deletionVectorCount = 0;
+
+ for (org.apache.paimon.table.source.Split s : paimonSplits) {
+ totalCount += s.rowCount();
+
+ Optional<List<DeletionFile>> deletionFiles = s.deletionFiles();
+ if (deletionFiles.isPresent()) {
+ for (DeletionFile dv : deletionFiles.get()) {
+ if (dv != null) {
+ Long cardinality = dv.cardinality();
+ if (cardinality == null) {
+ // if there is a null deletion vector, we can't calculate the table level count
+ return Optional.empty();
+ } else {
+ deletionVectorCount += cardinality;
+ }
+ }
+ }
+ }
+ }
+ return Optional.of(totalCount - deletionVectorCount);
+ }
+
@Override
public List<Split> getSplits(int numBackends) throws UserException {
boolean forceJniScanner = sessionVariable.isForceJniScanner();
@@ -223,7 +254,9 @@ public class PaimonScanNode extends FileQueryScanNode {
boolean applyCountPushdown = getPushDownAggNoGroupingOp() == TPushAggOp.COUNT;
// Just for counting the number of selected partitions for this paimon table
Set<BinaryRow> selectedPartitionValues = Sets.newHashSet();
- long realFileSplitSize = getRealFileSplitSize(0);
+ // if applyCountPushdown is true, we cannot to split the file
+ // because the raw file and deletion vector is one-to-one mapping
+ long realFileSplitSize = getRealFileSplitSize(applyCountPushdown ? Long.MAX_VALUE : 0);
for (org.apache.paimon.table.source.Split split : paimonSplits) {
SplitStat splitStat = new SplitStat();
splitStat.setRowCount(split.rowCount());
@@ -241,38 +274,32 @@ public class PaimonScanNode extends FileQueryScanNode {
splitStat.setType(SplitReadType.NATIVE);
splitStat.setRawFileConvertable(true);
List<RawFile> rawFiles = optRawFiles.get();
- if (optDeletionFiles.isPresent()) {
- List<DeletionFile> deletionFiles = optDeletionFiles.get();
- for (int i = 0; i < rawFiles.size(); i++) {
- RawFile file = rawFiles.get(i);
- DeletionFile deletionFile = deletionFiles.get(i);
- LocationPath locationPath = new LocationPath(file.path(),
- source.getCatalog().getProperties());
- try {
- List<Split> dorisSplits = FileSplitter.splitFile(
- locationPath,
- realFileSplitSize,
- null,
- file.length(),
- -1,
- true,
- null,
- PaimonSplit.PaimonSplitCreator.DEFAULT);
- for (Split dorisSplit : dorisSplits) {
- // the element in DeletionFiles might be null
- if (deletionFile != null) {
- splitStat.setHasDeletionVector(true);
- ((PaimonSplit) dorisSplit).setDeletionFile(deletionFile);
- }
- splits.add(dorisSplit);
+ for (int i = 0; i < rawFiles.size(); i++) {
+ RawFile file = rawFiles.get(i);
+ LocationPath locationPath = new LocationPath(file.path(),
+ source.getCatalog().getProperties());
+ try {
+ List<Split> dorisSplits = FileSplitter.splitFile(
+ locationPath,
+ realFileSplitSize,
+ null,
+ file.length(),
+ -1,
+ true,
+ null,
+ PaimonSplit.PaimonSplitCreator.DEFAULT);
+ for (Split dorisSplit : dorisSplits) {
+ // try to set deletion file
+ if (optDeletionFiles.isPresent() && optDeletionFiles.get().get(i) != null) {
+ ((PaimonSplit) dorisSplit).setDeletionFile(optDeletionFiles.get().get(i));
+ splitStat.setHasDeletionVector(true);
}
- ++rawFileSplitNum;
- } catch (IOException e) {
- throw new UserException("Paimon error to split
file: " + e.getMessage(), e);
}
+ splits.addAll(dorisSplits);
+ ++rawFileSplitNum;
+ } catch (IOException e) {
+ throw new UserException("Paimon error to split
file: " + e.getMessage(), e);
}
- } else {
- createRawFileSplits(rawFiles, splits, applyCountPushdown ? Long.MAX_VALUE : 0);
}
} else {
if (ignoreSplitType == SessionVariable.IgnoreSplitType.IGNORE_JNI) {
@@ -294,47 +321,46 @@ public class PaimonScanNode extends FileQueryScanNode {
// We need to set the target size for all splits so that we can calculate the proportion of each split later.
splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize));
+ // if applyCountPushdown is true, calcute row count for count pushdown
+ if (applyCountPushdown) {
+ // we can create a special empty split and skip the plan process
+ if (splits.isEmpty()) {
+ return splits;
+ }
+ Optional<Long> optTableLevelCount = calcuteTableLevelCount(paimonSplits);
+ if (optTableLevelCount.isPresent()) {
+ long tableLevelRowCount = optTableLevelCount.get();
+ List<Split> pushDownCountSplits;
+ if (tableLevelRowCount > COUNT_WITH_PARALLEL_SPLITS) {
+ int minSplits = sessionVariable.getParallelExecInstanceNum() * numBackends;
+ pushDownCountSplits = splits.subList(0, Math.min(splits.size(), minSplits));
+ } else {
+ pushDownCountSplits = Collections.singletonList(splits.get(0));
+ }
+ setPushDownCount(tableLevelRowCount);
+ assignCountToSplits(pushDownCountSplits, tableLevelRowCount);
+ return pushDownCountSplits;
+ }
+ }
+
this.selectedPartitionNum = selectedPartitionValues.size();
- // TODO: get total partition number
return splits;
}
@VisibleForTesting
public List<org.apache.paimon.table.source.Split> getPaimonSplitFromAPI() {
int[] projected = desc.getSlots().stream().mapToInt(
- slot -> source.getPaimonTable().rowType()
+ slot -> source.getPaimonTable().rowType()
.getFieldNames()
.stream()
.map(String::toLowerCase)
.collect(Collectors.toList())
.indexOf(slot.getColumn().getName()))
- .toArray();
+ .toArray();
ReadBuilder readBuilder = source.getPaimonTable().newReadBuilder();
return readBuilder.withFilter(predicates)
- .withProjection(projected)
- .newScan().plan().splits();
- }
-
- private void createRawFileSplits(List<RawFile> rawFiles, List<Split> splits, long blockSize) throws UserException {
- for (RawFile file : rawFiles) {
- LocationPath locationPath = new LocationPath(file.path(),
- source.getCatalog().getProperties());
- try {
- splits.addAll(
- FileSplitter.splitFile(
- locationPath,
- getRealFileSplitSize(blockSize),
- null,
- file.length(),
- -1,
- true,
- null,
- PaimonSplit.PaimonSplitCreator.DEFAULT));
- ++rawFileSplitNum;
- } catch (IOException e) {
- throw new UserException("Paimon error to split file: " +
e.getMessage(), e);
- }
- }
+ .withProjection(projected)
+ .newScan().plan().splits();
}
private String getFileFormat(String path) {
@@ -421,4 +447,13 @@ public class PaimonScanNode extends FileQueryScanNode {
}
return sb.toString();
}
+
+ private void assignCountToSplits(List<Split> splits, long totalCount) {
+ int size = splits.size();
+ long countPerSplit = totalCount / size;
+ for (int i = 0; i < size - 1; i++) {
+ ((PaimonSplit) splits.get(i)).setRowCount(countPerSplit);
+ }
+ ((PaimonSplit) splits.get(size - 1)).setRowCount(countPerSplit + totalCount % size);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
index 988f043ad0e..f4d3d724089 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
@@ -36,13 +36,13 @@ public class PaimonSplit extends FileSplit {
private static final LocationPath DUMMY_PATH = new LocationPath("/dummyPath", Maps.newHashMap());
private Split split;
private TableFormatType tableFormatType;
- private Optional<DeletionFile> optDeletionFile;
+ private Optional<DeletionFile> optDeletionFile = Optional.empty();
+ private Optional<Long> optRowCount = Optional.empty();
public PaimonSplit(Split split) {
super(DUMMY_PATH, 0, 0, 0, 0, null, null);
this.split = split;
this.tableFormatType = TableFormatType.PAIMON;
- this.optDeletionFile = Optional.empty();
if (split instanceof DataSplit) {
List<DataFileMeta> dataFileMetas = ((DataSplit) split).dataFiles();
@@ -57,7 +57,6 @@ public class PaimonSplit extends FileSplit {
String[] hosts, List<String> partitionList) {
super(file, start, length, fileLength, modificationTime, hosts, partitionList);
this.tableFormatType = TableFormatType.PAIMON;
- this.optDeletionFile = Optional.empty();
this.selfSplitWeight = length;
}
@@ -90,6 +89,14 @@ public class PaimonSplit extends FileSplit {
this.optDeletionFile = Optional.of(deletionFile);
}
+ public Optional<Long> getRowCount() {
+ return optRowCount;
+ }
+
+ public void setRowCount(long rowCount) {
+ this.optRowCount = Optional.of(rowCount);
+ }
+
public static class PaimonSplitCreator implements SplitCreator {
static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator();
@@ -103,7 +110,7 @@ public class PaimonSplit extends FileSplit {
long modificationTime,
String[] hosts,
List<String> partitionValues) {
- PaimonSplit split = new PaimonSplit(path, start, length, fileLength,
+ PaimonSplit split = new PaimonSplit(path, start, length, fileLength,
modificationTime, hosts, partitionValues);
split.setTargetSplitSize(fileSplitSize);
return split;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 4a2d61f2c7d..bc253680eb0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -33,7 +33,9 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.RawFile;
+import org.apache.paimon.table.source.Split;
import org.junit.Assert;
import org.junit.Test;
@@ -45,6 +47,131 @@ import java.util.Optional;
public class PaimonScanNodeTest {
+ @Test
+ public void testCalcuteTableLevelCount() {
+ List<Split> splits = new ArrayList<>();
+
+ // Create mock splits with row count and deletion files
+ Split split1 = new Split() {
+ @Override
+ public long rowCount() {
+ return 100;
+ }
+
+ @Override
+ public Optional<List<DeletionFile>> deletionFiles() {
+ List<DeletionFile> deletionFiles = new ArrayList<>();
+ deletionFiles.add(new DeletionFile("path1", 0, 10, 10L));
+ deletionFiles.add(new DeletionFile("path2", 0, 20, 20L));
+ return Optional.of(deletionFiles);
+ }
+ };
+
+ Split split2 = new Split() {
+ @Override
+ public long rowCount() {
+ return 200;
+ }
+
+ @Override
+ public Optional<List<DeletionFile>> deletionFiles() {
+ List<DeletionFile> deletionFiles = new ArrayList<>();
+ deletionFiles.add(new DeletionFile("path3", 0, 30, 30L));
+ deletionFiles.add(new DeletionFile("path4", 0, 40, 40L));
+ return Optional.of(deletionFiles);
+ }
+ };
+
+ splits.add(split1);
+ splits.add(split2);
+
+ Optional<Long> result = PaimonScanNode.calcuteTableLevelCount(splits);
+ Assert.assertTrue(result.isPresent());
+ Assert.assertEquals(200, result.get().longValue());
+ }
+
+ @Test
+ public void testCalcuteTableLevelCountWithNullDeletionFile() {
+ List<Split> splits = new ArrayList<>();
+
+ // Create mock splits with row count and null deletion files
+ Split split1 = new Split() {
+ @Override
+ public long rowCount() {
+ return 100;
+ }
+
+ @Override
+ public Optional<List<DeletionFile>> deletionFiles() {
+ List<DeletionFile> deletionFiles = new ArrayList<>();
+ deletionFiles.add(null);
+ deletionFiles.add(new DeletionFile("path2", 0, 20, 20L));
+ return Optional.of(deletionFiles);
+ }
+ };
+
+ Split split2 = new Split() {
+ @Override
+ public long rowCount() {
+ return 200;
+ }
+
+ @Override
+ public Optional<List<DeletionFile>> deletionFiles() {
+ return Optional.empty();
+ }
+ };
+
+ splits.add(split1);
+ splits.add(split2);
+
+ Optional<Long> result = PaimonScanNode.calcuteTableLevelCount(splits);
+ Assert.assertTrue(result.isPresent());
+ Assert.assertEquals(280, result.get().longValue());
+ }
+
+ @Test
+ public void testCalcuteTableLevelCountWithNullCardinality() {
+ List<Split> splits = new ArrayList<>();
+
+ // Create mock splits with row count and deletion files with null cardinality
+ Split split1 = new Split() {
+ @Override
+ public long rowCount() {
+ return 100;
+ }
+
+ @Override
+ public Optional<List<DeletionFile>> deletionFiles() {
+ List<DeletionFile> deletionFiles = new ArrayList<>();
+ deletionFiles.add(new DeletionFile("path1", 0, 10, null));
+ deletionFiles.add(new DeletionFile("path2", 0, 20, 20L));
+ return Optional.of(deletionFiles);
+ }
+ };
+
+ Split split2 = new Split() {
+ @Override
+ public long rowCount() {
+ return 200;
+ }
+
+ @Override
+ public Optional<List<DeletionFile>> deletionFiles() {
+ List<DeletionFile> deletionFiles = new ArrayList<>();
+ deletionFiles.add(new DeletionFile("path3", 0, 30, 30L));
+ deletionFiles.add(null);
+ return Optional.of(deletionFiles);
+ }
+ };
+
+ splits.add(split1);
+ splits.add(split2);
+
+ Optional<Long> result = PaimonScanNode.calcuteTableLevelCount(splits);
+ Assert.assertFalse(result.isPresent());
+ }
+
@Mocked
private SessionVariable sv;
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 611c58d2bfd..7411383670f 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -314,6 +314,7 @@ struct TIcebergFileDesc {
// Deprecated
5: optional Exprs.TExpr file_select_conjunct;
6: optional string original_file_path;
+ // Deprecated
7: optional i64 row_count;
}
@@ -338,6 +339,7 @@ struct TPaimonFileDesc {
12: optional TPaimonDeletionFileDesc deletion_file;
13: optional map<string, string> hadoop_conf // deprecated
14: optional string paimon_table // deprecated
+ 15: optional i64 row_count // deprecated
}
struct TTrinoConnectorFileDesc {
@@ -405,6 +407,7 @@ struct TTableFormatFileDesc {
6: optional TMaxComputeFileDesc max_compute_params
7: optional TTrinoConnectorFileDesc trino_connector_params
8: optional TLakeSoulFileDesc lakesoul_params
+ 9: optional i64 table_level_row_count
}
enum TTextSerdeType {
diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out
index a394836625d..f3b44964915 100644
Binary files a/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out and b/regression-test/data/external_table_p0/paimon/test_paimon_catalog.out differ
diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_deletion_vector.out b/regression-test/data/external_table_p0/paimon/test_paimon_deletion_vector.out
new file mode 100644
index 00000000000..f0b1e92a088
Binary files /dev/null and b/regression-test/data/external_table_p0/paimon/test_paimon_deletion_vector.out differ
diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
index 9668cbb0950..41afb02e0f9 100644
--- a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
@@ -181,13 +181,6 @@ suite("test_paimon_catalog",
"p0,external,doris,external_docker,external_docker_
def c108= """ select id from tb_with_upper_case where id = 1 """
def c109= """ select id from tb_with_upper_case where id < 1 """
- def c110 = """select count(*) from deletion_vector_orc;"""
- def c111 = """select count(*) from deletion_vector_parquet;"""
- def c112 = """select count(*) from deletion_vector_orc where id >
2;"""
- def c113 = """select count(*) from deletion_vector_parquet where
id > 2;"""
- def c114 = """select * from deletion_vector_orc where id > 2;"""
- def c115 = """select * from deletion_vector_parquet where id >
2;"""
-
String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort")
String catalog_name = "ctl_test_paimon_catalog"
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
@@ -296,13 +289,6 @@ suite("test_paimon_catalog",
"p0,external,doris,external_docker,external_docker_
qt_c107 c107
qt_c108 c108
qt_c109 c109
-
- qt_c110 c110
- qt_c111 c111
- qt_c112 c112
- qt_c113 c113
- qt_c114 c114
- qt_c115 c115
}
test_cases("false", "false")
diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_deletion_vector.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_deletion_vector.groovy
new file mode 100644
index 00000000000..fade251ed56
--- /dev/null
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_deletion_vector.groovy
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_deletion_vector",
"p0,external,doris,external_docker,external_docker_doris") {
+
+ logger.info("start paimon test")
+ String enabled = context.config.otherConfigs.get("enablePaimonTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disabled paimon test")
+ return
+ }
+
+ try {
+ String catalog_name = "test_paimon_deletion_vector"
+ String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """create catalog if not exists ${catalog_name} properties (
+ "type" = "paimon",
+ "paimon.catalog.type"="filesystem",
+ "warehouse" =
"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1"
+ );"""
+ sql """use `${catalog_name}`.`db1`"""
+
+ def test_cases = { String force ->
+ sql """ set force_jni_scanner=${force} """
+ qt_1 """select count(*) from deletion_vector_orc;"""
+ qt_2 """select count(*) from deletion_vector_parquet;"""
+ qt_3 """select count(*) from deletion_vector_orc where id > 2;"""
+ qt_4 """select count(*) from deletion_vector_parquet where id >
2;"""
+ qt_5 """select * from deletion_vector_orc where id > 2 order by
id;"""
+ qt_6 """select * from deletion_vector_parquet where id > 2 order
by id;"""
+ qt_7 """select * from deletion_vector_table_1_0 order by id;"""
+ qt_8 """select count(*) from deletion_vector_table_1_0;"""
+ qt_9 """select count(*) from deletion_vector_table_1_0 where id >
2;"""
+ }
+
+ def test_table_count_push_down = { String force ->
+ sql """ set force_jni_scanner=${force} """
+ explain {
+ sql("select count(*) from deletion_vector_orc;")
+ contains "pushdown agg=COUNT (-1)"
+ }
+ explain {
+ sql("select count(*) from deletion_vector_parquet;")
+ contains "pushdown agg=COUNT (-1)"
+ }
+ explain {
+ sql("select count(*) from deletion_vector_table_1_0;")
+ contains "pushdown agg=COUNT (8)"
+ }
+ }
+
+ def test_not_table_count_push_down = { String force ->
+ sql """ set enable_count_push_down_for_external_table=false; """
+ sql """ set force_jni_scanner=${force} """
+ explain {
+ sql("select count(*) from deletion_vector_orc;")
+ contains "pushdown agg=NONE"
+ }
+ explain {
+ sql("select count(*) from deletion_vector_parquet;")
+ contains "pushdown agg=NONE"
+ }
+ explain {
+ sql("select count(*) from deletion_vector_table_1_0;")
+ contains "pushdown agg=NONE"
+ }
+ }
+
+ test_cases("false")
+ test_cases("true")
+ test_table_count_push_down("false")
+ test_table_count_push_down("true")
+ test_not_table_count_push_down("false")
+ test_not_table_count_push_down("true")
+ } finally {
+ sql """ set enable_count_push_down_for_external_table=true; """
+ sql """set force_jni_scanner=false"""
+ }
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]