This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 026c1d0598 Minor: remove duplication in `create_writer` (#7229)
026c1d0598 is described below

commit 026c1d059829a60de30e315da4556db2ebf2d3c0
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Aug 9 12:09:47 2023 -0500

    Minor: remove duplication in `create_writer` (#7229)
    
    * Minor: remove duplication in `create_writer`
    
    * clippy
    
    * more clippy
    
    * Update datafusion/core/src/datasource/file_format/mod.rs
    
    Co-authored-by: Metehan Yıldırım <[email protected]>
    
    * fmt
    
    ---------
    
    Co-authored-by: Metehan Yıldırım <[email protected]>
---
 datafusion/core/src/datasource/file_format/csv.rs  | 88 +++++-----------------
 .../core/src/datasource/file_format/file_type.rs   |  2 +-
 datafusion/core/src/datasource/file_format/json.rs | 82 ++++----------------
 datafusion/core/src/datasource/file_format/mod.rs  | 59 ++++++++++++++-
 datafusion/core/src/datasource/listing/table.rs    |  2 +-
 5 files changed, 95 insertions(+), 138 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 0d8641a464..8f56bf139e 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -36,17 +36,13 @@ use bytes::{Buf, Bytes};
 use futures::stream::BoxStream;
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
-use tokio::io::AsyncWrite;
 
-use super::{stateless_serialize_and_write_files, FileFormat};
+use super::{create_writer, stateless_serialize_and_write_files, FileFormat};
 use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::datasource::file_format::FileWriterMode;
-use crate::datasource::file_format::{
-    AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart,
-    DEFAULT_SCHEMA_INFER_MAX_RECORD,
-};
+use crate::datasource::file_format::{BatchSerializer, DEFAULT_SCHEMA_INFER_MAX_RECORD};
 use crate::datasource::physical_plan::{
-    CsvExec, FileGroupDisplay, FileMeta, FileScanConfig, FileSinkConfig,
+    CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
 };
 use crate::error::Result;
 use crate::execution::context::SessionState;
@@ -285,7 +281,7 @@ impl FileFormat for CsvFormat {
             conf,
             self.has_header,
             self.delimiter,
-            self.file_compression_type.clone(),
+            self.file_compression_type,
         ));
 
         Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
@@ -494,56 +490,6 @@ impl CsvSink {
             file_compression_type,
         }
     }
-
-    // Create a write for Csv files
-    async fn create_writer(
-        &self,
-        file_meta: FileMeta,
-        object_store: Arc<dyn ObjectStore>,
-    ) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
-        let object = &file_meta.object_meta;
-        match self.config.writer_mode {
-            // If the mode is append, call the store's append method and return wrapped in
-            // a boxed trait object.
-            FileWriterMode::Append => {
-                let writer = object_store
-                    .append(&object.location)
-                    .await
-                    .map_err(DataFusionError::ObjectStore)?;
-                let writer = AbortableWrite::new(
-                    self.file_compression_type.convert_async_writer(writer)?,
-                    AbortMode::Append,
-                );
-                Ok(writer)
-            }
-            // If the mode is put, create a new AsyncPut writer and return it wrapped in
-            // a boxed trait object
-            FileWriterMode::Put => {
-                let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store));
-                let writer = AbortableWrite::new(
-                    self.file_compression_type.convert_async_writer(writer)?,
-                    AbortMode::Put,
-                );
-                Ok(writer)
-            }
-            // If the mode is put multipart, call the store's put_multipart method and
-            // return the writer wrapped in a boxed trait object.
-            FileWriterMode::PutMultipart => {
-                let (multipart_id, writer) = object_store
-                    .put_multipart(&object.location)
-                    .await
-                    .map_err(DataFusionError::ObjectStore)?;
-                Ok(AbortableWrite::new(
-                    self.file_compression_type.convert_async_writer(writer)?,
-                    AbortMode::MultiPart(MultiPart::new(
-                        object_store,
-                        multipart_id,
-                        object.location.clone(),
-                    )),
-                ))
-            }
-        }
-    }
 }
 
 #[async_trait]
@@ -577,12 +523,13 @@ impl DataSink for CsvSink {
                     serializers.push(Box::new(serializer));
 
                     let file = file_group.clone();
-                    let writer = self
-                        .create_writer(
-                            file.object_meta.clone().into(),
-                            object_store.clone(),
-                        )
-                        .await?;
+                    let writer = create_writer(
+                        self.config.writer_mode,
+                        self.file_compression_type,
+                        file.object_meta.clone().into(),
+                        object_store.clone(),
+                    )
+                    .await?;
                     writers.push(writer);
                 }
             }
