This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d5704f75fc Support  Writing Arrow files (#8608)
d5704f75fc is described below

commit d5704f75fc28f88632518ef9a808c9cda38dc162
Author: Devin D'Angelo <[email protected]>
AuthorDate: Sun Dec 24 07:46:26 2023 -0500

    Support  Writing Arrow files (#8608)
    
    * write arrow files
    
    * update datafusion-cli lock
    
    * fix toml formatting
    
    * Update insert_to_external.slt
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * add ticket tracking arrow options
    
    * default to lz4 compression
    
    * update datafusion-cli lock
    
    * cargo update
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 Cargo.toml                                         |  28 +--
 datafusion-cli/Cargo.lock                          |  56 +++---
 datafusion/core/Cargo.toml                         |   1 +
 .../core/src/datasource/file_format/arrow.rs       | 207 ++++++++++++++++++++-
 .../core/src/datasource/file_format/parquet.rs     |  34 +---
 .../core/src/datasource/file_format/write/mod.rs   |  33 +++-
 datafusion/sqllogictest/test_files/copy.slt        |  56 ++++++
 .../sqllogictest/test_files/insert_to_external.slt |  39 ++++
 8 files changed, 368 insertions(+), 86 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 023dc6c6fc..a698fbf471 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,24 +17,7 @@
 
 [workspace]
 exclude = ["datafusion-cli"]
-members = [
-    "datafusion/common",
-    "datafusion/core",
-    "datafusion/expr",
-    "datafusion/execution",
-    "datafusion/optimizer",
-    "datafusion/physical-expr",
-    "datafusion/physical-plan",
-    "datafusion/proto",
-    "datafusion/proto/gen",
-    "datafusion/sql",
-    "datafusion/sqllogictest",
-    "datafusion/substrait",
-    "datafusion/wasmtest",
-    "datafusion-examples",
-    "docs",
-    "test-utils",
-    "benchmarks",
+members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/execution", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/physical-plan", "datafusion/proto", "datafusion/proto/gen", "datafusion/sql", "datafusion/sqllogictest", "datafusion/substrait", "datafusion/wasmtest", "datafusion-examples", "docs", "test-utils", "benchmarks",
 ]
 resolver = "2"
 
@@ -53,24 +36,26 @@ arrow = { version = "49.0.0", features = ["prettyprint"] }
 arrow-array = { version = "49.0.0", default-features = false, features = ["chrono-tz"] }
 arrow-buffer = { version = "49.0.0", default-features = false }
 arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] }
+arrow-ipc = { version = "49.0.0", default-features = false, features=["lz4"] }
 arrow-ord = { version = "49.0.0", default-features = false }
 arrow-schema = { version = "49.0.0", default-features = false }
 async-trait = "0.1.73"
 bigdecimal = "0.4.1"
 bytes = "1.4"
+chrono = { version = "0.4.31", default-features = false }
 ctor = "0.2.0"
+dashmap = "5.4.0"
 datafusion = { path = "datafusion/core", version = "34.0.0" }
 datafusion-common = { path = "datafusion/common", version = "34.0.0" }
+datafusion-execution = { path = "datafusion/execution", version = "34.0.0" }
 datafusion-expr = { path = "datafusion/expr", version = "34.0.0" }
-datafusion-sql = { path = "datafusion/sql", version = "34.0.0" }
 datafusion-optimizer = { path = "datafusion/optimizer", version = "34.0.0" }
 datafusion-physical-expr = { path = "datafusion/physical-expr", version = "34.0.0" }
 datafusion-physical-plan = { path = "datafusion/physical-plan", version = "34.0.0" }
-datafusion-execution = { path = "datafusion/execution", version = "34.0.0" }
 datafusion-proto = { path = "datafusion/proto", version = "34.0.0" }
+datafusion-sql = { path = "datafusion/sql", version = "34.0.0" }
 datafusion-sqllogictest = { path = "datafusion/sqllogictest", version = "34.0.0" }
 datafusion-substrait = { path = "datafusion/substrait", version = "34.0.0" }
