This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4d575fd7fff branch-3.0: [fix](hive/iceberg) rectify the fs name if 
path already contains fs #49998 (#50197)
4d575fd7fff is described below

commit 4d575fd7fff4808f42aff4d9ad5bce8f46aa2340
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 22 11:32:24 2025 +0800

    branch-3.0: [fix](hive/iceberg) rectify the fs name if path already 
contains fs #49998 (#50197)
    
    Cherry-picked from #49998
    
    Co-authored-by: Mingyu Chen (Rayner) <[email protected]>
---
 be/src/io/file_factory.cpp                         |  32 +++++++++++++++------
 be/src/io/file_factory.h                           |   3 ++
 be/src/vec/sink/writer/vhive_partition_writer.cpp  |  13 ---------
 .../iceberg/write/test_iceberg_write_insert.out    | Bin 142167 -> 162889 bytes
 .../iceberg/write/test_iceberg_write_insert.groovy |  23 ++++++++++++++-
 5 files changed, 49 insertions(+), 22 deletions(-)

diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 86907886f17..db0d1c2109b 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -88,20 +88,36 @@ Result<io::FileSystemSPtr> FileFactory::create_fs(const 
io::FSPropertiesRef& fs_
                 *fs_properties.properties, s3_uri, &s3_conf));
         return io::S3FileSystem::create(std::move(s3_conf), 
io::FileSystem::TMP_FS_ID);
     }
-    case TFileType::FILE_HDFS:
-        return fs_properties.hdfs_params
-                       ? io::HdfsFileSystem::create(*fs_properties.hdfs_params,
-                                                    file_description.fs_name,
-                                                    io::FileSystem::TMP_FS_ID, 
nullptr)
-                       : io::HdfsFileSystem::create(*fs_properties.properties,
-                                                    file_description.fs_name,
-                                                    io::FileSystem::TMP_FS_ID, 
nullptr);
+    case TFileType::FILE_HDFS: {
+        std::string fs_name = _get_fs_name(file_description);
+        return io::HdfsFileSystem::create(*fs_properties.properties, fs_name,
+                                          io::FileSystem::TMP_FS_ID, nullptr);
+    }
     default:
         return ResultError(Status::InternalError("unsupported fs type: {}",
                                                  
std::to_string(fs_properties.type)));
     }
 }
 
+std::string FileFactory::_get_fs_name(const io::FileDescription& 
file_description) {
+    // If the destination path contains a schema, use the schema directly.
+    // If not, use the original file_description.fs_name
+    // Because the default fs name in file_description.fs_name may be
+    // different from the file's.
+    // example:
+    //    hdfs://host:port/path1/path2  --> hdfs://host:port
+    //    hdfs://nameservice/path1/path2 --> hdfs://nameservice
+    std::string fs_name = file_description.fs_name;
+    string::size_type idx = file_description.path.find("://");
+    if (idx != string::npos) {
+        idx = file_description.path.find("/", idx + 3);
+        if (idx != string::npos) {
+            fs_name = file_description.path.substr(0, idx);
+        }
+    }
+    return fs_name;
+}
+
 Result<io::FileWriterPtr> FileFactory::create_file_writer(
         TFileType::type type, ExecEnv* env, const 
std::vector<TNetworkAddress>& broker_addresses,
         const std::map<std::string, std::string>& properties, const 
std::string& path,
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 9d9d714812f..603afdee17d 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -123,6 +123,9 @@ public:
         LOG(FATAL) << "__builtin_unreachable";
         __builtin_unreachable();
     }
+
+private:
+    static std::string _get_fs_name(const io::FileDescription& 
file_description);
 };
 
 } // namespace doris
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp 
b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index 9f8f1041a01..8a5e2a9777e 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -61,19 +61,6 @@ Status VHivePartitionWriter::open(RuntimeState* state, 
RuntimeProfile* profile)
     io::FileDescription file_description = {
             .path = fmt::format("{}/{}", _write_info.write_path, 
_get_target_file_name()),
             .fs_name {}};
-    // If the destination path contains a schema, use the schema directly.
-    // If not, use defaultFS.
-    // Otherwise a write error will occur.
-    // example:
-    //    hdfs://host:port/path1/path2  --> hdfs://host:port
-    //    hdfs://nameservice/path1/path2 --> hdfs://nameservice
-    string::size_type idx = file_description.path.find("://");
-    if (idx != string::npos) {
-        idx = file_description.path.find("/", idx + 3);
-        if (idx != string::npos) {
-            file_description.fs_name = file_description.path.substr(0, idx);
-        }
-    }
     _fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
     io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true};
     RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer, 
&file_writer_options));
diff --git 
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out
 
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out
index d90434aa9c1..006596a760f 100644
Binary files 
a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out
 and 
b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_write_insert.out
 differ
diff --git 
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
 
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
index 900006bc216..617d830e16d 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_write_insert.groovy
@@ -818,6 +818,7 @@ suite("test_iceberg_write_insert", 
"p0,external,iceberg,external_docker,external
                 'iceberg.catalog.type'='hms',
                 'hive.metastore.uris' = 
'thrift://${externalEnvIp}:${hms_port}',
                 'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}',
+                'warehouse' = 'hdfs://${externalEnvIp}:${hdfs_port}',
                 'use_meta_cache' = 'true'
             );"""
             sql """drop catalog if exists ${hive_catalog_name}"""
@@ -828,7 +829,9 @@ suite("test_iceberg_write_insert", 
"p0,external,iceberg,external_docker,external
                 'use_meta_cache' = 'true'
             );"""
 
-            sql """use `${iceberg_catalog_name}`.`write_test`"""
+            sql """drop database if exists 
`${iceberg_catalog_name}`.`iceberg_write_test` force"""
+            sql """create database 
`${iceberg_catalog_name}`.`iceberg_write_test`"""
+            sql """use `${iceberg_catalog_name}`.`iceberg_write_test`"""
 
             sql """set enable_fallback_to_original_planner=false;"""
 
@@ -842,6 +845,24 @@ suite("test_iceberg_write_insert", 
"p0,external,iceberg,external_docker,external
 
             sql """drop catalog if exists ${iceberg_catalog_name}"""
             sql """drop catalog if exists ${hive_catalog_name}"""
+
+            // test with wrong fs.defaultFS
+            sql """create catalog if not exists ${iceberg_catalog_name} 
properties (
+                'type'='iceberg',
+                'iceberg.catalog.type'='hms',
+                'hive.metastore.uris' = 
'thrift://${externalEnvIp}:${hms_port}',
+                'fs.defaultFS' = 'hdfs://${externalEnvIp}:${hdfs_port}/tmp',
+                'warehouse' = 'hdfs://${externalEnvIp}:${hdfs_port}/tmp',
+                'use_meta_cache' = 'true'
+            );"""
+
+            sql """drop database if exists 
`${iceberg_catalog_name}`.`wrong_fs_name` force"""
+            sql """create database `${iceberg_catalog_name}`.`wrong_fs_name`"""
+            sql """use `${iceberg_catalog_name}`.`wrong_fs_name`"""
+
+            q01("parquet_zstd", iceberg_catalog_name)
+
+            sql """drop catalog if exists ${iceberg_catalog_name}"""
         } finally {
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to