This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3d917a0efd Extend insert into support to include Json backed tables (#7212)
3d917a0efd is described below
commit 3d917a0efd2688233d435d0235d9e6360cf79a63
Author: devinjdangelo <[email protected]>
AuthorDate: Tue Aug 8 09:18:28 2023 -0400
Extend insert into support to include Json backed tables (#7212)
* jsonsink and tests implemented
* fix tests and clean up
* clippy
* minor refactor
* comments + append-to-existing-file test checks that no new files are added
* format comments
Co-authored-by: Metehan Yıldırım <[email protected]>
---------
Co-authored-by: Metehan Yıldırım <[email protected]>
---
datafusion/core/src/datasource/file_format/csv.rs | 78 ++-----
datafusion/core/src/datasource/file_format/json.rs | 236 +++++++++++++++++++-
datafusion/core/src/datasource/file_format/mod.rs | 75 ++++++-
.../core/src/datasource/file_format/options.rs | 20 ++
datafusion/core/src/datasource/listing/table.rs | 247 +++++++++++++++------
5 files changed, 518 insertions(+), 138 deletions(-)
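At a high level, this change lets INSERT INTO target an NDJSON-backed listing table the same way the existing CSV support does. A rough sketch of the user-facing flow, pieced together from the test changes below (the paths, the table names `t` and `source`, and the exact import paths are illustrative assumptions, not part of this commit):

    use datafusion::datasource::listing::ListingTableInsertMode; // assumed re-export path
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> datafusion::error::Result<()> {
        let ctx = SessionContext::new();
        // Register a line-delimited JSON file as a table whose INSERTs append to the file.
        ctx.register_json(
            "t",
            "/tmp/data.json", // illustrative path to an existing NDJSON file
            NdJsonReadOptions::default().insert_mode(ListingTableInsertMode::AppendToFile),
        )
        .await?;
        // `source` is assumed to be another registered table with the same schema.
        // The inserted rows are serialized by the new JsonSink added in this commit.
        ctx.sql("INSERT INTO t SELECT * FROM source").await?.collect().await?;
        Ok(())
    }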
diff --git a/datafusion/core/src/datasource/file_format/csv.rs
b/datafusion/core/src/datasource/file_format/csv.rs
index cbcdc2f112..0d8641a464 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -36,9 +36,9 @@ use bytes::{Buf, Bytes};
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
-use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::io::AsyncWrite;
-use super::FileFormat;
+use super::{stateless_serialize_and_write_files, FileFormat};
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::FileWriterMode;
use crate::datasource::file_format::{
@@ -274,6 +274,12 @@ impl FileFormat for CsvFormat {
"Overwrites are not implemented yet for CSV".into(),
));
}
+
+ if self.file_compression_type != FileCompressionType::UNCOMPRESSED {
+ return Err(DataFusionError::NotImplemented(
+ "Inserting compressed CSV is not implemented yet.".into(),
+ ));
+ }
let sink_schema = conf.output_schema().clone();
let sink = Arc::new(CsvSink::new(
conf,
@@ -439,28 +445,6 @@ impl BatchSerializer for CsvSerializer {
}
}
-async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
- result: Result<T>,
- writers: &mut [AbortableWrite<W>],
-) -> Result<T> {
- match result {
- Ok(value) => Ok(value),
- Err(e) => {
- // Abort all writers before returning the error:
- for writer in writers {
- let mut abort_future = writer.abort_writer();
- if let Ok(abort_future) = &mut abort_future {
- let _ = abort_future.await;
- }
- // Ignore errors that occur during abortion,
- // We do try to abort all writers before returning error.
- }
- // After aborting writers return original error.
- Err(e)
- }
- }
-}
-
/// Implements [`DataSink`] for writing to a CSV file.
struct CsvSink {
/// Config options for writing data
@@ -566,7 +550,7 @@ impl CsvSink {
impl DataSink for CsvSink {
async fn write_all(
&self,
- mut data: Vec<SendableRecordBatchStream>,
+ data: Vec<SendableRecordBatchStream>,
context: &Arc<TaskContext>,
) -> Result<u64> {
let num_partitions = data.len();
@@ -576,7 +560,7 @@ impl DataSink for CsvSink {
.object_store(&self.config.object_store_url)?;
// Construct serializer and writer for each file group
- let mut serializers = vec![];
+ let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
let mut writers = vec![];
match self.config.writer_mode {
FileWriterMode::Append => {
@@ -590,7 +574,7 @@ impl DataSink for CsvSink {
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
- serializers.push(serializer);
+ serializers.push(Box::new(serializer));
let file = file_group.clone();
let writer = self
@@ -608,9 +592,9 @@ impl DataSink for CsvSink {
))
}
FileWriterMode::PutMultipart => {
-                //currently assuming only 1 partition path (i.e. not hive style partitioning on a column)
+                // Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
                 let base_path = &self.config.table_paths[0];
-                //uniquely identify this batch of files with a random string, to prevent collisions overwriting files
+                // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
                 let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
for part_idx in 0..num_partitions {
let header = true;
@@ -618,7 +602,7 @@ impl DataSink for CsvSink {
let serializer = CsvSerializer::new()
.with_builder(builder)
.with_header(header);
- serializers.push(serializer);
+ serializers.push(Box::new(serializer));
let file_path = base_path
.prefix()
.child(format!("/{}_{}.csv", write_id, part_idx));
@@ -636,39 +620,7 @@ impl DataSink for CsvSink {
}
}
- let mut row_count = 0;
- // Map errors to DatafusionError.
- let err_converter =
- |_| DataFusionError::Internal("Unexpected FileSink
Error".to_string());
- // TODO parallelize serialization accross partitions and batches
within partitions
- // see: https://github.com/apache/arrow-datafusion/issues/7079
- for idx in 0..num_partitions {
- while let Some(maybe_batch) = data[idx].next().await {
- // Write data to files in a round robin fashion:
- let serializer = &mut serializers[idx];
- let batch = check_for_errors(maybe_batch, &mut writers).await?;
- row_count += batch.num_rows();
- let bytes =
- check_for_errors(serializer.serialize(batch).await, &mut
writers)
- .await?;
- let writer = &mut writers[idx];
- check_for_errors(
- writer.write_all(&bytes).await.map_err(err_converter),
- &mut writers,
- )
- .await?;
- }
- }
- // Perform cleanup:
- let n_writers = writers.len();
- for idx in 0..n_writers {
- check_for_errors(
- writers[idx].shutdown().await.map_err(err_converter),
- &mut writers,
- )
- .await?;
- }
- Ok(row_count as u64)
+ stateless_serialize_and_write_files(data, serializers, writers).await
}
}
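
The bulk of the old CsvSink::write_all body moves into the shared stateless_serialize_and_write_files helper (added in mod.rs below), which only needs boxed BatchSerializer and AbortableWrite values. As a rough illustration of the contract a serializer has to satisfy, here is a hypothetical implementation (DebugSerializer and the import paths are assumptions for the sketch, not part of this commit):

    use arrow_array::RecordBatch;
    use async_trait::async_trait;
    use bytes::Bytes;
    use datafusion::datasource::file_format::BatchSerializer; // assumed public path
    use datafusion::error::Result;

    /// Turns each batch into a human-readable table; a real sink would emit its wire format.
    struct DebugSerializer;

    #[async_trait]
    impl BatchSerializer for DebugSerializer {
        async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
            // Serialization must be stateless: no dependency on earlier or later batches.
            let text = arrow::util::pretty::pretty_format_batches(&[batch])?.to_string();
            Ok(Bytes::from(text))
        }
    }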
diff --git a/datafusion/core/src/datasource/file_format/json.rs
b/datafusion/core/src/datasource/file_format/json.rs
index 6247e85ba8..dae3a18f96 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -19,28 +19,52 @@
use std::any::Any;
+use bytes::Bytes;
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use rand::distributions::Alphanumeric;
+use rand::distributions::DistString;
+use std::fmt;
+use std::fmt::Debug;
use std::io::BufReader;
use std::sync::Arc;
+use tokio::io::AsyncWrite;
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
+use arrow::json;
use arrow::json::reader::infer_json_schema_from_iterator;
use arrow::json::reader::ValueIter;
+use arrow_array::RecordBatch;
use async_trait::async_trait;
use bytes::Buf;
use datafusion_physical_expr::PhysicalExpr;
use object_store::{GetResult, ObjectMeta, ObjectStore};
+use crate::datasource::physical_plan::FileGroupDisplay;
+use crate::datasource::physical_plan::FileMeta;
+use crate::physical_plan::insert::DataSink;
+use crate::physical_plan::insert::InsertExec;
+use crate::physical_plan::SendableRecordBatchStream;
+use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
+
+use super::stateless_serialize_and_write_files;
+use super::AbortMode;
+use super::AbortableWrite;
+use super::AsyncPutWriter;
+use super::BatchSerializer;
use super::FileFormat;
use super::FileScanConfig;
+use super::FileWriterMode;
+use super::MultiPart;
use crate::datasource::file_format::file_type::FileCompressionType;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
+use crate::datasource::physical_plan::FileSinkConfig;
use crate::datasource::physical_plan::NdJsonExec;
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::ExecutionPlan;
-use crate::physical_plan::Statistics;
/// The default file extension of json files
pub const DEFAULT_JSON_EXTENSION: &str = ".json";
@@ -148,6 +172,216 @@ impl FileFormat for JsonFormat {
let exec = NdJsonExec::new(conf,
self.file_compression_type.to_owned());
Ok(Arc::new(exec))
}
+
+ async fn create_writer_physical_plan(
+ &self,
+ input: Arc<dyn ExecutionPlan>,
+ _state: &SessionState,
+ conf: FileSinkConfig,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ if conf.overwrite {
+ return Err(DataFusionError::NotImplemented(
+ "Overwrites are not implemented yet for Json".into(),
+ ));
+ }
+
+ if self.file_compression_type != FileCompressionType::UNCOMPRESSED {
+ return Err(DataFusionError::NotImplemented(
+ "Inserting compressed JSON is not implemented yet.".into(),
+ ));
+ }
+ let sink_schema = conf.output_schema().clone();
+        let sink = Arc::new(JsonSink::new(conf, self.file_compression_type.clone()));
+
+ Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
+ }
+}
+
+impl Default for JsonSerializer {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// Define a struct for serializing Json records to a stream
+pub struct JsonSerializer {
+ // Inner buffer for avoiding reallocation
+ buffer: Vec<u8>,
+}
+
+impl JsonSerializer {
+ /// Constructor for the JsonSerializer object
+ pub fn new() -> Self {
+ Self {
+ buffer: Vec::with_capacity(4096),
+ }
+ }
+}
+
+#[async_trait]
+impl BatchSerializer for JsonSerializer {
+ async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
+ let mut writer = json::LineDelimitedWriter::new(&mut self.buffer);
+ writer.write(&batch)?;
+ //drop(writer);
+ Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
+ }
+}
+
+/// Implements [`DataSink`] for writing to a Json file.
+struct JsonSink {
+ /// Config options for writing data
+ config: FileSinkConfig,
+ file_compression_type: FileCompressionType,
+}
+
+impl Debug for JsonSink {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("JsonSink")
+ .field("file_compression_type", &self.file_compression_type)
+ .finish()
+ }
+}
+
+impl DisplayAs for JsonSink {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "JsonSink(writer_mode={:?}, file_groups=",
+ self.config.writer_mode
+ )?;
+ FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+ write!(f, ")")
+ }
+ }
+ }
+}
+
+impl JsonSink {
+    fn new(config: FileSinkConfig, file_compression_type: FileCompressionType) -> Self {
+ Self {
+ config,
+ file_compression_type,
+ }
+ }
+
+    // Create a writer for Json files
+ async fn create_writer(
+ &self,
+ file_meta: FileMeta,
+ object_store: Arc<dyn ObjectStore>,
+ ) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
+ let object = &file_meta.object_meta;
+ match self.config.writer_mode {
+            // If the mode is append, call the store's append method and return wrapped in
+ // a boxed trait object.
+ FileWriterMode::Append => {
+ let writer = object_store
+ .append(&object.location)
+ .await
+ .map_err(DataFusionError::ObjectStore)?;
+ let writer = AbortableWrite::new(
+ self.file_compression_type.convert_async_writer(writer)?,
+ AbortMode::Append,
+ );
+ Ok(writer)
+ }
+            // If the mode is put, create a new AsyncPut writer and return it wrapped in
+ // a boxed trait object
+ FileWriterMode::Put => {
+                let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store));
+ let writer = AbortableWrite::new(
+ self.file_compression_type.convert_async_writer(writer)?,
+ AbortMode::Put,
+ );
+ Ok(writer)
+ }
+            // If the mode is put multipart, call the store's put_multipart method and
+ // return the writer wrapped in a boxed trait object.
+ FileWriterMode::PutMultipart => {
+ let (multipart_id, writer) = object_store
+ .put_multipart(&object.location)
+ .await
+ .map_err(DataFusionError::ObjectStore)?;
+ Ok(AbortableWrite::new(
+ self.file_compression_type.convert_async_writer(writer)?,
+ AbortMode::MultiPart(MultiPart::new(
+ object_store,
+ multipart_id,
+ object.location.clone(),
+ )),
+ ))
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl DataSink for JsonSink {
+ async fn write_all(
+ &self,
+ data: Vec<SendableRecordBatchStream>,
+ context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ let num_partitions = data.len();
+
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.config.object_store_url)?;
+
+ // Construct serializer and writer for each file group
+ let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
+ let mut writers = vec![];
+ match self.config.writer_mode {
+ FileWriterMode::Append => {
+ for file_group in &self.config.file_groups {
+ let serializer = JsonSerializer::new();
+ serializers.push(Box::new(serializer));
+
+ let file = file_group.clone();
+ let writer = self
+ .create_writer(
+ file.object_meta.clone().into(),
+ object_store.clone(),
+ )
+ .await?;
+ writers.push(writer);
+ }
+ }
+ FileWriterMode::Put => {
+ return Err(DataFusionError::NotImplemented(
+ "Put Mode is not implemented for Json Sink yet".into(),
+ ))
+ }
+ FileWriterMode::PutMultipart => {
+                // Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
+                let base_path = &self.config.table_paths[0];
+                // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
+                let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+ for part_idx in 0..num_partitions {
+ let serializer = JsonSerializer::new();
+ serializers.push(Box::new(serializer));
+ let file_path = base_path
+ .prefix()
+ .child(format!("/{}_{}.json", write_id, part_idx));
+ let object_meta = ObjectMeta {
+ location: file_path,
+ last_modified: chrono::offset::Utc::now(),
+ size: 0,
+ e_tag: None,
+ };
+ let writer = self
+                        .create_writer(object_meta.into(), object_store.clone())
+ .await?;
+ writers.push(writer);
+ }
+ }
+ }
+
+ stateless_serialize_and_write_files(data, serializers, writers).await
+ }
}
#[cfg(test)]
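
The new JsonSerializer above is a thin wrapper over arrow's line-delimited JSON writer plus a reusable buffer. A standalone sketch of the underlying arrow API it leans on (the data and printed output here are illustrative):

    use std::sync::Arc;
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::json::LineDelimitedWriter;
    use arrow::record_batch::RecordBatch;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;

        // Mirrors what JsonSerializer::serialize does per batch: encode the rows as
        // newline-delimited JSON into an in-memory buffer.
        let mut buf = Vec::new();
        let mut writer = LineDelimitedWriter::new(&mut buf);
        writer.write(&batch)?;
        writer.finish()?;
        drop(writer);

        // Prints one JSON object per input row, e.g. {"a":1} on the first line.
        print!("{}", String::from_utf8(buf)?);
        Ok(())
    }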
diff --git a/datafusion/core/src/datasource/file_format/mod.rs
b/datafusion/core/src/datasource/file_format/mod.rs
index 4cc6e8706a..97492276a2 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -39,7 +39,7 @@ use crate::arrow::datatypes::SchemaRef;
use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
-use crate::physical_plan::{ExecutionPlan, Statistics};
+use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream, Statistics};
use arrow_array::RecordBatch;
use datafusion_common::DataFusionError;
@@ -48,11 +48,11 @@ use datafusion_physical_expr::PhysicalExpr;
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
-use futures::ready;
use futures::FutureExt;
+use futures::{ready, StreamExt};
use object_store::path::Path;
use object_store::{MultipartId, ObjectMeta, ObjectStore};
-use tokio::io::AsyncWrite;
+use tokio::io::{AsyncWrite, AsyncWriteExt};
/// This trait abstracts all the file format specific implementations
/// from the [`TableProvider`]. This helps code re-utilization across
/// providers that support the same file formats.
@@ -313,6 +313,75 @@ pub trait BatchSerializer: Unpin + Send {
async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
}
+/// Checks if any of the passed writers have encountered an error
+/// and if so, all writers are aborted.
+async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
+ result: Result<T>,
+ writers: &mut [AbortableWrite<W>],
+) -> Result<T> {
+ match result {
+ Ok(value) => Ok(value),
+ Err(e) => {
+ // Abort all writers before returning the error:
+ for writer in writers {
+ let mut abort_future = writer.abort_writer();
+ if let Ok(abort_future) = &mut abort_future {
+ let _ = abort_future.await;
+ }
+ // Ignore errors that occur during abortion,
+ // We do try to abort all writers before returning error.
+ }
+ // After aborting writers return original error.
+ Err(e)
+ }
+ }
+}
+
+/// Contains the common logic for serializing RecordBatches and
+/// writing the resulting bytes to an ObjectStore.
+/// Serialization is assumed to be stateless, i.e.
+/// each RecordBatch can be serialized without any
+/// dependency on the RecordBatches before or after.
+async fn stateless_serialize_and_write_files(
+ mut data: Vec<SendableRecordBatchStream>,
+ mut serializers: Vec<Box<dyn BatchSerializer>>,
+ mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
+) -> Result<u64> {
+ let num_partitions = data.len();
+ let mut row_count = 0;
+    // Map errors to DataFusionError.
+    let err_converter =
+        |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
+    // TODO parallelize serialization across partitions and batches within partitions
+ // see: https://github.com/apache/arrow-datafusion/issues/7079
+ for idx in 0..num_partitions {
+ while let Some(maybe_batch) = data[idx].next().await {
+ // Write data to files in a round robin fashion:
+ let serializer = &mut serializers[idx];
+ let batch = check_for_errors(maybe_batch, &mut writers).await?;
+ row_count += batch.num_rows();
+ let bytes =
+                check_for_errors(serializer.serialize(batch).await, &mut writers).await?;
+ let writer = &mut writers[idx];
+ check_for_errors(
+ writer.write_all(&bytes).await.map_err(err_converter),
+ &mut writers,
+ )
+ .await?;
+ }
+ }
+ // Perform cleanup:
+ let n_writers = writers.len();
+ for idx in 0..n_writers {
+ check_for_errors(
+ writers[idx].shutdown().await.map_err(err_converter),
+ &mut writers,
+ )
+ .await?;
+ }
+ Ok(row_count as u64)
+}
+
#[cfg(test)]
pub(crate) mod test_util {
use std::ops::Range;
diff --git a/datafusion/core/src/datasource/file_format/options.rs
b/datafusion/core/src/datasource/file_format/options.rs
index b8499065bd..73c20d3b0c 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -373,6 +373,10 @@ pub struct NdJsonReadOptions<'a> {
pub file_compression_type: FileCompressionType,
/// Flag indicating whether this file may be unbounded (as in a FIFO file).
pub infinite: bool,
+ /// Indicates how the file is sorted
+ pub file_sort_order: Vec<Vec<Expr>>,
+ /// Setting controls how inserts to this file should be handled
+ pub insert_mode: ListingTableInsertMode,
}
impl<'a> Default for NdJsonReadOptions<'a> {
@@ -384,6 +388,8 @@ impl<'a> Default for NdJsonReadOptions<'a> {
table_partition_cols: vec![],
file_compression_type: FileCompressionType::UNCOMPRESSED,
infinite: false,
+ file_sort_order: vec![],
+ insert_mode: ListingTableInsertMode::AppendToFile,
}
}
}
@@ -424,6 +430,18 @@ impl<'a> NdJsonReadOptions<'a> {
self.schema = Some(schema);
self
}
+
+ /// Configure if file has known sort order
+ pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
+ self.file_sort_order = file_sort_order;
+ self
+ }
+
+ /// Configure how insertions to this table should be handled
+ pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
+ self.insert_mode = insert_mode;
+ self
+ }
}
#[async_trait]
@@ -535,6 +553,8 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
.with_infinite_source(self.infinite)
+ .with_file_sort_order(self.file_sort_order.clone())
+ .with_insert_mode(self.insert_mode.clone())
}
async fn get_resolved_schema(
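
The two new NdJsonReadOptions knobs compose with the existing builder style. A hedged sketch of registering an NDJSON directory with them (the table name, path, and import path for ListingTableInsertMode are illustrative assumptions):

    use datafusion::datasource::listing::ListingTableInsertMode; // assumed re-export path
    use datafusion::prelude::*;

    async fn register_events(ctx: &SessionContext) -> datafusion::error::Result<()> {
        let options = NdJsonReadOptions::default()
            // Each INSERT INTO writes new files into the directory rather than appending.
            .insert_mode(ListingTableInsertMode::AppendNewFiles)
            // Known sort order of the files; must stay empty for INSERT targets for now
            // (see the check added to ListingTable below).
            .file_sort_order(vec![]);
        ctx.register_json("events", "/data/events/", options).await
    }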
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 4085dac484..b47d25d1f9 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -805,6 +805,19 @@ impl TableProvider for ListingTable {
);
}
+ // TODO support inserts to sorted tables which preserve sort_order
+    // Inserts currently make no effort to preserve sort_order. This could lead to
+    // incorrect query results on the table after inserting incorrectly sorted data.
+ let unsorted: Vec<Vec<Expr>> = vec![];
+ if self.options.file_sort_order != unsorted {
+ return Err(
+ DataFusionError::NotImplemented(
+                "Writing to a sorted listing table via insert into is not supported yet. \
+                To write to this table in the meantime, register an equivalent table with \
+                file_sort_order = vec![]".into())
+ );
+ }
+
let table_path = &self.table_paths()[0];
// Get the object store for the table path.
let store = state.runtime_env().object_store(table_path)?;
@@ -838,10 +851,9 @@ impl TableProvider for ListingTable {
writer_mode =
crate::datasource::file_format::FileWriterMode::PutMultipart
}
ListingTableInsertMode::Error => {
- return Err(DataFusionError::Plan(
+ return plan_err!(
"Invalid plan attempting write to table with
TableWriteMode::Error!"
- .into(),
- ))
+ )
}
}
@@ -935,6 +947,7 @@ mod tests {
use super::*;
use crate::datasource::file_format::file_type::GetExt;
use crate::datasource::{provider_as_source, MemTable};
+ use crate::execution::options::ArrowReadOptions;
use crate::physical_plan::collect;
use crate::prelude::*;
use crate::{
@@ -944,9 +957,7 @@ mod tests {
logical_expr::{col, lit},
test::{columns, object_store::register_test_store},
};
- use arrow::csv;
use arrow::datatypes::{DataType, Schema};
- use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use chrono::DateTime;
use datafusion_common::assert_contains;
@@ -1434,26 +1445,6 @@ mod tests {
Ok(Arc::new(table))
}
- fn load_empty_schema_csv_table(
- schema: SchemaRef,
- temp_path: &str,
- insert_mode: ListingTableInsertMode,
- ) -> Result<Arc<dyn TableProvider>> {
- File::create(temp_path)?;
- let table_path = ListingTableUrl::parse(temp_path).unwrap();
-
- let file_format = CsvFormat::default();
- let listing_options =
-
ListingOptions::new(Arc::new(file_format)).with_insert_mode(insert_mode);
-
- let config = ListingTableConfig::new(table_path)
- .with_listing_options(listing_options)
- .with_schema(schema);
-
- let table = ListingTable::try_new(config)?;
- Ok(Arc::new(table))
- }
-
/// Check that the files listed by the table match the specified
`output_partitioning`
/// when the object store contains `files`.
async fn assert_list_files_for_scan_grouping(
@@ -1559,10 +1550,72 @@ mod tests {
}
#[tokio::test]
- async fn test_append_plan_to_external_table_stored_as_csv() -> Result<()> {
- let file_type = FileType::CSV;
- let file_compression_type = FileCompressionType::UNCOMPRESSED;
+ async fn test_insert_into_append_to_json_file() -> Result<()> {
+ helper_test_insert_into_append_to_existing_files(
+ FileType::JSON,
+ FileCompressionType::UNCOMPRESSED,
+ )
+ .await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_insert_into_append_new_json_files() -> Result<()> {
+ helper_test_append_new_files_to_table(
+ FileType::JSON,
+ FileCompressionType::UNCOMPRESSED,
+ )
+ .await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_insert_into_append_to_csv_file() -> Result<()> {
+ helper_test_insert_into_append_to_existing_files(
+ FileType::CSV,
+ FileCompressionType::UNCOMPRESSED,
+ )
+ .await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_insert_into_append_new_csv_files() -> Result<()> {
+ helper_test_append_new_files_to_table(
+ FileType::CSV,
+ FileCompressionType::UNCOMPRESSED,
+ )
+ .await?;
+ Ok(())
+ }
+
+ fn load_empty_schema_table(
+ schema: SchemaRef,
+ temp_path: &str,
+ insert_mode: ListingTableInsertMode,
+ file_format: Arc<dyn FileFormat>,
+ ) -> Result<Arc<dyn TableProvider>> {
+ File::create(temp_path)?;
+ let table_path = ListingTableUrl::parse(temp_path).unwrap();
+
+ let listing_options =
+            ListingOptions::new(file_format.clone()).with_insert_mode(insert_mode);
+
+ let config = ListingTableConfig::new(table_path)
+ .with_listing_options(listing_options)
+ .with_schema(schema);
+
+ let table = ListingTable::try_new(config)?;
+ Ok(Arc::new(table))
+ }
+    /// The logic for testing inserts into a listing table by appending to existing files
+    /// is the same for all formats/options which support this. This helper allows
+    /// passing different options to execute the same test with different settings.
+ async fn helper_test_insert_into_append_to_existing_files(
+ file_type: FileType,
+ file_compression_type: FileCompressionType,
+ ) -> Result<()> {
// Create the initial context, schema, and batch.
let session_ctx = SessionContext::new();
// Create a new schema with one field called "a" of type Int32
@@ -1587,17 +1640,27 @@ mod tests {
.unwrap()
);
- // Define batch size for file reader
- let batch_size = batch.num_rows();
-
// Create a temporary directory and a CSV file within it.
let tmp_dir = TempDir::new()?;
let path = tmp_dir.path().join(filename);
- let initial_table = load_empty_schema_csv_table(
+ let file_format: Arc<dyn FileFormat> = match file_type {
+            FileType::CSV => Arc::new(
+                CsvFormat::default().with_file_compression_type(file_compression_type),
+            ),
+            FileType::JSON => Arc::new(
+                JsonFormat::default().with_file_compression_type(file_compression_type),
+ ),
+ FileType::PARQUET => Arc::new(ParquetFormat::default()),
+ FileType::AVRO => Arc::new(AvroFormat {}),
+ FileType::ARROW => Arc::new(ArrowFormat {}),
+ };
+
+ let initial_table = load_empty_schema_table(
schema.clone(),
path.to_str().unwrap(),
ListingTableInsertMode::AppendToFile,
+ file_format,
)?;
session_ctx.register_table("t", initial_table)?;
// Create and register the source table with the provided schema and
inserted data
@@ -1632,19 +1695,9 @@ mod tests {
// Assert that the batches read from the file match the expected
result.
assert_batches_eq!(expected, &res);
- // Open the CSV file, read its contents as a record batch, and collect
the batches into a vector.
- let file = File::open(path.clone())?;
- let reader = csv::ReaderBuilder::new(schema.clone())
- .has_header(true)
- .with_batch_size(batch_size)
- .build(file)
- .map_err(|e| DataFusionError::Internal(e.to_string()))?;
-
- let batches = reader
- .collect::<Vec<ArrowResult<RecordBatch>>>()
- .into_iter()
- .collect::<ArrowResult<Vec<RecordBatch>>>()
- .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+ // Read the records in the table
+        let batches = session_ctx.sql("select * from t").await?.collect().await?;
// Define the expected result as a vector of strings.
let expected = vec![
@@ -1663,6 +1716,10 @@ mod tests {
// Assert that the batches read from the file match the expected
result.
assert_batches_eq!(expected, &batches);
+ // Assert that only 1 file was added to the table
+ let num_files = tmp_dir.path().read_dir()?.count();
+ assert_eq!(num_files, 1);
+
// Create a physical plan from the insert plan
let plan = session_ctx
.state()
@@ -1684,18 +1741,7 @@ mod tests {
assert_batches_eq!(expected, &res);
// Open the CSV file, read its contents as a record batch, and collect
the batches into a vector.
- let file = File::open(path.clone())?;
- let reader = csv::ReaderBuilder::new(schema.clone())
- .has_header(true)
- .with_batch_size(batch_size)
- .build(file)
- .map_err(|e| DataFusionError::Internal(e.to_string()))?;
-
- let batches = reader
- .collect::<Vec<ArrowResult<RecordBatch>>>()
- .into_iter()
- .collect::<ArrowResult<Vec<RecordBatch>>>()
- .map_err(|e| DataFusionError::Internal(e.to_string()));
+        let batches = session_ctx.sql("select * from t").await?.collect().await?;
// Define the expected result after the second append.
let expected = vec![
@@ -1718,14 +1764,20 @@ mod tests {
];
// Assert that the batches read from the file after the second append
match the expected result.
- assert_batches_eq!(expected, &batches?);
+ assert_batches_eq!(expected, &batches);
+
+ // Assert that no additional files were added to the table
+ let num_files = tmp_dir.path().read_dir()?.count();
+ assert_eq!(num_files, 1);
        // Return Ok if the function completed successfully
Ok(())
}
- #[tokio::test]
- async fn test_append_new_files_to_csv_table() -> Result<()> {
+ async fn helper_test_append_new_files_to_table(
+ file_type: FileType,
+ file_compression_type: FileCompressionType,
+ ) -> Result<()> {
// Create the initial context, schema, and batch.
let session_ctx = SessionContext::new();
// Create a new schema with one field called "a" of type Int32
@@ -1741,17 +1793,70 @@ mod tests {
vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))],
)?;
- // Create a temporary directory and a CSV file within it.
+ // Register appropriate table depending on file_type we want to test
let tmp_dir = TempDir::new()?;
- session_ctx
- .register_csv(
- "t",
- tmp_dir.path().to_str().unwrap(),
- CsvReadOptions::new()
- .insert_mode(ListingTableInsertMode::AppendNewFiles)
- .schema(schema.as_ref()),
- )
- .await?;
+ match file_type {
+ FileType::CSV => {
+ session_ctx
+ .register_csv(
+ "t",
+ tmp_dir.path().to_str().unwrap(),
+ CsvReadOptions::new()
+                            .insert_mode(ListingTableInsertMode::AppendNewFiles)
+ .schema(schema.as_ref())
+ .file_compression_type(file_compression_type),
+ )
+ .await?;
+ }
+ FileType::JSON => {
+ session_ctx
+ .register_json(
+ "t",
+ tmp_dir.path().to_str().unwrap(),
+ NdJsonReadOptions::default()
+                            .insert_mode(ListingTableInsertMode::AppendNewFiles)
+ .schema(schema.as_ref())
+ .file_compression_type(file_compression_type),
+ )
+ .await?;
+ }
+ FileType::PARQUET => {
+ session_ctx
+ .register_parquet(
+ "t",
+ tmp_dir.path().to_str().unwrap(),
+                        ParquetReadOptions::default(), // TODO implement insert_mode for parquet
+                        //.insert_mode(ListingTableInsertMode::AppendNewFiles)
+                        //.schema(schema.as_ref()),
+ )
+ .await?;
+ }
+ FileType::AVRO => {
+ session_ctx
+ .register_avro(
+ "t",
+ tmp_dir.path().to_str().unwrap(),
+ AvroReadOptions::default()
+ // TODO implement insert_mode for avro
+                            //.insert_mode(ListingTableInsertMode::AppendNewFiles)
+ .schema(schema.as_ref()),
+ )
+ .await?;
+ }
+ FileType::ARROW => {
+ session_ctx
+ .register_arrow(
+ "t",
+ tmp_dir.path().to_str().unwrap(),
+ ArrowReadOptions::default()
+ // TODO implement insert_mode for arrow
+                            //.insert_mode(ListingTableInsertMode::AppendNewFiles)
+ .schema(schema.as_ref()),
+ )
+ .await?;
+ }
+ }
+
// Create and register the source table with the provided schema and
inserted data
let source_table = Arc::new(MemTable::try_new(
schema.clone(),
@@ -1804,7 +1909,7 @@ mod tests {
// Assert that the batches read from the file match the expected
result.
assert_batches_eq!(expected, &batches);
- //asert that 6 files were added to the table
+ // Assert that 6 files were added to the table
let num_files = tmp_dir.path().read_dir()?.count();
assert_eq!(num_files, 6);