-dashmap = "5.4.0"
 doc-comment = "0.3"
 env_logger = "0.10"
 futures = "0.3"
@@ -88,7 +73,6 @@ serde_json = "1"
 sqlparser = { version = "0.40.0", features = ["visitor"] }
 tempfile = "3"
 thiserror = "1.0.44"
-chrono = { version = "0.4.31", default-features = false }
 url = "2.2"
 
 [profile.release]
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index ac05ddf10a..9f75013c86 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -255,6 +255,7 @@ dependencies = [
  "arrow-data",
  "arrow-schema",
  "flatbuffers",
+ "lz4_flex",
 ]
 
 [[package]]
@@ -378,13 +379,13 @@ dependencies = [
 
 [[package]]
 name = "async-trait"
-version = "0.1.74"
+version = "0.1.75"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9"
+checksum = "fdf6721fb0140e4f897002dd086c06f6c27775df19cfe1fccb21181a48fd2c98"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.41",
+ "syn 2.0.42",
 ]
 
 [[package]]
@@ -1074,7 +1075,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "30d2b3721e861707777e3195b0158f950ae6dc4a27e4d02ff9f67e3eb3de199e"
 dependencies = [
  "quote",
- "syn 2.0.41",
+ "syn 2.0.42",
 ]
 
 [[package]]
@@ -1104,6 +1105,7 @@ dependencies = [
  "apache-avro",
  "arrow",
  "arrow-array",
+ "arrow-ipc",
  "arrow-schema",
  "async-compression",
  "async-trait",
@@ -1576,7 +1578,7 @@ checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.41",
+ "syn 2.0.42",
 ]
 
 [[package]]
@@ -2496,7 +2498,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.41",
+ "syn 2.0.42",
 ]
 
 [[package]]
@@ -2513,9 +2515,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
 
 [[package]]
 name = "pkg-config"
-version = "0.3.27"
+version = "0.3.28"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"
+checksum = "69d3587f8a9e599cc7ec2c00e331f71c4e69a5f9a4b8a6efd5b07466b9736f9a"
 
 [[package]]
 name = "powerfmt"
@@ -2586,9 +2588,9 @@ dependencies = [
 
 [[package]]
 name = "proc-macro2"
-version = "1.0.70"
+version = "1.0.71"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "39278fbbf5fb4f646ce651690877f89d1c5811a3d4acb27700c1cb3cdb78fd3b"
+checksum = "75cb1540fadbd5b8fbccc4dddad2734eba435053f725621c070711a14bb5f4b8"
 dependencies = [
  "unicode-ident",
 ]
@@ -3020,7 +3022,7 @@ checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.41",
+ "syn 2.0.42",
 ]
 
 [[package]]
@@ -3186,7 +3188,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "rustversion",
- "syn 2.0.41",
+ "syn 2.0.42",
 ]
 
 [[package]]
@@ -3208,9 +3210,9 @@ dependencies = [
 
 [[package]]
 name = "syn"
-version = "2.0.41"
+version = "2.0.42"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "44c8b28c477cc3bf0e7966561e3460130e1255f7a1cf71931075f1c5e7a7e269"
+checksum = "5b7d0a2c048d661a1a59fcd7355baa232f7ed34e0ee4df2eef3c1c1c0d3852d8"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -3289,7 +3291,7 @@ checksum = "01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.41",
+ "syn 2.0.42",
 ]
 
 [[package]]
@@ -3357,9 +3359,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.35.0"
+version = "1.35.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "841d45b238a16291a4e1584e61820b8ae57d696cc5015c459c229ccc6990cc1c"
+checksum = "c89b4efa943be685f629b149f53829423f8f5531ea21249408e8e2f8671ec104"
 dependencies = [
  "backtrace",
  "bytes",
@@ -3381,7 +3383,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.41",
+ "syn 2.0.42",
 ]
 
 [[package]]
