This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d5367f3ff5 Avoid calling shutdown after failed write of AsyncWrite
(#249) (#250) (#11415)
d5367f3ff5 is described below
commit d5367f3ff5ed506e824a04c68120194deb68a908
Author: Georgi Krastev <[email protected]>
AuthorDate: Fri Jul 12 22:34:35 2024 +0300
Avoid calling shutdown after failed write of AsyncWrite (#249) (#250)
(#11415)
in `serialize_rb_stream_to_object_store`
---
.../datasource/file_format/write/orchestration.rs | 27 +++++++++++++---------
1 file changed, 16 insertions(+), 11 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs
b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index a62b5715ae..8bd0dae9f5 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -42,15 +42,20 @@ use tokio::task::JoinSet;
type WriterType = Box<dyn AsyncWrite + Send + Unpin>;
type SerializerType = Arc<dyn BatchSerializer>;
-/// Serializes a single data stream in parallel and writes to an ObjectStore
-/// concurrently. Data order is preserved. In the event of an error,
-/// the ObjectStore writer is returned to the caller in addition to an error,
-/// so that the caller may handle aborting failed writes.
+/// Serializes a single data stream in parallel and writes to an ObjectStore concurrently.
+/// Data order is preserved.
+///
+/// In the event of a non-IO error which does not involve the ObjectStore writer,
+/// the writer is returned to the caller in addition to the error,
+/// so that failed writes may be aborted.
+///
+/// In the event of an IO error involving the ObjectStore writer,
+/// the writer is dropped to avoid calling further methods on it which might panic.
pub(crate) async fn serialize_rb_stream_to_object_store(
mut data_rx: Receiver<RecordBatch>,
serializer: Arc<dyn BatchSerializer>,
mut writer: WriterType,
-) -> std::result::Result<(WriterType, u64), (WriterType, DataFusionError)> {
+) -> std::result::Result<(WriterType, u64), (Option<WriterType>,
DataFusionError)> {
let (tx, mut rx) =
mpsc::channel::<SpawnedTask<Result<(usize, Bytes),
DataFusionError>>>(100);
let serialize_task = SpawnedTask::spawn(async move {
@@ -82,7 +87,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
Ok(_) => (),
Err(e) => {
return Err((
- writer,
+ None,
DataFusionError::Execution(format!(
"Error writing to object store: {e}"
)),
@@ -93,12 +98,12 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
}
Ok(Err(e)) => {
// Return the writer along with the error
- return Err((writer, e));
+ return Err((Some(writer), e));
}
Err(e) => {
// Handle task panic or cancellation
return Err((
- writer,
+ Some(writer),
DataFusionError::Execution(format!(
"Serialization task panicked or was cancelled: {e}"
)),
@@ -109,10 +114,10 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
match serialize_task.join().await {
Ok(Ok(_)) => (),
- Ok(Err(e)) => return Err((writer, e)),
+ Ok(Err(e)) => return Err((Some(writer), e)),
Err(_) => {
return Err((
- writer,
+ Some(writer),
internal_datafusion_err!("Unknown error writing to object
store"),
))
}
@@ -153,7 +158,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
row_count += cnt;
}
Err((writer, e)) => {
- finished_writers.push(writer);
+ finished_writers.extend(writer);
any_errors = true;
triggering_error = Some(e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]