This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a396f34e380 [feature](iceberg) support insert into iceberg table with
sort-order (#60540)
a396f34e380 is described below
commit a396f34e380545534643c071ed142876a76fa7f0
Author: zhangstar333 <[email protected]>
AuthorDate: Sun Feb 22 08:19:24 2026 +0800
[feature](iceberg) support insert into iceberg table with sort-order
(#60540)
### What problem does this PR solve?
Problem Summary:
Support writing to an Iceberg table with a sort order: the written data is
locally sorted, and lower/upper_bounds metadata is added to the data files,
so Iceberg planning can use it to prune data files.
**Notes**: this is only a local sort, not a global sort, so if the Iceberg
writer runs with higher parallelism, you may see overlapping
lower/upper_bounds ranges between files.
If you need a global sort, consider adding an ORDER BY / cluster clause to
the INSERT SQL statement.
You can specify the sort order when creating the table (or add it later via ALTER TABLE), e.g.:
```
CREATE TABLE test_table2 (
id INT,
name STRING,
score DOUBLE,
create_time datetime
)
ORDER BY (
id ASC NULLS FIRST,
score DESC NULLS LAST)
PROPERTIES (
'write-format'='ORC'
);
```
---
.../pipeline/exec/iceberg_table_sink_operator.cpp | 2 +-
be/src/pipeline/exec/operator.cpp | 4 +
.../exec/spill_iceberg_table_sink_operator.cpp | 196 ++++++++++++
.../exec/spill_iceberg_table_sink_operator.h | 93 ++++++
be/src/pipeline/pipeline_fragment_context.cpp | 12 +-
be/src/vec/common/sort/sorter.cpp | 6 +-
be/src/vec/common/sort/sorter.h | 6 +-
be/src/vec/exec/format/table/parquet_utils.cpp | 33 ++
be/src/vec/exec/format/table/parquet_utils.h | 5 +
be/src/vec/runtime/vorc_transformer.cpp | 209 +++++++++++-
be/src/vec/runtime/vorc_transformer.h | 12 +-
be/src/vec/runtime/vparquet_transformer.cpp | 64 ++++
be/src/vec/runtime/vparquet_transformer.h | 2 +
.../writer/iceberg/viceberg_partition_writer.cpp | 56 ++--
.../writer/iceberg/viceberg_partition_writer.h | 29 +-
.../vec/sink/writer/iceberg/viceberg_sort_writer.h | 352 +++++++++++++++++++++
.../sink/writer/iceberg/viceberg_table_writer.cpp | 68 ++--
.../sink/writer/iceberg/viceberg_table_writer.h | 21 +-
.../sink/writer/iceberg/vpartition_writer_base.h | 64 ++++
be/test/vec/exec/sort/full_sort_test.cpp | 4 +-
.../create_preinstalled_scripts/iceberg/run26.sql | 73 +++++
.../datasource/iceberg/IcebergTransaction.java | 12 +-
.../iceberg/helper/IcebergWriterHelper.java | 56 +++-
.../glue/translator/PhysicalPlanTranslator.java | 4 +
.../org/apache/doris/planner/IcebergTableSink.java | 40 ++-
gensrc/thrift/DataSinks.thrift | 11 +
.../iceberg/write/test_iceberg_write_stats2.out | 33 ++
.../iceberg/write/test_iceberg_write_stats2.groovy | 118 +++++++
28 files changed, 1465 insertions(+), 120 deletions(-)
diff --git a/be/src/pipeline/exec/iceberg_table_sink_operator.cpp
b/be/src/pipeline/exec/iceberg_table_sink_operator.cpp
index 9f59ff040a5..ab76beac655 100644
--- a/be/src/pipeline/exec/iceberg_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/iceberg_table_sink_operator.cpp
@@ -26,7 +26,7 @@ Status IcebergTableSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo&
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<Parent>();
- RETURN_IF_ERROR(_writer->init_properties(p._pool));
+ RETURN_IF_ERROR(_writer->init_properties(p._pool, p._row_desc));
return Status::OK();
}
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index ff92b598b79..13d9b5a5851 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -76,6 +76,7 @@
#include "pipeline/exec/set_source_operator.h"
#include "pipeline/exec/sort_sink_operator.h"
#include "pipeline/exec/sort_source_operator.h"
+#include "pipeline/exec/spill_iceberg_table_sink_operator.h"
#include "pipeline/exec/spill_sort_sink_operator.h"
#include "pipeline/exec/spill_sort_source_operator.h"
#include "pipeline/exec/streaming_aggregation_operator.h"
@@ -820,6 +821,7 @@ DECLARE_OPERATOR(OlapTableSinkV2LocalState)
DECLARE_OPERATOR(HiveTableSinkLocalState)
DECLARE_OPERATOR(TVFTableSinkLocalState)
DECLARE_OPERATOR(IcebergTableSinkLocalState)
+DECLARE_OPERATOR(SpillIcebergTableSinkLocalState)
DECLARE_OPERATOR(AnalyticSinkLocalState)
DECLARE_OPERATOR(BlackholeSinkLocalState)
DECLARE_OPERATOR(SortSinkLocalState)
@@ -938,6 +940,8 @@ template class
AsyncWriterSink<doris::vectorized::VTabletWriter, OlapTableSinkOp
template class AsyncWriterSink<doris::vectorized::VTabletWriterV2,
OlapTableSinkV2OperatorX>;
template class AsyncWriterSink<doris::vectorized::VHiveTableWriter,
HiveTableSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VIcebergTableWriter,
IcebergTableSinkOperatorX>;
+template class AsyncWriterSink<doris::vectorized::VIcebergTableWriter,
+ SpillIcebergTableSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VTVFTableWriter,
TVFTableSinkOperatorX>;
#ifdef BE_TEST
diff --git a/be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp
b/be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp
new file mode 100644
index 00000000000..d8f577af648
--- /dev/null
+++ b/be/src/pipeline/exec/spill_iceberg_table_sink_operator.cpp
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "spill_iceberg_table_sink_operator.h"
+
+#include "common/status.h"
+#include "pipeline/exec/iceberg_table_sink_operator.h"
+#include "pipeline/exec/spill_utils.h"
+#include "vec/sink/writer/iceberg/viceberg_sort_writer.h"
+#include "vec/sink/writer/iceberg/viceberg_table_writer.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+SpillIcebergTableSinkLocalState::SpillIcebergTableSinkLocalState(DataSinkOperatorXBase*
parent,
+ RuntimeState*
state)
+ : Base(parent, state) {}
+
+Status SpillIcebergTableSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
+ RETURN_IF_ERROR(Base::init(state, info));
+ SCOPED_TIMER(exec_time_counter());
+ SCOPED_TIMER(_init_timer);
+
+ _init_spill_counters();
+
+ auto& p = _parent->cast<Parent>();
+ RETURN_IF_ERROR(_writer->init_properties(p._pool, p._row_desc));
+ return Status::OK();
+}
+
+Status SpillIcebergTableSinkLocalState::open(RuntimeState* state) {
+ SCOPED_TIMER(Base::exec_time_counter());
+ SCOPED_TIMER(Base::_open_timer);
+ RETURN_IF_ERROR(Base::open(state));
+ return Status::OK();
+}
+
+bool SpillIcebergTableSinkLocalState::is_blockable() const {
+ return true;
+}
+
+size_t SpillIcebergTableSinkLocalState::get_reserve_mem_size(RuntimeState*
state, bool eos) {
+ if (!_writer || !_writer->_current_writer) {
+ return 0;
+ }
+
+ auto* sort_writer =
+
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->_current_writer.get());
+ if (!sort_writer || !sort_writer->sorter()) {
+ return 0;
+ }
+
+ return sort_writer->sorter()->get_reserve_mem_size(state, eos);
+}
+
+size_t SpillIcebergTableSinkLocalState::get_revocable_mem_size(RuntimeState*
state) const {
+ if (!_writer || !_writer->_current_writer) {
+ return 0;
+ }
+
+ auto* sort_writer =
+
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->_current_writer.get());
+ if (!sort_writer || !sort_writer->sorter()) {
+ return 0;
+ }
+
+ return sort_writer->sorter()->data_size();
+}
+
+Status SpillIcebergTableSinkLocalState::revoke_memory(
+ RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
+ if (!_writer || !_writer->_current_writer) {
+ if (spill_context) {
+ spill_context->on_task_finished();
+ }
+ return Status::OK();
+ }
+
+ auto* sort_writer =
+
dynamic_cast<vectorized::VIcebergSortWriter*>(_writer->_current_writer.get());
+
+ if (!sort_writer || !sort_writer->sorter()) {
+ if (spill_context) {
+ spill_context->on_task_finished();
+ }
+ return Status::OK();
+ }
+
+ auto exception_catch_func = [sort_writer]() {
+ auto status = [&]() {
+ RETURN_IF_CATCH_EXCEPTION({ return sort_writer->trigger_spill();
});
+ }();
+ return status;
+ };
+
+
state->get_query_ctx()->resource_ctx()->task_controller()->increase_revoking_tasks_count();
+ auto status =
+ SpillSinkRunnable(state, spill_context, operator_profile(),
exception_catch_func).run();
+ if (!status.ok()) {
+
state->get_query_ctx()->resource_ctx()->task_controller()->decrease_revoking_tasks_count();
+ }
+ return status;
+}
+
+SpillIcebergTableSinkOperatorX::SpillIcebergTableSinkOperatorX(
+ ObjectPool* pool, int operator_id, const RowDescriptor& row_desc,
+ const std::vector<TExpr>& t_output_expr)
+ : Base(operator_id, 0, 0), _row_desc(row_desc),
_t_output_expr(t_output_expr), _pool(pool) {
+ _spillable = true;
+}
+
+Status SpillIcebergTableSinkOperatorX::init(const TDataSink& thrift_sink) {
+ RETURN_IF_ERROR(Base::init(thrift_sink));
+ _name = "SPILL_ICEBERG_TABLE_SINK_OPERATOR";
+ RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr,
_output_vexpr_ctxs));
+ return Status::OK();
+}
+
+Status SpillIcebergTableSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(Base::prepare(state));
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_row_desc));
+ return vectorized::VExpr::open(_output_vexpr_ctxs, state);
+}
+
+Status SpillIcebergTableSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* in_block,
+ bool eos) {
+ auto& local_state = get_local_state(state);
+ SCOPED_TIMER(local_state.exec_time_counter());
+ COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
+ return local_state.sink(state, in_block, eos);
+}
+
+size_t SpillIcebergTableSinkOperatorX::get_reserve_mem_size(RuntimeState*
state, bool eos) {
+ auto& local_state = get_local_state(state);
+ return local_state.get_reserve_mem_size(state, eos);
+}
+
+size_t SpillIcebergTableSinkOperatorX::revocable_mem_size(RuntimeState* state)
const {
+ auto& local_state = get_local_state(state);
+ return local_state.get_revocable_mem_size(state);
+}
+
+Status SpillIcebergTableSinkOperatorX::revoke_memory(
+ RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
+ auto& local_state = get_local_state(state);
+ return local_state.revoke_memory(state, spill_context);
+}
+
+void SpillIcebergTableSinkLocalState::_init_spill_counters() {
+ auto* profile = custom_profile();
+ //seems init_spill_write_counters()
+ ADD_TIMER_WITH_LEVEL(profile, "SpillWriteTime", 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteTaskWaitInQueueCount",
TUnit::UNIT, 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteTaskCount", TUnit::UNIT, 1);
+ ADD_TIMER_WITH_LEVEL(profile, "SpillWriteTaskWaitInQueueTime", 1);
+ ADD_TIMER_WITH_LEVEL(profile, "SpillWriteFileTime", 1);
+ ADD_TIMER_WITH_LEVEL(profile, "SpillWriteSerializeBlockTime", 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteBlockCount", TUnit::UNIT, 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteBlockBytes", TUnit::BYTES, 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileBytes", TUnit::BYTES, 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteRows", TUnit::UNIT, 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileTotalCount", TUnit::UNIT,
1);
+
+ //seems init_spill_read_counters()
+ ADD_TIMER_WITH_LEVEL(profile, "SpillTotalTime", 1);
+ ADD_TIMER_WITH_LEVEL(profile, "SpillRecoverTime", 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillReadTaskWaitInQueueCount",
TUnit::UNIT, 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillReadTaskCount", TUnit::UNIT, 1);
+ ADD_TIMER_WITH_LEVEL(profile, "SpillReadTaskWaitInQueueTime", 1);
+ ADD_TIMER_WITH_LEVEL(profile, "SpillReadFileTime", 1);
+ ADD_TIMER_WITH_LEVEL(profile, "SpillReadDerializeBlockTime", 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillReadBlockCount", TUnit::UNIT, 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillReadBlockBytes", TUnit::BYTES, 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillReadFileBytes", TUnit::BYTES, 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillReadRows", TUnit::UNIT, 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillReadFileCount", TUnit::UNIT, 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentBytes",
TUnit::BYTES, 1);
+ ADD_COUNTER_WITH_LEVEL(profile, "SpillWriteFileCurrentCount", TUnit::UNIT,
1);
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/spill_iceberg_table_sink_operator.h
b/be/src/pipeline/exec/spill_iceberg_table_sink_operator.h
new file mode 100644
index 00000000000..fb0e50ee794
--- /dev/null
+++ b/be/src/pipeline/exec/spill_iceberg_table_sink_operator.h
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "operator.h"
+#include "vec/sink/writer/iceberg/viceberg_table_writer.h"
+
+namespace doris::pipeline {
+#include "common/compile_check_begin.h"
+
+class SpillIcebergTableSinkLocalState;
+class SpillIcebergTableSinkOperatorX;
+
+class SpillIcebergTableSinkLocalState final
+ : public AsyncWriterSink<vectorized::VIcebergTableWriter,
SpillIcebergTableSinkOperatorX> {
+public:
+ using Base = AsyncWriterSink<vectorized::VIcebergTableWriter,
SpillIcebergTableSinkOperatorX>;
+ using Parent = SpillIcebergTableSinkOperatorX;
+ ENABLE_FACTORY_CREATOR(SpillIcebergTableSinkLocalState);
+
+ SpillIcebergTableSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state);
+ ~SpillIcebergTableSinkLocalState() override = default;
+
+ Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+ Status open(RuntimeState* state) override;
+
+ bool is_blockable() const override;
+ [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
+ Status revoke_memory(RuntimeState* state, const
std::shared_ptr<SpillContext>& spill_context);
+ size_t get_revocable_mem_size(RuntimeState* state) const;
+
+private:
+ void _init_spill_counters();
+ friend class SpillIcebergTableSinkOperatorX;
+};
+
+class SpillIcebergTableSinkOperatorX final
+ : public DataSinkOperatorX<SpillIcebergTableSinkLocalState> {
+public:
+ using Base = DataSinkOperatorX<SpillIcebergTableSinkLocalState>;
+ using LocalStateType = SpillIcebergTableSinkLocalState;
+
+ SpillIcebergTableSinkOperatorX(ObjectPool* pool, int operator_id, const
RowDescriptor& row_desc,
+ const std::vector<TExpr>& t_output_expr);
+
+ Status init(const TDataSink& thrift_sink) override;
+
+ Status prepare(RuntimeState* state) override;
+
+ Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
+
+ size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
+
+ size_t revocable_mem_size(RuntimeState* state) const override;
+
+ Status revoke_memory(RuntimeState* state,
+ const std::shared_ptr<SpillContext>& spill_context)
override;
+
+ using DataSinkOperatorX<LocalStateType>::node_id;
+ using DataSinkOperatorX<LocalStateType>::operator_id;
+ using DataSinkOperatorX<LocalStateType>::get_local_state;
+
+private:
+ friend class SpillIcebergTableSinkLocalState;
+ template <typename Writer, typename Parent>
+ requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
+ friend class AsyncWriterSink;
+
+ const RowDescriptor& _row_desc;
+ vectorized::VExprContextSPtrs _output_vexpr_ctxs;
+ const std::vector<TExpr>& _t_output_expr;
+ ObjectPool* _pool = nullptr;
+};
+
+#include "common/compile_check_end.h"
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 4dfa58e45a0..98c79c4e492 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -96,6 +96,7 @@
#include "pipeline/exec/set_source_operator.h"
#include "pipeline/exec/sort_sink_operator.h"
#include "pipeline/exec/sort_source_operator.h"
+#include "pipeline/exec/spill_iceberg_table_sink_operator.h"
#include "pipeline/exec/spill_sort_sink_operator.h"
#include "pipeline/exec/spill_sort_source_operator.h"
#include "pipeline/exec/streaming_aggregation_operator.h"
@@ -1074,10 +1075,15 @@ Status
PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
}
case TDataSinkType::ICEBERG_TABLE_SINK: {
if (!thrift_sink.__isset.iceberg_table_sink) {
- return Status::InternalError("Missing hive table sink.");
+ return Status::InternalError("Missing iceberg table sink.");
+ }
+ if (thrift_sink.iceberg_table_sink.__isset.sort_info) {
+ _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool,
next_sink_operator_id(),
+ row_desc,
output_exprs);
+ } else {
+ _sink = std::make_shared<IcebergTableSinkOperatorX>(pool,
next_sink_operator_id(),
+ row_desc,
output_exprs);
}
- _sink = std::make_shared<IcebergTableSinkOperatorX>(pool,
next_sink_operator_id(), row_desc,
- output_exprs);
break;
}
case TDataSinkType::JDBC_TABLE_SINK: {
diff --git a/be/src/vec/common/sort/sorter.cpp
b/be/src/vec/common/sort/sorter.cpp
index 0e588b119f0..6f909acd916 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -235,7 +235,7 @@ Status FullSorter::append_block(Block* block) {
// iff have reach limit and the unsorted block capacity can't hold the
block data size
if (_reach_limit() && !has_enough_capacity(block,
_state->unsorted_block().get())) {
- RETURN_IF_ERROR(_do_sort());
+ RETURN_IF_ERROR(do_sort());
}
{
@@ -268,7 +268,7 @@ Status FullSorter::prepare_for_read(bool is_spill) {
_state->ignore_offset();
}
if (_state->unsorted_block()->rows() > 0) {
- RETURN_IF_ERROR(_do_sort());
+ RETURN_IF_ERROR(do_sort());
}
return _state->build_merge_tree(_sort_description);
}
@@ -282,7 +282,7 @@ Status FullSorter::merge_sort_read_for_spill(RuntimeState*
state, doris::vectori
return _state->merge_sort_read(block, batch_size, eos);
}
-Status FullSorter::_do_sort() {
+Status FullSorter::do_sort() {
Block* src_block = _state->unsorted_block().get();
Block desc_block = src_block->clone_without_columns();
COUNTER_UPDATE(_partial_sort_counter, 1);
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index bd5ae9f9726..d1d1229d908 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -202,6 +202,10 @@ public:
_max_buffered_block_bytes = max_buffered_block_bytes;
}
+ auto merge_sort_state() { return _state.get(); }
+
+ Status do_sort();
+
private:
bool _reach_limit() {
return _state->unsorted_block()->allocated_bytes() >=
_max_buffered_block_bytes;
@@ -209,8 +213,6 @@ private:
bool has_enough_capacity(Block* input_block, Block* unsorted_block) const;
- Status _do_sort();
-
std::unique_ptr<MergeSorterState> _state;
static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 * 1024 * 1024;
diff --git a/be/src/vec/exec/format/table/parquet_utils.cpp
b/be/src/vec/exec/format/table/parquet_utils.cpp
index d5aee2079a5..4892516f573 100644
--- a/be/src/vec/exec/format/table/parquet_utils.cpp
+++ b/be/src/vec/exec/format/table/parquet_utils.cpp
@@ -433,4 +433,37 @@ void build_path_map(const FieldSchema& field, const
std::string& prefix,
}
}
+#define MERGE_STATS_CASE(ParquetType)
\
+ case ParquetType: {
\
+ auto typed_left_stat = std::static_pointer_cast<
\
+
::parquet::TypedStatistics<::parquet::PhysicalType<ParquetType>>>(left); \
+ auto typed_right_stat = std::static_pointer_cast<
\
+
::parquet::TypedStatistics<::parquet::PhysicalType<ParquetType>>>(right); \
+ typed_left_stat->Merge(*typed_right_stat);
\
+ return;
\
+ }
+
+void merge_stats(const std::shared_ptr<::parquet::Statistics>& left,
+ const std::shared_ptr<::parquet::Statistics>& right) {
+ if (left == nullptr || right == nullptr) {
+ return;
+ }
+ DCHECK(left->physical_type() == right->physical_type());
+
+ switch (left->physical_type()) {
+ MERGE_STATS_CASE(::parquet::Type::BOOLEAN);
+ MERGE_STATS_CASE(::parquet::Type::INT32);
+ MERGE_STATS_CASE(::parquet::Type::INT64);
+ MERGE_STATS_CASE(::parquet::Type::INT96);
+ MERGE_STATS_CASE(::parquet::Type::FLOAT);
+ MERGE_STATS_CASE(::parquet::Type::DOUBLE);
+ MERGE_STATS_CASE(::parquet::Type::BYTE_ARRAY);
+ MERGE_STATS_CASE(::parquet::Type::FIXED_LEN_BYTE_ARRAY);
+ default:
+ LOG(WARNING) << "Unsupported parquet type for statistics merge: "
+ << static_cast<int>(left->physical_type());
+ break;
+ }
+}
+
} // namespace doris::vectorized::parquet_utils
diff --git a/be/src/vec/exec/format/table/parquet_utils.h
b/be/src/vec/exec/format/table/parquet_utils.h
index 0f966ab0676..b79dee1cb39 100644
--- a/be/src/vec/exec/format/table/parquet_utils.h
+++ b/be/src/vec/exec/format/table/parquet_utils.h
@@ -17,6 +17,8 @@
#pragma once
+#include <parquet/statistics.h>
+
#include <array>
#include <string>
#include <unordered_map>
@@ -176,4 +178,7 @@ std::string decode_statistics_value(const FieldSchema*
schema_field,
void build_path_map(const FieldSchema& field, const std::string& prefix,
std::unordered_map<std::string, const FieldSchema*>* map);
+void merge_stats(const std::shared_ptr<::parquet::Statistics>& left,
+ const std::shared_ptr<::parquet::Statistics>& right);
+
} // namespace doris::vectorized::parquet_utils
diff --git a/be/src/vec/runtime/vorc_transformer.cpp
b/be/src/vec/runtime/vorc_transformer.cpp
index a42a8eb84d4..39c226a971b 100644
--- a/be/src/vec/runtime/vorc_transformer.cpp
+++ b/be/src/vec/runtime/vorc_transformer.cpp
@@ -23,6 +23,7 @@
#include <exception>
#include <ostream>
+#include <sstream>
#include "common/cast_set.h"
#include "common/status.h"
@@ -55,6 +56,7 @@
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_struct.h"
+#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/runtime/vdatetime_value.h"
@@ -113,8 +115,10 @@ VOrcTransformer::VOrcTransformer(RuntimeState* state,
doris::io::FileWriter* fil
const VExprContextSPtrs& output_vexpr_ctxs,
std::string schema,
std::vector<std::string> column_names, bool
output_object_data,
TFileCompressType::type compress_type,
- const iceberg::Schema* iceberg_schema)
+ const iceberg::Schema* iceberg_schema,
+ std::shared_ptr<io::FileSystem> fs)
: VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
+ _fs(fs),
_file_writer(file_writer),
_column_names(std::move(column_names)),
_write_options(new orc::WriterOptions()),
@@ -343,6 +347,209 @@ Status VOrcTransformer::close() {
return Status::OK();
}
+Status
VOrcTransformer::collect_file_statistics_after_close(TIcebergColumnStats*
stats) {
+ if (stats == nullptr || _iceberg_schema == nullptr || _fs == nullptr) {
+ return Status::OK();
+ }
+
+ try {
+ // orc writer do not provide api to get column statistics
+ // so we do not implement it now, we could implement it in future if
really needed
+ // maybe at the close of orc writer, we could do statistics by hands
+ // eg:
https://github.com/trinodb/trino/blob/master/lib/trino-orc/src/main/java/io/trino/orc/OrcWriter.java
+ io::FileReaderSPtr file_reader;
+ io::FileReaderOptions reader_options;
+ RETURN_IF_ERROR(_fs->open_file(_file_writer->path(), &file_reader,
&reader_options));
+ auto input_stream = std::make_unique<ORCFileInputStream>(
+ _file_writer->path().native(), file_reader, nullptr, nullptr,
8L * 1024L * 1024L,
+ 1L * 1024L * 1024L);
+ std::unique_ptr<orc::Reader> reader =
+ orc::createReader(std::move(input_stream),
orc::ReaderOptions());
+ std::unique_ptr<orc::Statistics> file_stats = reader->getStatistics();
+
+ if (file_stats == nullptr) {
+ return Status::OK();
+ }
+
+ std::map<int32_t, int64_t> value_counts;
+ std::map<int32_t, int64_t> null_value_counts;
+ std::map<int32_t, std::string> lower_bounds;
+ std::map<int32_t, std::string> upper_bounds;
+ bool has_any_null_count = false;
+ bool has_any_min_max = false;
+
+ const iceberg::StructType& root_struct =
_iceberg_schema->root_struct();
+ const auto& nested_fields = root_struct.fields();
+ for (uint32_t i = 0; i < nested_fields.size(); i++) {
+ uint32_t orc_col_id = i + 1; // skip root struct
+ if (orc_col_id >= file_stats->getNumberOfColumns()) {
+ continue;
+ }
+
+ const orc::ColumnStatistics* col_stats =
file_stats->getColumnStatistics(orc_col_id);
+ if (col_stats == nullptr) {
+ continue;
+ }
+
+ int32_t field_id = nested_fields[i].field_id();
+ int64_t non_null_count = col_stats->getNumberOfValues();
+ value_counts[field_id] = non_null_count;
+ if (col_stats->hasNull()) {
+ has_any_null_count = true;
+ int64_t null_count = _cur_written_rows - non_null_count;
+ null_value_counts[field_id] = null_count;
+ value_counts[field_id] += null_count;
+ }
+
+ if (_collect_column_bounds(col_stats, field_id,
+
_output_vexpr_ctxs[i]->root()->data_type(), &lower_bounds,
+ &upper_bounds)) {
+ has_any_min_max = true;
+ }
+ }
+
+ stats->__set_value_counts(value_counts);
+ if (has_any_null_count) {
+ stats->__set_null_value_counts(null_value_counts);
+ }
+ if (has_any_min_max) {
+ stats->__set_lower_bounds(lower_bounds);
+ stats->__set_upper_bounds(upper_bounds);
+ }
+ return Status::OK();
+ } catch (const std::exception& e) {
+ LOG(WARNING) << "Failed to collect ORC file statistics: " << e.what();
+ return Status::OK();
+ }
+}
+
+bool VOrcTransformer::_collect_column_bounds(const orc::ColumnStatistics*
col_stats,
+ int32_t field_id, const
DataTypePtr& data_type,
+ std::map<int32_t, std::string>*
lower_bounds,
+ std::map<int32_t, std::string>*
upper_bounds) {
+ bool has_bounds = false;
+ auto primitive_type = remove_nullable(data_type)->get_primitive_type();
+ if (const auto* bool_stats = dynamic_cast<const
orc::BooleanColumnStatistics*>(col_stats)) {
+ if (bool_stats->hasCount()) {
+ uint64_t true_count = bool_stats->getTrueCount();
+ uint64_t false_count = bool_stats->getFalseCount();
+ if (true_count > 0 || false_count > 0) {
+ has_bounds = true;
+ bool min_val = (false_count == 0);
+ bool max_val = (true_count > 0);
+ (*lower_bounds)[field_id] =
+ std::string(reinterpret_cast<const char*>(&min_val),
sizeof(bool));
+ (*upper_bounds)[field_id] =
+ std::string(reinterpret_cast<const char*>(&max_val),
sizeof(bool));
+ }
+ }
+ } else if (const auto* int_stats =
+ dynamic_cast<const
orc::IntegerColumnStatistics*>(col_stats)) {
+ if (int_stats->hasMinimum() && int_stats->hasMaximum()) {
+ has_bounds = true;
+ int64_t min_val = int_stats->getMinimum();
+ int64_t max_val = int_stats->getMaximum();
+ (*lower_bounds)[field_id] =
+ std::string(reinterpret_cast<const char*>(&min_val),
sizeof(int64_t));
+ (*upper_bounds)[field_id] =
+ std::string(reinterpret_cast<const char*>(&max_val),
sizeof(int64_t));
+ }
+ } else if (const auto* double_stats =
+ dynamic_cast<const
orc::DoubleColumnStatistics*>(col_stats)) {
+ if (double_stats->hasMinimum() && double_stats->hasMaximum()) {
+ has_bounds = true;
+ if (primitive_type == TYPE_FLOAT) {
+ auto min_val = static_cast<float>(double_stats->getMinimum());
+ auto max_val = static_cast<float>(double_stats->getMaximum());
+ (*lower_bounds)[field_id] =
+ std::string(reinterpret_cast<const char*>(&min_val),
sizeof(float));
+ (*upper_bounds)[field_id] =
+ std::string(reinterpret_cast<const char*>(&max_val),
sizeof(float));
+ } else {
+ double min_val = double_stats->getMinimum();
+ double max_val = double_stats->getMaximum();
+ (*lower_bounds)[field_id] =
+ std::string(reinterpret_cast<const char*>(&min_val),
sizeof(double));
+ (*upper_bounds)[field_id] =
+ std::string(reinterpret_cast<const char*>(&max_val),
sizeof(double));
+ }
+ }
+ } else if (const auto* string_stats =
+ dynamic_cast<const
orc::StringColumnStatistics*>(col_stats)) {
+ if (string_stats->hasMinimum() && string_stats->hasMaximum()) {
+ has_bounds = true;
+ (*lower_bounds)[field_id] = string_stats->getMinimum();
+ (*upper_bounds)[field_id] = string_stats->getMaximum();
+ }
+ } else if (const auto* date_stats = dynamic_cast<const
orc::DateColumnStatistics*>(col_stats)) {
+ if (date_stats->hasMinimum() && date_stats->hasMaximum()) {
+ has_bounds = true;
+ int32_t min_val = date_stats->getMinimum();
+ int32_t max_val = date_stats->getMaximum();
+ (*lower_bounds)[field_id] =
+ std::string(reinterpret_cast<const char*>(&min_val),
sizeof(int32_t));
+ (*upper_bounds)[field_id] =
+ std::string(reinterpret_cast<const char*>(&max_val),
sizeof(int32_t));
+ }
+ } else if (const auto* ts_stats =
+ dynamic_cast<const
orc::TimestampColumnStatistics*>(col_stats)) {
+ if (ts_stats->hasMinimum() && ts_stats->hasMaximum()) {
+ has_bounds = true;
+ int64_t min_val = ts_stats->getMinimum() * 1000;
+ int64_t max_val = ts_stats->getMaximum() * 1000;
+ (*lower_bounds)[field_id] =
+ std::string(reinterpret_cast<const char*>(&min_val),
sizeof(int64_t));
+ (*upper_bounds)[field_id] =
+ std::string(reinterpret_cast<const char*>(&max_val),
sizeof(int64_t));
+ }
+ } else if (const auto* decimal_stats =
+ dynamic_cast<const
orc::DecimalColumnStatistics*>(col_stats)) {
+ if (decimal_stats->hasMinimum() && decimal_stats->hasMaximum()) {
+ has_bounds = true;
+ (*lower_bounds)[field_id] =
_decimal_to_bytes(decimal_stats->getMinimum());
+ (*upper_bounds)[field_id] =
_decimal_to_bytes(decimal_stats->getMaximum());
+ }
+ }
+
+ return has_bounds;
+}
+
+std::string VOrcTransformer::_decimal_to_bytes(const orc::Decimal& decimal) {
+ orc::Int128 val = decimal.value;
+ if (val == 0) {
+ char zero = 0;
+ return std::string(&zero, 1);
+ }
+
+ // Convert Int128 -> signed big-endian minimal bytes
+ bool negative = val < 0;
+ auto high = static_cast<uint64_t>(val.getHighBits());
+ auto low = val.getLowBits();
+
+ // If negative, convert to two's complement explicitly
+ if (negative) {
+ // two's complement for 128-bit
+ low = ~low + 1;
+ high = ~high + (low == 0 ? 1 : 0);
+ }
+
+ // Serialize to big-endian bytes
+ uint8_t buf[16];
+ for (int i = 0; i < 8; ++i) {
+ buf[i] = static_cast<uint8_t>(high >> (56 - i * 8));
+ buf[i + 8] = static_cast<uint8_t>(low >> (56 - i * 8));
+ }
+
+ // Strip leading sign-extension bytes (Iceberg minimal encoding)
+ int start = 0;
+ uint8_t sign_byte = negative ? 0xFF : 0x00;
+ while (start < 15 && buf[start] == sign_byte &&
+ ((buf[start + 1] & 0x80) == (sign_byte & 0x80))) {
+ ++start;
+ }
+ return std::string(reinterpret_cast<const char*>(buf + start), 16 - start);
+}
+
Status VOrcTransformer::write(const Block& block) {
if (block.rows() == 0) {
return Status::OK();
diff --git a/be/src/vec/runtime/vorc_transformer.h
b/be/src/vec/runtime/vorc_transformer.h
index 3523c6b14af..d3876c7b5c6 100644
--- a/be/src/vec/runtime/vorc_transformer.h
+++ b/be/src/vec/runtime/vorc_transformer.h
@@ -83,7 +83,8 @@ public:
const VExprContextSPtrs& output_vexpr_ctxs, std::string
schema,
std::vector<std::string> column_names, bool
output_object_data,
TFileCompressType::type compression,
- const iceberg::Schema* iceberg_schema = nullptr);
+ const iceberg::Schema* iceberg_schema = nullptr,
+ std::shared_ptr<io::FileSystem> fs = nullptr);
~VOrcTransformer() = default;
@@ -95,6 +96,8 @@ public:
int64_t written_len() override;
+ Status collect_file_statistics_after_close(TIcebergColumnStats* stats);
+
private:
void set_compression_type(const TFileCompressType::type& compress_type);
std::unique_ptr<orc::Type> _build_orc_type(const DataTypePtr& type,
@@ -106,7 +109,12 @@ private:
// so we need to resize the subtype of a complex type
Status _resize_row_batch(const DataTypePtr& type, const IColumn& column,
orc::ColumnVectorBatch* orc_col_batch);
-
+ bool _collect_column_bounds(const orc::ColumnStatistics* col_stats,
int32_t field_id,
+ const DataTypePtr& data_type,
+ std::map<int32_t, std::string>* lower_bounds,
+ std::map<int32_t, std::string>* upper_bounds);
+ std::string _decimal_to_bytes(const orc::Decimal& decimal);
+ std::shared_ptr<io::FileSystem> _fs = nullptr;
doris::io::FileWriter* _file_writer = nullptr;
std::vector<std::string> _column_names;
std::unique_ptr<orc::OutputStream> _output_stream;
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp
b/be/src/vec/runtime/vparquet_transformer.cpp
index d75513b0b0a..1563e092810 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -21,6 +21,7 @@
#include <arrow/table.h>
#include <arrow/util/key_value_metadata.h>
#include <glog/logging.h>
+#include <parquet/api/reader.h>
#include <parquet/column_writer.h>
#include <parquet/platform.h>
#include <parquet/schema.h>
@@ -42,6 +43,7 @@
#include "util/arrow/utils.h"
#include "util/debug_util.h"
#include "vec/exec/format/table/iceberg/arrow_schema_util.h"
+#include "vec/exec/format/table/parquet_utils.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
@@ -309,7 +311,69 @@ Status VParquetTransformer::close() {
LOG(WARNING) << "Parquet writer close error: " << e.what();
return Status::IOError(e.what());
}
+
return Status::OK();
}
+Status
VParquetTransformer::collect_file_statistics_after_close(TIcebergColumnStats*
stats) {
+ std::shared_ptr<parquet::FileMetaData> file_metadata = _writer->metadata();
+ if (file_metadata == nullptr) {
+ return Status::InternalError("File metadata is not available");
+ }
+ std::map<int, int64_t> column_sizes;
+ std::map<int, int64_t> value_counts;
+ std::map<int, int64_t> null_value_counts;
+ std::map<int, std::string> lower_bounds;
+ std::map<int, std::string> upper_bounds;
+ std::map<int, std::shared_ptr<parquet::Statistics>> merged_column_stats;
+
+ const int num_row_groups = file_metadata->num_row_groups();
+ const int num_columns = file_metadata->num_columns();
+ for (int col_idx = 0; col_idx < num_columns; ++col_idx) {
+ auto field_id =
file_metadata->schema()->Column(col_idx)->schema_node()->field_id();
+
+ for (int rg_idx = 0; rg_idx < num_row_groups; ++rg_idx) {
+ auto row_group = file_metadata->RowGroup(rg_idx);
+ auto column_chunk = row_group->ColumnChunk(col_idx);
+ column_sizes[field_id] += column_chunk->total_compressed_size();
+
+ if (column_chunk->is_stats_set()) {
+ auto column_stat = column_chunk->statistics();
+ if (!merged_column_stats.contains(field_id)) {
+ merged_column_stats[field_id] = column_stat;
+ } else {
+ parquet_utils::merge_stats(merged_column_stats[field_id],
column_stat);
+ }
+ }
+ }
+ }
+
+ bool has_any_null_count = false;
+ bool has_any_min_max = false;
+ for (const auto& [field_id, column_stat] : merged_column_stats) {
+ value_counts[field_id] = column_stat->num_values();
+ if (column_stat->HasNullCount()) {
+ has_any_null_count = true;
+ int64_t null_count = column_stat->null_count();
+ null_value_counts[field_id] = null_count;
+ value_counts[field_id] += null_count;
+ }
+ if (column_stat->HasMinMax()) {
+ has_any_min_max = true;
+ lower_bounds[field_id] = column_stat->EncodeMin();
+ upper_bounds[field_id] = column_stat->EncodeMax();
+ }
+ }
+
+ stats->__set_column_sizes(column_sizes);
+ stats->__set_value_counts(value_counts);
+ if (has_any_null_count) {
+ stats->__set_null_value_counts(null_value_counts);
+ }
+ if (has_any_min_max) {
+ stats->__set_lower_bounds(lower_bounds);
+ stats->__set_upper_bounds(upper_bounds);
+ }
+ return Status::OK();
+}
} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vparquet_transformer.h
b/be/src/vec/runtime/vparquet_transformer.h
index b654b331414..228a31c55a3 100644
--- a/be/src/vec/runtime/vparquet_transformer.h
+++ b/be/src/vec/runtime/vparquet_transformer.h
@@ -111,6 +111,8 @@ public:
int64_t written_len() override;
+ Status collect_file_statistics_after_close(TIcebergColumnStats* stats);
+
private:
Status _parse_properties();
Status _parse_schema();
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
index 933c8d3f562..f5c4ed1dd54 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -17,6 +17,9 @@
#include "viceberg_partition_writer.h"
+#include <memory>
+#include <sstream>
+
#include "io/file_factory.h"
#include "runtime/runtime_state.h"
#include "vec/columns/column_map.h"
@@ -47,9 +50,9 @@ VIcebergPartitionWriter::VIcebergPartitionWriter(
_compress_type(compress_type),
_hadoop_conf(hadoop_conf) {}
-Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile*
profile) {
+Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile*
profile,
+ const RowDescriptor* row_desc) {
_state = state;
-
io::FSPropertiesRef fs_properties(_write_info.file_type);
fs_properties.properties = &_hadoop_conf;
if (!_write_info.broker_addresses.empty()) {
@@ -83,17 +86,19 @@ Status VIcebergPartitionWriter::open(RuntimeState* state,
RuntimeProfile* profil
to_string(_compress_type));
}
}
- ParquetFileOptions parquet_options = {parquet_compression_type,
- TParquetVersion::PARQUET_1_0,
false, false};
- _file_format_transformer.reset(new VParquetTransformer(
+ ParquetFileOptions parquet_options = {.compression_type =
parquet_compression_type,
+ .parquet_version =
TParquetVersion::PARQUET_1_0,
+ .parquet_disable_dictionary =
false,
+ .enable_int96_timestamps =
false};
+ _file_format_transformer = std::make_unique<VParquetTransformer>(
state, _file_writer.get(), _write_output_expr_ctxs,
_write_column_names, false,
- parquet_options, _iceberg_schema_json, &_schema));
+ parquet_options, _iceberg_schema_json, &_schema);
return _file_format_transformer->open();
}
case TFileFormatType::FORMAT_ORC: {
- _file_format_transformer.reset(
- new VOrcTransformer(state, _file_writer.get(),
_write_output_expr_ctxs, "",
- _write_column_names, false,
_compress_type, &_schema));
+ _file_format_transformer = std::make_unique<VOrcTransformer>(
+ state, _file_writer.get(), _write_output_expr_ctxs, "",
_write_column_names, false,
+ _compress_type, &_schema, _fs);
return _file_format_transformer->open();
}
default: {
@@ -121,7 +126,8 @@ Status VIcebergPartitionWriter::close(const Status& status)
{
}
}
if (status_ok) {
- auto commit_data = _build_iceberg_commit_data();
+ TIcebergCommitData commit_data;
+ RETURN_IF_ERROR(_build_iceberg_commit_data(&commit_data));
_state->add_iceberg_commit_datas(commit_data);
}
return result_status;
@@ -133,16 +139,28 @@ Status VIcebergPartitionWriter::write(vectorized::Block&
block) {
return Status::OK();
}
-TIcebergCommitData VIcebergPartitionWriter::_build_iceberg_commit_data() {
- TIcebergCommitData iceberg_commit_data;
- iceberg_commit_data.__set_file_path(
- fmt::format("{}/{}", _write_info.original_write_path,
_get_target_file_name()));
- iceberg_commit_data.__set_row_count(_row_count);
+Status VIcebergPartitionWriter::_build_iceberg_commit_data(TIcebergCommitData*
commit_data) {
+ DCHECK(commit_data != nullptr);
DCHECK(_file_format_transformer != nullptr);
-
iceberg_commit_data.__set_file_size(_file_format_transformer->written_len());
- iceberg_commit_data.__set_file_content(TFileContent::DATA);
- iceberg_commit_data.__set_partition_values(_partition_values);
- return iceberg_commit_data;
+
+ commit_data->__set_file_path(
+ fmt::format("{}/{}", _write_info.original_write_path,
_get_target_file_name()));
+ commit_data->__set_row_count(_row_count);
+ commit_data->__set_file_size(_file_format_transformer->written_len());
+ commit_data->__set_file_content(TFileContent::DATA);
+ commit_data->__set_partition_values(_partition_values);
+ if (_file_format_type == TFileFormatType::FORMAT_PARQUET) {
+ TIcebergColumnStats column_stats;
+
RETURN_IF_ERROR(static_cast<VParquetTransformer*>(_file_format_transformer.get())
+
->collect_file_statistics_after_close(&column_stats));
+ commit_data->__set_column_stats(column_stats);
+ } else if (_file_format_type == TFileFormatType::FORMAT_ORC) {
+ TIcebergColumnStats column_stats;
+
RETURN_IF_ERROR(static_cast<VOrcTransformer*>(_file_format_transformer.get())
+
->collect_file_statistics_after_close(&column_stats));
+ commit_data->__set_column_stats(column_stats);
+ }
+ return Status::OK();
}
std::string VIcebergPartitionWriter::_get_file_extension(
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h
b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h
index 28605f80426..b4db050dd71 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.h
@@ -20,10 +20,10 @@
#include <gen_cpp/DataSinks_types.h>
#include "io/fs/file_writer.h"
-#include "vec/columns/column.h"
#include "vec/exec/format/table/iceberg/schema.h"
#include "vec/exprs/vexpr_fwd.h"
#include "vec/runtime/vfile_format_transformer.h"
+#include "vec/sink/writer/iceberg/vpartition_writer_base.h"
namespace doris {
namespace io {
@@ -43,16 +43,8 @@ namespace vectorized {
class Block;
class VFileFormatTransformer;
-class VIcebergPartitionWriter {
+class VIcebergPartitionWriter : public IPartitionWriterBase {
public:
- struct WriteInfo {
- std::string write_path;
- std::string original_write_path;
- std::string target_path;
- TFileType::type file_type;
- std::vector<TNetworkAddress> broker_addresses;
- };
-
VIcebergPartitionWriter(const TDataSink& t_sink, std::vector<std::string>
partition_values,
const VExprContextSPtrs& write_output_expr_ctxs,
const doris::iceberg::Schema& schema,
@@ -63,24 +55,23 @@ public:
TFileCompressType::type compress_type,
const std::map<std::string, std::string>&
hadoop_conf);
- Status init_properties(ObjectPool* pool) { return Status::OK(); }
-
- Status open(RuntimeState* state, RuntimeProfile* profile);
+ Status open(RuntimeState* state, RuntimeProfile* profile,
+ const RowDescriptor* row_desc) override;
- Status write(vectorized::Block& block);
+ Status write(vectorized::Block& block) override;
- Status close(const Status& status);
+ Status close(const Status& status) override;
- inline const std::string& file_name() const { return _file_name; }
+ inline const std::string& file_name() const override { return _file_name; }
- inline int file_name_index() const { return _file_name_index; }
+ inline int file_name_index() const override { return _file_name_index; }
- inline size_t written_len() { return
_file_format_transformer->written_len(); }
+ inline size_t written_len() const override { return
_file_format_transformer->written_len(); }
private:
std::string _get_target_file_name();
- TIcebergCommitData _build_iceberg_commit_data();
+ Status _build_iceberg_commit_data(TIcebergCommitData* commit_data);
std::string _get_file_extension(TFileFormatType::type file_format_type,
TFileCompressType::type
write_compress_type);
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h
b/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h
new file mode 100644
index 00000000000..f9b91021f2b
--- /dev/null
+++ b/be/src/vec/sink/writer/iceberg/viceberg_sort_writer.h
@@ -0,0 +1,352 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <limits>
+#include <utility>
+#include <vector>
+
+#include "common/config.h"
+#include "common/object_pool.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/common/sort/sorter.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vslot_ref.h"
+#include "vec/sink/writer/iceberg/viceberg_partition_writer.h"
+#include "vec/sink/writer/iceberg/vpartition_writer_base.h"
+#include "vec/spill/spill_stream.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris {
+class RuntimeState;
+class RuntimeProfile;
+
+namespace vectorized {
+class VIcebergSortWriter : public IPartitionWriterBase {
+public:
+ using CreateWriterLambda =
std::function<std::shared_ptr<VIcebergPartitionWriter>(
+ const std::string* file_name, int file_name_index)>;
+
+ VIcebergSortWriter(std::shared_ptr<VIcebergPartitionWriter>
partition_writer,
+ TSortInfo sort_info, int64_t target_file_size_bytes,
+ CreateWriterLambda create_writer_lambda = nullptr)
+ : _sort_info(std::move(sort_info)),
+ _iceberg_partition_writer(std::move(partition_writer)),
+ _create_writer_lambda(std::move(create_writer_lambda)),
+ _target_file_size_bytes(target_file_size_bytes) {}
+
+ Status open(RuntimeState* state, RuntimeProfile* profile,
+ const RowDescriptor* row_desc) override {
+ DCHECK(row_desc != nullptr);
+ _runtime_state = state;
+ _profile = profile;
+ _row_desc = row_desc;
+
+ RETURN_IF_ERROR(_vsort_exec_exprs.init(_sort_info, &_pool));
+ RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, *row_desc,
*row_desc));
+ RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
+
+ _sorter = vectorized::FullSorter::create_unique(
+ _vsort_exec_exprs, -1, 0, &_pool, _sort_info.is_asc_order,
_sort_info.nulls_first,
+ *row_desc, state, _profile);
+ _sorter->init_profile(_profile);
+ _sorter->set_enable_spill();
+ _do_spill_count_counter = ADD_COUNTER(_profile, "IcebergDoSpillCount",
TUnit::UNIT);
+ RETURN_IF_ERROR(_iceberg_partition_writer->open(state, profile,
row_desc));
+ return Status::OK();
+ }
+
+ Status write(vectorized::Block& block) override {
+ RETURN_IF_ERROR(_sorter->append_block(&block));
+ _update_spill_block_batch_row_count(block);
+        // sort in memory and write directly to the data file (Parquet or ORC)
+ if (_sorter->data_size() >= _target_file_size_bytes) {
+ return _flush_to_file();
+ }
+ // trigger_spill() will be called by memory management system
+ return Status::OK();
+ }
+
+ Status close(const Status& status) override {
+ Defer defer {[&]() {
+ Status st = _iceberg_partition_writer->close(status);
+ if (!st.ok()) {
+ LOG(WARNING) << fmt::format("_iceberg_partition_writer close
failed, reason: {}",
+ st.to_string());
+ }
+ _cleanup_spill_streams();
+ }};
+
+ if (!status.ok() || _runtime_state->is_cancelled()) {
+ return status;
+ }
+
+ if (_sorter == nullptr) {
+ return Status::OK();
+ }
+
+ if (!_sorter->merge_sort_state()->unsorted_block()->empty() ||
+ !_sorter->merge_sort_state()->get_sorted_block().empty()) {
+ if (_sorted_streams.empty()) {
+ // data remaining in memory
+ RETURN_IF_ERROR(_sorter->do_sort());
+ RETURN_IF_ERROR(_sorter->prepare_for_read(false));
+ RETURN_IF_ERROR(_write_sorted_data());
+ return Status::OK();
+ }
+
+ // spill remaining data
+ RETURN_IF_ERROR(_do_spill());
+ }
+
+ // Merge all spilled streams and output final sorted data
+ if (!_sorted_streams.empty()) {
+ RETURN_IF_ERROR(_combine_files_output());
+ }
+
+ return Status::OK();
+ }
+
+ inline const std::string& file_name() const override {
+ return _iceberg_partition_writer->file_name();
+ }
+
+ inline int file_name_index() const override {
+ return _iceberg_partition_writer->file_name_index();
+ }
+
+ inline size_t written_len() const override { return
_iceberg_partition_writer->written_len(); }
+
+ auto sorter() const { return _sorter.get(); }
+
+ Status trigger_spill() { return _do_spill(); }
+
+private:
+    // compute how many rows are needed in each spill block batch
+ void _update_spill_block_batch_row_count(const vectorized::Block& block) {
+ auto rows = block.rows();
+ if (rows > 0 && 0 == _avg_row_bytes) {
+ _avg_row_bytes = std::max(1UL, block.bytes() / rows);
+ int64_t spill_batch_bytes =
_runtime_state->spill_sort_batch_bytes(); // default 8MB
+ _spill_block_batch_row_count =
+ (spill_batch_bytes + _avg_row_bytes - 1) / _avg_row_bytes;
+ }
+ }
+
+    // have enough data, flush in-memory sorted data to file
+ Status _flush_to_file() {
+ RETURN_IF_ERROR(_sorter->do_sort());
+ RETURN_IF_ERROR(_sorter->prepare_for_read(false));
+ RETURN_IF_ERROR(_write_sorted_data());
+ RETURN_IF_ERROR(_close_current_writer_and_open_next());
+ _sorter->reset();
+ return Status::OK();
+ }
+
+ // write data into file
+ Status _write_sorted_data() {
+ bool eos = false;
+ Block block;
+ while (!eos && !_runtime_state->is_cancelled()) {
+ RETURN_IF_ERROR(_sorter->get_next(_runtime_state, &block, &eos));
+ RETURN_IF_ERROR(_iceberg_partition_writer->write(block));
+ block.clear_column_data();
+ }
+ return Status::OK();
+ }
+
+ // close current writer and open a new one with incremented file index
+ Status _close_current_writer_and_open_next() {
+ std::string current_file_name = _iceberg_partition_writer->file_name();
+ int current_file_index = _iceberg_partition_writer->file_name_index();
+ RETURN_IF_ERROR(_iceberg_partition_writer->close(Status::OK()));
+
+ _iceberg_partition_writer =
+ _create_writer_lambda(¤t_file_name, current_file_index +
1);
+ if (!_iceberg_partition_writer) {
+ return Status::InternalError("Failed to create new partition
writer");
+ }
+
+ RETURN_IF_ERROR(_iceberg_partition_writer->open(_runtime_state,
_profile, _row_desc));
+ return Status::OK();
+ }
+
+ // batch size max is int32_t max
+ int32_t _get_spill_batch_size() const {
+ if (_spill_block_batch_row_count >
std::numeric_limits<int32_t>::max()) {
+ return std::numeric_limits<int32_t>::max();
+ }
+ return static_cast<int32_t>(_spill_block_batch_row_count);
+ }
+
+ Status _do_spill() {
+ COUNTER_UPDATE(_do_spill_count_counter, 1);
+ RETURN_IF_ERROR(_sorter->prepare_for_read(true));
+ int32_t batch_size = _get_spill_batch_size();
+
+ SpillStreamSPtr spilling_stream;
+
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+ _runtime_state, spilling_stream,
print_id(_runtime_state->query_id()),
+ "iceberg-sort", 1 /* node_id */, batch_size,
+ _runtime_state->spill_sort_batch_bytes(), _profile));
+ _sorted_streams.emplace_back(spilling_stream);
+
+ // spill sorted data to stream
+ bool eos = false;
+ Block block;
+ while (!eos && !_runtime_state->is_cancelled()) {
+ RETURN_IF_ERROR(_sorter->merge_sort_read_for_spill(
+ _runtime_state, &block, (int)_spill_block_batch_row_count,
&eos));
+ RETURN_IF_ERROR(spilling_stream->spill_block(_runtime_state,
block, eos));
+ block.clear_column_data();
+ }
+ _sorter->reset();
+ return Status::OK();
+ }
+
+    // merge spilled streams and output final sorted data to data files
+ Status _combine_files_output() {
+ // merge until all streams can be merged in one pass
+ while (_sorted_streams.size() >
static_cast<size_t>(_calc_max_merge_streams())) {
+ RETURN_IF_ERROR(_do_intermediate_merge());
+ }
+ RETURN_IF_ERROR(_create_final_merger());
+
+ bool eos = false;
+ Block output_block;
+ size_t current_file_bytes = _iceberg_partition_writer->written_len();
+ while (!eos && !_runtime_state->is_cancelled()) {
+ RETURN_IF_ERROR(_merger->get_next(&output_block, &eos));
+ if (output_block.rows() > 0) {
+ size_t block_bytes = output_block.bytes();
+
RETURN_IF_ERROR(_iceberg_partition_writer->write(output_block));
+ current_file_bytes += block_bytes;
+ if (current_file_bytes > _target_file_size_bytes) {
+ // close current writer and commit to file
+ RETURN_IF_ERROR(_close_current_writer_and_open_next());
+ current_file_bytes = 0;
+ }
+ }
+ output_block.clear_column_data();
+ }
+ return Status::OK();
+ }
+
+ Status _do_intermediate_merge() {
+ int max_stream_count = _calc_max_merge_streams();
+ RETURN_IF_ERROR(_create_merger(false, _spill_block_batch_row_count,
max_stream_count));
+
+ // register new spill stream for merged output
+ int32_t batch_size = _get_spill_batch_size();
+ SpillStreamSPtr tmp_stream;
+
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+ _runtime_state, tmp_stream,
print_id(_runtime_state->query_id()),
+ "iceberg-sort-merge", 1 /* node_id */, batch_size,
+ _runtime_state->spill_sort_batch_bytes(), _profile));
+
+ _sorted_streams.emplace_back(tmp_stream);
+
+ // merge current streams and write to new spill stream
+ bool eos = false;
+ Block merge_sorted_block;
+ while (!eos && !_runtime_state->is_cancelled()) {
+ merge_sorted_block.clear_column_data();
+ RETURN_IF_ERROR(_merger->get_next(&merge_sorted_block, &eos));
+ RETURN_IF_ERROR(tmp_stream->spill_block(_runtime_state,
merge_sorted_block, eos));
+ }
+
+ // clean up merged streams
+ for (auto& stream : _current_merging_streams) {
+
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+ }
+ _current_merging_streams.clear();
+ return Status::OK();
+ }
+
+ int _calc_max_merge_streams() const {
+ auto count =
+ _runtime_state->spill_sort_mem_limit() /
_runtime_state->spill_sort_batch_bytes();
+ if (count > std::numeric_limits<int>::max()) {
+ return std::numeric_limits<int>::max();
+ }
+ return std::max(2, static_cast<int>(count));
+ }
+
+ // create merger for merging spill streams
+ Status _create_merger(bool is_final_merge, size_t batch_size, int
num_streams) {
+ std::vector<vectorized::BlockSupplier> child_block_suppliers;
+ _merger =
std::make_unique<vectorized::VSortedRunMerger>(_sorter->get_sort_description(),
+ batch_size,
-1, 0, _profile);
+ _current_merging_streams.clear();
+ size_t streams_to_merge = is_final_merge ? _sorted_streams.size() :
num_streams;
+
+ for (size_t i = 0; i < streams_to_merge && !_sorted_streams.empty();
++i) {
+ auto stream = _sorted_streams.front();
+ stream->set_read_counters(_profile);
+ _current_merging_streams.emplace_back(stream);
+ child_block_suppliers.emplace_back([stream](vectorized::Block*
block, bool* eos) {
+ return stream->read_next_block_sync(block, eos);
+ });
+ _sorted_streams.pop_front();
+ }
+
+ RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
+ return Status::OK();
+ }
+
+ Status _create_final_merger() { return _create_merger(true,
_runtime_state->batch_size(), 1); }
+
+ // clean up all spill streams to ensure proper resource cleanup
+ void _cleanup_spill_streams() {
+ for (auto& stream : _sorted_streams) {
+
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+ }
+ _sorted_streams.clear();
+
+ for (auto& stream : _current_merging_streams) {
+
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+ }
+ _current_merging_streams.clear();
+ }
+
+ RuntimeState* _runtime_state = nullptr;
+ RuntimeProfile* _profile = nullptr;
+ const RowDescriptor* _row_desc = nullptr;
+ ObjectPool _pool;
+ TSortInfo _sort_info;
+ VSortExecExprs _vsort_exec_exprs;
+ std::shared_ptr<VIcebergPartitionWriter> _iceberg_partition_writer;
+ CreateWriterLambda _create_writer_lambda; // creating new writers after
commit
+
+ // Sorter and merger
+ std::unique_ptr<vectorized::FullSorter> _sorter;
+ std::unique_ptr<vectorized::VSortedRunMerger> _merger;
+ std::deque<vectorized::SpillStreamSPtr> _sorted_streams;
+ std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
+
+ int64_t _target_file_size_bytes = 0; //config::iceberg_sink_max_file_size
default 1GB
+ size_t _avg_row_bytes = 0;
+ size_t _spill_block_batch_row_count = 4096;
+
+ RuntimeProfile::Counter* _do_spill_count_counter = nullptr;
+};
+
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 6f4400418eb..9b495f093a5 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -29,6 +29,7 @@
#include "vec/exprs/vexpr_context.h"
#include "vec/sink/writer/iceberg/partition_transformers.h"
#include "vec/sink/writer/iceberg/viceberg_partition_writer.h"
+#include "vec/sink/writer/iceberg/viceberg_sort_writer.h"
#include "vec/sink/writer/vhive_utils.h"
namespace doris {
@@ -43,10 +44,6 @@ VIcebergTableWriter::VIcebergTableWriter(const TDataSink&
t_sink,
DCHECK(_t_sink.__isset.iceberg_table_sink);
}
-Status VIcebergTableWriter::init_properties(ObjectPool* pool) {
- return Status::OK();
-}
-
Status VIcebergTableWriter::open(RuntimeState* state, RuntimeProfile* profile)
{
_state = state;
@@ -193,12 +190,12 @@ Status VIcebergTableWriter::write(RuntimeState* state,
vectorized::Block& block)
_vec_output_expr_ctxs, block, &output_block, false));
materialize_block_inplace(output_block);
- std::unordered_map<std::shared_ptr<VIcebergPartitionWriter>,
IColumn::Filter> writer_positions;
+ std::unordered_map<std::shared_ptr<IPartitionWriterBase>, IColumn::Filter>
writer_positions;
_row_count += output_block.rows();
// Case 1: Full static partition - all data goes to a single partition
if (_is_full_static_partition) {
- std::shared_ptr<VIcebergPartitionWriter> writer;
+ std::shared_ptr<IPartitionWriterBase> writer;
{
SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
auto writer_iter =
_partitions_to_writers.find(_static_partition_path);
@@ -209,14 +206,14 @@ Status VIcebergTableWriter::write(RuntimeState* state,
vectorized::Block& block)
return e.to_status();
}
_partitions_to_writers.insert({_static_partition_path,
writer});
- RETURN_IF_ERROR(writer->open(_state, _operator_profile));
+ RETURN_IF_ERROR(writer->open(_state, _operator_profile,
_row_desc));
} else {
if (writer_iter->second->written_len() >
_target_file_size_bytes) {
std::string file_name(writer_iter->second->file_name());
int file_name_index =
writer_iter->second->file_name_index();
{
SCOPED_RAW_TIMER(&_close_ns);
-
static_cast<void>(writer_iter->second->close(Status::OK()));
+
RETURN_IF_ERROR(writer_iter->second->close(Status::OK()));
}
_partitions_to_writers.erase(writer_iter);
try {
@@ -226,7 +223,7 @@ Status VIcebergTableWriter::write(RuntimeState* state,
vectorized::Block& block)
return e.to_status();
}
_partitions_to_writers.insert({_static_partition_path,
writer});
- RETURN_IF_ERROR(writer->open(_state, _operator_profile));
+ RETURN_IF_ERROR(writer->open(_state, _operator_profile,
_row_desc));
} else {
writer = writer_iter->second;
}
@@ -235,12 +232,13 @@ Status VIcebergTableWriter::write(RuntimeState* state,
vectorized::Block& block)
SCOPED_RAW_TIMER(&_partition_writers_write_ns);
output_block.erase(_non_write_columns_indices);
RETURN_IF_ERROR(writer->write(output_block));
+ _current_writer = writer;
return Status::OK();
}
// Case 2: Non-partitioned table
if (_iceberg_partition_columns.empty()) {
- std::shared_ptr<VIcebergPartitionWriter> writer;
+ std::shared_ptr<IPartitionWriterBase> writer;
{
SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
auto writer_iter = _partitions_to_writers.find("");
@@ -251,14 +249,14 @@ Status VIcebergTableWriter::write(RuntimeState* state,
vectorized::Block& block)
return e.to_status();
}
_partitions_to_writers.insert({"", writer});
- RETURN_IF_ERROR(writer->open(_state, _operator_profile));
+ RETURN_IF_ERROR(writer->open(_state, _operator_profile,
_row_desc));
} else {
if (writer_iter->second->written_len() >
_target_file_size_bytes) {
std::string file_name(writer_iter->second->file_name());
int file_name_index =
writer_iter->second->file_name_index();
{
SCOPED_RAW_TIMER(&_close_ns);
-
static_cast<void>(writer_iter->second->close(Status::OK()));
+
RETURN_IF_ERROR(writer_iter->second->close(Status::OK()));
}
_partitions_to_writers.erase(writer_iter);
try {
@@ -268,7 +266,7 @@ Status VIcebergTableWriter::write(RuntimeState* state,
vectorized::Block& block)
return e.to_status();
}
_partitions_to_writers.insert({"", writer});
- RETURN_IF_ERROR(writer->open(_state, _operator_profile));
+ RETURN_IF_ERROR(writer->open(_state, _operator_profile,
_row_desc));
} else {
writer = writer_iter->second;
}
@@ -277,9 +275,11 @@ Status VIcebergTableWriter::write(RuntimeState* state,
vectorized::Block& block)
SCOPED_RAW_TIMER(&_partition_writers_write_ns);
output_block.erase(_non_write_columns_indices);
RETURN_IF_ERROR(writer->write(output_block));
+ _current_writer = writer;
return Status::OK();
}
+ // Case 3: Partitioned table - handle multiple partitions
{
Block transformed_block;
SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
@@ -319,11 +319,11 @@ Status VIcebergTableWriter::write(RuntimeState* state,
vectorized::Block& block)
auto create_and_open_writer =
[&](const std::string& partition_name, int position,
const std::string* file_name, int file_name_index,
- std::shared_ptr<VIcebergPartitionWriter>& writer_ptr)
-> Status {
+ std::shared_ptr<IPartitionWriterBase>& writer_ptr) ->
Status {
try {
auto writer = _create_partition_writer(&transformed_block,
position, file_name,
file_name_index);
- RETURN_IF_ERROR(writer->open(_state, _operator_profile));
+ RETURN_IF_ERROR(writer->open(_state, _operator_profile,
_row_desc));
IColumn::Filter filter(output_block.rows(), 0);
filter[position] = 1;
writer_positions.insert({writer, std::move(filter)});
@@ -337,7 +337,7 @@ Status VIcebergTableWriter::write(RuntimeState* state,
vectorized::Block& block)
auto writer_iter = _partitions_to_writers.find(partition_name);
if (writer_iter == _partitions_to_writers.end()) {
- std::shared_ptr<VIcebergPartitionWriter> writer;
+ std::shared_ptr<IPartitionWriterBase> writer;
if (_partitions_to_writers.size() + 1 >
config::table_sink_partition_write_max_partition_nums_per_writer) {
return Status::InternalError(
@@ -346,13 +346,13 @@ Status VIcebergTableWriter::write(RuntimeState* state,
vectorized::Block& block)
}
RETURN_IF_ERROR(create_and_open_writer(partition_name, i,
nullptr, 0, writer));
} else {
- std::shared_ptr<VIcebergPartitionWriter> writer;
+ std::shared_ptr<IPartitionWriterBase> writer;
if (writer_iter->second->written_len() >
_target_file_size_bytes) {
std::string file_name(writer_iter->second->file_name());
int file_name_index =
writer_iter->second->file_name_index();
{
SCOPED_RAW_TIMER(&_close_ns);
-
static_cast<void>(writer_iter->second->close(Status::OK()));
+
RETURN_IF_ERROR(writer_iter->second->close(Status::OK()));
}
writer_positions.erase(writer_iter->second);
_partitions_to_writers.erase(writer_iter);
@@ -378,6 +378,7 @@ Status VIcebergTableWriter::write(RuntimeState* state,
vectorized::Block& block)
Block filtered_block;
RETURN_IF_ERROR(_filter_block(output_block, &it->second,
&filtered_block));
RETURN_IF_ERROR(it->first->write(filtered_block));
+ _current_writer = it->first;
}
return Status::OK();
}
@@ -491,7 +492,7 @@ std::vector<std::string>
VIcebergTableWriter::_partition_values(
return partition_values;
}
-std::shared_ptr<VIcebergPartitionWriter>
VIcebergTableWriter::_create_partition_writer(
+std::shared_ptr<IPartitionWriterBase>
VIcebergTableWriter::_create_partition_writer(
vectorized::Block* transformed_block, int position, const std::string*
file_name,
int file_name_index) {
auto& iceberg_table_sink = _t_sink.iceberg_table_sink;
@@ -531,7 +532,7 @@ std::shared_ptr<VIcebergPartitionWriter>
VIcebergTableWriter::_create_partition_
write_path = output_path;
}
- VIcebergPartitionWriter::WriteInfo write_info = {
+ IPartitionWriterBase::WriteInfo write_info = {
.write_path = std::move(write_path),
.original_write_path = std::move(original_write_path),
.target_path = std::move(target_path),
@@ -542,7 +543,6 @@ std::shared_ptr<VIcebergPartitionWriter>
VIcebergTableWriter::_create_partition_
iceberg_table_sink.broker_addresses.end());
}
- _write_file_count++;
std::vector<std::string> column_names;
column_names.reserve(_write_output_vexpr_ctxs.size());
for (int i = 0; i < _schema->columns().size(); i++) {
@@ -550,12 +550,26 @@ std::shared_ptr<VIcebergPartitionWriter>
VIcebergTableWriter::_create_partition_
column_names.emplace_back(_schema->columns()[i].field_name());
}
}
- return std::make_shared<VIcebergPartitionWriter>(
- _t_sink, std::move(partition_values), _write_output_vexpr_ctxs,
*_schema,
- &_t_sink.iceberg_table_sink.schema_json, std::move(column_names),
std::move(write_info),
- (file_name == nullptr) ? _compute_file_name() : *file_name,
file_name_index,
- iceberg_table_sink.file_format,
iceberg_table_sink.compression_type,
- iceberg_table_sink.hadoop_config);
+
+ auto create_writer_lambda =
+ [this, partition_values, column_names, write_info](
+ const std::string* file_name,
+ int file_name_index) ->
std::shared_ptr<VIcebergPartitionWriter> {
+ auto& iceberg_table_sink = _t_sink.iceberg_table_sink;
+ _write_file_count++;
+ return std::make_shared<VIcebergPartitionWriter>(
+ _t_sink, partition_values, _write_output_vexpr_ctxs, *_schema,
+ &_t_sink.iceberg_table_sink.schema_json, column_names,
write_info,
+ (file_name == nullptr) ? _compute_file_name() : *file_name,
file_name_index,
+ iceberg_table_sink.file_format,
iceberg_table_sink.compression_type,
+ iceberg_table_sink.hadoop_config);
+ };
+ auto partition_write = create_writer_lambda(file_name, file_name_index);
+ if (iceberg_table_sink.__isset.sort_info) {
+ return std::make_shared<VIcebergSortWriter>(partition_write,
iceberg_table_sink.sort_info,
+ _target_file_size_bytes,
create_writer_lambda);
+ }
+ return partition_write;
}
PartitionData VIcebergTableWriter::_get_partition_data(vectorized::Block*
transformed_block,
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index b6e3c4edc57..842f962713f 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -19,11 +19,8 @@
#include <gen_cpp/DataSinks_types.h>
-#include <optional>
-
#include "util/runtime_profile.h"
#include "vec/columns/column.h"
-#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/exec/format/table/iceberg/partition_spec_parser.h"
#include "vec/exec/format/table/iceberg/schema_parser.h"
@@ -40,7 +37,8 @@ class RuntimeState;
namespace vectorized {
class IColumn;
-class VIcebergPartitionWriter;
+class IPartitionWriterBase;
+class VIcebergSortWriter;
struct ColumnWithTypeAndName;
class VIcebergTableWriter final : public AsyncResultWriter {
@@ -51,7 +49,10 @@ public:
~VIcebergTableWriter() = default;
- Status init_properties(ObjectPool* pool);
+ Status init_properties(ObjectPool* pool, const RowDescriptor& row_desc) {
+ _row_desc = &row_desc;
+ return Status::OK();
+ }
Status open(RuntimeState* state, RuntimeProfile* profile) override;
@@ -59,6 +60,8 @@ public:
Status close(Status) override;
+ std::shared_ptr<IPartitionWriterBase> _current_writer;
+
private:
class IcebergPartitionColumn {
public:
@@ -102,7 +105,7 @@ private:
// Build static partition path from static partition values
std::string _build_static_partition_path();
- std::shared_ptr<VIcebergPartitionWriter> _create_partition_writer(
+ std::shared_ptr<IPartitionWriterBase> _create_partition_writer(
vectorized::Block* transformed_block, int position,
const std::string* file_name = nullptr, int file_name_index = 0);
@@ -146,12 +149,10 @@ private:
// Pre-computed static partition value list (for full static mode only)
std::vector<std::string> _static_partition_value_list;
- std::unordered_map<std::string, std::shared_ptr<VIcebergPartitionWriter>>
- _partitions_to_writers;
-
+ std::unordered_map<std::string, std::shared_ptr<IPartitionWriterBase>>
_partitions_to_writers;
VExprContextSPtrs _write_output_vexpr_ctxs;
-
size_t _row_count = 0;
+ const RowDescriptor* _row_desc = nullptr;
// profile counters
int64_t _send_data_ns = 0;
diff --git a/be/src/vec/sink/writer/iceberg/vpartition_writer_base.h
b/be/src/vec/sink/writer/iceberg/vpartition_writer_base.h
new file mode 100644
index 00000000000..1afd75f40dd
--- /dev/null
+++ b/be/src/vec/sink/writer/iceberg/vpartition_writer_base.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "common/status.h"
+#include "runtime/descriptors.h"
+namespace doris {
+
+class ObjectPool;
+class RuntimeState;
+class RuntimeProfile;
+
+namespace vectorized {
+
+// IPartitionWriterBase
+// ├── VIcebergPartitionWriter
+// └── VIcebergSortWriter
+// └── wraps VIcebergPartitionWriter
+class IPartitionWriterBase {
+public:
+ struct WriteInfo {
+ std::string write_path;
+ std::string original_write_path;
+ std::string target_path;
+ TFileType::type file_type;
+ std::vector<TNetworkAddress> broker_addresses;
+ };
+
+ IPartitionWriterBase() = default;
+
+ virtual ~IPartitionWriterBase() = default;
+
+ virtual Status open(RuntimeState* state, RuntimeProfile* profile,
+ const RowDescriptor* row_desc) = 0;
+
+ virtual Status write(vectorized::Block& block) = 0;
+
+ virtual Status close(const Status& status) = 0;
+
+ virtual const std::string& file_name() const = 0;
+
+ virtual int file_name_index() const = 0;
+
+ virtual size_t written_len() const = 0;
+};
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/test/vec/exec/sort/full_sort_test.cpp
b/be/test/vec/exec/sort/full_sort_test.cpp
index 8add8b7e9f5..9d59890017b 100644
--- a/be/test/vec/exec/sort/full_sort_test.cpp
+++ b/be/test/vec/exec/sort/full_sort_test.cpp
@@ -109,13 +109,13 @@ TEST_F(FullSorterTest, test_full_sorter3) {
{
Block block = ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4,
5, 6, 7, 8, 9, 10});
EXPECT_TRUE(sorter->append_block(&block).ok());
- EXPECT_TRUE(sorter->_do_sort());
+ EXPECT_TRUE(sorter->do_sort());
}
{
Block block = ColumnHelper::create_block<DataTypeInt64>({4, 5, 6, 7});
EXPECT_TRUE(sorter->append_block(&block).ok());
- EXPECT_TRUE(sorter->_do_sort());
+ EXPECT_TRUE(sorter->do_sort());
}
EXPECT_EQ(sorter->_state->get_sorted_block()[0]->rows(), 6);
EXPECT_EQ(sorter->_state->get_sorted_block()[1]->rows(), 4);
diff --git
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run26.sql
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run26.sql
new file mode 100644
index 00000000000..5b7513e1a53
--- /dev/null
+++
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run26.sql
@@ -0,0 +1,73 @@
+create database if not exists demo.test_stats_order;
+USE demo.test_stats_order;
+
+DROP TABLE IF EXISTS iceberg_all_types_orc;
+DROP TABLE IF EXISTS iceberg_all_types_parquet;
+
+CREATE TABLE iceberg_all_types_orc (
+ `boolean_col` boolean,
+ `int_col` int,
+ `bigint_col` bigint,
+ `float_col` float,
+ `double_col` double,
+ `decimal_col1` decimal(9,0),
+ `decimal_col2` decimal(8,4),
+ `decimal_col3` decimal(18,6),
+ `decimal_col4` decimal(38,12),
+ `string_col` string,
+ `date_col` date,
+ `datetime_col1` TIMESTAMP_NTZ
+)
+USING iceberg
+TBLPROPERTIES(
+ 'write.format.default' = 'orc',
+ 'format-version' = '2'
+);
+
+CREATE TABLE iceberg_all_types_parquet (
+ `boolean_col` boolean,
+ `int_col` int,
+ `bigint_col` bigint,
+ `float_col` float,
+ `double_col` double,
+ `decimal_col1` decimal(9,0),
+ `decimal_col2` decimal(8,4),
+ `decimal_col3` decimal(18,6),
+ `decimal_col4` decimal(38,12),
+ `string_col` string,
+ `date_col` date,
+ `datetime_col1` TIMESTAMP_NTZ
+)
+USING iceberg
+TBLPROPERTIES(
+ 'write.format.default' = 'parquet',
+ 'format-version' = '2'
+);
+
+ALTER TABLE iceberg_all_types_parquet WRITE ORDERED BY int_col ASC;
+ALTER TABLE iceberg_all_types_orc WRITE ORDERED BY int_col ASC;
+
+CREATE TABLE iceberg_int_order (
+ int_col int
+)
+USING iceberg
+TBLPROPERTIES(
+ 'write.format.default' = 'parquet',
+ 'format-version' = '2'
+);
+
+
+CREATE TABLE iceberg_int_no_order (
+ int_col int
+)
+USING iceberg
+TBLPROPERTIES(
+ 'write.format.default' = 'parquet',
+ 'format-version' = '2'
+);
+
+ALTER TABLE iceberg_int_order WRITE ORDERED BY int_col ASC;
+
+
+
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index 502b1eaa7c8..c6473ca64dd 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -34,7 +34,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.PartitionField;
@@ -177,12 +176,8 @@ public class IcebergTransaction implements Transaction {
return;
}
- // Get table specification information
- PartitionSpec spec = transaction.table().spec();
- FileFormat fileFormat =
IcebergUtils.getFileFormat(transaction.table());
-
// Convert commit data to DataFile objects using the same logic as
insert
- WriteResult writeResult =
IcebergWriterHelper.convertToWriterResult(fileFormat, spec, commitDataList);
+ WriteResult writeResult =
IcebergWriterHelper.convertToWriterResult(transaction.table(), commitDataList);
// Add the generated DataFiles to filesToAdd list
synchronized (filesToAdd) {
@@ -249,16 +244,13 @@ public class IcebergTransaction implements Transaction {
}
private void updateManifestAfterInsert(TUpdateMode updateMode) {
- PartitionSpec spec = transaction.table().spec();
- FileFormat fileFormat =
IcebergUtils.getFileFormat(transaction.table());
-
List<WriteResult> pendingResults;
if (commitDataList.isEmpty()) {
pendingResults = Collections.emptyList();
} else {
//convert commitDataList to writeResult
WriteResult writeResult = IcebergWriterHelper
- .convertToWriterResult(fileFormat, spec, commitDataList);
+ .convertToWriterResult(transaction.table(),
commitDataList);
pendingResults = Lists.newArrayList(writeResult);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
index d429e95e8ce..9a47874762c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
@@ -19,32 +19,46 @@ package org.apache.doris.datasource.iceberg.helper;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.statistics.CommonStatistics;
+import org.apache.doris.thrift.TIcebergColumnStats;
import org.apache.doris.thrift.TIcebergCommitData;
import com.google.common.base.VerifyException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.types.Types;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
public class IcebergWriterHelper {
+ private static final Logger LOG =
LogManager.getLogger(IcebergWriterHelper.class);
private static final int DEFAULT_FILE_COUNT = 1;
public static WriteResult convertToWriterResult(
- FileFormat format,
- PartitionSpec spec,
+ Table table,
List<TIcebergCommitData> commitDataList) {
List<DataFile> dataFiles = new ArrayList<>();
+
+ // Get table specification information
+ PartitionSpec spec = table.spec();
+ FileFormat fileFormat = IcebergUtils.getFileFormat(table);
+
for (TIcebergCommitData commitData : commitDataList) {
//get the files path
String location = commitData.getFilePath();
@@ -53,7 +67,7 @@ public class IcebergWriterHelper {
long fileSize = commitData.getFileSize();
long recordCount = commitData.getRowCount();
CommonStatistics stat = new CommonStatistics(recordCount,
DEFAULT_FILE_COUNT, fileSize);
-
+ Metrics metrics = buildDataFileMetrics(table, fileFormat,
commitData);
Optional<PartitionData> partitionData = Optional.empty();
//get and check partitionValues when table is partitionedTable
if (spec.isPartitioned()) {
@@ -67,7 +81,8 @@ public class IcebergWriterHelper {
// Convert human-readable partition values to PartitionData
partitionData =
Optional.of(convertToPartitionData(partitionValues, spec));
}
- DataFile dataFile = genDataFile(format, location, spec,
partitionData, stat);
+ DataFile dataFile = genDataFile(fileFormat, location, spec,
partitionData, stat, metrics,
+ table.sortOrder());
dataFiles.add(dataFile);
}
return WriteResult.builder()
@@ -81,12 +96,14 @@ public class IcebergWriterHelper {
String location,
PartitionSpec spec,
Optional<PartitionData> partitionData,
- CommonStatistics statistics) {
+ CommonStatistics statistics, Metrics metrics, SortOrder sortOrder)
{
DataFiles.Builder builder = DataFiles.builder(spec)
.withPath(location)
.withFileSizeInBytes(statistics.getTotalFileBytes())
.withRecordCount(statistics.getRowCount())
+ .withMetrics(metrics)
+ .withSortOrder(sortOrder)
.withFormat(format);
partitionData.ifPresent(builder::withPartition);
@@ -132,4 +149,33 @@ public class IcebergWriterHelper {
return partitionData;
}
+
+ private static Metrics buildDataFileMetrics(Table table, FileFormat
fileFormat, TIcebergCommitData commitData) {
+ Map<Integer, Long> columnSizes = new HashMap<>();
+ Map<Integer, Long> valueCounts = new HashMap<>();
+ Map<Integer, Long> nullValueCounts = new HashMap<>();
+ Map<Integer, ByteBuffer> lowerBounds = new HashMap<>();
+ Map<Integer, ByteBuffer> upperBounds = new HashMap<>();
+ if (commitData.isSetColumnStats()) {
+ TIcebergColumnStats stats = commitData.column_stats;
+ if (stats.isSetColumnSizes()) {
+ columnSizes = stats.column_sizes;
+ }
+ if (stats.isSetValueCounts()) {
+ valueCounts = stats.value_counts;
+ }
+ if (stats.isSetNullValueCounts()) {
+ nullValueCounts = stats.null_value_counts;
+ }
+ if (stats.isSetLowerBounds()) {
+ lowerBounds = stats.lower_bounds;
+ }
+ if (stats.isSetUpperBounds()) {
+ upperBounds = stats.upper_bounds;
+ }
+ }
+
+ return new Metrics(commitData.getRowCount(), columnSizes, valueCounts,
+ nullValueCounts, null, lowerBounds, upperBounds);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index f99f1b337e7..3f87d1a856b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -575,8 +575,12 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
PlanTranslatorContext
context) {
PlanFragment rootFragment = icebergTableSink.child().accept(this,
context);
rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
+ List<Expr> outputExprs = Lists.newArrayList();
+ icebergTableSink.getOutput().stream().map(Slot::getExprId)
+ .forEach(exprId ->
outputExprs.add(context.findSlotRef(exprId)));
IcebergTableSink sink = new IcebergTableSink((IcebergExternalTable)
icebergTableSink.getTargetTable());
rootFragment.setSink(sink);
+ sink.setOutputExprs(outputExprs);
return rootFragment;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index ef1f7c28772..581eee08a5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -17,6 +17,8 @@
package org.apache.doris.planner;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SortInfo;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.credentials.VendedCredentialsFactory;
@@ -32,10 +34,8 @@ import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TIcebergTableSink;
-import org.apache.doris.thrift.TSortField;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpecParser;
@@ -44,16 +44,19 @@ import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
-import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
public class IcebergTableSink extends BaseExternalTableDataSink {
+ private List<Expr> outputExprs;
private final IcebergExternalTable targetTable;
private static final HashSet<TFileFormatType> supportedTypes = new
HashSet<TFileFormatType>() {{
add(TFileFormatType.FORMAT_ORC);
@@ -82,6 +85,10 @@ public class IcebergTableSink extends
BaseExternalTableDataSink {
return supportedTypes;
}
+ public void setOutputExprs(List<Expr> outputExprs) {
+ this.outputExprs = outputExprs;
+ }
+
@Override
public String getExplainString(String prefix, TExplainLevel explainLevel) {
StringBuilder strBuilder = new StringBuilder();
@@ -122,24 +129,25 @@ public class IcebergTableSink extends
BaseExternalTableDataSink {
// sort order
if (icebergTable.sortOrder().isSorted()) {
SortOrder sortOrder = icebergTable.sortOrder();
- Set<Integer> baseColumnFieldIds =
icebergTable.schema().columns().stream()
- .map(Types.NestedField::fieldId)
- .collect(ImmutableSet.toImmutableSet());
- ImmutableList.Builder<TSortField> sortFields =
ImmutableList.builder();
+ ArrayList<Expr> orderingExprs = Lists.newArrayList();
+ ArrayList<Boolean> isAscOrder = Lists.newArrayList();
+ ArrayList<Boolean> isNullsFirst = Lists.newArrayList();
for (SortField sortField : sortOrder.fields()) {
if (!sortField.transform().isIdentity()) {
continue;
}
- if (!baseColumnFieldIds.contains(sortField.sourceId())) {
- continue;
+ for (int i = 0; i < icebergTable.schema().columns().size();
++i) {
+ NestedField column =
icebergTable.schema().columns().get(i);
+ if (column.fieldId() == sortField.sourceId()) {
+ orderingExprs.add(outputExprs.get(i));
+
isAscOrder.add(sortField.direction().equals(SortDirection.ASC));
+
isNullsFirst.add(sortField.nullOrder().equals(NullOrder.NULLS_FIRST));
+ break;
+ }
}
- TSortField tSortField = new TSortField();
- tSortField.setSourceColumnId(sortField.sourceId());
-
tSortField.setAscending(sortField.direction().equals(SortDirection.ASC));
-
tSortField.setNullFirst(sortField.nullOrder().equals(NullOrder.NULLS_FIRST));
- sortFields.add(tSortField);
}
- tSink.setSortFields(sortFields.build());
+ SortInfo sortInfo = new SortInfo(orderingExprs, isAscOrder,
isNullsFirst, null);
+ tSink.setSortInfo(sortInfo.toThrift());
}
// file info
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index cb817b7dbe8..67c2b216de8 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -397,6 +397,15 @@ enum TFileContent {
EQUALITY_DELETES = 2
}
+struct TIcebergColumnStats {
+ 1: optional map<i32, i64> column_sizes
+ 2: optional map<i32, i64> value_counts
+ 3: optional map<i32, i64> null_value_counts
+ 4: optional map<i32, i64> nan_value_counts
+ 5: optional map<i32, binary> lower_bounds;
+ 6: optional map<i32, binary> upper_bounds;
+}
+
struct TIcebergCommitData {
1: optional string file_path
2: optional i64 row_count
@@ -404,6 +413,7 @@ struct TIcebergCommitData {
4: optional TFileContent file_content
5: optional list<string> partition_values
6: optional list<string> referenced_data_files
+ 7: optional TIcebergColumnStats column_stats
}
struct TSortField {
@@ -431,6 +441,7 @@ struct TIcebergTableSink {
// Key: partition column name, Value: partition value as string
// When set, BE should use these values directly instead of computing from
data
15: optional map<string, string> static_partition_values;
+ 16: optional PlanNodes.TSortInfo sort_info;
}
enum TDictLayoutType {
diff --git
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_stats2.out
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_stats2.out
new file mode 100644
index 00000000000..48765f6c4dd
--- /dev/null
+++
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_stats2.out
@@ -0,0 +1,33 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql_0 --
+false 22 222 2.2 2.2 2222 8765.4321 8765.432100
987654321098765432.987654321099 bbb 2023-06-15 2023-06-15T23:45:01
+true 11 111 1.1 1.1 1111 1234.5678 1234.567890
123456789012345678.123456789012 aaa 2023-01-01 2023-01-01T12:34:56
+
+-- !sql_1 --
+0 PARQUET 2 {1:2, 2:2, 3:2, 4:2, 5:2, 6:2, 7:2, 8:2, 9:2, 10:2,
11:2, 12:2} {1:0, 2:0, 3:0, 4:0, 5:0, 6:0, 7:0, 8:0, 9:0, 10:0, 11:0, 12:0}
{1:0x00, 2:0x0B000000, 3:0x6F00000000000000, 4:0xCDCC8C3F,
5:0x9A9999999999F13F, 6:0x00000457, 7:0x00BC614E, 8:0x00000000499602D2,
9:0x000000018EE90FF6C373E0393713FA14, 10:0x616161, 11:0x9E4B0000,
12:0x005CE70F33F10500} {1:0x01, 2:0x16000000, 3:0xDE00000000000000,
4:0xCDCC0C40, 5:0x9A99999999990140, 6:0x000008AE, 7:0x05397FB1,
8:0x000000020A75E124, 9 [...]
+
+-- !sql_2 --
+{"bigint_col":{"column_size":118, "value_count":2, "null_value_count":0,
"nan_value_count":null, "lower_bound":111, "upper_bound":222},
"boolean_col":{"column_size":49, "value_count":2, "null_value_count":0,
"nan_value_count":null, "lower_bound":0, "upper_bound":1},
"date_col":{"column_size":94, "value_count":2, "null_value_count":0,
"nan_value_count":null, "lower_bound":"2023-01-01",
"upper_bound":"2023-06-15"}, "datetime_col1":{"column_size":118,
"value_count":2, "null_value_count":0, [...]
+
+-- !sql_3 --
+false 22 222 2.2 2.2 2222 8765.4321 8765.432100
987654321098765432.987654321099 bbb 2023-06-15 2023-06-15T23:45:01
+true 11 111 1.1 1.1 1111 1234.5678 1234.567890
123456789012345678.123456789012 aaa 2023-01-01 2023-01-01T12:34:56
+
+-- !sql_4 --
+0 ORC 2 {1:2, 2:2, 3:2, 4:2, 5:2, 6:2, 7:2, 8:2, 9:2, 10:2,
11:2, 12:2} {} {1:0x00, 2:0x0B00000000000000, 3:0x6F00000000000000,
4:0xCDCC8C3F, 5:0x9A9999999999F13F, 6:0x0457, 7:0x00BC614E, 8:0x075BCD15,
9:0x018EE90FF6C373E0393713FA14, 10:0x616161, 11:0x9E4B0000,
12:0x005CE70F33F10500} {1:0x01, 2:0x1600000000000000,
3:0xDE00000000000000, 4:0xCDCC0C40, 5:0x9A99999999990140, 6:0x08AE,
7:0x05397FB1, 8:0x05397FB1, 9:0x0C7748819DFFB62505316873CB, 10:0x626262,
11:0x434C0000, 12:0x40D91FA833FE0500}
+
+-- !sql_5 --
+{"bigint_col":{"column_size":null, "value_count":2, "null_value_count":null,
"nan_value_count":null, "lower_bound":111, "upper_bound":222},
"boolean_col":{"column_size":null, "value_count":2, "null_value_count":null,
"nan_value_count":null, "lower_bound":0, "upper_bound":1},
"date_col":{"column_size":null, "value_count":2, "null_value_count":null,
"nan_value_count":null, "lower_bound":"2023-01-01",
"upper_bound":"2023-06-15"}, "datetime_col1":{"column_size":null,
"value_count":2, "null_v [...]
+
+-- !sql_6 --
+{1:0x00, 2:0x0B000000, 3:0x6F00000000000000, 4:0xCDCC8C3F,
5:0x9A9999999999F13F, 6:0x00000457, 7:0x00BC614E, 8:0x00000000499602D2,
9:0x000000018EE90FF6C373E0393713FA14, 10:0x616161, 11:0x9E4B0000,
12:0x005CE70F33F10500}
+
+-- !sql_7 --
+{1:0x00, 2:0x0B00000000000000, 3:0x6F00000000000000, 4:0xCDCC8C3F,
5:0x9A9999999999F13F, 6:0x0457, 7:0x00BC614E, 8:0x075BCD15,
9:0x018EE90FF6C373E0393713FA14, 10:0x616161, 11:0x9E4B0000,
12:0x005CE70F33F10500}
+
+-- !sql_8 --
+{1:0x01, 2:0x16000000, 3:0xDE00000000000000, 4:0xCDCC0C40,
5:0x9A99999999990140, 6:0x000008AE, 7:0x05397FB1, 8:0x000000020A75E124,
9:0x0000000C7748819DFFB62505316873CB, 10:0x626262, 11:0x434C0000,
12:0x40D91FA833FE0500}
+
+-- !sql_9 --
+{1:0x01, 2:0x1600000000000000, 3:0xDE00000000000000, 4:0xCDCC0C40,
5:0x9A99999999990140, 6:0x08AE, 7:0x05397FB1, 8:0x05397FB1,
9:0x0C7748819DFFB62505316873CB, 10:0x626262, 11:0x434C0000,
12:0x40D91FA833FE0500}
+
diff --git
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_stats2.groovy
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_stats2.groovy
new file mode 100644
index 00000000000..8ae25ab76fe
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_stats2.groovy
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_write_stats2",
"p0,external,iceberg,external_docker,external_docker_iceberg") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable iceberg test.")
+ return
+ }
+
+ for (String hivePrefix : ["hive2"]) {
+ setHivePrefix(hivePrefix)
+ String catalog_name = "test_iceberg_write_stats"
+ String db_name = "test_stats"
+ String rest_port =
context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minio_port =
context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${rest_port}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1",
+ "enable.mapping.varbinary"="true"
+ );"""
+
+ sql """switch ${catalog_name}"""
+ sql """create database if not exists ${db_name}"""
+ sql """use ${db_name}"""
+
+ sql """ DROP TABLE IF EXISTS `iceberg_all_types_parquet`; """
+
+ sql """
+ CREATE TABLE `iceberg_all_types_parquet`(
+ `boolean_col` boolean,
+ `int_col` int,
+ `bigint_col` bigint,
+ `float_col` float,
+ `double_col` double,
+ `decimal_col1` decimal(9,0),
+ `decimal_col2` decimal(8,4),
+ `decimal_col3` decimal(18,6),
+ `decimal_col4` decimal(38,12),
+ `string_col` string,
+ `date_col` date,
+ `datetime_col1` datetime)
+ ENGINE=iceberg
+ properties (
+ "write-format"="parquet"
+ );
+ """
+
+ sql """
+ INSERT INTO iceberg_all_types_parquet VALUES
+ (true, 11, 111, 1.1, 1.1, 1111.0, 1234.5678, 1234.56789,
123456789012345678.123456789012, 'aaa', '2023-01-01', '2023-01-01 12:34:56'),
+ (false, 22, 222, 2.2, 2.2, 2222.0, 8765.4321, 8765.43210,
987654321098765432.98765432109876, 'bbb', '2023-06-15', '2023-06-15 23:45:01');
+ """
+
+ qt_sql_0 """ select * from iceberg_all_types_parquet order by 1; """;
+ qt_sql_1 """ select
content,file_format,record_count,value_counts,null_value_counts,lower_bounds,upper_bounds
from iceberg_all_types_parquet\$files order by 1; """;
+ qt_sql_2 """ select readable_metrics from
iceberg_all_types_parquet\$files order by 1; """;
+
+
+ sql """ DROP TABLE IF EXISTS `iceberg_all_types_orc`; """
+
+ sql """
+ CREATE TABLE `iceberg_all_types_orc`(
+ `boolean_col` boolean,
+ `int_col` int,
+ `bigint_col` bigint,
+ `float_col` float,
+ `double_col` double,
+ `decimal_col1` decimal(9,0),
+ `decimal_col2` decimal(8,4),
+ `decimal_col3` decimal(18,6),
+ `decimal_col4` decimal(38,12),
+ `string_col` string,
+ `date_col` date,
+ `datetime_col1` datetime)
+ ENGINE=iceberg
+ properties (
+ "write-format"="orc"
+ );
+ """
+
+ sql """
+ INSERT INTO iceberg_all_types_orc VALUES
+ (true, 11, 111, 1.1, 1.1, 1111.0, 1234.5678, 1234.56789,
123456789012345678.123456789012, 'aaa', '2023-01-01', '2023-01-01 12:34:56'),
+ (false, 22, 222, 2.2, 2.2, 2222.0, 8765.4321, 8765.43210,
987654321098765432.98765432109876, 'bbb', '2023-06-15', '2023-06-15 23:45:01');
+ """
+
+ qt_sql_3 """ select * from iceberg_all_types_orc order by 1; """;
+ qt_sql_4 """ select
content,file_format,record_count,value_counts,null_value_counts,lower_bounds,upper_bounds
from iceberg_all_types_orc\$files order by 1; """;
+ qt_sql_5 """ select readable_metrics from iceberg_all_types_orc\$files
order by 1; """;
+ qt_sql_6 """ select lower_bounds from iceberg_all_types_parquet\$files
order by 1; """;
+ qt_sql_7 """ select lower_bounds from iceberg_all_types_orc\$files
order by 1; """;
+ qt_sql_8 """ select upper_bounds from iceberg_all_types_parquet\$files
order by 1; """;
+ qt_sql_9 """ select upper_bounds from iceberg_all_types_orc\$files
order by 1; """;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]