@@ -3478,7 +3480,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.41",
+ "syn 2.0.42",
 ]
 
 [[package]]
@@ -3523,7 +3525,7 @@ checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.41",
+ "syn 2.0.42",
 ]
 
 [[package]]
@@ -3677,7 +3679,7 @@ dependencies = [
  "once_cell",
  "proc-macro2",
  "quote",
- "syn 2.0.41",
+ "syn 2.0.42",
  "wasm-bindgen-shared",
 ]
 
@@ -3711,7 +3713,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.41",
+ "syn 2.0.42",
  "wasm-bindgen-backend",
  "wasm-bindgen-shared",
 ]
@@ -3960,22 +3962,22 @@ dependencies = [
 
 [[package]]
 name = "zerocopy"
-version = "0.7.31"
+version = "0.7.32"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1c4061bedbb353041c12f413700357bec76df2c7e2ca8e4df8bac24c6bf68e3d"
+checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be"
 dependencies = [
  "zerocopy-derive",
 ]
 
 [[package]]
 name = "zerocopy-derive"
-version = "0.7.31"
+version = "0.7.32"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b3c129550b3e6de3fd0ba67ba5c81818f9805e58b8d7fee80a3a59d2c9fc601a"
+checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.41",
+ "syn 2.0.42",
 ]
 
 [[package]]
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 0ee83e7567..9de6a7f7d6 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -55,6 +55,7 @@ ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
 apache-avro = { version = "0.16", optional = true }
 arrow = { workspace = true }
 arrow-array = { workspace = true }
+arrow-ipc = { workspace = true }
 arrow-schema = { workspace = true }
 async-compression = { version = "0.4.0", features = ["bzip2", "gzip", "xz", "zstd", "futures-io", "tokio"], optional = true }
 async-trait = { workspace = true }
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 07c96bdae1..7d393d9129 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -21,10 +21,13 @@
 
 use std::any::Any;
 use std::borrow::Cow;
+use std::fmt::{self, Debug};
 use std::sync::Arc;
 
 use crate::datasource::file_format::FileFormat;
-use crate::datasource::physical_plan::{ArrowExec, FileScanConfig};
+use crate::datasource::physical_plan::{
+    ArrowExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
+};
 use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::ExecutionPlan;
@@ -32,16 +35,28 @@ use crate::physical_plan::ExecutionPlan;
 use arrow::ipc::convert::fb_to_schema;
 use arrow::ipc::reader::FileReader;
 use arrow::ipc::root_as_message;
+use arrow_ipc::writer::IpcWriteOptions;
+use arrow_ipc::CompressionType;
 use arrow_schema::{ArrowError, Schema, SchemaRef};
 
 use bytes::Bytes;
-use datafusion_common::{FileType, Statistics};
-use datafusion_physical_expr::PhysicalExpr;
+use datafusion_common::{not_impl_err, DataFusionError, FileType, Statistics};
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 
+use crate::physical_plan::{DisplayAs, DisplayFormatType};
 use async_trait::async_trait;
+use datafusion_physical_plan::insert::{DataSink, FileSinkExec};
+use datafusion_physical_plan::metrics::MetricsSet;
 use futures::stream::BoxStream;
 use futures::StreamExt;
 use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
+use tokio::io::AsyncWriteExt;
+use tokio::task::JoinSet;
+
+use super::file_compression_type::FileCompressionType;
+use super::write::demux::start_demuxer_task;
+use super::write::{create_writer, SharedBuffer};
 
 /// Arrow `FileFormat` implementation.
 #[derive(Default, Debug)]
@@ -97,11 +112,197 @@ impl FileFormat for ArrowFormat {
         Ok(Arc::new(exec))
     }
 
