TheBuilderJR opened a new issue, #7176:
URL: https://github.com/apache/arrow-rs/issues/7176
**Describe the bug**
If I add a nullable field to a struct, it should be possible to cast rows written before the migration to the schema that includes the new field.
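For context, here is a minimal sketch of the kind of struct widening this implies at the arrow level. It assumes `arrow::compute::cast` is where such a cast would be handled and uses placeholder field names (`location`, `reason`); whether this particular call succeeds today is exactly what is at issue, so treat it as an illustration of the expected behavior rather than confirmed API behavior.
```
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, StringArray, StructArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field, Fields};
use arrow::error::ArrowError;

fn widen_struct() -> Result<(), ArrowError> {
    // Struct as written before the migration: a single nullable field.
    let old = StructArray::from(vec![(
        Arc::new(Field::new("location", DataType::Utf8, true)),
        Arc::new(StringArray::from(vec![Some("USA")])) as ArrayRef,
    )]);

    // Struct after the migration: the same field plus an added nullable one
    // ("reason" is a hypothetical name used only for illustration).
    let new_type = DataType::Struct(Fields::from(vec![
        Field::new("location", DataType::Utf8, true),
        Field::new("reason", DataType::Utf8, true),
    ]));

    // The expectation described above: the cast succeeds and the field that
    // did not exist in the old data comes back as nulls.
    let widened = cast(&old, &new_type)?;
    assert_eq!(widened.data_type(), &new_type);
    Ok(())
}
```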
**To Reproduce**
```
use std::fs;
use std::sync::Arc;
use datafusion::prelude::*;
use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::array::{Array, StringArray, StructArray, TimestampMillisecondArray, Float64Array};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::dataframe::DataFrameWriteOptions;

#[tokio::test]
async fn test_datafusion_schema_evolution_with_compaction() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();

    // Original schema: `additionalInfo` has only `location` and `timestamp_utc`.
    let schema1 = Arc::new(Schema::new(vec![
        Field::new("component", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
        Field::new("stack", DataType::Utf8, true),
        Field::new("timestamp", DataType::Utf8, true),
        Field::new(
            "timestamp_utc",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "additionalInfo",
            DataType::Struct(vec![
                Field::new("location", DataType::Utf8, true),
                Field::new(
                    "timestamp_utc",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
            ].into()),
            true,
        ),
    ]));

    let batch1 = RecordBatch::try_new(
        schema1.clone(),
        vec![
            Arc::new(StringArray::from(vec![Some("component1")])),
            Arc::new(StringArray::from(vec![Some("message1")])),
            Arc::new(StringArray::from(vec![Some("stack_trace")])),
            Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
            Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
            Arc::new(StructArray::from(vec![
                (
                    Arc::new(Field::new("location", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                ),
                (
                    Arc::new(Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    )),
                    Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                ),
            ])),
        ],
    )?;

    // Write the pre-migration data to its own parquet file.
    let path1 = "test_data1.parquet";
    let _ = fs::remove_file(path1);

    let df1 = ctx.read_batch(batch1)?;
    df1.write_parquet(
        path1,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None,
    ).await?;

    // Evolved schema: `additionalInfo` gains a nested nullable `reason` struct.
    let schema2 = Arc::new(Schema::new(vec![
        Field::new("component", DataType::Utf8, true),
        Field::new("message", DataType::Utf8, true),
        Field::new("stack", DataType::Utf8, true),
        Field::new("timestamp", DataType::Utf8, true),
        Field::new(
            "timestamp_utc",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ),
        Field::new(
            "additionalInfo",
            DataType::Struct(vec![
                Field::new("location", DataType::Utf8, true),
                Field::new(
                    "timestamp_utc",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                ),
                Field::new(
                    "reason",
                    DataType::Struct(vec![
                        Field::new("_level", DataType::Float64, true),
                        Field::new(
                            "details",
                            DataType::Struct(vec![
                                Field::new("rurl", DataType::Utf8, true),
                                Field::new("s", DataType::Float64, true),
                                Field::new("t", DataType::Utf8, true),
                            ].into()),
                            true,
                        ),
                    ].into()),
                    true,
                ),
            ].into()),
            true,
        ),
    ]));

    let batch2 = RecordBatch::try_new(
        schema2.clone(),
        vec![
            Arc::new(StringArray::from(vec![Some("component1")])),
            Arc::new(StringArray::from(vec![Some("message1")])),
            Arc::new(StringArray::from(vec![Some("stack_trace")])),
            Arc::new(StringArray::from(vec![Some("2025-02-18T00:00:00Z")])),
            Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
            Arc::new(StructArray::from(vec![
                (
                    Arc::new(Field::new("location", DataType::Utf8, true)),
                    Arc::new(StringArray::from(vec![Some("USA")])) as Arc<dyn Array>,
                ),
                (
                    Arc::new(Field::new(
                        "timestamp_utc",
                        DataType::Timestamp(TimeUnit::Millisecond, None),
                        true,
                    )),
                    Arc::new(TimestampMillisecondArray::from(vec![Some(1640995200000)])),
                ),
                (
                    Arc::new(Field::new(
                        "reason",
                        DataType::Struct(vec![
                            Field::new("_level", DataType::Float64, true),
                            Field::new(
                                "details",
                                DataType::Struct(vec![
                                    Field::new("rurl", DataType::Utf8, true),
                                    Field::new("s", DataType::Float64, true),
                                    Field::new("t", DataType::Utf8, true),
                                ].into()),
                                true,
                            ),
                        ].into()),
                        true,
                    )),
                    Arc::new(StructArray::from(vec![
                        (
                            Arc::new(Field::new("_level", DataType::Float64, true)),
                            Arc::new(Float64Array::from(vec![Some(1.5)])) as Arc<dyn Array>,
                        ),
                        (
                            Arc::new(Field::new(
                                "details",
                                DataType::Struct(vec![
                                    Field::new("rurl", DataType::Utf8, true),
                                    Field::new("s", DataType::Float64, true),
                                    Field::new("t", DataType::Utf8, true),
                                ].into()),
                                true,
                            )),
                            Arc::new(StructArray::from(vec![
                                (
                                    Arc::new(Field::new("rurl", DataType::Utf8, true)),
                                    Arc::new(StringArray::from(vec![Some("https://example.com")])) as Arc<dyn Array>,
                                ),
                                (
                                    Arc::new(Field::new("s", DataType::Float64, true)),
                                    Arc::new(Float64Array::from(vec![Some(3.14)])) as Arc<dyn Array>,
                                ),
                                (
                                    Arc::new(Field::new("t", DataType::Utf8, true)),
                                    Arc::new(StringArray::from(vec![Some("data")])) as Arc<dyn Array>,
                                ),
                            ])),
                        ),
                    ])),
                ),
            ])),
        ],
    )?;

    // Write the post-migration data to a second parquet file.
    let path2 = "test_data2.parquet";
    let _ = fs::remove_file(path2);

    let df2 = ctx.read_batch(batch2)?;
    df2.write_parquet(
        path2,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None,
    ).await?;

    // Read both files through a single ListingTable using the evolved schema.
    let paths_str = vec![path1.to_string(), path2.to_string()];
    let config = ListingTableConfig::new_with_multi_paths(
        paths_str
            .into_iter()
            .map(|p| ListingTableUrl::parse(&p))
            .collect::<Result<Vec<_>, _>>()?,
    )
    .with_schema(schema2.as_ref().clone().into())
    .infer(&ctx.state()).await?;

    let config = ListingTableConfig {
        options: Some(ListingOptions {
            file_sort_order: vec![vec![col("timestamp_utc").sort(true, true)]],
            ..config.options.unwrap_or_else(|| ListingOptions::new(Arc::new(ParquetFormat::default())))
        }),
        ..config
    };

    let listing_table = ListingTable::try_new(config)?;
    ctx.register_table("events", Arc::new(listing_table))?;

    let df = ctx.sql("SELECT * FROM events ORDER BY timestamp_utc").await?;
    let results = df.clone().collect().await?;
    assert_eq!(results[0].num_rows(), 2);

    // Compact the combined result into a single parquet file and read it back.
    let compacted_path = "test_data_compacted.parquet";
    let _ = fs::remove_file(compacted_path);

    df.write_parquet(
        compacted_path,
        DataFrameWriteOptions::default()
            .with_single_file_output(true)
            .with_sort_by(vec![col("timestamp_utc").sort(true, true)]),
        None,
    ).await?;

    let new_ctx = SessionContext::new();
    let config = ListingTableConfig::new_with_multi_paths(vec![ListingTableUrl::parse(compacted_path)?])
        .with_schema(schema2.as_ref().clone().into())
        .infer(&new_ctx.state()).await?;
    let listing_table = ListingTable::try_new(config)?;
    new_ctx.register_table("events", Arc::new(listing_table))?;

    let df = new_ctx.sql("SELECT * FROM events ORDER BY timestamp_utc").await?;
    let compacted_results = df.collect().await?;
    assert_eq!(compacted_results[0].num_rows(), 2);
    assert_eq!(results, compacted_results);

    let _ = fs::remove_file(path1);
    let _ = fs::remove_file(path2);
    let _ = fs::remove_file(compacted_path);

    Ok(())
}
```
**Expected behavior**
The test should pass.
**Additional context**
This was originally filed in the datafusion repo, but @alamb suggested the fix should perhaps be on the arrow side:
https://github.com/apache/datafusion/issues/14757