This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 384fb2b7f6f branch-3.1: [Enhancement](paimon)support native read
paimon top level schema change table. #48723 (#52174)
384fb2b7f6f is described below
commit 384fb2b7f6f0cec50bc84b564eb3b54dc2220642
Author: daidai <[email protected]>
AuthorDate: Sat Jun 28 22:47:26 2025 +0800
branch-3.1: [Enhancement](paimon)support native read paimon top level
schema change table. #48723 (#52174)
bp #48723
---
be/src/vec/exec/format/table/iceberg_reader.cpp | 4 +-
be/src/vec/exec/format/table/iceberg_reader.h | 6 +-
be/src/vec/exec/format/table/paimon_reader.cpp | 87 ++++++++++-
be/src/vec/exec/format/table/paimon_reader.h | 57 ++++++++
be/src/vec/exec/scan/vfile_scanner.cpp | 24 ++--
be/src/vec/exec/scan/vfile_scanner.h | 2 +-
.../format/paimon/paimon_schema_change_test.cpp | 136 ++++++++++++++++++
.../create_preinstalled_scripts/paimon/run02.sql | 159 +++++++++++++++++++++
.../datasource/paimon/PaimonExternalTable.java | 2 +-
.../datasource/paimon/PaimonSchemaCacheValue.java | 12 +-
.../datasource/paimon/source/PaimonScanNode.java | 19 ++-
.../datasource/paimon/source/PaimonSplit.java | 9 ++
gensrc/thrift/PlanNodes.thrift | 2 +
.../paimon/test_paimon_schema_change.out | Bin 0 -> 4601 bytes
.../paimon/test_paimon_schema_change.groovy | 88 ++++++++++++
15 files changed, 585 insertions(+), 22 deletions(-)
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index b7666bda688..d209eb7d271 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -502,7 +502,7 @@ void
IcebergTableReader::_gen_position_delete_file_range(Block& block, DeleteFil
Status IcebergParquetReader::init_reader(
const std::vector<std::string>& file_col_names,
- const std::unordered_map<int, std::string>& col_id_name_map,
+ const std::unordered_map<uint64_t, std::string>& col_id_name_map,
const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor*
tuple_descriptor,
const RowDescriptor* row_descriptor,
@@ -575,7 +575,7 @@ Status IcebergParquetReader
::_read_position_delete_file(const TFileRangeDesc* d
Status IcebergOrcReader::init_reader(
const std::vector<std::string>& file_col_names,
- const std::unordered_map<int, std::string>& col_id_name_map,
+ const std::unordered_map<uint64_t, std::string>& col_id_name_map,
const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor*
tuple_descriptor,
const RowDescriptor* row_descriptor,
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h
b/be/src/vec/exec/format/table/iceberg_reader.h
index 48236021ce9..8a2b253d16f 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -148,7 +148,7 @@ protected:
// copy from _colname_to_value_range with new column name that is in
parquet/orc file, to support schema evolution.
std::unordered_map<std::string, ColumnValueRangeType>
_new_colname_to_value_range;
// column id to name map. Collect from FE slot descriptor.
- std::unordered_map<int, std::string> _col_id_name_map;
+ std::unordered_map<uint64_t, std::string> _col_id_name_map;
// col names in the parquet,orc file
std::vector<std::string> _all_required_col_names;
// col names in table but not in parquet,orc file
@@ -194,7 +194,7 @@ public:
kv_cache, io_ctx) {}
Status init_reader(
const std::vector<std::string>& file_col_names,
- const std::unordered_map<int, std::string>& col_id_name_map,
+ const std::unordered_map<uint64_t, std::string>& col_id_name_map,
const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor*
tuple_descriptor,
const RowDescriptor* row_descriptor,
@@ -240,7 +240,7 @@ public:
Status init_reader(
const std::vector<std::string>& file_col_names,
- const std::unordered_map<int, std::string>& col_id_name_map,
+ const std::unordered_map<uint64_t, std::string>& col_id_name_map,
const std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range,
const VExprContextSPtrs& conjuncts, const TupleDescriptor*
tuple_descriptor,
const RowDescriptor* row_descriptor,
diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp
b/be/src/vec/exec/format/table/paimon_reader.cpp
index cd8b7c0060f..e3fba810bba 100644
--- a/be/src/vec/exec/format/table/paimon_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_reader.cpp
@@ -37,6 +37,68 @@ PaimonReader::PaimonReader(std::unique_ptr<GenericReader>
file_format_reader,
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile);
}
+// Builds the table-column <-> file-column name mappings for the schema version
+// recorded for the current file range, so that a data file written under older
+// column names can be read through the table's current column names.
+//
+// read_table_col_names: table-side names of the columns to read.
+// table_col_id_table_name_map: field id -> table-side column name.
+// table_col_name_to_value_range: value ranges keyed by table-side column name;
+//         may be null, in which case no range is translated.
+//
+// On success _all_required_col_names holds the file-side name of every requested
+// column (in request order), _new_colname_to_value_range holds the ranges re-keyed
+// by file-side name, and _has_schema_change tells whether any name differs.
+// Returns an error when the scan params carry no schema info for the range's schema id.
+Status PaimonReader::gen_file_col_name(
+        const std::vector<std::string>& read_table_col_names,
+        const std::unordered_map<uint64_t, std::string>& table_col_id_table_name_map,
+        const std::unordered_map<std::string, ColumnValueRangeType>*
+                table_col_name_to_value_range) {
+    // This is close to what the iceberg reader does; the two could be merged
+    // once hudi schema change is implemented.
+
+    // Start from a clean state so a repeated call cannot keep mappings, ranges
+    // or the schema-change flag produced by a previous call.
+    _table_col_to_file_col.clear();
+    _file_col_to_table_col.clear();
+    _new_colname_to_value_range.clear();
+    _all_required_col_names.clear();
+    _not_in_file_col_names.clear();
+    _has_schema_change = false;
+
+    if (!_params.__isset.paimon_schema_info) [[unlikely]] {
+        return Status::RuntimeError("miss paimon schema info.");
+    }
+
+    const auto schema_iter = _params.paimon_schema_info.find(
+            _range.table_format_params.paimon_params.schema_id);
+    if (schema_iter == _params.paimon_schema_info.end()) [[unlikely]] {
+        return Status::InternalError("miss paimon schema info.");
+    }
+
+    // schema_iter->second maps field id -> column name in the file's schema.
+    for (const auto& [field_id, file_col_name] : schema_iter->second) {
+        const auto name_iter = table_col_id_table_name_map.find(field_id);
+        if (name_iter == table_col_id_table_name_map.end()) {
+            // The field has no table-side entry, nothing to map.
+            continue;
+        }
+        const std::string& table_col_name = name_iter->second;
+
+        _table_col_to_file_col.emplace(table_col_name, file_col_name);
+        _file_col_to_table_col.emplace(file_col_name, table_col_name);
+        if (table_col_name != file_col_name) {
+            _has_schema_change = true;
+        }
+    }
+
+    for (const auto& name : read_table_col_names) {
+        const auto iter = _table_col_to_file_col.find(name);
+        if (iter != _table_col_to_file_col.end()) {
+            _all_required_col_names.emplace_back(iter->second);
+            continue;
+        }
+
+        // No field id matched this column: fall back to its lower-cased name
+        // and register that pair so the block can be renamed both ways.
+        auto name_low = to_lower(name);
+        _all_required_col_names.emplace_back(name_low);
+
+        _table_col_to_file_col.emplace(name, name_low);
+        _file_col_to_table_col.emplace(name_low, name);
+        if (name != name_low) {
+            _has_schema_change = true;
+        }
+    }
+
+    // Re-key the value ranges by file-side name; a column without a mapping
+    // keeps its table-side name.
+    if (table_col_name_to_value_range != nullptr) {
+        for (const auto& [table_col_name, value_range] : *table_col_name_to_value_range) {
+            const auto iter = _table_col_to_file_col.find(table_col_name);
+            const std::string& file_col_name =
+                    iter == _table_col_to_file_col.end() ? table_col_name : iter->second;
+            _new_colname_to_value_range.emplace(file_col_name, value_range);
+        }
+    }
+    return Status::OK();
+}
+
Status PaimonReader::init_row_filters() {
const auto& table_desc = _range.table_format_params.paimon_params;
if (!table_desc.__isset.deletion_file) {
@@ -106,6 +168,29 @@ Status PaimonReader::init_row_filters() {
}
Status PaimonReader::get_next_block_inner(Block* block, size_t* read_rows,
bool* eof) {
- return _file_format_reader->get_next_block(block, read_rows, eof);
+    // Reads the next block through the wrapped file reader. When the file was
+    // written under different column names, the block's columns are renamed to
+    // the file-side names for the duration of the read and renamed back after.
+
+    // Renames every column that has an entry in `name_map`, then rebuilds the
+    // block's name index so lookups by name see the new names.
+    auto rename_columns = [block](const std::unordered_map<std::string, std::string>& name_map) {
+        for (size_t i = 0; i < block->columns(); ++i) {
+            ColumnWithTypeAndName& col = block->get_by_position(i);
+            if (auto iter = name_map.find(col.name); iter != name_map.end()) {
+                col.name = iter->second;
+            }
+        }
+        block->initialize_index_by_name();
+    };
+
+    if (_has_schema_change) {
+        rename_columns(_table_col_to_file_col);
+    }
+
+    Status status = _file_format_reader->get_next_block(block, read_rows, eof);
+
+    // Restore the table-side names even when the read failed, so the caller is
+    // never handed a block that is still keyed by file-side names.
+    if (_has_schema_change) {
+        rename_columns(_file_col_to_table_col);
+    }
+    return status;
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/paimon_reader.h
b/be/src/vec/exec/format/table/paimon_reader.h
index dcc7bf9d700..11fffca9437 100644
--- a/be/src/vec/exec/format/table/paimon_reader.h
+++ b/be/src/vec/exec/format/table/paimon_reader.h
@@ -30,12 +30,19 @@ public:
PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, io::IOContext* io_ctx);
+
~PaimonReader() override = default;
Status init_row_filters() final;
Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof)
final;
+ Status gen_file_col_name(
+ const std::vector<std::string>& read_table_col_names,
+ const std::unordered_map<uint64_t, std::string>&
table_col_id_table_name_map,
+ const std::unordered_map<std::string, ColumnValueRangeType>*
+ table_col_name_to_value_range);
+
protected:
struct PaimonProfile {
RuntimeProfile::Counter* num_delete_rows;
@@ -44,6 +51,16 @@ protected:
std::vector<int64_t> _delete_rows;
PaimonProfile _paimon_profile;
+ std::unordered_map<std::string, ColumnValueRangeType>
_new_colname_to_value_range;
+
+ std::unordered_map<std::string, std::string> _file_col_to_table_col;
+ std::unordered_map<std::string, std::string> _table_col_to_file_col;
+
+ std::vector<std::string> _all_required_col_names;
+ std::vector<std::string> _not_in_file_col_names;
+
+ bool _has_schema_change = false;
+
virtual void set_delete_rows() = 0;
};
@@ -60,6 +77,25 @@ public:
(reinterpret_cast<OrcReader*>(_file_format_reader.get()))
->set_position_delete_rowids(&_delete_rows);
}
+
+    // Initializes the wrapped ORC reader for a Paimon data file: first derives
+    // the file-side column names and value ranges from the table-side ones,
+    // then hands the translated names to the ORC reader.
+    Status init_reader(
+            const std::vector<std::string>& read_table_col_names,
+            const std::unordered_map<uint64_t, std::string>& table_col_id_table_name_map,
+            const std::unordered_map<std::string, ColumnValueRangeType>*
+                    table_col_name_to_value_range,
+            const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
+            const RowDescriptor* row_descriptor,
+            const VExprContextSPtrs* not_single_slot_filter_conjuncts,
+            const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
+        // Must run before the reader is initialized: it fills the members that
+        // are passed to the reader below.
+        RETURN_IF_ERROR(gen_file_col_name(read_table_col_names, table_col_id_table_name_map,
+                                          table_col_name_to_value_range));
+
+        auto* file_reader = static_cast<OrcReader*>(_file_format_reader.get());
+        file_reader->set_table_col_to_file_col(_table_col_to_file_col);
+
+        Status init_status = file_reader->init_reader(
+                &_all_required_col_names, &_new_colname_to_value_range, conjuncts, false,
+                tuple_descriptor, row_descriptor, not_single_slot_filter_conjuncts,
+                slot_id_to_filter_conjuncts);
+        return init_status;
+    }
};
class PaimonParquetReader final : public PaimonReader {
@@ -75,5 +111,26 @@ public:
(reinterpret_cast<ParquetReader*>(_file_format_reader.get()))
->set_delete_rows(&_delete_rows);
}
+
+    // Initializes the wrapped Parquet reader for a Paimon data file: first
+    // derives the file-side column names and value ranges from the table-side
+    // ones, then hands the translated names to the Parquet reader.
+    Status init_reader(
+            const std::vector<std::string>& read_table_col_names,
+            const std::unordered_map<uint64_t, std::string>& table_col_id_table_name_map,
+            const std::unordered_map<std::string, ColumnValueRangeType>*
+                    table_col_name_to_value_range,
+            const VExprContextSPtrs& conjuncts, const TupleDescriptor* tuple_descriptor,
+            const RowDescriptor* row_descriptor,
+            const std::unordered_map<std::string, int>* colname_to_slot_id,
+            const VExprContextSPtrs* not_single_slot_filter_conjuncts,
+            const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
+        // Must run before the reader is initialized: it fills the members that
+        // are passed to the reader below.
+        RETURN_IF_ERROR(gen_file_col_name(read_table_col_names, table_col_id_table_name_map,
+                                          table_col_name_to_value_range));
+
+        auto* file_reader = static_cast<ParquetReader*>(_file_format_reader.get());
+        file_reader->set_table_to_file_col_map(_table_col_to_file_col);
+
+        Status init_status = file_reader->init_reader(
+                _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
+                conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
+                not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
+        return init_status;
+    }
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 985111cd040..1c17e788b00 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -961,15 +961,14 @@ Status VFileScanner::_get_next_reader() {
_cur_reader = std::move(iceberg_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type ==
"paimon") {
- std::vector<std::string> place_holder;
- init_status = parquet_reader->init_reader(
- _file_col_names, place_holder, _colname_to_value_range,
- _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(),
- _col_name_to_slot_id,
&_not_single_slot_filter_conjuncts,
- &_slot_id_to_filter_conjuncts);
std::unique_ptr<PaimonParquetReader> paimon_reader =
PaimonParquetReader::create_unique(std::move(parquet_reader), _profile,
_state, *_params,
range, _io_ctx.get());
+ init_status = paimon_reader->init_reader(
+ _file_col_names, _col_id_name_map,
_colname_to_value_range,
+ _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(),
+ _col_name_to_slot_id,
&_not_single_slot_filter_conjuncts,
+ &_slot_id_to_filter_conjuncts);
RETURN_IF_ERROR(paimon_reader->init_row_filters());
_cur_reader = std::move(paimon_reader);
} else {
@@ -1027,12 +1026,13 @@ Status VFileScanner::_get_next_reader() {
_cur_reader = std::move(iceberg_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type ==
"paimon") {
- init_status = orc_reader->init_reader(
- &_file_col_names, _colname_to_value_range,
_push_down_conjuncts, false,
- _real_tuple_desc, _default_val_row_desc.get(),
- &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
std::unique_ptr<PaimonOrcReader> paimon_reader =
PaimonOrcReader::create_unique(
std::move(orc_reader), _profile, _state, *_params,
range, _io_ctx.get());
+
+ init_status = paimon_reader->init_reader(
+ _file_col_names, _col_id_name_map,
_colname_to_value_range,
+ _push_down_conjuncts, _real_tuple_desc,
_default_val_row_desc.get(),
+ &_not_single_slot_filter_conjuncts,
&_slot_id_to_filter_conjuncts);
RETURN_IF_ERROR(paimon_reader->init_row_filters());
_cur_reader = std::move(paimon_reader);
} else {
@@ -1237,7 +1237,9 @@ Status VFileScanner::_init_expr_ctxes() {
if (slot_info.is_file_slot) {
_file_slot_descs.emplace_back(it->second);
_file_col_names.push_back(it->second->col_name());
- if (it->second->col_unique_id() > 0) {
+ if (it->second->col_unique_id() >= 0) {
+ // Iceberg field unique ID starts from 1, Paimon/Hudi field
unique ID starts from 0.
+ // For other data sources, all columns are set to -1.
_col_id_name_map.emplace(it->second->col_unique_id(),
it->second->col_name());
}
} else {
diff --git a/be/src/vec/exec/scan/vfile_scanner.h
b/be/src/vec/exec/scan/vfile_scanner.h
index ca7a03c68c0..9bd1f3e2aa1 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -107,7 +107,7 @@ protected:
// col names from _file_slot_descs
std::vector<std::string> _file_col_names;
// column id to name map. Collect from FE slot descriptor.
- std::unordered_map<int, std::string> _col_id_name_map;
+ std::unordered_map<uint64_t, std::string> _col_id_name_map;
// Partition source slot descriptors
std::vector<SlotDescriptor*> _partition_slot_descs;
diff --git a/be/test/vec/exec/format/paimon/paimon_schema_change_test.cpp
b/be/test/vec/exec/format/paimon/paimon_schema_change_test.cpp
new file mode 100644
index 00000000000..8dafed48a97
--- /dev/null
+++ b/be/test/vec/exec/format/paimon/paimon_schema_change_test.cpp
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <map>
+#include <string>
+
+#include "io/file_factory.h"
+#include "io/fs/file_reader.h"
+#include "io/io_common.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/exec/format/table/paimon_reader.h"
+
+namespace doris::vectorized {
+
+// Test double that exposes PaimonReader's protected mapping state so the result
+// of gen_file_col_name() can be inspected without a real file reader.
+class PaimonMockReader final : public PaimonReader {
+public:
+    PaimonMockReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
+                     RuntimeState* state, const TFileScanRangeParams& params,
+                     const TFileRangeDesc& range, io::IOContext* io_ctx)
+            : PaimonReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {}
+    ~PaimonMockReader() final = default;
+
+    void set_delete_rows() final {
+        (reinterpret_cast<OrcReader*>(_file_format_reader.get()))
+                ->set_position_delete_rowids(&_delete_rows);
+    }
+
+    // Verifies the mappings produced for the ReadSchemaFile test inputs.
+    void check() {
+        ASSERT_TRUE(_has_schema_change);
+        ASSERT_TRUE(_new_colname_to_value_range.empty());
+
+        const std::unordered_map<std::string, std::string> expected_table_to_file = {
+                {"b", "map_col"}, {"e", "array_col"}, {"d", "struct_col"},
+                {"a", "vvv"},     {"c", "k"},         {"nonono", "nonono"}};
+        // Look entries up with find(): operator[] would insert an empty entry
+        // for a missing key and so modify the maps under test.
+        for (const auto& [table_col, file_col] : expected_table_to_file) {
+            const auto table_iter = _table_col_to_file_col.find(table_col);
+            ASSERT_TRUE(table_iter != _table_col_to_file_col.end())
+                    << "no file column mapped for table column " << table_col;
+            ASSERT_EQ(table_iter->second, file_col);
+
+            const auto file_iter = _file_col_to_table_col.find(file_col);
+            ASSERT_TRUE(file_iter != _file_col_to_table_col.end())
+                    << "no table column mapped for file column " << file_col;
+            ASSERT_EQ(file_iter->second, table_col);
+        }
+
+        // File-side names of the requested columns a, b, c, d, e, nonono, in that order.
+        const std::vector<std::string> expected_required_cols = {
+                "vvv", "map_col", "k", "struct_col", "array_col", "nonono"};
+        ASSERT_EQ(_all_required_col_names, expected_required_cols);
+    }
+};
+
+// Fixture that owns the runtime objects a PaimonReader needs to be constructed.
+class PaimonReaderTest : public ::testing::Test {
+protected:
+    void SetUp() override {
+        _profile = new RuntimeProfile("test_profile");
+        _state = new RuntimeState(TQueryGlobals());
+        _io_ctx = new io::IOContext();
+        _schema_file_path = "./be/test/exec/test_data/paimon_scanner/schema-0";
+    }
+
+    void TearDown() override {
+        delete _profile;
+        delete _state;
+        delete _io_ctx;
+        // Leave no dangling pointers behind.
+        _profile = nullptr;
+        _state = nullptr;
+        _io_ctx = nullptr;
+    }
+
+    // Null until SetUp() assigns them, so TearDown() stays well-defined even
+    // when SetUp() exits before every object has been created.
+    RuntimeProfile* _profile = nullptr;
+    RuntimeState* _state = nullptr;
+    io::IOContext* _io_ctx = nullptr;
+    std::string _schema_file_path;
+};
+
+// Feeds gen_file_col_name() a file schema and a table schema that use different
+// names for the same field ids and checks the resulting name mappings.
+TEST_F(PaimonReaderTest, ReadSchemaFile) {
+    // Field id -> column name for schema id 0.
+    const std::map<int64_t, std::string> file_id_to_name = {
+            {0, "k"}, {1, "vvv"}, {2, "array_col"}, {3, "struct_col"}, {6, "map_col"}};
+
+    TFileScanRangeParams params;
+    params.file_type = TFileType::FILE_LOCAL;
+    params.properties = {};
+    params.hdfs_params = {};
+    params.__isset.paimon_schema_info = true;
+    params.paimon_schema_info[0] = file_id_to_name;
+
+    TFileRangeDesc range;
+    range.table_format_params.paimon_params.schema_id = 0;
+
+    PaimonMockReader reader(nullptr, _profile, _state, params, range, _io_ctx);
+
+    // create table tmp5 (
+    //     k int,
+    //     vVV string,
+    //     array_col array<int>,
+    //     struct_COL struct<a:int,b:string>,
+    //     map_COL map<string,int>
+    // ) tblproperties (
+    //     'primary-key' = 'k',
+    //     "file.format" = "parquet"
+    // );
+
+    // "nonono" has a field id (10) that does not exist in the file schema.
+    const std::vector<std::string> read_table_col_names = {"a", "b", "c", "d", "e", "nonono"};
+
+    const std::unordered_map<uint64_t, std::string> table_col_id_table_name_map = {
+            {1, "a"}, {6, "b"}, {0, "c"}, {3, "d"}, {2, "e"}, {10, "nonono"}};
+
+    std::unordered_map<std::string, ColumnValueRangeType> table_col_name_to_value_range;
+
+    const Status status = reader.gen_file_col_name(
+            read_table_col_names, table_col_id_table_name_map, &table_col_name_to_value_range);
+    ASSERT_TRUE(status.ok());
+    reader.check();
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run02.sql
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run02.sql
new file mode 100644
index 00000000000..1d199a2bec8
--- /dev/null
+++
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run02.sql
@@ -0,0 +1,159 @@
+use paimon;
+
+create database if not exists test_paimon_schema_change;
+
+use test_paimon_schema_change;
+
+CREATE TABLE sc_orc_pk (
+ id INT,
+ name STRING,
+ age INT
+) USING paimon
+TBLPROPERTIES ('primary-key' = 'id', "file.format" =
"orc",'deletion-vectors.enabled' = 'true');
+
+INSERT INTO sc_orc_pk (id, name, age) VALUES (1, 'Alice', 30), (2, 'Bob', 25);
+INSERT INTO sc_orc_pk (id, name, age) VALUES (3, 'Charlie', 28);
+ALTER TABLE sc_orc_pk ADD COLUMNS (city STRING);
+INSERT INTO sc_orc_pk (id, name, age, city) VALUES (4, 'Charlie', 28, 'New
York');
+INSERT INTO sc_orc_pk (id, name, age, city) VALUES (5, 'David', 32, 'Los
Angeles');
+ALTER TABLE sc_orc_pk RENAME COLUMN name TO full_name;
+INSERT INTO sc_orc_pk (id, full_name, age, city) VALUES (6, 'David', 35, 'Los
Angeles');
+INSERT INTO sc_orc_pk (id, full_name, age, city) VALUES (7, 'Eve', 27, 'San
Francisco');
+ALTER TABLE sc_orc_pk DROP COLUMN age;
+INSERT INTO sc_orc_pk (id, full_name, city) VALUES (8, 'Eve', 'San Francisco');
+INSERT INTO sc_orc_pk (id, full_name, city) VALUES (9, 'Frank', 'Chicago');
+ALTER TABLE sc_orc_pk CHANGE COLUMN id id BIGINT;
+INSERT INTO sc_orc_pk (id, full_name, city) VALUES (10000000000, 'Frank',
'Chicago');
+INSERT INTO sc_orc_pk (id, full_name, city) VALUES (10, 'Grace', 'Seattle');
+
+ALTER TABLE sc_orc_pk ADD COLUMN salary DECIMAL(10,2) FIRST;
+INSERT INTO sc_orc_pk (id, full_name, city, salary) VALUES (11, 'Grace',
'Seattle', 5000.00);
+INSERT INTO sc_orc_pk (id, full_name, city, salary) VALUES (12, 'Heidi',
'Boston', 6000.00);
+
+ALTER TABLE sc_orc_pk RENAME COLUMN city TO location;
+INSERT INTO sc_orc_pk (id, full_name, location, salary) VALUES (13, 'Heidi',
'Boston', 6000.00);
+INSERT INTO sc_orc_pk (id, full_name, location, salary) VALUES (14, 'Ivan',
'Miami', 7000.00);
+
+ALTER TABLE sc_orc_pk CHANGE COLUMN salary salary DECIMAL(12,2);
+INSERT INTO sc_orc_pk (id, full_name, location, salary) VALUES (15, 'Ivan',
'Miami', 7000.00);
+INSERT INTO sc_orc_pk (id, full_name, location, salary) VALUES (16, 'Judy',
'Denver', 8000.00);
+
+ALTER TABLE sc_orc_pk ALTER COLUMN salary AFTER location;
+INSERT INTO sc_orc_pk (id, full_name, location, salary) VALUES (17, 'Stm',
'ttttt', 8000.00);
+INSERT INTO sc_orc_pk (id, full_name, location, salary) VALUES (18, 'Ken',
'Austin', 9000.00);
+
+ALTER TABLE sc_orc_pk ALTER COLUMN full_name FIRST;
+INSERT INTO sc_orc_pk (id, full_name, location, salary) VALUES (19, 'AAAA',
'BBBB', 9000.00);
+INSERT INTO sc_orc_pk (id, full_name, location, salary) VALUES (20, 'Laura',
'Portland', 10000.00);
+
+
+
+
+
+CREATE TABLE sc_parquet_pk (
+ id INT,
+ name STRING,
+ age INT
+) USING paimon
+TBLPROPERTIES ('primary-key' = 'id',"file.format" =
"parquet",'deletion-vectors.enabled' = 'true');
+
+INSERT INTO sc_parquet_pk (id, name, age) VALUES (1, 'Alice', 30), (2, 'Bob',
25);
+INSERT INTO sc_parquet_pk (id, name, age) VALUES (3, 'Charlie', 28);
+
+ALTER TABLE sc_parquet_pk ADD COLUMNS (city STRING);
+INSERT INTO sc_parquet_pk (id, name, age, city) VALUES (3, 'Charlie', 28, 'New
York');
+INSERT INTO sc_parquet_pk (id, name, age, city) VALUES (4, 'David', 32, 'Los
Angeles');
+
+ALTER TABLE sc_parquet_pk RENAME COLUMN name TO full_name;
+INSERT INTO sc_parquet_pk (id, full_name, age, city) VALUES (4, 'David', 35,
'Los Angeles');
+INSERT INTO sc_parquet_pk (id, full_name, age, city) VALUES (5, 'Eve', 27,
'San Francisco');
+
+ALTER TABLE sc_parquet_pk DROP COLUMN age;
+INSERT INTO sc_parquet_pk (id, full_name, city) VALUES (5, 'Eve', 'San
Francisco');
+INSERT INTO sc_parquet_pk (id, full_name, city) VALUES (6, 'Frank', 'Chicago');
+
+ALTER TABLE sc_parquet_pk CHANGE COLUMN id id BIGINT;
+INSERT INTO sc_parquet_pk (id, full_name, city) VALUES (10000000000, 'Frank',
'Chicago');
+INSERT INTO sc_parquet_pk (id, full_name, city) VALUES (7, 'Grace', 'Seattle');
+
+ALTER TABLE sc_parquet_pk ADD COLUMN salary DECIMAL(10,2) FIRST;
+INSERT INTO sc_parquet_pk (id, full_name, city, salary) VALUES (6, 'Grace',
'Seattle', 5000.00);
+INSERT INTO sc_parquet_pk (id, full_name, city, salary) VALUES (8, 'Heidi',
'Boston', 6000.00);
+
+ALTER TABLE sc_parquet_pk RENAME COLUMN city TO location;
+INSERT INTO sc_parquet_pk (id, full_name, location, salary) VALUES (7,
'Heidi', 'Boston', 6000.00);
+INSERT INTO sc_parquet_pk (id, full_name, location, salary) VALUES (9, 'Ivan',
'Miami', 7000.00);
+
+ALTER TABLE sc_parquet_pk CHANGE COLUMN salary salary DECIMAL(12,2);
+INSERT INTO sc_parquet_pk (id, full_name, location, salary) VALUES (8, 'Ivan',
'Miami', 7000.00);
+INSERT INTO sc_parquet_pk (id, full_name, location, salary) VALUES (10,
'Judy', 'Denver', 8000.00);
+
+ALTER TABLE sc_parquet_pk ALTER COLUMN salary AFTER location;
+INSERT INTO sc_parquet_pk (id, full_name, location, salary) VALUES (9, 'Stm',
'ttttt', 8000.00);
+INSERT INTO sc_parquet_pk (id, full_name, location, salary) VALUES (11, 'Ken',
'Austin', 9000.00);
+
+ALTER TABLE sc_parquet_pk ALTER COLUMN full_name FIRST;
+INSERT INTO sc_parquet_pk (id, full_name, location, salary) VALUES (10,
'AAAA', 'BBBB', 9000.00);
+INSERT INTO sc_parquet_pk (id, full_name, location, salary) VALUES (12,
'Laura', 'Portland', 10000.00);
+
+
+
+
+
+
+
+
+create table sc_parquet (
+ k int,
+ vVV string,
+ col1 array<int>,
+ col2 struct<a:int,b:string>,
+ col3 map<string,int>
+) tblproperties (
+ "file.format" = "parquet"
+);
+INSERT INTO sc_parquet (k,vVV,col1,col2,col3) VALUES
+ (1, 'hello', array(1,2,3), named_struct('a', 10, 'b', 'world'),
map('key1', 100, 'key2', 200));
+
+ALTER TABLE sc_parquet RENAME COLUMN col1 TO new_col1;
+ALTER TABLE sc_parquet RENAME COLUMN col2 TO new_col2;
+ALTER TABLE sc_parquet RENAME COLUMN col3 TO new_col3;
+ALTER TABLE sc_parquet RENAME COLUMN vVV to vv;
+alter table sc_parquet ALTER COLUMN new_col2 AFTER new_col3;
+alter table sc_parquet ALTER COLUMN new_col1 AFTER new_col2;
+
+INSERT INTO sc_parquet (k,vv,new_col1,new_col2,new_col3) VALUES
+ (2, 'test', array(4,5,6), named_struct('a', 20, 'b', 'spark'), map('key3',
300)),
+ (3, 'example', array(7,8,9), named_struct('a', 30, 'b', 'hive'),
map('key4', 400, 'key5', 500));
+
+
+
+
+
+
+
+create table sc_orc (
+ k int,
+ vVV string,
+ col1 array<int>,
+ col2 struct<a:int,b:string>,
+ col3 map<string,int>
+) tblproperties (
+ "file.format" = "orc"
+);
+
+INSERT INTO sc_orc (k,vVV,col1,col2,col3) VALUES
+ (1, 'hello', array(1,2,3), named_struct('a', 10, 'b', 'world'),
map('key1', 100, 'key2', 200));
+
+ALTER TABLE sc_orc RENAME COLUMN col1 TO new_col1;
+ALTER TABLE sc_orc RENAME COLUMN col2 TO new_col2;
+ALTER TABLE sc_orc RENAME COLUMN col3 TO new_col3;
+ALTER TABLE sc_orc RENAME COLUMN vVV to vv;
+alter table sc_orc ALTER COLUMN new_col2 AFTER new_col3;
+alter table sc_orc ALTER COLUMN new_col1 AFTER new_col2;
+
+INSERT INTO sc_orc (k,vv,new_col1,new_col2,new_col3) VALUES
+ (2, 'test', array(4,5,6), named_struct('a', 20, 'b', 'spark'), map('key3',
300)),
+ (3, 'example', array(7,8,9), named_struct('a', 30, 'b', 'hive'),
map('key4', 400, 'key5', 500));
+
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index 4878d2bdfb2..80710a5fb2a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -254,7 +254,7 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
partitionColumns.add(column);
}
}
- return Optional.of(new PaimonSchemaCacheValue(dorisColumns,
partitionColumns));
+ return Optional.of(new PaimonSchemaCacheValue(dorisColumns,
partitionColumns, tableSchema));
} catch (Exception e) {
throw new CacheException("failed to initSchema for: %s.%s.%s.%s",
null, getCatalog().getName(), key.getDbName(),
key.getTblName(),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
index ccb530a3cbc..e931b52336b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
@@ -20,18 +20,28 @@ package org.apache.doris.datasource.paimon;
import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.paimon.schema.TableSchema;
+
import java.util.List;
public class PaimonSchemaCacheValue extends SchemaCacheValue {
private List<Column> partitionColumns;
- public PaimonSchemaCacheValue(List<Column> schema, List<Column>
partitionColumns) {
+    // Caching the TableSchema avoids re-reading the schema file and re-parsing its JSON.
+    // It is assigned once in the constructor and never replaced.
+    private final TableSchema tableSchema;
+
+    public PaimonSchemaCacheValue(List<Column> schema, List<Column> partitionColumns,
+            TableSchema tableSchema) {
super(schema);
this.partitionColumns = partitionColumns;
+        this.tableSchema = tableSchema;
}
public List<Column> getPartitionColumns() {
return partitionColumns;
}
+
+    /** Returns the Paimon schema this cache entry was built from. */
+    public TableSchema getTableSchema() {
+        return tableSchema;
+    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 1b4edfa5ed9..25539c82477 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -48,10 +48,12 @@ import org.apache.logging.log4j.Logger;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.RawFile;
import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.InstantiationUtil;
import java.io.IOException;
@@ -130,6 +132,7 @@ public class PaimonScanNode extends FileQueryScanNode {
source = new PaimonSource(desc);
serializedTable = encodeObjectToString(source.getPaimonTable());
Preconditions.checkNotNull(source);
+ params.setPaimonSchemaInfo(new HashMap<>());
}
@VisibleForTesting
@@ -167,6 +170,17 @@ public class PaimonScanNode extends FileQueryScanNode {
return Optional.of(serializedTable);
}
+    /**
+     * Builds the field id to lower-cased column name map for one Paimon schema version.
+     *
+     * @param schemaId id of the schema version to look up in the schema cache
+     * @return map from field id to lower-cased column name for that schema version
+     */
+    private Map<Long, String> getSchemaInfo(Long schemaId) {
+        PaimonExternalTable table = (PaimonExternalTable) source.getTargetTable();
+        TableSchema tableSchema = table.getPaimonSchemaCacheValue(schemaId).getTableSchema();
+        Map<Long, String> columnIdToName = new HashMap<>(tableSchema.fields().size());
+        for (DataField dataField : tableSchema.fields()) {
+            // Lower-case with Locale.ROOT so the result does not depend on the JVM's
+            // default locale (e.g. 'I' becomes a dotless 'ı' under a Turkish locale).
+            columnIdToName.put((long) dataField.id(),
+                    dataField.name().toLowerCase(java.util.Locale.ROOT));
+        }
+
+        return columnIdToName;
+    }
+
private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit
paimonSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
@@ -188,8 +202,9 @@ public class PaimonScanNode extends FileQueryScanNode {
} else {
throw new RuntimeException("Unsupported file format: " +
fileFormat);
}
+ fileDesc.setSchemaId(paimonSplit.getSchemaId());
+
params.paimon_schema_info.computeIfAbsent(paimonSplit.getSchemaId(),
this::getSchemaInfo);
}
-
fileDesc.setFileFormat(fileFormat);
fileDesc.setPaimonPredicate(encodeObjectToString(predicates));
fileDesc.setPaimonColumnNames(source.getDesc().getSlots().stream().map(slot ->
slot.getColumn().getName())
@@ -250,7 +265,6 @@ public class PaimonScanNode extends FileQueryScanNode {
for (DataSplit dataSplit : dataSplits) {
SplitStat splitStat = new SplitStat();
splitStat.setRowCount(dataSplit.rowCount());
-
BinaryRow partitionValue = dataSplit.partition();
selectedPartitionValues.add(partitionValue);
Optional<List<RawFile>> optRawFiles =
dataSplit.convertToRawFiles();
@@ -283,6 +297,7 @@ public class PaimonScanNode extends FileQueryScanNode {
null,
PaimonSplit.PaimonSplitCreator.DEFAULT);
for (Split dorisSplit : dorisSplits) {
+ ((PaimonSplit)
dorisSplit).setSchemaId(file.schemaId());
// try to set deletion file
if (optDeletionFiles.isPresent() &&
optDeletionFiles.get().get(i) != null) {
((PaimonSplit)
dorisSplit).setDeletionFile(optDeletionFiles.get().get(i));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
index 224960e5c96..822d4b28dcc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSplit.java
@@ -37,6 +37,7 @@ public class PaimonSplit extends FileSplit {
private TableFormatType tableFormatType;
private Optional<DeletionFile> optDeletionFile = Optional.empty();
private Optional<Long> optRowCount = Optional.empty();
+ private Optional<Long> schemaId = Optional.empty();
public PaimonSplit(DataSplit split) {
super(DUMMY_PATH, 0, 0, 0, 0, null, null);
@@ -93,6 +94,14 @@ public class PaimonSplit extends FileSplit {
this.optRowCount = Optional.of(rowCount);
}
+ public void setSchemaId(long schemaId) { // Records the schema id for this split.
+ this.schemaId = Optional.of(schemaId); // field starts as Optional.empty() until this setter is called
+ }
+
+ public Long getSchemaId() { // Returns the recorded schema id, or null when setSchemaId was never called on this split.
+ return schemaId.orElse(null); // NOTE(review): a null result would throw if a caller auto-unboxes it into a primitive long - confirm every caller sets the id first
+ }
+
public static class PaimonSplitCreator implements SplitCreator {
static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator();
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index a8f10a3f053..c4a01bf2ec4 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -340,6 +340,7 @@ struct TPaimonFileDesc {
13: optional map<string, string> hadoop_conf // deprecated
14: optional string paimon_table // deprecated
15: optional i64 row_count // deprecated
+ 16: optional i64 schema_id; // for schema change.
}
struct TTrinoConnectorFileDesc {
@@ -466,6 +467,7 @@ struct TFileScanRangeParams {
// 1. Reduce the access to HMS and HDFS on the JNI side.
// 2. There will be no inconsistency between the fe and be tables.
24: optional string serialized_table
+ 25: optional map<i64, map<i64, string>> paimon_schema_info //paimon
map<schema id, map<column unique id , column name>> : for schema change.
}
struct TFileRangeDesc {
diff --git
a/regression-test/data/external_table_p0/paimon/test_paimon_schema_change.out
b/regression-test/data/external_table_p0/paimon/test_paimon_schema_change.out
new file mode 100644
index 00000000000..5d33ed9f7e6
Binary files /dev/null and
b/regression-test/data/external_table_p0/paimon/test_paimon_schema_change.out
differ
diff --git
a/regression-test/suites/external_table_p0/paimon/test_paimon_schema_change.groovy
b/regression-test/suites/external_table_p0/paimon/test_paimon_schema_change.groovy
new file mode 100644
index 00000000000..2e9f9790a28
--- /dev/null
+++
b/regression-test/suites/external_table_p0/paimon/test_paimon_schema_change.groovy
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_schema_change",
"p0,external,doris,external_docker,external_docker_doris") { // Creates a Paimon catalog and runs desc/select/count checks against tables in database test_paimon_schema_change.
+ String enabled = context.config.otherConfigs.get("enablePaimonTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) { // the whole body is skipped unless enablePaimonTest is "true" (case-insensitive)
+ String minio_port =
context.config.otherConfigs.get("iceberg_minio_port") // the S3 endpoint port is read from the iceberg_minio_port config key
+ String catalog_name = "test_paimon_schema_change"
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+ String table_name = "ts_scale_orc" // NOTE(review): table_name is not referenced anywhere else in this suite
+
+ sql """drop catalog if exists ${catalog_name}""" // drop any existing catalog of the same name before re-creating it
+
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type' = 'paimon',
+ 'warehouse' = 's3://warehouse/wh',
+ 's3.endpoint' = 'http://${externalEnvIp}:${minio_port}',
+ 's3.access_key' = 'admin',
+ 's3.secret_key' = 'password',
+ 's3.path.style.access' = 'true'
+ );
+ """
+ sql """switch `${catalog_name}`"""
+ sql """show databases; """
+ sql """use test_paimon_schema_change """
+
+
+ qt_desc_1 """ desc sc_parquet_pk """
+
+ qt_parquet_pk_1 """SELECT * FROM sc_parquet_pk order by id;"""
+ qt_parquet_pk_2 """SELECT full_name, location FROM sc_parquet_pk
order by id;"""
+ qt_parquet_pk_3 """SELECT * FROM sc_parquet_pk WHERE salary IS NULL
order by id;"""
+ qt_parquet_pk_4 """SELECT * FROM sc_parquet_pk WHERE salary IS NOT
NULL order by id;"""
+ qt_parquet_pk_5 """SELECT * FROM sc_parquet_pk WHERE location = 'New
York' OR location = 'Los Angeles' order by id;"""
+ qt_parquet_pk_6 """SELECT * FROM sc_parquet_pk WHERE id > 5 order by
id;"""
+ qt_parquet_pk_7 """SELECT * FROM sc_parquet_pk WHERE salary > 6000
order by id;"""
+
+
+ qt_desc_2 """ desc sc_orc_pk """
+ qt_orc_pk_1 """SELECT * FROM sc_orc_pk order by id;"""
+ qt_orc_pk_2 """SELECT full_name, location FROM sc_orc_pk order by
id;"""
+ qt_orc_pk_3 """SELECT * FROM sc_orc_pk WHERE salary IS NULL order by
id;"""
+ qt_orc_pk_4 """SELECT * FROM sc_orc_pk WHERE salary IS NOT NULL order
by id;"""
+ qt_orc_pk_5 """SELECT * FROM sc_orc_pk WHERE location = 'New York' OR
location = 'Los Angeles' order by id;"""
+ qt_orc_pk_6 """SELECT * FROM sc_orc_pk WHERE id > 5 order by id;"""
+ qt_orc_pk_7 """SELECT * FROM sc_orc_pk WHERE salary > 6000 order by
id;"""
+
+
+
+ qt_desc_3 """ desc sc_parquet """
+
+ qt_parquet_1 """select * from sc_parquet order by k;"""
+ qt_parquet_2 """select * from sc_parquet where k >= 3;""" // NOTE(review): no ORDER BY - confirm the expected output does not depend on row order
+ qt_parquet_3 """select * from sc_parquet where k <= 1;""" // NOTE(review): no ORDER BY - confirm the expected output does not depend on row order
+
+
+ qt_desc_4 """ desc sc_orc """
+ qt_orc_1 """select * from sc_orc order by k;"""
+ qt_orc_2 """select * from sc_orc where k >= 3;""" // NOTE(review): no ORDER BY - confirm the expected output does not depend on row order
+ qt_orc_3 """select * from sc_orc where k <= 1;""" // NOTE(review): no ORDER BY - confirm the expected output does not depend on row order
+
+
+ qt_count_1 """ select count(*) from sc_parquet_pk;"""
+ qt_count_2 """ select count(*) from sc_orc_pk;"""
+ qt_count_3 """ select count(*) from sc_parquet;"""
+ qt_count_4 """ select count(*) from sc_orc;"""
+
+
+ }
+}
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]