+    async fn create_writer_physical_plan(
+        &self,
+        input: Arc<dyn ExecutionPlan>,
+        _state: &SessionState,
+        conf: FileSinkConfig,
+        order_requirements: Option<Vec<PhysicalSortRequirement>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if conf.overwrite {
+            return not_impl_err!("Overwrites are not implemented yet for Arrow format");
+        }
+
+        let sink_schema = conf.output_schema().clone();
+        let sink = Arc::new(ArrowFileSink::new(conf));
+
+        Ok(Arc::new(FileSinkExec::new(
+            input,
+            sink,
+            sink_schema,
+            order_requirements,
+        )) as _)
+    }
+
     fn file_type(&self) -> FileType {
         FileType::ARROW
     }
 }
 
+/// Implements [`DataSink`] for writing to arrow_ipc files
+struct ArrowFileSink {
+    config: FileSinkConfig,
+}
+
+impl ArrowFileSink {
+    fn new(config: FileSinkConfig) -> Self {
+        Self { config }
+    }
+
+    /// Converts table schema to writer schema, which may differ in the case
+    /// of hive style partitioning where some columns are removed from the
+    /// underlying files.
+    fn get_writer_schema(&self) -> Arc<Schema> {
+        if !self.config.table_partition_cols.is_empty() {
+            let schema = self.config.output_schema();
+            let partition_names: Vec<_> = self
+                .config
+                .table_partition_cols
+                .iter()
+                .map(|(s, _)| s)
+                .collect();
+            Arc::new(Schema::new(
+                schema
+                    .fields()
+                    .iter()
+                    .filter(|f| !partition_names.contains(&f.name()))
+                    .map(|f| (**f).clone())
+                    .collect::<Vec<_>>(),
+            ))
+        } else {
+            self.config.output_schema().clone()
+        }
+    }
+}
+
+impl Debug for ArrowFileSink {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("ArrowFileSink").finish()
+    }
+}
+
+impl DisplayAs for ArrowFileSink {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(f, "ArrowFileSink(file_groups=",)?;
+                FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+                write!(f, ")")
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl DataSink for ArrowFileSink {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        None
+    }
+
+    async fn write_all(
+        &self,
+        data: SendableRecordBatchStream,
+        context: &Arc<TaskContext>,
+    ) -> Result<u64> {
+        // No props are supported yet, but can be by updating FileTypeWriterOptions
+        // to populate this struct and use those options to initialize the arrow_ipc::writer::FileWriter
+        // https://github.com/apache/arrow-datafusion/issues/8635
+        let _arrow_props = self.config.file_type_writer_options.try_into_arrow()?;
+
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.config.object_store_url)?;
+
+        let part_col = if !self.config.table_partition_cols.is_empty() {
+            Some(self.config.table_partition_cols.clone())
+        } else {
+            None
+        };
+
+        let (demux_task, mut file_stream_rx) = start_demuxer_task(
+            data,
+            context,
+            part_col,
+            self.config.table_paths[0].clone(),
+            "arrow".into(),
+            self.config.single_file_output,
+        );
+
+        let mut file_write_tasks: JoinSet<std::result::Result<usize, DataFusionError>> =
+            JoinSet::new();
+
+        let ipc_options =
+            IpcWriteOptions::try_new(64, false, arrow_ipc::MetadataVersion::V5)?
+                .try_with_compression(Some(CompressionType::LZ4_FRAME))?;
+        while let Some((path, mut rx)) = file_stream_rx.recv().await {
+            let shared_buffer = SharedBuffer::new(1048576);
+            let mut arrow_writer = arrow_ipc::writer::FileWriter::try_new_with_options(
+                shared_buffer.clone(),
+                &self.get_writer_schema(),
+                ipc_options.clone(),
+            )?;
+            let mut object_store_writer = create_writer(
+                FileCompressionType::UNCOMPRESSED,
+                &path,
+                object_store.clone(),
+            )
+            .await?;
+            file_write_tasks.spawn(async move {
+                let mut row_count = 0;
+                while let Some(batch) = rx.recv().await {
+                    row_count += batch.num_rows();
+                    arrow_writer.write(&batch)?;
+                    let mut buff_to_flush = shared_buffer.buffer.try_lock().unwrap();
+                    if buff_to_flush.len() > 1024000 {
+                        object_store_writer
+                            .write_all(buff_to_flush.as_slice())
+                            .await?;
+                        buff_to_flush.clear();
+                    }
+                }
+                arrow_writer.finish()?;
+                let final_buff = shared_buffer.buffer.try_lock().unwrap();
+
+                object_store_writer.write_all(final_buff.as_slice()).await?;
+                object_store_writer.shutdown().await?;
+                Ok(row_count)
+            });
+        }
+
+        let mut row_count = 0;
+        while let Some(result) = file_write_tasks.join_next().await {
+            match result {
+                Ok(r) => {
+                    row_count += r?;
+                }
+                Err(e) => {
+                    if e.is_panic() {
+                        std::panic::resume_unwind(e.into_panic());
+                    } else {
+                        unreachable!();
+                    }
+                }
+            }
+        }
+
+        match demux_task.await {
+            Ok(r) => r?,
+            Err(e) => {
+                if e.is_panic() {
+                    std::panic::resume_unwind(e.into_panic());
+                } else {
+                    unreachable!();
+                }
+            }
+        }
+        Ok(row_count as u64)
+    }
+}
+
 const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
 const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];
 
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 9db320fb9d..0c813b6ccb 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -29,7 +29,6 @@ use parquet::file::writer::SerializedFileWriter;
 use std::any::Any;
 use std::fmt;
 use std::fmt::Debug;
