This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new f849968 Fix cannot load parquet table from spark in datafusion-cli.
(#1665)
f849968 is described below
commit f849968057ddddccc9aa19915ef3ea56bf14d80d
Author: Yang <[email protected]>
AuthorDate: Mon Jan 31 22:48:37 2022 +0800
Fix cannot load parquet table from spark in datafusion-cli. (#1665)
* fix cannot load parquet table from spark
* add the affected files to the parquet reader error log.
* fix fmt
---
benchmarks/src/bin/tpch.rs | 6 +++--
.../examples/parquet_sql_multiple_files.rs | 6 +++--
datafusion/src/datasource/file_format/avro.rs | 2 ++
datafusion/src/datasource/file_format/csv.rs | 2 ++
datafusion/src/datasource/file_format/json.rs | 2 ++
datafusion/src/datasource/listing/table.rs | 6 +++--
datafusion/src/execution/context.rs | 31 ++++++++++++----------
datafusion/src/execution/options.rs | 3 ++-
.../src/physical_plan/file_format/parquet.rs | 9 ++++---
9 files changed, 43 insertions(+), 24 deletions(-)
diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 0b9fba5..59bb551 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -54,6 +54,8 @@ use datafusion::{
},
};
+use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
+use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use structopt::StructOpt;
#[cfg(feature = "snmalloc")]
@@ -652,13 +654,13 @@ fn get_table(
.with_delimiter(b',')
.with_has_header(true);
- (Arc::new(format), path, ".csv")
+ (Arc::new(format), path, DEFAULT_CSV_EXTENSION)
}
"parquet" => {
let path = format!("{}/{}", path, table);
let format =
ParquetFormat::default().with_enable_pruning(true);
- (Arc::new(format), path, ".parquet")
+ (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
other => {
unimplemented!("Invalid file format '{}'", other);
diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs
b/datafusion-examples/examples/parquet_sql_multiple_files.rs
index 2e95427..7485bc7 100644
--- a/datafusion-examples/examples/parquet_sql_multiple_files.rs
+++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::file_format::parquet::{
+ ParquetFormat, DEFAULT_PARQUET_EXTENSION,
+};
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
use datafusion::prelude::*;
@@ -33,7 +35,7 @@ async fn main() -> Result<()> {
// Configure listing options
let file_format = ParquetFormat::default().with_enable_pruning(true);
let listing_options = ListingOptions {
- file_extension: ".parquet".to_owned(),
+ file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
format: Arc::new(file_format),
table_partition_cols: vec![],
collect_stat: true,
diff --git a/datafusion/src/datasource/file_format/avro.rs
b/datafusion/src/datasource/file_format/avro.rs
index 08eb343..fa02d1a 100644
--- a/datafusion/src/datasource/file_format/avro.rs
+++ b/datafusion/src/datasource/file_format/avro.rs
@@ -34,6 +34,8 @@ use crate::physical_plan::file_format::{AvroExec,
FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
+/// The default file extension of avro files
+pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
/// Avro `FileFormat` implementation.
#[derive(Default, Debug)]
pub struct AvroFormat;
diff --git a/datafusion/src/datasource/file_format/csv.rs
b/datafusion/src/datasource/file_format/csv.rs
index f0a70d9..6aa0d21 100644
--- a/datafusion/src/datasource/file_format/csv.rs
+++ b/datafusion/src/datasource/file_format/csv.rs
@@ -33,6 +33,8 @@ use crate::physical_plan::file_format::{CsvExec,
FileScanConfig};
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
+/// The default file extension of csv files
+pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
/// Character Separated Value `FileFormat` implementation.
#[derive(Debug)]
pub struct CsvFormat {
diff --git a/datafusion/src/datasource/file_format/json.rs
b/datafusion/src/datasource/file_format/json.rs
index d7a278d..bdd5ef8 100644
--- a/datafusion/src/datasource/file_format/json.rs
+++ b/datafusion/src/datasource/file_format/json.rs
@@ -37,6 +37,8 @@ use crate::physical_plan::file_format::NdJsonExec;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;
+/// The default file extension of json files
+pub const DEFAULT_JSON_EXTENSION: &str = ".json";
/// New line delimited JSON `FileFormat` implementation.
#[derive(Debug, Default)]
pub struct JsonFormat {
diff --git a/datafusion/src/datasource/listing/table.rs
b/datafusion/src/datasource/listing/table.rs
index 2f8f70f..1501b8b 100644
--- a/datafusion/src/datasource/listing/table.rs
+++ b/datafusion/src/datasource/listing/table.rs
@@ -266,6 +266,8 @@ impl ListingTable {
mod tests {
use arrow::datatypes::DataType;
+ use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
+ use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
use crate::{
datasource::{
file_format::{avro::AvroFormat, parquet::ParquetFormat},
@@ -318,7 +320,7 @@ mod tests {
let store = TestObjectStore::new_arc(&[("table/p1=v1/file.avro",
100)]);
let opt = ListingOptions {
- file_extension: ".avro".to_owned(),
+ file_extension: DEFAULT_AVRO_EXTENSION.to_owned(),
format: Arc::new(AvroFormat {}),
table_partition_cols: vec![String::from("p1")],
target_partitions: 4,
@@ -419,7 +421,7 @@ mod tests {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, name);
let opt = ListingOptions {
- file_extension: "parquet".to_owned(),
+ file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
format: Arc::new(ParquetFormat::default()),
table_partition_cols: vec![],
target_partitions: 2,
diff --git a/datafusion/src/execution/context.rs
b/datafusion/src/execution/context.rs
index ca86d0f..023d3a0 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -24,8 +24,8 @@ use crate::{
datasource::listing::{ListingOptions, ListingTable},
datasource::{
file_format::{
- avro::AvroFormat,
- csv::CsvFormat,
+ avro::{AvroFormat, DEFAULT_AVRO_EXTENSION},
+ csv::{CsvFormat, DEFAULT_CSV_EXTENSION},
parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
FileFormat,
},
@@ -218,17 +218,20 @@ impl ExecutionContext {
ref file_type,
ref has_header,
}) => {
- let file_format = match file_type {
- FileType::CSV => {
-
Ok(Arc::new(CsvFormat::default().with_has_header(*has_header))
- as Arc<dyn FileFormat>)
- }
- FileType::Parquet => {
- Ok(Arc::new(ParquetFormat::default()) as Arc<dyn
FileFormat>)
- }
- FileType::Avro => {
- Ok(Arc::new(AvroFormat::default()) as Arc<dyn
FileFormat>)
- }
+ let (file_format, file_extension) = match file_type {
+ FileType::CSV => Ok((
+
Arc::new(CsvFormat::default().with_has_header(*has_header))
+ as Arc<dyn FileFormat>,
+ DEFAULT_CSV_EXTENSION,
+ )),
+ FileType::Parquet => Ok((
+ Arc::new(ParquetFormat::default()) as Arc<dyn
FileFormat>,
+ DEFAULT_PARQUET_EXTENSION,
+ )),
+ FileType::Avro => Ok((
+ Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
+ DEFAULT_AVRO_EXTENSION,
+ )),
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported file type {:?}.",
file_type
@@ -238,7 +241,7 @@ impl ExecutionContext {
let options = ListingOptions {
format: file_format,
collect_stat: false,
- file_extension: String::new(),
+ file_extension: file_extension.to_owned(),
target_partitions: self
.state
.lock()
diff --git a/datafusion/src/execution/options.rs
b/datafusion/src/execution/options.rs
index 219e2fd..79b0753 100644
--- a/datafusion/src/execution/options.rs
+++ b/datafusion/src/execution/options.rs
@@ -21,6 +21,7 @@ use std::sync::Arc;
use arrow::datatypes::{Schema, SchemaRef};
+use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
use crate::datasource::{
file_format::{avro::AvroFormat, csv::CsvFormat},
listing::ListingOptions,
@@ -173,7 +174,7 @@ impl<'a> Default for NdJsonReadOptions<'a> {
Self {
schema: None,
schema_infer_max_records: 1000,
- file_extension: ".json",
+ file_extension: DEFAULT_JSON_EXTENSION,
}
}
}
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs
b/datafusion/src/physical_plan/file_format/parquet.rs
index d240fe2..905bb1e 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -221,7 +221,7 @@ impl ExecutionPlan for ParquetExec {
object_store.as_ref(),
file_schema_ref,
partition_index,
- partition,
+ &partition,
metrics,
&projection,
&pruning_predicate,
@@ -230,7 +230,10 @@ impl ExecutionPlan for ParquetExec {
limit,
partition_col_proj,
) {
- println!("Parquet reader thread terminated due to error:
{:?}", e);
+ println!(
+ "Parquet reader thread terminated due to error: {:?} for
files: {:?}",
+ e, partition
+ );
}
});
@@ -445,7 +448,7 @@ fn read_partition(
object_store: &dyn ObjectStore,
file_schema: SchemaRef,
partition_index: usize,
- partition: Vec<PartitionedFile>,
+ partition: &[PartitionedFile],
metrics: ExecutionPlanMetricsSet,
projection: &[usize],
pruning_predicate: &Option<PruningPredicate>,