github-actions[bot] commented on code in PR #64779:
URL: https://github.com/apache/doris/pull/64779#discussion_r3467568824
##########
regression-test/suites/cloud_p0/s3/test_s3_file_writer_submit_error.groovy:
##########
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_s3_file_writer_submit_error", "p0, nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def submitUploadBufferErrorPoint = "S3FileWriter.submit_upload_buffer.inject_error"
+    def asyncCloseSubmitErrorPoint = "S3FileWriter.close.submit_async_close.inject_error"
+    def debugPoints = [submitUploadBufferErrorPoint, asyncCloseSubmitErrorPoint]
+
+    def disableDebugPoints = {
+        debugPoints.each { point ->
+            GetDebugPoint().disableDebugPointForAllBEs(point)
+        }
+    }
+
+    def streamLoadAndCheck = { String label, String expectedStatus ->
+        streamLoad {
+            table "test_s3_file_writer_submit_error"
+            set "column_separator", ","
+            inputText "1,${label}\n2,${label}\n"
+            time 120000
+
+            check { result, exception, startTime, endTime ->
+                def msg = exception == null ?
+                        result : exception.getMessage()
+                logger.info("stream load result for ${label}: ${msg}")
+                assertTrue(exception == null,
+                        "stream load should return json result for ${label}: ${msg}")
+                def json = parseJson(result)
+                assertEquals(expectedStatus, json.Status.toLowerCase())
+            }
+        }
+    }
+
+    def runWithDebugPoint = { String label, String expectedStatus, String point ->
+        GetDebugPoint().enableDebugPointForAllBEs(point)
+        try {
+            streamLoadAndCheck(label, expectedStatus)
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs(point)
+        }
+    }
+
+    sql """ DROP TABLE IF EXISTS test_s3_file_writer_submit_error """
+    sql """
+        CREATE TABLE IF NOT EXISTS test_s3_file_writer_submit_error (
+            `k1` int NULL,
+            `v1` varchar(32) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        )
+    """
+
+    setBeConfigTemporary([
+        "enable_packed_file": "false"
+    ]) {
+        try {
+            sql """ SET disable_file_cache = true """
+            disableDebugPoints()

Review Comment:
This still does not disable the file-cache write path for the stream load. The `sql` call changes the JDBC session used by the regression driver, but `streamLoad {}` sends a separate HTTP request. Only values added with the helper's `set` method become stream-load headers. On the BE side, `_process_put()` builds `TStreamLoadPutRequest` from the supported HTTP headers and does not carry `disable_file_cache`. Meanwhile, FE `streamLoadPut` creates its own `ConnectContext` before `OlapTableSink.init()` reads `isDisableFileCache()`. That context keeps the default `disableFileCache=false`, so `write_file_cache` can still be true. Please disable the cache path on the actual stream-load request/planning path, or use a temporary BE/global setting that `S3FileWriter` observes for this load. As written, the regression can still bypass the direct S3 writer path it is trying to fault-inject.
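The data flow in the comment above can be made concrete with a small model of the two request paths. This is a hypothetical sketch, not Doris code. `SessionVars`, `PutRequest`, `kSupportedHeaders`, `build_put_request()`, `plan_stream_load()` and `write_file_cache()` are illustrative names, and the header allow-list is an assumption. The model only shows why two things leave a freshly created load context at its default `disable_file_cache = false`: a `SET` on the JDBC session, and a header the BE does not forward.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Hypothetical model, not Doris code: names only mirror the roles in the review.
using Headers = std::map<std::string, std::string>;

// Stand-in for the session variables held by a ConnectContext.
struct SessionVars {
    bool disable_file_cache = false;  // default value of a fresh context
};

// Stand-in for TStreamLoadPutRequest: only fields the BE knows how to fill.
struct PutRequest {
    std::map<std::string, std::string> fields;
};

// Assumed allow-list of HTTP headers the BE copies into the put request.
const std::set<std::string> kSupportedHeaders = {"column_separator", "timeout"};

// BE side (`_process_put()` role): copy supported headers, drop everything else.
PutRequest build_put_request(const Headers& http_headers) {
    PutRequest req;
    for (const auto& [key, value] : http_headers) {
        if (kSupportedHeaders.count(key) != 0) {
            req.fields[key] = value;
        }
    }
    return req;
}

// FE side (`streamLoadPut` role): a new context is created for every load, so it
// starts from defaults regardless of what another JDBC session has SET.
SessionVars plan_stream_load(const PutRequest& /*req*/) {
    return SessionVars{};
}

// Sink side (`OlapTableSink.init()` role): decide whether to write via the cache.
bool write_file_cache(const SessionVars& vars) {
    return !vars.disable_file_cache;
}
```

In this model, `write_file_cache(plan_stream_load(build_put_request(headers)))` is `true` even when `headers` contains `disable_file_cache`, because that key never reaches the request. The JDBC session's own flag is never consulted on this path.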
##########
be/src/io/fs/s3_file_writer.cpp:
##########
@@ -152,13 +152,35 @@ Status S3FileWriter::close(bool non_block) {
         _async_close_pack = std::make_unique<AsyncCloseStatusPack>();
         _async_close_pack->future = _async_close_pack->promise.get_future();
         s3_file_writer_async_close_queuing << 1;
-        return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([&]() {
+        Status submit_status = Status::OK();
+        DBUG_EXECUTE_IF("S3FileWriter.close.submit_async_close.inject_error", {
+            submit_status = Status::IOError("S3FileWriter.close.submit_async_close.inject_error");
+        });
+        if (submit_status.ok()) {
+            submit_status =
+                    ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([&]() {
+                        s3_file_writer_async_close_queuing << -1;
+                        s3_file_writer_async_close_processing << 1;
+                        _st = _close_impl();
+                        _async_close_pack->promise.set_value(_st);
+                        s3_file_writer_async_close_processing << -1;
+                    });
+        }
+        if (!submit_status.ok()) {
             s3_file_writer_async_close_queuing << -1;
-            s3_file_writer_async_close_processing << 1;
+            _async_close_pack = nullptr;
+            _state = State::OPENED;
+            LOG(WARNING) << "failed to submit async close for "
+                         << _obj_storage_path_opts.path.native()
+                         << ", fallback to sync close, status=" << submit_status;
             _st = _close_impl();
-            _async_close_pack->promise.set_value(_st);
-            s3_file_writer_async_close_processing << -1;
-        });
+            _state = State::CLOSED;
+            if (_st.ok()) {

Review Comment:
This fallback can still break the packed-file direct-write path. In cloud mode, `enable_packed_file` defaults to true, and `PackedFileWriter::appendv()` switches large files to direct-write. As a result, `_close_async()` calls `_inner_writer->close(true)` and then unconditionally marks the wrapper `ASYNC_CLOSING`. With this new fallback, the inner `S3FileWriter` may already have completed synchronously and set itself to `CLOSED` before returning OK.
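The interaction described here can be reproduced with a small, self-contained model. Everything in it is hypothetical:

- `InnerWriter` stands in for `S3FileWriter`.
- `Wrapper` stands in for `PackedFileWriter`.
- `ThreadPool` stands in for the non-block close pool. Its `reject` switch plays the role of the injected submit error.

The `check_inner_state` flag toggles one possible fix: the wrapper records `ASYNC_CLOSING` only if the inner writer has not already reached `CLOSED`. It sketches the reviewer's first suggestion under these assumptions and is not the actual patch.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical model of the close(true)/close(false) contract; not Doris code.
enum class State { OPENED, ASYNC_CLOSING, CLOSED };

// Stand-in for Status: an empty message means OK.
struct Status {
    std::string error;
    bool ok() const { return error.empty(); }
};

// Stand-in for the non-block close pool. `reject` simulates the injected
// submit error; accepted tasks run only when drain() is called.
struct ThreadPool {
    bool reject = false;
    std::function<void()> pending;

    Status submit(std::function<void()> task) {
        if (reject) return {"submit rejected"};
        pending = std::move(task);
        return {};
    }
    void drain() {
        if (pending) {
            auto task = std::move(pending);
            pending = nullptr;
            task();
        }
    }
};

// Plays the role of S3FileWriter, including the synchronous fallback.
class InnerWriter {
public:
    explicit InnerWriter(ThreadPool& pool) : _pool(pool) {}
    State state() const { return _state; }

    Status close(bool non_block) {
        if (_state == State::CLOSED) return {"already closed"};
        if (_state == State::ASYNC_CLOSING) {  // wait for the queued close
            _pool.drain();
            return _st;
        }
        if (non_block) {
            _state = State::ASYNC_CLOSING;
            Status submitted = _pool.submit([this] {
                _st = Status{};  // the real close work would happen here
                _state = State::CLOSED;
            });
            if (submitted.ok()) return {};
            _state = State::OPENED;  // submit failed: fall back to a sync close
        }
        _st = Status{};
        _state = State::CLOSED;
        return _st;
    }

private:
    ThreadPool& _pool;
    State _state = State::OPENED;
    Status _st;
};

// Plays the role of PackedFileWriter in direct-write mode.
class Wrapper {
public:
    Wrapper(InnerWriter& inner, bool check_inner_state)
            : _inner(inner), _check_inner_state(check_inner_state) {}

    Status close(bool non_block) {
        if (non_block) {
            Status st = _inner.close(true);
            if (!st.ok()) return st;
            // Without the check, the wrapper always assumes the close is in flight.
            bool finished = _check_inner_state && _inner.state() == State::CLOSED;
            _state = finished ? State::CLOSED : State::ASYNC_CLOSING;
            return st;
        }
        if (_state == State::CLOSED) return {};
        Status st = _inner.close(false);
        _state = State::CLOSED;
        return st;
    }

private:
    InnerWriter& _inner;
    bool _check_inner_state;
    State _state = State::OPENED;
};
```

Suppose the pool rejects the task (`reject = true`) and the check is off (`check_inner_state = false`). Then `close(true)` returns OK with the inner writer already `CLOSED`, and the following `close(false)` fails with `already closed`. With `check_inner_state = true`, both calls succeed. When the pool accepts the task, both variants behave the same.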
The later normal wait path (`SegmentFileCollection::close()` or `IndexFileWriter::finish_close()` -> `PackedFileWriter::close(false)`) then calls `_inner_writer->close(false)`, which returns `S3FileWriter already closed`, turning a successfully closed file into a load failure. Please either have the wrapper recognize that the inner writer finished synchronously before marking itself async-closing, or otherwise preserve the `close(true)` contract expected by these callers. The new regression currently sets `enable_packed_file=false`, so it misses this production path.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
