This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a971f1e7e3 Defer file creation to write (#8539)
a971f1e7e3 is described below
commit a971f1e7e379f5efe9fa3a5839c36ca2d797e201
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Dec 14 17:49:56 2023 +0000
Defer file creation to write (#8539)
* Defer file creation to write
* Format
* Remove INSERT_MODE
* Format
* Add ticket link
---
datafusion/core/src/datasource/listing/url.rs | 1 +
.../core/src/datasource/listing_table_factory.rs | 17 +++++------------
datafusion/core/src/datasource/stream.rs | 10 ++++++++--
datafusion/core/src/physical_planner.rs | 6 +-----
.../sqllogictest/test_files/insert_to_external.slt | 18 +++++++++---------
docs/source/user-guide/sql/write_options.md | 8 +++-----
6 files changed, 27 insertions(+), 33 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs
index 979ed9e975..3ca7864f7f 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -116,6 +116,7 @@ impl ListingTableUrl {
/// Get object store for specified input_url
/// if input_url is actually not a url, we assume it is a local file path
/// if we have a local path, create it if not exists so ListingTableUrl::parse works
+ #[deprecated(note = "Use parse")]
pub fn parse_create_local_if_not_exists(
s: impl AsRef<str>,
is_directory: bool,
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 96436306c6..a9d0c3a009 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -148,19 +148,18 @@ impl TableProviderFactory for ListingTableFactory {
.unwrap_or(false)
};
- let create_local_path = statement_options
- .take_bool_option("create_local_path")?
- .unwrap_or(false);
let single_file = statement_options
.take_bool_option("single_file")?
.unwrap_or(false);
- // Backwards compatibility
+ // Backwards compatibility (#8547)
if let Some(s) = statement_options.take_str_option("insert_mode") {
if !s.eq_ignore_ascii_case("append_new_files") {
- return plan_err!("Unknown or unsupported insert mode {s}. Only append_to_file supported");
+ return plan_err!("Unknown or unsupported insert mode {s}. Only append_new_files supported");
}
}
+ statement_options.take_bool_option("create_local_path")?;
+
let file_type = file_format.file_type();
// Use remaining options and session state to build FileTypeWriterOptions
@@ -199,13 +198,7 @@ impl TableProviderFactory for ListingTableFactory {
FileType::AVRO => file_type_writer_options,
};
- let table_path = match create_local_path {
- true => ListingTableUrl::parse_create_local_if_not_exists(
- &cmd.location,
- !single_file,
- ),
- false => ListingTableUrl::parse(&cmd.location),
- }?;
+ let table_path = ListingTableUrl::parse(&cmd.location)?;
let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs
index e7512499eb..b9b45a6c74 100644
--- a/datafusion/core/src/datasource/stream.rs
+++ b/datafusion/core/src/datasource/stream.rs
@@ -179,7 +179,10 @@ impl StreamConfig {
match &self.encoding {
StreamEncoding::Csv => {
let header = self.header && !self.location.exists();
- let file = OpenOptions::new().append(true).open(&self.location)?;
+ let file = OpenOptions::new()
+ .create(true)
+ .append(true)
+ .open(&self.location)?;
let writer = arrow::csv::WriterBuilder::new()
.with_header(header)
.build(file);
@@ -187,7 +190,10 @@ impl StreamConfig {
Ok(Box::new(writer))
}
StreamEncoding::Json => {
- let file = OpenOptions::new().append(true).open(&self.location)?;
+ let file = OpenOptions::new()
+ .create(true)
+ .append(true)
+ .open(&self.location)?;
Ok(Box::new(arrow::json::LineDelimitedWriter::new(file)))
}
}
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index ab38b3ec6d..93f0b31e52 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -571,11 +571,7 @@ impl DefaultPhysicalPlanner {
copy_options,
}) => {
let input_exec = self.create_initial_plan(input, session_state).await?;
-
- // TODO: make this behavior configurable via options (should copy to create path/file as needed?)
- // TODO: add additional configurable options for if existing files should be overwritten or
- // appended to
- let parsed_url = ListingTableUrl::parse_create_local_if_not_exists(output_url, !*single_file_output)?;
+ let parsed_url = ListingTableUrl::parse(output_url)?;
let object_store_url = parsed_url.object_store();
let schema: Schema = (**input.schema()).clone().into();
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt
index 85c2db7faa..cdaf0bb643 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -57,7 +57,7 @@ CREATE EXTERNAL TABLE dictionary_encoded_parquet_partitioned(
b varchar,
)
STORED AS parquet
-LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned'
+LOCATION 'test_files/scratch/insert_to_external/parquet_types_partitioned/'
PARTITIONED BY (b)
OPTIONS(
create_local_path 'true',
@@ -292,7 +292,7 @@ statement ok
CREATE EXTERNAL TABLE
directory_test(a bigint, b bigint)
STORED AS parquet
-LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q0'
+LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q0/'
OPTIONS(
create_local_path 'true',
);
@@ -312,7 +312,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)
STORED AS parquet
-LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q1'
+LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q1/'
OPTIONS (create_local_path 'true');
query TT
@@ -378,7 +378,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)
STORED AS parquet
-LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q2'
+LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q2/'
OPTIONS (create_local_path 'true');
query TT
@@ -423,7 +423,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(c1 varchar NULL)
STORED AS parquet
-LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q3'
+LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q3/'
OPTIONS (create_local_path 'true');
# verify that the sort order of the insert query is maintained into the
@@ -462,7 +462,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(id BIGINT, name varchar)
STORED AS parquet
-LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q4'
+LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q4/'
OPTIONS (create_local_path 'true');
query IT
@@ -505,7 +505,7 @@ statement ok
CREATE EXTERNAL TABLE
table_without_values(field1 BIGINT NOT NULL, field2 BIGINT NULL)
STORED AS parquet
-LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q5'
+LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q5/'
OPTIONS (create_local_path 'true');
query II
@@ -555,7 +555,7 @@ CREATE EXTERNAL TABLE test_column_defaults(
d text default lower('DEFAULT_TEXT'),
e timestamp default now()
) STORED AS parquet
-LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q6'
+LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q6/'
OPTIONS (create_local_path 'true');
# fill in all column values
@@ -608,5 +608,5 @@ CREATE EXTERNAL TABLE test_column_defaults(
a int,
b int default a+1
) STORED AS parquet
-LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q7'
+LOCATION 'test_files/scratch/insert_to_external/external_parquet_table_q7/'
OPTIONS (create_local_path 'true');
diff --git a/docs/source/user-guide/sql/write_options.md b/docs/source/user-guide/sql/write_options.md
index 941484e84e..94adee9609 100644
--- a/docs/source/user-guide/sql/write_options.md
+++ b/docs/source/user-guide/sql/write_options.md
@@ -78,11 +78,9 @@ The following special options are specific to the `COPY` command.
The following special options are specific to creating an external table.
-| Option | Description | Default Value |
-| ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ---------------------------------------------------------------------------- |
-| SINGLE_FILE | If true, indicates that this external table is backed by a single file. INSERT INTO queries will append to this file. | false |
-| CREATE_LOCAL_PATH | If true, the folder or file backing this table will be created on the local file system if it does not already exist when running INSERT INTO queries. | false |
-| INSERT_MODE | Determines if INSERT INTO queries should append to existing files or append new files to an existing directory. Valid values are append_to_file, append_new_files, and error. Note that "error" will block inserting data into this table. | CSV and JSON default to append_to_file. Parquet defaults to append_new_files |
+| Option | Description | Default Value |
+| ----------- | --------------------------------------------------------------------------------------------------------------------- | ------------- |
+| SINGLE_FILE | If true, indicates that this external table is backed by a single file. INSERT INTO queries will append to this file. | false |
### JSON Format Specific Options