This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7cb00a8e548 [Feature](hive-writer) Implements s3 file committer. (#34307)
7cb00a8e548 is described below
commit 7cb00a8e548236a1e72ca72643fb8620899ae54a
Author: Qi Chen <[email protected]>
AuthorDate: Mon Apr 29 19:56:49 2024 +0800
[Feature](hive-writer) Implements s3 file committer. (#34307)
Backport #33937.
---
be/src/io/file_factory.cpp | 9 +-
be/src/io/file_factory.h | 4 +-
be/src/io/fs/file_writer.h | 8 +
be/src/io/fs/s3_file_writer.cpp | 130 +++++-----
be/src/io/fs/s3_file_writer.h | 17 ++
be/src/vec/sink/writer/vhive_partition_writer.cpp | 26 +-
be/src/vec/sink/writer/vhive_partition_writer.h | 1 +
be/src/vec/sink/writer/vhive_table_writer.cpp | 31 ++-
.../doris/common/profile/SummaryProfile.java | 11 +
.../org/apache/doris/common/util/LocationPath.java | 13 +-
.../java/org/apache/doris/common/util/S3URI.java | 4 +-
.../doris/datasource/hive/HMSExternalCatalog.java | 14 +-
.../doris/datasource/hive/HMSTransaction.java | 263 +++++++++++++++++----
.../doris/datasource/hive/HiveMetaStoreCache.java | 18 +-
.../doris/datasource/hive/HiveMetadataOps.java | 17 +-
.../doris/datasource/hudi/source/HudiScanNode.java | 2 +-
.../datasource/iceberg/source/IcebergScanNode.java | 4 +-
.../datasource/paimon/source/PaimonScanNode.java | 2 +-
.../main/java/org/apache/doris/fs/FileSystem.java | 8 +
.../java/org/apache/doris/fs/FileSystemCache.java | 17 +-
.../org/apache/doris/fs/FileSystemFactory.java | 5 +-
.../FileSystemProvider.java} | 11 +-
.../apache/doris/fs/FileSystemProviderImpl.java | 43 ++++
.../org/apache/doris/fs/LocalDfsFileSystem.java | 5 +
.../org/apache/doris/fs/remote/ObjFileSystem.java | 15 ++
.../org/apache/doris/fs/remote/S3FileSystem.java | 4 -
.../doris/fs/remote/SwitchingFileSystem.java | 132 +++++++++++
.../org/apache/doris/planner/HiveTableSink.java | 37 ++-
.../doris/transaction/HiveTransactionManager.java | 13 +-
.../transaction/TransactionManagerFactory.java | 8 +-
.../apache/doris/common/util/LocationPathTest.java | 30 +--
.../doris/datasource/hive/HmsCommitTest.java | 32 ++-
gensrc/thrift/DataSinks.thrift | 10 +
.../hive/test_hive_write_insert_s3.out | 61 +++++
.../hive/test_hive_write_insert_s3.groovy | 166 +++++++++++++
35 files changed, 968 insertions(+), 203 deletions(-)
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index b67faac9453..aa38e220199 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -67,7 +67,8 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
                                        const std::vector<TNetworkAddress>& broker_addresses,
                                        const std::map<std::string, std::string>& properties,
                                        const std::string& path, int64_t start_offset,
-                                       std::unique_ptr<io::FileWriter>& file_writer) {
+                                       std::unique_ptr<io::FileWriter>& file_writer,
+                                       const io::FileWriterOptions* opts) {
switch (type) {
case TFileType::FILE_LOCAL: {
RETURN_IF_ERROR(io::global_local_filesystem()->create_file(path,
&file_writer));
@@ -76,7 +77,7 @@ Status FileFactory::create_file_writer(TFileType::type type,
ExecEnv* env,
case TFileType::FILE_BROKER: {
std::shared_ptr<io::BrokerFileSystem> fs;
RETURN_IF_ERROR(io::BrokerFileSystem::create(broker_addresses[0],
properties, &fs));
- RETURN_IF_ERROR(fs->create_file(path, &file_writer));
+ RETURN_IF_ERROR(fs->create_file(path, &file_writer, opts));
break;
}
case TFileType::FILE_S3: {
@@ -87,7 +88,7 @@ Status FileFactory::create_file_writer(TFileType::type type,
ExecEnv* env,
S3ClientFactory::convert_properties_to_s3_conf(properties,
s3_uri, &s3_conf));
std::shared_ptr<io::S3FileSystem> fs;
RETURN_IF_ERROR(io::S3FileSystem::create(s3_conf, "", &fs));
- RETURN_IF_ERROR(fs->create_file(path, &file_writer));
+ RETURN_IF_ERROR(fs->create_file(path, &file_writer, opts));
break;
}
case TFileType::FILE_HDFS: {
@@ -95,7 +96,7 @@ Status FileFactory::create_file_writer(TFileType::type type,
ExecEnv* env,
std::shared_ptr<io::HdfsFileSystem> fs;
RETURN_IF_ERROR(
io::HdfsFileSystem::create(hdfs_params, "",
hdfs_params.fs_name, nullptr, &fs));
- RETURN_IF_ERROR(fs->create_file(path, &file_writer));
+ RETURN_IF_ERROR(fs->create_file(path, &file_writer, opts));
break;
}
default:
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 7c51118fc5b..48e4e2e9ed0 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -28,6 +28,7 @@
#include "common/factory_creator.h"
#include "common/status.h"
#include "io/fs/file_reader.h"
+#include "io/fs/file_reader_writer_fwd.h"
namespace doris {
namespace io {
@@ -73,7 +74,8 @@ public:
                                      const std::vector<TNetworkAddress>& broker_addresses,
                                      const std::map<std::string, std::string>& properties,
                                      const std::string& path, int64_t start_offset,
-                                     std::unique_ptr<io::FileWriter>& file_writer);
+                                     std::unique_ptr<io::FileWriter>& file_writer,
+                                     const io::FileWriterOptions* opts = nullptr);
/// Create FileReader
static Status create_file_reader(const io::FileSystemProperties&
system_properties,
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 6996f2e0c98..6ec972285cc 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -31,6 +31,14 @@ class FileSystem;
// Only affects remote file writers
struct FileWriterOptions {
+    // With the S3 committer, the BE starts a multipart upload for every file and
+    // uploads its parts, while the FE completes the multipart uploads at commit time.
+    // Until a file's multipart upload is completed, the file is not visible at all,
+    // so each individual file is written atomically. Atomicity across multiple files
+    // is still not guaranteed; since Hive committers only have best-effort semantics,
+    // this merely shortens the window of inconsistency.
+    bool used_by_s3_committer = false;
bool write_file_cache = false;
bool is_cold_data = false;
    bool sync_file_data = true; // Whether flush data into storage system
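The comment block added to FileWriterOptions above is the heart of this change: the BE only starts each file's multipart upload and uploads its parts, while the FE issues the final CompleteMultipartUpload at commit time. S3 does not expose an object until its multipart upload is completed, which is what makes each individual file atomic. A minimal sketch of that lifecycle with the AWS SDK for Java v2 (the same SDK the FE-side code in this patch uses); the bucket, key, and payload are placeholders, and credentials/region are assumed to come from the environment:

    import software.amazon.awssdk.core.sync.RequestBody;
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.*;

    import java.nio.charset.StandardCharsets;
    import java.util.List;

    public class MpuLifecycleSketch {
        public static void main(String[] args) {
            try (S3Client s3 = S3Client.create()) {
                String bucket = "my-bucket";            // placeholder
                String key = "warehouse/db/tbl/file-0"; // placeholder

                // 1. Start the multipart upload (what the BE does when used_by_s3_committer is set).
                String uploadId = s3.createMultipartUpload(
                        CreateMultipartUploadRequest.builder().bucket(bucket).key(key).build()).uploadId();

                // 2. Upload a part and remember its ETag; the BE keeps these in _completed_parts.
                String etag = s3.uploadPart(
                        UploadPartRequest.builder().bucket(bucket).key(key)
                                .uploadId(uploadId).partNumber(1).build(),
                        RequestBody.fromString("row data", StandardCharsets.UTF_8)).eTag();

                // At this point the object is NOT visible: a GetObject on the key would fail.

                // 3. Complete the upload (what the FE does in HMSTransaction at commit time).
                s3.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
                        .bucket(bucket).key(key).uploadId(uploadId)
                        .multipartUpload(CompletedMultipartUpload.builder()
                                .parts(List.of(CompletedPart.builder().partNumber(1).eTag(etag).build()))
                                .build())
                        .build());
                // Only now does the object become visible, all at once.
            }
        }
    }

Nothing in the sketch is specific to Doris; it only shows the S3 semantics the committer relies on.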
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 83d33ed163c..ba5fce83e5c 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -89,7 +89,8 @@ S3FileWriter::S3FileWriter(std::string key, std::shared_ptr<S3FileSystem> fs,
_cache(nullptr),
_expiration_time(opts ? opts->file_cache_expiration : 0),
_is_cold_data(opts ? opts->is_cold_data : true),
- _write_file_cache(opts ? opts->write_file_cache : false) {
+ _write_file_cache(opts ? opts->write_file_cache : false),
+ _used_by_s3_committer(opts ? opts->used_by_s3_committer : false) {
s3_file_writer_total << 1;
s3_file_being_written << 1;
@@ -203,10 +204,7 @@ Status S3FileWriter::close() {
if (_upload_id.empty()) {
if (_pending_buf != nullptr) {
- // it might be one file less than 5MB, we do upload here
- auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
- DCHECK(buf != nullptr);
-            buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
+ RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
}
if (_bytes_appended == 0 && _create_empty_file) {
@@ -234,6 +232,13 @@ Status S3FileWriter::close() {
RETURN_IF_ERROR(builder.build(&_pending_buf));
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
+ if (_used_by_s3_committer) {
+            buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
+ _upload_one_part(part_num, buf);
+ });
+ DCHECK(_cur_part_num == 1);
+ RETURN_IF_ERROR(_create_multi_upload_request());
+ }
}
}
if (_pending_buf != nullptr) {
@@ -404,56 +409,61 @@ Status S3FileWriter::_complete() {
_wait_until_finish("PutObject");
return _st;
}
-    CompleteMultipartUploadRequest complete_request;
-    complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
-
+    // Wait for the multipart uploads to finish.
_wait_until_finish("Complete");
DBUG_EXECUTE_IF("s3_file_writer::_complete:1", { _cur_part_num++; });
- if (_failed || _completed_parts.size() != _cur_part_num) {
- _st = Status::InternalError(
- "error status {}, complete parts {}, cur part num {}, whole
parts {}", _st,
- _completed_parts.size(), _cur_part_num,
_dump_completed_part());
- LOG(WARNING) << _st;
- return _st;
- }
- // make sure _completed_parts are ascending order
- std::sort(_completed_parts.begin(), _completed_parts.end(),
- [](auto& p1, auto& p2) { return p1->GetPartNumber() <
p2->GetPartNumber(); });
- DBUG_EXECUTE_IF("s3_file_writer::_complete:2",
- { _completed_parts.back()->SetPartNumber(10 *
_completed_parts.size()); });
- CompletedMultipartUpload completed_upload;
- for (size_t i = 0; i < _completed_parts.size(); i++) {
- if (_completed_parts[i]->GetPartNumber() != i + 1) [[unlikely]] {
- auto st = Status::InternalError(
- "error status {}, part num not continous, expected num {},
actual num {}, "
- "whole parts {}",
- _st, i + 1, _completed_parts[i]->GetPartNumber(),
_dump_completed_part());
- LOG(WARNING) << st;
- _st = st;
- return st;
+    if (!_used_by_s3_committer) { // The S3 committer completes the multipart upload on the FE side.
+        CompleteMultipartUploadRequest complete_request;
+        complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
+
+ if (_failed || _completed_parts.size() != _cur_part_num) {
+ _st = Status::InternalError(
+ "error status {}, complete parts {}, cur part num {},
whole parts {}", _st,
+ _completed_parts.size(), _cur_part_num,
_dump_completed_part());
+ LOG(WARNING) << _st;
+ return _st;
+ }
+ // make sure _completed_parts are ascending order
+ std::sort(_completed_parts.begin(), _completed_parts.end(),
+ [](auto& p1, auto& p2) { return p1->GetPartNumber() <
p2->GetPartNumber(); });
+ DBUG_EXECUTE_IF("s3_file_writer::_complete:2",
+ { _completed_parts.back()->SetPartNumber(10 *
_completed_parts.size()); });
+ CompletedMultipartUpload completed_upload;
+ for (size_t i = 0; i < _completed_parts.size(); i++) {
+ if (_completed_parts[i]->GetPartNumber() != i + 1) [[unlikely]] {
+ auto st = Status::InternalError(
+ "error status {}, part num not continous, expected num
{}, actual num {}, "
+ "whole parts {}",
+ _st, i + 1, _completed_parts[i]->GetPartNumber(),
_dump_completed_part());
+ LOG(WARNING) << st;
+ _st = st;
+ return st;
+ }
+ completed_upload.AddParts(*_completed_parts[i]);
}
- completed_upload.AddParts(*_completed_parts[i]);
- }
- complete_request.WithMultipartUpload(completed_upload);
+ complete_request.WithMultipartUpload(completed_upload);
- DBUG_EXECUTE_IF("s3_file_writer::_complete:3", {
- auto s = Status::IOError(
- "failed to create complete multi part upload (bucket={},
key={}): injected error",
- _bucket, _path.native());
- LOG_WARNING(s.to_string());
- return s;
- });
- SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
- auto complete_outcome = _client->CompleteMultipartUpload(complete_request);
+ DBUG_EXECUTE_IF("s3_file_writer::_complete:3", {
+ auto s = Status::IOError(
+ "failed to create complete multi part upload (bucket={},
key={}): injected "
+ "error",
+ _bucket, _path.native());
+ LOG_WARNING(s.to_string());
+ return s;
+ });
+ SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
+ auto complete_outcome =
_client->CompleteMultipartUpload(complete_request);
- if (!complete_outcome.IsSuccess()) {
- _st = s3fs_error(
- complete_outcome.GetError(),
- fmt::format("failed to complete multi part upload {},
upload_id={}, whole parts={}",
+ if (!complete_outcome.IsSuccess()) {
+ _st = s3fs_error(
+ complete_outcome.GetError(),
+ fmt::format(
+ "failed to complete multi part upload {},
upload_id={}, whole parts={}",
_path.native(), _upload_id,
_dump_completed_part()));
- LOG(WARNING) << _st;
- return _st;
+ LOG(WARNING) << _st;
+ return _st;
+ }
}
s3_file_created_total << 1;
return Status::OK();
@@ -466,12 +476,8 @@ Status S3FileWriter::finalize() {
// submit pending buf if it's not nullptr
// it's the last buf, we can submit it right now
if (_pending_buf != nullptr) {
- // if we only need to upload one file less than 5MB, we can just
- // call PutObject to reduce the network IO
if (_upload_id.empty()) {
- auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
- DCHECK(buf != nullptr);
-            buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
+ RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
}
_countdown_event.add_count();
RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
@@ -481,6 +487,24 @@ Status S3FileWriter::finalize() {
return _st;
}
+Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
+ auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
+ DCHECK(buf != nullptr);
+ if (_used_by_s3_committer) {
+        // If used_by_s3_committer is set, always use multipart upload.
+        buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
+ _upload_one_part(part_num, buf);
+ });
+ DCHECK(_cur_part_num == 1);
+ RETURN_IF_ERROR(_create_multi_upload_request());
+ } else {
+ // if we only need to upload one file less than 5MB, we can just
+ // call PutObject to reduce the network IO
+        buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
+ }
+ return Status::OK();
+}
+
void S3FileWriter::_put_object(UploadFileBuffer& buf) {
DCHECK(!_closed) << "closed " << _closed;
Aws::S3::Model::PutObjectRequest request;
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index f6acbc4c75b..c14cf0b0a21 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -51,12 +51,21 @@ public:
Status appendv(const Slice* data, size_t data_cnt) override;
Status finalize() override;
+    const std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>& completed_parts() const {
+ return _completed_parts;
+ }
+
+ const std::string& key() const { return _key; }
+ const std::string& bucket() const { return _bucket; }
+ const std::string& upload_id() const { return _upload_id; }
+
private:
Status _abort();
[[nodiscard]] std::string _dump_completed_part() const;
void _wait_until_finish(std::string_view task_name);
Status _complete();
Status _create_multi_upload_request();
+ Status _set_upload_to_remote_less_than_buffer_size();
void _put_object(UploadFileBuffer& buf);
void _upload_one_part(int64_t part_num, UploadFileBuffer& buf);
@@ -85,6 +94,14 @@ private:
int64_t _expiration_time;
bool _is_cold_data;
bool _write_file_cache;
+    // With the S3 committer, the BE starts a multipart upload for every file and
+    // uploads its parts, while the FE completes the multipart uploads at commit time.
+    // Until a file's multipart upload is completed, the file is not visible at all,
+    // so each individual file is written atomically. Atomicity across multiple files
+    // is still not guaranteed; since Hive committers only have best-effort semantics,
+    // this merely shortens the window of inconsistency.
+    bool _used_by_s3_committer;
};
} // namespace io
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp
b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index a67a22e2cf1..e9e816219b6 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -17,8 +17,11 @@
#include "vhive_partition_writer.h"
+#include <aws/s3/model/CompletedPart.h>
+
#include "io/file_factory.h"
#include "io/fs/file_system.h"
+#include "io/fs/s3_file_writer.h"
#include "runtime/runtime_state.h"
#include "vec/columns/column_map.h"
#include "vec/core/materialize_block.h"
@@ -54,10 +57,11 @@ Status VHivePartitionWriter::open(RuntimeState* state,
RuntimeProfile* profile)
_state = state;
std::vector<TNetworkAddress> broker_addresses;
+ io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true};
RETURN_IF_ERROR(FileFactory::create_file_writer(
_write_info.file_type, state->exec_env(), broker_addresses,
_hadoop_conf,
- fmt::format("{}/{}", _write_info.write_path,
_get_target_file_name()), 0,
- _file_writer));
+ fmt::format("{}/{}", _write_info.write_path,
_get_target_file_name()), 0, _file_writer,
+ &file_writer_options));
std::vector<std::string> column_names;
column_names.reserve(_columns.size());
@@ -191,12 +195,28 @@ THivePartitionUpdate
VHivePartitionWriter::_build_partition_update() {
hive_partition_update.__set_name(_partition_name);
hive_partition_update.__set_update_mode(_update_mode);
THiveLocationParams location;
- location.__set_write_path(_write_info.write_path);
+ location.__set_write_path(_write_info.original_write_path);
location.__set_target_path(_write_info.target_path);
hive_partition_update.__set_location(location);
hive_partition_update.__set_file_names({_get_target_file_name()});
hive_partition_update.__set_row_count(_row_count);
hive_partition_update.__set_file_size(_input_size_in_bytes);
+
+ if (_write_info.file_type == TFileType::FILE_S3) {
+ doris::io::S3FileWriter* s3_mpu_file_writer =
+ dynamic_cast<doris::io::S3FileWriter*>(_file_writer.get());
+ TS3MPUPendingUpload s3_mpu_pending_upload;
+ s3_mpu_pending_upload.__set_bucket(s3_mpu_file_writer->bucket());
+ s3_mpu_pending_upload.__set_key(s3_mpu_file_writer->key());
+ s3_mpu_pending_upload.__set_upload_id(s3_mpu_file_writer->upload_id());
+
+ std::map<int, std::string> etags;
+ for (auto& completed_part : s3_mpu_file_writer->completed_parts()) {
+ etags.insert({completed_part->GetPartNumber(),
completed_part->GetETag()});
+ }
+ s3_mpu_pending_upload.__set_etags(etags);
+
hive_partition_update.__set_s3_mpu_pending_uploads({s3_mpu_pending_upload});
+ }
return hive_partition_update;
}
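_build_partition_update() is where the BE hands the FE everything it needs to finish a file: bucket, key, upload id, and a part-number to ETag map, carried in TS3MPUPendingUpload. On the FE side those entries are rebuilt into CompletedPart objects before CompleteMultipartUpload is issued (see the s3Commit method added to HMSTransaction later in this patch). A small illustrative sketch of just that conversion, with a hypothetical ETag map; S3 requires the parts in ascending part-number order, which the TreeMap guarantees here:

    import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
    import software.amazon.awssdk.services.s3.model.CompletedPart;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class EtagConversionSketch {
        // Rebuilds the CompletedMultipartUpload from the part-number -> ETag map
        // reported by the BE for one file.
        static CompletedMultipartUpload fromEtags(Map<Integer, String> etags) {
            List<CompletedPart> parts = new ArrayList<>();
            for (Map.Entry<Integer, String> e : new TreeMap<>(etags).entrySet()) {
                parts.add(CompletedPart.builder()
                        .partNumber(e.getKey())
                        .eTag(e.getValue())
                        .build());
            }
            return CompletedMultipartUpload.builder().parts(parts).build();
        }

        public static void main(String[] args) {
            // Hypothetical ETags for a two-part upload.
            System.out.println(fromEtags(Map.of(2, "etag-2", 1, "etag-1")));
        }
    }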
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.h
b/be/src/vec/sink/writer/vhive_partition_writer.h
index fc06e157408..e4fc2ebc24b 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.h
+++ b/be/src/vec/sink/writer/vhive_partition_writer.h
@@ -39,6 +39,7 @@ class VHivePartitionWriter {
public:
struct WriteInfo {
std::string write_path;
+ std::string original_write_path;
std::string target_path;
TFileType::type file_type;
};
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp
b/be/src/vec/sink/writer/vhive_table_writer.cpp
index d43fc34b4e5..7c3a864ebb3 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -256,26 +256,30 @@ std::shared_ptr<VHivePartitionWriter>
VHiveTableWriter::_create_partition_writer
if (existing_table == false) { // new table
update_mode = TUpdateMode::NEW;
if (_partition_columns_input_index.empty()) { // new unpartitioned
table
- write_info = {write_location.write_path,
write_location.target_path,
- write_location.file_type};
+ write_info = {write_location.write_path,
write_location.original_write_path,
+ write_location.target_path,
write_location.file_type};
} else { // a new partition in a new partitioned table
auto write_path = fmt::format("{}/{}",
write_location.write_path, partition_name);
+ auto original_write_path =
+ fmt::format("{}/{}",
write_location.original_write_path, partition_name);
auto target_path = fmt::format("{}/{}",
write_location.target_path, partition_name);
- write_info = {std::move(write_path), std::move(target_path),
- write_location.file_type};
+ write_info = {std::move(write_path),
std::move(original_write_path),
+ std::move(target_path),
write_location.file_type};
}
} else { // a new partition in an existing partitioned table, or an
existing unpartitioned table
if (_partition_columns_input_index.empty()) { // an existing
unpartitioned table
update_mode =
!hive_table_sink.overwrite ? TUpdateMode::APPEND :
TUpdateMode::OVERWRITE;
- write_info = {write_location.write_path,
write_location.target_path,
- write_location.file_type};
+ write_info = {write_location.write_path,
write_location.original_write_path,
+ write_location.target_path,
write_location.file_type};
} else { // a new partition in an existing partitioned table
update_mode = TUpdateMode::NEW;
auto write_path = fmt::format("{}/{}",
write_location.write_path, partition_name);
+ auto original_write_path =
+ fmt::format("{}/{}",
write_location.original_write_path, partition_name);
auto target_path = fmt::format("{}/{}",
write_location.target_path, partition_name);
- write_info = {std::move(write_path), std::move(target_path),
- write_location.file_type};
+ write_info = {std::move(write_path),
std::move(original_write_path),
+ std::move(target_path),
write_location.file_type};
}
// need to get schema from existing table ?
}
@@ -285,16 +289,21 @@ std::shared_ptr<VHivePartitionWriter>
VHiveTableWriter::_create_partition_writer
if (!hive_table_sink.overwrite) {
update_mode = TUpdateMode::APPEND;
auto write_path = fmt::format("{}/{}", write_location.write_path,
partition_name);
+ auto original_write_path =
+ fmt::format("{}/{}", write_location.original_write_path,
partition_name);
auto target_path = fmt::format("{}",
existing_partition->location.target_path);
- write_info = {std::move(write_path), std::move(target_path),
- existing_partition->location.file_type};
+ write_info = {std::move(write_path),
std::move(original_write_path),
+ std::move(target_path),
existing_partition->location.file_type};
file_format_type = existing_partition->file_format;
write_compress_type = hive_table_sink.compression_type;
} else {
update_mode = TUpdateMode::OVERWRITE;
auto write_path = fmt::format("{}/{}", write_location.write_path,
partition_name);
+ auto original_write_path =
+ fmt::format("{}/{}", write_location.original_write_path,
partition_name);
auto target_path = fmt::format("{}/{}",
write_location.target_path, partition_name);
- write_info = {std::move(write_path), std::move(target_path),
write_location.file_type};
+ write_info = {std::move(write_path),
std::move(original_write_path),
+ std::move(target_path), write_location.file_type};
file_format_type = hive_table_sink.file_format;
write_compress_type = hive_table_sink.compression_type;
// need to get schema from existing table ?
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
index 178a0e802d5..e7e9176b9f2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java
@@ -88,6 +88,8 @@ public class SummaryProfile {
public static final String FILESYSTEM_OPT_TIME = "FileSystem Operator
Time";
public static final String FILESYSTEM_OPT_RENAME_FILE_CNT = "Rename File
Count";
public static final String FILESYSTEM_OPT_RENAME_DIR_CNT = "Rename Dir
Count";
+
+ public static final String FILESYSTEM_OPT_DELETE_FILE_CNT = "Delete File
Count";
public static final String FILESYSTEM_OPT_DELETE_DIR_CNT = "Delete Dir
Count";
public static final String HMS_ADD_PARTITION_TIME = "HMS Add Partition
Time";
public static final String HMS_ADD_PARTITION_CNT = "HMS Add Partition
Count";
@@ -164,6 +166,7 @@ public class SummaryProfile {
.put(FILESYSTEM_OPT_TIME, 1)
.put(FILESYSTEM_OPT_RENAME_FILE_CNT, 2)
.put(FILESYSTEM_OPT_RENAME_DIR_CNT, 2)
+ .put(FILESYSTEM_OPT_DELETE_FILE_CNT, 2)
.put(FILESYSTEM_OPT_DELETE_DIR_CNT, 2)
.put(HMS_ADD_PARTITION_TIME, 1)
.put(HMS_ADD_PARTITION_CNT, 2)
@@ -223,6 +226,8 @@ public class SummaryProfile {
private long hmsUpdatePartitionCnt = 0;
private long filesystemRenameFileCnt = 0;
private long filesystemRenameDirCnt = 0;
+
+ private long filesystemDeleteFileCnt = 0;
private long filesystemDeleteDirCnt = 0;
private TransactionType transactionType = TransactionType.UNKNOWN;
@@ -344,6 +349,8 @@ public class SummaryProfile {
getPrettyCount(filesystemRenameFileCnt));
executionSummaryProfile.addInfoString(FILESYSTEM_OPT_RENAME_DIR_CNT,
getPrettyCount(filesystemRenameDirCnt));
+
executionSummaryProfile.addInfoString(FILESYSTEM_OPT_DELETE_FILE_CNT,
+ getPrettyCount(filesystemDeleteFileCnt));
executionSummaryProfile.addInfoString(FILESYSTEM_OPT_DELETE_DIR_CNT,
getPrettyCount(filesystemDeleteDirCnt));
@@ -666,4 +673,8 @@ public class SummaryProfile {
public void incDeleteDirRecursiveCnt() {
this.filesystemDeleteDirCnt += 1;
}
+
+ public void incDeleteFileCnt() {
+ this.filesystemDeleteFileCnt += 1;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
index 005a8f2cb84..38b5250a157 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java
@@ -296,8 +296,11 @@ public class LocationPath {
fsType = FileSystemType.S3;
break;
case COSN:
+                // COSN uses the S3 client on the FE side, because it needs to complete multipart uploads there.
+ fsType = FileSystemType.S3;
+ break;
case OFS:
-                // ofs:// and cosn:// use the same underlying file system: Tencent Cloud HDFS, aka CHDFS)) {
+                // ofs:// use the underlying file system: Tencent Cloud HDFS, aka CHDFS)) {
fsType = FileSystemType.OFS;
break;
case HDFS:
@@ -329,7 +332,11 @@ public class LocationPath {
return null;
}
LocationPath locationPath = new LocationPath(location);
- switch (locationPath.getLocationType()) {
+ return locationPath.getTFileTypeForBE();
+ }
+
+ public TFileType getTFileTypeForBE() {
+ switch (this.getLocationType()) {
case S3:
case S3A:
case S3N:
@@ -362,7 +369,7 @@ public class LocationPath {
*
* @return BE scan range path
*/
- public Path toScanRangeLocation() {
+ public Path toStorageLocation() {
switch (locationType) {
case S3:
case S3A:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
index 29c3f2700c4..9c9a887aa7e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3URI.java
@@ -68,10 +68,10 @@ public class S3URI {
public static final String SCHEME_DELIM = "://";
public static final String PATH_DELIM = "/";
private static final Set<String> VALID_SCHEMES = ImmutableSet.of("http",
"https", "s3", "s3a", "s3n",
- "bos", "oss", "cos", "obs");
+ "bos", "oss", "cos", "cosn", "obs");
private static final Set<String> OS_SCHEMES = ImmutableSet.of("s3", "s3a",
"s3n",
- "bos", "oss", "cos", "obs");
+ "bos", "oss", "cos", "cosn", "obs");
private URI uri;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
index 299ab6dddfb..20dc870cd35 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalCatalog.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.doris.common.security.authentication.HadoopUGI;
import org.apache.doris.datasource.CatalogProperty;
@@ -34,6 +35,8 @@ import
org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.operations.ExternalMetadataOperations;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.HMSProperties;
+import org.apache.doris.fs.FileSystemProvider;
+import org.apache.doris.fs.FileSystemProviderImpl;
import org.apache.doris.transaction.TransactionManagerFactory;
import com.google.common.base.Strings;
@@ -46,6 +49,7 @@ import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ThreadPoolExecutor;
/**
* External catalog for hive metastore compatible data sources.
@@ -63,6 +67,9 @@ public class HMSExternalCatalog extends ExternalCatalog {
// 0 means file cache is disabled; >0 means file cache with ttl;
public static final int FILE_META_CACHE_TTL_DISABLE_CACHE = 0;
+ private static final int FILE_SYSTEM_EXECUTOR_THREAD_NUM = 16;
+ private ThreadPoolExecutor fileSystemExecutor;
+
public HMSExternalCatalog() {
catalogProperty = new CatalogProperty(null, null);
}
@@ -147,7 +154,12 @@ public class HMSExternalCatalog extends ExternalCatalog {
AuthenticationConfig.HADOOP_KERBEROS_KEYTAB));
}
HiveMetadataOps hiveOps =
ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
- transactionManager =
TransactionManagerFactory.createHiveTransactionManager(hiveOps);
+ FileSystemProvider fileSystemProvider = new
FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
+ this.bindBrokerName(),
this.catalogProperty.getHadoopProperties());
+ this.fileSystemExecutor =
ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM,
+ Integer.MAX_VALUE,
String.format("hms_committer_%s_file_system_executor_pool", name), true);
+ transactionManager =
TransactionManagerFactory.createHiveTransactionManager(hiveOps,
fileSystemProvider,
+ fileSystemExecutor);
metadataOps = hiveOps;
}
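The catalog now owns a dedicated bounded executor (16 daemon threads via ThreadPoolManager) that the Hive committer uses to fan out per-file work such as completing or aborting multipart uploads, and the commit path waits on all of those futures before it touches the metastore. A generic sketch of that pattern using only java.util.concurrent; the pool size, task count, and task body are placeholders:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class BoundedCommitPoolSketch {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(16); // one bounded pool per catalog
            List<CompletableFuture<?>> futures = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                int fileIdx = i;
                // In the committer this body would complete (or abort) one multipart upload.
                futures.add(CompletableFuture.runAsync(
                        () -> System.out.println("finish file " + fileIdx), pool));
            }
            // Wait for every file-system task before doing the metastore updates.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            pool.shutdown();
        }
    }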
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 32dd083c2ad..6fca8b4745f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -23,13 +23,18 @@ package org.apache.doris.datasource.hive;
import org.apache.doris.backup.Status;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.fs.FileSystem;
+import org.apache.doris.fs.FileSystemProvider;
import org.apache.doris.fs.FileSystemUtil;
import org.apache.doris.fs.remote.RemoteFile;
+import org.apache.doris.fs.remote.S3FileSystem;
+import org.apache.doris.fs.remote.SwitchingFileSystem;
import
org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.THivePartitionUpdate;
+import org.apache.doris.thrift.TS3MPUPendingUpload;
import org.apache.doris.thrift.TUpdateMode;
import org.apache.doris.transaction.Transaction;
@@ -48,6 +53,11 @@ import
org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
import java.util.ArrayList;
import java.util.HashMap;
@@ -79,17 +89,34 @@ public class HMSTransaction implements Transaction {
private final Map<DatabaseTableName, Map<List<String>,
Action<PartitionAndMore>>>
partitionActions = new HashMap<>();
+ private final Executor fileSystemExecutor;
private HmsCommitter hmsCommitter;
private List<THivePartitionUpdate> hivePartitionUpdates =
Lists.newArrayList();
private String declaredIntentionsToWrite;
- public HMSTransaction(HiveMetadataOps hiveOps) {
- this.hiveOps = hiveOps;
- this.fs = hiveOps.getFs();
+ private static class UncompletedMpuPendingUpload {
+
+ private final TS3MPUPendingUpload s3MPUPendingUpload;
+ private final String path;
+
+ public UncompletedMpuPendingUpload(TS3MPUPendingUpload
s3MPUPendingUpload, String path) {
+ this.s3MPUPendingUpload = s3MPUPendingUpload;
+ this.path = path;
+ }
+ }
+ private Set<UncompletedMpuPendingUpload> uncompletedMpuPendingUploads =
new HashSet<>();
+
+ public HMSTransaction(HiveMetadataOps hiveOps, FileSystemProvider
fileSystemProvider, Executor fileSystemExecutor) {
+ this.hiveOps = hiveOps;
+ this.fs = fileSystemProvider.get(null);
+ if (!(fs instanceof SwitchingFileSystem)) {
+ throw new RuntimeException("fs should be SwitchingFileSystem");
+ }
if (ConnectContext.get().getExecutor() != null) {
summaryProfile =
Optional.of(ConnectContext.get().getExecutor().getSummaryProfile());
}
+ this.fileSystemExecutor = fileSystemExecutor;
}
@Override
@@ -112,6 +139,9 @@ public class HMSTransaction implements Transaction {
THivePartitionUpdate old = mm.get(pu.getName());
old.setFileSize(old.getFileSize() + pu.getFileSize());
old.setRowCount(old.getRowCount() + pu.getRowCount());
+ if (old.getS3MpuPendingUploads() != null &&
pu.getS3MpuPendingUploads() != null) {
+
old.getS3MpuPendingUploads().addAll(pu.getS3MpuPendingUploads());
+ }
old.getFileNames().addAll(pu.getFileNames());
} else {
mm.put(pu.getName(), pu);
@@ -136,6 +166,14 @@ public class HMSTransaction implements Transaction {
this.dbName = dbName;
this.tbName = tbName;
List<THivePartitionUpdate> mergedPUs =
mergePartitions(hivePartitionUpdates);
+ for (THivePartitionUpdate pu : mergedPUs) {
+ if (pu.getS3MpuPendingUploads() != null) {
+ for (TS3MPUPendingUpload s3MPUPendingUpload :
pu.getS3MpuPendingUploads()) {
+ uncompletedMpuPendingUploads.add(
+ new
UncompletedMpuPendingUpload(s3MPUPendingUpload,
pu.getLocation().getTargetPath()));
+ }
+ }
+ }
Table table = getTable(dbName, tbName);
List<Pair<THivePartitionUpdate, HivePartitionStatistics>>
insertExistsPartitions = new ArrayList<>();
for (THivePartitionUpdate pu : mergedPUs) {
@@ -156,11 +194,12 @@ public class HMSTransaction implements Transaction {
tbName,
writePath,
pu.getFileNames(),
- hivePartitionStatistics);
+ hivePartitionStatistics,
+ pu);
break;
case OVERWRITE:
dropTable(dbName, tbName);
- createTable(table, writePath, pu.getFileNames(),
hivePartitionStatistics);
+ createTable(table, writePath, pu.getFileNames(),
hivePartitionStatistics, pu);
break;
default:
throw new RuntimeException("Not support mode:[" +
updateMode + "] in unPartitioned table");
@@ -191,7 +230,7 @@ public class HMSTransaction implements Transaction {
}
addPartition(
dbName, tbName, hivePartition, writePath,
- pu.getName(), pu.getFileNames(),
hivePartitionStatistics);
+ pu.getName(), pu.getFileNames(),
hivePartitionStatistics, pu);
break;
default:
throw new RuntimeException("Not support mode:[" +
updateMode + "] in partitioned table");
@@ -351,7 +390,8 @@ public class HMSTransaction implements Transaction {
pu.getLocation().getWritePath(),
pu.getName(),
pu.getFileNames(),
- updateStats
+ updateStats,
+ pu
))
);
}
@@ -550,8 +590,8 @@ public class HMSTransaction implements Transaction {
- private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir) {
- DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory,
deleteEmptyDir);
+ private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir,
boolean reverse) {
+ DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory,
deleteEmptyDir, reverse);
if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
LOG.warn("Failed to delete directory {}. Some eligible items can't
be deleted: {}.",
@@ -561,9 +601,9 @@ public class HMSTransaction implements Transaction {
}
}
- private DeleteRecursivelyResult recursiveDeleteFiles(Path directory,
boolean deleteEmptyDir) {
+ private DeleteRecursivelyResult recursiveDeleteFiles(Path directory,
boolean deleteEmptyDir, boolean reverse) {
try {
- if (!fs.exists(directory.toString()).ok()) {
+ if (!fs.directoryExists(directory.toString()).ok()) {
return new DeleteRecursivelyResult(true, ImmutableList.of());
}
} catch (Exception e) {
@@ -572,10 +612,11 @@ public class HMSTransaction implements Transaction {
return new DeleteRecursivelyResult(false,
notDeletedEligibleItems.build());
}
- return doRecursiveDeleteFiles(directory, deleteEmptyDir, queryId);
+ return doRecursiveDeleteFiles(directory, deleteEmptyDir, queryId,
reverse);
}
- private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory,
boolean deleteEmptyDir, String queryId) {
+ private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory,
boolean deleteEmptyDir,
+ String queryId, boolean reverse) {
List<RemoteFile> allFiles = new ArrayList<>();
Set<String> allDirs = new HashSet<>();
Status statusFile = fs.listFiles(directory.toString(), true, allFiles);
@@ -589,7 +630,7 @@ public class HMSTransaction implements Transaction {
boolean allDescendentsDeleted = true;
ImmutableList.Builder<String> notDeletedEligibleItems =
ImmutableList.builder();
for (RemoteFile file : allFiles) {
- if (file.getName().startsWith(queryId)) {
+ if (reverse ^ file.getName().startsWith(queryId)) {
if (!deleteIfExists(file.getPath())) {
allDescendentsDeleted = false;
notDeletedEligibleItems.add(file.getPath().toString());
@@ -600,7 +641,7 @@ public class HMSTransaction implements Transaction {
}
for (String dir : allDirs) {
- DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(new
Path(dir), deleteEmptyDir, queryId);
+ DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(new
Path(dir), deleteEmptyDir, queryId, reverse);
if (!subResult.dirNotExists()) {
allDescendentsDeleted = false;
}
@@ -611,7 +652,7 @@ public class HMSTransaction implements Transaction {
if (allDescendentsDeleted && deleteEmptyDir) {
Verify.verify(notDeletedEligibleItems.build().isEmpty());
- if (!deleteIfExists(directory)) {
+ if (!deleteDirectoryIfExists(directory)) {
return new DeleteRecursivelyResult(false,
ImmutableList.of(directory + "/"));
}
// all items of the location have been deleted.
@@ -628,6 +669,14 @@ public class HMSTransaction implements Transaction {
return !fs.exists(path.toString()).ok();
}
+ public boolean deleteDirectoryIfExists(Path path) {
+ Status status = wrapperDeleteDirWithProfileSummary(path.toString());
+ if (status.ok()) {
+ return true;
+ }
+ return !fs.directoryExists(path.toString()).ok();
+ }
+
public static class DatabaseTableName {
private final String dbName;
private final String tbName;
@@ -676,15 +725,19 @@ public class HMSTransaction implements Transaction {
private final List<String> fileNames;
private final HivePartitionStatistics statisticsUpdate;
+ private final THivePartitionUpdate hivePartitionUpdate;
+
public TableAndMore(
Table table,
String currentLocation,
List<String> fileNames,
- HivePartitionStatistics statisticsUpdate) {
+ HivePartitionStatistics statisticsUpdate,
+ THivePartitionUpdate hivePartitionUpdate) {
this.table = Objects.requireNonNull(table, "table is null");
this.currentLocation = Objects.requireNonNull(currentLocation);
this.fileNames = Objects.requireNonNull(fileNames);
this.statisticsUpdate = Objects.requireNonNull(statisticsUpdate,
"statisticsUpdate is null");
+ this.hivePartitionUpdate =
Objects.requireNonNull(hivePartitionUpdate, "hivePartitionUpdate is null");
}
public Table getTable() {
@@ -703,6 +756,10 @@ public class HMSTransaction implements Transaction {
return statisticsUpdate;
}
+ public THivePartitionUpdate getHivePartitionUpdate() {
+ return hivePartitionUpdate;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
@@ -719,17 +776,22 @@ public class HMSTransaction implements Transaction {
private final List<String> fileNames;
private final HivePartitionStatistics statisticsUpdate;
+ private final THivePartitionUpdate hivePartitionUpdate;
+
+
public PartitionAndMore(
HivePartition partition,
String currentLocation,
String partitionName,
List<String> fileNames,
- HivePartitionStatistics statisticsUpdate) {
+ HivePartitionStatistics statisticsUpdate,
+ THivePartitionUpdate hivePartitionUpdate) {
this.partition = Objects.requireNonNull(partition, "partition is
null");
this.currentLocation = Objects.requireNonNull(currentLocation,
"currentLocation is null");
this.partitionName = Objects.requireNonNull(partitionName,
"partition is null");
this.fileNames = Objects.requireNonNull(fileNames, "fileNames is
null");
this.statisticsUpdate = Objects.requireNonNull(statisticsUpdate,
"statisticsUpdate is null");
+ this.hivePartitionUpdate =
Objects.requireNonNull(hivePartitionUpdate, "hivePartitionUpdate is null");
}
public HivePartition getPartition() {
@@ -752,6 +814,10 @@ public class HMSTransaction implements Transaction {
return statisticsUpdate;
}
+ public THivePartitionUpdate getHivePartitionUpdate() {
+ return hivePartitionUpdate;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
@@ -835,7 +901,8 @@ public class HMSTransaction implements Transaction {
String tableName,
String location,
List<String> fileNames,
- HivePartitionStatistics statisticsUpdate) {
+ HivePartitionStatistics statisticsUpdate,
+ THivePartitionUpdate hivePartitionUpdate) {
DatabaseTableName databaseTableName = new
DatabaseTableName(databaseName, tableName);
Action<TableAndMore> oldTableAction =
tableActions.get(databaseTableName);
if (oldTableAction == null) {
@@ -843,12 +910,13 @@ public class HMSTransaction implements Transaction {
tableActions.put(
databaseTableName,
new Action<>(
- actionType,
+ actionType,
new TableAndMore(
- table,
- location,
- fileNames,
- statisticsUpdate)));
+ table,
+ location,
+ fileNames,
+ statisticsUpdate,
+ hivePartitionUpdate)));
return;
}
@@ -870,12 +938,13 @@ public class HMSTransaction implements Transaction {
}
public synchronized void createTable(
- Table table, String location, List<String> fileNames,
HivePartitionStatistics statistics) {
+ Table table, String location, List<String> fileNames,
HivePartitionStatistics statistics,
+ THivePartitionUpdate hivePartitionUpdate) {
// When creating a table, it should never have partition actions. This
is just a sanity check.
checkNoPartitionAction(dbName, tbName);
DatabaseTableName databaseTableName = new DatabaseTableName(dbName,
tbName);
Action<TableAndMore> oldTableAction =
tableActions.get(databaseTableName);
- TableAndMore tableAndMore = new TableAndMore(table, location,
fileNames, statistics);
+ TableAndMore tableAndMore = new TableAndMore(table, location,
fileNames, statistics, hivePartitionUpdate);
if (oldTableAction == null) {
tableActions.put(databaseTableName, new Action<>(ActionType.ADD,
tableAndMore));
return;
@@ -939,7 +1008,8 @@ public class HMSTransaction implements Transaction {
String currentLocation,
String partitionName,
List<String> files,
- HivePartitionStatistics statistics) {
+ HivePartitionStatistics statistics,
+ THivePartitionUpdate hivePartitionUpdate) {
Map<List<String>, Action<PartitionAndMore>> partitionActionsForTable =
partitionActions.computeIfAbsent(new
DatabaseTableName(databaseName, tableName), k -> new HashMap<>());
Action<PartitionAndMore> oldPartitionAction =
partitionActionsForTable.get(partition.getPartitionValues());
@@ -948,7 +1018,8 @@ public class HMSTransaction implements Transaction {
partition.getPartitionValues(),
new Action<>(
ActionType.ADD,
- new PartitionAndMore(partition, currentLocation,
partitionName, files, statistics))
+ new PartitionAndMore(partition, currentLocation,
partitionName, files, statistics,
+ hivePartitionUpdate))
);
return;
}
@@ -959,7 +1030,8 @@ public class HMSTransaction implements Transaction {
partition.getPartitionValues(),
new Action<>(
ActionType.ALTER,
- new PartitionAndMore(partition,
currentLocation, partitionName, files, statistics))
+ new PartitionAndMore(partition,
currentLocation, partitionName, files, statistics,
+ hivePartitionUpdate))
);
return;
case ADD:
@@ -1029,7 +1101,8 @@ public class HMSTransaction implements Transaction {
private final List<RenameDirectoryTask> renameDirectoryTasksForAbort =
new ArrayList<>();
// when finished, we need clear some directories
private final List<String> clearDirsForFinish = new ArrayList<>();
- Executor fileSystemExecutor = Executors.newFixedThreadPool(16);
+
+ private final List<String> s3cleanWhenSuccess = new ArrayList<>();
public void cancelUnStartedAsyncFileSystemTask() {
fileSystemTaskCancelled.set(true);
@@ -1091,15 +1164,20 @@ public class HMSTransaction implements Transaction {
writePath,
targetPath,
tableAndMore.getFileNames());
+ } else {
+ if
(!tableAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
+ s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures,
fileSystemTaskCancelled,
+ tableAndMore.hivePartitionUpdate, targetPath);
+ }
}
directoryCleanUpTasksForAbort.add(new
DirectoryCleanUpTask(targetPath, false));
updateStatisticsTasks.add(
- new UpdateStatisticsTask(
- dbName,
- tbName,
- Optional.empty(),
- tableAndMore.getStatisticsUpdate(),
- true
+ new UpdateStatisticsTask(
+ dbName,
+ tbName,
+ Optional.empty(),
+ tableAndMore.getStatisticsUpdate(),
+ true
));
}
@@ -1129,6 +1207,12 @@ public class HMSTransaction implements Transaction {
throw new RuntimeException(
"Error to rename dir from " + writePath + " to " +
targetPath + ":" + status.getErrMsg());
}
+ } else {
+ if
(!tableAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
+ s3cleanWhenSuccess.add(targetPath);
+ s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures,
fileSystemTaskCancelled,
+ tableAndMore.hivePartitionUpdate, targetPath);
+ }
}
updateStatisticsTasks.add(
new UpdateStatisticsTask(
@@ -1154,6 +1238,11 @@ public class HMSTransaction implements Transaction {
writePath,
targetPath,
() -> directoryCleanUpTasksForAbort.add(new
DirectoryCleanUpTask(targetPath, true)));
+ } else {
+ if
(!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
+ s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures,
fileSystemTaskCancelled,
+ partitionAndMore.hivePartitionUpdate, targetPath);
+ }
}
StorageDescriptor sd = getTable(dbName, tbName).getSd();
@@ -1194,6 +1283,11 @@ public class HMSTransaction implements Transaction {
writePath,
targetPath,
partitionAndMore.getFileNames());
+ } else {
+ if
(!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
+ s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures,
fileSystemTaskCancelled,
+ partitionAndMore.hivePartitionUpdate, targetPath);
+ }
}
updateStatisticsTasks.add(
@@ -1207,7 +1301,7 @@ public class HMSTransaction implements Transaction {
private void runDirectoryClearUpTasksForAbort() {
for (DirectoryCleanUpTask cleanUpTask :
directoryCleanUpTasksForAbort) {
- recursiveDeleteItems(cleanUpTask.getPath(),
cleanUpTask.isDeleteEmptyDir());
+ recursiveDeleteItems(cleanUpTask.getPath(),
cleanUpTask.isDeleteEmptyDir(), false);
}
}
@@ -1228,13 +1322,19 @@ public class HMSTransaction implements Transaction {
private void runClearPathsForFinish() {
Status status;
for (String path : clearDirsForFinish) {
- status = wrapperDeleteWithProfileSummary(path);
+ status = wrapperDeleteDirWithProfileSummary(path);
if (!status.ok()) {
LOG.warn("Failed to recursively delete path {}:{}", path,
status.getErrCode());
}
}
}
+ private void runS3cleanWhenSuccess() {
+ for (String path : s3cleanWhenSuccess) {
+ recursiveDeleteItems(new Path(path), false, true);
+ }
+ }
+
public void prepareAlterPartition(PartitionAndMore partitionAndMore) {
HivePartition partition = partitionAndMore.getPartition();
String targetPath = partition.getPath();
@@ -1263,6 +1363,12 @@ public class HMSTransaction implements Transaction {
throw new RuntimeException(
"Error to rename dir from " + writePath + " to " +
targetPath + ":" + status.getErrMsg());
}
+ } else {
+ if
(!partitionAndMore.hivePartitionUpdate.s3_mpu_pending_uploads.isEmpty()) {
+ s3cleanWhenSuccess.add(targetPath);
+ s3Commit(fileSystemExecutor, asyncFileSystemTaskFutures,
fileSystemTaskCancelled,
+ partitionAndMore.hivePartitionUpdate, targetPath);
+ }
}
updateStatisticsTasks.add(
@@ -1337,8 +1443,32 @@ public class HMSTransaction implements Transaction {
summaryProfile.ifPresent(SummaryProfile::setHmsUpdatePartitionTime);
}
- public void pruneAndDeleteStagingDirectories() {
- recursiveDeleteItems(new Path(declaredIntentionsToWrite), true);
+ private void pruneAndDeleteStagingDirectories() {
+ recursiveDeleteItems(new Path(declaredIntentionsToWrite), true,
false);
+ }
+
+ private void abortMultiUploads() {
+ if (uncompletedMpuPendingUploads.isEmpty()) {
+ return;
+ }
+ for (UncompletedMpuPendingUpload uncompletedMpuPendingUpload :
uncompletedMpuPendingUploads) {
+ S3FileSystem s3FileSystem = (S3FileSystem)
((SwitchingFileSystem) fs)
+ .fileSystem(uncompletedMpuPendingUpload.path);
+
+ S3Client s3Client;
+ try {
+ s3Client = (S3Client)
s3FileSystem.getObjStorage().getClient();
+ } catch (UserException e) {
+ throw new RuntimeException(e);
+ }
+ asyncFileSystemTaskFutures.add(CompletableFuture.runAsync(()
-> {
+
s3Client.abortMultipartUpload(AbortMultipartUploadRequest.builder()
+
.bucket(uncompletedMpuPendingUpload.s3MPUPendingUpload.getBucket())
+
.key(uncompletedMpuPendingUpload.s3MPUPendingUpload.getKey())
+
.uploadId(uncompletedMpuPendingUpload.s3MPUPendingUpload.getUploadId())
+ .build());
+ }, fileSystemExecutor));
+ }
}
public void doNothing() {
@@ -1348,6 +1478,7 @@ public class HMSTransaction implements Transaction {
public void doCommit() {
waitForAsyncFileSystemTasks();
+ runS3cleanWhenSuccess();
doAddPartitionsTask();
doUpdateStatisticsTasks();
doNothing();
@@ -1365,6 +1496,11 @@ public class HMSTransaction implements Transaction {
public void rollback() {
//delete write path
pruneAndDeleteStagingDirectories();
+ // abort the in-progress multipart uploads
+ abortMultiUploads();
+ for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
+ MoreFutures.getFutureValue(future, RuntimeException.class);
+ }
}
}
@@ -1385,7 +1521,7 @@ public class HMSTransaction implements Transaction {
public Status wrapperDeleteWithProfileSummary(String remotePath) {
summaryProfile.ifPresent(profile -> {
profile.setTempStartTime();
- profile.incDeleteDirRecursiveCnt();
+ profile.incDeleteFileCnt();
});
Status status = fs.delete(remotePath);
@@ -1394,6 +1530,18 @@ public class HMSTransaction implements Transaction {
return status;
}
+ public Status wrapperDeleteDirWithProfileSummary(String remotePath) {
+ summaryProfile.ifPresent(profile -> {
+ profile.setTempStartTime();
+ profile.incDeleteDirRecursiveCnt();
+ });
+
+ Status status = fs.deleteDirectory(remotePath);
+
+ summaryProfile.ifPresent(SummaryProfile::freshFilesystemOptTime);
+ return status;
+ }
+
public void wrapperAsyncRenameWithProfileSummary(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
@@ -1415,4 +1563,37 @@ public class HMSTransaction implements Transaction {
fs, executor, renameFileFutures, cancelled, origFilePath,
destFilePath, runWhenPathNotExist);
summaryProfile.ifPresent(SummaryProfile::incRenameDirCnt);
}
+
+ private void s3Commit(Executor fileSystemExecutor,
List<CompletableFuture<?>> asyncFileSystemTaskFutures,
+ AtomicBoolean fileSystemTaskCancelled, THivePartitionUpdate
hivePartitionUpdate, String path) {
+ S3FileSystem s3FileSystem = (S3FileSystem) ((SwitchingFileSystem)
fs).fileSystem(path);
+ S3Client s3Client;
+ try {
+ s3Client = (S3Client) s3FileSystem.getObjStorage().getClient();
+ } catch (UserException e) {
+ throw new RuntimeException(e);
+ }
+
+ for (TS3MPUPendingUpload s3MPUPendingUpload :
hivePartitionUpdate.getS3MpuPendingUploads()) {
+ asyncFileSystemTaskFutures.add(CompletableFuture.runAsync(() -> {
+ if (fileSystemTaskCancelled.get()) {
+ return;
+ }
+ List<CompletedPart> completedParts = Lists.newArrayList();
+ for (Map.Entry<Integer, String> entry :
s3MPUPendingUpload.getEtags().entrySet()) {
+
completedParts.add(CompletedPart.builder().eTag(entry.getValue()).partNumber(entry.getKey())
+ .build());
+ }
+
+
s3Client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
+ .bucket(s3MPUPendingUpload.getBucket())
+ .key(s3MPUPendingUpload.getKey())
+ .uploadId(s3MPUPendingUpload.getUploadId())
+
.multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build())
+ .build());
+ uncompletedMpuPendingUploads.remove(new
UncompletedMpuPendingUpload(s3MPUPendingUpload, path));
+ }, fileSystemExecutor));
+ }
+ }
}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index fcebd67954e..be5ecb163b1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -349,9 +349,11 @@ public class HiveMetaStoreCache {
List<String> partitionValues,
String bindBrokerName) throws UserException {
FileCacheValue result = new FileCacheValue();
+ Map<String, String> properties = new HashMap<>();
+ jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(),
e.getValue()));
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new
FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
- location, bindBrokerName), jobConf, bindBrokerName));
+ location, bindBrokerName), properties,
bindBrokerName));
result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location,
jobConf));
// For Tez engine, it may generate subdirectoies for "union" query.
// So there may be files and directories in the table directory at the
same time. eg:
@@ -366,7 +368,7 @@ public class HiveMetaStoreCache {
for (RemoteFile remoteFile : remoteFiles) {
String srcPath = remoteFile.getPath().toString();
LocationPath locationPath = new LocationPath(srcPath,
catalog.getProperties());
- Path convertedPath = locationPath.toScanRangeLocation();
+ Path convertedPath = locationPath.toStorageLocation();
if (!convertedPath.toString().equals(srcPath)) {
remoteFile.setPath(convertedPath);
}
@@ -777,10 +779,12 @@ public class HiveMetaStoreCache {
return Collections.emptyList();
}
String acidVersionPath = new Path(baseOrDeltaPath,
"_orc_acid_version").toUri().toString();
+ Map<String, String> properties = new HashMap<>();
+ jobConf.iterator().forEachRemaining(e ->
properties.put(e.getKey(), e.getValue()));
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
- bindBrokerName), jobConf,
bindBrokerName));
+ bindBrokerName), properties,
bindBrokerName));
Status status = fs.exists(acidVersionPath);
if (status != Status.OK) {
if (status.getErrCode() == ErrCode.NOT_FOUND) {
@@ -800,10 +804,12 @@ public class HiveMetaStoreCache {
List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
for (AcidUtils.ParsedDelta delta :
directory.getCurrentDirectories()) {
String location = delta.getPath().toString();
+ Map<String, String> properties = new HashMap<>();
+ jobConf.iterator().forEachRemaining(e ->
properties.put(e.getKey(), e.getValue()));
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location,
bindBrokerName),
- jobConf, bindBrokerName));
+ properties, bindBrokerName));
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
@@ -825,10 +831,12 @@ public class HiveMetaStoreCache {
// base
if (directory.getBaseDirectory() != null) {
String location = directory.getBaseDirectory().toString();
+ Map<String, String> properties = new HashMap<>();
+ jobConf.iterator().forEachRemaining(e ->
properties.put(e.getKey(), e.getValue()));
RemoteFileSystem fs =
Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location,
bindBrokerName),
- jobConf, bindBrokerName));
+ properties, bindBrokerName));
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index a4566cd0b7a..1cf6595bbad 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -36,8 +36,6 @@ import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
-import org.apache.doris.fs.FileSystem;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -61,7 +59,6 @@ public class HiveMetadataOps implements ExternalMetadataOps {
private static final Logger LOG =
LogManager.getLogger(HiveMetadataOps.class);
private static final int MIN_CLIENT_POOL_SIZE = 8;
private final HMSCachedClient client;
- private final FileSystem fs;
private final HMSExternalCatalog catalog;
    public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) {
@@ -74,24 +71,14 @@ public class HiveMetadataOps implements ExternalMetadataOps {
    public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client) {
this.catalog = catalog;
this.client = client;
-        // TODO Currently only supports DFSFileSystem, more types will be supported in the future
- this.fs = new DFSFileSystem(catalog.getProperties());
}
- @VisibleForTesting
-    public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client, FileSystem fs) {
- this.catalog = catalog;
- this.client = client;
- this.fs = fs;
- }
-
-
public HMSCachedClient getClient() {
return client;
}
- public FileSystem getFs() {
- return fs;
+ public HMSExternalCatalog getCatalog() {
+ return catalog;
}
    public static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index eb1d77a322d..94748e7e427 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -354,7 +354,7 @@ public class HudiScanNode extends HiveScanNode {
long fileSize = baseFile.getFileSize();
// Need add hdfs host to location
LocationPath locationPath = new LocationPath(filePath,
hmsTable.getCatalogProperties());
- Path splitFilePath = locationPath.toScanRangeLocation();
+ Path splitFilePath = locationPath.toStorageLocation();
splits.add(new FileSplit(splitFilePath, 0, fileSize,
fileSize,
new String[0], partition.getPartitionValues()));
});
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 961fb8ae1d6..21826dfd8d5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -150,7 +150,7 @@ public class IcebergScanNode extends FileQueryScanNode {
TIcebergDeleteFileDesc deleteFileDesc = new
TIcebergDeleteFileDesc();
String deleteFilePath = filter.getDeleteFilePath();
LocationPath locationPath = new LocationPath(deleteFilePath,
icebergSplit.getConfig());
- Path splitDeletePath = locationPath.toScanRangeLocation();
+ Path splitDeletePath = locationPath.toStorageLocation();
deleteFileDesc.setPath(splitDeletePath.toString());
if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
fileDesc.setContent(FileContent.POSITION_DELETES.id());
@@ -244,7 +244,7 @@ public class IcebergScanNode extends FileQueryScanNode {
partitionPathSet.add(structLike.toString());
}
LocationPath locationPath = new LocationPath(dataFilePath,
source.getCatalog().getProperties());
- Path finalDataFilePath = locationPath.toScanRangeLocation();
+ Path finalDataFilePath = locationPath.toStorageLocation();
IcebergSplit split = new IcebergSplit(
finalDataFilePath,
splitTask.start(),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index ddb5a8c4f3d..b9672f70c41 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -161,7 +161,7 @@ public class PaimonScanNode extends FileQueryScanNode {
List<RawFile> rawFiles = optRawFiles.get();
for (RawFile file : rawFiles) {
LocationPath locationPath = new
LocationPath(file.path(), source.getCatalog().getProperties());
- Path finalDataFilePath =
locationPath.toScanRangeLocation();
+ Path finalDataFilePath =
locationPath.toStorageLocation();
try {
splits.addAll(
splitFile(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
index 94f5c420438..b6fd96bbd37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java
@@ -38,6 +38,10 @@ public interface FileSystem {
Status exists(String remotePath);
+ default Status directoryExists(String dir) {
+ return exists(dir);
+ }
+
Status downloadWithFileSize(String remoteFilePath, String localFilePath,
long fileSize);
Status upload(String localPath, String remotePath);
@@ -58,6 +62,10 @@ public interface FileSystem {
Status delete(String remotePath);
+ default Status deleteDirectory(String dir) {
+ return delete(dir);
+ }
+
Status makeDir(String remotePath);
Status listFiles(String remotePath, boolean recursive, List<RemoteFile>
result);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
index 149bbe2d378..dd66c359b9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemCache.java
@@ -23,8 +23,8 @@ import org.apache.doris.common.Pair;
import org.apache.doris.fs.remote.RemoteFileSystem;
import com.github.benmanes.caffeine.cache.LoadingCache;
-import org.apache.hadoop.mapred.JobConf;
+import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
@@ -44,7 +44,7 @@ public class FileSystemCache {
}
private RemoteFileSystem loadFileSystem(FileSystemCacheKey key) {
-        return FileSystemFactory.getRemoteFileSystem(key.type, key.conf, key.bindBrokerName);
+        return FileSystemFactory.getRemoteFileSystem(key.type, key.properties, key.bindBrokerName);
}
public RemoteFileSystem getRemoteFileSystem(FileSystemCacheKey key) {
@@ -55,13 +55,14 @@ public class FileSystemCache {
private final FileSystemType type;
// eg: hdfs://nameservices1
private final String fsIdent;
- private final JobConf conf;
+ private final Map<String, String> properties;
private final String bindBrokerName;
-        public FileSystemCacheKey(Pair<FileSystemType, String> fs, JobConf conf, String bindBrokerName) {
+ public FileSystemCacheKey(Pair<FileSystemType, String> fs,
+ Map<String, String> properties, String bindBrokerName) {
this.type = fs.first;
this.fsIdent = fs.second;
- this.conf = conf;
+ this.properties = properties;
this.bindBrokerName = bindBrokerName;
}
@@ -75,7 +76,7 @@ public class FileSystemCache {
}
            boolean equalsWithoutBroker = type.equals(((FileSystemCacheKey) obj).type)
&& fsIdent.equals(((FileSystemCacheKey) obj).fsIdent)
- && conf == ((FileSystemCacheKey) obj).conf;
+ && properties == ((FileSystemCacheKey) obj).properties;
if (bindBrokerName == null) {
return equalsWithoutBroker;
}
@@ -85,9 +86,9 @@ public class FileSystemCache {
@Override
public int hashCode() {
if (bindBrokerName == null) {
- return Objects.hash(conf, fsIdent, type);
+ return Objects.hash(properties, fsIdent, type);
}
- return Objects.hash(conf, fsIdent, type, bindBrokerName);
+ return Objects.hash(properties, fsIdent, type, bindBrokerName);
}
}
}
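
For reference, a minimal sketch of how a caller builds the cache key after this change; it mirrors the HiveMetaStoreCache hunks above, with the JobConf flattened into a plain Map before the key is constructed (variable names here are illustrative):

    Map<String, String> properties = new HashMap<>();
    jobConf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
    RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
            new FileSystemCache.FileSystemCacheKey(
                    LocationPath.getFSIdentity(location, bindBrokerName), properties, bindBrokerName));

Note that FileSystemCacheKey still compares the map by identity (properties ==), matching the previous conf == behavior, so two keys only compare equal when they share the same map instance.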
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
index 63f552a8ab8..cd7212c8e39 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
-import java.util.HashMap;
import java.util.Map;
public class FileSystemFactory {
@@ -51,10 +50,8 @@ public class FileSystemFactory {
}
}
-    public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, Configuration conf,
+    public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, Map<String, String> properties,
                                                        String bindBrokerName) {
-        Map<String, String> properties = new HashMap<>();
-        conf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
switch (type) {
case S3:
return new S3FileSystem(properties);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProvider.java
similarity index 74%
copy from
fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
copy to fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProvider.java
index 334258a3f12..aab7471fd99 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProvider.java
@@ -15,13 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.transaction;
+package org.apache.doris.fs;
-import org.apache.doris.datasource.hive.HiveMetadataOps;
+import org.apache.doris.datasource.SessionContext;
-public class TransactionManagerFactory {
-
-    public static TransactionManager createHiveTransactionManager(HiveMetadataOps ops) {
-        return new HiveTransactionManager(ops);
- }
+public interface FileSystemProvider {
+ FileSystem get(SessionContext ctx);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProviderImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProviderImpl.java
new file mode 100644
index 00000000000..680592ab4a8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemProviderImpl.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs;
+
+import org.apache.doris.datasource.ExternalMetaCacheMgr;
+import org.apache.doris.datasource.SessionContext;
+import org.apache.doris.fs.remote.SwitchingFileSystem;
+
+import java.util.Map;
+
+public class FileSystemProviderImpl implements FileSystemProvider {
+ private ExternalMetaCacheMgr extMetaCacheMgr;
+ private String bindBrokerName;
+
+ private Map<String, String> properties;
+
+ public FileSystemProviderImpl(ExternalMetaCacheMgr extMetaCacheMgr, String
bindBrokerName,
+ Map<String, String> properties) {
+ this.extMetaCacheMgr = extMetaCacheMgr;
+ this.bindBrokerName = bindBrokerName;
+ this.properties = properties;
+ }
+
+ @Override
+ public FileSystem get(SessionContext ctx) {
+ return new SwitchingFileSystem(extMetaCacheMgr, bindBrokerName,
properties);
+ }
+}
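
A hedged usage sketch of the new provider; the catalog-side wiring is not shown in this excerpt, so treat the construction below as illustrative only:

    FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(
            Env.getCurrentEnv().getExtMetaCacheMgr(), bindBrokerName, catalog.getProperties());
    FileSystem fs = fileSystemProvider.get(ctx);   // returns a SwitchingFileSystem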
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java
index 57a10ed1109..93e79bf94b1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalDfsFileSystem.java
@@ -48,6 +48,11 @@ public class LocalDfsFileSystem implements FileSystem {
return null;
}
+ @Override
+ public Status directoryExists(String dir) {
+ return exists(dir);
+ }
+
@Override
public Status exists(String remotePath) {
boolean exists = false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java
index 72b75350140..f821e5bb6ce 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java
@@ -31,6 +31,7 @@ import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
import java.util.Comparator;
public abstract class ObjFileSystem extends RemoteFileSystem {
@@ -43,11 +44,20 @@ public abstract class ObjFileSystem extends
RemoteFileSystem {
this.objStorage = objStorage;
}
+ public ObjStorage<?> getObjStorage() {
+ return objStorage;
+ }
+
@Override
public Status exists(String remotePath) {
return objStorage.headObject(remotePath);
}
+ @Override
+ public Status directoryExists(String dir) {
+ return listFiles(dir, false, new ArrayList<>());
+ }
+
/**
* download data from remote file and check data size with expected file
size.
* @param remoteFilePath remote file path
@@ -139,4 +149,9 @@ public abstract class ObjFileSystem extends
RemoteFileSystem {
public Status delete(String remotePath) {
return objStorage.deleteObject(remotePath);
}
+
+ @Override
+ public Status deleteDirectory(String absolutePath) {
+ return objStorage.deleteObjects(absolutePath);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
index 5771c65224b..3869824de55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java
@@ -107,8 +107,4 @@ public class S3FileSystem extends ObjFileSystem {
}
return Status.OK;
}
-
- public Status deleteDirectory(String absolutePath) {
- return ((S3ObjStorage) objStorage).deleteObjects(absolutePath);
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java
new file mode 100644
index 00000000000..00802922ef3
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/SwitchingFileSystem.java
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.fs.remote;
+
+import org.apache.doris.backup.Status;
+import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.ExternalMetaCacheMgr;
+import org.apache.doris.fs.FileSystem;
+import org.apache.doris.fs.FileSystemCache;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class SwitchingFileSystem implements FileSystem {
+
+ private final ExternalMetaCacheMgr extMetaCacheMgr;
+
+ private final String bindBrokerName;
+
+ private final Map<String, String> properties;
+
+    public SwitchingFileSystem(ExternalMetaCacheMgr extMetaCacheMgr, String bindBrokerName,
+ Map<String, String> properties) {
+ this.extMetaCacheMgr = extMetaCacheMgr;
+ this.bindBrokerName = bindBrokerName;
+ this.properties = properties;
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public Status exists(String remotePath) {
+ return fileSystem(remotePath).exists(remotePath);
+ }
+
+ @Override
+ public Status directoryExists(String dir) {
+ return fileSystem(dir).directoryExists(dir);
+ }
+
+ @Override
+ public Status downloadWithFileSize(String remoteFilePath, String
localFilePath, long fileSize) {
+ return fileSystem(remoteFilePath).downloadWithFileSize(remoteFilePath,
localFilePath, fileSize);
+ }
+
+ @Override
+ public Status upload(String localPath, String remotePath) {
+ return fileSystem(localPath).upload(localPath, remotePath);
+ }
+
+ @Override
+ public Status directUpload(String content, String remoteFile) {
+ return fileSystem(remoteFile).directUpload(content, remoteFile);
+ }
+
+ @Override
+ public Status rename(String origFilePath, String destFilePath) {
+ return fileSystem(origFilePath).rename(origFilePath, destFilePath);
+ }
+
+ @Override
+ public Status renameDir(String origFilePath, String destFilePath) {
+ return fileSystem(origFilePath).renameDir(origFilePath, destFilePath);
+ }
+
+ @Override
+ public Status renameDir(String origFilePath, String destFilePath, Runnable
runWhenPathNotExist) {
+ return fileSystem(origFilePath).renameDir(origFilePath, destFilePath,
runWhenPathNotExist);
+ }
+
+ @Override
+ public Status delete(String remotePath) {
+ return fileSystem(remotePath).delete(remotePath);
+ }
+
+ @Override
+ public Status deleteDirectory(String absolutePath) {
+ return fileSystem(absolutePath).deleteDirectory(absolutePath);
+ }
+
+ @Override
+ public Status makeDir(String remotePath) {
+ return fileSystem(remotePath).makeDir(remotePath);
+ }
+
+ @Override
+ public Status listFiles(String remotePath, boolean recursive,
List<RemoteFile> result) {
+ return fileSystem(remotePath).listFiles(remotePath, recursive, result);
+ }
+
+ @Override
+ public Status globList(String remotePath, List<RemoteFile> result) {
+ return fileSystem(remotePath).globList(remotePath, result);
+ }
+
+ @Override
+ public Status globList(String remotePath, List<RemoteFile> result, boolean
fileNameOnly) {
+ return fileSystem(remotePath).globList(remotePath, result,
fileNameOnly);
+ }
+
+ @Override
+ public Status listDirectories(String remotePath, Set<String> result) {
+ return fileSystem(remotePath).listDirectories(remotePath, result);
+ }
+
+ public FileSystem fileSystem(String location) {
+ return extMetaCacheMgr.getFsCache().getRemoteFileSystem(
+ new FileSystemCache.FileSystemCacheKey(
+ LocationPath.getFSIdentity(location,
+ bindBrokerName), properties, bindBrokerName));
+ }
+}
+
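
SwitchingFileSystem resolves a concrete RemoteFileSystem from the cache on every call, keyed by each location's scheme and authority, so one instance can serve mixed storage locations. A small illustrative sketch (the paths and the null broker name are assumptions, not taken from the patch):

    FileSystem fs = new SwitchingFileSystem(extMetaCacheMgr, null /* bindBrokerName */, properties);
    fs.exists("s3://bucket/warehouse/db/tbl/file.parquet");    // routed to an S3FileSystem
    fs.exists("hdfs://ns1/warehouse/db/tbl/file.parquet");     // routed to a DFS-backed file system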
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index c45bdc1ebc9..6374dfddc66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -33,6 +33,7 @@ import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.THiveBucket;
import org.apache.doris.thrift.THiveColumn;
import org.apache.doris.thrift.THiveColumnType;
@@ -128,21 +129,35 @@ public class HiveTableSink extends DataSink {
setCompressType(tSink, formatType);
THiveLocationParams locationParams = new THiveLocationParams();
- String location = sd.getLocation();
-
- String writeTempPath = createTempPath(location);
- locationParams.setWritePath(writeTempPath);
- locationParams.setTargetPath(location);
- locationParams.setFileType(LocationPath.getTFileTypeForBE(location));
+        LocationPath locationPath = new LocationPath(sd.getLocation(), targetTable.getHadoopProperties());
+ String location = locationPath.toString();
+ String storageLocation = locationPath.toStorageLocation().toString();
+ TFileType fileType = locationPath.getTFileTypeForBE();
+ if (fileType == TFileType.FILE_S3) {
+ locationParams.setWritePath(storageLocation);
+ locationParams.setOriginalWritePath(location);
+ locationParams.setTargetPath(location);
+ if (insertCtx.isPresent()) {
+                HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
+ tSink.setOverwrite(context.isOverwrite());
+ context.setWritePath(storageLocation);
+ }
+ } else {
+ String writeTempPath = createTempPath(location);
+ locationParams.setWritePath(writeTempPath);
+ locationParams.setOriginalWritePath(writeTempPath);
+ locationParams.setTargetPath(location);
+ if (insertCtx.isPresent()) {
+                HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
+ tSink.setOverwrite(context.isOverwrite());
+ context.setWritePath(writeTempPath);
+ }
+ }
+ locationParams.setFileType(fileType);
tSink.setLocation(locationParams);
tSink.setHadoopConfig(targetTable.getHadoopProperties());
- if (insertCtx.isPresent()) {
-            HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
- tSink.setOverwrite(context.isOverwrite());
- context.setWritePath(writeTempPath);
- }
tDataSink = new TDataSink(getDataSinkType());
tDataSink.setHiveTableSink(tSink);
}
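
The net effect of the branching above, sketched for an object-store location such as oss://bucket/db/tbl, which LocationPath converts to an s3 scheme for the BE (see the LocationPathTest changes below); the values are an illustration, not output from the patch:

    write_path          = s3://bucket/db/tbl     // BE writes to the converted s3-scheme path
    original_write_path = oss://bucket/db/tbl    // kept for the FE-side committer
    target_path         = oss://bucket/db/tbl

For non-S3 file types the sink keeps the previous behavior: write_path and original_write_path both point at the temporary path produced by createTempPath(location).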
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
index 2499cc6eba4..838d135fa45 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/HiveTransactionManager.java
@@ -21,23 +21,32 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HMSTransaction;
import org.apache.doris.datasource.hive.HiveMetadataOps;
+import org.apache.doris.fs.FileSystemProvider;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
public class HiveTransactionManager implements TransactionManager {
    private final Map<Long, HMSTransaction> transactions = new ConcurrentHashMap<>();
private final HiveMetadataOps ops;
- public HiveTransactionManager(HiveMetadataOps ops) {
+ private final FileSystemProvider fileSystemProvider;
+
+ private final Executor fileSystemExecutor;
+
+    public HiveTransactionManager(HiveMetadataOps ops, FileSystemProvider fileSystemProvider,
+ Executor fileSystemExecutor) {
this.ops = ops;
+ this.fileSystemProvider = fileSystemProvider;
+ this.fileSystemExecutor = fileSystemExecutor;
}
@Override
public long begin() {
long id = Env.getCurrentEnv().getNextId();
- HMSTransaction hiveTransaction = new HMSTransaction(ops);
+        HMSTransaction hiveTransaction = new HMSTransaction(ops, fileSystemProvider, fileSystemExecutor);
transactions.put(id, hiveTransaction);
return id;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
index 334258a3f12..394494a129d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionManagerFactory.java
@@ -18,10 +18,14 @@
package org.apache.doris.transaction;
import org.apache.doris.datasource.hive.HiveMetadataOps;
+import org.apache.doris.fs.FileSystemProvider;
+
+import java.util.concurrent.Executor;
public class TransactionManagerFactory {
-    public static TransactionManager createHiveTransactionManager(HiveMetadataOps ops) {
-        return new HiveTransactionManager(ops);
+    public static TransactionManager createHiveTransactionManager(HiveMetadataOps ops,
+            FileSystemProvider fileSystemProvider, Executor fileSystemExecutor) {
+        return new HiveTransactionManager(ops, fileSystemProvider, fileSystemExecutor);
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
index 571826aa9c8..69130f57fff 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java
@@ -34,7 +34,7 @@ public class LocationPathTest {
LocationPath locationPath = new LocationPath("hdfs://dir/file.path",
rangeProps);
Assertions.assertTrue(locationPath.get().startsWith("hdfs://"));
- String beLocation = locationPath.toScanRangeLocation().toString();
+ String beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("hdfs://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.DFS);
@@ -45,21 +45,21 @@ public class LocationPathTest {
Assertions.assertTrue(locationPath.get().startsWith("hdfs://")
&& !locationPath.get().startsWith("hdfs:///"));
- beLocation = locationPath.toScanRangeLocation().toString();
+ beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("hdfs://") &&
!beLocation.startsWith("hdfs:///"));
// nonstandard '/' for hdfs path
locationPath = new LocationPath("hdfs:/dir/file.path", props);
Assertions.assertTrue(locationPath.get().startsWith("hdfs://"));
- beLocation = locationPath.toScanRangeLocation().toString();
+ beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("hdfs://"));
// empty ha nameservices
props.put("dfs.nameservices", "");
locationPath = new LocationPath("hdfs:/dir/file.path", props);
- beLocation = locationPath.toScanRangeLocation().toString();
+ beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(locationPath.get().startsWith("/dir")
&& !locationPath.get().startsWith("hdfs://"));
Assertions.assertTrue(beLocation.startsWith("/dir") &&
!beLocation.startsWith("hdfs://"));
@@ -75,7 +75,7 @@ public class LocationPathTest {
// FE
Assertions.assertTrue(locationPath.get().startsWith("jfs://"));
// BE
- loc = locationPath.toScanRangeLocation().toString();
+ loc = locationPath.toStorageLocation().toString();
Assertions.assertTrue(loc.startsWith("jfs://"));
Assertions.assertEquals(LocationPath.getFSIdentity(loc, null).first,
FileSystemType.JFS);
}
@@ -89,7 +89,7 @@ public class LocationPathTest {
// FE
Assertions.assertTrue(locationPath.get().startsWith("s3://"));
// BE
- String beLoc = locationPath.toScanRangeLocation().toString();
+ String beLoc = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLoc.startsWith("s3://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLoc, null).first,
FileSystemType.S3);
}
@@ -101,7 +101,7 @@ public class LocationPathTest {
// FE
Assertions.assertTrue(locationPath.get().startsWith("oss://"));
// BE
- String beLocation = locationPath.toScanRangeLocation().toString();
+ String beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("s3://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.S3);
@@ -109,7 +109,7 @@ public class LocationPathTest {
// FE
Assertions.assertTrue(locationPath.get().startsWith("oss://test.oss-dls.aliyuncs"));
// BE
- beLocation = locationPath.toScanRangeLocation().toString();
+ beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("oss://test.oss-dls.aliyuncs"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.DFS);
@@ -121,7 +121,7 @@ public class LocationPathTest {
LocationPath locationPath = new LocationPath("cos://test.com",
rangeProps);
// FE
Assertions.assertTrue(locationPath.get().startsWith("cos://"));
- String beLocation = locationPath.toScanRangeLocation().toString();
+ String beLocation = locationPath.toStorageLocation().toString();
// BE
Assertions.assertTrue(beLocation.startsWith("s3://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.S3);
@@ -130,7 +130,7 @@ public class LocationPathTest {
// FE
Assertions.assertTrue(locationPath.get().startsWith("cosn://"));
// BE
- beLocation = locationPath.toScanRangeLocation().toString();
+ beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("s3://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.S3);
@@ -138,7 +138,7 @@ public class LocationPathTest {
// FE
Assertions.assertTrue(locationPath.get().startsWith("ofs://"));
// BE
- beLocation = locationPath.toScanRangeLocation().toString();
+ beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("ofs://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.OFS);
@@ -147,7 +147,7 @@ public class LocationPathTest {
// FE
Assertions.assertTrue(locationPath.get().startsWith("gfs://"));
// BE
- beLocation = locationPath.toScanRangeLocation().toString();
+ beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("gfs://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.DFS);
}
@@ -159,7 +159,7 @@ public class LocationPathTest {
// FE
Assertions.assertTrue(locationPath.get().startsWith("obs://"));
// BE
- String beLocation = locationPath.toScanRangeLocation().toString();
+ String beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("s3://"));
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation,
null).first, FileSystemType.S3);
}
@@ -173,7 +173,7 @@ public class LocationPathTest {
Assertions.assertTrue(locationPath.get().startsWith("unknown://"));
Assertions.assertTrue(locationPath.getLocationType() ==
LocationPath.LocationType.UNKNOWN);
// BE
- String beLocation = locationPath.toScanRangeLocation().toString();
+ String beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.startsWith("unknown://"));
}
@@ -186,7 +186,7 @@ public class LocationPathTest {
Assertions.assertTrue(locationPath.get().equalsIgnoreCase("/path/to/local"));
Assertions.assertTrue(locationPath.getLocationType() ==
LocationPath.LocationType.NOSCHEME);
// BE
- String beLocation = locationPath.toScanRangeLocation().toString();
+ String beLocation = locationPath.toStorageLocation().toString();
Assertions.assertTrue(beLocation.equalsIgnoreCase("/path/to/local"));
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
index ba87dd8f48e..e441262f12e 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -21,7 +21,10 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.TestHMSCachedClient;
+import org.apache.doris.fs.FileSystem;
+import org.apache.doris.fs.FileSystemProvider;
import org.apache.doris.fs.LocalDfsFileSystem;
+import org.apache.doris.fs.remote.SwitchingFileSystem;
import
org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.THiveLocationParams;
@@ -54,16 +57,21 @@ import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
public class HmsCommitTest {
private static HiveMetadataOps hmsOps;
private static HMSCachedClient hmsClient;
+
+ private static FileSystemProvider fileSystemProvider;
private static final String dbName = "test_db";
private static final String tbWithPartition = "test_tb_with_partition";
    private static final String tbWithoutPartition = "test_tb_without_partition";
- private static LocalDfsFileSystem fs;
+ private static FileSystem fs;
+ private static LocalDfsFileSystem localDFSFileSystem;
+ private static Executor fileSystemExecutor;
static String dbLocation;
static String writeLocation;
static String uri = "thrift://127.0.0.1:9083";
@@ -86,7 +94,14 @@ public class HmsCommitTest {
}
public static void createTestHiveCatalog() throws IOException {
- fs = new LocalDfsFileSystem();
+ localDFSFileSystem = new LocalDfsFileSystem();
+ new MockUp<SwitchingFileSystem>(SwitchingFileSystem.class) {
+ @Mock
+ public FileSystem fileSystem(String location) {
+ return localDFSFileSystem;
+ }
+ };
+ fs = new SwitchingFileSystem(null, null, null);
if (hasRealHmsService) {
            // If you have a real HMS service, then you can use this client to create real connections for testing
@@ -96,7 +111,9 @@ public class HmsCommitTest {
} else {
hmsClient = new TestHMSCachedClient();
}
- hmsOps = new HiveMetadataOps(null, hmsClient, fs);
+ hmsOps = new HiveMetadataOps(null, hmsClient);
+ fileSystemProvider = ctx -> fs;
+ fileSystemExecutor = Executors.newFixedThreadPool(16);
}
public static void createTestHiveDatabase() {
@@ -339,9 +356,9 @@ public class HmsCommitTest {
fs.makeDir(targetPath);
}
- fs.createFile(writePath + "/" + f1);
- fs.createFile(writePath + "/" + f2);
- fs.createFile(writePath + "/" + f3);
+ localDFSFileSystem.createFile(writePath + "/" + f1);
+ localDFSFileSystem.createFile(writePath + "/" + f2);
+ localDFSFileSystem.createFile(writePath + "/" + f3);
return pu;
}
@@ -363,7 +380,7 @@ public class HmsCommitTest {
public void commit(String dbName,
String tableName,
List<THivePartitionUpdate> hivePUs) {
- HMSTransaction hmsTransaction = new HMSTransaction(hmsOps);
+        HMSTransaction hmsTransaction = new HMSTransaction(hmsOps, fileSystemProvider, fileSystemExecutor);
hmsTransaction.setHivePartitionUpdates(hivePUs);
HiveInsertCommandContext ctx = new HiveInsertCommandContext();
String queryId = DebugUtil.printId(ConnectContext.get().queryId());
@@ -634,3 +651,4 @@ public class HmsCommitTest {
assertNumRows(3, pa);
}
}
+
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index 93de397b27b..834c9025cfd 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -282,6 +282,8 @@ struct THiveLocationParams {
1: optional string write_path
2: optional string target_path
3: optional Types.TFileType file_type
+    // Other object stores convert write_path to an s3-scheme path for the BE; this field keeps the original write path.
+ 4: optional string original_write_path
}
struct TSortedColumn {
@@ -336,6 +338,13 @@ enum TUpdateMode {
OVERWRITE = 2 // insert overwrite
}
+struct TS3MPUPendingUpload {
+ 1: optional string bucket
+ 2: optional string key
+ 3: optional string upload_id
+ 4: optional map<i32, string> etags
+}
+
struct THivePartitionUpdate {
1: optional string name
2: optional TUpdateMode update_mode
@@ -343,6 +352,7 @@ struct THivePartitionUpdate {
4: optional list<string> file_names
5: optional i64 row_count
6: optional i64 file_size
+ 7: optional list<TS3MPUPendingUpload> s3_mpu_pending_uploads
}
struct TDataSink {
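
TS3MPUPendingUpload describes an S3 multipart upload that the BE has started but not yet completed: bucket, key and upload_id identify the upload, and etags presumably maps part number to ETag (an assumption, not stated in the patch). A hedged illustration of the data one pending upload would carry, with made-up values:

    TS3MPUPendingUpload {
      bucket    = "my-bucket"
      key       = "warehouse/db/tbl/<query_id>_0.parquet"
      upload_id = "upload-id-returned-by-s3"
      etags     = {1: "etag-of-part-1", 2: "etag-of-part-2"}
    }

These pending uploads travel in THivePartitionUpdate so that the FE side can presumably complete them (or abort them on failure) when the transaction commits.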
diff --git
a/regression-test/data/external_table_p2/hive/test_hive_write_insert_s3.out
b/regression-test/data/external_table_p2/hive/test_hive_write_insert_s3.out
new file mode 100644
index 00000000000..7d34ada0550
--- /dev/null
+++ b/regression-test/data/external_table_p2/hive/test_hive_write_insert_s3.out
@@ -0,0 +1,61 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !q01 --
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-21
2024-03-21T12:00 2024-03-21T12:00:00.123456
2024-03-21T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {2:20} {2:200000000000}
{2.2:20.2} {2.2:20.2} {0:1} {2.2:2.2} {2.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-22
2024-03-22T12:00 2024-03-22T12:00:00.123456
2024-03-22T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {3:20} {3:200000000000}
{3.2:20.2} {3.2:20.2} {0:1} {3.2:2.2} {3.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+true 127 32767 2147483647 9223372036854775807 123.45
123456.789 123456789 1234.5678 123456.789012
123456789.012345678901 string_value binary_value 2024-03-20
2024-03-20T12:00 2024-03-20T12:00:00.123456
2024-03-20T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"a":"b"} {1:10} {1:100000000000}
{1.1:10.1} {1.1:10.1} {1:0} {1.1:1.1} {1.23:1.23}
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+
+-- !q02 --
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-21
2024-03-21T12:00 2024-03-21T12:00:00.123456
2024-03-21T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {2:20} {2:200000000000}
{2.2:20.2} {2.2:20.2} {0:1} {2.2:2.2} {2.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-21
2024-03-21T12:00 2024-03-21T12:00:00.123456
2024-03-21T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {2:20} {2:200000000000}
{2.2:20.2} {2.2:20.2} {0:1} {2.2:2.2} {2.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-22
2024-03-22T12:00 2024-03-22T12:00:00.123456
2024-03-22T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {3:20} {3:200000000000}
{3.2:20.2} {3.2:20.2} {0:1} {3.2:2.2} {3.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-22
2024-03-22T12:00 2024-03-22T12:00:00.123456
2024-03-22T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {3:20} {3:200000000000}
{3.2:20.2} {3.2:20.2} {0:1} {3.2:2.2} {3.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+true 127 32767 2147483647 9223372036854775807 123.45
123456.789 123456789 1234.5678 123456.789012
123456789.012345678901 string_value binary_value 2024-03-20
2024-03-20T12:00 2024-03-20T12:00:00.123456
2024-03-20T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"a":"b"} {1:10} {1:100000000000}
{1.1:10.1} {1.1:10.1} {1:0} {1.1:1.1} {1.23:1.23}
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+true 127 32767 2147483647 9223372036854775807 123.45
123456.789 123456789 1234.5678 123456.789012
123456789.012345678901 string_value binary_value 2024-03-20
2024-03-20T12:00 2024-03-20T12:00:00.123456
2024-03-20T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"a":"b"} {1:10} {1:100000000000}
{1.1:10.1} {1.1:10.1} {1:0} {1.1:1.1} {1.23:1.23}
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+
+-- !q03 --
+\N \N \N \N \N -123.45 \N \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N {2:20} \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N \N [3.4567, 4.5678]
\N \N \N \N \N \N \N \N \N \N
[null, "value1", "value2"] \N \N \N \N
+\N \N \N \N \N -123.45 \N \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N {3:20} \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N \N [8.4567, 4.5678]
\N \N \N \N \N \N \N \N \N \N
[null, "value1", "value2"] \N \N \N \N
+\N \N \N \N \N 123.45 \N \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N {1:10} \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N \N [1.2345, 2.3456]
\N \N \N \N \N \N \N \N \N \N
[null, "value1", "value2"] \N \N \N \N
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-21
2024-03-21T12:00 2024-03-21T12:00:00.123456
2024-03-21T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {2:20} {2:200000000000}
{2.2:20.2} {2.2:20.2} {0:1} {2.2:2.2} {2.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-21
2024-03-21T12:00 2024-03-21T12:00:00.123456
2024-03-21T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {2:20} {2:200000000000}
{2.2:20.2} {2.2:20.2} {0:1} {2.2:2.2} {2.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-22
2024-03-22T12:00 2024-03-22T12:00:00.123456
2024-03-22T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {3:20} {3:200000000000}
{3.2:20.2} {3.2:20.2} {0:1} {3.2:2.2} {3.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-22
2024-03-22T12:00 2024-03-22T12:00:00.123456
2024-03-22T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {3:20} {3:200000000000}
{3.2:20.2} {3.2:20.2} {0:1} {3.2:2.2} {3.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+true 127 32767 2147483647 9223372036854775807 123.45
123456.789 123456789 1234.5678 123456.789012
123456789.012345678901 string_value binary_value 2024-03-20
2024-03-20T12:00 2024-03-20T12:00:00.123456
2024-03-20T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"a":"b"} {1:10} {1:100000000000}
{1.1:10.1} {1.1:10.1} {1:0} {1.1:1.1} {1.23:1.23}
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+true 127 32767 2147483647 9223372036854775807 123.45
123456.789 123456789 1234.5678 123456.789012
123456789.012345678901 string_value binary_value 2024-03-20
2024-03-20T12:00 2024-03-20T12:00:00.123456
2024-03-20T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"a":"b"} {1:10} {1:100000000000}
{1.1:10.1} {1.1:10.1} {1:0} {1.1:1.1} {1.23:1.23}
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+
+-- !q04 --
+\N \N \N \N \N -123.45 \N \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N {2:20} \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N \N [3.4567, 4.5678]
\N \N \N \N \N \N \N \N \N \N
[null, "value1", "value2"] \N \N \N \N
+\N \N \N \N \N -123.45 \N \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N {3:20} \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N \N [8.4567, 4.5678]
\N \N \N \N \N \N \N \N \N \N
[null, "value1", "value2"] \N \N \N \N
+\N \N \N \N \N 123.45 \N \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N {1:10} \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N \N [1.2345, 2.3456]
\N \N \N \N \N \N \N \N \N \N
[null, "value1", "value2"] \N \N \N \N
+
+-- !q05 --
+
+-- !q01 --
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-21
2024-03-21T12:00 2024-03-21T12:00:00.123456
2024-03-21T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {2:20} {2:200000000000}
{2.2:20.2} {2.2:20.2} {0:1} {2.2:2.2} {2.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-22
2024-03-22T12:00 2024-03-22T12:00:00.123456
2024-03-22T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {3:20} {3:200000000000}
{3.2:20.2} {3.2:20.2} {0:1} {3.2:2.2} {3.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+true 127 32767 2147483647 9223372036854775807 123.45
123456.789 123456789 1234.5678 123456.789012
123456789.012345678901 string_value binary_value 2024-03-20
2024-03-20T12:00 2024-03-20T12:00:00.123456
2024-03-20T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"a":"b"} {1:10} {1:100000000000}
{1.1:10.1} {1.1:10.1} {1:0} {1.1:1.1} {1.23:1.23}
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+
+-- !q02 --
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-21
2024-03-21T12:00 2024-03-21T12:00:00.123456
2024-03-21T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {2:20} {2:200000000000}
{2.2:20.2} {2.2:20.2} {0:1} {2.2:2.2} {2.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-21
2024-03-21T12:00 2024-03-21T12:00:00.123456
2024-03-21T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {2:20} {2:200000000000}
{2.2:20.2} {2.2:20.2} {0:1} {2.2:2.2} {2.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-22
2024-03-22T12:00 2024-03-22T12:00:00.123456
2024-03-22T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {3:20} {3:200000000000}
{3.2:20.2} {3.2:20.2} {0:1} {3.2:2.2} {3.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-22
2024-03-22T12:00 2024-03-22T12:00:00.123456
2024-03-22T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {3:20} {3:200000000000}
{3.2:20.2} {3.2:20.2} {0:1} {3.2:2.2} {3.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+true 127 32767 2147483647 9223372036854775807 123.45
123456.789 123456789 1234.5678 123456.789012
123456789.012345678901 string_value binary_value 2024-03-20
2024-03-20T12:00 2024-03-20T12:00:00.123456
2024-03-20T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"a":"b"} {1:10} {1:100000000000}
{1.1:10.1} {1.1:10.1} {1:0} {1.1:1.1} {1.23:1.23}
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+true 127 32767 2147483647 9223372036854775807 123.45
123456.789 123456789 1234.5678 123456.789012
123456789.012345678901 string_value binary_value 2024-03-20
2024-03-20T12:00 2024-03-20T12:00:00.123456
2024-03-20T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"a":"b"} {1:10} {1:100000000000}
{1.1:10.1} {1.1:10.1} {1:0} {1.1:1.1} {1.23:1.23}
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+
+-- !q03 --
+\N \N \N \N \N -123.45 \N \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N {2:20} \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N \N [3.4567, 4.5678]
\N \N \N \N \N \N \N \N \N \N
[null, "value1", "value2"] \N \N \N 20240321
+\N \N \N \N \N -123.45 \N \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N {3:20} \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N \N [8.4567, 4.5678]
\N \N \N \N \N \N \N \N \N \N
[null, "value1", "value2"] \N \N \N 20240322
+\N \N \N \N \N 123.45 \N \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N {1:10} \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N \N [1.2345, 2.3456]
\N \N \N \N \N \N \N \N \N \N
[null, "value1", "value2"] \N \N \N 20240320
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-21
2024-03-21T12:00 2024-03-21T12:00:00.123456
2024-03-21T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {2:20} {2:200000000000}
{2.2:20.2} {2.2:20.2} {0:1} {2.2:2.2} {2.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-21
2024-03-21T12:00 2024-03-21T12:00:00.123456
2024-03-21T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {2:20} {2:200000000000}
{2.2:20.2} {2.2:20.2} {0:1} {2.2:2.2} {2.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-22
2024-03-22T12:00 2024-03-22T12:00:00.123456
2024-03-22T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {3:20} {3:200000000000}
{3.2:20.2} {3.2:20.2} {0:1} {3.2:2.2} {3.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+false -128 -32768 -2147483648 -9223372036854775808 -123.45
-123456.789 -123456789 -1234.5678 -123456.789012
-123456789.012345678901 string_value binary_value 2024-03-22
2024-03-22T12:00 2024-03-22T12:00:00.123456
2024-03-22T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"x":"y"} {3:20} {3:200000000000}
{3.2:20.2} {3.2:20.2} {0:1} {3.2:2.2} {3.34:2.34}
{2.3456:2.3456} {2.34567890:2.34567890} {2.34567890 [...]
+true 127 32767 2147483647 9223372036854775807 123.45
123456.789 123456789 1234.5678 123456.789012
123456789.012345678901 string_value binary_value 2024-03-20
2024-03-20T12:00 2024-03-20T12:00:00.123456
2024-03-20T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"a":"b"} {1:10} {1:100000000000}
{1.1:10.1} {1.1:10.1} {1:0} {1.1:1.1} {1.23:1.23}
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+true 127 32767 2147483647 9223372036854775807 123.45
123456.789 123456789 1234.5678 123456.789012
123456789.012345678901 string_value binary_value 2024-03-20
2024-03-20T12:00 2024-03-20T12:00:00.123456
2024-03-20T12:00:00.123456 char_value1 char_value2 char_value3
varchar_value1 varchar_value2 varchar_value3 {"key1":"value1"}
{"key1":"value1"} {"a":"b"} {1:10} {1:100000000000}
{1.1:10.1} {1.1:10.1} {1:0} {1.1:1.1} {1.23:1.23}
{1.2345:1.2345} {1.23456789:1.23456789} {1.23456789:1.23456789 [...]
+
+-- !q04 --
+\N \N \N \N \N -123.45 \N \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N {2:20} \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N \N [3.4567, 4.5678]
\N \N \N \N \N \N \N \N \N \N
[null, "value1", "value2"] \N \N \N 20240321
+\N \N \N \N \N -123.45 \N \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N {3:20} \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N \N [8.4567, 4.5678]
\N \N \N \N \N \N \N \N \N \N
[null, "value1", "value2"] \N \N \N 20240322
+\N \N \N \N \N 123.45 \N \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N {1:10} \N \N \N
\N \N \N \N \N \N \N \N \N \N
\N \N \N \N \N \N \N [1.2345, 2.3456]
\N \N \N \N \N \N \N \N \N \N
[null, "value1", "value2"] \N \N \N 20240320
+
diff --git
a/regression-test/suites/external_table_p2/hive/test_hive_write_insert_s3.groovy
b/regression-test/suites/external_table_p2/hive/test_hive_write_insert_s3.groovy
new file mode 100644
index 00000000000..87633ba1b09
--- /dev/null
+++
b/regression-test/suites/external_table_p2/hive/test_hive_write_insert_s3.groovy
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_write_insert_s3",
"p2,external,hive,external_remote,external_remote_hive") {
+ def format_compressions = ["parquet_snappy"]
+
+ def q01 = { String format_compression, String catalog_name ->
+ logger.info("hive sql: " + """ truncate table
all_types_${format_compression}_s3; """)
+ hive_remote """ truncate table all_types_${format_compression}_s3; """
+ sql """refresh catalog ${catalog_name};"""
+
+ sql """
+ INSERT INTO all_types_${format_compression}_s3
+ SELECT * FROM all_types_parquet_snappy_src;
+ """
+ order_qt_q01 """ select * from all_types_${format_compression}_s3;
+ """
+
+ sql """
+ INSERT INTO all_types_${format_compression}_s3
+ SELECT boolean_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, decimal_col1, decimal_col2,
+ decimal_col3, decimal_col4, string_col, binary_col, date_col,
timestamp_col1, timestamp_col2, timestamp_col3, char_col1,
+ char_col2, char_col3, varchar_col1, varchar_col2, varchar_col3,
t_map_string, t_map_varchar, t_map_char, t_map_int,
+ t_map_bigint, t_map_float, t_map_double, t_map_boolean,
t_map_decimal_precision_2, t_map_decimal_precision_4,
+ t_map_decimal_precision_8, t_map_decimal_precision_17,
t_map_decimal_precision_18, t_map_decimal_precision_38,
+ t_array_string, t_array_int, t_array_bigint, t_array_float,
t_array_double, t_array_boolean, t_array_varchar,
+ t_array_char, t_array_decimal_precision_2,
t_array_decimal_precision_4, t_array_decimal_precision_8,
+ t_array_decimal_precision_17, t_array_decimal_precision_18,
t_array_decimal_precision_38, t_struct_bigint, t_complex,
+ t_struct_nested, t_struct_null,
t_struct_non_nulls_after_nulls, t_nested_struct_non_nulls_after_nulls,
+ t_map_null_value, t_array_string_starting_with_nulls,
t_array_string_with_nulls_in_between,
+ t_array_string_ending_with_nulls, t_array_string_all_nulls,
dt FROM all_types_parquet_snappy_src;
+ """
+ order_qt_q02 """ select * from all_types_${format_compression}_s3;
+ """
+
+ sql """
+ INSERT INTO all_types_${format_compression}_s3(float_col, t_map_int,
t_array_decimal_precision_8, t_array_string_starting_with_nulls)
+ SELECT float_col, t_map_int, t_array_decimal_precision_8,
t_array_string_starting_with_nulls FROM all_types_parquet_snappy_src;
+ """
+ order_qt_q03 """
+ select * from all_types_${format_compression}_s3;
+ """
+
+ sql """
+ INSERT OVERWRITE TABLE all_types_${format_compression}_s3(float_col, t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls)
+ SELECT float_col, t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls FROM all_types_parquet_snappy_src;
+ """
+ order_qt_q04 """
+ select * from all_types_${format_compression}_s3;
+ """
+
+ logger.info("hive sql: " + """ truncate table all_types_${format_compression}_s3; """)
+ hive_remote """ truncate table all_types_${format_compression}_s3; """
+ sql """refresh catalog ${catalog_name};"""
+ order_qt_q05 """
+ select * from all_types_${format_compression}_s3;
+ """
+ }
+
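+ // q02: repeats the insert scenarios on a partitioned copy (all_types_par_..._q02) created LIKE the source table,
+ // relocated to a cosn:// S3-compatible bucket so the S3 file committer path is exercised, and dropped at the end.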
+ def q02 = { String format_compression, String catalog_name ->
+ logger.info("hive sql: " + """ DROP TABLE IF EXISTS all_types_par_${format_compression}_s3_${catalog_name}_q02; """)
+ hive_remote """ DROP TABLE IF EXISTS all_types_par_${format_compression}_s3_${catalog_name}_q02; """
+ logger.info("hive sql: " + """ CREATE TABLE IF NOT EXISTS all_types_par_${format_compression}_s3_${catalog_name}_q02 like all_types_par_${format_compression}_s3; """)
+ hive_remote """ CREATE TABLE IF NOT EXISTS all_types_par_${format_compression}_s3_${catalog_name}_q02 like all_types_par_${format_compression}_s3; """
+ logger.info("hive sql: " + """ ALTER TABLE all_types_par_${format_compression}_s3_${catalog_name}_q02 SET LOCATION 'cosn://doris-build-1308700295/regression/write/data/all_types_par_${format_compression}_s3_${catalog_name}_q02'; """)
+ hive_remote """ ALTER TABLE all_types_par_${format_compression}_s3_${catalog_name}_q02 SET LOCATION 'cosn://doris-build-1308700295/regression/write/data/all_types_par_${format_compression}_s3_${catalog_name}_q02'; """
+ sql """refresh catalog ${catalog_name};"""
+
+ sql """
+ INSERT INTO all_types_par_${format_compression}_s3_${catalog_name}_q02
+ SELECT * FROM all_types_par_parquet_snappy_src;
+ """
+ order_qt_q01 """ select * from all_types_par_${format_compression}_s3_${catalog_name}_q02;
+ """
+
+ sql """
+ INSERT INTO all_types_par_${format_compression}_s3_${catalog_name}_q02
+ SELECT boolean_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, decimal_col1, decimal_col2,
+ decimal_col3, decimal_col4, string_col, binary_col, date_col, timestamp_col1, timestamp_col2, timestamp_col3, char_col1,
+ char_col2, char_col3, varchar_col1, varchar_col2, varchar_col3, t_map_string, t_map_varchar, t_map_char, t_map_int,
+ t_map_bigint, t_map_float, t_map_double, t_map_boolean, t_map_decimal_precision_2, t_map_decimal_precision_4,
+ t_map_decimal_precision_8, t_map_decimal_precision_17, t_map_decimal_precision_18, t_map_decimal_precision_38,
+ t_array_string, t_array_int, t_array_bigint, t_array_float, t_array_double, t_array_boolean, t_array_varchar,
+ t_array_char, t_array_decimal_precision_2, t_array_decimal_precision_4, t_array_decimal_precision_8,
+ t_array_decimal_precision_17, t_array_decimal_precision_18, t_array_decimal_precision_38, t_struct_bigint, t_complex,
+ t_struct_nested, t_struct_null, t_struct_non_nulls_after_nulls, t_nested_struct_non_nulls_after_nulls,
+ t_map_null_value, t_array_string_starting_with_nulls, t_array_string_with_nulls_in_between,
+ t_array_string_ending_with_nulls, t_array_string_all_nulls, dt FROM all_types_parquet_snappy_src;
+ """
+ order_qt_q02 """ select * from all_types_par_${format_compression}_s3_${catalog_name}_q02;
+ """
+
+ sql """
+ INSERT INTO all_types_par_${format_compression}_s3_${catalog_name}_q02(float_col, t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls, dt)
+ SELECT float_col, t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls, dt FROM all_types_parquet_snappy_src;
+ """
+ order_qt_q03 """ select * from all_types_par_${format_compression}_s3_${catalog_name}_q02;
+ """
+
+ sql """
+ INSERT OVERWRITE TABLE all_types_par_${format_compression}_s3_${catalog_name}_q02(float_col, t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls, dt)
+ SELECT float_col, t_map_int, t_array_decimal_precision_8, t_array_string_starting_with_nulls, dt FROM all_types_parquet_snappy_src;
+ """
+ order_qt_q04 """
+ select * from all_types_par_${format_compression}_s3_${catalog_name}_q02;
+ """
+
+ logger.info("hive sql: " + """ DROP TABLE IF EXISTS all_types_par_${format_compression}_s3_${catalog_name}_q02; """)
+ hive_remote """ DROP TABLE IF EXISTS all_types_par_${format_compression}_s3_${catalog_name}_q02; """
+ }
+
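+ // Driver: runs only when enableHiveTest=true; creates an HMS catalog carrying the S3 endpoint and credentials,
+ // runs q01 and q02 for each configured format/compression, then drops the catalog.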
+ String enabled = context.config.otherConfigs.get("enableHiveTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ try {
+ String catalog_name = "test_hive_write_insert_s3"
+
+ String hms_host = context.config.otherConfigs.get("extHiveHmsHost")
+ String hms_port = context.config.otherConfigs.get("extHiveHmsPort")
+ String hdfs_host = context.config.otherConfigs.get("extHiveHmsHost")
+ String hdfs_port = context.config.otherConfigs.get("extHdfsPort")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String ak = context.config.otherConfigs.get("extAk")
+ String sk = context.config.otherConfigs.get("extSk")
+ String endpoint = context.config.otherConfigs.get("extS3Endpoint")
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """create catalog if not exists ${catalog_name} properties (
+ 'type'='hms',
+ 'hive.metastore.uris' = 'thrift://${hms_host}:${hms_port}',
+ 'fs.defaultFS' = 'hdfs://${hdfs_host}:${hdfs_port}',
+ 'hadoop.username' = 'hadoop',
+ 's3.endpoint' = '${endpoint}',
+ 's3.access_key' = '${ak}',
+ 's3.secret_key' = '${sk}'
+ );"""
+ sql """use `${catalog_name}`.`write_test`"""
+ logger.info("hive sql: " + """ use `write_test` """)
+ hive_remote """use `write_test`"""
+
+ sql """set enable_fallback_to_original_planner=false;"""
+
+ for (String format_compression in format_compressions) {
+ logger.info("Processing format_compression: " + format_compression)
+ q01(format_compression, catalog_name)
+ q02(format_compression, catalog_name)
+ }
+
+ sql """drop catalog if exists ${catalog_name}"""
+ } finally {
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]