This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new ead1da60239 branch-4.0: [fix](be) Poll packed file async close without blocking #62938 (#63531)
ead1da60239 is described below
commit ead1da60239c63611f5dfc836530f8815e3db36b
Author: Yixuan Wang <[email protected]>
AuthorDate: Thu May 28 14:50:24 2026 +0800
branch-4.0: [fix](be) Poll packed file async close without blocking #62938 (#63531)
pick: https://github.com/apache/doris/pull/62938
---
be/src/io/fs/file_writer.h | 10 +++
be/src/io/fs/packed_file_manager.cpp | 14 +---
be/src/io/fs/s3_file_writer.cpp | 91 +++++++++++-----------
be/src/io/fs/s3_file_writer.h | 2 +
be/src/olap/rowset/beta_rowset_writer.cpp | 12 +++
be/src/olap/rowset/segment_creator.cpp | 7 ++
be/test/io/fs/packed_file_manager_test.cpp | 72 ++++++++++++++---
.../test_packed_file_async_close_error.groovy | 78 +++++++++++++++++++
8 files changed, 222 insertions(+), 64 deletions(-)
diff --git a/be/src/io/fs/file_writer.h b/be/src/io/fs/file_writer.h
index 0cda2b519c4..de4fc6f577a 100644
--- a/be/src/io/fs/file_writer.h
+++ b/be/src/io/fs/file_writer.h
@@ -72,6 +72,16 @@ public:
// If there is no data appended, an empty file will be persisted.
virtual Status close(bool non_block = false) = 0;
+ // Non-blocking probe for a previous close(true).
+ // OK means close finished successfully. NeedSendAgain means close is still running.
+ // Other errors mean close finished with error or the writer does not support this API.
+ // NOTE: This method consumes the async close result when it is ready. The caller must
+ // use it as the only completion path for that async close; mixing it with close(false)
+ // or another try_finish_close consumer is not supported.
+ virtual Status try_finish_close() {
+ return Status::NotSupported("try_finish_close is not supported");
+ }
+
Status append(const Slice& data) { return appendv(&data, 1); }
virtual Status appendv(const Slice* data, size_t data_cnt) = 0;
diff --git a/be/src/io/fs/packed_file_manager.cpp b/be/src/io/fs/packed_file_manager.cpp
index 6bff9eff6f3..654f2d933fd 100644
--- a/be/src/io/fs/packed_file_manager.cpp
+++ b/be/src/io/fs/packed_file_manager.cpp
@@ -851,11 +851,6 @@ void PackedFileManager::process_uploading_packed_files() {
Status upload_status =
finalize_packed_file_upload(packed_file->packed_file_path,
packed_file->writer.get());
- if (upload_status.is<ErrorCode::ALREADY_CLOSED>()) {
- record_ready_to_upload(packed_file);
- handle_success(packed_file);
- continue;
- }
if (!upload_status.ok()) {
handle_failure(packed_file, upload_status);
continue;
@@ -873,16 +868,13 @@ void PackedFileManager::process_uploading_packed_files() {
continue;
}
- if (packed_file->writer->state() != FileWriter::State::CLOSED) {
+ Status status = packed_file->writer->try_finish_close();
+ if (status.is<ErrorCode::NEED_SEND_AGAIN>()) {
continue;
}
- Status status = packed_file->writer->close(true);
- if (status.is<ErrorCode::ALREADY_CLOSED>()) {
- handle_success(packed_file);
- continue;
- }
if (status.ok()) {
+ handle_success(packed_file);
continue;
}
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index a1b72b8dc98..eec1e4c3a60 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -41,6 +41,7 @@
#include "io/fs/s3_file_system.h"
#include "io/fs/s3_obj_storage_client.h"
#include "runtime/exec_env.h"
+#include "util/debug_points.h"
#include "util/s3_util.h"
#include "util/stopwatch.hpp"
@@ -126,48 +127,10 @@ void S3FileWriter::_wait_until_finish(std::string_view task_name) {
}
Status S3FileWriter::close(bool non_block) {
- auto record_close_latency = [this]() {
- if (_close_latency_recorded || !_first_append_timestamp.has_value()) {
- return;
- }
- auto now = std::chrono::steady_clock::now();
- auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
- now - *_first_append_timestamp)
- .count();
- s3_file_writer_first_append_to_close_ms_recorder << latency_ms;
- if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
- sampler->take_sample();
- }
- _close_latency_recorded = true;
- };
-
if (state() == State::CLOSED) {
- if (_async_close_pack != nullptr) {
- _st = _async_close_pack->future.get();
- _async_close_pack = nullptr;
- // Return the final close status so that a blocking close issued after
- // an async close observes the real result just like the legacy behavior.
- if (!non_block && _st.ok()) {
- record_close_latency();
- }
- return _st;
- }
- if (non_block) {
- if (_st.ok()) {
- record_close_latency();
- return Status::Error<ErrorCode::ALREADY_CLOSED>(
- "S3FileWriter already closed, file path {}, file key
{}",
- _obj_storage_path_opts.path.native(),
_obj_storage_path_opts.key);
- }
- return _st;
- }
- if (_st.ok()) {
- record_close_latency();
- return Status::Error<ErrorCode::ALREADY_CLOSED>(
- "S3FileWriter already closed, file path {}, file key {}",
- _obj_storage_path_opts.path.native(), _obj_storage_path_opts.key);
- }
- return _st;
+ return Status::InternalError("S3FileWriter already closed, file path {}, file key {}",
+ _obj_storage_path_opts.path.native(),
+ _obj_storage_path_opts.key);
}
if (state() == State::ASYNC_CLOSING) {
if (non_block) {
@@ -181,7 +144,7 @@ Status S3FileWriter::close(bool non_block) {
// The next time we call close() with no matter non_block true or false, it would always return the
// '_st' value because this writer is already closed.
if (!non_block && _st.ok()) {
- record_close_latency();
+ _record_close_latency();
}
return _st;
}
@@ -194,7 +157,6 @@ Status S3FileWriter::close(bool non_block) {
s3_file_writer_async_close_queuing << -1;
s3_file_writer_async_close_processing << 1;
_st = _close_impl();
- _state = State::CLOSED;
_async_close_pack->promise.set_value(_st);
s3_file_writer_async_close_processing << -1;
});
@@ -202,7 +164,42 @@ Status S3FileWriter::close(bool non_block) {
_st = _close_impl();
_state = State::CLOSED;
if (!non_block && _st.ok()) {
- record_close_latency();
+ _record_close_latency();
+ }
+ return _st;
+}
+
+void S3FileWriter::_record_close_latency() {
+ if (_close_latency_recorded || !_first_append_timestamp.has_value()) {
+ return;
+ }
+ auto now = std::chrono::steady_clock::now();
+ auto latency_ms =
+ std::chrono::duration_cast<std::chrono::milliseconds>(now - *_first_append_timestamp)
+ .count();
+ s3_file_writer_first_append_to_close_ms_recorder << latency_ms;
+ if (auto* sampler = s3_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
+ sampler->take_sample();
+ }
+ _close_latency_recorded = true;
+}
+
+Status S3FileWriter::try_finish_close() {
+ if (state() == State::CLOSED) {
+ return _st;
+ }
+ if (state() != State::ASYNC_CLOSING) {
+ return Status::NotSupported("S3FileWriter is not async closing");
+ }
+ CHECK(_async_close_pack != nullptr);
+ if (_async_close_pack->future.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
+ return Status::NeedSendAgain("async close is not finished");
+ }
+ _st = _async_close_pack->future.get();
+ _async_close_pack = nullptr;
+ _state = State::CLOSED;
+ if (_st.ok()) {
+ _record_close_latency();
}
return _st;
}
@@ -254,6 +251,12 @@ Status S3FileWriter::_build_upload_buffer() {
Status S3FileWriter::_close_impl() {
VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native();
+ DBUG_EXECUTE_IF("S3FileWriter._close_impl.inject_error", {
+ if (_obj_storage_path_opts.key.ends_with(".dat")) {
+ return Status::IOError("S3FileWriter._close_impl.inject_error");
+ }
+ });
+
if (_cur_part_num == 1 && _pending_buf) { // data size is less than config::s3_write_buffer_size
RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
}
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 38e68b14c4d..f31a9edef2c 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -71,6 +71,7 @@ public:
}
Status close(bool non_block = false) override;
+ Status try_finish_close() override;
private:
Status _close_impl();
@@ -84,6 +85,7 @@ private:
void _upload_one_part(int part_num, UploadFileBuffer& buf);
bool _complete_part_task_callback(Status s);
Status _build_upload_buffer();
+ void _record_close_latency();
ObjectStoragePathOptions _obj_storage_path_opts;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 8e0794ce5d3..16f1419dba6 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -23,11 +23,13 @@
#include <fmt/format.h>
#include <stdio.h>
+#include <chrono>
#include <ctime> // time
#include <filesystem>
#include <memory>
#include <mutex>
#include <sstream>
+#include <thread>
#include <utility>
#include <vector>
@@ -139,6 +141,16 @@ Status SegmentFileCollection::close() {
}
for (auto&& [_, writer] : _file_writers) {
+ DBUG_EXECUTE_IF("SegmentFileCollection.close.wait_dat_closed", {
+ auto before_state = writer->state();
+ for (int i = 0; i < 3000 && writer->state() != io::FileWriter::State::CLOSED; ++i) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ }
+ LOG(INFO) << "SegmentFileCollection.close.wait_dat_closed path="
+ << writer->path().native()
+ << " before_state=" << static_cast<int>(before_state)
+ << " after_state=" << static_cast<int>(writer->state());
+ });
if (writer->state() != io::FileWriter::State::CLOSED) {
RETURN_IF_ERROR(writer->close());
}
diff --git a/be/src/olap/rowset/segment_creator.cpp b/be/src/olap/rowset/segment_creator.cpp
index b35d4764d2f..b2982eabcf9 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -20,6 +20,7 @@
// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
+#include <chrono>
#include <filesystem>
#include <memory>
#include <sstream>
@@ -236,6 +237,9 @@ Status SegmentFlusher::_flush_segment_writer(
return Status::Error(s.code(), "failed to finalize segment: {}",
s.to_string());
}
+ DBUG_EXECUTE_IF("SegmentFlusher._flush_segment_writer.after_finalize.sleep",
+ { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); });
+
MonotonicStopWatch inverted_index_timer;
inverted_index_timer.start();
int64_t inverted_index_file_size = 0;
@@ -311,6 +315,9 @@ Status SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::Segment
return Status::Error(s.code(), "failed to finalize segment: {}",
s.to_string());
}
+ DBUG_EXECUTE_IF("SegmentFlusher._flush_segment_writer.after_finalize.sleep",
+ { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); });
+
MonotonicStopWatch inverted_index_timer;
inverted_index_timer.start();
int64_t inverted_index_file_size = 0;
diff --git a/be/test/io/fs/packed_file_manager_test.cpp b/be/test/io/fs/packed_file_manager_test.cpp
index 645f0896f1a..37800bf1620 100644
--- a/be/test/io/fs/packed_file_manager_test.cpp
+++ b/be/test/io/fs/packed_file_manager_test.cpp
@@ -55,24 +55,20 @@ public:
void set_start_close_status(Status st) { _start_close_status = std::move(st); }
void complete_async_close() {
if (_state == State::ASYNC_CLOSING) {
- _state = State::CLOSED;
+ _async_close_ready = true;
}
}
+ size_t close_calls() const { return _close_calls; }
+ size_t try_finish_close_calls() const { return _try_finish_close_calls; }
size_t append_calls() const { return _append_calls; }
bool closed() const { return _state == State::CLOSED; }
size_t bytes_appended() const override { return _bytes_appended; }
const std::string& written_data() const { return _written; }
Status close(bool non_block = false) override {
+ ++_close_calls;
if (_state == State::CLOSED) {
- if (non_block) {
- if (_close_status.ok()) {
- return Status::Error<ErrorCode::ALREADY_CLOSED>(
- "MockFileWriter already closed: {}", _path.native());
- }
- return _close_status;
- }
return Status::Error<ErrorCode::ALREADY_CLOSED>("MockFileWriter already closed: {}",
_path.native());
}
@@ -82,11 +78,18 @@ public:
}
if (_state == State::ASYNC_CLOSING) {
- return Status::InternalError("Don't submit async close multi times");
+ if (non_block) {
+ return Status::InternalError("Don't submit async close multi times");
+ }
+ _async_close_ready = true;
+ _async_close_consumed = true;
+ _state = State::CLOSED;
+ return _close_status;
}
if (non_block) {
_state = State::ASYNC_CLOSING;
+ _async_close_ready = false;
return Status::OK();
}
@@ -94,6 +97,19 @@ public:
return _close_status;
}
+ Status try_finish_close() override {
+ ++_try_finish_close_calls;
+ if (_state == State::CLOSED) {
+ return _async_close_consumed ? _close_status : Status::OK();
+ }
+ if (_state != State::ASYNC_CLOSING || !_async_close_ready) {
+ return Status::NeedSendAgain("async close is not finished");
+ }
+ _state = State::CLOSED;
+ _async_close_consumed = true;
+ return _close_status;
+ }
+
Status appendv(const Slice* data, size_t data_cnt) override {
if (!_append_status.ok()) {
return _append_status;
@@ -114,10 +130,14 @@ private:
Path _path;
size_t _bytes_appended = 0;
size_t _append_calls = 0;
+ size_t _close_calls = 0;
+ size_t _try_finish_close_calls = 0;
std::string _written;
Status _start_close_status = Status::OK();
Status _append_status = Status::OK();
Status _close_status = Status::OK();
+ bool _async_close_ready = false;
+ bool _async_close_consumed = false;
State _state = State::OPENED;
};
@@ -581,6 +601,40 @@ TEST_F(PackedFileManagerTest, ProcessUploadingFilesSetsFailedWhenAsyncCloseFails
EXPECT_NE(failed->last_error.find("async close fail"), std::string::npos);
}
+TEST_F(PackedFileManagerTest, ProcessUploadingFilesPollsAsyncCloseWithoutBlocking) {
+ std::string payload = "abc";
+ Slice slice(payload);
+ auto info = default_append_info();
+ ASSERT_TRUE(manager->append_small_file("async_poll_fail", slice, info).ok());
+ ASSERT_TRUE(manager->mark_current_packed_file_for_upload(_resource_id).ok());
+ ASSERT_EQ(manager->uploading_packed_files_for_test().size(), 1);
+
+ auto uploading = manager->uploading_packed_files_for_test().begin()->second;
+ auto* writer = dynamic_cast<MockFileWriter*>(uploading->writer.get());
+ ASSERT_NE(writer, nullptr);
+ uploading->state = PackedFileManager::PackedFileState::UPLOADING;
+ writer->set_close_status(Status::IOError("async close poll fail"));
+ ASSERT_TRUE(writer->close(true).ok());
+ ASSERT_EQ(writer->close_calls(), 1);
+
+ manager->process_uploading_packed_files();
+ EXPECT_EQ(uploading->state.load(), PackedFileManager::PackedFileState::UPLOADING);
+ EXPECT_EQ(manager->uploading_packed_files_for_test().size(), 1);
+ EXPECT_EQ(manager->uploaded_packed_files_for_test().size(), 0);
+ EXPECT_EQ(writer->close_calls(), 1);
+ EXPECT_EQ(writer->try_finish_close_calls(), 1);
+
+ writer->complete_async_close();
+ manager->process_uploading_packed_files();
+ EXPECT_EQ(writer->close_calls(), 1);
+ EXPECT_EQ(writer->try_finish_close_calls(), 2);
+ EXPECT_EQ(manager->uploading_packed_files_for_test().size(), 0);
+ ASSERT_EQ(manager->uploaded_packed_files_for_test().size(), 1);
+ auto failed = manager->uploaded_packed_files_for_test().begin()->second;
+ EXPECT_EQ(failed->state.load(), PackedFileManager::PackedFileState::FAILED);
+ EXPECT_NE(failed->last_error.find("async close poll fail"), std::string::npos);
+}
+
TEST_F(PackedFileManagerTest, AppendPackedFileInfoToFileTail) {
std::string payload = "abc";
Slice slice(payload);
diff --git a/regression-test/suites/cloud_p0/packed_file/test_packed_file_async_close_error.groovy b/regression-test/suites/cloud_p0/packed_file/test_packed_file_async_close_error.groovy
new file mode 100644
index 00000000000..7407e5eb27d
--- /dev/null
+++ b/regression-test/suites/cloud_p0/packed_file/test_packed_file_async_close_error.groovy
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_packed_file_async_close_error", "p0, nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
+
+ def closeErrorPoint = "S3FileWriter._close_impl.inject_error"
+ def afterFinalizeSleepPoint = "SegmentFlusher._flush_segment_writer.after_finalize.sleep"
+ def waitDatClosedPoint = "SegmentFileCollection.close.wait_dat_closed"
+
+ sql """ DROP TABLE IF EXISTS test_packed_file_async_close_error """
+ sql """
+ CREATE TABLE IF NOT EXISTS test_packed_file_async_close_error (
+ `k1` int NULL,
+ `v1` varchar(32) NULL,
+ INDEX idx_v1 (`v1`) USING INVERTED PROPERTIES("parser" = "english")
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ )
+ """
+
+ setBeConfigTemporary([
+ "enable_packed_file": "false",
+ "enable_vertical_segment_writer": "true",
+ "small_file_threshold_bytes": "1048576",
+ "packed_file_size_threshold_bytes": "1048576",
+ "packed_file_time_threshold_ms": "1"
+ ]) {
+ try {
+ sql """ SET enable_file_cache = false """
+ GetDebugPoint().disableDebugPointForAllBEs(closeErrorPoint)
+ GetDebugPoint().disableDebugPointForAllBEs(afterFinalizeSleepPoint)
+ GetDebugPoint().disableDebugPointForAllBEs(waitDatClosedPoint)
+ GetDebugPoint().enableDebugPointForAllBEs(closeErrorPoint)
+ GetDebugPoint().enableDebugPointForAllBEs(afterFinalizeSleepPoint)
+ GetDebugPoint().enableDebugPointForAllBEs(waitDatClosedPoint)
+
+ streamLoad {
+ table "test_packed_file_async_close_error"
+ set "column_separator", ","
+ inputText "1,a\n2,b\n"
+ time 120000
+
+ check { result, exception, startTime, endTime ->
+ def msg = exception == null ? result : exception.getMessage()
+ logger.info("stream load result with injected S3 close error: ${msg}")
+ assertTrue(exception == null, "stream load should succeed before async packed file upload fails: ${msg}")
+ def json = parseJson(result)
+ assertEquals("fail", json.Status.toLowerCase())
+ }
+ }
+
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(closeErrorPoint)
+ GetDebugPoint().disableDebugPointForAllBEs(afterFinalizeSleepPoint)
+ GetDebugPoint().disableDebugPointForAllBEs(waitDatClosedPoint)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]