yyy1000 commented on code in PR #9648:
URL: https://github.com/apache/arrow-datafusion/pull/9648#discussion_r1527712503
##########
datafusion/core/src/datasource/file_format/write/mod.rs:
##########
@@ -150,19 +74,13 @@ pub trait BatchSerializer: Sync + Send {
fn serialize(&self, batch: RecordBatch, initial: bool) -> Result<Bytes>;
}
-/// Returns an [`AbortableWrite`] which writes to the given object store
location
+/// Returns an [`AsyncWrite`] which writes to the given object store location
/// with the specified compression
pub(crate) async fn create_writer(
file_compression_type: FileCompressionType,
location: &Path,
object_store: Arc<dyn ObjectStore>,
-) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
- let (multipart_id, writer) = object_store
- .put_multipart(location)
- .await
- .map_err(DataFusionError::ObjectStore)?;
- Ok(AbortableWrite::new(
- file_compression_type.convert_async_writer(writer)?,
- MultiPart::new(object_store, multipart_id, location.clone()),
- ))
+) -> Result<Box<dyn AsyncWrite + Send + Unpin>> {
+ let buf_writer = BufWriter::new(object_store, location.clone());
+ Ok(file_compression_type.convert_async_writer(buf_writer)?)
Review Comment:
I think `create_writer` could still be kept, because it creates the `writer`
with compression?
##########
datafusion/core/src/datasource/file_format/write/mod.rs:
##########
@@ -69,79 +66,6 @@ impl Write for SharedBuffer {
}
}
-/// Stores data needed during abortion of MultiPart writers
-#[derive(Clone)]
-pub(crate) struct MultiPart {
- /// A shared reference to the object store
- store: Arc<dyn ObjectStore>,
- multipart_id: MultipartId,
- location: Path,
-}
-
-impl MultiPart {
- /// Create a new `MultiPart`
- pub fn new(
- store: Arc<dyn ObjectStore>,
- multipart_id: MultipartId,
- location: Path,
- ) -> Self {
- Self {
- store,
- multipart_id,
- location,
- }
- }
-}
-
-/// A wrapper struct with abort method and writer
-pub(crate) struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
- writer: W,
- multipart: MultiPart,
-}
-
-impl<W: AsyncWrite + Unpin + Send> AbortableWrite<W> {
- /// Create a new `AbortableWrite` instance with the given writer, and
write mode.
- pub(crate) fn new(writer: W, multipart: MultiPart) -> Self {
- Self { writer, multipart }
- }
-
- /// handling of abort for different write modes
- pub(crate) fn abort_writer(&self) -> Result<BoxFuture<'static,
Result<()>>> {
- let multi = self.multipart.clone();
- Ok(Box::pin(async move {
- multi
- .store
- .abort_multipart(&multi.location, &multi.multipart_id)
- .await
- .map_err(DataFusionError::ObjectStore)
- }))
- }
-}
-
-impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
- fn poll_write(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<std::result::Result<usize, Error>> {
- Pin::new(&mut self.get_mut().writer).poll_write(cx, buf)
- }
-
- fn poll_flush(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<std::result::Result<(), Error>> {
- Pin::new(&mut self.get_mut().writer).poll_flush(cx)
- }
-
- fn poll_shutdown(
- self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<std::result::Result<(), Error>> {
- Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
- }
-}
-
Review Comment:
I also removed `struct MultiPart`, since it's no longer used either.
##########
datafusion/core/src/datasource/file_format/write/orchestration.rs:
##########
@@ -173,19 +173,9 @@ pub(crate) async fn stateless_serialize_and_write_files(
// Finalize or abort writers as appropriate
for mut writer in finished_writers.into_iter() {
- match any_errors {
- true => {
- let abort_result = writer.abort_writer();
- if abort_result.is_err() {
- any_abort_errors = true;
- }
- }
- false => {
- writer.shutdown()
+ writer.shutdown()
.await
.map_err(|_| internal_datafusion_err!("Error encountered
while finalizing writes! Partial results may have been written to
ObjectStore!"))?;
Review Comment:
I'm not sure whether it's appropriate to simply `shutdown` every writer here,
but I think that, according to
https://github.com/apache/arrow-datafusion/pull/9648#issuecomment-2002483969,
it's OK? 👀
##########
datafusion/core/src/datasource/file_format/file_compression_type.rs:
##########
@@ -152,7 +153,7 @@ impl FileCompressionType {
/// according to this `FileCompressionType`.
pub fn convert_async_writer(
&self,
- w: Box<dyn AsyncWrite + Send + Unpin>,
+ w: BufWriter,
Review Comment:
I'm wondering whether it's OK to change the parameter type here, because this
method is public; however, keeping `Box<dyn AsyncWrite + Send + Unpin>` makes
the types incompatible. 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]