This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new c40246efa96 [bugfix](iceberg)Fixed random core with writing iceberg 
partitioned table for 2.1 (#39808)(#39569) (#39832)
c40246efa96 is described below

commit c40246efa966c2fbf5fc9b9a45bd71316b160236
Author: wuwenchi <[email protected]>
AuthorDate: Fri Aug 23 17:19:48 2024 +0800

    [bugfix](iceberg)Fixed random core with writing iceberg partitioned table 
for 2.1 (#39808)(#39569) (#39832)
    
    ## Proposed changes
    
    bp: #39808 #39569
---
 .../sink/writer/iceberg/viceberg_table_writer.cpp  | 50 ++++++-------
 .../sink/writer/iceberg/viceberg_table_writer.h    |  8 +--
 ...test_iceberg_overwrite_with_wrong_partition.out | 23 ++++++
 ...t_iceberg_overwrite_with_wrong_partition.groovy | 84 ++++++++++++++++++++++
 4 files changed, 132 insertions(+), 33 deletions(-)

diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp 
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 4b705b0e51b..ddd4101c714 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -125,7 +125,7 @@ Status VIcebergTableWriter::write(vectorized::Block& block) 
{
             auto writer_iter = _partitions_to_writers.find("");
             if (writer_iter == _partitions_to_writers.end()) {
                 try {
-                    writer = _create_partition_writer(output_block, -1);
+                    writer = _create_partition_writer(nullptr, -1);
                 } catch (doris::Exception& e) {
                     return e.to_status();
                 }
@@ -141,7 +141,7 @@ Status VIcebergTableWriter::write(vectorized::Block& block) 
{
                     }
                     _partitions_to_writers.erase(writer_iter);
                     try {
-                        writer = _create_partition_writer(output_block, -1, 
&file_name,
+                        writer = _create_partition_writer(nullptr, -1, 
&file_name,
                                                           file_name_index + 1);
                     } catch (doris::Exception& e) {
                         return e.to_status();
@@ -160,21 +160,21 @@ Status VIcebergTableWriter::write(vectorized::Block& 
block) {
     }
 
     {
+        Block transformed_block;
         SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
-        _transformed_block.reserve(_iceberg_partition_columns.size());
+        transformed_block.reserve(_iceberg_partition_columns.size());
         for (auto& iceberg_partition_columns : _iceberg_partition_columns) {
-            
_transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply(
+            
transformed_block.insert(iceberg_partition_columns.partition_column_transform().apply(
                     output_block, iceberg_partition_columns.source_idx()));
         }
         for (int i = 0; i < output_block.rows(); ++i) {
             std::optional<PartitionData> partition_data;
             try {
-                partition_data = _get_partition_data(_transformed_block, i);
+                partition_data = _get_partition_data(&transformed_block, i);
             } catch (doris::Exception& e) {
                 return e.to_status();
             }
             std::string partition_name;
-            DCHECK(partition_data.has_value());
             try {
                 partition_name = _partition_to_path(partition_data.value());
             } catch (doris::Exception& e) {
@@ -185,7 +185,7 @@ Status VIcebergTableWriter::write(vectorized::Block& block) 
{
                         const std::string* file_name, int file_name_index,
                         std::shared_ptr<VIcebergPartitionWriter>& writer_ptr) 
-> Status {
                 try {
-                    auto writer = _create_partition_writer(output_block, 
position, file_name,
+                    auto writer = _create_partition_writer(&transformed_block, 
position, file_name,
                                                            file_name_index);
                     RETURN_IF_ERROR(writer->open(_state, _profile));
                     IColumn::Filter filter(output_block.rows(), 0);
@@ -341,30 +341,27 @@ std::vector<std::string> 
VIcebergTableWriter::_partition_values(
 }
 
 std::shared_ptr<VIcebergPartitionWriter> 
VIcebergTableWriter::_create_partition_writer(
-        vectorized::Block& block, int position, const std::string* file_name, 
int file_name_index) {
+        vectorized::Block* transformed_block, int position, const std::string* 
file_name,
+        int file_name_index) {
     auto& iceberg_table_sink = _t_sink.iceberg_table_sink;
-    std::optional<PartitionData> partition_data;
-    partition_data = _get_partition_data(_transformed_block, position);
-    std::string partition_path;
     std::vector<std::string> partition_values;
-    if (partition_data.has_value()) {
-        partition_path = _partition_to_path(partition_data.value());
-        partition_values = _partition_values(partition_data.value());
-    }
     const std::string& output_path = iceberg_table_sink.output_path;
-
     std::string write_path;
     std::string original_write_path;
     std::string target_path;
-    if (partition_path.empty()) {
-        original_write_path = iceberg_table_sink.original_output_path;
-        target_path = output_path;
-        write_path = output_path;
-    } else {
+
+    if (transformed_block != nullptr) {
+        PartitionData partition_data = _get_partition_data(transformed_block, 
position);
+        std::string partition_path = _partition_to_path(partition_data);
+        partition_values = _partition_values(partition_data);
         original_write_path =
                 fmt::format("{}/{}", iceberg_table_sink.original_output_path, 
partition_path);
         target_path = fmt::format("{}/{}", output_path, partition_path);
         write_path = fmt::format("{}/{}", output_path, partition_path);
+    } else {
+        original_write_path = iceberg_table_sink.original_output_path;
+        target_path = output_path;
+        write_path = output_path;
     }
 
     VIcebergPartitionWriter::WriteInfo write_info = {
@@ -387,18 +384,15 @@ std::shared_ptr<VIcebergPartitionWriter> 
VIcebergTableWriter::_create_partition_
             iceberg_table_sink.hadoop_config);
 }
 
-std::optional<PartitionData> VIcebergTableWriter::_get_partition_data(
-        vectorized::Block& transformed_block, int position) {
-    if (_iceberg_partition_columns.empty()) {
-        return std::nullopt;
-    }
-
+PartitionData VIcebergTableWriter::_get_partition_data(vectorized::Block* 
transformed_block,
+                                                       int position) {
+    DCHECK(!_iceberg_partition_columns.empty());
     std::vector<std::any> values;
     values.reserve(_iceberg_partition_columns.size());
     int column_idx = 0;
     for (auto& iceberg_partition_column : _iceberg_partition_columns) {
         const vectorized::ColumnWithTypeAndName& partition_column =
-                transformed_block.get_by_position(column_idx);
+                transformed_block->get_by_position(column_idx);
         const TypeDescriptor& result_type =
                 
iceberg_partition_column.partition_column_transform().get_result_type();
         auto value = _get_iceberg_partition_value(result_type, 
partition_column, position);
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h 
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index 35e71d1960f..b9435b50cbb 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -97,10 +97,10 @@ private:
     std::vector<std::string> _partition_values(const 
doris::iceberg::StructLike& data);
 
     std::shared_ptr<VIcebergPartitionWriter> _create_partition_writer(
-            vectorized::Block& block, int position, const std::string* 
file_name = nullptr,
-            int file_name_index = 0);
+            vectorized::Block* transformed_block, int position,
+            const std::string* file_name = nullptr, int file_name_index = 0);
 
-    std::optional<PartitionData> _get_partition_data(vectorized::Block& block, 
int position);
+    PartitionData _get_partition_data(vectorized::Block* block, int position);
 
     std::any _get_iceberg_partition_value(const TypeDescriptor& type_desc,
                                           const ColumnWithTypeAndName& 
partition_column,
@@ -127,8 +127,6 @@ private:
 
     VExprContextSPtrs _write_output_vexpr_ctxs;
 
-    Block _transformed_block;
-
     size_t _row_count = 0;
 
     // profile counters
diff --git 
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.out
 
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.out
new file mode 100644
index 00000000000..b17bf8063c7
--- /dev/null
+++ 
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.out
@@ -0,0 +1,23 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !qt01 --
+2450841        2450841
+2450841        2450841
+2450842        2450842
+2450842        2450842
+2450843        2450843
+2450843        2450843
+2450844        2450844
+2450844        2450844
+2450845        2450845
+2450845        2450845
+2450846        2450846
+2450846        2450846
+2450847        2450847
+2450847        2450847
+2450848        2450848
+2450848        2450848
+2450849        2450849
+2450849        2450849
+2450850        2450850
+2450850        2450850
+
diff --git 
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.groovy
 
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.groovy
new file mode 100644
index 00000000000..760611ab3b4
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_overwrite_with_wrong_partition.groovy
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_overwrite_with_wrong_partition", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String tb1 = "tb_dst";
+    String tb2 = "tb_src";
+
+    try {
+        String rest_port = 
context.config.otherConfigs.get("iceberg_rest_uri_port")
+        String minio_port = 
context.config.otherConfigs.get("iceberg_minio_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        String catalog_name = "test_iceberg_overwrite_with_wrong_partition"
+
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+                'type'='iceberg',
+                'iceberg.catalog.type'='rest',
+                'uri' = 'http://${externalEnvIp}:${rest_port}',
+                "s3.access_key" = "admin",
+                "s3.secret_key" = "password",
+                "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+                "s3.region" = "us-east-1"
+            );"""
+
+        sql """ switch ${catalog_name} """
+        sql """ use multi_catalog """
+
+        sql """ drop table if exists ${tb1} """
+        sql """ drop table if exists ${tb2} """
+
+        sql """ 
+        create table ${tb1} (
+            id bigint,
+            id2 bigint
+        ) PARTITION BY LIST(id2)() ;
+        """
+        sql """ 
+        create table ${tb2} (
+            id bigint,
+            id2 bigint
+        );
+        """
+
+        sql """ insert into ${tb2} values (2450841,2450841), 
(2450842,2450842); """
+        sql """ insert into ${tb2} values (2450843,2450843), 
(2450844,2450844); """
+        sql """ insert into ${tb2} values (2450845,2450845), 
(2450846,2450846); """
+        sql """ insert into ${tb2} values (2450847,2450847), 
(2450848,2450848); """
+        sql """ insert into ${tb2} values (2450849,2450849), 
(2450850,2450850); """
+        sql """ insert into ${tb2} values (2450841,2450841), 
(2450842,2450842); """
+        sql """ insert into ${tb2} values (2450843,2450843), 
(2450844,2450844); """
+        sql """ insert into ${tb2} values (2450845,2450845), 
(2450846,2450846); """
+        sql """ insert into ${tb2} values (2450847,2450847), 
(2450848,2450848); """
+        sql """ insert into ${tb2} values (2450849,2450849), 
(2450850,2450850); """
+
+        sql """ insert overwrite table ${tb1} (id, id2) select id, id2 from 
${tb2} where id2 >= 2450841 AND id2 < 2450851; """
+
+        order_qt_qt01 """ select * from ${tb1} """
+
+    } finally {
+        sql """ drop table if exists ${tb1} """
+        sql """ drop table if exists ${tb2} """
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to