This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 5f952cf6ed6 branch-2.1: [fix](iceberg)Bring field_id with parquet
files And fix map type's key optional #44470 (#44828)
5f952cf6ed6 is described below
commit 5f952cf6ed698b208f910413a8a12663f226c63f
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Dec 2 10:24:07 2024 +0800
branch-2.1: [fix](iceberg)Bring field_id with parquet files And fix map
type's key optional #44470 (#44828)
Cherry-picked from #44470
Co-authored-by: wuwenchi <[email protected]>
---
.../format/table/iceberg/arrow_schema_util.cpp | 134 +++++++++
.../exec/format/table/iceberg/arrow_schema_util.h | 45 +++
be/src/vec/exec/format/table/iceberg/types.cpp | 3 +-
be/src/vec/exec/format/table/iceberg/types.h | 4 +
be/src/vec/runtime/vparquet_transformer.cpp | 51 ++--
be/src/vec/runtime/vparquet_transformer.h | 5 +-
.../writer/iceberg/viceberg_partition_writer.cpp | 2 +-
.../table/iceberg/arrow_schema_util_test.cpp | 304 +++++++++++++++++++++
.../format/table/iceberg/schema_parser_test.cpp | 24 ++
9 files changed, 547 insertions(+), 25 deletions(-)
diff --git a/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp
b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp
new file mode 100644
index 00000000000..35a4d51b7f1
--- /dev/null
+++ b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.cpp
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/table/iceberg/arrow_schema_util.h"
+
+#include <arrow/type.h>
+#include <arrow/util/key_value_metadata.h>
+
+namespace doris {
+namespace iceberg {
+
// Metadata key recognized by parquet-cpp's Arrow writer: when present on an
// arrow::Field, its value is written into the parquet schema as the field id.
const char* ArrowSchemaUtil::PARQUET_FIELD_ID = "PARQUET:field_id";
// Marker metadata ("originalType" = "mapType") recording that an Arrow map
// field was converted from an Iceberg map type.
const char* ArrowSchemaUtil::ORIGINAL_TYPE = "originalType";
const char* ArrowSchemaUtil::MAP_TYPE_VALUE = "mapType";
+
+Status ArrowSchemaUtil::convert(const Schema* schema, const std::string&
timezone,
+ std::vector<std::shared_ptr<arrow::Field>>&
fields) {
+ for (const auto& column : schema->columns()) {
+ std::shared_ptr<arrow::Field> arrow_field;
+ RETURN_IF_ERROR(convert_to(column, &arrow_field, timezone));
+ fields.push_back(arrow_field);
+ }
+ return Status::OK();
+}
+
+Status ArrowSchemaUtil::convert_to(const iceberg::NestedField& field,
+ std::shared_ptr<arrow::Field>* arrow_field,
+ const std::string& timezone) {
+ std::shared_ptr<arrow::DataType> arrow_type;
+ std::unordered_map<std::string, std::string> metadata;
+ metadata[PARQUET_FIELD_ID] = std::to_string(field.field_id());
+
+ switch (field.field_type()->type_id()) {
+ case iceberg::TypeID::BOOLEAN:
+ arrow_type = arrow::boolean();
+ break;
+
+ case iceberg::TypeID::INTEGER:
+ arrow_type = arrow::int32();
+ break;
+
+ case iceberg::TypeID::LONG:
+ arrow_type = arrow::int64();
+ break;
+
+ case iceberg::TypeID::FLOAT:
+ arrow_type = arrow::float32();
+ break;
+
+ case iceberg::TypeID::DOUBLE:
+ arrow_type = arrow::float64();
+ break;
+
+ case iceberg::TypeID::DATE:
+ arrow_type = arrow::date32();
+ break;
+
+ case iceberg::TypeID::TIMESTAMP: {
+ arrow_type =
std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO, timezone);
+ break;
+ }
+
+ case iceberg::TypeID::BINARY:
+ case iceberg::TypeID::STRING:
+ case iceberg::TypeID::UUID:
+ case iceberg::TypeID::FIXED:
+ arrow_type = arrow::utf8();
+ break;
+
+ case iceberg::TypeID::DECIMAL: {
+ auto dt = dynamic_cast<DecimalType*>(field.field_type());
+ arrow_type = arrow::decimal(dt->get_precision(), dt->get_scale());
+ break;
+ }
+
+ case iceberg::TypeID::STRUCT: {
+ std::vector<std::shared_ptr<arrow::Field>> element_fields;
+ StructType* st = field.field_type()->as_struct_type();
+ for (const auto& column : st->fields()) {
+ std::shared_ptr<arrow::Field> element_field;
+ RETURN_IF_ERROR(convert_to(column, &element_field, timezone));
+ element_fields.push_back(element_field);
+ }
+ arrow_type = arrow::struct_(element_fields);
+ break;
+ }
+
+ case iceberg::TypeID::LIST: {
+ std::shared_ptr<arrow::Field> item_field;
+ ListType* list_type = field.field_type()->as_list_type();
+ RETURN_IF_ERROR(convert_to(list_type->element_field(), &item_field,
timezone));
+ arrow_type = arrow::list(item_field);
+ break;
+ }
+
+ case iceberg::TypeID::MAP: {
+ std::shared_ptr<arrow::Field> key_field;
+ std::shared_ptr<arrow::Field> value_field;
+ MapType* map_type = field.field_type()->as_map_type();
+ RETURN_IF_ERROR(convert_to(map_type->key_field(), &key_field,
timezone));
+ RETURN_IF_ERROR(convert_to(map_type->value_field(), &value_field,
timezone));
+ metadata[ORIGINAL_TYPE] = MAP_TYPE_VALUE;
+ arrow_type = std::make_shared<arrow::MapType>(key_field, value_field);
+ break;
+ }
+
+ case iceberg::TypeID::TIME:
+ default:
+ return Status::InternalError("Unsupported field type:" +
field.field_type()->to_string());
+ }
+
+ std::shared_ptr<arrow::KeyValueMetadata> schema_metadata =
+ std::make_shared<arrow::KeyValueMetadata>(metadata);
+ *arrow_field =
+ arrow::field(field.field_name(), arrow_type, field.is_optional(),
schema_metadata);
+ return Status::OK();
+}
+
+} // namespace iceberg
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/format/table/iceberg/arrow_schema_util.h
b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.h
new file mode 100644
index 00000000000..20b7dbc627c
--- /dev/null
+++ b/be/src/vec/exec/format/table/iceberg/arrow_schema_util.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <arrow/type.h>
+
+#include <shared_mutex>
+
+#include "vec/exec/format/table/iceberg/schema.h"
+
+namespace doris {
+namespace iceberg {
+
// Converts an Iceberg Schema into Arrow fields, carrying each Iceberg field id
// along as Arrow field metadata so the parquet writer persists it.
class ArrowSchemaUtil {
public:
    // Convert all top-level columns of `schema` into arrow::Fields appended to
    // `fields`. `timezone` is applied to timestamp columns. Returns a non-OK
    // Status for unsupported Iceberg types.
    static Status convert(const Schema* schema, const std::string& timezone,
                          std::vector<std::shared_ptr<arrow::Field>>& fields);

private:
    // Arrow metadata key recognized by the parquet writer; value = field id.
    static const char* PARQUET_FIELD_ID;
    // Metadata key/value pair marking fields that came from an Iceberg map.
    static const char* ORIGINAL_TYPE;
    static const char* MAP_TYPE_VALUE;

    // Convert one NestedField (recursing into struct/list/map children).
    static Status convert_to(const iceberg::NestedField& field,
                             std::shared_ptr<arrow::Field>* arrow_field,
                             const std::string& timezone);
};
+
+} // namespace iceberg
+} // namespace doris
diff --git a/be/src/vec/exec/format/table/iceberg/types.cpp
b/be/src/vec/exec/format/table/iceberg/types.cpp
index b56a231979a..bf643655ab8 100644
--- a/be/src/vec/exec/format/table/iceberg/types.cpp
+++ b/be/src/vec/exec/format/table/iceberg/types.cpp
@@ -25,8 +25,9 @@ namespace iceberg {
std::unique_ptr<MapType> MapType::of_optional(int key_id, int value_id,
std::unique_ptr<Type> key_type,
std::unique_ptr<Type>
value_type) {
+ // key is always required
auto key_field =
- std::make_unique<NestedField>(true, key_id, "key",
std::move(key_type), std::nullopt);
+ std::make_unique<NestedField>(false, key_id, "key",
std::move(key_type), std::nullopt);
auto value_field = std::make_unique<NestedField>(true, value_id, "value",
std::move(value_type),
std::nullopt);
return std::unique_ptr<MapType>(new MapType(std::move(key_field),
std::move(value_field)));
diff --git a/be/src/vec/exec/format/table/iceberg/types.h
b/be/src/vec/exec/format/table/iceberg/types.h
index f5262b36f55..91a2f705df0 100644
--- a/be/src/vec/exec/format/table/iceberg/types.h
+++ b/be/src/vec/exec/format/table/iceberg/types.h
@@ -265,6 +265,10 @@ public:
ss << "decimal(" << precision << ", " << scale << ")";
return ss.str();
}
+
+ int get_precision() const { return precision; }
+
+ int get_scale() const { return scale; }
};
class BinaryType : public PrimitiveType {
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp
b/be/src/vec/runtime/vparquet_transformer.cpp
index f0810d6c7ce..86ca54909f7 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -65,6 +65,7 @@
#include "vec/core/types.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_nullable.h"
+#include "vec/exec/format/table/iceberg/arrow_schema_util.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/functions/function_helpers.h"
@@ -201,21 +202,20 @@ void
ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& build
}
}
-VParquetTransformer::VParquetTransformer(RuntimeState* state,
doris::io::FileWriter* file_writer,
- const VExprContextSPtrs&
output_vexpr_ctxs,
- std::vector<std::string> column_names,
- TParquetCompressionType::type
compression_type,
- bool parquet_disable_dictionary,
- TParquetVersion::type parquet_version,
- bool output_object_data,
- const std::string*
iceberg_schema_json)
+VParquetTransformer::VParquetTransformer(
+ RuntimeState* state, doris::io::FileWriter* file_writer,
+ const VExprContextSPtrs& output_vexpr_ctxs, std::vector<std::string>
column_names,
+ TParquetCompressionType::type compression_type, bool
parquet_disable_dictionary,
+ TParquetVersion::type parquet_version, bool output_object_data,
+ const std::string* iceberg_schema_json, const iceberg::Schema*
iceberg_schema)
: VFileFormatTransformer(state, output_vexpr_ctxs, output_object_data),
_column_names(std::move(column_names)),
_parquet_schemas(nullptr),
_compression_type(compression_type),
_parquet_disable_dictionary(parquet_disable_dictionary),
_parquet_version(parquet_version),
- _iceberg_schema_json(iceberg_schema_json) {
+ _iceberg_schema_json(iceberg_schema_json),
+ _iceberg_schema(iceberg_schema) {
_outstream = std::shared_ptr<ParquetOutputStream>(new
ParquetOutputStream(file_writer));
}
@@ -233,6 +233,7 @@ VParquetTransformer::VParquetTransformer(RuntimeState*
state, doris::io::FileWri
_parquet_disable_dictionary(parquet_disable_dictionary),
_parquet_version(parquet_version),
_iceberg_schema_json(iceberg_schema_json) {
+ _iceberg_schema = nullptr;
_outstream = std::shared_ptr<ParquetOutputStream>(new
ParquetOutputStream(file_writer));
}
@@ -264,21 +265,27 @@ Status VParquetTransformer::_parse_properties() {
Status VParquetTransformer::_parse_schema() {
std::vector<std::shared_ptr<arrow::Field>> fields;
- for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
- std::shared_ptr<arrow::DataType> type;
-
RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(),
&type,
- _state->timezone()));
- if (_parquet_schemas != nullptr) {
- std::shared_ptr<arrow::Field> field =
-
arrow::field(_parquet_schemas->operator[](i).schema_column_name, type,
- _output_vexpr_ctxs[i]->root()->is_nullable());
- fields.emplace_back(field);
- } else {
- std::shared_ptr<arrow::Field> field = arrow::field(
- _column_names[i], type,
_output_vexpr_ctxs[i]->root()->is_nullable());
- fields.emplace_back(field);
+ if (_iceberg_schema != nullptr) {
+ RETURN_IF_ERROR(
+ iceberg::ArrowSchemaUtil::convert(_iceberg_schema,
_state->timezone(), fields));
+ } else {
+ for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
+ std::shared_ptr<arrow::DataType> type;
+
RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(),
&type,
+ _state->timezone()));
+ if (_parquet_schemas != nullptr) {
+ std::shared_ptr<arrow::Field> field =
+
arrow::field(_parquet_schemas->operator[](i).schema_column_name, type,
+
_output_vexpr_ctxs[i]->root()->is_nullable());
+ fields.emplace_back(field);
+ } else {
+ std::shared_ptr<arrow::Field> field = arrow::field(
+ _column_names[i], type,
_output_vexpr_ctxs[i]->root()->is_nullable());
+ fields.emplace_back(field);
+ }
}
}
+
if (_iceberg_schema_json != nullptr) {
std::shared_ptr<arrow::KeyValueMetadata> schema_metadata =
arrow::KeyValueMetadata::Make({"iceberg.schema"},
{*_iceberg_schema_json});
diff --git a/be/src/vec/runtime/vparquet_transformer.h
b/be/src/vec/runtime/vparquet_transformer.h
index 9eae25d8ac4..03c9aeb0816 100644
--- a/be/src/vec/runtime/vparquet_transformer.h
+++ b/be/src/vec/runtime/vparquet_transformer.h
@@ -27,6 +27,7 @@
#include <parquet/types.h>
#include <stdint.h>
+#include "vec/exec/format/table/iceberg/schema.h"
#include "vfile_format_transformer.h"
namespace doris {
@@ -94,7 +95,8 @@ public:
std::vector<std::string> column_names,
TParquetCompressionType::type compression_type,
bool parquet_disable_dictionary, TParquetVersion::type
parquet_version,
- bool output_object_data, const std::string*
iceberg_schema_json = nullptr);
+ bool output_object_data, const std::string*
iceberg_schema_json = nullptr,
+ const iceberg::Schema* iceberg_schema = nullptr);
VParquetTransformer(RuntimeState* state, doris::io::FileWriter*
file_writer,
const VExprContextSPtrs& output_vexpr_ctxs,
@@ -131,6 +133,7 @@ private:
const TParquetVersion::type _parquet_version;
const std::string* _iceberg_schema_json;
uint64_t _write_size = 0;
+ const iceberg::Schema* _iceberg_schema;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
index 30cd7f20316..f3691f8c621 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -83,7 +83,7 @@ Status VIcebergPartitionWriter::open(RuntimeState* state,
RuntimeProfile* profil
_file_format_transformer.reset(new VParquetTransformer(
state, _file_writer.get(), _write_output_expr_ctxs,
_write_column_names,
parquet_compression_type, parquet_disable_dictionary,
TParquetVersion::PARQUET_1_0,
- false, _iceberg_schema_json));
+ false, _iceberg_schema_json, &_schema));
return _file_format_transformer->open();
}
case TFileFormatType::FORMAT_ORC: {
diff --git a/be/test/vec/exec/format/table/iceberg/arrow_schema_util_test.cpp
b/be/test/vec/exec/format/table/iceberg/arrow_schema_util_test.cpp
new file mode 100644
index 00000000000..b5f61c9d2e3
--- /dev/null
+++ b/be/test/vec/exec/format/table/iceberg/arrow_schema_util_test.cpp
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/table/iceberg/arrow_schema_util.h"
+
+#include <arrow/api.h>
+#include <arrow/io/api.h>
+#include <arrow/status.h>
+#include <arrow/type.h>
+#include <arrow/util/key_value_metadata.h>
+#include <gtest/gtest.h>
+#include <parquet/api/reader.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/schema.h>
+
+#include "io/fs/local_file_system.h"
+#include "vec/exec/format/table/iceberg/schema.h"
+#include "vec/exec/format/table/iceberg/schema_parser.h"
+
+namespace doris {
+namespace iceberg {
+
// Test fixture for ArrowSchemaUtil; no shared state, tests are standalone.
class ArrowSchemaUtilTest : public testing::Test {
public:
    ArrowSchemaUtilTest() = default;
    virtual ~ArrowSchemaUtilTest() = default;
};

// Arrow metadata key under which ArrowSchemaUtil stores the Iceberg field id.
const std::string_view pfid = "PARQUET:field_id";
+
// Verifies that primitive columns are converted with their names preserved and
// their Iceberg field ids attached as "PARQUET:field_id" metadata.
TEST(ArrowSchemaUtilTest, test_simple_field) {
    std::vector<NestedField> nested_fields;
    nested_fields.reserve(2);
    NestedField field1(false, 1, "field1", std::make_unique<IntegerType>(), std::nullopt);
    NestedField field2(false, 2, "field2", std::make_unique<StringType>(), std::nullopt);
    nested_fields.emplace_back(std::move(field1));
    nested_fields.emplace_back(std::move(field2));

    Schema schema(1, std::move(nested_fields));

    std::vector<std::shared_ptr<arrow::Field>> fields;
    Status st;
    st = ArrowSchemaUtil::convert(&schema, "utc", fields);
    EXPECT_TRUE(st.ok());
    EXPECT_EQ(2, fields.size());
    EXPECT_EQ("field1", fields[0]->name());
    EXPECT_EQ("field2", fields[1]->name());
    EXPECT_TRUE(fields[0]->HasMetadata());
    EXPECT_TRUE(fields[1]->HasMetadata());
    EXPECT_EQ("1", fields[0]->metadata()->Get(pfid).ValueUnsafe());
    EXPECT_EQ("2", fields[1]->metadata()->Get(pfid).ValueUnsafe());
}
+
// Verifies that a struct column converts to arrow::StructType with field ids
// propagated to both the outer field and every struct member.
// NOTE(review): test name has a typo ("stuct" -> "struct").
TEST(ArrowSchemaUtilTest, test_stuct_field) {
    // struct_json comes from :
    // Schema schema = new Schema(
    //     Types.NestedField.optional(
    //         21, "st_col", Types.StructType.of(
    //             Types.NestedField.optional(32, "st_col_c1", Types.IntegerType.get()),
    //             Types.NestedField.optional(43, "st_col_c2", Types.StringType.get())
    //         )
    //     )
    // );
    // StringWriter writer = new StringWriter();
    // JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
    // SchemaParser.toJson(schema.asStruct(), generator);
    // generator.flush();
    // System.out.println(writer.toString());

    const std::string struct_json = R"({
        "type": "struct",
        "fields": [
            {
                "id": 21,
                "name": "st_col",
                "required": false,
                "type": {
                    "type": "struct",
                    "fields": [
                        {
                            "id": 32,
                            "name": "st_col_c1",
                            "required": false,
                            "type": "int"
                        },
                        {
                            "id": 43,
                            "name": "st_col_c2",
                            "required": false,
                            "type": "string"
                        }
                    ]
                }
            }
        ]
    })";
    std::unique_ptr<Schema> schema = SchemaParser::from_json(struct_json);

    std::vector<std::shared_ptr<arrow::Field>> fields;
    Status st;
    st = ArrowSchemaUtil::convert(schema.get(), "utc", fields);
    EXPECT_TRUE(st.ok());
    EXPECT_EQ(1, fields.size());
    EXPECT_EQ("st_col", fields[0]->name());
    EXPECT_EQ("21", fields[0]->metadata()->Get(pfid).ValueUnsafe());

    arrow::StructType* arrow_struct = dynamic_cast<arrow::StructType*>(fields[0]->type().get());
    auto map_fields = arrow_struct->fields();
    EXPECT_EQ(2, arrow_struct->fields().size());
    EXPECT_EQ("st_col_c1", map_fields.at(0).get()->name());
    EXPECT_EQ("st_col_c2", map_fields.at(1).get()->name());
    EXPECT_EQ("32", map_fields.at(0).get()->metadata()->Get(pfid).ValueUnsafe());
    EXPECT_EQ("43", map_fields.at(1).get()->metadata()->Get(pfid).ValueUnsafe());
}
+
// Verifies that a map column converts to arrow::MapType, with the key/value
// child fields named "key"/"value" and carrying their Iceberg field ids.
TEST(ArrowSchemaUtilTest, test_map_field) {
    // map_json comes from :
    // Schema schema = new Schema(
    //     Types.NestedField.optional(
    //         21, "map_col", Types.MapType.ofOptional(
    //             32, 43, Types.IntegerType.get(), Types.StringType.get()
    //         )
    //     )
    // );
    // StringWriter writer = new StringWriter();
    // JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
    // SchemaParser.toJson(schema.asStruct(), generator);
    // generator.flush();
    // System.out.println(writer.toString());

    const std::string map_json = R"({
        "type": "struct",
        "fields": [
            {
                "id": 21,
                "name": "map_col",
                "required": false,
                "type": {
                    "type": "map",
                    "key-id": 32,
                    "key": "int",
                    "value-id": 43,
                    "value": "string",
                    "value-required": false
                }
            }
        ]
    })";
    std::unique_ptr<Schema> schema = SchemaParser::from_json(map_json);

    std::vector<std::shared_ptr<arrow::Field>> fields;
    Status st;
    st = ArrowSchemaUtil::convert(schema.get(), "utc", fields);
    EXPECT_TRUE(st.ok());
    EXPECT_EQ(1, fields.size());
    EXPECT_EQ("map_col", fields[0]->name());
    EXPECT_EQ("21", fields[0]->metadata()->Get(pfid).ValueUnsafe());

    arrow::MapType* arrow_map = dynamic_cast<arrow::MapType*>(fields[0]->type().get());
    auto map_fields = arrow_map->fields();
    // An arrow::MapType exposes a single child ("entries"); key/value are
    // reached through key_field()/item_field().
    EXPECT_EQ(1, arrow_map->fields().size());
    EXPECT_EQ("key", arrow_map->key_field()->name());
    EXPECT_EQ("value", arrow_map->item_field()->name());
    EXPECT_EQ("32", arrow_map->key_field()->metadata()->Get(pfid).ValueUnsafe());
    EXPECT_EQ("43", arrow_map->item_field()->metadata()->Get(pfid).ValueUnsafe());
}
+
// Verifies that a list column converts to arrow::ListType with the element
// field carrying its Iceberg field id.
TEST(ArrowSchemaUtilTest, test_list_field) {
    // list_json comes from :
    // Schema schema = new Schema(
    //     Types.NestedField.optional(
    //         21, "list_col", Types.ListType.ofOptional(
    //             32, Types.IntegerType.get())));
    // StringWriter writer = new StringWriter();
    // JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
    // SchemaParser.toJson(schema.asStruct(), generator);
    // generator.flush();
    // System.out.println(writer.toString());

    const std::string list_json = R"({
        "type": "struct",
        "fields": [
            {
                "id": 21,
                "name": "list_col",
                "required": false,
                "type": {
                    "type": "list",
                    "element-id": 32,
                    "element": "int",
                    "element-required": false
                }
            }
        ]
    })";
    std::unique_ptr<Schema> schema = SchemaParser::from_json(list_json);

    std::vector<std::shared_ptr<arrow::Field>> fields;
    Status st;
    st = ArrowSchemaUtil::convert(schema.get(), "utc", fields);
    EXPECT_TRUE(st.ok());
    EXPECT_EQ(1, fields.size());
    EXPECT_EQ("list_col", fields[0]->name());
    EXPECT_EQ("21", fields[0]->metadata()->Get(pfid).ValueUnsafe());

    arrow::ListType* arrow_list = dynamic_cast<arrow::ListType*>(fields[0]->type().get());
    auto map_fields = arrow_list->fields();
    EXPECT_EQ(1, arrow_list->fields().size());
    EXPECT_EQ("element", arrow_list->value_field()->name());
    EXPECT_EQ("32", arrow_list->value_field()->metadata()->Get(pfid).ValueUnsafe());
}
+
+TEST(ArrowSchemaUtilTest, test_parquet_filed_id) {
+ std::string test_dir = "ut_dir/test_parquet_filed_id";
+ Status st;
+ st = io::global_local_filesystem()->delete_directory(test_dir);
+ ASSERT_TRUE(st.ok()) << st;
+ st = io::global_local_filesystem()->create_directory(test_dir);
+ ASSERT_TRUE(st.ok()) << st;
+
+ std::shared_ptr<arrow::Array> id_array;
+ std::shared_ptr<arrow::Array> name_array;
+
+ arrow::Int32Builder id_builder;
+ ASSERT_TRUE(id_builder.Append(1).ok());
+ ASSERT_TRUE(id_builder.Append(2).ok());
+ ASSERT_TRUE(id_builder.Append(3).ok());
+ auto&& result_id = id_builder.Finish();
+ ASSERT_TRUE(result_id.ok());
+ id_array = std::move(result_id).ValueUnsafe();
+
+ arrow::StringBuilder name_builder;
+ ASSERT_TRUE(name_builder.Append("Alice").ok());
+ ASSERT_TRUE(name_builder.Append("Bob").ok());
+ ASSERT_TRUE(name_builder.Append("Charlie").ok());
+ auto&& result_name = name_builder.Finish();
+ ASSERT_TRUE(result_name.ok());
+ name_array = std::move(result_name).ValueUnsafe();
+
+ // 定义表的 Schema
+ std::vector<NestedField> nested_fields;
+ nested_fields.reserve(2);
+ NestedField field1(false, 17, "field_1", std::make_unique<IntegerType>(),
std::nullopt);
+ NestedField field2(false, 36, "field_2", std::make_unique<StringType>(),
std::nullopt);
+ nested_fields.emplace_back(std::move(field1));
+ nested_fields.emplace_back(std::move(field2));
+
+ Schema schema(1, std::move(nested_fields));
+
+ std::vector<std::shared_ptr<arrow::Field>> fields;
+ st = ArrowSchemaUtil::convert(&schema, "utc", fields);
+ auto arrow_schema = arrow::schema(fields);
+
+ // create arrow table
+ auto table = arrow::Table::Make(arrow_schema, {id_array, name_array});
+
+ std::string file_path = test_dir + "/f1.parquet";
+ std::shared_ptr<arrow::io::FileOutputStream> outfile;
+ auto&& result_file = arrow::io::FileOutputStream::Open(file_path);
+ ASSERT_TRUE(result_file.ok());
+ outfile = std::move(result_file).ValueUnsafe();
+
+ // arrow table to parquet file
+ PARQUET_THROW_NOT_OK(
+ parquet::arrow::WriteTable(*table, arrow::default_memory_pool(),
outfile, 1024));
+
+ // open parquet with parquet's API
+ std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
+ parquet::ParquetFileReader::OpenFile(file_path, false);
+
+ // get MessageType
+ std::shared_ptr<parquet::FileMetaData> file_metadata =
parquet_reader->metadata();
+ auto schema_descriptor = file_metadata->schema();
+ const parquet::schema::Node& root = *schema_descriptor->group_node();
+ const auto& group_node = static_cast<const
parquet::schema::GroupNode&>(root);
+
+ EXPECT_EQ(2, group_node.field_count());
+ auto filed1 = group_node.field(0);
+ auto filed2 = group_node.field(1);
+ EXPECT_EQ("field_1", filed1->name());
+ EXPECT_EQ(17, filed1->field_id());
+ EXPECT_EQ("field_2", filed2->name());
+ EXPECT_EQ(36, filed2->field_id());
+
+ st = io::global_local_filesystem()->delete_directory(test_dir);
+ EXPECT_TRUE(st.ok()) << st;
+}
+
+} // namespace iceberg
+} // namespace doris
diff --git a/be/test/vec/exec/format/table/iceberg/schema_parser_test.cpp
b/be/test/vec/exec/format/table/iceberg/schema_parser_test.cpp
index 4c3f58cdd10..f464525a7f9 100644
--- a/be/test/vec/exec/format/table/iceberg/schema_parser_test.cpp
+++ b/be/test/vec/exec/format/table/iceberg/schema_parser_test.cpp
@@ -78,6 +78,15 @@ const std::string valid_map_json = R"({
"value-required": true
})";
// Map type JSON with an optional value ("value-required": false), mirroring
// valid_map_json which has a required value.
const std::string valid_map_json2 = R"({
    "type": "map",
    "key-id": 4,
    "key": "string",
    "value-id": 5,
    "value": "int",
    "value-required": false
})";
+
const std::string nested_list_json = R"({
"type": "list",
"element-id": 6,
@@ -209,6 +218,21 @@ TEST(SchemaParserTest, parse_valid_map) {
SchemaParser::_type_from_json(rapidjson::Document().Parse(valid_map_json.c_str()));
ASSERT_NE(type, nullptr);
EXPECT_EQ(type->to_string(), "map<string, int>");
+ EXPECT_TRUE(type->is_map_type());
+ MapType* mt = type->as_map_type();
+ EXPECT_TRUE(mt->field(4)->is_required());
+ EXPECT_TRUE(mt->field(5)->is_required());
+}
+
// Verifies that a map key is always parsed as required even when the value is
// optional ("value-required": false).
TEST(SchemaParserTest, parse_valid_map2) {
    std::unique_ptr<Type> type =
            SchemaParser::_type_from_json(rapidjson::Document().Parse(valid_map_json2.c_str()));
    ASSERT_NE(type, nullptr);
    EXPECT_EQ(type->to_string(), "map<string, int>");
    EXPECT_TRUE(type->is_map_type());
    MapType* mt = type->as_map_type();
    EXPECT_TRUE(mt->field(4)->is_required());
    EXPECT_TRUE(mt->field(5)->is_optional());
}
TEST(SchemaParserTest, parse_nested_list) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]