@@ -612,9 +559,13 @@ impl DataSink for CsvSink {
                         size: 0,
                         e_tag: None,
                     };
-                    let writer = self
-                        .create_writer(object_meta.into(), object_store.clone())
-                        .await?;
+                    let writer = create_writer(
+                        self.config.writer_mode,
+                        self.file_compression_type,
+                        object_meta.into(),
+                        object_store.clone(),
+                    )
+                    .await?;
                     writers.push(writer);
                 }
             }
@@ -854,8 +805,7 @@ mod tests {
             Field::new("c13", DataType::Utf8, true),
         ]);
 
-        let compressed_csv =
-            csv.with_file_compression_type(file_compression_type.clone());
+        let compressed_csv = csv.with_file_compression_type(file_compression_type);
 
         //convert compressed_stream to decoded_stream
         let decoded_stream = compressed_csv
diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs
index 567fffb323..68967221ee 100644
--- a/datafusion/core/src/datasource/file_format/file_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_type.rs
@@ -62,7 +62,7 @@ pub trait GetExt {
 }
 
 /// Readable file compression type
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub struct FileCompressionType {
     variant: CompressionTypeVariant,
 }
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index dae3a18f96..6856ad89ea 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -28,7 +28,6 @@ use std::fmt;
 use std::fmt::Debug;
 use std::io::BufReader;
 use std::sync::Arc;
-use tokio::io::AsyncWrite;
 
 use arrow::datatypes::Schema;
 use arrow::datatypes::SchemaRef;
@@ -43,21 +42,17 @@ use datafusion_physical_expr::PhysicalExpr;
 use object_store::{GetResult, ObjectMeta, ObjectStore};
 
 use crate::datasource::physical_plan::FileGroupDisplay;
-use crate::datasource::physical_plan::FileMeta;
 use crate::physical_plan::insert::DataSink;
 use crate::physical_plan::insert::InsertExec;
 use crate::physical_plan::SendableRecordBatchStream;
 use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
 
+use super::create_writer;
 use super::stateless_serialize_and_write_files;
-use super::AbortMode;
-use super::AbortableWrite;
-use super::AsyncPutWriter;
 use super::BatchSerializer;
 use super::FileFormat;
 use super::FileScanConfig;
 use super::FileWriterMode;
-use super::MultiPart;
 use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::datasource::physical_plan::FileSinkConfig;
@@ -191,7 +186,7 @@ impl FileFormat for JsonFormat {
             ));
         }
         let sink_schema = conf.output_schema().clone();
-        let sink = Arc::new(JsonSink::new(conf, self.file_compression_type.clone()));
+        let sink = Arc::new(JsonSink::new(conf, self.file_compression_type));
 
         Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
     }
@@ -266,56 +261,6 @@ impl JsonSink {
             file_compression_type,
         }
     }
-
-    // Create a write for Json files
-    async fn create_writer(
-        &self,
-        file_meta: FileMeta,
-        object_store: Arc<dyn ObjectStore>,
-    ) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
-        let object = &file_meta.object_meta;
-        match self.config.writer_mode {
-            // If the mode is append, call the store's append method and return wrapped in
-            // a boxed trait object.
-            FileWriterMode::Append => {
-                let writer = object_store
-                    .append(&object.location)
-                    .await
-                    .map_err(DataFusionError::ObjectStore)?;
-                let writer = AbortableWrite::new(
-                    self.file_compression_type.convert_async_writer(writer)?,
-                    AbortMode::Append,
-                );
-                Ok(writer)
-            }
-            // If the mode is put, create a new AsyncPut writer and return it wrapped in
-            // a boxed trait object
-            FileWriterMode::Put => {
-                let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store));
-                let writer = AbortableWrite::new(
-                    self.file_compression_type.convert_async_writer(writer)?,
-                    AbortMode::Put,
-                );
-                Ok(writer)
-            }
-            // If the mode is put multipart, call the store's put_multipart method and
-            // return the writer wrapped in a boxed trait object.
-            FileWriterMode::PutMultipart => {
-                let (multipart_id, writer) = object_store
-                    .put_multipart(&object.location)
-                    .await
-                    .map_err(DataFusionError::ObjectStore)?;
-                Ok(AbortableWrite::new(
-                    self.file_compression_type.convert_async_writer(writer)?,
-                    AbortMode::MultiPart(MultiPart::new(
-                        object_store,
-                        multipart_id,
-                        object.location.clone(),
-                    )),
-                ))
-            }
-        }
-    }
 }
 
 #[async_trait]
