This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d232065c3b refactor: rewrite mega type to an enum containing both
cases (#11539)
d232065c3b is described below
commit d232065c3b710d0c8e035de49730238a30073eb2
Author: Lorrens Pantelis <[email protected]>
AuthorDate: Sun Jul 21 14:31:41 2024 +0200
refactor: rewrite mega type to an enum containing both cases (#11539)
---
.../datasource/file_format/write/orchestration.rs | 62 +++++++++++++++++-----
1 file changed, 48 insertions(+), 14 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs
b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index f788865b07..1d32063ee9 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -42,6 +42,37 @@ use tokio::task::JoinSet;
type WriterType = Box<dyn AsyncWrite + Send + Unpin>;
type SerializerType = Arc<dyn BatchSerializer>;
+/// Result of calling [`serialize_rb_stream_to_object_store`]
+pub(crate) enum SerializedRecordBatchResult {
+ Success {
+ /// the writer
+ writer: WriterType,
+
+ /// the number of rows successfully written
+ row_count: usize,
+ },
+ Failure {
+ /// As explained in [`serialize_rb_stream_to_object_store`]:
+ /// - If an IO error occurred that involved the ObjectStore writer,
then the writer will not be returned to the caller
+ /// - Otherwise, the writer is returned to the caller
+ writer: Option<WriterType>,
+
+ /// the actual error that occurred
+ err: DataFusionError,
+ },
+}
+
+impl SerializedRecordBatchResult {
+ /// Create the success variant
+ pub fn success(writer: WriterType, row_count: usize) -> Self {
+ Self::Success { writer, row_count }
+ }
+
+ pub fn failure(writer: Option<WriterType>, err: DataFusionError) -> Self {
+ Self::Failure { writer, err }
+ }
+}
+
/// Serializes a single data stream in parallel and writes to an ObjectStore
concurrently.
/// Data order is preserved.
///
@@ -55,7 +86,7 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
mut data_rx: Receiver<RecordBatch>,
serializer: Arc<dyn BatchSerializer>,
mut writer: WriterType,
-) -> std::result::Result<(WriterType, u64), (Option<WriterType>,
DataFusionError)> {
+) -> SerializedRecordBatchResult {
let (tx, mut rx) =
mpsc::channel::<SpawnedTask<Result<(usize, Bytes),
DataFusionError>>>(100);
let serialize_task = SpawnedTask::spawn(async move {
@@ -86,43 +117,43 @@ pub(crate) async fn serialize_rb_stream_to_object_store(
match writer.write_all(&bytes).await {
Ok(_) => (),
Err(e) => {
- return Err((
+ return SerializedRecordBatchResult::failure(
None,
DataFusionError::Execution(format!(
"Error writing to object store: {e}"
)),
- ))
+ )
}
};
row_count += cnt;
}
Ok(Err(e)) => {
// Return the writer along with the error
- return Err((Some(writer), e));
+ return SerializedRecordBatchResult::failure(Some(writer), e);
}
Err(e) => {
// Handle task panic or cancellation
- return Err((
+ return SerializedRecordBatchResult::failure(
Some(writer),
DataFusionError::Execution(format!(
"Serialization task panicked or was cancelled: {e}"
)),
- ));
+ );
}
}
}
match serialize_task.join().await {
Ok(Ok(_)) => (),
- Ok(Err(e)) => return Err((Some(writer), e)),
+ Ok(Err(e)) => return
SerializedRecordBatchResult::failure(Some(writer), e),
Err(_) => {
- return Err((
+ return SerializedRecordBatchResult::failure(
Some(writer),
internal_datafusion_err!("Unknown error writing to object
store"),
- ))
+ )
}
}
- Ok((writer, row_count as u64))
+ SerializedRecordBatchResult::success(writer, row_count)
}
type FileWriteBundle = (Receiver<RecordBatch>, SerializerType, WriterType);
@@ -153,14 +184,17 @@ pub(crate) async fn stateless_serialize_and_write_files(
while let Some(result) = join_set.join_next().await {
match result {
Ok(res) => match res {
- Ok((writer, cnt)) => {
+ SerializedRecordBatchResult::Success {
+ writer,
+ row_count: cnt,
+ } => {
finished_writers.push(writer);
row_count += cnt;
}
- Err((writer, e)) => {
+ SerializedRecordBatchResult::Failure { writer, err } => {
finished_writers.extend(writer);
any_errors = true;
- triggering_error = Some(e);
+ triggering_error = Some(err);
}
},
Err(e) => {
@@ -193,7 +227,7 @@ pub(crate) async fn stateless_serialize_and_write_files(
}
}
- tx.send(row_count).map_err(|_| {
+ tx.send(row_count as u64).map_err(|_| {
internal_datafusion_err!(
"Error encountered while sending row count back to file sink!"
)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]