This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 026c1d0598 Minor: remove duplication in `create_writer` (#7229)
026c1d0598 is described below
commit 026c1d059829a60de30e315da4556db2ebf2d3c0
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Aug 9 12:09:47 2023 -0500
Minor: remove duplication in `create_writer` (#7229)
* Minor: remove duplication in `create_writer`
* clippy
* more clippy
* Update datafusion/core/src/datasource/file_format/mod.rs
Co-authored-by: Metehan Yıldırım <[email protected]>
* fmt
---------
Co-authored-by: Metehan Yıldırım <[email protected]>
---
datafusion/core/src/datasource/file_format/csv.rs | 88 +++++-----------------
.../core/src/datasource/file_format/file_type.rs | 2 +-
datafusion/core/src/datasource/file_format/json.rs | 82 ++++----------------
datafusion/core/src/datasource/file_format/mod.rs | 59 ++++++++++++++-
datafusion/core/src/datasource/listing/table.rs | 2 +-
5 files changed, 95 insertions(+), 138 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 0d8641a464..8f56bf139e 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -36,17 +36,13 @@ use bytes::{Buf, Bytes};
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
-use tokio::io::AsyncWrite;
-use super::{stateless_serialize_and_write_files, FileFormat};
+use super::{create_writer, stateless_serialize_and_write_files, FileFormat};
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::FileWriterMode;
-use crate::datasource::file_format::{
- AbortMode, AbortableWrite, AsyncPutWriter, BatchSerializer, MultiPart,
- DEFAULT_SCHEMA_INFER_MAX_RECORD,
-};
+use crate::datasource::file_format::{BatchSerializer, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::physical_plan::{
- CsvExec, FileGroupDisplay, FileMeta, FileScanConfig, FileSinkConfig,
+ CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
};
use crate::error::Result;
use crate::execution::context::SessionState;
@@ -285,7 +281,7 @@ impl FileFormat for CsvFormat {
conf,
self.has_header,
self.delimiter,
- self.file_compression_type.clone(),
+ self.file_compression_type,
));
Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
@@ -494,56 +490,6 @@ impl CsvSink {
file_compression_type,
}
}
-
- // Create a write for Csv files
- async fn create_writer(
- &self,
- file_meta: FileMeta,
- object_store: Arc<dyn ObjectStore>,
- ) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
- let object = &file_meta.object_meta;
- match self.config.writer_mode {
- // If the mode is append, call the store's append method and
return wrapped in
- // a boxed trait object.
- FileWriterMode::Append => {
- let writer = object_store
- .append(&object.location)
- .await
- .map_err(DataFusionError::ObjectStore)?;
- let writer = AbortableWrite::new(
- self.file_compression_type.convert_async_writer(writer)?,
- AbortMode::Append,
- );
- Ok(writer)
- }
- // If the mode is put, create a new AsyncPut writer and return it
wrapped in
- // a boxed trait object
- FileWriterMode::Put => {
- let writer = Box::new(AsyncPutWriter::new(object.clone(),
object_store));
- let writer = AbortableWrite::new(
- self.file_compression_type.convert_async_writer(writer)?,
- AbortMode::Put,
- );
- Ok(writer)
- }
- // If the mode is put multipart, call the store's put_multipart
method and
- // return the writer wrapped in a boxed trait object.
- FileWriterMode::PutMultipart => {
- let (multipart_id, writer) = object_store
- .put_multipart(&object.location)
- .await
- .map_err(DataFusionError::ObjectStore)?;
- Ok(AbortableWrite::new(
- self.file_compression_type.convert_async_writer(writer)?,
- AbortMode::MultiPart(MultiPart::new(
- object_store,
- multipart_id,
- object.location.clone(),
- )),
- ))
- }
- }
- }
}
#[async_trait]
@@ -577,12 +523,13 @@ impl DataSink for CsvSink {
serializers.push(Box::new(serializer));
let file = file_group.clone();
- let writer = self
- .create_writer(
- file.object_meta.clone().into(),
- object_store.clone(),
- )
- .await?;
+ let writer = create_writer(
+ self.config.writer_mode,
+ self.file_compression_type,
+ file.object_meta.clone().into(),
+ object_store.clone(),
+ )
+ .await?;
writers.push(writer);
}
}
@@ -612,9 +559,13 @@ impl DataSink for CsvSink {
size: 0,
e_tag: None,
};
- let writer = self
- .create_writer(object_meta.into(),
object_store.clone())
- .await?;
+ let writer = create_writer(
+ self.config.writer_mode,
+ self.file_compression_type,
+ object_meta.into(),
+ object_store.clone(),
+ )
+ .await?;
writers.push(writer);
}
}
@@ -854,8 +805,7 @@ mod tests {
Field::new("c13", DataType::Utf8, true),
]);
- let compressed_csv =
- csv.with_file_compression_type(file_compression_type.clone());
+        let compressed_csv = csv.with_file_compression_type(file_compression_type);
//convert compressed_stream to decoded_stream
let decoded_stream = compressed_csv
diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs
index 567fffb323..68967221ee 100644
--- a/datafusion/core/src/datasource/file_format/file_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_type.rs
@@ -62,7 +62,7 @@ pub trait GetExt {
}
/// Readable file compression type
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FileCompressionType {
variant: CompressionTypeVariant,
}
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index dae3a18f96..6856ad89ea 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -28,7 +28,6 @@ use std::fmt;
use std::fmt::Debug;
use std::io::BufReader;
use std::sync::Arc;
-use tokio::io::AsyncWrite;
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
@@ -43,21 +42,17 @@ use datafusion_physical_expr::PhysicalExpr;
use object_store::{GetResult, ObjectMeta, ObjectStore};
use crate::datasource::physical_plan::FileGroupDisplay;
-use crate::datasource::physical_plan::FileMeta;
use crate::physical_plan::insert::DataSink;
use crate::physical_plan::insert::InsertExec;
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
+use super::create_writer;
use super::stateless_serialize_and_write_files;
-use super::AbortMode;
-use super::AbortableWrite;
-use super::AsyncPutWriter;
use super::BatchSerializer;
use super::FileFormat;
use super::FileScanConfig;
use super::FileWriterMode;
-use super::MultiPart;
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::physical_plan::FileSinkConfig;
@@ -191,7 +186,7 @@ impl FileFormat for JsonFormat {
));
}
let sink_schema = conf.output_schema().clone();
-        let sink = Arc::new(JsonSink::new(conf, self.file_compression_type.clone()));
+ let sink = Arc::new(JsonSink::new(conf, self.file_compression_type));
Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
}
@@ -266,56 +261,6 @@ impl JsonSink {
file_compression_type,
}
}
-
- // Create a write for Json files
- async fn create_writer(
- &self,
- file_meta: FileMeta,
- object_store: Arc<dyn ObjectStore>,
- ) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
- let object = &file_meta.object_meta;
- match self.config.writer_mode {
- // If the mode is append, call the store's append method and
return wrapped in
- // a boxed trait object.
- FileWriterMode::Append => {
- let writer = object_store
- .append(&object.location)
- .await
- .map_err(DataFusionError::ObjectStore)?;
- let writer = AbortableWrite::new(
- self.file_compression_type.convert_async_writer(writer)?,
- AbortMode::Append,
- );
- Ok(writer)
- }
- // If the mode is put, create a new AsyncPut writer and return it
wrapped in
- // a boxed trait object
- FileWriterMode::Put => {
- let writer = Box::new(AsyncPutWriter::new(object.clone(),
object_store));
- let writer = AbortableWrite::new(
- self.file_compression_type.convert_async_writer(writer)?,
- AbortMode::Put,
- );
- Ok(writer)
- }
- // If the mode is put multipart, call the store's put_multipart
method and
- // return the writer wrapped in a boxed trait object.
- FileWriterMode::PutMultipart => {
- let (multipart_id, writer) = object_store
- .put_multipart(&object.location)
- .await
- .map_err(DataFusionError::ObjectStore)?;
- Ok(AbortableWrite::new(
- self.file_compression_type.convert_async_writer(writer)?,
- AbortMode::MultiPart(MultiPart::new(
- object_store,
- multipart_id,
- object.location.clone(),
- )),
- ))
- }
- }
- }
}
#[async_trait]
@@ -341,12 +286,13 @@ impl DataSink for JsonSink {
serializers.push(Box::new(serializer));
let file = file_group.clone();
- let writer = self
- .create_writer(
- file.object_meta.clone().into(),
- object_store.clone(),
- )
- .await?;
+ let writer = create_writer(
+ self.config.writer_mode,
+ self.file_compression_type,
+ file.object_meta.clone().into(),
+ object_store.clone(),
+ )
+ .await?;
writers.push(writer);
}
}
@@ -372,9 +318,13 @@ impl DataSink for JsonSink {
size: 0,
e_tag: None,
};
- let writer = self
- .create_writer(object_meta.into(),
object_store.clone())
- .await?;
+ let writer = create_writer(
+ self.config.writer_mode,
+ self.file_compression_type,
+ object_meta.into(),
+ object_store.clone(),
+ )
+ .await?;
writers.push(writer);
}
}
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 97492276a2..42b16656fb 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -53,6 +53,10 @@ use futures::{ready, StreamExt};
use object_store::path::Path;
use object_store::{MultipartId, ObjectMeta, ObjectStore};
use tokio::io::{AsyncWrite, AsyncWriteExt};
+
+use self::file_type::FileCompressionType;
+
+use super::physical_plan::FileMeta;
/// This trait abstracts all the file format specific implementations
/// from the [`TableProvider`]. This helps code re-utilization across
/// providers that support the the same file formats.
@@ -235,7 +239,7 @@ pub(crate) enum AbortMode {
}
/// A wrapper struct with abort method and writer
-struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
+pub(crate) struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
writer: W,
mode: AbortMode,
}
@@ -306,6 +310,59 @@ pub enum FileWriterMode {
/// Data is written to a new file in multiple parts.
PutMultipart,
}
+
+/// Returns an [`AbortableWrite`] which writes to the given object store location
+/// with the specified compression
+pub(crate) async fn create_writer(
+ writer_mode: FileWriterMode,
+ file_compression_type: FileCompressionType,
+ file_meta: FileMeta,
+ object_store: Arc<dyn ObjectStore>,
+) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
+ let object = &file_meta.object_meta;
+ match writer_mode {
+ // If the mode is append, call the store's append method and return
wrapped in
+ // a boxed trait object.
+ FileWriterMode::Append => {
+ let writer = object_store
+ .append(&object.location)
+ .await
+ .map_err(DataFusionError::ObjectStore)?;
+ let writer = AbortableWrite::new(
+ file_compression_type.convert_async_writer(writer)?,
+ AbortMode::Append,
+ );
+ Ok(writer)
+ }
+ // If the mode is put, create a new AsyncPut writer and return it
wrapped in
+ // a boxed trait object
+ FileWriterMode::Put => {
+ let writer = Box::new(AsyncPutWriter::new(object.clone(),
object_store));
+ let writer = AbortableWrite::new(
+ file_compression_type.convert_async_writer(writer)?,
+ AbortMode::Put,
+ );
+ Ok(writer)
+ }
+ // If the mode is put multipart, call the store's put_multipart method
and
+ // return the writer wrapped in a boxed trait object.
+ FileWriterMode::PutMultipart => {
+ let (multipart_id, writer) = object_store
+ .put_multipart(&object.location)
+ .await
+ .map_err(DataFusionError::ObjectStore)?;
+ Ok(AbortableWrite::new(
+ file_compression_type.convert_async_writer(writer)?,
+ AbortMode::MultiPart(MultiPart::new(
+ object_store,
+ multipart_id,
+ object.location.clone(),
+ )),
+ ))
+ }
+ }
+}
+
/// A trait that defines the methods required for a RecordBatch serializer.
#[async_trait]
pub trait BatchSerializer: Unpin + Send {
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 60e5428867..8519628760 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1636,7 +1636,7 @@ mod tests {
"path{}",
file_type
.to_owned()
- .get_ext_with_compression(file_compression_type.clone())
+ .get_ext_with_compression(file_compression_type)
.unwrap()
);