@@ -341,12 +286,13 @@ impl DataSink for JsonSink {
                     serializers.push(Box::new(serializer));
 
                     let file = file_group.clone();
-                    let writer = self
-                        .create_writer(
-                            file.object_meta.clone().into(),
-                            object_store.clone(),
-                        )
-                        .await?;
+                    let writer = create_writer(
+                        self.config.writer_mode,
+                        self.file_compression_type,
+                        file.object_meta.clone().into(),
+                        object_store.clone(),
+                    )
+                    .await?;
                     writers.push(writer);
                 }
             }
@@ -372,9 +318,13 @@ impl DataSink for JsonSink {
                         size: 0,
                         e_tag: None,
                     };
-                    let writer = self
-                        .create_writer(object_meta.into(), object_store.clone())
-                        .await?;
+                    let writer = create_writer(
+                        self.config.writer_mode,
+                        self.file_compression_type,
+                        object_meta.into(),
+                        object_store.clone(),
+                    )
+                    .await?;
                     writers.push(writer);
                 }
             }
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 97492276a2..42b16656fb 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -53,6 +53,10 @@ use futures::{ready, StreamExt};
 use object_store::path::Path;
 use object_store::{MultipartId, ObjectMeta, ObjectStore};
 use tokio::io::{AsyncWrite, AsyncWriteExt};
+
+use self::file_type::FileCompressionType;
+
+use super::physical_plan::FileMeta;
 /// This trait abstracts all the file format specific implementations
 /// from the [`TableProvider`]. This helps code re-utilization across
 /// providers that support the the same file formats.
@@ -235,7 +239,7 @@ pub(crate) enum AbortMode {
 }
 
 /// A wrapper struct with abort method and writer
-struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
+pub(crate) struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
     writer: W,
     mode: AbortMode,
 }
@@ -306,6 +310,59 @@ pub enum FileWriterMode {
     /// Data is written to a new file in multiple parts.
     PutMultipart,
 }
+
+/// Returns an [`AbortableWrite`] which writes to the given object store location
+/// with the specified compression
+pub(crate) async fn create_writer(
+    writer_mode: FileWriterMode,
+    file_compression_type: FileCompressionType,
+    file_meta: FileMeta,
+    object_store: Arc<dyn ObjectStore>,
+) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
+    let object = &file_meta.object_meta;
+    match writer_mode {
+        // If the mode is append, call the store's append method and return wrapped in
+        // a boxed trait object.
+        FileWriterMode::Append => {
+            let writer = object_store
+                .append(&object.location)
+                .await
+                .map_err(DataFusionError::ObjectStore)?;
+            let writer = AbortableWrite::new(
+                file_compression_type.convert_async_writer(writer)?,
+                AbortMode::Append,
+            );
+            Ok(writer)
+        }
+        // If the mode is put, create a new AsyncPut writer and return it wrapped in
+        // a boxed trait object
+        FileWriterMode::Put => {
+            let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store));
+            let writer = AbortableWrite::new(
+                file_compression_type.convert_async_writer(writer)?,
+                AbortMode::Put,
+            );
+            Ok(writer)
+        }
+        // If the mode is put multipart, call the store's put_multipart method and
+        // return the writer wrapped in a boxed trait object.
+        FileWriterMode::PutMultipart => {
+            let (multipart_id, writer) = object_store
+                .put_multipart(&object.location)
+                .await
+                .map_err(DataFusionError::ObjectStore)?;
+            Ok(AbortableWrite::new(
+                file_compression_type.convert_async_writer(writer)?,
+                AbortMode::MultiPart(MultiPart::new(
+                    object_store,
+                    multipart_id,
+                    object.location.clone(),
+                )),
+            ))
+        }
+    }
+}
+
 /// A trait that defines the methods required for a RecordBatch serializer.
 #[async_trait]
 pub trait BatchSerializer: Unpin + Send {
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 60e5428867..8519628760 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1636,7 +1636,7 @@ mod tests {
             "path{}",
             file_type
                 .to_owned()
-                .get_ext_with_compression(file_compression_type.clone())
+                .get_ext_with_compression(file_compression_type)
                 .unwrap()
         );
 

Reply via email to