This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new f67ed9f0a4a [fix](iceberg) iceberg uses a custom method to encode
special characters of field names (#27108) (#27205)
f67ed9f0a4a is described below
commit f67ed9f0a4aa55a4c9a131648132dd52f9ee98af
Author: Ashin Gau <[email protected]>
AuthorDate: Fri Nov 17 21:21:00 2023 +0800
[fix](iceberg) iceberg uses a custom method to encode special characters of
field names (#27108) (#27205)
Fix two bugs:
1. Missing-column detection was case sensitive; change the column names to lower case
in FE for hive/iceberg/hudi.
2. Iceberg uses a custom method to encode special characters in column names.
Decode the column names so the parquet reader matches the right columns.
---
be/src/vec/exec/format/parquet/schema_desc.cpp | 67 ++++++++++++++++++++++
be/src/vec/exec/format/parquet/schema_desc.h | 4 ++
.../exec/format/parquet/vparquet_file_metadata.h | 3 +
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 6 ++
be/src/vec/exec/format/parquet/vparquet_reader.h | 3 +
be/src/vec/exec/format/table/iceberg_reader.cpp | 1 +
be/src/vec/exec/scan/vfile_scanner.cpp | 22 +++----
be/src/vec/exec/scan/vfile_scanner.h | 6 +-
.../doris/catalog/external/HMSExternalTable.java | 6 +-
.../catalog/external/IcebergExternalTable.java | 3 +-
.../test_external_catalog_iceberg_common.out | 10 +++-
.../iceberg/test_external_catalog_iceberg_common | 3 +
12 files changed, 112 insertions(+), 22 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp
b/be/src/vec/exec/format/parquet/schema_desc.cpp
index 1b830689c0e..c9283c62889 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -26,6 +26,7 @@
#include "common/logging.h"
#include "runtime/define_primitive_type.h"
#include "util/slice.h"
+#include "util/string_util.h"
namespace doris::vectorized {
@@ -239,6 +240,72 @@ TypeDescriptor FieldDescriptor::get_doris_type(const
tparquet::SchemaElement& ph
return type;
}
+// Copy from org.apache.iceberg.avro.AvroSchemaUtil#validAvroName
+// Returns true when `name` is already a valid Avro name: the first character is a
+// letter or '_', and every following character is a letter, digit or '_'. Such names
+// are left untouched by Iceberg's sanitizer, so no renaming is needed for them.
+// NOTE(review): the Java original classifies UTF-16 code units with
+// Character.isLetter/isLetterOrDigit; this port classifies single bytes with <cctype>,
+// so it only agrees with Java for ASCII names.
+static bool is_valid_avro_name(const std::string& name) {
+    // An empty name has no valid first character (Java rejects it with "Empty name").
+    if (name.empty()) {
+        return false;
+    }
+    // <cctype> classifiers have undefined behavior for negative arguments, and bytes of
+    // non-ASCII (e.g. UTF-8) names are negative when char is signed, so widen every byte
+    // through unsigned char first.
+    const auto first = static_cast<unsigned char>(name[0]);
+    if (!isalpha(first) && first != '_') {
+        return false;
+    }
+    for (size_t i = 1; i < name.size(); ++i) {
+        const auto c = static_cast<unsigned char>(name[i]);
+        if (!isalnum(c) && c != '_') {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize(char)
+// Appends the sanitized form of one character to `buf`: a digit becomes "_<digit>",
+// any other character becomes "_x" followed by its byte value in lower-case hex
+// without leading zeros (e.g. '/' -> "_x2f", ' ' -> "_x20").
+// NOTE(review): Iceberg's Java code upper-cases the hex digits; lower case is presumably
+// used because the parquet field names this is matched against are lower-cased - confirm.
+static void sanitize_avro_name(std::ostringstream& buf, char character) {
+    // Widen through unsigned char: <cctype> is undefined for negative values, and a
+    // sign-extended int would print bytes >= 0x80 as "ffffffXX" instead of "XX".
+    const auto byte = static_cast<unsigned char>(character);
+    if (isdigit(byte)) {
+        buf << '_' << character;
+        return;
+    }
+    static constexpr char kHexDigits[] = "0123456789abcdef";
+    buf << "_x";
+    if (byte >= 0x10) {
+        buf << kHexDigits[byte >> 4];
+    }
+    buf << kHexDigits[byte & 0x0f];
+}
+
+// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize(String)
+// Returns `name` with every character that is not allowed at its position in an Avro
+// name replaced by the char overload's encoding. The first character must be a letter
+// or '_'; later characters may additionally be digits. An empty name maps to "".
+static std::string sanitize_avro_name(const std::string& name) {
+    std::ostringstream buf;
+    for (size_t i = 0; i < name.size(); ++i) {
+        const auto byte = static_cast<unsigned char>(name[i]);
+        const bool allowed = isalpha(byte) || byte == '_' || (i > 0 && isdigit(byte));
+        if (allowed) {
+            buf << name[i];
+        } else {
+            sanitize_avro_name(buf, name[i]);
+        }
+    }
+    return buf.str();
+}
+
+// Iceberg stores a column whose name is not a valid Avro name under its sanitized name
+// (AvroSchemaUtil#sanitize). For every requested column that would have been sanitized,
+// look up the field registered under the sanitized name, rename the field to the
+// requested name and re-key it in _name_to_field, so lookups by table column name work.
+// NOTE(review): only entries of _name_to_field are re-keyed; whether nested struct
+// fields need the same treatment is not handled here - confirm.
+void FieldDescriptor::iceberg_sanitize(const std::vector<std::string>& read_columns) {
+    for (const std::string& col : read_columns) {
+        if (is_valid_avro_name(col)) {
+            continue;
+        }
+        auto it = _name_to_field.find(sanitize_avro_name(col));
+        if (it == _name_to_field.end()) {
+            continue;
+        }
+        // If `col` is already a key, inserting it would fail, and erasing the sanitized
+        // key would then leave this field unreachable by either name; keep it as is.
+        if (_name_to_field.find(col) != _name_to_field.end()) {
+            continue;
+        }
+        auto* field = const_cast<FieldSchema*>(it->second);
+        field->name = col;
+        // Erase through the iterator before inserting, since insertion may rehash and
+        // invalidate `it`.
+        _name_to_field.erase(it);
+        _name_to_field.emplace(col, field);
+    }
+}
+
TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType
logicalType) {
TypeDescriptor type;
if (logicalType.__isset.STRING) {
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h
b/be/src/vec/exec/format/parquet/schema_desc.h
index fb61ad918a7..d763e40e2ed 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -91,6 +91,10 @@ private:
TypeDescriptor get_doris_type(const tparquet::SchemaElement&
physical_schema);
public:
+    // org.apache.iceberg.avro.AvroSchemaUtil#sanitize encodes special characters in field
+    // names; map those sanitized field names back to the requested column names.
+ void iceberg_sanitize(const std::vector<std::string>& read_columns);
+
FieldDescriptor() = default;
~FieldDescriptor() = default;
diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
index 6f52ef5b4af..5d745a0db62 100644
--- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
+++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
@@ -32,6 +32,9 @@ public:
Status init_schema();
const FieldDescriptor& schema() const { return _schema; }
const tparquet::FileMetaData& to_thrift();
+    // Delegates to the parsed schema, which re-keys fields that Iceberg stored under
+    // sanitized (Avro-safe) names to the requested column names.
+    void iceberg_sanitize(const std::vector<std::string>& columns) {
+        _schema.iceberg_sanitize(columns);
+    }
std::string debug_string() const;
private:
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 49798ed4f13..124f623f2e9 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -294,6 +294,12 @@ void ParquetReader::_init_file_description() {
}
}
+// Iceberg-only hook: forwards to the file metadata so that parquet fields stored under
+// Iceberg-sanitized names are re-keyed to the requested column names.
+// NOTE(review): this is a silent no-op while _file_metadata is null, so callers must
+// invoke it after the file metadata has been loaded - confirm at the call site.
+void ParquetReader::iceberg_sanitize(const std::vector<std::string>& read_columns) {
+    if (_file_metadata == nullptr) {
+        return;
+    }
+    _file_metadata->iceberg_sanitize(read_columns);
+}
+
Status ParquetReader::init_reader(
const std::vector<std::string>& all_column_names,
const std::vector<std::string>& missing_column_names,
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 5ceca55d7ec..6efd0bd7237 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -139,6 +139,9 @@ public:
const tparquet::FileMetaData* get_meta_data() const { return _t_metadata; }
+ // Only for iceberg reader to sanitize invalid column names
+ void iceberg_sanitize(const std::vector<std::string>& read_columns);
+
Status set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string,
const SlotDescriptor*>>&
partition_columns,
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 9f98a0ae3f4..bcaa99143f0 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -129,6 +129,7 @@ Status IcebergTableReader::init_reader(
_gen_file_col_names();
_gen_new_colname_to_value_range();
parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
+ parquet_reader->iceberg_sanitize(_all_required_col_names);
Status status = parquet_reader->init_reader(
_all_required_col_names, _not_in_file_col_names,
&_new_colname_to_value_range,
conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 7f35422ec03..fd77b084bb9 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -418,7 +418,7 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
}
Status VFileScanner::_fill_columns_from_path(size_t rows) {
- for (auto& kv : *_partition_columns) {
+ for (auto& kv : _partition_col_descs) {
auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
auto& [value, slot_desc] = kv.second;
@@ -437,7 +437,7 @@ Status VFileScanner::_fill_missing_columns(size_t rows) {
}
SCOPED_TIMER(_fill_missing_columns_timer);
- for (auto& kv : *_missing_columns) {
+ for (auto& kv : _missing_col_descs) {
if (kv.second == nullptr) {
// no default column, fill with null
auto nullable_column =
reinterpret_cast<vectorized::ColumnNullable*>(
@@ -862,9 +862,8 @@ Status VFileScanner::_get_next_reader() {
}
Status VFileScanner::_generate_fill_columns() {
- _partition_columns.reset(
- new std::unordered_map<std::string, std::tuple<std::string, const
SlotDescriptor*>>());
- _missing_columns.reset(new std::unordered_map<std::string,
VExprContextSPtr>());
+ _partition_col_descs.clear();
+ _missing_col_descs.clear();
const TFileRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
@@ -881,8 +880,8 @@ Status VFileScanner::_generate_fill_columns() {
if (size == 4 && memcmp(data, "null", 4) == 0) {
data = TextConverter::NULL_STR;
}
- _partition_columns->emplace(slot_desc->col_name(),
- std::make_tuple(data, slot_desc));
+ _partition_col_descs.emplace(slot_desc->col_name(),
+ std::make_tuple(data, slot_desc));
}
}
}
@@ -901,16 +900,11 @@ Status VFileScanner::_generate_fill_columns() {
return Status::InternalError("failed to find default value
expr for slot: {}",
slot_desc->col_name());
}
- _missing_columns->emplace(slot_desc->col_name(), it->second);
+ _missing_col_descs.emplace(slot_desc->col_name(), it->second);
}
}
- RETURN_IF_ERROR(_cur_reader->set_fill_columns(*_partition_columns,
*_missing_columns));
- if (_cur_reader->fill_all_columns()) {
- _partition_columns.reset(nullptr);
- _missing_columns.reset(nullptr);
- }
- return Status::OK();
+ return _cur_reader->set_fill_columns(_partition_col_descs,
_missing_col_descs);
}
Status VFileScanner::_init_expr_ctxes() {
diff --git a/be/src/vec/exec/scan/vfile_scanner.h
b/be/src/vec/exec/scan/vfile_scanner.h
index 3611785625d..e3533ce05c2 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -162,9 +162,9 @@ protected:
std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
std::unique_ptr<io::IOContext> _io_ctx;
- std::unique_ptr<std::unordered_map<std::string, std::tuple<std::string,
const SlotDescriptor*>>>
- _partition_columns;
- std::unique_ptr<std::unordered_map<std::string, VExprContextSPtr>>
_missing_columns;
+ std::unordered_map<std::string, std::tuple<std::string, const
SlotDescriptor*>>
+ _partition_col_descs;
+ std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;
private:
RuntimeProfile::Counter* _get_block_timer = nullptr;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 2729fdf7a95..0243ad12f75 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -435,7 +435,7 @@ public class HMSExternalTable extends ExternalTable {
} else {
List<Column> tmpSchema =
Lists.newArrayListWithCapacity(schema.size());
for (FieldSchema field : schema) {
- tmpSchema.add(new Column(field.getName(),
+ tmpSchema.add(new
Column(field.getName().toLowerCase(Locale.ROOT),
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, field.getComment(), true, -1));
}
@@ -484,7 +484,7 @@ public class HMSExternalTable extends ExternalTable {
Schema schema = icebergTable.schema();
List<Column> tmpSchema =
Lists.newArrayListWithCapacity(hmsSchema.size());
for (FieldSchema field : hmsSchema) {
- tmpSchema.add(new Column(field.getName(),
+ tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType(),
IcebergExternalTable.ICEBERG_DATETIME_SCALE_MS),
true, null, true, false, null, field.getComment(), true,
null,
@@ -500,7 +500,7 @@ public class HMSExternalTable extends ExternalTable {
for (String partitionKey : partitionKeys) {
// Do not use "getColumn()", which will cause dead loop
for (Column column : schema) {
- if (partitionKey.equals(column.getName())) {
+ if (partitionKey.equalsIgnoreCase(column.getName())) {
// For partition column, if it is string type, change it
to varchar(65535)
// to be same as doris managed table.
// This is to avoid some unexpected behavior such as
different partition pruning result
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
index bede9b99e43..7398ff19c9e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.types.Types;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Optional;
public class IcebergExternalTable extends ExternalTable {
@@ -66,7 +67,7 @@ public class IcebergExternalTable extends ExternalTable {
List<Types.NestedField> columns = schema.columns();
List<Column> tmpSchema =
Lists.newArrayListWithCapacity(columns.size());
for (Types.NestedField field : columns) {
- tmpSchema.add(new Column(field.name(),
+ tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
icebergTypeToDorisType(field.type()), true, null,
true, field.doc(), true,
schema.caseInsensitiveFindField(field.name()).fieldId()));
}
diff --git
a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out
b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out
index 9554f1d21f0..a51bac0e1b5 100644
---
a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out
+++
b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out
@@ -1,3 +1,11 @@
-- This file is automatically generated. You should know what you did if you
want to edit this
-- !q01 --
-599715
\ No newline at end of file
+599715
+
+-- !sanitize_mara --
+MATNR1 3.140 /DSD/SV_CNT_GRP1
+MATNR2 3.240 /DSD/SV_CNT_GRP2
+MATNR4 3.440 /DSD/SV_CNT_GRP4
+MATNR5 3.540 /DSD/SV_CNT_GRP5
+MATNR6 3.640 /DSD/SV_CNT_GRP6
+
diff --git
a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common
b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common
index a035ea6d1b3..577a4e6702a 100644
---
a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common
+++
b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common
@@ -46,5 +46,8 @@ suite("test_external_catalog_iceberg_common",
"p2,external,iceberg,external_remo
}
sql """ use `iceberg_catalog`; """
q01_parquet()
+
+ // test the special characters in table fields
+ qt_sanitize_mara """select MaTnR, NtgEW, `/dsd/Sv_cnt_grP` from
sanitize_mara order by mAtNr"""
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]