devinjdangelo commented on PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801#issuecomment-1771867480

   Thank you @alamb for the review! No worries, the number of high-quality reviews you complete is impressive :bow:.
   
   I spent some time digging into the failure case you found, and it appears to be specific to `datafusion-cli` misinterpreting the schema of the parquet file in the `SELECT` query. The `describe` output shows that the second column is actually a dictionary:
   
   ```
     ❯ describe 'input.parquet';
   +-------------+-------------------------+-------------+
   | column_name | data_type               | is_nullable |
   +-------------+-------------------------+-------------+
   | partition   | Utf8                    | NO          |
   | trace_id    | Dictionary(Int32, Utf8) | YES         |
   +-------------+-------------------------+-------------+
   2 rows in set. Query took 0.001 seconds.
   ```
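
   As background, a dictionary-encoded column stores small integer keys plus a table of distinct values rather than repeating each string, which is why `Dictionary(Int32, Utf8)` and plain `Utf8` are genuinely different physical types. A conceptual sketch in plain Rust (no arrow dependency; the values here are made up for illustration):

   ```rust
   fn main() {
       // The distinct values ("the dictionary").
       let values = vec!["span_a", "span_b"];
       // Per-row Int32 keys indexing into `values`, one per logical row.
       let keys: Vec<i32> = vec![0, 1, 0, 0, 1];

       // Materializing back to a plain Utf8 column is one key lookup per row.
       let materialized: Vec<&str> = keys.iter().map(|&k| values[k as usize]).collect();
       assert_eq!(
           materialized,
           vec!["span_a", "span_b", "span_a", "span_a", "span_b"]
       );
       println!("{:?}", materialized);
   }
   ```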
   
   But when you run a `SELECT`, the parquet schema is incorrectly reported with both columns as `Utf8`. I added some debug print statements and saw the following (note that `trace_id` is reported as `Utf8`):
   
   ```
     ❯ insert into test select partition, trace_id from 'input.parquet';
   Table Schema: Field { name: "trace_id", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }
   Input Schema: Field { name: "trace_id", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }
   Object Store error: Generic LocalFileSystem error: Unable to open file /tmp/partition=00000000000000002d0d2b9bbb99d0e3/OdZUZxMhsRWtSdIi.parquet#1: Too many open files (os error 24)
   ```
   
   My theory is that the incorrect schema causes the arrow function `RecordBatch::column_by_name` to return values belonging to a different column. Since the demux code looks columns up by name, it should never partition by the wrong column unless `RecordBatch::column_by_name` somehow returns the wrong values.
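
   To make the demux idea above concrete, here is a hedged sketch in plain Rust (a toy model, not the actual DataFusion demux code): rows are routed to output partitions by looking the partition column up *by name*, so a wrong grouping could only come from that lookup returning a different column's values. The column names and values are assumptions for illustration:

   ```rust
   use std::collections::HashMap;

   fn main() {
       // A toy "record batch": column name -> column values.
       let mut batch: HashMap<&str, Vec<&str>> = HashMap::new();
       batch.insert("partition", vec!["a", "a", "b"]);
       batch.insert("trace_id", vec!["t1", "t2", "t3"]);

       // Demux: group row indices by the value of the named partition column.
       let part_col = batch.get("partition").expect("partition column must exist");
       let mut groups: HashMap<&str, Vec<usize>> = HashMap::new();
       for (row, value) in part_col.iter().enumerate() {
           groups.entry(value).or_default().push(row);
       }

       assert_eq!(groups["a"], vec![0, 1]);
       assert_eq!(groups["b"], vec![2]);
       println!("{:?}", groups);
   }
   ```

   If the by-name lookup returned `trace_id`'s values instead, every row would land in its own partition, which matches the "too many open files" symptom above.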
   
   I tried to replicate the error in Rust rather than via the CLI, but I see different (and correct) behavior:
   
   ```rust
   use arrow_schema::DataType;
   use datafusion::{dataframe::DataFrameWriteOptions, prelude::*};
   use datafusion_common::DataFusionError;
   use datafusion_expr::ExprSchemable; // provides Expr::cast_to
   use object_store::local::LocalFileSystem;
   use std::sync::Arc;
   use url::Url;

   const FILENAME: &str = "/home/dev/arrow-datafusion/input.parquet";

   #[tokio::main]
   async fn main() -> Result<(), DataFusionError> {
       let ctx = SessionContext::new();
       let local = Arc::new(LocalFileSystem::new());
       let local_url = Url::parse("file://local").unwrap();
       ctx.runtime_env().register_object_store(&local_url, local);

       let df = ctx
           .read_parquet(FILENAME, ParquetReadOptions::default())
           .await?;

       df.clone().show_limit(10).await?;

       println!("{}", df.schema());

       ctx.sql(
           "create external table
           test(partition varchar, trace_id varchar)
           stored as parquet
           partitioned by (partition)
           location './temptest/'
           options (create_local_path 'true');",
       )
       .await?
       .collect()
       .await?;

       // Expecting an error here since the schemas do not match
       df.clone()
           .select(vec![col("trace_id"), col("partition")])?
           .write_table("test", DataFrameWriteOptions::new())
           .await
           .expect_err("Inserting query must have the same schema as the table.");

       // Cast the column to the correct type and it works as expected!
       df.clone()
           .select(vec![
               col("trace_id")
                   .cast_to(&DataType::Utf8, df.schema())?
                   .alias("trace_id"),
               col("partition"),
           ])?
           .write_table("test", DataFrameWriteOptions::new())
           .await?;

       ctx.sql("select count(1) from test").await?.show().await?;

       Ok(())
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
