This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 860f7d057ac [feat](iceberg)Supports using `rest` type catalog to read tables in unity catalog for 3.0 (#43525) (#45161)
860f7d057ac is described below
commit 860f7d057ac11bae3d77ddf5b544defac16b8b12
Author: wuwenchi <[email protected]>
AuthorDate: Mon Dec 9 08:48:57 2024 +0800
[feat](iceberg)Supports using `rest` type catalog to read tables in unity catalog for 3.0 (#43525) (#45161)
pb: #43525
---
be/src/io/fs/file_system.cpp | 34 +++++++----
be/src/io/fs/file_system.h | 2 +-
be/src/io/fs/local_file_system.cpp | 50 ++++++++++++++++
be/src/io/fs/local_file_system.h | 6 +-
be/src/io/fs/remote_file_system.cpp | 10 +++-
be/src/io/fs/remote_file_system.h | 8 ++-
be/src/io/fs/s3_file_system.h | 7 ++-
be/src/vec/exec/format/parquet/schema_desc.cpp | 17 ++++++
be/src/vec/exec/format/parquet/schema_desc.h | 8 +++
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 6 +-
be/src/vec/exec/format/parquet/vparquet_reader.h | 2 +-
be/src/vec/exec/format/table/iceberg_reader.cpp | 48 +++++----------
be/src/vec/exec/format/table/iceberg_reader.h | 2 +-
be/test/io/fs/local_file_system_test.cpp | 50 ++++++++++++++++
.../apache/doris/datasource/ExternalCatalog.java | 6 ++
.../doris/datasource/hive/HMSExternalCatalog.java | 20 ++++---
.../datasource/iceberg/IcebergExternalCatalog.java | 3 +-
.../datasource/iceberg/IcebergMetadataCache.java | 14 ++---
.../datasource/iceberg/IcebergMetadataOps.java | 69 ++++++++++++++++------
.../iceberg/IcebergRestExternalCatalog.java | 2 -
.../datasource/iceberg/IcebergTransaction.java | 3 +-
.../operations/ExternalMetadataOperations.java | 4 +-
.../datasource/operations/ExternalMetadataOps.java | 4 ++
.../iceberg/iceberg_read_unitycatalog_table.out | 40 +++++++++++++
.../iceberg/iceberg_read_unitycatalog_table.groovy | 62 +++++++++++++++++++
25 files changed, 376 insertions(+), 101 deletions(-)
diff --git a/be/src/io/fs/file_system.cpp b/be/src/io/fs/file_system.cpp
index 3579a5323d9..e6b5ef7df1a 100644
--- a/be/src/io/fs/file_system.cpp
+++ b/be/src/io/fs/file_system.cpp
@@ -25,58 +25,70 @@ namespace io {
Status FileSystem::create_file(const Path& file, FileWriterPtr* writer,
const FileWriterOptions* opts) {
- auto path = absolute_path(file);
+ Path path;
+ RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(create_file_impl(path, writer, opts));
}
Status FileSystem::open_file(const Path& file, FileReaderSPtr* reader,
const FileReaderOptions* opts) {
- auto path = absolute_path(file);
+ Path path;
+ RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(open_file_impl(path, reader, opts));
}
Status FileSystem::create_directory(const Path& dir, bool failed_if_exists) {
- auto path = absolute_path(dir);
+ Path path;
+ RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(create_directory_impl(path, failed_if_exists));
}
Status FileSystem::delete_file(const Path& file) {
- auto path = absolute_path(file);
+ Path path;
+ RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(delete_file_impl(path));
}
Status FileSystem::delete_directory(const Path& dir) {
- auto path = absolute_path(dir);
+ Path path;
+ RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(delete_directory_impl(path));
}
Status FileSystem::batch_delete(const std::vector<Path>& files) {
std::vector<Path> abs_files;
for (auto& file : files) {
- abs_files.push_back(absolute_path(file));
+ Path abs_file;
+ RETURN_IF_ERROR(absolute_path(file, abs_file));
+ abs_files.push_back(abs_file);
}
FILESYSTEM_M(batch_delete_impl(abs_files));
}
Status FileSystem::exists(const Path& path, bool* res) const {
- auto fs_path = absolute_path(path);
+ Path fs_path;
+ RETURN_IF_ERROR(absolute_path(path, fs_path));
FILESYSTEM_M(exists_impl(fs_path, res));
}
Status FileSystem::file_size(const Path& file, int64_t* file_size) const {
- auto path = absolute_path(file);
+ Path path;
+ RETURN_IF_ERROR(absolute_path(file, path));
FILESYSTEM_M(file_size_impl(path, file_size));
}
Status FileSystem::list(const Path& dir, bool only_file, std::vector<FileInfo>* files,
bool* exists) {
- auto path = absolute_path(dir);
+ Path path;
+ RETURN_IF_ERROR(absolute_path(dir, path));
FILESYSTEM_M(list_impl(path, only_file, files, exists));
}
Status FileSystem::rename(const Path& orig_name, const Path& new_name) {
- auto orig_path = absolute_path(orig_name);
- auto new_path = absolute_path(new_name);
+ Path orig_path;
+ RETURN_IF_ERROR(absolute_path(orig_name, orig_path));
+ Path new_path;
+ RETURN_IF_ERROR(absolute_path(new_name, new_path));
FILESYSTEM_M(rename_impl(orig_path, new_path));
}
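For context: the refactor above changes `absolute_path` from returning a `Path` to returning a `Status` with an out-parameter, so an unsupported scheme can fail the call instead of being silently passed through. A minimal standalone sketch of that pattern, with a stand-in `Status` type and macro for illustration only (the real ones live in common/status.h):

    #include <iostream>
    #include <string>

    // Stand-in Status and RETURN_IF_ERROR, illustrative only.
    struct Status {
        bool ok = true;
        std::string msg;
        static Status OK() { return {}; }
        static Status Error(std::string m) { return {false, std::move(m)}; }
    };

    #define RETURN_IF_ERROR(stmt)   \
        do {                        \
            Status _s = (stmt);     \
            if (!_s.ok) return _s;  \
        } while (0)

    // New shape: failure comes back as Status, the result via out-parameter.
    static Status absolute_path(const std::string& in, std::string& out) {
        if (in.rfind("hdfs:", 0) == 0) {
            return Status::Error("unsupported scheme");
        }
        out = in;
        return Status::OK();
    }

    static Status open_file(const std::string& file) {
        std::string path;
        RETURN_IF_ERROR(absolute_path(file, path)); // propagate, don't swallow
        std::cout << "opening " << path << "\n";
        return Status::OK();
    }

    int main() { return open_file("/abc/def").ok ? 0 : 1; }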
diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h
index a8ccc8756bb..6baf07917d3 100644
--- a/be/src/io/fs/file_system.h
+++ b/be/src/io/fs/file_system.h
@@ -163,7 +163,7 @@ protected:
// FIXME(plat1ko): The implementation and semantics of this function are not completely
// consistent, which is confusing.
- virtual Path absolute_path(const Path& path) const = 0;
+ virtual Status absolute_path(const Path& path, Path& abs_path) const = 0;
FileSystem(std::string id, FileSystemType type) : _id(std::move(id)), _type(type) {}
diff --git a/be/src/io/fs/local_file_system.cpp b/be/src/io/fs/local_file_system.cpp
index 0107ed57dc8..9270d919a37 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -471,4 +471,54 @@ Status LocalFileSystem::permission_impl(const Path& file, std::filesystem::perms
return Status::OK();
}
+Status LocalFileSystem::convert_to_abs_path(const Path& input_path_str, Path& abs_path) {
+ // valid paths include:
+ // 1. abc/def will return abc/def
+ // 2. /abc/def will return /abc/def
+ // 3. file:/abc/def will return /abc/def
+ // 4. file://<authority>/abc/def will return /abc/def
+ std::string path_str = input_path_str;
+ size_t slash = path_str.find('/');
+ if (slash == 0) {
+ abs_path = input_path_str;
+ return Status::OK();
+ }
+
+ // Initialize scheme and authority
+ std::string scheme;
+ size_t start = 0;
+
+ // Parse URI scheme
+ size_t colon = path_str.find(':');
+ if (colon != std::string::npos && (slash == std::string::npos || colon < slash)) {
+ // Has a scheme
+ scheme = path_str.substr(0, colon);
+ if (scheme != "file") {
+ return Status::InternalError(
+ "Only supports `file` type scheme, like 'file:///path',
'file:/path'.");
+ }
+ start = colon + 1;
+ }
+
+ // Parse URI authority, if any
+ if (path_str.compare(start, 2, "//") == 0 && path_str.length() - start > 2) {
+ // Has authority
+ // such as : path_str = "file://authority/abc/def"
+ // and now : start = 5
+ size_t next_slash = path_str.find('/', start + 2);
+ // now : next_slash = 16
+ if (next_slash == std::string::npos) {
+ return Status::InternalError(
+ "This input string only has authority, but has no path
information");
+ }
+ // We will skip the authority part
+ // now : start = 16
+ start = next_slash;
+ }
+
+ // URI path is the rest of the string
+ abs_path = path_str.substr(start);
+ return Status::OK();
+}
+
} // namespace doris::io
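To make the accepted forms concrete, here is a rough standalone transliteration of the parsing rules implemented by `convert_to_abs_path` above (hypothetical helper `to_abs_path`; a bool stands in for Status):

    #include <cassert>
    #include <string>

    // Sketch of the rule set: keep "/..." and bare relative paths as-is,
    // strip an optional "file" scheme and an optional "//authority" segment,
    // and reject any other scheme or an authority with no path after it.
    static bool to_abs_path(const std::string& in, std::string* out) {
        size_t slash = in.find('/');
        if (slash == 0) { // already absolute, e.g. "/abc/def"
            *out = in;
            return true;
        }
        size_t start = 0;
        size_t colon = in.find(':');
        if (colon != std::string::npos && (slash == std::string::npos || colon < slash)) {
            if (in.substr(0, colon) != "file") {
                return false; // only the "file" scheme maps to a local path
            }
            start = colon + 1;
        }
        if (in.compare(start, 2, "//") == 0 && in.length() - start > 2) {
            size_t next = in.find('/', start + 2);
            if (next == std::string::npos) {
                return false; // authority but no path component
            }
            start = next; // skip "//authority"
        }
        *out = in.substr(start);
        return true;
    }

    int main() {
        std::string p;
        assert(to_abs_path("file://host:80/hij/abc", &p) && p == "/hij/abc");
        assert(to_abs_path("abc", &p) && p == "abc");
        assert(!to_abs_path("hdfs:///abc", &p));
        return 0;
    }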
diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h
index c6295b0bae1..4540df47c16 100644
--- a/be/src/io/fs/local_file_system.h
+++ b/be/src/io/fs/local_file_system.h
@@ -34,6 +34,8 @@ class LocalFileSystem final : public FileSystem {
public:
~LocalFileSystem() override;
+ static Status convert_to_abs_path(const Path& path, Path& abs_path);
+
/// hard link dest file to src file
Status link_file(const Path& src, const Path& dest);
@@ -104,7 +106,9 @@ private:
// `LocalFileSystem` always use absolute path as arguments
// FIXME(plat1ko): Eliminate this method
- Path absolute_path(const Path& path) const override { return path; }
+ Status absolute_path(const Path& path, Path& abs_path) const override {
+ return convert_to_abs_path(path, abs_path);
+ }
friend const std::shared_ptr<LocalFileSystem>& global_local_filesystem();
};
diff --git a/be/src/io/fs/remote_file_system.cpp b/be/src/io/fs/remote_file_system.cpp
index 2b6af2af046..fd793f60cdc 100644
--- a/be/src/io/fs/remote_file_system.cpp
+++ b/be/src/io/fs/remote_file_system.cpp
@@ -29,7 +29,8 @@
namespace doris::io {
Status RemoteFileSystem::upload(const Path& local_file, const Path& dest_file) {
- auto dest_path = absolute_path(dest_file);
+ Path dest_path;
+ RETURN_IF_ERROR(absolute_path(dest_file, dest_path));
FILESYSTEM_M(upload_impl(local_file, dest_path));
}
@@ -37,13 +38,16 @@ Status RemoteFileSystem::batch_upload(const std::vector<Path>& local_files,
const std::vector<Path>& remote_files) {
std::vector<Path> remote_paths;
for (auto& path : remote_files) {
- remote_paths.push_back(absolute_path(path));
+ Path abs_path;
+ RETURN_IF_ERROR(absolute_path(path, abs_path));
+ remote_paths.push_back(abs_path);
}
FILESYSTEM_M(batch_upload_impl(local_files, remote_paths));
}
Status RemoteFileSystem::download(const Path& remote_file, const Path& local) {
- auto remote_path = absolute_path(remote_file);
+ Path remote_path;
+ RETURN_IF_ERROR(absolute_path(remote_file, remote_path));
FILESYSTEM_M(download_impl(remote_path, local));
}
diff --git a/be/src/io/fs/remote_file_system.h b/be/src/io/fs/remote_file_system.h
index e9472140ab7..de0a1b71519 100644
--- a/be/src/io/fs/remote_file_system.h
+++ b/be/src/io/fs/remote_file_system.h
@@ -64,11 +64,13 @@ protected:
virtual Status open_file_internal(const Path& file, FileReaderSPtr* reader,
const FileReaderOptions& opts) = 0;
- Path absolute_path(const Path& path) const override {
+ Status absolute_path(const Path& path, Path& abs_path) const override {
if (path.is_absolute()) {
- return path;
+ abs_path = path;
+ } else {
+ abs_path = _root_path / path;
}
- return _root_path / path;
+ return Status::OK();
}
Path _root_path;
diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h
index d1e8b5b6e31..61967a63e44 100644
--- a/be/src/io/fs/s3_file_system.h
+++ b/be/src/io/fs/s3_file_system.h
@@ -113,16 +113,17 @@ protected:
const std::vector<Path>& remote_files) override;
Status download_impl(const Path& remote_file, const Path& local_file) override;
- Path absolute_path(const Path& path) const override {
+ Status absolute_path(const Path& path, Path& abs_path) const override {
if (path.string().find("://") != std::string::npos) {
// the path is with schema, which means this is a full path like:
// s3://bucket/path/to/file.txt
// so no need to concat with prefix
- return path;
+ abs_path = path;
} else {
// path with no schema
- return _root_path / path;
+ abs_path = _root_path / path;
}
+ return Status::OK();
}
private:
diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp b/be/src/vec/exec/format/parquet/schema_desc.cpp
index de879706264..1eae65b1a4d 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -137,6 +137,9 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
return Status::InvalidArgument("Duplicated field name: {}", _fields[i].name);
}
_name_to_field.emplace(_fields[i].name, &_fields[i]);
+ if (_fields[i].field_id != -1) {
+ _field_id_name_mapping.emplace(_fields[i].field_id, _fields[i].name);
+ }
}
if (_next_schema_pos != t_schemas.size()) {
@@ -147,6 +150,14 @@ Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElem
return Status::OK();
}
+const doris::Slice FieldDescriptor::get_column_name_from_field_id(int32_t id) const {
+ auto const it = _field_id_name_mapping.find(id);
+ if (it == _field_id_name_mapping.end()) {
+ return {};
+ }
+ return {it->second.data()};
+}
+
Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas,
size_t curr_pos, FieldSchema* node_field) {
if (curr_pos >= t_schemas.size()) {
@@ -172,6 +183,7 @@ Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaEleme
node_field->type.add_sub_type(child->type);
node_field->is_nullable = false;
_next_schema_pos = curr_pos + 1;
+ node_field->field_id = t_schema.__isset.field_id ? t_schema.field_id : -1;
} else {
bool is_optional = is_optional_node(t_schema);
if (is_optional) {
@@ -194,6 +206,7 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic
auto type = get_doris_type(physical_schema);
physical_field->type = type.first;
physical_field->is_type_compatibility = type.second;
+ physical_field->field_id = physical_schema.__isset.field_id ? physical_schema.field_id : -1;
}
std::pair<TypeDescriptor, bool> FieldDescriptor::get_doris_type(
@@ -465,6 +478,7 @@ Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElem
group_field->type.type = TYPE_ARRAY;
group_field->type.add_sub_type(struct_field->type);
group_field->is_nullable = false;
+ group_field->field_id = group_schema.__isset.field_id ? group_schema.field_id : -1;
} else {
RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, group_field));
}
@@ -533,6 +547,7 @@ Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaEleme
list_field->type.type = TYPE_ARRAY;
list_field->type.add_sub_type(list_field->children[0].type);
list_field->is_nullable = is_optional;
+ list_field->field_id = first_level.__isset.field_id ? first_level.field_id : -1;
return Status::OK();
}
@@ -597,6 +612,7 @@ Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElemen
map_field->type.add_sub_type(map_kv_field->type.children[0]);
map_field->type.add_sub_type(map_kv_field->type.children[1]);
map_field->is_nullable = is_optional;
+ map_field->field_id = map_schema.__isset.field_id ? map_schema.field_id : -1;
return Status::OK();
}
@@ -619,6 +635,7 @@ Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaEle
struct_field->name = to_lower(struct_schema.name);
struct_field->is_nullable = is_optional;
struct_field->type.type = TYPE_STRUCT;
+ struct_field->field_id = struct_schema.__isset.field_id ? struct_schema.field_id : -1;
for (int i = 0; i < num_children; ++i) {
struct_field->type.add_sub_type(struct_field->children[i].type,
struct_field->children[i].name);
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h
index ca726ef1b57..2593da837c3 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -28,6 +28,7 @@
#include "common/status.h"
#include "runtime/types.h"
+#include "util/slice.h"
namespace doris::vectorized {
@@ -56,6 +57,8 @@ struct FieldSchema {
~FieldSchema() = default;
FieldSchema(const FieldSchema& fieldSchema) = default;
std::string debug_string() const;
+
+ int32_t field_id;
};
class FieldDescriptor {
@@ -68,6 +71,7 @@ private:
std::unordered_map<std::string, const FieldSchema*> _name_to_field;
// Used in from_thrift, marking the next schema position that should be parsed
size_t _next_schema_pos;
+ std::unordered_map<int, std::string> _field_id_name_mapping;
void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable,
FieldSchema* physical_field);
@@ -128,6 +132,10 @@ public:
std::string debug_string() const;
int32_t size() const { return _fields.size(); }
+
+ bool has_parquet_field_id() const { return _field_id_name_mapping.size() > 0; }
+
+ const doris::Slice get_column_name_from_field_id(int32_t id) const;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 6c4e4983c70..44522454846 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -253,10 +253,8 @@ Status ParquetReader::_open_file() {
return Status::OK();
}
-// Get iceberg col id to col name map stored in parquet metadata key values.
-// This is for iceberg schema evolution.
-std::vector<tparquet::KeyValue> ParquetReader::get_metadata_key_values() {
- return _t_metadata->key_value_metadata;
+const FieldDescriptor ParquetReader::get_file_metadata_schema() {
+ return _file_metadata->schema();
}
Status ParquetReader::open() {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index aceb621b825..00db2652382 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -148,7 +148,7 @@ public:
partition_columns,
const std::unordered_map<std::string, VExprContextSPtr>&
missing_columns) override;
- std::vector<tparquet::KeyValue> get_metadata_key_values();
+ const FieldDescriptor get_file_metadata_schema();
void set_table_to_file_col_map(std::unordered_map<std::string, std::string>& map) {
_table_col_to_file_col = map;
}
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 295a3a40544..8f130ca6002 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -53,6 +53,7 @@
#include "vec/exec/format/format_common.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/format/orc/vorc_reader.h"
+#include "vec/exec/format/parquet/schema_desc.h"
#include "vec/exec/format/table/table_format_reader.h"
namespace cctz {
@@ -546,8 +547,8 @@ Status IcebergParquetReader::init_reader(
_col_id_name_map = col_id_name_map;
_file_col_names = file_col_names;
_colname_to_value_range = colname_to_value_range;
- auto parquet_meta_kv = parquet_reader->get_metadata_key_values();
- RETURN_IF_ERROR(_gen_col_name_maps(parquet_meta_kv));
+ FieldDescriptor field_desc = parquet_reader->get_file_metadata_schema();
+ RETURN_IF_ERROR(_gen_col_name_maps(field_desc));
_gen_file_col_names();
_gen_new_colname_to_value_range();
parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
@@ -672,39 +673,20 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete
* 1. col1_new -> col1
* 2. col1 -> col1_new
*/
-Status IcebergParquetReader::_gen_col_name_maps(std::vector<tparquet::KeyValue> parquet_meta_kv) {
- for (int i = 0; i < parquet_meta_kv.size(); ++i) {
- tparquet::KeyValue kv = parquet_meta_kv[i];
- if (kv.key == "iceberg.schema") {
- _has_iceberg_schema = true;
- std::string schema = kv.value;
- rapidjson::Document json;
- json.Parse(schema.c_str());
-
- if (json.HasMember("fields")) {
- rapidjson::Value& fields = json["fields"];
- if (fields.IsArray()) {
- for (int j = 0; j < fields.Size(); j++) {
- rapidjson::Value& e = fields[j];
- rapidjson::Value& id = e["id"];
- rapidjson::Value& name = e["name"];
- std::string name_string = name.GetString();
- transform(name_string.begin(), name_string.end(), name_string.begin(),
- ::tolower);
- auto iter = _col_id_name_map.find(id.GetInt());
- if (iter != _col_id_name_map.end()) {
- _table_col_to_file_col.emplace(iter->second, name_string);
- _file_col_to_table_col.emplace(name_string, iter->second);
- if (name_string != iter->second) {
- _has_schema_change = true;
- }
- } else {
- _has_schema_change = true;
- }
- }
+Status IcebergParquetReader::_gen_col_name_maps(const FieldDescriptor& field_desc) {
+ if (field_desc.has_parquet_field_id()) {
+ for (const auto& pair : _col_id_name_map) {
+ auto name_slice = field_desc.get_column_name_from_field_id(pair.first);
+ if (name_slice.get_size() == 0) {
+ _has_schema_change = true;
+ } else {
+ auto name_string = name_slice.to_string();
+ _table_col_to_file_col.emplace(pair.second, name_string);
+ _file_col_to_table_col.emplace(name_string, pair.second);
+ if (name_string != pair.second) {
+ _has_schema_change = true;
}
}
- break;
}
}
return Status::OK();
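In effect, the rewrite swaps JSON parsing of the Parquet footer's "iceberg.schema" property for a direct join on field ids. A minimal standalone sketch of that join, with illustrative container names only:

    #include <cstdint>
    #include <string>
    #include <unordered_map>

    int main() {
        // Iceberg column id -> name, from table metadata and from the file.
        std::unordered_map<int32_t, std::string> table_cols = {{1, "col1_new"}};
        std::unordered_map<int32_t, std::string> file_cols = {{1, "col1"}};

        std::unordered_map<std::string, std::string> table_to_file;
        bool has_schema_change = false;
        for (const auto& [id, table_name] : table_cols) {
            auto it = file_cols.find(id);
            if (it == file_cols.end()) {
                has_schema_change = true; // id missing from the file
            } else {
                table_to_file.emplace(table_name, it->second);
                has_schema_change |= (it->second != table_name); // renamed
            }
        }
        return has_schema_change ? 0 : 1;
    }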
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index 04f64aad518..2e240f465b6 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -218,7 +218,7 @@ public:
parquet_reader->set_delete_rows(&_iceberg_delete_rows);
}
- Status _gen_col_name_maps(std::vector<tparquet::KeyValue> parquet_meta_kv);
+ Status _gen_col_name_maps(const FieldDescriptor& field_desc);
protected:
std::unique_ptr<GenericReader> _create_equality_reader(
diff --git a/be/test/io/fs/local_file_system_test.cpp b/be/test/io/fs/local_file_system_test.cpp
index 0fd18445bea..c930ba72eab 100644
--- a/be/test/io/fs/local_file_system_test.cpp
+++ b/be/test/io/fs/local_file_system_test.cpp
@@ -417,4 +417,54 @@ TEST_F(LocalFileSystemTest, TestGlob) {
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(path).ok());
}
+TEST_F(LocalFileSystemTest, TestConvertToAbsPath) {
+ io::Path abs_path;
+ Status st;
+
+ // supported paths:
+ st = doris::io::LocalFileSystem::convert_to_abs_path("/abc/def", abs_path);
+ ASSERT_TRUE(st.ok());
+ ASSERT_EQ("/abc/def", abs_path);
+
+ st = doris::io::LocalFileSystem::convert_to_abs_path("file:/def/hij", abs_path);
+ ASSERT_TRUE(st.ok());
+ ASSERT_EQ("/def/hij", abs_path);
+
+ st = doris::io::LocalFileSystem::convert_to_abs_path("file://host:80/hij/abc", abs_path);
+ ASSERT_TRUE(st.ok());
+ ASSERT_EQ("/hij/abc", abs_path);
+
+ st = doris::io::LocalFileSystem::convert_to_abs_path("file://host/abc/def", abs_path);
+ ASSERT_TRUE(st.ok());
+ ASSERT_EQ("/abc/def", abs_path);
+
+ st = doris::io::LocalFileSystem::convert_to_abs_path("file:///def", abs_path);
+ ASSERT_TRUE(st.ok());
+ ASSERT_EQ("/def", abs_path);
+
+ st = doris::io::LocalFileSystem::convert_to_abs_path("file:///", abs_path);
+ ASSERT_TRUE(st.ok());
+ ASSERT_EQ("/", abs_path);
+
+ st = doris::io::LocalFileSystem::convert_to_abs_path("file://auth/", abs_path);
+ ASSERT_TRUE(st.ok());
+ ASSERT_EQ("/", abs_path);
+
+ st = doris::io::LocalFileSystem::convert_to_abs_path("abc", abs_path);
+ ASSERT_TRUE(st.ok());
+ ASSERT_EQ("abc", abs_path);
+
+ // unsupported paths:
+ st = doris::io::LocalFileSystem::convert_to_abs_path("file://auth", abs_path);
+ ASSERT_TRUE(!st.ok());
+
+ st = doris::io::LocalFileSystem::convert_to_abs_path("fileee:/abc", abs_path);
+ ASSERT_TRUE(!st.ok());
+
+ st = doris::io::LocalFileSystem::convert_to_abs_path("hdfs:///abc", abs_path);
+ ASSERT_TRUE(!st.ok());
+
+ st = doris::io::LocalFileSystem::convert_to_abs_path("hdfs:/abc", abs_path);
+ ASSERT_TRUE(!st.ok());
+}
} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 23e6d68954e..cde08113373 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -39,6 +39,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.es.EsExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
@@ -149,6 +150,7 @@ public abstract class ExternalCatalog
protected Optional<Boolean> useMetaCache = Optional.empty();
protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
+ protected PreExecutionAuthenticator preExecutionAuthenticator;
public ExternalCatalog() {
}
@@ -935,4 +937,8 @@ public abstract class ExternalCatalog
tableAutoAnalyzePolicy.put(key, policy);
}
}
+
+ public PreExecutionAuthenticator getPreExecutionAuthenticator() {
+ return preExecutionAuthenticator;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 20b9482041d..85b999f1111 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopAuthenticator;
+import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
@@ -34,6 +35,7 @@ import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
+import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.operations.ExternalMetadataOperations;
@@ -88,7 +90,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
private boolean enableHmsEventsIncrementalSync = false;
//for "type" = "hms" , but is iceberg table.
- private HiveCatalog icebergHiveCatalog;
+ private IcebergMetadataOps icebergMetadataOps;
@VisibleForTesting
public HMSExternalCatalog() {
@@ -168,6 +170,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
@Override
protected void initLocalObjectsImpl() {
+ preExecutionAuthenticator = new PreExecutionAuthenticator();
if (authenticator == null) {
AuthenticationConfig config =
AuthenticationConfig.getKerberosConfig(getConfiguration());
authenticator = HadoopAuthenticator.getHadoopAuthenticator(config);
@@ -199,8 +202,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
transactionManager = TransactionManagerFactory.createHiveTransactionManager(hiveOps, fileSystemProvider,
fileSystemExecutor);
metadataOps = hiveOps;
-
- icebergHiveCatalog = IcebergUtils.createIcebergHiveCatalog(this, getName());
}
@Override
@@ -337,10 +338,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
return enableHmsEventsIncrementalSync;
}
- public HiveCatalog getIcebergHiveCatalog() {
- return icebergHiveCatalog;
- }
-
/**
* Enum for meta tables in hive catalog.
* eg: tbl$partitions
@@ -393,5 +390,14 @@ public class HMSExternalCatalog extends ExternalCatalog {
}
}
}
+
+ public IcebergMetadataOps getIcebergMetadataOps() {
+ makeSureInitialized();
+ if (icebergMetadataOps == null) {
+ HiveCatalog icebergHiveCatalog = IcebergUtils.createIcebergHiveCatalog(this, getName());
+ icebergMetadataOps = ExternalMetadataOperations.newIcebergMetadataOps(this, icebergHiveCatalog);
+ }
+ return icebergMetadataOps;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index d8dfd1c128f..0fa69825a01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -40,11 +40,10 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
public static final String ICEBERG_HADOOP = "hadoop";
public static final String ICEBERG_GLUE = "glue";
public static final String ICEBERG_DLF = "dlf";
+ public static final String EXTERNAL_CATALOG_NAME = "external_catalog.name";
protected String icebergCatalogType;
protected Catalog catalog;
- protected PreExecutionAuthenticator preExecutionAuthenticator;
-
public IcebergExternalCatalog(long catalogId, String name, String comment) {
super(catalogId, name, InitCatalogLog.Type.ICEBERG, comment);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index c1ac2a79754..ad347ca78f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -36,8 +36,6 @@ import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.TableIdentifier;
import org.jetbrains.annotations.NotNull;
import java.util.HashMap;
@@ -104,18 +102,16 @@ public class IcebergMetadataCache {
@NotNull
private Table loadTable(IcebergMetadataCacheKey key) {
- Catalog icebergCatalog;
+ IcebergMetadataOps ops;
if (key.catalog instanceof HMSExternalCatalog) {
- icebergCatalog = ((HMSExternalCatalog) key.catalog).getIcebergHiveCatalog();
+ ops = ((HMSExternalCatalog) key.catalog).getIcebergMetadataOps();
} else if (key.catalog instanceof IcebergExternalCatalog) {
- icebergCatalog = ((IcebergExternalCatalog) key.catalog).getCatalog();
+ ops = (IcebergMetadataOps) (((IcebergExternalCatalog) key.catalog).getMetadataOps());
} else {
throw new RuntimeException("Only support 'hms' and 'iceberg' type
for iceberg table");
}
- Table icebergTable = HiveMetaStoreClientHelper.ugiDoAs(((ExternalCatalog) key.catalog).getConfiguration(),
- () -> icebergCatalog.loadTable(TableIdentifier.of(key.dbName, key.tableName)));
- initIcebergTableFileIO(icebergTable, key.catalog.getProperties());
- return icebergTable;
+ return HiveMetaStoreClientHelper.ugiDoAs(((ExternalCatalog) key.catalog).getConfiguration(),
+ () -> ops.loadTable(key.dbName, key.tableName));
}
public void invalidateCatalogCache(long catalogId) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 59729ddb47a..970814b7acd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -36,6 +36,7 @@ import org.apache.doris.datasource.operations.ExternalMetadataOps;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -46,29 +47,40 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
public class IcebergMetadataOps implements ExternalMetadataOps {
private static final Logger LOG = LogManager.getLogger(IcebergMetadataOps.class);
protected Catalog catalog;
- protected IcebergExternalCatalog dorisCatalog;
+ protected ExternalCatalog dorisCatalog;
protected SupportsNamespaces nsCatalog;
private PreExecutionAuthenticator preExecutionAuthenticator;
+ // Generally, there should be only two levels under the catalog, namely <database>.<table>,
+ // but the REST type catalog is obtained from an external server,
+ // and the level provided by the external server may be three levels, <catalog>.<database>.<table>.
+ // Therefore, if the external server provides a catalog,
+ // the catalog needs to be recorded here to ensure semantic consistency.
+ private Optional<String> externalCatalogName = Optional.empty();
- public IcebergMetadataOps(IcebergExternalCatalog dorisCatalog, Catalog catalog) {
+ public IcebergMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) {
this.dorisCatalog = dorisCatalog;
this.catalog = catalog;
nsCatalog = (SupportsNamespaces) catalog;
- this.preExecutionAuthenticator = dorisCatalog.preExecutionAuthenticator;
+ this.preExecutionAuthenticator = dorisCatalog.getPreExecutionAuthenticator();
+ if (dorisCatalog.getProperties().containsKey(IcebergExternalCatalog.EXTERNAL_CATALOG_NAME)) {
+ externalCatalogName =
+ Optional.of(dorisCatalog.getProperties().get(IcebergExternalCatalog.EXTERNAL_CATALOG_NAME));
+ }
}
public Catalog getCatalog() {
return catalog;
}
- public IcebergExternalCatalog getExternalCatalog() {
+ public ExternalCatalog getExternalCatalog() {
return dorisCatalog;
}
@@ -78,17 +90,18 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
@Override
public boolean tableExist(String dbName, String tblName) {
- return catalog.tableExists(TableIdentifier.of(dbName, tblName));
+ return catalog.tableExists(getTableIdentifier(dbName, tblName));
}
public boolean databaseExist(String dbName) {
- return nsCatalog.namespaceExists(Namespace.of(dbName));
+ return nsCatalog.namespaceExists(getNamespace(dbName));
}
public List<String> listDatabaseNames() {
try {
- return preExecutionAuthenticator.execute(() -> nsCatalog.listNamespaces().stream()
- .map(Namespace::toString)
+ return preExecutionAuthenticator.execute(() -> nsCatalog.listNamespaces(getNamespace())
+ .stream()
+ .map(n -> n.level(n.length() - 1))
.collect(Collectors.toList()));
} catch (Exception e) {
throw new RuntimeException("Failed to list database names, error
message is: " + e.getMessage());
@@ -98,7 +111,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
@Override
public List<String> listTableNames(String dbName) {
- List<TableIdentifier> tableIdentifiers = catalog.listTables(Namespace.of(dbName));
+ List<TableIdentifier> tableIdentifiers = catalog.listTables(getNamespace(dbName));
return tableIdentifiers.stream().map(TableIdentifier::name).collect(Collectors.toList());
}
@@ -128,12 +141,14 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
ErrorReport.reportDdlException(ErrorCode.ERR_DB_CREATE_EXISTS, dbName);
}
}
- String icebergCatalogType = dorisCatalog.getIcebergCatalogType();
- if (!properties.isEmpty() && !IcebergExternalCatalog.ICEBERG_HMS.equals(icebergCatalogType)) {
- throw new DdlException(
+ if (!properties.isEmpty() && dorisCatalog instanceof IcebergExternalCatalog) {
+ String icebergCatalogType = ((IcebergExternalCatalog) dorisCatalog).getIcebergCatalogType();
+ if (!IcebergExternalCatalog.ICEBERG_HMS.equals(icebergCatalogType)) {
+ throw new DdlException(
"Not supported: create database with properties for iceberg catalog type: " + icebergCatalogType);
+ }
}
- nsCatalog.createNamespace(Namespace.of(dbName), properties);
+ nsCatalog.createNamespace(getNamespace(dbName), properties);
dorisCatalog.onRefreshCache(true);
}
@@ -159,8 +174,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
ErrorReport.reportDdlException(ErrorCode.ERR_DB_DROP_EXISTS, dbName);
}
}
- SupportsNamespaces nsCatalog = (SupportsNamespaces) catalog;
- nsCatalog.dropNamespace(Namespace.of(dbName));
+ nsCatalog.dropNamespace(getNamespace(dbName));
dorisCatalog.onRefreshCache(true);
}
@@ -200,7 +214,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
Map<String, String> properties = stmt.getProperties();
properties.put(ExternalCatalog.DORIS_VERSION, ExternalCatalog.DORIS_VERSION_VALUE);
PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema);
- catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties);
+ catalog.createTable(getTableIdentifier(dbName, tableName), schema, partitionSpec, properties);
db.setUnInitialized(true);
return false;
}
@@ -238,7 +252,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_TABLE, tableName, dbName);
}
}
- catalog.dropTable(TableIdentifier.of(dbName, tableName), true);
+ catalog.dropTable(getTableIdentifier(dbName, tableName), true);
db.setUnInitialized(true);
}
@@ -250,4 +264,25 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
public PreExecutionAuthenticator getPreExecutionAuthenticator() {
return preExecutionAuthenticator;
}
+
+ @Override
+ public Table loadTable(String dbName, String tblName) {
+ return catalog.loadTable(getTableIdentifier(dbName, tblName));
+ }
+
+ private TableIdentifier getTableIdentifier(String dbName, String tblName) {
+ return externalCatalogName
+ .map(s -> TableIdentifier.of(s, dbName, tblName))
+ .orElseGet(() -> TableIdentifier.of(dbName, tblName));
+ }
+
+ private Namespace getNamespace(String dbName) {
+ return externalCatalogName
+ .map(s -> Namespace.of(s, dbName))
+ .orElseGet(() -> Namespace.of(dbName));
+ }
+
+ private Namespace getNamespace() {
+ return externalCatalogName.map(Namespace::of).orElseGet(() -> Namespace.empty());
+ }
}
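The identifier-prefixing rule above carries the feature: when `external_catalog.name` is set, every namespace and table identifier gains that extra leading level, so `<db>.<table>` becomes `<catalog>.<db>.<table>`. Transliterated to C++ for a compact sketch (std::optional standing in for Java's Optional; names illustrative):

    #include <optional>
    #include <string>
    #include <vector>

    // Build the identifier levels, optionally prefixed by the server-side
    // catalog name, mirroring getTableIdentifier()/getNamespace() above.
    static std::vector<std::string> table_identifier(
            const std::optional<std::string>& external_catalog,
            const std::string& db, const std::string& tbl) {
        if (external_catalog) {
            return {*external_catalog, db, tbl};
        }
        return {db, tbl};
    }

    int main() {
        auto plain = table_identifier(std::nullopt, "db", "t");         // db.t
        auto unity = table_identifier(std::string("unity"), "db", "t"); // unity.db.t
        return (plain.size() == 2 && unity.size() == 3) ? 0 : 1;
    }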
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
index 908a4fa9e3f..b92d2c91f96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.s3a.Constants;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.AwsClientProperties;
-import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import java.util.HashMap;
@@ -71,7 +70,6 @@ public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
Map<String, String> props = catalogProperty.getProperties();
Map<String, String> restProperties = new HashMap<>(props);
- restProperties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
restProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
String restUri = props.getOrDefault(CatalogProperties.URI, "");
restProperties.put(CatalogProperties.URI, restUri);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index 685915025d6..d0cca11b0af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -22,6 +22,7 @@ package org.apache.doris.datasource.iceberg;
import org.apache.doris.common.UserException;
import org.apache.doris.common.info.SimpleTableInfo;
+import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.iceberg.helper.IcebergWriterHelper;
import org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
@@ -140,7 +141,7 @@ public class IcebergTransaction implements Transaction {
private synchronized Table getNativeTable(SimpleTableInfo tableInfo) {
Objects.requireNonNull(tableInfo);
- IcebergExternalCatalog externalCatalog = ops.getExternalCatalog();
+ ExternalCatalog externalCatalog = ops.getExternalCatalog();
return IcebergUtils.getRemoteTable(externalCatalog, tableInfo);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java
index 4a2757f918f..50166fe8305 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java
@@ -17,9 +17,9 @@
package org.apache.doris.datasource.operations;
+import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetadataOps;
-import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
@@ -34,7 +34,7 @@ public class ExternalMetadataOperations {
return new HiveMetadataOps(hiveConf, jdbcClientConfig, catalog);
}
- public static IcebergMetadataOps newIcebergMetadataOps(IcebergExternalCatalog dorisCatalog, Catalog catalog) {
+ public static IcebergMetadataOps newIcebergMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) {
return new IcebergMetadataOps(dorisCatalog, catalog);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
index 0333124b352..e5ed129c679 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
@@ -91,6 +91,10 @@ public interface ExternalMetadataOps {
boolean databaseExist(String dbName);
+ default Object loadTable(String dbName, String tblName) {
+ throw new UnsupportedOperationException("Load table is not supported.");
+ }
+
/**
* close the connection, eg, to hms
*/
diff --git a/regression-test/data/external_table_p0/iceberg/iceberg_read_unitycatalog_table.out b/regression-test/data/external_table_p0/iceberg/iceberg_read_unitycatalog_table.out
new file mode 100644
index 00000000000..42414c36549
--- /dev/null
+++ b/regression-test/data/external_table_p0/iceberg/iceberg_read_unitycatalog_table.out
@@ -0,0 +1,40 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q1 --
+1 nWYHawtqUw 930
+2 uvOzzthsLV 166
+3 WIAehuXWkv 170
+4 wYCSvnJKTo 709
+5 VsslXsUIDZ 993
+6 ZLsACYYTFy 813
+7 BtDDvLeBpK 52
+8 YISVtrPfGr 8
+9 PBPJHDFjjC 45
+10 qbDuUJzJMO 756
+11 EjqqWoaLJn 712
+12 jpZLMdKXpn 847
+13 acpjQXpJCp 649
+14 nOKqHhRwao 133
+15 kxUUZEUoKv 398
+
+-- !q2 --
+7
+8
+9
+10
+11
+12
+13
+14
+15
+
+-- !q3 --
+nWYHawtqUw 930
+wYCSvnJKTo 709
+VsslXsUIDZ 993
+ZLsACYYTFy 813
+qbDuUJzJMO 756
+EjqqWoaLJn 712
+jpZLMdKXpn 847
+acpjQXpJCp 649
+kxUUZEUoKv 398
+
diff --git a/regression-test/suites/external_table_p0/iceberg/iceberg_read_unitycatalog_table.groovy b/regression-test/suites/external_table_p0/iceberg/iceberg_read_unitycatalog_table.groovy
new file mode 100644
index 00000000000..48b8b6559ca
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/iceberg_read_unitycatalog_table.groovy
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("iceberg_read_unitycatalog_table",
"p0,external,doris,external_docker,external_docker_doris") {
+
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable iceberg test.")
+ return
+ }
+
+ String catalog_name = "iceberg_read_unitycatalog_table"
+ String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${rest_port}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1"
+ );"""
+
+ logger.info("catalog " + catalog_name + " created")
+ sql """ use ${catalog_name}.test_db """
+ String tb = "unitycatalog_marksheet_uniform"
+
+ qt_q1 """ select * from ${tb} order by c1 """
+ qt_q2 """ select c1 from ${tb} where c1 > 6 order by c1 """
+ qt_q3 """ select c2, c3 from ${tb} where c3 > 200 order by c1 """
+
+}
+
+/*
+
+spark-sql:
+ 1. create table marksheet_uniform (c1 int, c2 string, c3 int);
+ 2. get parquet file from marksheet_uniform; (ref: https://docs.unitycatalog.io/usage/tables/uniform/)
+ 3. put parquet file to hdfs: hdfs dfs -put <parquet_file> hdfs://xxxxx
+ 4. CALL <catalog_name>.system.add_files(
+ table => '<catalog_name>.unitycatalog_db.marksheet_uniform',
+ source_table => '`parquet`.`hdfs://172.20.32.136:8020/user/doris/preinstalled_data/iceberg_hadoop_warehouse/unitycatalog_db/marksheet_uniform_data/part-00000-5af50cc4-3218-465b-a3a4-eb4fc709421d-c000.snappy.parquet`'
+ );
+*/
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]