This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new f849968  Fix can not load parquet table from spark in datafusion-cli. 
(#1665)
f849968 is described below

commit f849968057ddddccc9aa19915ef3ea56bf14d80d
Author: Yang <[email protected]>
AuthorDate: Mon Jan 31 22:48:37 2022 +0800

    Fix can not load parquet table from spark in datafusion-cli. (#1665)
    
    * fix can not load parquet table from spark
    
    * add the invalid files to the error log message.
    
    * fix fmt
---
 benchmarks/src/bin/tpch.rs                         |  6 +++--
 .../examples/parquet_sql_multiple_files.rs         |  6 +++--
 datafusion/src/datasource/file_format/avro.rs      |  2 ++
 datafusion/src/datasource/file_format/csv.rs       |  2 ++
 datafusion/src/datasource/file_format/json.rs      |  2 ++
 datafusion/src/datasource/listing/table.rs         |  6 +++--
 datafusion/src/execution/context.rs                | 31 ++++++++++++----------
 datafusion/src/execution/options.rs                |  3 ++-
 .../src/physical_plan/file_format/parquet.rs       |  9 ++++---
 9 files changed, 43 insertions(+), 24 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index 0b9fba5..59bb551 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -54,6 +54,8 @@ use datafusion::{
     },
 };
 
+use datafusion::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
+use datafusion::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
 use structopt::StructOpt;
 
 #[cfg(feature = "snmalloc")]
@@ -652,13 +654,13 @@ fn get_table(
                     .with_delimiter(b',')
                     .with_has_header(true);
 
-                (Arc::new(format), path, ".csv")
+                (Arc::new(format), path, DEFAULT_CSV_EXTENSION)
             }
             "parquet" => {
                 let path = format!("{}/{}", path, table);
                 let format = 
ParquetFormat::default().with_enable_pruning(true);
 
-                (Arc::new(format), path, ".parquet")
+                (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
             }
             other => {
                 unimplemented!("Invalid file format '{}'", other);
diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs 
b/datafusion-examples/examples/parquet_sql_multiple_files.rs
index 2e95427..7485bc7 100644
--- a/datafusion-examples/examples/parquet_sql_multiple_files.rs
+++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -15,7 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion::datasource::file_format::parquet::ParquetFormat;
+use datafusion::datasource::file_format::parquet::{
+    ParquetFormat, DEFAULT_PARQUET_EXTENSION,
+};
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::error::Result;
 use datafusion::prelude::*;
@@ -33,7 +35,7 @@ async fn main() -> Result<()> {
     // Configure listing options
     let file_format = ParquetFormat::default().with_enable_pruning(true);
     let listing_options = ListingOptions {
-        file_extension: ".parquet".to_owned(),
+        file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
         format: Arc::new(file_format),
         table_partition_cols: vec![],
         collect_stat: true,
diff --git a/datafusion/src/datasource/file_format/avro.rs 
b/datafusion/src/datasource/file_format/avro.rs
index 08eb343..fa02d1a 100644
--- a/datafusion/src/datasource/file_format/avro.rs
+++ b/datafusion/src/datasource/file_format/avro.rs
@@ -34,6 +34,8 @@ use crate::physical_plan::file_format::{AvroExec, 
FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
+/// The default file extension of avro files
+pub const DEFAULT_AVRO_EXTENSION: &str = ".avro";
 /// Avro `FileFormat` implementation.
 #[derive(Default, Debug)]
 pub struct AvroFormat;
diff --git a/datafusion/src/datasource/file_format/csv.rs 
b/datafusion/src/datasource/file_format/csv.rs
index f0a70d9..6aa0d21 100644
--- a/datafusion/src/datasource/file_format/csv.rs
+++ b/datafusion/src/datasource/file_format/csv.rs
@@ -33,6 +33,8 @@ use crate::physical_plan::file_format::{CsvExec, 
FileScanConfig};
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
+/// The default file extension of csv files
+pub const DEFAULT_CSV_EXTENSION: &str = ".csv";
 /// Character Separated Value `FileFormat` implementation.
 #[derive(Debug)]
 pub struct CsvFormat {
diff --git a/datafusion/src/datasource/file_format/json.rs 
b/datafusion/src/datasource/file_format/json.rs
index d7a278d..bdd5ef8 100644
--- a/datafusion/src/datasource/file_format/json.rs
+++ b/datafusion/src/datasource/file_format/json.rs
@@ -37,6 +37,8 @@ use crate::physical_plan::file_format::NdJsonExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
+/// The default file extension of json files
+pub const DEFAULT_JSON_EXTENSION: &str = ".json";
 /// New line delimited JSON `FileFormat` implementation.
 #[derive(Debug, Default)]
 pub struct JsonFormat {
diff --git a/datafusion/src/datasource/listing/table.rs 
b/datafusion/src/datasource/listing/table.rs
index 2f8f70f..1501b8b 100644
--- a/datafusion/src/datasource/listing/table.rs
+++ b/datafusion/src/datasource/listing/table.rs
@@ -266,6 +266,8 @@ impl ListingTable {
 mod tests {
     use arrow::datatypes::DataType;
 
+    use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
+    use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
     use crate::{
         datasource::{
             file_format::{avro::AvroFormat, parquet::ParquetFormat},
@@ -318,7 +320,7 @@ mod tests {
         let store = TestObjectStore::new_arc(&[("table/p1=v1/file.avro", 
100)]);
 
         let opt = ListingOptions {
-            file_extension: ".avro".to_owned(),
+            file_extension: DEFAULT_AVRO_EXTENSION.to_owned(),
             format: Arc::new(AvroFormat {}),
             table_partition_cols: vec![String::from("p1")],
             target_partitions: 4,
@@ -419,7 +421,7 @@ mod tests {
         let testdata = crate::test_util::parquet_test_data();
         let filename = format!("{}/{}", testdata, name);
         let opt = ListingOptions {
-            file_extension: "parquet".to_owned(),
+            file_extension: DEFAULT_PARQUET_EXTENSION.to_owned(),
             format: Arc::new(ParquetFormat::default()),
             table_partition_cols: vec![],
             target_partitions: 2,
diff --git a/datafusion/src/execution/context.rs 
b/datafusion/src/execution/context.rs
index ca86d0f..023d3a0 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -24,8 +24,8 @@ use crate::{
     datasource::listing::{ListingOptions, ListingTable},
     datasource::{
         file_format::{
-            avro::AvroFormat,
-            csv::CsvFormat,
+            avro::{AvroFormat, DEFAULT_AVRO_EXTENSION},
+            csv::{CsvFormat, DEFAULT_CSV_EXTENSION},
             parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
             FileFormat,
         },
@@ -218,17 +218,20 @@ impl ExecutionContext {
                 ref file_type,
                 ref has_header,
             }) => {
-                let file_format = match file_type {
-                    FileType::CSV => {
-                        
Ok(Arc::new(CsvFormat::default().with_has_header(*has_header))
-                            as Arc<dyn FileFormat>)
-                    }
-                    FileType::Parquet => {
-                        Ok(Arc::new(ParquetFormat::default()) as Arc<dyn 
FileFormat>)
-                    }
-                    FileType::Avro => {
-                        Ok(Arc::new(AvroFormat::default()) as Arc<dyn 
FileFormat>)
-                    }
+                let (file_format, file_extension) = match file_type {
+                    FileType::CSV => Ok((
+                        
Arc::new(CsvFormat::default().with_has_header(*has_header))
+                            as Arc<dyn FileFormat>,
+                        DEFAULT_CSV_EXTENSION,
+                    )),
+                    FileType::Parquet => Ok((
+                        Arc::new(ParquetFormat::default()) as Arc<dyn 
FileFormat>,
+                        DEFAULT_PARQUET_EXTENSION,
+                    )),
+                    FileType::Avro => Ok((
+                        Arc::new(AvroFormat::default()) as Arc<dyn FileFormat>,
+                        DEFAULT_AVRO_EXTENSION,
+                    )),
                     _ => Err(DataFusionError::NotImplemented(format!(
                         "Unsupported file type {:?}.",
                         file_type
@@ -238,7 +241,7 @@ impl ExecutionContext {
                 let options = ListingOptions {
                     format: file_format,
                     collect_stat: false,
-                    file_extension: String::new(),
+                    file_extension: file_extension.to_owned(),
                     target_partitions: self
                         .state
                         .lock()
diff --git a/datafusion/src/execution/options.rs 
b/datafusion/src/execution/options.rs
index 219e2fd..79b0753 100644
--- a/datafusion/src/execution/options.rs
+++ b/datafusion/src/execution/options.rs
@@ -21,6 +21,7 @@ use std::sync::Arc;
 
 use arrow::datatypes::{Schema, SchemaRef};
 
+use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
 use crate::datasource::{
     file_format::{avro::AvroFormat, csv::CsvFormat},
     listing::ListingOptions,
@@ -173,7 +174,7 @@ impl<'a> Default for NdJsonReadOptions<'a> {
         Self {
             schema: None,
             schema_infer_max_records: 1000,
-            file_extension: ".json",
+            file_extension: DEFAULT_JSON_EXTENSION,
         }
     }
 }
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs 
b/datafusion/src/physical_plan/file_format/parquet.rs
index d240fe2..905bb1e 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -221,7 +221,7 @@ impl ExecutionPlan for ParquetExec {
                 object_store.as_ref(),
                 file_schema_ref,
                 partition_index,
-                partition,
+                &partition,
                 metrics,
                 &projection,
                 &pruning_predicate,
@@ -230,7 +230,10 @@ impl ExecutionPlan for ParquetExec {
                 limit,
                 partition_col_proj,
             ) {
-                println!("Parquet reader thread terminated due to error: 
{:?}", e);
+                println!(
+                    "Parquet reader thread terminated due to error: {:?} for 
files: {:?}",
+                    e, partition
+                );
             }
         });
 
@@ -445,7 +448,7 @@ fn read_partition(
     object_store: &dyn ObjectStore,
     file_schema: SchemaRef,
     partition_index: usize,
-    partition: Vec<PartitionedFile>,
+    partition: &[PartitionedFile],
     metrics: ExecutionPlanMetricsSet,
     projection: &[usize],
     pruning_predicate: &Option<PruningPredicate>,

Reply via email to