-use std::io::Write;
 use std::sync::Arc;
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -56,7 +55,7 @@ use parquet::file::properties::WriterProperties;
 use parquet::file::statistics::Statistics as ParquetStatistics;
 
 use super::write::demux::start_demuxer_task;
-use super::write::{create_writer, AbortableWrite};
+use super::write::{create_writer, AbortableWrite, SharedBuffer};
 use super::{FileFormat, FileScanConfig};
 use crate::arrow::array::{
     BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
@@ -1101,37 +1100,6 @@ async fn output_single_parquet_file_parallelized(
     Ok(row_count)
 }
 
-/// A buffer with interior mutability shared by the SerializedFileWriter and
-/// ObjectStore writer
-#[derive(Clone)]
-struct SharedBuffer {
-    /// The inner buffer for reading and writing
-    ///
-    /// The lock is used to obtain internal mutability, so no worry about the
-    /// lock contention.
-    buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
-}
-
-impl SharedBuffer {
-    pub fn new(capacity: usize) -> Self {
-        Self {
-            buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))),
-        }
-    }
-}
-
-impl Write for SharedBuffer {
-    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
-        let mut buffer = self.buffer.try_lock().unwrap();
-        Write::write(&mut *buffer, buf)
-    }
-
-    fn flush(&mut self) -> std::io::Result<()> {
-        let mut buffer = self.buffer.try_lock().unwrap();
-        Write::flush(&mut *buffer)
-    }
-}
-
 #[cfg(test)]
 pub(crate) mod test_util {
     use super::*;
diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs
index cfcdbd8c46..68fe81ce91 100644
--- a/datafusion/core/src/datasource/file_format/write/mod.rs
+++ b/datafusion/core/src/datasource/file_format/write/mod.rs
@@ -18,7 +18,7 @@
 //! Module containing helper methods/traits related to enabling
 //! write support for the various file formats
 
-use std::io::Error;
+use std::io::{Error, Write};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -43,6 +43,37 @@ use tokio::io::AsyncWrite;
 pub(crate) mod demux;
 pub(crate) mod orchestration;
 
+/// A buffer with interior mutability shared by the SerializedFileWriter and
+/// ObjectStore writer
+#[derive(Clone)]
+pub(crate) struct SharedBuffer {
+    /// The inner buffer for reading and writing
+    ///
+    /// The lock is used to obtain internal mutability, so no worry about the
+    /// lock contention.
+    pub(crate) buffer: Arc<futures::lock::Mutex<Vec<u8>>>,
+}
+
+impl SharedBuffer {
+    pub fn new(capacity: usize) -> Self {
+        Self {
+            buffer: Arc::new(futures::lock::Mutex::new(Vec::with_capacity(capacity))),
+        }
+    }
+}
+
+impl Write for SharedBuffer {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        let mut buffer = self.buffer.try_lock().unwrap();
+        Write::write(&mut *buffer, buf)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        let mut buffer = self.buffer.try_lock().unwrap();
+        Write::flush(&mut *buffer)
+    }
+}
+
 /// Stores data needed during abortion of MultiPart writers
 #[derive(Clone)]
 pub(crate) struct MultiPart {
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index 02ab330833..89b2391788 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -230,6 +230,62 @@ select * from validate_csv_with_options;
 1;Foo
 2;Bar
 
+# Copy from table to single arrow file
+query IT
+COPY source_table to 'test_files/scratch/copy/table.arrow';
+----
+2
+
+# Validate single arrow output
+statement ok
+CREATE EXTERNAL TABLE validate_arrow_file
+STORED AS arrow
+LOCATION 'test_files/scratch/copy/table.arrow';
+
+query IT
+select * from validate_arrow_file;
+----
+1 Foo
+2 Bar
+
+# Copy from dict encoded values to single arrow file
+query T?
+COPY (values 
+('c', arrow_cast('foo', 'Dictionary(Int32, Utf8)')), ('d', arrow_cast('bar', 'Dictionary(Int32, Utf8)'))) 
+to 'test_files/scratch/copy/table_dict.arrow';
+----
+2
+
+# Validate single arrow output with dictionary-encoded values
+statement ok
+CREATE EXTERNAL TABLE validate_arrow_file_dict
+STORED AS arrow
+LOCATION 'test_files/scratch/copy/table_dict.arrow';
+
+query T?
+select * from validate_arrow_file_dict;
+----
+c foo
+d bar
+
+
+# Copy from table to folder of arrow files
+query IT
+COPY source_table to 'test_files/scratch/copy/table_arrow' (format arrow, single_file_output false);
+----
+2
+
+# Validate arrow folder output
+statement ok
+CREATE EXTERNAL TABLE validate_arrow STORED AS arrow LOCATION 'test_files/scratch/copy/table_arrow';
+
+query IT
+select * from validate_arrow;
+----
+1 Foo
+2 Bar
+
+
 # Error cases:
 
 # Copy from table with options
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt
index cdaf0bb643..e73778ad44 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -76,6 +76,45 @@ select * from dictionary_encoded_parquet_partitioned order by (a);
 a foo
 b bar
 
+statement ok
+CREATE EXTERNAL TABLE dictionary_encoded_arrow_partitioned(
+  a varchar,
+  b varchar,
+) 
+STORED AS arrow
+LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/'
+PARTITIONED BY (b)
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+query TT
+insert into dictionary_encoded_arrow_partitioned 
+select * from dictionary_encoded_values
+----
+2
+
+statement ok
+CREATE EXTERNAL TABLE dictionary_encoded_arrow_test_readback(
+  a varchar,
+) 
+STORED AS arrow
+LOCATION 'test_files/scratch/insert_to_external/arrow_dict_partitioned/b=bar/'
+OPTIONS(
+create_local_path 'true',
+insert_mode 'append_new_files',
+);
+
+query T
+select * from dictionary_encoded_arrow_test_readback;
+----
+b
+
+# https://github.com/apache/arrow-datafusion/issues/7816
+query error DataFusion error: Arrow error: Schema error: project index 1 out of bounds, max field 1
+select * from dictionary_encoded_arrow_partitioned order by (a);
+
 
 # test_insert_into
 statement ok

Reply via email to