This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 4d575fd7fff branch-3.0: [fix](hive/iceberg) rectify the fs name if path already contains fs #49998 (#50197)
4d575fd7fff is described below
commit 4d575fd7fff4808f42aff4d9ad5bce8f46aa2340
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 22 11:32:24 2025 +0800
branch-3.0: [fix](hive/iceberg) rectify the fs name if path already contains fs #49998 (#50197)
Cherry-picked from #49998
Co-authored-by: Mingyu Chen (Rayner) <[email protected]>
---
be/src/io/file_factory.cpp | 32 +++++++++++++++------
be/src/io/file_factory.h | 3 ++
be/src/vec/sink/writer/vhive_partition_writer.cpp | 13 ---------
.../iceberg/write/test_iceberg_write_insert.out | Bin 142167 -> 162889 bytes
.../iceberg/write/test_iceberg_write_insert.groovy | 23 ++++++++++++++-
5 files changed, 49 insertions(+), 22 deletions(-)
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 86907886f17..db0d1c2109b 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -88,20 +88,36 @@ Result<io::FileSystemSPtr> FileFactory::create_fs(const
io::FSPropertiesRef& fs_
*fs_properties.properties, s3_uri, &s3_conf));
return io::S3FileSystem::create(std::move(s3_conf),
io::FileSystem::TMP_FS_ID);
}
- case TFileType::FILE_HDFS:
- return fs_properties.hdfs_params
- ? io::HdfsFileSystem::create(*fs_properties.hdfs_params,
- file_description.fs_name,
- io::FileSystem::TMP_FS_ID,
nullptr)
- : io::HdfsFileSystem::create(*fs_properties.properties,
- file_description.fs_name,
- io::FileSystem::TMP_FS_ID,
nullptr);
+ case TFileType::FILE_HDFS: {
+ std::string fs_name = _get_fs_name(file_description);
+ return io::HdfsFileSystem::create(*fs_properties.properties, fs_name,
+ io::FileSystem::TMP_FS_ID, nullptr);
+ }
default:
return ResultError(Status::InternalError("unsupported fs type: {}",
std::to_string(fs_properties.type)));
}
}
+std::string FileFactory::_get_fs_name(const io::FileDescription& file_description) {
+    // Derive the file system name (scheme + authority) from the file path itself.
+    // The configured default in file_description.fs_name may point at a different
+    // file system than the one the path refers to, so when the path is fully
+    // qualified its own scheme and authority win; otherwise fall back to the
+    // configured (original) file_description.fs_name.
+    // Examples:
+    //   hdfs://host:port/path1/path2    --> hdfs://host:port
+    //   hdfs://nameservice/path1/path2  --> hdfs://nameservice
+    //   hdfs:///path1/path2             --> file_description.fs_name (empty authority)
+    //   hdfs://nameservice              --> file_description.fs_name (no path part)
+    //   /path1/path2                    --> file_description.fs_name (no scheme)
+    const std::string& path = file_description.path;
+    const std::string::size_type scheme_end = path.find("://");
+    // "://" only separates a scheme when it is preceded by a non-empty scheme that
+    // contains no '/'; a "://" appearing later in a plain path is not a scheme.
+    if (scheme_end == std::string::npos || scheme_end == 0 ||
+        path.find('/') < scheme_end) {
+        return file_description.fs_name;
+    }
+    const std::string::size_type authority_begin = scheme_end + 3;
+    const std::string::size_type authority_end = path.find('/', authority_begin);
+    if (authority_end == std::string::npos || authority_end == authority_begin) {
+        // No path component after the authority, or an empty authority
+        // ("scheme:///..."): the path does not name a concrete file system,
+        // so keep the configured default instead of returning "scheme://".
+        return file_description.fs_name;
+    }
+    return path.substr(0, authority_end);
+}
+
Result<io::FileWriterPtr> FileFactory::create_file_writer(
TFileType::type type, ExecEnv* env, const
std::vector<TNetworkAddress>& broker_addresses,
const std::map<std::string, std::string>& properties, const
std::string& path,
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 9d9d714812f..603afdee17d 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -123,6 +123,9 @@ public:
LOG(FATAL) << "__builtin_unreachable";
__builtin_unreachable();
}
+
+private:
+ static std::string _get_fs_name(const io::FileDescription&
file_description);
};
} // namespace doris
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp
b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index 9f8f1041a01..8a5e2a9777e 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -61,19 +61,6 @@ Status VHivePartitionWriter::open(RuntimeState* state,
RuntimeProfile* profile)
io::FileDescription file_description = {
.path = fmt::format("{}/{}", _write_info.write_path,
_get_target_file_name()),
.fs_name {}};
- // If the destination path contains a schema, use the schema directly.
- // If not, use defaultFS.
- // Otherwise a write error will occur.
- // example:
- // hdfs://host:port/path1/path2 --> hdfs://host:port
- // hdfs://nameservice/path1/path2 --> hdfs://nameservice
- string::size_type idx = file_description.path.find("://");
- if (idx != string::npos) {
- idx = file_description.path.find("/", idx + 3);
- if (idx != string::npos) {
- file_description.fs_name = file_description.path.substr(0, idx);
- }
- }
_fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true};
RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer,
&file_writer_options));
diff --git
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out
index d90434aa9c1..006596a760f 100644
Binary files
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out
and
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out
differ
diff --git
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
index 900006bc216..617d830e16d 100644
---
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
@@ -818,6 +818,7 @@ suite("test_iceberg_write_insert",
"p0,external,iceberg,external_docker,external
'iceberg.catalog.type'='hms',
'hive.metastore.uris' =
'thrift://${externalEnvIp}:${hms_port}',
'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}',
+ 'warehouse' = 'hdfs://${externalEnvIp}:${hdfs_port}',
'use_meta_cache' = 'true'
);"""
sql """drop catalog if exists ${hive_catalog_name}"""
@@ -828,7 +829,9 @@ suite("test_iceberg_write_insert",
"p0,external,iceberg,external_docker,external
'use_meta_cache' = 'true'
);"""
- sql """use `${iceberg_catalog_name}`.`write_test`"""
+ sql """drop database if exists
`${iceberg_catalog_name}`.`iceberg_write_test` force"""
+ sql """create database
`${iceberg_catalog_name}`.`iceberg_write_test`"""
+ sql """use `${iceberg_catalog_name}`.`iceberg_write_test`"""
sql """set enable_fallback_to_original_planner=false;"""
@@ -842,6 +845,24 @@ suite("test_iceberg_write_insert",
"p0,external,iceberg,external_docker,external
sql """drop catalog if exists ${iceberg_catalog_name}"""
sql """drop catalog if exists ${hive_catalog_name}"""
+
+ // test with wrong fs.defaultFS
+ sql """create catalog if not exists ${iceberg_catalog_name}
properties (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='hms',
+ 'hive.metastore.uris' =
'thrift://${externalEnvIp}:${hms_port}',
+ 'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}/tmp',
+ 'warehouse' = 'hdfs://${externalEnvIp}:${hdfs_port}/tmp',
+ 'use_meta_cache' = 'true'
+ );"""
+
+ sql """drop database if exists
`${iceberg_catalog_name}`.`wrong_fs_name` force"""
+ sql """create database `${iceberg_catalog_name}`.`wrong_fs_name`"""
+ sql """use `${iceberg_catalog_name}`.`wrong_fs_name`"""
+
+ q01("parquet_zstd", iceberg_catalog_name)
+
+ sql """drop catalog if exists ${iceberg_catalog_name}"""
} finally {
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]