This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new c40246efa96 [bugfix](iceberg)Fixed random core with writing iceberg
partitioned table for 2.1 (#39808)(#39569) (#39832)
c40246efa96 is described below
commit c40246efa966c2fbf5fc9b9a45bd71316b160236
Author: wuwenchi <[email protected]>
AuthorDate: Fri Aug 23 17:19:48 2024 +0800
[bugfix](iceberg) Fixed a random coredump when writing to an Iceberg partitioned
table, for 2.1 (#39808)(#39569) (#39832)
## Proposed changes
Backport of: #39808 #39569
---
.../sink/writer/iceberg/viceberg_table_writer.cpp | 50 ++++++-------
.../sink/writer/iceberg/viceberg_table_writer.h | 8 +--
...test_iceberg_overwrite_with_wrong_partition.out | 23 ++++++
...t_iceberg_overwrite_with_wrong_partition.groovy | 84 ++++++++++++++++++++++
4 files changed, 132 insertions(+), 33 deletions(-)
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 4b705b0e51b..ddd4101c714 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -125,7 +125,7 @@ Status VIcebergTableWriter::write(vectorized::Block& block)
{
auto writer_iter = _partitions_to_writers.find("");
if (writer_iter == _partitions_to_writers.end()) {
try {
- writer = _create_partition_writer(output_block, -1);
+ writer = _create_partition_writer(nullptr, -1);
} catch (doris::Exception& e) {
return e.to_status();
}
@@ -141,7 +141,7 @@ Status VIcebergTableWriter::write(vectorized::Block& block)
{
}
_partitions_to_writers.erase(writer_iter);
try {
- writer = _create_partition_writer(output_block, -1,
&file_name,
+ writer = _create_partition_writer(nullptr, -1,
&file_name,
file_name_index + 1);
} catch (doris::Exception& e) {
return e.to_status();
@@ -160,21 +160,21 @@ Status VIcebergTableWriter::write(vectorized::Block&
block) {
}
{
+ Block transformed_block;
SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
- _transformed_block.reserve(_iceberg_partition_columns.size());
+ transformed_block.reserve(_iceberg_partition_columns.size());
for (auto& iceberg_partition_columns : _iceberg_partition_columns) {
-
_transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply(
+
transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply(
output_block, iceberg_partition_columns.source_idx()));
}
for (int i = 0; i < output_block.rows(); ++i) {
std::optional<PartitionData> partition_data;
try {
- partition_data = _get_partition_data(_transformed_block, i);
+ partition_data = _get_partition_data(&transformed_block, i);
} catch (doris::Exception& e) {
return e.to_status();
}
std::string partition_name;
- DCHECK(partition_data.has_value());
try {
partition_name = _partition_to_path(partition_data.value());
} catch (doris::Exception& e) {
@@ -185,7 +185,7 @@ Status VIcebergTableWriter::write(vectorized::Block& block)
{
const std::string* file_name, int file_name_index,
std::shared_ptr<VIcebergPartitionWriter>& writer_ptr)
-> Status {
try {
- auto writer = _create_partition_writer(output_block,
position, file_name,
+ auto writer = _create_partition_writer(&transformed_block,
position, file_name,
file_name_index);
RETURN_IF_ERROR(writer->open(_state, _profile));
IColumn::Filter filter(output_block.rows(), 0);
@@ -341,30 +341,27 @@ std::vector<std::string>
VIcebergTableWriter::_partition_values(
}
std::shared_ptr<VIcebergPartitionWriter>
VIcebergTableWriter::_create_partition_writer(
- vectorized::Block& block, int position, const std::string* file_name,
int file_name_index) {
+ vectorized::Block* transformed_block, int position, const std::string*
file_name,
+ int file_name_index) {
auto& iceberg_table_sink = _t_sink.iceberg_table_sink;
- std::optional<PartitionData> partition_data;
- partition_data = _get_partition_data(_transformed_block, position);
- std::string partition_path;
std::vector<std::string> partition_values;
- if (partition_data.has_value()) {
- partition_path = _partition_to_path(partition_data.value());
- partition_values = _partition_values(partition_data.value());
- }
const std::string& output_path = iceberg_table_sink.output_path;
-
std::string write_path;
std::string original_write_path;
std::string target_path;
- if (partition_path.empty()) {
- original_write_path = iceberg_table_sink.original_output_path;
- target_path = output_path;
- write_path = output_path;
- } else {
+
+ if (transformed_block != nullptr) {
+ PartitionData partition_data = _get_partition_data(transformed_block,
position);
+ std::string partition_path = _partition_to_path(partition_data);
+ partition_values = _partition_values(partition_data);
original_write_path =
fmt::format("{}/{}", iceberg_table_sink.original_output_path,
partition_path);
target_path = fmt::format("{}/{}", output_path, partition_path);
write_path = fmt::format("{}/{}", output_path, partition_path);
+ } else {
+ original_write_path = iceberg_table_sink.original_output_path;
+ target_path = output_path;
+ write_path = output_path;
}
VIcebergPartitionWriter::WriteInfo write_info = {
@@ -387,18 +384,15 @@ std::shared_ptr<VIcebergPartitionWriter>
VIcebergTableWriter::_create_partition_
iceberg_table_sink.hadoop_config);
}
-std::optional<PartitionData> VIcebergTableWriter::_get_partition_data(
- vectorized::Block& transformed_block, int position) {
- if (_iceberg_partition_columns.empty()) {
- return std::nullopt;
- }
-
+PartitionData VIcebergTableWriter::_get_partition_data(vectorized::Block*
transformed_block,
+ int position) {
+ DCHECK(!_iceberg_partition_columns.empty());
std::vector<std::any> values;
values.reserve(_iceberg_partition_columns.size());
int column_idx = 0;
for (auto& iceberg_partition_column : _iceberg_partition_columns) {
const vectorized::ColumnWithTypeAndName& partition_column =
- transformed_block.get_by_position(column_idx);
+ transformed_block->get_by_position(column_idx);
const TypeDescriptor& result_type =
iceberg_partition_column.partition_column_transform().get_result_type();
auto value = _get_iceberg_partition_value(result_type,
partition_column, position);
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index 35e71d1960f..b9435b50cbb 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -97,10 +97,10 @@ private:
std::vector<std::string> _partition_values(const
doris::iceberg::StructLike& data);
std::shared_ptr<VIcebergPartitionWriter> _create_partition_writer(
- vectorized::Block& block, int position, const std::string*
file_name = nullptr,
- int file_name_index = 0);
+ vectorized::Block* transformed_block, int position,
+ const std::string* file_name = nullptr, int file_name_index = 0);
- std::optional<PartitionData> _get_partition_data(vectorized::Block& block,
int position);
+ PartitionData _get_partition_data(vectorized::Block* block, int position);
std::any _get_iceberg_partition_value(const TypeDescriptor& type_desc,
const ColumnWithTypeAndName&
partition_column,
@@ -127,8 +127,6 @@ private:
VExprContextSPtrs _write_output_vexpr_ctxs;
- Block _transformed_block;
-
size_t _row_count = 0;
// profile counters
diff --git
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.out
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.out
new file mode 100644
index 00000000000..b17bf8063c7
--- /dev/null
+++
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.out
@@ -0,0 +1,23 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !qt01 --
+2450841 2450841
+2450841 2450841
+2450842 2450842
+2450842 2450842
+2450843 2450843
+2450843 2450843
+2450844 2450844
+2450844 2450844
+2450845 2450845
+2450845 2450845
+2450846 2450846
+2450846 2450846
+2450847 2450847
+2450847 2450847
+2450848 2450848
+2450848 2450848
+2450849 2450849
+2450849 2450849
+2450850 2450850
+2450850 2450850
+
diff --git
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.groovy
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.groovy
new file mode 100644
index 00000000000..760611ab3b4
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.groovy
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_overwrite_with_wrong_partition",
"p0,external,doris,external_docker,external_docker_doris") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable iceberg test.")
+ return
+ }
+
+ String tb1 = "tb_dst";
+ String tb2 = "tb_src";
+
+ try {
+ String rest_port =
context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minio_port =
context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String catalog_name = "test_iceberg_overwrite_with_wrong_partition"
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${rest_port}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1"
+ );"""
+
+ sql """ switch ${catalog_name} """
+ sql """ use multi_catalog """
+
+ sql """ drop table if exists ${tb1} """
+ sql """ drop table if exists ${tb2} """
+
+ sql """
+ create table ${tb1} (
+ id bigint,
+ id2 bigint
+ ) PARTITION BY LIST(id2)() ;
+ """
+ sql """
+ create table ${tb2} (
+ id bigint,
+ id2 bigint
+ );
+ """
+
+ sql """ insert into ${tb2} values (2450841,2450841),
(2450842,2450842); """
+ sql """ insert into ${tb2} values (2450843,2450843),
(2450844,2450844); """
+ sql """ insert into ${tb2} values (2450845,2450845),
(2450846,2450846); """
+ sql """ insert into ${tb2} values (2450847,2450847),
(2450848,2450848); """
+ sql """ insert into ${tb2} values (2450849,2450849),
(2450850,2450850); """
+ sql """ insert into ${tb2} values (2450841,2450841),
(2450842,2450842); """
+ sql """ insert into ${tb2} values (2450843,2450843),
(2450844,2450844); """
+ sql """ insert into ${tb2} values (2450845,2450845),
(2450846,2450846); """
+ sql """ insert into ${tb2} values (2450847,2450847),
(2450848,2450848); """
+ sql """ insert into ${tb2} values (2450849,2450849),
(2450850,2450850); """
+
+ sql """ insert overwrite table ${tb1} (id, id2) select id, id2 from
${tb2} where id2 >= 2450841 AND id2 < 2450851; """
+
+ order_qt_qt01 """ select * from ${tb1} """
+
+ } finally {
+ sql """ drop table if exists ${tb1} """
+ sql """ drop table if exists ${tb2} """
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]