metesynnada commented on code in PR #7212:
URL: https://github.com/apache/arrow-datafusion/pull/7212#discussion_r1285446468
##########
datafusion/core/src/datasource/file_format/json.rs:
##########
@@ -148,6 +172,216 @@ impl FileFormat for JsonFormat {
let exec = NdJsonExec::new(conf, self.file_compression_type.to_owned());
Ok(Arc::new(exec))
}
+
+ async fn create_writer_physical_plan(
+ &self,
+ input: Arc<dyn ExecutionPlan>,
+ _state: &SessionState,
+ conf: FileSinkConfig,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ if conf.overwrite {
+ return Err(DataFusionError::NotImplemented(
+ "Overwrites are not implemented yet for Json".into(),
+ ));
+ }
+
+ if self.file_compression_type != FileCompressionType::UNCOMPRESSED {
+ return Err(DataFusionError::NotImplemented(
+ "Inserting compressed JSON is not implemented yet.".into(),
+ ));
+ }
+ let sink_schema = conf.output_schema().clone();
+        let sink = Arc::new(JsonSink::new(conf, self.file_compression_type.clone()));
+
+ Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
+ }
+}
+
+impl Default for JsonSerializer {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+/// Define a struct for serializing Json records to a stream
+pub struct JsonSerializer {
+ // Inner buffer for avoiding reallocation
+ buffer: Vec<u8>,
+}
+
+impl JsonSerializer {
+ /// Constructor for the JsonSerializer object
+ pub fn new() -> Self {
+ Self {
+ buffer: Vec::with_capacity(4096),
+ }
+ }
+}
+
+#[async_trait]
+impl BatchSerializer for JsonSerializer {
+ async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
+ let mut writer = json::LineDelimitedWriter::new(&mut self.buffer);
+ writer.write(&batch)?;
+ //drop(writer);
+ Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
+ }
+}
+
+/// Implements [`DataSink`] for writing to a Json file.
+struct JsonSink {
+ /// Config options for writing data
+ config: FileSinkConfig,
+ file_compression_type: FileCompressionType,
+}
+
+impl Debug for JsonSink {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("JsonSink")
+ .field("file_compression_type", &self.file_compression_type)
+ .finish()
+ }
+}
+
+impl DisplayAs for JsonSink {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "JsonSink(writer_mode={:?}, file_groups=",
+ self.config.writer_mode
+ )?;
+ FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+ write!(f, ")")
+ }
+ }
+ }
+}
+
+impl JsonSink {
+    fn new(config: FileSinkConfig, file_compression_type: FileCompressionType) -> Self {
+ Self {
+ config,
+ file_compression_type,
+ }
+ }
+
+    // Create a writer for Json files
+ async fn create_writer(
+ &self,
+ file_meta: FileMeta,
+ object_store: Arc<dyn ObjectStore>,
+ ) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
+ let object = &file_meta.object_meta;
+ match self.config.writer_mode {
+            // If the mode is append, call the store's append method and return wrapped in
+ // a boxed trait object.
+ FileWriterMode::Append => {
+ let writer = object_store
+ .append(&object.location)
+ .await
+ .map_err(DataFusionError::ObjectStore)?;
+ let writer = AbortableWrite::new(
+ self.file_compression_type.convert_async_writer(writer)?,
+ AbortMode::Append,
+ );
+ Ok(writer)
+ }
+            // If the mode is put, create a new AsyncPut writer and return it wrapped in
+ // a boxed trait object
+ FileWriterMode::Put => {
+                let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store));
+ let writer = AbortableWrite::new(
+ self.file_compression_type.convert_async_writer(writer)?,
+ AbortMode::Put,
+ );
+ Ok(writer)
+ }
+            // If the mode is put multipart, call the store's put_multipart method and
+ // return the writer wrapped in a boxed trait object.
+ FileWriterMode::PutMultipart => {
+ let (multipart_id, writer) = object_store
+ .put_multipart(&object.location)
+ .await
+ .map_err(DataFusionError::ObjectStore)?;
+ Ok(AbortableWrite::new(
+ self.file_compression_type.convert_async_writer(writer)?,
+ AbortMode::MultiPart(MultiPart::new(
+ object_store,
+ multipart_id,
+ object.location.clone(),
+ )),
+ ))
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl DataSink for JsonSink {
+ async fn write_all(
+ &self,
+ data: Vec<SendableRecordBatchStream>,
+ context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ let num_partitions = data.len();
+
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.config.object_store_url)?;
+
+ // Construct serializer and writer for each file group
+ let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
+ let mut writers = vec![];
+ match self.config.writer_mode {
+ FileWriterMode::Append => {
+ for file_group in &self.config.file_groups {
+ let serializer = JsonSerializer::new();
+ serializers.push(Box::new(serializer));
+
+ let file = file_group.clone();
+ let writer = self
+ .create_writer(
+ file.object_meta.clone().into(),
+ object_store.clone(),
+ )
+ .await?;
+ writers.push(writer);
+ }
+ }
+ FileWriterMode::Put => {
+ return Err(DataFusionError::NotImplemented(
+ "Put Mode is not implemented for Json Sink yet".into(),
+ ))
+ }
+ FileWriterMode::PutMultipart => {
+                //currently assuming only 1 partition path (i.e. not hive style partitioning on a column)
Review Comment:
```suggestion
                // Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
```
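
For readers skimming the new `BatchSerializer` above, here is a minimal, self-contained sketch of what `serialize` produces via arrow's `LineDelimitedWriter`; the batch and buffer below are illustrative only and not part of the PR.

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::json::LineDelimitedWriter;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative batch; any RecordBatch is handled the same way.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "column1",
        DataType::Int32,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // Same call JsonSerializer::serialize makes against its internal buffer.
    let mut buffer = Vec::new();
    let mut writer = LineDelimitedWriter::new(&mut buffer);
    writer.write(&batch)?;
    drop(writer);

    // Each row becomes one newline-delimited JSON object:
    // {"column1":1}
    // {"column1":2}
    // {"column1":3}
    print!("{}", String::from_utf8(buffer)?);
    Ok(())
}
```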
##########
datafusion/core/src/datasource/file_format/json.rs:
##########
@@ -148,6 +172,216 @@ impl FileFormat for JsonFormat {
+ FileWriterMode::PutMultipart => {
+                //currently assuming only 1 partition path (i.e. not hive style partitioning on a column)
+ let base_path = &self.config.table_paths[0];
+                //uniquely identify this batch of files with a random string, to prevent collisions overwriting files
Review Comment:
```suggestion
                // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
```
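
The comment under discussion describes collision avoidance via a random suffix shared by the files of one write. As a rough illustration of that idea only (the helper name and file-name layout here are invented, not the PR's actual scheme):

```rust
use rand::{distributions::Alphanumeric, Rng};

/// Hypothetical helper: derive one output path per partition under a single
/// base path, sharing a random suffix so that repeated INSERTs into the same
/// table cannot overwrite files written by an earlier batch.
fn partition_file_names(base_path: &str, num_partitions: usize) -> Vec<String> {
    // Random alphanumeric tag shared by all files of this write.
    let suffix: String = rand::thread_rng()
        .sample_iter(&Alphanumeric)
        .take(16)
        .map(char::from)
        .collect();
    (0..num_partitions)
        .map(|part| format!("{base_path}/{suffix}_{part}.json"))
        .collect()
}

fn main() {
    // Prints two paths like "data/table/<random>_0.json" and "data/table/<random>_1.json".
    println!("{:?}", partition_file_names("data/table", 2));
}
```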
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -805,6 +805,18 @@ impl TableProvider for ListingTable {
);
}
+    // Inserts currently make no effort to preserve sort_order. This could lead to
+    // incorrect query results on the table after inserting incorrectly sorted data.
+ let unsorted: Vec<Vec<Expr>> = vec![];
Review Comment:
Quite logical. Could you mark this as a `// TODO`?
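
If the author picks this up, the marker could look something like the sketch below; the wording is only a suggestion, not text from the PR.

```rust
// TODO: inserts currently make no effort to preserve sort_order, so incorrectly
// sorted input can make later queries against the table return wrong results.
let unsorted: Vec<Vec<Expr>> = vec![];
```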
##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1559,23 +1549,87 @@ mod tests {
}
#[tokio::test]
- async fn test_append_plan_to_external_table_stored_as_csv() -> Result<()> {
- let file_type = FileType::CSV;
- let file_compression_type = FileCompressionType::UNCOMPRESSED;
+ async fn test_insert_into_append_to_json_file() -> Result<()> {
+ helper_test_insert_into_append_to_existing_files(
+ FileType::JSON,
+ FileCompressionType::UNCOMPRESSED,
+ )
+ .await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_insert_into_append_new_json_files() -> Result<()> {
+ helper_test_append_new_files_to_table(
+ FileType::JSON,
+ FileCompressionType::UNCOMPRESSED,
+ )
+ .await?;
+ Ok(())
+ }
+ #[tokio::test]
+ async fn test_insert_into_append_to_csv_file() -> Result<()> {
+ helper_test_insert_into_append_to_existing_files(
+ FileType::CSV,
+ FileCompressionType::UNCOMPRESSED,
+ )
+ .await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_insert_into_append_new_csv_files() -> Result<()> {
+ helper_test_append_new_files_to_table(
+ FileType::CSV,
+ FileCompressionType::UNCOMPRESSED,
+ )
+ .await?;
+ Ok(())
+ }
+
+ fn load_empty_schema_table(
+ schema: SchemaRef,
+ temp_path: &str,
+ insert_mode: ListingTableInsertMode,
+ file_format: Arc<dyn FileFormat>,
+ ) -> Result<Arc<dyn TableProvider>> {
+ File::create(temp_path)?;
+ let table_path = ListingTableUrl::parse(temp_path).unwrap();
+
+ let listing_options =
+            ListingOptions::new(file_format.clone()).with_insert_mode(insert_mode);
+
+ let config = ListingTableConfig::new(table_path)
+ .with_listing_options(listing_options)
+ .with_schema(schema);
+
+ let table = ListingTable::try_new(config)?;
+ Ok(Arc::new(table))
+ }
+
+    /// Logic of testing inserting into listing table by Appending to existing files
+    /// is the same for all formats/options which support this. This helper allows
+    /// passing different options to execute the same test with different settings.
+ async fn helper_test_insert_into_append_to_existing_files(
+ file_type: FileType,
+ file_compression_type: FileCompressionType,
+ ) -> Result<()> {
// Create the initial context, schema, and batch.
let session_ctx = SessionContext::new();
// Create a new schema with one field called "a" of type Int32
let schema = Arc::new(Schema::new(vec![Field::new(
"column1",
- DataType::Int32,
+ DataType::Float64,
Review Comment:
Could you clarify the reason behind this change?
##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -313,6 +313,73 @@ pub trait BatchSerializer: Unpin + Send {
async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
}
+async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
Review Comment:
Could you please provide a docstring for this function?
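
One possible shape for such a docstring, based on the behaviour visible in the body quoted in the next hunk (wording is only a sketch):

```rust
/// Returns `result` unchanged when it is `Ok`. On error, makes a best-effort
/// attempt to abort every writer in `writers` (ignoring failures raised while
/// aborting) and then propagates the original error.
async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
```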
##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -313,6 +313,73 @@ pub trait BatchSerializer: Unpin + Send {
async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
}
+async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
+ result: Result<T>,
+ writers: &mut [AbortableWrite<W>],
+) -> Result<T> {
+ match result {
+ Ok(value) => Ok(value),
+ Err(e) => {
+ // Abort all writers before returning the error:
+ for writer in writers {
+ let mut abort_future = writer.abort_writer();
+ if let Ok(abort_future) = &mut abort_future {
+ let _ = abort_future.await;
+ }
+            // Ignore errors that occur while aborting;
+            // we still try to abort all writers before returning the error.
+ }
+ // After aborting writers return original error.
+ Err(e)
+ }
+ }
+}
+
+/// Contains the common logic for serializing RecordBatches and
+/// writing the resulting bytes to an ObjectStore.
+/// Serialization is assumed to be stateless, i.e.
+/// each RecordBatch can be serialized without any
+/// dependency on the RecordBatches before or after.
+async fn stateless_serialize_and_write_files(
Review Comment:
It makes sense to consolidate these into a unified approach.
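
To make the "stateless" property concrete, here is a simplified, synchronous stand-in for the pattern (plain `arrow` types only; the real function in the PR is async and writes to an `ObjectStore`):

```rust
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::json::LineDelimitedWriter;
use arrow::record_batch::RecordBatch;

/// Serialize each batch independently and append the bytes to `out`,
/// returning the total row count. No state is carried between batches,
/// which is what lets the common logic be shared across file groups.
fn serialize_all(batches: &[RecordBatch], out: &mut Vec<u8>) -> Result<u64, ArrowError> {
    let mut row_count = 0u64;
    for batch in batches {
        row_count += batch.num_rows() as u64;
        let mut writer = LineDelimitedWriter::new(&mut *out);
        writer.write(batch)?;
    }
    Ok(row_count)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "column1",
        DataType::Int32,
        false,
    )]));
    let batch = |vals: Vec<i32>| {
        RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vals))])
    };

    let mut out = Vec::new();
    let rows = serialize_all(&[batch(vec![1, 2])?, batch(vec![3])?], &mut out)?;
    assert_eq!(rows, 3); // `out` now holds three newline-delimited JSON objects
    Ok(())
}
```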
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]