This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 70a518e099c [Fix](multi-catalog) Fix not throw error when call close()
in hive/iceberg writer. (#38902)
70a518e099c is described below
commit 70a518e099c9dcf2a9952ba9779bca5e275f0e9f
Author: Qi Chen <[email protected]>
AuthorDate: Tue Aug 6 08:51:12 2024 +0800
[Fix](multi-catalog) Fix not throw error when call close() in hive/iceberg
writer. (#38902)
## Proposed changes
[Fix] (multi-catalog) Fix errors not being returned when close() is called
in the hive/iceberg writer.
When the file writer's close() is called, it syncs its buffer to commit the
data. As a result, data is sometimes written only when close() is called, which
can surface errors at that point (for example, in hdfs_file_writer). These
errors therefore need to be captured and returned throughout the close process.
---
.../vec/sink/writer/iceberg/viceberg_partition_writer.cpp | 14 ++++++++------
be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp | 10 +++++++---
be/src/vec/sink/writer/vhive_partition_writer.cpp | 14 ++++++++------
be/src/vec/sink/writer/vhive_table_writer.cpp | 10 +++++++---
4 files changed, 30 insertions(+), 18 deletions(-)
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
index e2dccd0345b..30cd7f20316 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
@@ -100,24 +100,26 @@ Status VIcebergPartitionWriter::open(RuntimeState* state,
RuntimeProfile* profil
}
Status VIcebergPartitionWriter::close(const Status& status) {
+ Status result_status;
if (_file_format_transformer != nullptr) {
- Status st = _file_format_transformer->close();
- if (!st.ok()) {
+ result_status = _file_format_transformer->close();
+ if (!result_status.ok()) {
LOG(WARNING) << fmt::format("_file_format_transformer close
failed, reason: {}",
- st.to_string());
+ result_status.to_string());
}
}
- if (!status.ok() && _file_writer != nullptr) {
+ bool status_ok = result_status.ok() && status.ok();
+ if (!status_ok && _file_writer != nullptr) {
auto path = fmt::format("{}/{}", _write_info.write_path, _file_name);
Status st = _file_writer->fs()->delete_file(path);
if (!st.ok()) {
LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}",
path, st.to_string());
}
}
- if (status.ok()) {
+ if (status_ok) {
_state->iceberg_commit_datas().emplace_back(_build_iceberg_commit_data());
}
- return Status::OK();
+ return result_status;
}
Status VIcebergPartitionWriter::write(vectorized::Block& block) {
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index e59b0593f7b..4b705b0e51b 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -271,15 +271,19 @@ Status
VIcebergTableWriter::_filter_block(doris::vectorized::Block& block,
}
Status VIcebergTableWriter::close(Status status) {
+ Status result_status;
int64_t partitions_to_writers_size = _partitions_to_writers.size();
{
SCOPED_RAW_TIMER(&_close_ns);
for (const auto& pair : _partitions_to_writers) {
Status st = pair.second->close(status);
- if (st != Status::OK()) {
+ if (!st.ok()) {
LOG(WARNING) << fmt::format("partition writer close failed for
partition {}",
st.to_string());
- continue;
+ if (result_status.ok()) {
+ result_status = st;
+ continue;
+ }
}
}
_partitions_to_writers.clear();
@@ -295,7 +299,7 @@ Status VIcebergTableWriter::close(Status status) {
COUNTER_SET(_close_timer, _close_ns);
COUNTER_SET(_write_file_counter, _write_file_count);
}
- return Status::OK();
+ return result_status;
}
std::string VIcebergTableWriter::_partition_to_path(const
doris::iceberg::StructLike& data) {
diff --git a/be/src/vec/sink/writer/vhive_partition_writer.cpp
b/be/src/vec/sink/writer/vhive_partition_writer.cpp
index 64d1d9b4266..3a94303aa75 100644
--- a/be/src/vec/sink/writer/vhive_partition_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -105,24 +105,26 @@ Status VHivePartitionWriter::open(RuntimeState* state,
RuntimeProfile* profile)
}
Status VHivePartitionWriter::close(const Status& status) {
+ Status result_status;
if (_file_format_transformer != nullptr) {
- Status st = _file_format_transformer->close();
- if (!st.ok()) {
+ result_status = _file_format_transformer->close();
+ if (!result_status.ok()) {
LOG(WARNING) << fmt::format("_file_format_transformer close
failed, reason: {}",
- st.to_string());
+ result_status.to_string());
}
}
- if (!status.ok() && _file_writer != nullptr) {
+ bool status_ok = result_status.ok() && status.ok();
+ if (!status_ok && _file_writer != nullptr) {
auto path = fmt::format("{}/{}", _write_info.write_path, _file_name);
Status st = _file_writer->fs()->delete_file(path);
if (!st.ok()) {
LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}",
path, st.to_string());
}
}
- if (status.ok()) {
+ if (status_ok) {
_state->hive_partition_updates().emplace_back(_build_partition_update());
}
- return Status::OK();
+ return result_status;
}
Status VHivePartitionWriter::write(vectorized::Block& block) {
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp
b/be/src/vec/sink/writer/vhive_table_writer.cpp
index 0e64060eb0b..0f8ff953eab 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -240,15 +240,19 @@ Status
VHiveTableWriter::_filter_block(doris::vectorized::Block& block,
}
Status VHiveTableWriter::close(Status status) {
+ Status result_status;
int64_t partitions_to_writers_size = _partitions_to_writers.size();
{
SCOPED_RAW_TIMER(&_close_ns);
for (const auto& pair : _partitions_to_writers) {
Status st = pair.second->close(status);
- if (st != Status::OK()) {
+ if (!st.ok()) {
LOG(WARNING) << fmt::format("partition writer close failed for
partition {}",
st.to_string());
- continue;
+ if (result_status.ok()) {
+ result_status = st;
+ continue;
+ }
}
}
_partitions_to_writers.clear();
@@ -264,7 +268,7 @@ Status VHiveTableWriter::close(Status status) {
COUNTER_SET(_close_timer, _close_ns);
COUNTER_SET(_write_file_counter, _write_file_count);
}
- return Status::OK();
+ return result_status;
}
std::shared_ptr<VHivePartitionWriter>
VHiveTableWriter::_create_partition_writer(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]