This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7cb00a8e548 [Feature](hive-writer) Implements s3 file committer. 
(#34307)
7cb00a8e548 is described below

commit 7cb00a8e548236a1e72ca72643fb8620899ae54a
Author: Qi Chen <[email protected]>
AuthorDate: Mon Apr 29 19:56:49 2024 +0800

    [Feature](hive-writer) Implements s3 file committer. (#34307)
    
    Backport #33937.
---
 be/src/io/file_factory.cpp                         |   9 +-
 be/src/io/file_factory.h                           |   4 +-
 be/src/io/fs/file_writer.h                         |   8 +
 be/src/io/fs/s3_file_writer.cpp                    | 130 +++++-----
 be/src/io/fs/s3_file_writer.h                      |  17 ++
 be/src/vec/sink/writer/vhive_partition_writer.cpp  |  26 +-
 be/src/vec/sink/writer/vhive_partition_writer.h    |   1 +
 be/src/vec/sink/writer/vhive_table_writer.cpp      |  31 ++-
 .../doris/common/profile/SummaryProfile.java       |  11 +
 .../org/apache/doris/common/util/LocationPath.java |  13 +-
 .../java/org/apache/doris/common/util/S3URI.java   |   4 +-
 .../doris/datasource/hive/HMSExternalCatalog.java  |  14 +-
 .../doris/datasource/hive/HMSTransaction.java      | 263 +++++++++++++++++----
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  18 +-
 .../doris/datasource/hive/HiveMetadataOps.java     |  17 +-
 .../doris/datasource/hudi/source/HudiScanNode.java |   2 +-
 .../datasource/iceberg/source/IcebergScanNode.java |   4 +-
 .../datasource/paimon/source/PaimonScanNode.java   |   2 +-
 .../main/java/org/apache/doris/fs/FileSystem.java  |   8 +
 .../java/org/apache/doris/fs/FileSystemCache.java  |  17 +-
 .../org/apache/doris/fs/FileSystemFactory.java     |   5 +-
 .../FileSystemProvider.java}                       |  11 +-
 .../apache/doris/fs/FileSystemProviderImpl.java    |  43 ++++
 .../org/apache/doris/fs/LocalDfsFileSystem.java    |   5 +
 .../org/apache/doris/fs/remote/ObjFileSystem.java  |  15 ++
 .../org/apache/doris/fs/remote/S3FileSystem.java   |   4 -
 .../doris/fs/remote/SwitchingFileSystem.java       | 132 +++++++++++
 .../org/apache/doris/planner/HiveTableSink.java    |  37 ++-
 .../doris/transaction/HiveTransactionManager.java  |  13 +-
 .../transaction/TransactionManagerFactory.java     |   8 +-
 .../apache/doris/common/util/LocationPathTest.java |  30 +--
 .../doris/datasource/hive/HmsCommitTest.java       |  32 ++-
 gensrc/thrift/DataSinks.thrift                     |  10 +
 .../hive/test_hive_write_insert_s3.out             |  61 +++++
 .../hive/test_hive_write_insert_s3.groovy          | 166 +++++++++++++
 35 files changed, 968 insertions(+), 203 deletions(-)

diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index b67faac9453..aa38e220199 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -67,7 +67,8 @@ Status FileFactory::create_file_writer(TFileType::type type, 
ExecEnv* env,
                                        const std::vector<TNetworkAddress>& 
broker_addresses,
                                        const std::map<std::string, 
std::string>& properties,
                                        const std::string& path, int64_t 
start_offset,
-                                       std::unique_ptr<io::FileWriter>& 
file_writer) {
+                                       std::unique_ptr<io::FileWriter>& 
file_writer,
+                                       const io::FileWriterOptions* opts) {
     switch (type) {
     case TFileType::FILE_LOCAL: {
         RETURN_IF_ERROR(io::global_local_filesystem()->create_file(path, 
&file_writer));
@@ -76,7 +77,7 @@ Status FileFactory::create_file_writer(TFileType::type type, 
ExecEnv* env,
     case TFileType::FILE_BROKER: {
         std::shared_ptr<io::BrokerFileSystem> fs;
         RETURN_IF_ERROR(io::BrokerFileSystem::create(broker_addresses[0], 
properties, &fs));
-        RETURN_IF_ERROR(fs->create_file(path, &file_writer));
+        RETURN_IF_ERROR(fs->create_file(path, &file_writer, opts));
         break;
     }
     case TFileType::FILE_S3: {
@@ -87,7 +88,7 @@ Status FileFactory::create_file_writer(TFileType::type type, 
ExecEnv* env,
                 S3ClientFactory::convert_properties_to_s3_conf(properties, 
s3_uri, &s3_conf));
         std::shared_ptr<io::S3FileSystem> fs;
         RETURN_IF_ERROR(io::S3FileSystem::create(s3_conf, "", &fs));
-        RETURN_IF_ERROR(fs->create_file(path, &file_writer));
+        RETURN_IF_ERROR(fs->create_file(path, &file_writer, opts));
         break;
     }
     case TFileType::FILE_HDFS: {
@@ -95,7 +96,7 @@ Status FileFactory::create_file_writer(TFileType::type type, 
ExecEnv* env,
         std::shared_ptr<io::HdfsFileSystem> fs;
         RETURN_IF_ERROR(
                 io::HdfsFileSystem::create(hdfs_params, "", 
hdfs_params.fs_name, nullptr, &fs));
-        RETURN_IF_ERROR(fs->create_file(path, &file_writer));
+        RETURN_IF_ERROR(fs->create_file(path, &file_writer, opts));
         break;
     }
     default:
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 7c51118fc5b..48e4e2e9ed0 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -28,6 +28,7 @@
 #include "common/factory_creator.h"
 #include "common/status.h"
 #include "io/fs/file_reader.h"
+#include "io/fs/file_reader_writer_fwd.h"
 
 namespace doris {
 namespace io {
@@ -73,7 +74,8 @@ public:
                                      const std::vector<TNetworkAddress>& 
broker_addresses,
                                      const std::map<std::string, std::string>& 
properties,
                                      const std::string& path, int64_t 
start_offset,
-                                     std::unique_ptr<io::FileWriter>& 
file_writer);
+                                     std::unique_ptr<io::FileWriter>& 
file_writer,
+                                     const io::FileWriterOptions* opts = 
nullptr);
 
     /// Create FileReader
     static Status create_file_reader(const io::FileSystemProperties& 
system_properties,
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 6996f2e0c98..6ec972285cc 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -31,6 +31,14 @@ class FileSystem;
 
 // Only affects remote file writers
 struct FileWriterOptions {
+    // The S3 committer starts the multipart upload of every file on the BE side
+    // and then completes those multipart uploads on the FE side.
+    // Until the multipart upload of a file is completed, the file is not visible.
+    // In this way the atomicity of a single file can be guaranteed, but
+    // the atomicity of multiple files still cannot be guaranteed.
+    // Because hive committers have best-effort semantics,
+    // this shortens the inconsistent time window.
+    bool used_by_s3_committer = false;
     bool write_file_cache = false;
     bool is_cold_data = false;
     bool sync_file_data = true;        // Whether flush data into storage 
system
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 83d33ed163c..ba5fce83e5c 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -89,7 +89,8 @@ S3FileWriter::S3FileWriter(std::string key, 
std::shared_ptr<S3FileSystem> fs,
           _cache(nullptr),
           _expiration_time(opts ? opts->file_cache_expiration : 0),
           _is_cold_data(opts ? opts->is_cold_data : true),
-          _write_file_cache(opts ? opts->write_file_cache : false) {
+          _write_file_cache(opts ? opts->write_file_cache : false),
+          _used_by_s3_committer(opts ? opts->used_by_s3_committer : false) {
     s3_file_writer_total << 1;
     s3_file_being_written << 1;
 
@@ -203,10 +204,7 @@ Status S3FileWriter::close() {
 
     if (_upload_id.empty()) {
         if (_pending_buf != nullptr) {
-            // it might be one file less than 5MB, we do upload here
-            auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
-            DCHECK(buf != nullptr);
-            buf->set_upload_to_remote([this](UploadFileBuffer& b) { 
_put_object(b); });
+            RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
         }
 
         if (_bytes_appended == 0 && _create_empty_file) {
@@ -234,6 +232,13 @@ Status S3FileWriter::close() {
             RETURN_IF_ERROR(builder.build(&_pending_buf));
             auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
             DCHECK(buf != nullptr);
+            if (_used_by_s3_committer) {
+                buf->set_upload_to_remote([part_num = _cur_part_num, 
this](UploadFileBuffer& buf) {
+                    _upload_one_part(part_num, buf);
+                });
+                DCHECK(_cur_part_num == 1);
+                RETURN_IF_ERROR(_create_multi_upload_request());
+            }
         }
     }
     if (_pending_buf != nullptr) {
@@ -404,56 +409,61 @@ Status S3FileWriter::_complete() {
         _wait_until_finish("PutObject");
         return _st;
     }
-    CompleteMultipartUploadRequest complete_request;
-    
complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
-
+    // Wait multipart load and finish.
     _wait_until_finish("Complete");
     DBUG_EXECUTE_IF("s3_file_writer::_complete:1", { _cur_part_num++; });
-    if (_failed || _completed_parts.size() != _cur_part_num) {
-        _st = Status::InternalError(
-                "error status {}, complete parts {}, cur part num {}, whole 
parts {}", _st,
-                _completed_parts.size(), _cur_part_num, 
_dump_completed_part());
-        LOG(WARNING) << _st;
-        return _st;
-    }
-    // make sure _completed_parts are ascending order
-    std::sort(_completed_parts.begin(), _completed_parts.end(),
-              [](auto& p1, auto& p2) { return p1->GetPartNumber() < 
p2->GetPartNumber(); });
-    DBUG_EXECUTE_IF("s3_file_writer::_complete:2",
-                    { _completed_parts.back()->SetPartNumber(10 * 
_completed_parts.size()); });
-    CompletedMultipartUpload completed_upload;
-    for (size_t i = 0; i < _completed_parts.size(); i++) {
-        if (_completed_parts[i]->GetPartNumber() != i + 1) [[unlikely]] {
-            auto st = Status::InternalError(
-                    "error status {}, part num not continous, expected num {}, 
actual num {}, "
-                    "whole parts {}",
-                    _st, i + 1, _completed_parts[i]->GetPartNumber(), 
_dump_completed_part());
-            LOG(WARNING) << st;
-            _st = st;
-            return st;
+    if (!_used_by_s3_committer) { // S3 committer will complete multipart 
upload file on FE side.
+        CompleteMultipartUploadRequest complete_request;
+        
complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
+
+        if (_failed || _completed_parts.size() != _cur_part_num) {
+            _st = Status::InternalError(
+                    "error status {}, complete parts {}, cur part num {}, 
whole parts {}", _st,
+                    _completed_parts.size(), _cur_part_num, 
_dump_completed_part());
+            LOG(WARNING) << _st;
+            return _st;
+        }
+        // make sure _completed_parts are ascending order
+        std::sort(_completed_parts.begin(), _completed_parts.end(),
+                  [](auto& p1, auto& p2) { return p1->GetPartNumber() < 
p2->GetPartNumber(); });
+        DBUG_EXECUTE_IF("s3_file_writer::_complete:2",
+                        { _completed_parts.back()->SetPartNumber(10 * 
_completed_parts.size()); });
+        CompletedMultipartUpload completed_upload;
+        for (size_t i = 0; i < _completed_parts.size(); i++) {
+            if (_completed_parts[i]->GetPartNumber() != i + 1) [[unlikely]] {
+                auto st = Status::InternalError(
+                        "error status {}, part num not continous, expected num 
{}, actual num {}, "
+                        "whole parts {}",
+                        _st, i + 1, _completed_parts[i]->GetPartNumber(), 
_dump_completed_part());
+                LOG(WARNING) << st;
+                _st = st;
+                return st;
+            }
+            completed_upload.AddParts(*_completed_parts[i]);
         }
-        completed_upload.AddParts(*_completed_parts[i]);
-    }
 
-    complete_request.WithMultipartUpload(completed_upload);
+        complete_request.WithMultipartUpload(completed_upload);
 
-    DBUG_EXECUTE_IF("s3_file_writer::_complete:3", {
-        auto s = Status::IOError(
-                "failed to create complete multi part upload (bucket={}, 
key={}): injected error",
-                _bucket, _path.native());
-        LOG_WARNING(s.to_string());
-        return s;
-    });
-    SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
-    auto complete_outcome = _client->CompleteMultipartUpload(complete_request);
+        DBUG_EXECUTE_IF("s3_file_writer::_complete:3", {
+            auto s = Status::IOError(
+                    "failed to create complete multi part upload (bucket={}, 
key={}): injected "
+                    "error",
+                    _bucket, _path.native());
+            LOG_WARNING(s.to_string());
+            return s;
+        });
+        SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
+        auto complete_outcome = 
_client->CompleteMultipartUpload(complete_request);
 
-    if (!complete_outcome.IsSuccess()) {
-        _st = s3fs_error(
-                complete_outcome.GetError(),
-                fmt::format("failed to complete multi part upload {}, 
upload_id={}, whole parts={}",
+        if (!complete_outcome.IsSuccess()) {
+            _st = s3fs_error(
+                    complete_outcome.GetError(),
+                    fmt::format(
+                            "failed to complete multi part upload {}, 
upload_id={}, whole parts={}",
                             _path.native(), _upload_id, 
_dump_completed_part()));
-        LOG(WARNING) << _st;
-        return _st;
+            LOG(WARNING) << _st;
+            return _st;
+        }
     }
     s3_file_created_total << 1;
     return Status::OK();
@@ -466,12 +476,8 @@ Status S3FileWriter::finalize() {
     // submit pending buf if it's not nullptr
     // it's the last buf, we can submit it right now
     if (_pending_buf != nullptr) {
-        // if we only need to upload one file less than 5MB, we can just
-        // call PutObject to reduce the network IO
         if (_upload_id.empty()) {
-            auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
-            DCHECK(buf != nullptr);
-            buf->set_upload_to_remote([this](UploadFileBuffer& b) { 
_put_object(b); });
+            RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
         }
         _countdown_event.add_count();
         RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
@@ -481,6 +487,24 @@ Status S3FileWriter::finalize() {
     return _st;
 }
 
+Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
+    auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
+    DCHECK(buf != nullptr);
+    if (_used_by_s3_committer) {
+        // If used_by_s3_committer, we always use multi-parts uploading.
+        buf->set_upload_to_remote([part_num = _cur_part_num, 
this](UploadFileBuffer& buf) {
+            _upload_one_part(part_num, buf);
+        });
+        DCHECK(_cur_part_num == 1);
+        RETURN_IF_ERROR(_create_multi_upload_request());
+    } else {
+        // if we only need to upload one file less than 5MB, we can just
+        // call PutObject to reduce the network IO
+        buf->set_upload_to_remote([this](UploadFileBuffer& b) { 
_put_object(b); });
+    }
+    return Status::OK();
+}
+
 void S3FileWriter::_put_object(UploadFileBuffer& buf) {
     DCHECK(!_closed) << "closed " << _closed;
     Aws::S3::Model::PutObjectRequest request;
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index f6acbc4c75b..c14cf0b0a21 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -51,12 +51,21 @@ public:
     Status appendv(const Slice* data, size_t data_cnt) override;
     Status finalize() override;
 
+    const std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>& 
completed_parts() const {
+        return _completed_parts;
+    }
+
+    const std::string& key() const { return _key; }
+    const std::string& bucket() const { return _bucket; }
+    const std::string& upload_id() const { return _upload_id; }
+
 private:
     Status _abort();
     [[nodiscard]] std::string _dump_completed_part() const;
     void _wait_until_finish(std::string_view task_name);
     Status _complete();
     Status _create_multi_upload_request();
+    Status _set_upload_to_remote_less_than_buffer_size();
     void _put_object(UploadFileBuffer& buf);
     void _upload_one_part(int64_t part_num, UploadFileBuffer& buf);
 
@@ -85,6 +94,14 @@ private:
     int64_t _expiration_time;
     bool _is_cold_data;
     bool _write_file_cache;
+    // The S3 committer starts the multipart upload of every file on the BE side
+    // and then completes those multipart uploads on the FE side.
+    // Until the multipart upload of a file is completed, the file is not visible.
+    // In this way the atomicity of a single file can be guaranteed, but
+    // the atomicity of multiple files still cannot be guaranteed.
+    // Because hive committers have best-effort semantics,
+    // this shortens the inconsistent time window.
+    bool _used_by_s3_committer;
 };
 
 } // namespace io
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp 
b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index a67a22e2cf1..e9e816219b6 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -17,8 +17,11 @@
 
 #include "vhive_partition_writer.h"
 
+#include <aws/s3/model/CompletedPart.h>
+
 #include "io/file_factory.h"
 #include "io/fs/file_system.h"
+#include "io/fs/s3_file_writer.h"
 #include "runtime/runtime_state.h"
 #include "vec/columns/column_map.h"
 #include "vec/core/materialize_block.h"
@@ -54,10 +57,11 @@ Status VHivePartitionWriter::open(RuntimeState* state, 
RuntimeProfile* profile)
     _state = state;
 
     std::vector<TNetworkAddress> broker_addresses;
+    io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true};
     RETURN_IF_ERROR(FileFactory::create_file_writer(
             _write_info.file_type, state->exec_env(), broker_addresses, 
_hadoop_conf,
-            fmt::format("{}/{}", _write_info.write_path, 
_get_target_file_name()), 0,
-            _file_writer));
+            fmt::format("{}/{}", _write_info.write_path, 
_get_target_file_name()), 0, _file_writer,
+            &file_writer_options));
 
     std::vector<std::string> column_names;
     column_names.reserve(_columns.size());
@@ -191,12 +195,28 @@ THivePartitionUpdate 
VHivePartitionWriter::_build_partition_update() {
     hive_partition_update.__set_name(_partition_name);
     hive_partition_update.__set_update_mode(_update_mode);
     THiveLocationParams location;
-    location.__set_write_path(_write_info.write_path);
+    location.__set_write_path(_write_info.original_write_path);
     location.__set_target_path(_write_info.target_path);
     hive_partition_update.__set_location(location);
     hive_partition_update.__set_file_names({_get_target_file_name()});
     hive_partition_update.__set_row_count(_row_count);
     hive_partition_update.__set_file_size(_input_size_in_bytes);
+
+    if (_write_info.file_type == TFileType::FILE_S3) {
+        doris::io::S3FileWriter* s3_mpu_file_writer =
+                dynamic_cast<doris::io::S3FileWriter*>(_file_writer.get());
+        TS3MPUPendingUpload s3_mpu_pending_upload;
+        s3_mpu_pending_upload.__set_bucket(s3_mpu_file_writer->bucket());
+        s3_mpu_pending_upload.__set_key(s3_mpu_file_writer->key());
+        s3_mpu_pending_upload.__set_upload_id(s3_mpu_file_writer->upload_id());
+
+        std::map<int, std::string> etags;
+        for (auto& completed_part : s3_mpu_file_writer->completed_parts()) {
+            etags.insert({completed_part->GetPartNumber(), 
completed_part->GetETag()});
+        }
+        s3_mpu_pending_upload.__set_etags(etags);
+        
hive_partition_update.__set_s3_mpu_pending_uploads({s3_mpu_pending_upload});
+    }
     return hive_partition_update;
 }
 
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.h 
b/be/src/vec/sink/writer/vhive_partition_writer.h
index fc06e157408..e4fc2ebc24b 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.h
+++ b/be/src/vec/sink/writer/vhive_partition_writer.h
@@ -39,6 +39,7 @@ class VHivePartitionWriter {
 public:
     struct WriteInfo {
         std::string write_path;
+        std::string original_write_path;
         std::string target_path;
         TFileType::type file_type;
     };
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp 
b/be/src/vec/sink/writer/vhive_table_writer.cpp
index d43fc34b4e5..7c3a864ebb3 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -256,26 +256,30 @@ std::shared_ptr<VHivePartitionWriter> 
VHiveTableWriter::_create_partition_writer
         if (existing_table == false) {   // new table
             update_mode = TUpdateMode::NEW;
             if (_partition_columns_input_index.empty()) { // new unpartitioned 
table
-                write_info = {write_location.write_path, 
write_location.target_path,
-                              write_location.file_type};
+                write_info = {write_location.write_path, 
write_location.original_write_path,
+                              write_location.target_path, 
write_location.file_type};
             } else { // a new partition in a new partitioned table
                 auto write_path = fmt::format("{}/{}", 
write_location.write_path, partition_name);
+                auto original_write_path =
+                        fmt::format("{}/{}", 
write_location.original_write_path, partition_name);
                 auto target_path = fmt::format("{}/{}", 
write_location.target_path, partition_name);
-                write_info = {std::move(write_path), std::move(target_path),
-                              write_location.file_type};
+                write_info = {std::move(write_path), 
std::move(original_write_path),
+                              std::move(target_path), 
write_location.file_type};
             }
         } else { // a new partition in an existing partitioned table, or an 
existing unpartitioned table
             if (_partition_columns_input_index.empty()) { // an existing 
unpartitioned table
                 update_mode =
                         !hive_table_sink.overwrite ? TUpdateMode::APPEND : 
TUpdateMode::OVERWRITE;
-                write_info = {write_location.write_path, 
write_location.target_path,
-                              write_location.file_type};
+                write_info = {write_location.write_path, 
write_location.original_write_path,
+                              write_location.target_path, 
write_location.file_type};
             } else { // a new partition in an existing partitioned table
                 update_mode = TUpdateMode::NEW;
                 auto write_path = fmt::format("{}/{}", 
write_location.write_path, partition_name);
+                auto original_write_path =
+                        fmt::format("{}/{}", 
write_location.original_write_path, partition_name);
                 auto target_path = fmt::format("{}/{}", 
write_location.target_path, partition_name);
-                write_info = {std::move(write_path), std::move(target_path),
-                              write_location.file_type};
+                write_info = {std::move(write_path), 
std::move(original_write_path),
+                              std::move(target_path), 
write_location.file_type};
             }
             // need to get schema from existing table ?
         }
@@ -285,16 +289,21 @@ std::shared_ptr<VHivePartitionWriter> 
VHiveTableWriter::_create_partition_writer
         if (!hive_table_sink.overwrite) {
             update_mode = TUpdateMode::APPEND;
             auto write_path = fmt::format("{}/{}", write_location.write_path, 
partition_name);
+            auto original_write_path =
+                    fmt::format("{}/{}", write_location.original_write_path, 
partition_name);
             auto target_path = fmt::format("{}", 
existing_partition->location.target_path);
-            write_info = {std::move(write_path), std::move(target_path),
-                          existing_partition->location.file_type};
+            write_info = {std::move(write_path), 
std::move(original_write_path),
+                          std::move(target_path), 
existing_partition->location.file_type};
             file_format_type = existing_partition->file_format;
             write_compress_type = hive_table_sink.compression_type;
         } else {
             update_mode = TUpdateMode::OVERWRITE;
             auto write_path = fmt::format("{}/{}", write_location.write_path, 
partition_name);
+            auto original_write_path =
+                    fmt::format("{}/{}", write_location.original_write_path, 
partition_name);
             auto target_path = fmt::format("{}/{}", 
write_location.target_path, partition_name);
-            write_info = {std::move(write_path), std::move(target_path), 
write_location.file_type};
+            write_info = {std::move(write_path), 
std::move(original_write_path),
+                          std::move(target_path), write_location.file_type};
             file_format_type = hive_table_sink.file_format;
             write_compress_type = hive_table_sink.compression_type;
             // need to get schema from existing table ?
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 178a0e802d5..e7e9176b9f2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -88,6 +88,8 @@ public class SummaryProfile {
     public static final String FILESYSTEM_OPT_TIME = "FileSystem Operator 
Time";
     public static final String FILESYSTEM_OPT_RENAME_FILE_CNT = "Rename File 
Count";
     public static final String FILESYSTEM_OPT_RENAME_DIR_CNT = "Rename Dir 
Count";
+
+    public static final String FILESYSTEM_OPT_DELETE_FILE_CNT = "Delete File 
Count";
     public static final String FILESYSTEM_OPT_DELETE_DIR_CNT = "Delete Dir 
Count";
     public static final String HMS_ADD_PARTITION_TIME = "HMS Add Partition 
Time";
     public static final String HMS_ADD_PARTITION_CNT = "HMS Add Partition 
Count";
@@ -164,6 +166,7 @@ public class SummaryProfile {
             .put(FILESYSTEM_OPT_TIME, 1)
             .put(FILESYSTEM_OPT_RENAME_FILE_CNT, 2)
             .put(FILESYSTEM_OPT_RENAME_DIR_CNT, 2)
+            .put(FILESYSTEM_OPT_DELETE_FILE_CNT, 2)
             .put(FILESYSTEM_OPT_DELETE_DIR_CNT, 2)
             .put(HMS_ADD_PARTITION_TIME, 1)
             .put(HMS_ADD_PARTITION_CNT, 2)
@@ -223,6 +226,8 @@ public class SummaryProfile {
     private long hmsUpdatePartitionCnt = 0;
     private long filesystemRenameFileCnt = 0;
     private long filesystemRenameDirCnt = 0;
+
+    private long filesystemDeleteFileCnt = 0;
     private long filesystemDeleteDirCnt = 0;
     private TransactionType transactionType = TransactionType.UNKNOWN;
 
@@ -344,6 +349,8 @@ public class SummaryProfile {
                     getPrettyCount(filesystemRenameFileCnt));
             
executionSummaryProfile.addInfoString(FILESYSTEM_OPT_RENAME_DIR_CNT,
                     getPrettyCount(filesystemRenameDirCnt));
+            
executionSummaryProfile.addInfoString(FILESYSTEM_OPT_DELETE_FILE_CNT,
+                    getPrettyCount(filesystemDeleteFileCnt));
             
executionSummaryProfile.addInfoString(FILESYSTEM_OPT_DELETE_DIR_CNT,
                     getPrettyCount(filesystemDeleteDirCnt));
 
@@ -666,4 +673,8 @@ public class SummaryProfile {
     public void incDeleteDirRecursiveCnt() {
         this.filesystemDeleteDirCnt += 1;
     }
+
+    public void incDeleteFileCnt() {
+        this.filesystemDeleteFileCnt += 1;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
index 005a8f2cb84..38b5250a157 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -296,8 +296,11 @@ public class LocationPath {
                 fsType = FileSystemType.S3;
                 break;
             case COSN:
+                // COSN uses the S3 client on the FE side because multipart uploads must be completed on the FE side.
+                fsType = FileSystemType.S3;
+                break;
             case OFS:
-                // ofs:// and cosn:// use the same underlying file system: 
Tencent Cloud HDFS, aka CHDFS)) {
+                // ofs:// uses the underlying file system Tencent Cloud HDFS (aka CHDFS).
                 fsType = FileSystemType.OFS;
                 break;
             case HDFS:
@@ -329,7 +332,11 @@ public class LocationPath {
             return null;
         }
         LocationPath locationPath = new LocationPath(location);
-        switch (locationPath.getLocationType()) {
+        return locationPath.getTFileTypeForBE();
+    }
+
+    public TFileType getTFileTypeForBE() {
+        switch (this.getLocationType()) {
             case S3:
             case S3A:
             case S3N:
@@ -362,7 +369,7 @@ public class LocationPath {
      *
      * @return BE scan range path
      */
-    public Path toScanRangeLocation() {
+    public Path toStorageLocation() {
         switch (locationType) {
             case S3:
             case S3A:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
index 29c3f2700c4..9c9a887aa7e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
@@ -68,10 +68,10 @@ public class S3URI {
     public static final String SCHEME_DELIM = "://";
     public static final String PATH_DELIM = "/";
     private static final Set<String> VALID_SCHEMES = ImmutableSet.of("http", 
"https", "s3", "s3a", "s3n",
-            "bos", "oss", "cos", "obs");
+            "bos", "oss", "cos", "cosn", "obs");
 
     private static final Set<String> OS_SCHEMES = ImmutableSet.of("s3", "s3a", 
"s3n",
-            "bos", "oss", "cos", "obs");
+            "bos", "oss", "cos", "cosn", "obs");
 
     private URI uri;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 299ab6dddfb..20dc870cd35 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.cluster.ClusterNamespace;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ThreadPoolManager;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.security.authentication.HadoopUGI;
 import org.apache.doris.datasource.CatalogProperty;
@@ -34,6 +35,8 @@ import 
org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
 import org.apache.doris.datasource.operations.ExternalMetadataOperations;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.HMSProperties;
+import org.apache.doris.fs.FileSystemProvider;
+import org.apache.doris.fs.FileSystemProviderImpl;
 import org.apache.doris.transaction.TransactionManagerFactory;
 
 import com.google.common.base.Strings;
@@ -46,6 +49,7 @@ import org.apache.logging.log4j.Logger;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ThreadPoolExecutor;
 
 /**
  * External catalog for hive metastore compatible data sources.
@@ -63,6 +67,9 @@ public class HMSExternalCatalog extends ExternalCatalog {
     // 0 means file cache is disabled; >0 means file cache with ttl;
     public static final int FILE_META_CACHE_TTL_DISABLE_CACHE = 0;
 
+    private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16;
+    private ThreadPoolExecutor fileSystemExecutor;
+
     public HMSExternalCatalog() {
         catalogProperty = new CatalogProperty(null, null);
     }
@@ -147,7 +154,12 @@ public class HMSExternalCatalog extends ExternalCatalog {
                     AuthenticationConfig.HADOOP_KERBEROS_KEYTAB));
         }
         HiveMetadataOps hiveOps = 
ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
-        transactionManager = 
TransactionManagerFactory.createHiveTransactionManager(hiveOps);
+        FileSystemProvider fileSystemProvider = new 
FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
+                this.bindBrokerName(), 
this.catalogProperty.getHadoopProperties());
+        this.fileSystemExecutor = 
ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM,
+                Integer.MAX_VALUE, 
String.format("hms_committer_%s_file_system_executor_pool", name), true);
+        transactionManager = 
TransactionManagerFactory.createHiveTransactionManager(hiveOps, 
fileSystemProvider,
+                fileSystemExecutor);
         metadataOps = hiveOps;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 32dd083c2ad..6fca8b4745f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -23,13 +23,18 @@ package org.apache.doris.datasource.hive;
 
 import org.apache.doris.backup.Status;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.SummaryProfile;
 import org.apache.doris.fs.FileSystem;
+import org.apache.doris.fs.FileSystemProvider;
 import org.apache.doris.fs.FileSystemUtil;
 import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fs.remote.S3FileSystem;
+import org.apache.doris.fs.remote.SwitchingFileSystem;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.THivePartitionUpdate;
+import org.apache.doris.thrift.TS3MPUPendingUpload;
 import org.apache.doris.thrift.TUpdateMode;
 import org.apache.doris.transaction.Transaction;
 
@@ -48,6 +53,11 @@ import 
org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -79,17 +89,34 @@ public class HMSTransaction implements Transaction {
     private final Map<DatabaseTableName, Map<List<String>, 
Action<PartitionAndMore>>>
             partitionActions = new HashMap<>();
 
+    private final Executor fileSystemExecutor;
     private HmsCommitter hmsCommitter;
     private List<THivePartitionUpdate> hivePartitionUpdates = 
Lists.newArrayList();
     private String declaredIntentionsToWrite;
 
-    public HMSTransaction(HiveMetadataOps hiveOps) {
-        this.hiveOps = hiveOps;
-        this.fs = hiveOps.getFs();
+    private static class UncompletedMpuPendingUpload {
+
+        private final TS3MPUPendingUpload s3MPUPendingUpload;
+        private final String path;
+
+        public UncompletedMpuPendingUpload(TS3MPUPendingUpload 
s3MPUPendingUpload, String path) {
+            this.s3MPUPendingUpload = s3MPUPendingUpload;
+            this.path = path;
+        }
+    }
 
+    // Entries are removed from fileSystemExecutor worker threads as each multipart upload
+    // completes (see s3Commit), so a plain HashSet is not safe here; use a concurrent set.
+    private Set<UncompletedMpuPendingUpload> uncompletedMpuPendingUploads =
+            java.util.concurrent.ConcurrentHashMap.newKeySet();
+
+    public HMSTransaction(HiveMetadataOps hiveOps, FileSystemProvider 
fileSystemProvider, Executor fileSystemExecutor) {
+        this.hiveOps = hiveOps;
+        this.fs = fileSystemProvider.get(null);
+        if (!(fs instanceof SwitchingFileSystem)) {
+            throw new RuntimeException("fs should be SwitchingFileSystem");
+        }
         if (ConnectContext.get().getExecutor() != null) {
             summaryProfile = 
Optional.of(ConnectContext.get().getExecutor().getSummaryProfile());
         }
+        this.fileSystemExecutor = fileSystemExecutor;
     }
 
     @Override
@@ -112,6 +139,9 @@ public class HMSTransaction implements Transaction {
                 THivePartitionUpdate old = mm.get(pu.getName());
                 old.setFileSize(old.getFileSize() + pu.getFileSize());
                 old.setRowCount(old.getRowCount() + pu.getRowCount());
+                if (old.getS3MpuPendingUploads() != null && 
pu.getS3MpuPendingUploads() != null) {
+                    
old.getS3MpuPendingUploads().addAll(pu.getS3MpuPendingUploads());
+                }
                 old.getFileNames().addAll(pu.getFileNames());
             } else {
                 mm.put(pu.getName(), pu);
@@ -136,6 +166,14 @@ public class HMSTransaction implements Transaction {
         this.dbName = dbName;
         this.tbName = tbName;
         List<THivePartitionUpdate> mergedPUs = 
mergePartitions(hivePartitionUpdates);
+        for (THivePartitionUpdate pu : mergedPUs) {
+            if (pu.getS3MpuPendingUploads() != null) {
+                for (TS3MPUPendingUpload s3MPUPendingUpload : 
pu.getS3MpuPendingUploads()) {
+                    uncompletedMpuPendingUploads.add(
+                            new 
UncompletedMpuPendingUpload(s3MPUPendingUpload, 
pu.getLocation().getTargetPath()));
+                }
+            }
+        }
         Table table = getTable(dbName, tbName);
         List<Pair<THivePartitionUpdate, HivePartitionStatistics>> 
insertExistsPartitions = new ArrayList<>();
         for (THivePartitionUpdate pu : mergedPUs) {
@@ -156,11 +194,12 @@ public class HMSTransaction implements Transaction {
                                 tbName,
                                 writePath,
                                 pu.getFileNames(),
-                                hivePartitionStatistics);
+                                hivePartitionStatistics,
+                                pu);
                         break;
                     case OVERWRITE:
                         dropTable(dbName, tbName);
-                        createTable(table, writePath, pu.getFileNames(), 
hivePartitionStatistics);
+                        createTable(table, writePath, pu.getFileNames(), 
hivePartitionStatistics, pu);
                         break;
                     default:
                         throw new RuntimeException("Not support mode:[" + 
updateMode + "] in unPartitioned table");
@@ -191,7 +230,7 @@ public class HMSTransaction implements Transaction {
                         }
                         addPartition(
                                 dbName, tbName, hivePartition, writePath,
-                                pu.getName(), pu.getFileNames(), 
hivePartitionStatistics);
+                                pu.getName(), pu.getFileNames(), 
hivePartitionStatistics, pu);
                         break;
                     default:
                         throw new RuntimeException("Not support mode:[" + 
updateMode + "] in partitioned table");
@@ -351,7 +390,8 @@ public class HMSTransaction implements Transaction {
                                     pu.getLocation().getWritePath(),
                                     pu.getName(),
                                     pu.getFileNames(),
-                                    updateStats
+                                    updateStats,
+                                    pu
                                 ))
                 );
             }
@@ -550,8 +590,8 @@ public class HMSTransaction implements Transaction {
 
 
 
-    private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir) {
-        DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, 
deleteEmptyDir);
+    private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir, 
boolean reverse) {
+        DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, 
deleteEmptyDir, reverse);
 
         if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
             LOG.warn("Failed to delete directory {}. Some eligible items can't 
be deleted: {}.",
@@ -561,9 +601,9 @@ public class HMSTransaction implements Transaction {
         }
     }
 
-    private DeleteRecursivelyResult recursiveDeleteFiles(Path directory, 
boolean deleteEmptyDir) {
+    private DeleteRecursivelyResult recursiveDeleteFiles(Path directory, 
boolean deleteEmptyDir, boolean reverse) {
         try {
-            if (!fs.exists(directory.toString()).ok()) {
+            if (!fs.directoryExists(directory.toString()).ok()) {
                 return new DeleteRecursivelyResult(true, ImmutableList.of());
             }
         } catch (Exception e) {
@@ -572,10 +612,11 @@ public class HMSTransaction implements Transaction {
             return new DeleteRecursivelyResult(false, 
notDeletedEligibleItems.build());
         }
 
-        return doRecursiveDeleteFiles(directory, deleteEmptyDir, queryId);
+        return doRecursiveDeleteFiles(directory, deleteEmptyDir, queryId, 
reverse);
     }
 
-    private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, 
boolean deleteEmptyDir, String queryId) {
+    private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, 
boolean deleteEmptyDir,
+            String queryId, boolean reverse) {
         List<RemoteFile> allFiles = new ArrayList<>();
         Set<String> allDirs = new HashSet<>();
         Status statusFile = fs.listFiles(directory.toString(), true, allFiles);
@@ -589,7 +630,7 @@ public class HMSTransaction implements Transaction {
         boolean allDescendentsDeleted = true;
         ImmutableList.Builder<String> notDeletedEligibleItems = 
ImmutableList.builder();
         for (RemoteFile file : allFiles) {
-            if (file.getName().startsWith(queryId)) {
+            if (reverse ^ file.getName().startsWith(queryId)) {
                 if (!deleteIfExists(file.getPath())) {
                     allDescendentsDeleted = false;
                     notDeletedEligibleItems.add(file.getPath().toString());
@@ -600,7 +641,7 @@ public class HMSTransaction implements Transaction {
         }
 
         for (String dir : allDirs) {
-            DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(new 
Path(dir), deleteEmptyDir, queryId);
+            DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(new 
Path(dir), deleteEmptyDir, queryId, reverse);
             if (!subResult.dirNotExists()) {
                 allDescendentsDeleted = false;
             }
@@ -611,7 +652,7 @@ public class HMSTransaction implements Transaction {
 
         if (allDescendentsDeleted && deleteEmptyDir) {
             Verify.verify(notDeletedEligibleItems.build().isEmpty());
-            if (!deleteIfExists(directory)) {
+            if (!deleteDirectoryIfExists(directory)) {
                 return new DeleteRecursivelyResult(false, 
ImmutableList.of(directory + "/"));
             }
             // all items of the location have been deleted.
@@ -628,6 +669,14 @@ public class HMSTransaction implements Transaction {
         return !fs.exists(path.toString()).ok();
     }
 
+    public boolean deleteDirectoryIfExists(Path path) {
+        Status status = wrapperDeleteDirWithProfileSummary(path.toString());
+        if (status.ok()) {
+            return true;
+        }
+        return !fs.directoryExists(path.toString()).ok();
+    }
+
     public static class DatabaseTableName {
         private final String dbName;
         private final String tbName;
@@ -676,15 +725,19 @@ public class HMSTransaction implements Transaction {
         private final List<String> fileNames;
         private final HivePartitionStatistics statisticsUpdate;
 
+        private final THivePartitionUpdate hivePartitionUpdate;
+
         public TableAndMore(
                 Table table,
                 String currentLocation,
                 List<String> fileNames,
-                HivePartitionStatistics statisticsUpdate) {
+                HivePartitionStatistics statisticsUpdate,
+                THivePartitionUpdate hivePartitionUpdate) {
             this.table = Objects.requireNonNull(table, "table is null");
             this.currentLocation = Objects.requireNonNull(currentLocation);
             this.fileNames = Objects.requireNonNull(fileNames);
             this.statisticsUpdate = Objects.requireNonNull(statisticsUpdate, 
"statisticsUpdate is null");
+            this.hivePartitionUpdate = 
Objects.requireNonNull(hivePartitionUpdate, "hivePartitionUpdate is null");
         }
 
         public Table getTable() {
@@ -703,6 +756,10 @@ public class HMSTransaction implements Transaction {
             return statisticsUpdate;
         }
 
+        /**
+         * @return the partition update supplied to the constructor (never null, the
+         *         constructor rejects null)
+         */
+        public THivePartitionUpdate getHivePartitionUpdate() {
+            return hivePartitionUpdate;
+        }
+
         @Override
         public String toString() {
             return MoreObjects.toStringHelper(this)
@@ -719,17 +776,22 @@ public class HMSTransaction implements Transaction {
         private final List<String> fileNames;
         private final HivePartitionStatistics statisticsUpdate;
 
+        private final THivePartitionUpdate hivePartitionUpdate;
+
+
         public PartitionAndMore(
                 HivePartition partition,
                 String currentLocation,
                 String partitionName,
                 List<String> fileNames,
-                HivePartitionStatistics statisticsUpdate) {
+                HivePartitionStatistics statisticsUpdate,
+                THivePartitionUpdate hivePartitionUpdate) {
             this.partition = Objects.requireNonNull(partition, "partition is 
null");
             this.currentLocation = Objects.requireNonNull(currentLocation, 
"currentLocation is null");
             this.partitionName = Objects.requireNonNull(partitionName, 
"partition is null");
             this.fileNames = Objects.requireNonNull(fileNames, "fileNames is 
null");
             this.statisticsUpdate = Objects.requireNonNull(statisticsUpdate, 
"statisticsUpdate is null");
+            this.hivePartitionUpdate = 
Objects.requireNonNull(hivePartitionUpdate, "hivePartitionUpdate is null");
         }
 
         public HivePartition getPartition() {
@@ -752,6 +814,10 @@ public class HMSTransaction implements Transaction {
             return statisticsUpdate;
         }
 
+        /**
+         * @return the partition update supplied to the constructor (never null, the
+         *         constructor rejects null)
+         */
+        public THivePartitionUpdate getHivePartitionUpdate() {
+            return hivePartitionUpdate;
+        }
+
         @Override
         public String toString() {
             return MoreObjects.toStringHelper(this)
@@ -835,7 +901,8 @@ public class HMSTransaction implements Transaction {
             String tableName,
             String location,
             List<String> fileNames,
-            HivePartitionStatistics statisticsUpdate) {
+            HivePartitionStatistics statisticsUpdate,
+            THivePartitionUpdate hivePartitionUpdate) {
         DatabaseTableName databaseTableName = new 
DatabaseTableName(databaseName, tableName);
         Action<TableAndMore> oldTableAction = 
tableActions.get(databaseTableName);
         if (oldTableAction == null) {
@@ -843,12 +910,13 @@ public class HMSTransaction implements Transaction {
             tableActions.put(
                     databaseTableName,
                     new Action<>(
-                        actionType,
+                            actionType,
                             new TableAndMore(
-                                table,
-                                location,
-                                fileNames,
-                                statisticsUpdate)));
+                                    table,
+                                    location,
+                                    fileNames,
+                                    statisticsUpdate,
+                                    hivePartitionUpdate)));
             return;
         }
 
@@ -870,12 +938,13 @@ public class HMSTransaction implements Transaction {
     }
 
     public synchronized void createTable(
-            Table table, String location, List<String> fileNames,  
HivePartitionStatistics statistics) {
+            Table table, String location, List<String> fileNames, 
HivePartitionStatistics statistics,
+            THivePartitionUpdate hivePartitionUpdate) {
         // When creating a table, it should never have partition actions. This 
is just a sanity check.
         checkNoPartitionAction(dbName, tbName);
         DatabaseTableName databaseTableName = new DatabaseTableName(dbName, 
tbName);
         Action<TableAndMore> oldTableAction = 
tableActions.get(databaseTableName);
-        TableAndMore tableAndMore = new TableAndMore(table, location, 
fileNames, statistics);
+        TableAndMore tableAndMore = new TableAndMore(table, location, 
fileNames, statistics, hivePartitionUpdate);
         if (oldTableAction == null) {
             tableActions.put(databaseTableName, new Action<>(ActionType.ADD, 
tableAndMore));
             return;
@@ -939,7 +1008,8 @@ public class HMSTransaction implements Transaction {
             String currentLocation,
             String partitionName,
             List<String> files,
-            HivePartitionStatistics statistics) {
+            HivePartitionStatistics statistics,
+            THivePartitionUpdate hivePartitionUpdate) {
         Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
                 partitionActions.computeIfAbsent(new 
DatabaseTableName(databaseName, tableName), k -> new HashMap<>());
         Action<PartitionAndMore> oldPartitionAction = 
partitionActionsForTable.get(partition.getPartitionValues());
@@ -948,7 +1018,8 @@ public class HMSTransaction implements Transaction {
                     partition.getPartitionValues(),
                     new Action<>(
                             ActionType.ADD,
-                            new PartitionAndMore(partition, currentLocation, 
partitionName, files, statistics))
+                            new PartitionAndMore(partition, currentLocation, 
partitionName, files, statistics,
+                                    hivePartitionUpdate))
             );
             return;
         }
@@ -959,7 +1030,8 @@ public class HMSTransaction implements Transaction {
                         partition.getPartitionValues(),
                         new Action<>(
                                 ActionType.ALTER,
-                                new PartitionAndMore(partition, 
currentLocation, partitionName, files, statistics))
+                                new PartitionAndMore(partition, 
currentLocation, partitionName, files, statistics,
+                                        hivePartitionUpdate))
                 );
                 return;
             case ADD:
@@ -1029,7 +1101,8 @@ public class HMSTransaction implements Transaction {
         private final List<RenameDirectoryTask> renameDirectoryTasksForAbort = 
new ArrayList<>();
         // when finished, we need clear some directories
         private final List<String> clearDirsForFinish = new ArrayList<>();
-        Executor fileSystemExecutor = Executors.newFixedThreadPool(16);
+
+        private final List<String> s3cleanWhenSuccess = new ArrayList<>();
 
         public void cancelUnStartedAsyncFileSystemTask() {
             fileSystemTaskCancelled.set(true);
@@ -1091,15 +1164,20 @@ public class HMSTransaction implements Transaction {
                         writePath,
                         targetPath,
                         tableAndMore.getFileNames());
+            } else {
+                if 
(!tableAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
+                    s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, 
fileSystemTaskCancelled,
+                            tableAndMore.hivePartitionUpdate, targetPath);
+                }
             }
             directoryCleanUpTasksForAbort.add(new 
DirectoryCleanUpTask(targetPath, false));
             updateStatisticsTasks.add(
-                new UpdateStatisticsTask(
-                        dbName,
-                        tbName,
-                        Optional.empty(),
-                        tableAndMore.getStatisticsUpdate(),
-                        true
+                    new UpdateStatisticsTask(
+                            dbName,
+                            tbName,
+                            Optional.empty(),
+                            tableAndMore.getStatisticsUpdate(),
+                            true
                     ));
         }
 
@@ -1129,6 +1207,12 @@ public class HMSTransaction implements Transaction {
                     throw new RuntimeException(
                         "Error to rename dir from " + writePath + " to " + 
targetPath + ":" + status.getErrMsg());
                 }
+            } else {
+                if 
(!tableAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
+                    s3cleanWhenSuccess.add(targetPath);
+                    s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, 
fileSystemTaskCancelled,
+                            tableAndMore.hivePartitionUpdate, targetPath);
+                }
             }
             updateStatisticsTasks.add(
                 new UpdateStatisticsTask(
@@ -1154,6 +1238,11 @@ public class HMSTransaction implements Transaction {
                         writePath,
                         targetPath,
                         () -> directoryCleanUpTasksForAbort.add(new 
DirectoryCleanUpTask(targetPath, true)));
+            } else {
+                if 
(!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
+                    s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, 
fileSystemTaskCancelled,
+                            partitionAndMore.hivePartitionUpdate, targetPath);
+                }
             }
 
             StorageDescriptor sd = getTable(dbName, tbName).getSd();
@@ -1194,6 +1283,11 @@ public class HMSTransaction implements Transaction {
                         writePath,
                         targetPath,
                         partitionAndMore.getFileNames());
+            } else {
+                if 
(!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
+                    s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, 
fileSystemTaskCancelled,
+                            partitionAndMore.hivePartitionUpdate, targetPath);
+                }
             }
 
             updateStatisticsTasks.add(
@@ -1207,7 +1301,7 @@ public class HMSTransaction implements Transaction {
 
         private void runDirectoryClearUpTasksForAbort() {
             for (DirectoryCleanUpTask cleanUpTask : 
directoryCleanUpTasksForAbort) {
-                recursiveDeleteItems(cleanUpTask.getPath(), 
cleanUpTask.isDeleteEmptyDir());
+                recursiveDeleteItems(cleanUpTask.getPath(), 
cleanUpTask.isDeleteEmptyDir(), false);
             }
         }
 
@@ -1228,13 +1322,19 @@ public class HMSTransaction implements Transaction {
         private void runClearPathsForFinish() {
             Status status;
             for (String path : clearDirsForFinish) {
-                status = wrapperDeleteWithProfileSummary(path);
+                status = wrapperDeleteDirWithProfileSummary(path);
                 if (!status.ok()) {
                     LOG.warn("Failed to recursively delete path {}:{}", path, 
status.getErrCode());
                 }
             }
         }
 
+        /**
+         * Cleans every path recorded in {@code s3cleanWhenSuccess}. The recursive delete is
+         * invoked with deleteEmptyDir=false and reverse=true, i.e. it targets files whose
+         * names do not start with the current query id and leaves directories in place.
+         */
+        private void runS3cleanWhenSuccess() {
+            s3cleanWhenSuccess.forEach(
+                    location -> recursiveDeleteItems(new Path(location), false, true));
+        }
+
         public void prepareAlterPartition(PartitionAndMore partitionAndMore) {
             HivePartition partition = partitionAndMore.getPartition();
             String targetPath = partition.getPath();
@@ -1263,6 +1363,12 @@ public class HMSTransaction implements Transaction {
                     throw new RuntimeException(
                         "Error to rename dir from " + writePath + " to " + 
targetPath + ":" + status.getErrMsg());
                 }
+            } else {
+                if 
(!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
+                    s3cleanWhenSuccess.add(targetPath);
+                    s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures, 
fileSystemTaskCancelled,
+                            partitionAndMore.hivePartitionUpdate, targetPath);
+                }
             }
 
             updateStatisticsTasks.add(
@@ -1337,8 +1443,32 @@ public class HMSTransaction implements Transaction {
             
summaryProfile.ifPresent(SummaryProfile::setHmsUpdatePartitionTime);
         }
 
-        public void pruneAndDeleteStagingDirectories() {
-            recursiveDeleteItems(new Path(declaredIntentionsToWrite), true);
+        /**
+         * Recursively removes this query's files under the declared write location and,
+         * once everything beneath it is gone, the directory itself.
+         */
+        private void pruneAndDeleteStagingDirectories() {
+            Path stagingRoot = new Path(declaredIntentionsToWrite);
+            // deleteEmptyDir = true, reverse = false (match files that start with the query id)
+            recursiveDeleteItems(stagingRoot, true, false);
+        }
+
+        /**
+         * Schedules an S3 AbortMultipartUpload call on {@code fileSystemExecutor} for every
+         * entry still present in {@code uncompletedMpuPendingUploads}. The resulting futures
+         * are appended to {@code asyncFileSystemTaskFutures}; this method does not wait for them.
+         *
+         * @throws RuntimeException if the S3 client for an entry's path cannot be obtained
+         */
+        private void abortMultiUploads() {
+            // An empty set simply yields no iterations, so no separate emptiness guard is needed.
+            for (UncompletedMpuPendingUpload pending : uncompletedMpuPendingUploads) {
+                S3FileSystem targetFs = (S3FileSystem) ((SwitchingFileSystem) fs).fileSystem(pending.path);
+
+                S3Client client;
+                try {
+                    client = (S3Client) targetFs.getObjStorage().getClient();
+                } catch (UserException e) {
+                    throw new RuntimeException(e);
+                }
+
+                Runnable abortTask = () -> {
+                    AbortMultipartUploadRequest.Builder request = AbortMultipartUploadRequest.builder();
+                    request.bucket(pending.s3MPUPendingUpload.getBucket());
+                    request.key(pending.s3MPUPendingUpload.getKey());
+                    request.uploadId(pending.s3MPUPendingUpload.getUploadId());
+                    client.abortMultipartUpload(request.build());
+                };
+                asyncFileSystemTaskFutures.add(CompletableFuture.runAsync(abortTask, fileSystemExecutor));
+            }
+        }
 
         public void doNothing() {
@@ -1348,6 +1478,7 @@ public class HMSTransaction implements Transaction {
 
         public void doCommit() {
             waitForAsyncFileSystemTasks();
+            runS3cleanWhenSuccess();
             doAddPartitionsTask();
             doUpdateStatisticsTasks();
             doNothing();
@@ -1365,6 +1496,11 @@ public class HMSTransaction implements Transaction {
         public void rollback() {
             //delete write path
             pruneAndDeleteStagingDirectories();
+            // abort the in-progress multipart uploads
+            abortMultiUploads();
+            for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
+                MoreFutures.getFutureValue(future, RuntimeException.class);
+            }
         }
     }
 
@@ -1385,7 +1521,7 @@ public class HMSTransaction implements Transaction {
     public Status wrapperDeleteWithProfileSummary(String remotePath) {
         summaryProfile.ifPresent(profile -> {
             profile.setTempStartTime();
-            profile.incDeleteDirRecursiveCnt();
+            profile.incDeleteFileCnt();
         });
 
         Status status = fs.delete(remotePath);
@@ -1394,6 +1530,18 @@ public class HMSTransaction implements Transaction {
         return status;
     }
 
+    /**
+     * Calls {@code fs.deleteDirectory(remotePath)} and, when a summary profile is present,
+     * records the start time and bumps the recursive-directory-delete counter beforehand and
+     * refreshes the filesystem operation time afterwards.
+     *
+     * @param remotePath directory to delete
+     * @return the status returned by the file system
+     */
+    public Status wrapperDeleteDirWithProfileSummary(String remotePath) {
+        if (summaryProfile.isPresent()) {
+            SummaryProfile profile = summaryProfile.get();
+            profile.setTempStartTime();
+            profile.incDeleteDirRecursiveCnt();
+        }
+
+        Status deleteStatus = fs.deleteDirectory(remotePath);
+
+        if (summaryProfile.isPresent()) {
+            summaryProfile.get().freshFilesystemOptTime();
+        }
+        return deleteStatus;
+    }
+
     public void wrapperAsyncRenameWithProfileSummary(Executor executor,
                                                      
List<CompletableFuture<?>> renameFileFutures,
                                                      AtomicBoolean cancelled,
@@ -1415,4 +1563,37 @@ public class HMSTransaction implements Transaction {
                 fs, executor, renameFileFutures, cancelled, origFilePath, 
destFilePath, runWhenPathNotExist);
         summaryProfile.ifPresent(SummaryProfile::incRenameDirCnt);
     }
+
+    /**
+     * Completes every pending S3 multipart upload carried by {@code hivePartitionUpdate}.
+     *
+     * <p>One asynchronous task per pending upload is submitted to {@code fileSystemExecutor}
+     * and its future appended to {@code asyncFileSystemTaskFutures}. A task does nothing if
+     * {@code fileSystemTaskCancelled} is already set when it starts. After a successful
+     * CompleteMultipartUpload call the upload is removed from
+     * {@code uncompletedMpuPendingUploads}.
+     *
+     * @param fileSystemExecutor executor that runs the completion tasks
+     * @param asyncFileSystemTaskFutures collection that receives the created futures
+     * @param fileSystemTaskCancelled flag checked at the start of each task
+     * @param hivePartitionUpdate update whose pending multipart uploads are completed
+     * @param path location used to resolve the S3 file system and to identify the pending entry
+     * @throws RuntimeException if the S3 client for {@code path} cannot be obtained
+     */
+    private void s3Commit(Executor fileSystemExecutor, List<CompletableFuture<?>> asyncFileSystemTaskFutures,
+            AtomicBoolean fileSystemTaskCancelled, THivePartitionUpdate hivePartitionUpdate, String path) {
+        S3FileSystem s3FileSystem = (S3FileSystem) ((SwitchingFileSystem) fs).fileSystem(path);
+        S3Client s3Client;
+        try {
+            s3Client = (S3Client) s3FileSystem.getObjStorage().getClient();
+        } catch (UserException e) {
+            throw new RuntimeException(e);
+        }
+
+        for (TS3MPUPendingUpload s3MPUPendingUpload : hivePartitionUpdate.getS3MpuPendingUploads()) {
+            asyncFileSystemTaskFutures.add(CompletableFuture.runAsync(() -> {
+                if (fileSystemTaskCancelled.get()) {
+                    return;
+                }
+                // S3 requires the part list in ascending part-number order. The etag map's
+                // iteration order is not guaranteed, so sort by part number explicitly.
+                List<CompletedPart> completedParts = Lists.newArrayList();
+                s3MPUPendingUpload.getEtags().entrySet().stream()
+                        .sorted(Map.Entry.comparingByKey())
+                        .forEach(entry -> completedParts.add(CompletedPart.builder()
+                                .eTag(entry.getValue())
+                                .partNumber(entry.getKey())
+                                .build()));
+
+                s3Client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
+                        .bucket(s3MPUPendingUpload.getBucket())
+                        .key(s3MPUPendingUpload.getKey())
+                        .uploadId(s3MPUPendingUpload.getUploadId())
+                        .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
+                        .build());
+                // The upload is finished; it must no longer be aborted on rollback.
+                uncompletedMpuPendingUploads.remove(new UncompletedMpuPendingUpload(s3MPUPendingUpload, path));
+            }, fileSystemExecutor));
+        }
+    }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index fcebd67954e..be5ecb163b1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -349,9 +349,11 @@ public class HiveMetaStoreCache {
             List<String> partitionValues,
             String bindBrokerName) throws UserException {
         FileCacheValue result = new FileCacheValue();
+        Map<String, String> properties = new HashMap<>();
+        jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), 
e.getValue()));
         RemoteFileSystem fs = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                 new 
FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
-                        location, bindBrokerName), jobConf, bindBrokerName));
+                        location, bindBrokerName), properties, 
bindBrokerName));
         result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, 
jobConf));
         // For Tez engine, it may generate subdirectoies for "union" query.
         // So there may be files and directories in the table directory at the 
same time. eg:
@@ -366,7 +368,7 @@ public class HiveMetaStoreCache {
             for (RemoteFile remoteFile : remoteFiles) {
                 String srcPath = remoteFile.getPath().toString();
                 LocationPath locationPath = new LocationPath(srcPath, 
catalog.getProperties());
-                Path convertedPath = locationPath.toScanRangeLocation();
+                Path convertedPath = locationPath.toStorageLocation();
                 if (!convertedPath.toString().equals(srcPath)) {
                     remoteFile.setPath(convertedPath);
                 }
@@ -777,10 +779,12 @@ public class HiveMetaStoreCache {
                         return Collections.emptyList();
                     }
                     String acidVersionPath = new Path(baseOrDeltaPath, 
"_orc_acid_version").toUri().toString();
+                    Map<String, String> properties = new HashMap<>();
+                    jobConf.iterator().forEachRemaining(e -> 
properties.put(e.getKey(), e.getValue()));
                     RemoteFileSystem fs = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                             new FileSystemCache.FileSystemCacheKey(
                                     
LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
-                                            bindBrokerName), jobConf, 
bindBrokerName));
+                                            bindBrokerName), properties, 
bindBrokerName));
                     Status status = fs.exists(acidVersionPath);
                     if (status != Status.OK) {
                         if (status.getErrCode() == ErrCode.NOT_FOUND) {
@@ -800,10 +804,12 @@ public class HiveMetaStoreCache {
                 List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
                 for (AcidUtils.ParsedDelta delta : 
directory.getCurrentDirectories()) {
                     String location = delta.getPath().toString();
+                    Map<String, String> properties = new HashMap<>();
+                    jobConf.iterator().forEachRemaining(e -> 
properties.put(e.getKey(), e.getValue()));
                     RemoteFileSystem fs = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                             new FileSystemCache.FileSystemCacheKey(
                                     LocationPath.getFSIdentity(location, 
bindBrokerName),
-                                            jobConf, bindBrokerName));
+                                            properties, bindBrokerName));
                     List<RemoteFile> remoteFiles = new ArrayList<>();
                     Status status = fs.listFiles(location, false, remoteFiles);
                     if (status.ok()) {
@@ -825,10 +831,12 @@ public class HiveMetaStoreCache {
                 // base
                 if (directory.getBaseDirectory() != null) {
                     String location = directory.getBaseDirectory().toString();
+                    Map<String, String> properties = new HashMap<>();
+                    jobConf.iterator().forEachRemaining(e -> 
properties.put(e.getKey(), e.getValue()));
                     RemoteFileSystem fs = 
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                             new FileSystemCache.FileSystemCacheKey(
                                     LocationPath.getFSIdentity(location, 
bindBrokerName),
-                                            jobConf, bindBrokerName));
+                                            properties, bindBrokerName));
                     List<RemoteFile> remoteFiles = new ArrayList<>();
                     Status status = fs.listFiles(location, false, remoteFiles);
                     if (status.ok()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index a4566cd0b7a..1cf6595bbad 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -36,8 +36,6 @@ import org.apache.doris.datasource.ExternalDatabase;
 import org.apache.doris.datasource.jdbc.client.JdbcClient;
 import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
 import org.apache.doris.datasource.operations.ExternalMetadataOps;
-import org.apache.doris.fs.FileSystem;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -61,7 +59,6 @@ public class HiveMetadataOps implements ExternalMetadataOps {
     private static final Logger LOG = 
LogManager.getLogger(HiveMetadataOps.class);
     private static final int MIN_CLIENT_POOL_SIZE = 8;
     private final HMSCachedClient client;
-    private final FileSystem fs;
     private final HMSExternalCatalog catalog;
 
     public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig 
jdbcClientConfig, HMSExternalCatalog catalog) {
@@ -74,24 +71,14 @@ public class HiveMetadataOps implements ExternalMetadataOps 
{
     public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client) 
{
         this.catalog = catalog;
         this.client = client;
-        // TODO Currently only supports DFSFileSystem, more types will be 
supported in the future
-        this.fs = new DFSFileSystem(catalog.getProperties());
     }
 
-    @VisibleForTesting
-    public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client, 
FileSystem fs) {
-        this.catalog = catalog;
-        this.client = client;
-        this.fs = fs;
-    }
-
-
     public HMSCachedClient getClient() {
         return client;
     }
 
-    public FileSystem getFs() {
-        return fs;
+    public HMSExternalCatalog getCatalog() {
+        return catalog;
     }
 
     public static HMSCachedClient createCachedClient(HiveConf hiveConf, int 
thriftClientPoolSize,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index eb1d77a322d..94748e7e427 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -354,7 +354,7 @@ public class HudiScanNode extends HiveScanNode {
                     long fileSize = baseFile.getFileSize();
                     // Need add hdfs host to location
                     LocationPath locationPath = new LocationPath(filePath, 
hmsTable.getCatalogProperties());
-                    Path splitFilePath = locationPath.toScanRangeLocation();
+                    Path splitFilePath = locationPath.toStorageLocation();
                     splits.add(new FileSplit(splitFilePath, 0, fileSize, 
fileSize,
                             new String[0], partition.getPartitionValues()));
                 });
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 961fb8ae1d6..21826dfd8d5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -150,7 +150,7 @@ public class IcebergScanNode extends FileQueryScanNode {
                 TIcebergDeleteFileDesc deleteFileDesc = new 
TIcebergDeleteFileDesc();
                 String deleteFilePath = filter.getDeleteFilePath();
                 LocationPath locationPath = new LocationPath(deleteFilePath, 
icebergSplit.getConfig());
-                Path splitDeletePath = locationPath.toScanRangeLocation();
+                Path splitDeletePath = locationPath.toStorageLocation();
                 deleteFileDesc.setPath(splitDeletePath.toString());
                 if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
                     fileDesc.setContent(FileContent.POSITION_DELETES.id());
@@ -244,7 +244,7 @@ public class IcebergScanNode extends FileQueryScanNode {
                     partitionPathSet.add(structLike.toString());
                 }
                 LocationPath locationPath = new LocationPath(dataFilePath, 
source.getCatalog().getProperties());
-                Path finalDataFilePath = locationPath.toScanRangeLocation();
+                Path finalDataFilePath = locationPath.toStorageLocation();
                 IcebergSplit split = new IcebergSplit(
                         finalDataFilePath,
                         splitTask.start(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index ddb5a8c4f3d..b9672f70c41 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -161,7 +161,7 @@ public class PaimonScanNode extends FileQueryScanNode {
                     List<RawFile> rawFiles = optRawFiles.get();
                     for (RawFile file : rawFiles) {
                         LocationPath locationPath = new 
LocationPath(file.path(), source.getCatalog().getProperties());
-                        Path finalDataFilePath = 
locationPath.toScanRangeLocation();
+                        Path finalDataFilePath = 
locationPath.toStorageLocation();
                         try {
                             splits.addAll(
                                     splitFile(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
index 94f5c420438..b6fd96bbd37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
@@ -38,6 +38,10 @@ public interface FileSystem {
 
     Status exists(String remotePath);
 
+    default Status directoryExists(String dir) {
+        return exists(dir);
+    }
+
     Status downloadWithFileSize(String remoteFilePath, String localFilePath, 
long fileSize);
 
     Status upload(String localPath, String remotePath);
@@ -58,6 +62,10 @@ public interface FileSystem {
 
     Status delete(String remotePath);
 
+    default Status deleteDirectory(String dir) {
+        return delete(dir);
+    }
+
     Status makeDir(String remotePath);
 
     Status listFiles(String remotePath, boolean recursive, List<RemoteFile> 
result);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
index 149bbe2d378..dd66c359b9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
@@ -23,8 +23,8 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
-import org.apache.hadoop.mapred.JobConf;
 
+import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalLong;
 
@@ -44,7 +44,7 @@ public class FileSystemCache {
     }
 
     private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) {
-        return FileSystemFactory.getRemoteFileSystem(key.type, key.conf, 
key.bindBrokerName);
+        return FileSystemFactory.getRemoteFileSystem(key.type, key.properties, 
key.bindBrokerName);
     }
 
     public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
@@ -55,13 +55,14 @@ public class FileSystemCache {
         private final FileSystemType type;
         // eg: hdfs://nameservices1
         private final String fsIdent;
-        private final JobConf conf;
+        private final Map<String, String> properties;
         private final String bindBrokerName;
 
-        public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf 
conf, String bindBrokerName) {
+        public FileSystemCacheKey(Pair<FileSystemType, String> fs,
+                Map<String, String> properties, String bindBrokerName) {
             this.type = fs.first;
             this.fsIdent = fs.second;
-            this.conf = conf;
+            this.properties = properties;
             this.bindBrokerName = bindBrokerName;
         }
 
@@ -75,7 +76,7 @@ public class FileSystemCache {
             }
             boolean equalsWithoutBroker = type.equals(((FileSystemCacheKey) 
obj).type)
                     && fsIdent.equals(((FileSystemCacheKey) obj).fsIdent)
-                    && conf == ((FileSystemCacheKey) obj).conf;
+                    && properties == ((FileSystemCacheKey) obj).properties;
             if (bindBrokerName == null) {
                 return equalsWithoutBroker;
             }
@@ -85,9 +86,9 @@ public class FileSystemCache {
         @Override
         public int hashCode() {
             if (bindBrokerName == null) {
-                return Objects.hash(conf, fsIdent, type);
+                return Objects.hash(properties, fsIdent, type);
             }
-            return Objects.hash(conf, fsIdent, type, bindBrokerName);
+            return Objects.hash(properties, fsIdent, type, bindBrokerName);
         }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
index 63f552a8ab8..cd7212c8e39 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
 
 public class FileSystemFactory {
@@ -51,10 +50,8 @@ public class FileSystemFactory {
         }
     }
 
-    public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, 
Configuration conf,
+    public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, 
Map<String, String> properties,
                                                        String bindBrokerName) {
-        Map<String, String> properties = new HashMap<>();
-        conf.iterator().forEachRemaining(e -> properties.put(e.getKey(), 
e.getValue()));
         switch (type) {
             case S3:
                 return new S3FileSystem(properties);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
 b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProvider.java
similarity index 74%
copy from 
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
copy to fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProvider.java
index 334258a3f12..aab7471fd99 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProvider.java
@@ -15,13 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.transaction;
+package org.apache.doris.fs;
 
-import org.apache.doris.datasource.hive.HiveMetadataOps;
+import org.apache.doris.datasource.SessionContext;
 
-public class TransactionManagerFactory {
-
-    public static TransactionManager 
createHiveTransactionManager(HiveMetadataOps ops) {
-        return new HiveTransactionManager(ops);
-    }
+public interface FileSystemProvider {
+    FileSystem get(SessionContext ctx);
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProviderImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProviderImpl.java
new file mode 100644
index 00000000000..680592ab4a8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProviderImpl.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.datasource.ExternalMetaCacheMgr;
+import org.apache.doris.datasource.SessionContext;
+import org.apache.doris.fs.remote.SwitchingFileSystem;
+
+import java.util.Map;
+
+public class FileSystemProviderImpl implements FileSystemProvider {
+    private ExternalMetaCacheMgr extMetaCacheMgr;
+    private String bindBrokerName;
+
+    private Map<String, String> properties;
+
+    public FileSystemProviderImpl(ExternalMetaCacheMgr extMetaCacheMgr, String 
bindBrokerName,
+            Map<String, String> properties) {
+        this.extMetaCacheMgr = extMetaCacheMgr;
+        this.bindBrokerName = bindBrokerName;
+        this.properties = properties;
+    }
+
+    @Override
+    public FileSystem get(SessionContext ctx) {
+        return new SwitchingFileSystem(extMetaCacheMgr, bindBrokerName, 
properties);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java
index 57a10ed1109..93e79bf94b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java
@@ -48,6 +48,11 @@ public class LocalDfsFileSystem implements FileSystem {
         return null;
     }
 
+    @Override
+    public Status directoryExists(String dir) {
+        return exists(dir);
+    }
+
     @Override
     public Status exists(String remotePath) {
         boolean exists = false;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java
index 72b75350140..f821e5bb6ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java
@@ -31,6 +31,7 @@ import java.nio.file.FileVisitOption;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Comparator;
 
 public abstract class ObjFileSystem extends RemoteFileSystem {
@@ -43,11 +44,20 @@ public abstract class ObjFileSystem extends 
RemoteFileSystem {
         this.objStorage = objStorage;
     }
 
+    public ObjStorage<?> getObjStorage() {
+        return objStorage;
+    }
+
     @Override
     public Status exists(String remotePath) {
         return objStorage.headObject(remotePath);
     }
 
+    @Override
+    public Status directoryExists(String dir) {
+        return listFiles(dir, false, new ArrayList<>());
+    }
+
     /**
      * download data from remote file and check data size with expected file 
size.
      * @param remoteFilePath remote file path
@@ -139,4 +149,9 @@ public abstract class ObjFileSystem extends 
RemoteFileSystem {
     public Status delete(String remotePath) {
         return objStorage.deleteObject(remotePath);
     }
+
+    @Override
+    public Status deleteDirectory(String absolutePath) {
+        return objStorage.deleteObjects(absolutePath);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index 5771c65224b..3869824de55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -107,8 +107,4 @@ public class S3FileSystem extends ObjFileSystem {
         }
         return Status.OK;
     }
-
-    public Status deleteDirectory(String absolutePath) {
-        return ((S3ObjStorage) objStorage).deleteObjects(absolutePath);
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java
new file mode 100644
index 00000000000..00802922ef3
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs.remote;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.ExternalMetaCacheMgr;
+import org.apache.doris.fs.FileSystem;
+import org.apache.doris.fs.FileSystemCache;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class SwitchingFileSystem implements FileSystem {
+
+    private final ExternalMetaCacheMgr extMetaCacheMgr;
+
+    private final String bindBrokerName;
+
+    private final Map<String, String> properties;
+
+    public SwitchingFileSystem(ExternalMetaCacheMgr extMetaCacheMgr, String 
bindBrokerName,
+            Map<String, String> properties) {
+        this.extMetaCacheMgr = extMetaCacheMgr;
+        this.bindBrokerName = bindBrokerName;
+        this.properties = properties;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+    @Override
+    public Status exists(String remotePath) {
+        return fileSystem(remotePath).exists(remotePath);
+    }
+
+    @Override
+    public Status directoryExists(String dir) {
+        return fileSystem(dir).directoryExists(dir);
+    }
+
+    @Override
+    public Status downloadWithFileSize(String remoteFilePath, String 
localFilePath, long fileSize) {
+        return fileSystem(remoteFilePath).downloadWithFileSize(remoteFilePath, 
localFilePath, fileSize);
+    }
+
+    @Override
+    public Status upload(String localPath, String remotePath) {
+        return fileSystem(localPath).upload(localPath, remotePath);
+    }
+
+    @Override
+    public Status directUpload(String content, String remoteFile) {
+        return fileSystem(remoteFile).directUpload(content, remoteFile);
+    }
+
+    @Override
+    public Status rename(String origFilePath, String destFilePath) {
+        return fileSystem(origFilePath).rename(origFilePath, destFilePath);
+    }
+
+    @Override
+    public Status renameDir(String origFilePath, String destFilePath) {
+        return fileSystem(origFilePath).renameDir(origFilePath, destFilePath);
+    }
+
+    @Override
+    public Status renameDir(String origFilePath, String destFilePath, Runnable 
runWhenPathNotExist) {
+        return fileSystem(origFilePath).renameDir(origFilePath, destFilePath, 
runWhenPathNotExist);
+    }
+
+    @Override
+    public Status delete(String remotePath) {
+        return fileSystem(remotePath).delete(remotePath);
+    }
+
+    @Override
+    public Status deleteDirectory(String absolutePath) {
+        return fileSystem(absolutePath).deleteDirectory(absolutePath);
+    }
+
+    @Override
+    public Status makeDir(String remotePath) {
+        return fileSystem(remotePath).makeDir(remotePath);
+    }
+
+    @Override
+    public Status listFiles(String remotePath, boolean recursive, 
List<RemoteFile> result) {
+        return fileSystem(remotePath).listFiles(remotePath, recursive, result);
+    }
+
+    @Override
+    public Status globList(String remotePath, List<RemoteFile> result) {
+        return fileSystem(remotePath).globList(remotePath, result);
+    }
+
+    @Override
+    public Status globList(String remotePath, List<RemoteFile> result, boolean 
fileNameOnly) {
+        return fileSystem(remotePath).globList(remotePath, result, 
fileNameOnly);
+    }
+
+    @Override
+    public Status listDirectories(String remotePath, Set<String> result) {
+        return fileSystem(remotePath).listDirectories(remotePath, result);
+    }
+
+    public FileSystem fileSystem(String location) {
+        return extMetaCacheMgr.getFsCache().getRemoteFileSystem(
+                new FileSystemCache.FileSystemCacheKey(
+                        LocationPath.getFSIdentity(location,
+                                bindBrokerName), properties, bindBrokerName));
+    }
+}
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index c45bdc1ebc9..6374dfddc66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -33,6 +33,7 @@ import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TFileCompressType;
 import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.THiveBucket;
 import org.apache.doris.thrift.THiveColumn;
 import org.apache.doris.thrift.THiveColumnType;
@@ -128,21 +129,35 @@ public class HiveTableSink extends DataSink {
         setCompressType(tSink, formatType);
 
         THiveLocationParams locationParams = new THiveLocationParams();
-        String location = sd.getLocation();
-
-        String writeTempPath = createTempPath(location);
-        locationParams.setWritePath(writeTempPath);
-        locationParams.setTargetPath(location);
-        locationParams.setFileType(LocationPath.getTFileTypeForBE(location));
+        LocationPath locationPath = new LocationPath(sd.getLocation(), 
targetTable.getHadoopProperties());
+        String location = locationPath.toString();
+        String storageLocation = locationPath.toStorageLocation().toString();
+        TFileType fileType = locationPath.getTFileTypeForBE();
+        if (fileType == TFileType.FILE_S3) {
+            locationParams.setWritePath(storageLocation);
+            locationParams.setOriginalWritePath(location);
+            locationParams.setTargetPath(location);
+            if (insertCtx.isPresent()) {
+                HiveInsertCommandContext context = (HiveInsertCommandContext) 
insertCtx.get();
+                tSink.setOverwrite(context.isOverwrite());
+                context.setWritePath(storageLocation);
+            }
+        } else {
+            String writeTempPath = createTempPath(location);
+            locationParams.setWritePath(writeTempPath);
+            locationParams.setOriginalWritePath(writeTempPath);
+            locationParams.setTargetPath(location);
+            if (insertCtx.isPresent()) {
+                HiveInsertCommandContext context = (HiveInsertCommandContext) 
insertCtx.get();
+                tSink.setOverwrite(context.isOverwrite());
+                context.setWritePath(writeTempPath);
+            }
+        }
+        locationParams.setFileType(fileType);
         tSink.setLocation(locationParams);
 
         tSink.setHadoopConfig(targetTable.getHadoopProperties());
 
-        if (insertCtx.isPresent()) {
-            HiveInsertCommandContext context = (HiveInsertCommandContext) 
insertCtx.get();
-            tSink.setOverwrite(context.isOverwrite());
-            context.setWritePath(writeTempPath);
-        }
         tDataSink = new TDataSink(getDataSinkType());
         tDataSink.setHiveTableSink(tSink);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
index 2499cc6eba4..838d135fa45 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
@@ -21,23 +21,32 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.hive.HMSTransaction;
 import org.apache.doris.datasource.hive.HiveMetadataOps;
+import org.apache.doris.fs.FileSystemProvider;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 public class HiveTransactionManager implements TransactionManager {
 
     private final Map<Long, HMSTransaction> transactions = new 
ConcurrentHashMap<>();
     private final HiveMetadataOps ops;
 
-    public HiveTransactionManager(HiveMetadataOps ops) {
+    private final FileSystemProvider fileSystemProvider;
+
+    private final Executor fileSystemExecutor;
+
+    public HiveTransactionManager(HiveMetadataOps ops, FileSystemProvider 
fileSystemProvider,
+            Executor fileSystemExecutor) {
         this.ops = ops;
+        this.fileSystemProvider = fileSystemProvider;
+        this.fileSystemExecutor = fileSystemExecutor;
     }
 
     @Override
     public long begin() {
         long id = Env.getCurrentEnv().getNextId();
-        HMSTransaction hiveTransaction = new HMSTransaction(ops);
+        HMSTransaction hiveTransaction = new HMSTransaction(ops, 
fileSystemProvider, fileSystemExecutor);
         transactions.put(id, hiveTransaction);
         return id;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
index 334258a3f12..394494a129d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
@@ -18,10 +18,14 @@
 package org.apache.doris.transaction;
 
 import org.apache.doris.datasource.hive.HiveMetadataOps;
+import org.apache.doris.fs.FileSystemProvider;
+
+import java.util.concurrent.Executor;
 
 public class TransactionManagerFactory {
 
-    public static TransactionManager 
createHiveTransactionManager(HiveMetadataOps ops) {
-        return new HiveTransactionManager(ops);
+    public static TransactionManager 
createHiveTransactionManager(HiveMetadataOps ops,
+            FileSystemProvider fileSystemProvider, Executor 
fileSystemExecutor) {
+        return new HiveTransactionManager(ops, fileSystemProvider, 
fileSystemExecutor);
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
index 571826aa9c8..69130f57fff 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
@@ -34,7 +34,7 @@ public class LocationPathTest {
         LocationPath locationPath = new LocationPath("hdfs://dir/file.path", 
rangeProps);
         Assertions.assertTrue(locationPath.get().startsWith("hdfs://"));
 
-        String beLocation = locationPath.toScanRangeLocation().toString();
+        String beLocation = locationPath.toStorageLocation().toString();
         Assertions.assertTrue(beLocation.startsWith("hdfs://"));
         Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, 
null).first, FileSystemType.DFS);
 
@@ -45,21 +45,21 @@ public class LocationPathTest {
         Assertions.assertTrue(locationPath.get().startsWith("hdfs://")
                 && !locationPath.get().startsWith("hdfs:///"));
 
-        beLocation = locationPath.toScanRangeLocation().toString();
+        beLocation = locationPath.toStorageLocation().toString();
         Assertions.assertTrue(beLocation.startsWith("hdfs://") && 
!beLocation.startsWith("hdfs:///"));
 
         // nonstandard '/' for hdfs path
         locationPath = new LocationPath("hdfs:/dir/file.path", props);
         Assertions.assertTrue(locationPath.get().startsWith("hdfs://"));
 
-        beLocation = locationPath.toScanRangeLocation().toString();
+        beLocation = locationPath.toStorageLocation().toString();
         Assertions.assertTrue(beLocation.startsWith("hdfs://"));
 
         // empty ha nameservices
         props.put("dfs.nameservices", "");
         locationPath = new LocationPath("hdfs:/dir/file.path", props);
 
-        beLocation = locationPath.toScanRangeLocation().toString();
+        beLocation = locationPath.toStorageLocation().toString();
         Assertions.assertTrue(locationPath.get().startsWith("/dir")
                 && !locationPath.get().startsWith("hdfs://"));
         Assertions.assertTrue(beLocation.startsWith("/dir") && 
!beLocation.startsWith("hdfs://"));
@@ -75,7 +75,7 @@ public class LocationPathTest {
         // FE
         Assertions.assertTrue(locationPath.get().startsWith("jfs://"));
         // BE
-        loc = locationPath.toScanRangeLocation().toString();
+        loc = locationPath.toStorageLocation().toString();
         Assertions.assertTrue(loc.startsWith("jfs://"));
         Assertions.assertEquals(LocationPath.getFSIdentity(loc, null).first, 
FileSystemType.JFS);
     }
@@ -89,7 +89,7 @@ public class LocationPathTest {
         // FE
         Assertions.assertTrue(locationPath.get().startsWith("s3://"));
         // BE
-        String beLoc = locationPath.toScanRangeLocation().toString();
+        String beLoc = locationPath.toStorageLocation().toString();
         Assertions.assertTrue(beLoc.startsWith("s3://"));
         Assertions.assertEquals(LocationPath.getFSIdentity(beLoc, null).first, 
FileSystemType.S3);
     }
@@ -101,7 +101,7 @@ public class LocationPathTest {
         // FE
         Assertions.assertTrue(locationPath.get().startsWith("oss://"));
         // BE
-        String beLocation = locationPath.toScanRangeLocation().toString();
+        String beLocation = locationPath.toStorageLocation().toString();
         Assertions.assertTrue(beLocation.startsWith("s3://"));
         Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, 
null).first, FileSystemType.S3);
 
@@ -109,7 +109,7 @@ public class LocationPathTest {
         // FE
         
Assertions.assertTrue(locationPath.get().startsWith("oss://test.oss-dls.aliyuncs"));
         // BE
-        beLocation = locationPath.toScanRangeLocation().toString();
+        beLocation = locationPath.toStorageLocation().toString();
         
Assertions.assertTrue(beLocation.startsWith("oss://test.oss-dls.aliyuncs"));
         Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, 
null).first, FileSystemType.DFS);
 
@@ -121,7 +121,7 @@ public class LocationPathTest {
         LocationPath locationPath = new LocationPath("cos://test.com", 
rangeProps);
         // FE
         Assertions.assertTrue(locationPath.get().startsWith("cos://"));
-        String beLocation = locationPath.toScanRangeLocation().toString();
+        String beLocation = locationPath.toStorageLocation().toString();
         // BE
         Assertions.assertTrue(beLocation.startsWith("s3://"));
         Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, 
null).first, FileSystemType.S3);
@@ -130,7 +130,7 @@ public class LocationPathTest {
         // FE
         Assertions.assertTrue(locationPath.get().startsWith("cosn://"));
         // BE
-        beLocation = locationPath.toScanRangeLocation().toString();
+        beLocation = locationPath.toStorageLocation().toString();
         Assertions.assertTrue(beLocation.startsWith("s3://"));
         Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, 
null).first, FileSystemType.S3);
 
@@ -138,7 +138,7 @@ public class LocationPathTest {
         // FE
         Assertions.assertTrue(locationPath.get().startsWith("ofs://"));
         // BE
-        beLocation = locationPath.toScanRangeLocation().toString();
+        beLocation = locationPath.toStorageLocation().toString();
         Assertions.assertTrue(beLocation.startsWith("ofs://"));
         Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, 
null).first, FileSystemType.OFS);
 
@@ -147,7 +147,7 @@ public class LocationPathTest {
         // FE
         Assertions.assertTrue(locationPath.get().startsWith("gfs://"));
         // BE
-        beLocation = locationPath.toScanRangeLocation().toString();
+        beLocation = locationPath.toStorageLocation().toString();
         Assertions.assertTrue(beLocation.startsWith("gfs://"));
         Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, 
null).first, FileSystemType.DFS);
     }
@@ -159,7 +159,7 @@ public class LocationPathTest {
         // FE
         Assertions.assertTrue(locationPath.get().startsWith("obs://"));
         // BE
-        String beLocation = locationPath.toScanRangeLocation().toString();
+        String beLocation = locationPath.toStorageLocation().toString();
         Assertions.assertTrue(beLocation.startsWith("s3://"));
         Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, 
null).first, FileSystemType.S3);
     }
@@ -173,7 +173,7 @@ public class LocationPathTest {
         Assertions.assertTrue(locationPath.get().startsWith("unknown://"));
         Assertions.assertTrue(locationPath.getLocationType() == 
LocationPath.LocationType.UNKNOWN);
         // BE
-        String beLocation = locationPath.toScanRangeLocation().toString();
+        String beLocation = locationPath.toStorageLocation().toString();
         Assertions.assertTrue(beLocation.startsWith("unknown://"));
     }
 
@@ -186,7 +186,7 @@ public class LocationPathTest {
         
Assertions.assertTrue(locationPath.get().equalsIgnoreCase("/path/to/local"));
         Assertions.assertTrue(locationPath.getLocationType() == 
LocationPath.LocationType.NOSCHEME);
         // BE
-        String beLocation = locationPath.toScanRangeLocation().toString();
+        String beLocation = locationPath.toStorageLocation().toString();
         Assertions.assertTrue(beLocation.equalsIgnoreCase("/path/to/local"));
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
index ba87dd8f48e..e441262f12e 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -21,7 +21,10 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.datasource.TestHMSCachedClient;
+import org.apache.doris.fs.FileSystem;
+import org.apache.doris.fs.FileSystemProvider;
 import org.apache.doris.fs.LocalDfsFileSystem;
+import org.apache.doris.fs.remote.SwitchingFileSystem;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.THiveLocationParams;
@@ -54,16 +57,21 @@ import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class HmsCommitTest {
 
     private static HiveMetadataOps hmsOps;
     private static HMSCachedClient hmsClient;
+
+    private static FileSystemProvider fileSystemProvider;
     private static final String dbName = "test_db";
     private static final String tbWithPartition = "test_tb_with_partition";
     private static final String tbWithoutPartition = 
"test_tb_without_partition";
-    private static LocalDfsFileSystem fs;
+    private static FileSystem fs;
+    private static LocalDfsFileSystem localDFSFileSystem;
+    private static Executor fileSystemExecutor;
     static String dbLocation;
     static String writeLocation;
     static String uri = "thrift://127.0.0.1:9083";
@@ -86,7 +94,14 @@ public class HmsCommitTest {
     }
 
     public static void createTestHiveCatalog() throws IOException {
-        fs = new LocalDfsFileSystem();
+        localDFSFileSystem = new LocalDfsFileSystem();
+        new MockUp<SwitchingFileSystem>(SwitchingFileSystem.class) {
+            @Mock
+            public FileSystem fileSystem(String location) {
+                return localDFSFileSystem;
+            }
+        };
+        fs = new SwitchingFileSystem(null, null, null);
 
         if (hasRealHmsService) {
             // If you have a real HMS service, then you can use this client to 
create real connections for testing
@@ -96,7 +111,9 @@ public class HmsCommitTest {
         } else {
             hmsClient = new TestHMSCachedClient();
         }
-        hmsOps = new HiveMetadataOps(null, hmsClient, fs);
+        hmsOps = new HiveMetadataOps(null, hmsClient);
+        fileSystemProvider = ctx -> fs;
+        fileSystemExecutor = Executors.newFixedThreadPool(16);
     }
 
     public static void createTestHiveDatabase() {
@@ -339,9 +356,9 @@ public class HmsCommitTest {
             fs.makeDir(targetPath);
         }
 
-        fs.createFile(writePath + "/" + f1);
-        fs.createFile(writePath + "/" + f2);
-        fs.createFile(writePath + "/" + f3);
+        localDFSFileSystem.createFile(writePath + "/" + f1);
+        localDFSFileSystem.createFile(writePath + "/" + f2);
+        localDFSFileSystem.createFile(writePath + "/" + f3);
         return pu;
     }
 
@@ -363,7 +380,7 @@ public class HmsCommitTest {
     public void commit(String dbName,
                        String tableName,
                        List<THivePartitionUpdate> hivePUs) {
-        HMSTransaction hmsTransaction = new HMSTransaction(hmsOps);
+        HMSTransaction hmsTransaction = new HMSTransaction(hmsOps, 
fileSystemProvider, fileSystemExecutor);
         hmsTransaction.setHivePartitionUpdates(hivePUs);
         HiveInsertCommandContext ctx = new HiveInsertCommandContext();
         String queryId = DebugUtil.printId(ConnectContext.get().queryId());
@@ -634,3 +651,4 @@ public class HmsCommitTest {
         assertNumRows(3, pa);
     }
 }
+
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 93de397b27b..834c9025cfd 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -282,6 +282,8 @@ struct THiveLocationParams {
   1: optional string write_path
   2: optional string target_path
   3: optional Types.TFileType file_type
+  // Other object store will convert write_path to s3 scheme path for BE, this 
field keeps the original write path.
+  4: optional string original_write_path
 }
 
 struct TSortedColumn {
@@ -336,6 +338,13 @@ enum TUpdateMode {
     OVERWRITE = 2 // insert overwrite
 }
 
+struct TS3MPUPendingUpload {
+    1: optional string bucket
+    2: optional string key
+    3: optional string upload_id
+    4: optional map<i32, string> etags
+}
+
 struct THivePartitionUpdate {
     1: optional string name
     2: optional TUpdateMode update_mode
@@ -343,6 +352,7 @@ struct THivePartitionUpdate {
     4: optional list<string> file_names
     5: optional i64 row_count
     6: optional i64 file_size
+    7: optional list<TS3MPUPendingUpload> s3_mpu_pending_uploads
 }
 
 struct TDataSink {
diff --git 
a/regression-test/data/external_table_p2/hive/test_hive_write_insert_s3.out 
b/regression-test/data/external_table_p2/hive/test_hive_write_insert_s3.out
new file mode 100644
index 00000000000..7d34ada0550
--- /dev/null
+++ b/regression-test/data/external_table_p2/hive/test_hive_write_insert_s3.out
@@ -0,0 +1,61 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !q01 --
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-21      
2024-03-21T12:00        2024-03-21T12:00:00.123456      
2024-03-21T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {2:20}  {2:200000000000}        
{2.2:20.2}      {2.2:20.2}      {0:1}   {2.2:2.2}       {2.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-22      
2024-03-22T12:00        2024-03-22T12:00:00.123456      
2024-03-22T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {3:20}  {3:200000000000}        
{3.2:20.2}      {3.2:20.2}      {0:1}   {3.2:2.2}       {3.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+true   127     32767   2147483647      9223372036854775807     123.45  
123456.789      123456789       1234.5678       123456.789012   
123456789.012345678901  string_value    binary_value    2024-03-20      
2024-03-20T12:00        2024-03-20T12:00:00.123456      
2024-03-20T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"a":"b"}       {1:10}  {1:100000000000}        
{1.1:10.1}      {1.1:10.1}      {1:0}   {1.1:1.1}       {1.23:1.23}     
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+
+-- !q02 --
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-21      
2024-03-21T12:00        2024-03-21T12:00:00.123456      
2024-03-21T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {2:20}  {2:200000000000}        
{2.2:20.2}      {2.2:20.2}      {0:1}   {2.2:2.2}       {2.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-21      
2024-03-21T12:00        2024-03-21T12:00:00.123456      
2024-03-21T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {2:20}  {2:200000000000}        
{2.2:20.2}      {2.2:20.2}      {0:1}   {2.2:2.2}       {2.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-22      
2024-03-22T12:00        2024-03-22T12:00:00.123456      
2024-03-22T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {3:20}  {3:200000000000}        
{3.2:20.2}      {3.2:20.2}      {0:1}   {3.2:2.2}       {3.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-22      
2024-03-22T12:00        2024-03-22T12:00:00.123456      
2024-03-22T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {3:20}  {3:200000000000}        
{3.2:20.2}      {3.2:20.2}      {0:1}   {3.2:2.2}       {3.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+true   127     32767   2147483647      9223372036854775807     123.45  
123456.789      123456789       1234.5678       123456.789012   
123456789.012345678901  string_value    binary_value    2024-03-20      
2024-03-20T12:00        2024-03-20T12:00:00.123456      
2024-03-20T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"a":"b"}       {1:10}  {1:100000000000}        
{1.1:10.1}      {1.1:10.1}      {1:0}   {1.1:1.1}       {1.23:1.23}     
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+true   127     32767   2147483647      9223372036854775807     123.45  
123456.789      123456789       1234.5678       123456.789012   
123456789.012345678901  string_value    binary_value    2024-03-20      
2024-03-20T12:00        2024-03-20T12:00:00.123456      
2024-03-20T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"a":"b"}       {1:10}  {1:100000000000}        
{1.1:10.1}      {1.1:10.1}      {1:0}   {1.1:1.1}       {1.23:1.23}     
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+
+-- !q03 --
+\N     \N      \N      \N      \N      -123.45 \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      {2:20}  \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      [3.4567, 4.5678]        
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
[null, "value1", "value2"]      \N      \N      \N      \N
+\N     \N      \N      \N      \N      -123.45 \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      {3:20}  \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      [8.4567, 4.5678]        
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
[null, "value1", "value2"]      \N      \N      \N      \N
+\N     \N      \N      \N      \N      123.45  \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      {1:10}  \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      [1.2345, 2.3456]        
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
[null, "value1", "value2"]      \N      \N      \N      \N
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-21      
2024-03-21T12:00        2024-03-21T12:00:00.123456      
2024-03-21T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {2:20}  {2:200000000000}        
{2.2:20.2}      {2.2:20.2}      {0:1}   {2.2:2.2}       {2.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-21      
2024-03-21T12:00        2024-03-21T12:00:00.123456      
2024-03-21T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {2:20}  {2:200000000000}        
{2.2:20.2}      {2.2:20.2}      {0:1}   {2.2:2.2}       {2.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-22      
2024-03-22T12:00        2024-03-22T12:00:00.123456      
2024-03-22T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {3:20}  {3:200000000000}        
{3.2:20.2}      {3.2:20.2}      {0:1}   {3.2:2.2}       {3.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-22      
2024-03-22T12:00        2024-03-22T12:00:00.123456      
2024-03-22T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {3:20}  {3:200000000000}        
{3.2:20.2}      {3.2:20.2}      {0:1}   {3.2:2.2}       {3.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+true   127     32767   2147483647      9223372036854775807     123.45  
123456.789      123456789       1234.5678       123456.789012   
123456789.012345678901  string_value    binary_value    2024-03-20      
2024-03-20T12:00        2024-03-20T12:00:00.123456      
2024-03-20T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"a":"b"}       {1:10}  {1:100000000000}        
{1.1:10.1}      {1.1:10.1}      {1:0}   {1.1:1.1}       {1.23:1.23}     
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+true   127     32767   2147483647      9223372036854775807     123.45  
123456.789      123456789       1234.5678       123456.789012   
123456789.012345678901  string_value    binary_value    2024-03-20      
2024-03-20T12:00        2024-03-20T12:00:00.123456      
2024-03-20T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"a":"b"}       {1:10}  {1:100000000000}        
{1.1:10.1}      {1.1:10.1}      {1:0}   {1.1:1.1}       {1.23:1.23}     
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+
+-- !q04 --
+\N     \N      \N      \N      \N      -123.45 \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      {2:20}  \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      [3.4567, 4.5678]        
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
[null, "value1", "value2"]      \N      \N      \N      \N
+\N     \N      \N      \N      \N      -123.45 \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      {3:20}  \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      [8.4567, 4.5678]        
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
[null, "value1", "value2"]      \N      \N      \N      \N
+\N     \N      \N      \N      \N      123.45  \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      {1:10}  \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      [1.2345, 2.3456]        
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
[null, "value1", "value2"]      \N      \N      \N      \N
+
+-- !q05 --
+
+-- !q01 --
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-21      
2024-03-21T12:00        2024-03-21T12:00:00.123456      
2024-03-21T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {2:20}  {2:200000000000}        
{2.2:20.2}      {2.2:20.2}      {0:1}   {2.2:2.2}       {2.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-22      
2024-03-22T12:00        2024-03-22T12:00:00.123456      
2024-03-22T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {3:20}  {3:200000000000}        
{3.2:20.2}      {3.2:20.2}      {0:1}   {3.2:2.2}       {3.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+true   127     32767   2147483647      9223372036854775807     123.45  
123456.789      123456789       1234.5678       123456.789012   
123456789.012345678901  string_value    binary_value    2024-03-20      
2024-03-20T12:00        2024-03-20T12:00:00.123456      
2024-03-20T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"a":"b"}       {1:10}  {1:100000000000}        
{1.1:10.1}      {1.1:10.1}      {1:0}   {1.1:1.1}       {1.23:1.23}     
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+
+-- !q02 --
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-21      
2024-03-21T12:00        2024-03-21T12:00:00.123456      
2024-03-21T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {2:20}  {2:200000000000}        
{2.2:20.2}      {2.2:20.2}      {0:1}   {2.2:2.2}       {2.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-21      
2024-03-21T12:00        2024-03-21T12:00:00.123456      
2024-03-21T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {2:20}  {2:200000000000}        
{2.2:20.2}      {2.2:20.2}      {0:1}   {2.2:2.2}       {2.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-22      
2024-03-22T12:00        2024-03-22T12:00:00.123456      
2024-03-22T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {3:20}  {3:200000000000}        
{3.2:20.2}      {3.2:20.2}      {0:1}   {3.2:2.2}       {3.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-22      
2024-03-22T12:00        2024-03-22T12:00:00.123456      
2024-03-22T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {3:20}  {3:200000000000}        
{3.2:20.2}      {3.2:20.2}      {0:1}   {3.2:2.2}       {3.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+true   127     32767   2147483647      9223372036854775807     123.45  
123456.789      123456789       1234.5678       123456.789012   
123456789.012345678901  string_value    binary_value    2024-03-20      
2024-03-20T12:00        2024-03-20T12:00:00.123456      
2024-03-20T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"a":"b"}       {1:10}  {1:100000000000}        
{1.1:10.1}      {1.1:10.1}      {1:0}   {1.1:1.1}       {1.23:1.23}     
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+true   127     32767   2147483647      9223372036854775807     123.45  
123456.789      123456789       1234.5678       123456.789012   
123456789.012345678901  string_value    binary_value    2024-03-20      
2024-03-20T12:00        2024-03-20T12:00:00.123456      
2024-03-20T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"a":"b"}       {1:10}  {1:100000000000}        
{1.1:10.1}      {1.1:10.1}      {1:0}   {1.1:1.1}       {1.23:1.23}     
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+
+-- !q03 --
+\N     \N      \N      \N      \N      -123.45 \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      {2:20}  \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      [3.4567, 4.5678]        
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
[null, "value1", "value2"]      \N      \N      \N      20240321
+\N     \N      \N      \N      \N      -123.45 \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      {3:20}  \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      [8.4567, 4.5678]        
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
[null, "value1", "value2"]      \N      \N      \N      20240322
+\N     \N      \N      \N      \N      123.45  \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      {1:10}  \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      [1.2345, 2.3456]        
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
[null, "value1", "value2"]      \N      \N      \N      20240320
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-21      
2024-03-21T12:00        2024-03-21T12:00:00.123456      
2024-03-21T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {2:20}  {2:200000000000}        
{2.2:20.2}      {2.2:20.2}      {0:1}   {2.2:2.2}       {2.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-21      
2024-03-21T12:00        2024-03-21T12:00:00.123456      
2024-03-21T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {2:20}  {2:200000000000}        
{2.2:20.2}      {2.2:20.2}      {0:1}   {2.2:2.2}       {2.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-22      
2024-03-22T12:00        2024-03-22T12:00:00.123456      
2024-03-22T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {3:20}  {3:200000000000}        
{3.2:20.2}      {3.2:20.2}      {0:1}   {3.2:2.2}       {3.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false  -128    -32768  -2147483648     -9223372036854775808    -123.45 
-123456.789     -123456789      -1234.5678      -123456.789012  
-123456789.012345678901 string_value    binary_value    2024-03-22      
2024-03-22T12:00        2024-03-22T12:00:00.123456      
2024-03-22T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"x":"y"}       {3:20}  {3:200000000000}        
{3.2:20.2}      {3.2:20.2}      {0:1}   {3.2:2.2}       {3.34:2.34}     
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+true   127     32767   2147483647      9223372036854775807     123.45  
123456.789      123456789       1234.5678       123456.789012   
123456789.012345678901  string_value    binary_value    2024-03-20      
2024-03-20T12:00        2024-03-20T12:00:00.123456      
2024-03-20T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"a":"b"}       {1:10}  {1:100000000000}        
{1.1:10.1}      {1.1:10.1}      {1:0}   {1.1:1.1}       {1.23:1.23}     
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+true   127     32767   2147483647      9223372036854775807     123.45  
123456.789      123456789       1234.5678       123456.789012   
123456789.012345678901  string_value    binary_value    2024-03-20      
2024-03-20T12:00        2024-03-20T12:00:00.123456      
2024-03-20T12:00:00.123456      char_value1     char_value2     char_value3     
varchar_value1  varchar_value2  varchar_value3  {"key1":"value1"}       
{"key1":"value1"}       {"a":"b"}       {1:10}  {1:100000000000}        
{1.1:10.1}      {1.1:10.1}      {1:0}   {1.1:1.1}       {1.23:1.23}     
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+
+-- !q04 --
+\N     \N      \N      \N      \N      -123.45 \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      {2:20}  \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      [3.4567, 4.5678]        
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
[null, "value1", "value2"]      \N      \N      \N      20240321
+\N     \N      \N      \N      \N      -123.45 \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      {3:20}  \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      [8.4567, 4.5678]        
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
[null, "value1", "value2"]      \N      \N      \N      20240322
+\N     \N      \N      \N      \N      123.45  \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      {1:10}  \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
\N      \N      \N      \N      \N      \N      \N      [1.2345, 2.3456]        
\N      \N      \N      \N      \N      \N      \N      \N      \N      \N      
[null, "value1", "value2"]      \N      \N      \N      20240320
+
diff --git 
a/regression-test/suites/external_table_p2/hive/test_hive_write_insert_s3.groovy
 
b/regression-test/suites/external_table_p2/hive/test_hive_write_insert_s3.groovy
new file mode 100644
index 00000000000..87633ba1b09
--- /dev/null
+++ 
b/regression-test/suites/external_table_p2/hive/test_hive_write_insert_s3.groovy
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_write_insert_s3", 
"p2,external,hive,external_remote,external_remote_hive") {
+    def format_compressions = ["parquet_snappy"]
+
+    // q01: insert scenarios against the table all_types_${format_compression}_s3.
+    //
+    // Parameters:
+    //   format_compression - suffix interpolated into the target table name.
+    //   catalog_name       - catalog that is refreshed after the Hive-side truncate.
+    //
+    // Statements passed to `hive_remote` are logged with a "hive sql: " prefix first;
+    // presumably they run on the remote Hive side while `sql` runs through Doris
+    // (TODO confirm against the regression framework). Each `order_qt_<tag>` call is
+    // presumably checked against the recorded expected output for that tag.
+    def q01 = { String format_compression, String catalog_name ->
+        // Start from an empty table and refresh the catalog afterwards.
+        logger.info("hive sql: " + """ truncate table 
all_types_${format_compression}_s3; """)
+        hive_remote """ truncate table all_types_${format_compression}_s3; """
+        sql """refresh catalog ${catalog_name};"""
+
+        // Case 1: INSERT INTO ... SELECT * from the source table.
+        sql """
+        INSERT INTO all_types_${format_compression}_s3
+        SELECT * FROM all_types_parquet_snappy_src;
+        """
+        order_qt_q01 """ select * from all_types_${format_compression}_s3;
+        """
+
+        // Case 2: same insert, but with every source column listed explicitly in the SELECT.
+        sql """
+        INSERT INTO all_types_${format_compression}_s3
+        SELECT boolean_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, decimal_col1, decimal_col2,
+         decimal_col3, decimal_col4, string_col, binary_col, date_col, 
timestamp_col1, timestamp_col2, timestamp_col3, char_col1,
+          char_col2, char_col3, varchar_col1, varchar_col2, varchar_col3, 
t_map_string, t_map_varchar, t_map_char, t_map_int,
+           t_map_bigint, t_map_float, t_map_double, t_map_boolean, 
t_map_decimal_precision_2, t_map_decimal_precision_4,
+            t_map_decimal_precision_8, t_map_decimal_precision_17, 
t_map_decimal_precision_18, t_map_decimal_precision_38,
+             t_array_string, t_array_int, t_array_bigint, t_array_float, 
t_array_double, t_array_boolean, t_array_varchar,
+              t_array_char, t_array_decimal_precision_2, 
t_array_decimal_precision_4, t_array_decimal_precision_8,
+               t_array_decimal_precision_17, t_array_decimal_precision_18, 
t_array_decimal_precision_38, t_struct_bigint, t_complex,
+                t_struct_nested, t_struct_null, 
t_struct_non_nulls_after_nulls, t_nested_struct_non_nulls_after_nulls,
+                 t_map_null_value, t_array_string_starting_with_nulls, 
t_array_string_with_nulls_in_between,
+                  t_array_string_ending_with_nulls, t_array_string_all_nulls, 
dt FROM all_types_parquet_snappy_src;
+        """
+        order_qt_q02 """ select * from all_types_${format_compression}_s3;
+        """
+
+        // Case 3: INSERT INTO with a target column list naming only four columns.
+        sql """
+        INSERT INTO all_types_${format_compression}_s3(float_col, t_map_int, 
t_array_decimal_precision_8, t_array_string_starting_with_nulls)
+        SELECT float_col, t_map_int, t_array_decimal_precision_8, 
t_array_string_starting_with_nulls FROM all_types_parquet_snappy_src;
+        """
+        order_qt_q03 """
+        select * from all_types_${format_compression}_s3;
+        """
+
+        // Case 4: INSERT OVERWRITE TABLE with the same four-column list.
+        sql """
+        INSERT OVERWRITE TABLE all_types_${format_compression}_s3(float_col, 
t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls)
+        SELECT float_col, t_map_int, t_array_decimal_precision_8, 
t_array_string_starting_with_nulls FROM all_types_parquet_snappy_src;
+        """
+        order_qt_q04 """
+        select * from all_types_${format_compression}_s3;
+        """
+
+        // Case 5: truncate on the Hive side again, refresh, and query the table once more.
+        logger.info("hive sql: " + """ truncate table 
all_types_${format_compression}_s3; """)
+        hive_remote """ truncate table all_types_${format_compression}_s3; """
+        sql """refresh catalog ${catalog_name};"""
+        order_qt_q05 """
+        select * from all_types_${format_compression}_s3;
+        """
+    }
+
+    // q02: insert scenarios against a per-run copy of all_types_par_${format_compression}_s3.
+    //
+    // Parameters:
+    //   format_compression - suffix interpolated into the table names.
+    //   catalog_name       - interpolated into the working table name and refreshed after setup.
+    //
+    // The working table all_types_par_${format_compression}_s3_${catalog_name}_q02 is dropped,
+    // re-created LIKE all_types_par_${format_compression}_s3, pointed at a cosn:// location,
+    // and dropped again at the end of the closure.
+    // NOTE(review): the result tags order_qt_q01..q04 used here are the same names used in
+    // the q01 closure; confirm the framework tolerates repeated tags within one suite.
+    def q02 = { String format_compression, String catalog_name ->
+        // Setup: drop any leftover working table, then create it from the template table.
+        logger.info("hive sql: " + """ DROP TABLE IF EXISTS 
all_types_par_${format_compression}_s3_${catalog_name}_q02; """)
+        hive_remote """ DROP TABLE IF EXISTS 
all_types_par_${format_compression}_s3_${catalog_name}_q02; """
+        logger.info("hive sql: " + """ CREATE TABLE IF NOT EXISTS 
all_types_par_${format_compression}_s3_${catalog_name}_q02 like 
all_types_par_${format_compression}_s3; """)
+        hive_remote """ CREATE TABLE IF NOT EXISTS 
all_types_par_${format_compression}_s3_${catalog_name}_q02 like 
all_types_par_${format_compression}_s3; """
+        // Point the working table at its own object-store path (bucket name is hard-coded).
+        logger.info("hive sql: " + """ ALTER TABLE 
all_types_par_${format_compression}_s3_${catalog_name}_q02 SET LOCATION 
'cosn://doris-build-1308700295/regression/write/data/all_types_par_${format_compression}_s3_${catalog_name}_q02';
 """)
+        hive_remote """ ALTER TABLE 
all_types_par_${format_compression}_s3_${catalog_name}_q02 SET LOCATION 
'cosn://doris-build-1308700295/regression/write/data/all_types_par_${format_compression}_s3_${catalog_name}_q02';
 """
+        sql """refresh catalog ${catalog_name};"""
+
+        // Case 1: INSERT INTO ... SELECT * from all_types_par_parquet_snappy_src.
+        sql """
+        INSERT INTO all_types_par_${format_compression}_s3_${catalog_name}_q02
+        SELECT * FROM all_types_par_parquet_snappy_src;
+        """
+        order_qt_q01 """ select * from 
all_types_par_${format_compression}_s3_${catalog_name}_q02;
+        """
+
+        // Case 2: insert with every column listed explicitly in the SELECT.
+        // NOTE(review): this and the following statements read from
+        // all_types_parquet_snappy_src (no "_par_"), unlike case 1 above; confirm intended.
+        sql """
+        INSERT INTO all_types_par_${format_compression}_s3_${catalog_name}_q02
+        SELECT boolean_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, decimal_col1, decimal_col2,
+         decimal_col3, decimal_col4, string_col, binary_col, date_col, 
timestamp_col1, timestamp_col2, timestamp_col3, char_col1,
+          char_col2, char_col3, varchar_col1, varchar_col2, varchar_col3, 
t_map_string, t_map_varchar, t_map_char, t_map_int,
+           t_map_bigint, t_map_float, t_map_double, t_map_boolean, 
t_map_decimal_precision_2, t_map_decimal_precision_4,
+            t_map_decimal_precision_8, t_map_decimal_precision_17, 
t_map_decimal_precision_18, t_map_decimal_precision_38,
+             t_array_string, t_array_int, t_array_bigint, t_array_float, 
t_array_double, t_array_boolean, t_array_varchar,
+              t_array_char, t_array_decimal_precision_2, 
t_array_decimal_precision_4, t_array_decimal_precision_8,
+               t_array_decimal_precision_17, t_array_decimal_precision_18, 
t_array_decimal_precision_38, t_struct_bigint, t_complex,
+                t_struct_nested, t_struct_null, 
t_struct_non_nulls_after_nulls, t_nested_struct_non_nulls_after_nulls,
+                 t_map_null_value, t_array_string_starting_with_nulls, 
t_array_string_with_nulls_in_between,
+                  t_array_string_ending_with_nulls, t_array_string_all_nulls, 
dt FROM all_types_parquet_snappy_src;
+        """
+        order_qt_q02 """ select * from 
all_types_par_${format_compression}_s3_${catalog_name}_q02;
+        """
+
+        // Case 3: INSERT INTO with a target column list of four data columns plus dt
+        // (dt is presumably the partition column; TODO confirm against the table DDL).
+        sql """
+        INSERT INTO 
all_types_par_${format_compression}_s3_${catalog_name}_q02(float_col, 
t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls, dt)
+        SELECT float_col, t_map_int, t_array_decimal_precision_8, 
t_array_string_starting_with_nulls, dt FROM all_types_parquet_snappy_src;
+        """
+        order_qt_q03 """ select * from 
all_types_par_${format_compression}_s3_${catalog_name}_q02;
+        """
+
+        // Case 4: INSERT OVERWRITE TABLE with the same column list.
+        sql """
+        INSERT OVERWRITE TABLE 
all_types_par_${format_compression}_s3_${catalog_name}_q02(float_col, 
t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls, dt)
+        SELECT float_col, t_map_int, t_array_decimal_precision_8, 
t_array_string_starting_with_nulls, dt FROM all_types_parquet_snappy_src;
+        """
+        order_qt_q04 """
+        select * from 
all_types_par_${format_compression}_s3_${catalog_name}_q02;
+        """
+
+        // Teardown: drop the working table. This is skipped if any statement above throws.
+        logger.info("hive sql: " + """ DROP TABLE IF EXISTS 
all_types_par_${format_compression}_s3_${catalog_name}_q02; """)
+        hive_remote """ DROP TABLE IF EXISTS 
all_types_par_${format_compression}_s3_${catalog_name}_q02; """
+    }
+
+    // Driver: the scenarios run only when the regression config sets enableHiveTest=true.
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // Declared outside the try so the finally block can always clean it up.
+        String catalog_name = "test_hive_write_insert_s3"
+        try {
+            String hms_host = context.config.otherConfigs.get("extHiveHmsHost")
+            String hms_port = context.config.otherConfigs.get("extHiveHmsPort")
+            // NOTE(review): hdfs_host is read from the same key as hms_host
+            // ("extHiveHmsHost"); confirm the HDFS endpoint really shares the HMS host.
+            String hdfs_host = context.config.otherConfigs.get("extHiveHmsHost")
+            String hdfs_port = context.config.otherConfigs.get("extHdfsPort")
+            String ak = context.config.otherConfigs.get("extAk")
+            String sk = context.config.otherConfigs.get("extSk")
+            String endpoint = context.config.otherConfigs.get("extS3Endpoint")
+
+            // Recreate the catalog from scratch so stale properties from an earlier run cannot leak in.
+            sql """drop catalog if exists ${catalog_name}"""
+            sql """create catalog if not exists ${catalog_name} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${hms_host}:${hms_port}',
+                'fs.defaultFS' = 'hdfs://${hdfs_host}:${hdfs_port}',
+                'hadoop.username' = 'hadoop',
+                's3.endpoint' = '${endpoint}',
+                's3.access_key' = '${ak}',
+                's3.secret_key' = '${sk}'
+            );"""
+            sql """use `${catalog_name}`.`write_test`"""
+            logger.info("hive sql: " + """ use `write_test` """)
+            hive_remote """use `write_test`"""
+
+            sql """set enable_fallback_to_original_planner=false;"""
+
+            for (String format_compression in format_compressions) {
+                logger.info("Process format_compression: " + format_compression)
+                q01(format_compression, catalog_name)
+                q02(format_compression, catalog_name)
+            }
+        } finally {
+            // Drop the catalog even when a scenario fails, so a failed run does not
+            // leave it registered.
+            sql """drop catalog if exists ${catalog_name}"""
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to