devinjdangelo commented on PR #7801:
URL: https://github.com/apache/arrow-datafusion/pull/7801#issuecomment-1771867480
Thank you @alamb for the review! No worries, the number of high-quality
reviews you complete is impressive :bow:.
I spent some time digging into the failure case you found, and it appears to
be specific to `datafusion-cli` misinterpreting the schema of the parquet file
in the `SELECT` query. You can see in the `describe` output that the second
column is actually a dictionary:
```
❯ describe 'input.parquet';
+-------------+-------------------------+-------------+
| column_name | data_type | is_nullable |
+-------------+-------------------------+-------------+
| partition | Utf8 | NO |
| trace_id | Dictionary(Int32, Utf8) | YES |
+-------------+-------------------------+-------------+
2 rows in set. Query took 0.001 seconds.
```
But when you run a `SELECT`, the parquet schema is incorrectly reported with
both columns as `Utf8`. I added some debug print statements and saw the
following (note that `trace_id` has type `Utf8`):
```
❯ insert into test select partition, trace_id from 'input.parquet';
Table Schema: Field { name: "trace_id", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }
Input Schema: Field { name: "trace_id", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "partition", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }
Object Store error: Generic LocalFileSystem error: Unable to open file /tmp/partition=00000000000000002d0d2b9bbb99d0e3/OdZUZxMhsRWtSdIi.parquet#1: Too many open files (os error 24)
```
My theory is that the incorrect schema causes the arrow function
`RecordBatch::column_by_name` to return values belonging to a different
column! Since the demux code looks columns up by name, it should never
partition by the wrong column unless `column_by_name` somehow returns the
wrong values. That would also explain the "Too many open files" error above:
partitioning on the high-cardinality `trace_id` values would create one output
file per distinct id.
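To make that concrete, here is a minimal sketch (my own illustration, not the actual demux code) of a name-based lookup like the one the demux performs. `RecordBatch::column_by_name` resolves the name through the batch's own schema, so it can only hand back values from the wrong column if that schema itself is wrong:
```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Two Utf8 columns, mirroring the table schema from the debug output above.
    let schema = Arc::new(Schema::new(vec![
        Field::new("partition", DataType::Utf8, false),
        Field::new("trace_id", DataType::Utf8, true),
    ]));
    let partition: ArrayRef = Arc::new(StringArray::from(vec!["00", "01"]));
    let trace_id: ArrayRef = Arc::new(StringArray::from(vec!["aaaa", "bbbb"]));
    let batch = RecordBatch::try_new(schema, vec![partition, trace_id]).unwrap();

    // The lookup goes through the batch's schema: the index of the matching
    // field name selects the column, so a wrong schema means wrong values.
    let col = batch.column_by_name("partition").expect("column exists");
    let values = col.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(values.value(0), "00");
}
```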
I tried to replicate the error in Rust rather than via the CLI, but I see
different (and correct) behavior:
```rust
use arrow_schema::DataType;
use datafusion::{dataframe::DataFrameWriteOptions, prelude::*};
use datafusion_common::DataFusionError;
use datafusion_expr::ExprSchemable;
use object_store::local::LocalFileSystem;
use std::sync::Arc;
use url::Url;

const FILENAME: &str = "/home/dev/arrow-datafusion/input.parquet";

#[tokio::main]
async fn main() -> Result<(), DataFusionError> {
    let _ctx = SessionContext::new();
    let local = Arc::new(LocalFileSystem::new());
    let local_url = Url::parse("file://local").unwrap();
    _ctx.runtime_env().register_object_store(&local_url, local);

    let _read_options = ParquetReadOptions::default();
    let _df = _ctx.read_parquet(FILENAME, _read_options.clone()).await?;
    _df.clone().show_limit(10).await?;
    println!("{}", _df.clone().schema());

    _ctx.sql(
        "create external table
        test(partition varchar, trace_id varchar)
        stored as parquet
        partitioned by (partition)
        location './temptest/'
        options (create_local_path 'true');",
    )
    .await?
    .collect()
    .await?;

    // Expecting an error here since schemas do not match
    _df.clone()
        .select(vec![col("trace_id"), col("partition")])?
        .write_table("test", DataFrameWriteOptions::new())
        .await
        .expect_err("Inserting query must have the same schema with the table.");

    // Cast the column to the correct type and it works as expected!
    _df.clone()
        .select(vec![
            col("trace_id").cast_to(&DataType::Utf8, _df.schema())?.alias("trace_id"),
            col("partition"),
        ])?
        .write_table("test", DataFrameWriteOptions::new())
        .await?;

    _ctx.sql("select count(1) from test").await?.show().await?;
    Ok(())
}
```
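So when the input schema is reported correctly (or explicitly cast to match), the write path errors or succeeds exactly as expected, which points back at how `datafusion-cli` resolves the parquet schema for the `SELECT`.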