shivbhatia10 opened a new issue, #18517:
URL: https://github.com/apache/datafusion/issues/18517
### Describe the bug
Hi folks, we have a fairly complex pipeline that ends in a `drop
duplicates` transform, implemented as an aggregation with a group by on the
column we want to deduplicate. We know that without this `drop duplicates`
step the primary key column already produces a unique value for every row, so
we expect the transform to be a no-op. In reality, however, it reduces 8.7
million rows to 28 thousand, which is a massive loss of data.
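For clarity, here is a minimal sketch of the drop-duplicates-as-aggregation
pattern described above (not our actual pipeline; `source`, `pk`, and
`payload` are made-up names, and `first_value` is assumed to be registered as
in the script below):
```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Hypothetical illustration of the "drop duplicates" pattern: keep one row
// per primary key by grouping on it and taking first_value of the payload.
async fn drop_duplicates(ctx: &SessionContext) -> Result<DataFrame> {
    ctx.sql(
        "SELECT pk, first_value(payload) AS payload \
         FROM source \
         GROUP BY pk",
    )
    .await
}
```
With a unique `pk`, this query should return exactly as many rows as it reads.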
After investigating this for a few weeks, we believe partitions are being
handled incorrectly at the physical execution stage: setting
`target_partitions` to 1 makes the pipeline behave correctly. I wrote a script
that reproduces the pipeline's logic and read through the physical plans for
different values of `target_partitions`. When it is set to 1 there are no
instances of `UnKnownColumn` in the plan, but for any other value I see 8
instances of it.
I believe this might be causing the row loss in the `drop duplicates`
pipeline. Unfortunately I was not able to generate synthetic data that
reproduces the dropped rows, and I can't share the original data where we
observe this because it's customer data.
Please let me know if there's any other context I can provide; we'd
appreciate any insight you folks can offer here!
### To Reproduce
This is the script I used to find this behaviour:
```rust
use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::functions_aggregate::first_last::FirstValue;
use datafusion::logical_expr::AggregateUDF;
use datafusion::prelude::*;
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<()> {
let config = SessionConfig::new().with_target_partitions(16);
let ctx = SessionContext::new_with_config(config);
let first_value_udf = AggregateUDF::from(FirstValue::new());
ctx.register_udaf(first_value_udf);
let num_rows = 5_000_000;
let left_agg_source = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("join_key", DataType::Utf8, true),
Field::new("url_field", DataType::Utf8, true),
Field::new("data_field", DataType::Utf8, true),
])),
vec![
Arc::new(StringArray::from_iter_values(
(0..num_rows).map(|i| format!("jk_{}", i % 100000)),
)),
Arc::new(StringArray::from_iter_values(
(0..num_rows).map(|i| format!("proto://id_{}", i)),
)),
Arc::new(StringArray::from_iter_values(
(0..num_rows).map(|i| format!("data_{}", i)),
)),
],
)?;
let right_agg_source = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("join_key", DataType::Utf8, true),
Field::new("key1", DataType::Utf8, true),
Field::new("attr1", DataType::Utf8, true),
])),
vec![
Arc::new(StringArray::from_iter_values(
(0..num_rows).map(|i| format!("jk_{}", i % 100000)),
)),
Arc::new(StringArray::from_iter_values(
(0..num_rows).map(|i| format!("k1_{}", i % 50000)),
)),
Arc::new(StringArray::from_iter_values(
(0..num_rows).map(|i| format!("attr_{}", i % 100)),
)),
],
)?;
let left_union_source = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("key2", DataType::Utf8, true),
Field::new("key1", DataType::Utf8, true),
Field::new("value", DataType::Utf8, true),
])),
vec![
Arc::new(StringArray::from_iter_values(
(0..num_rows).map(|i| format!("id_{}", i)),
)),
Arc::new(StringArray::from_iter_values(
(0..num_rows).map(|i| format!("k1_{}", i % 50000)),
)),
Arc::new(StringArray::from_iter_values(
(0..num_rows).map(|i| format!("val_{}", i)),
)),
],
)?;
let right_union_source = RecordBatch::try_new(
Arc::new(Schema::new(vec![
Field::new("key2", DataType::Utf8, true),
Field::new("key1", DataType::Utf8, true),
Field::new("value", DataType::Utf8, true),
])),
vec![
Arc::new(StringArray::from_iter_values(
(0..num_rows / 2).map(|i| format!("id_{}", i + num_rows)),
)),
Arc::new(StringArray::from_iter_values(
(0..num_rows / 2).map(|i| format!("k1_{}", i % 50000)),
)),
Arc::new(StringArray::from_iter_values(
(0..num_rows / 2).map(|i| format!("val_{}", i + num_rows)),
)),
],
)?;
ctx.register_batch("left_agg_source", left_agg_source)?;
ctx.register_batch("right_agg_source", right_agg_source)?;
ctx.register_batch("left_union_source", left_union_source)?;
ctx.register_batch("right_union_source", right_union_source)?;
let query = "
WITH
agg1 AS (
SELECT
SUBSTRING(url_field FROM POSITION('://' IN url_field) + 3) as extracted_id,
join_key,
first_value(data_field) as agg_data
FROM left_agg_source
WHERE url_field LIKE 'proto://%'
GROUP BY SUBSTRING(url_field FROM POSITION('://' IN url_field) + 3), join_key, url_field
),
agg2 AS (
SELECT join_key, key1, attr1
FROM right_agg_source
GROUP BY join_key, key1, attr1
),
joined_aggs AS (
SELECT
a1.extracted_id,
a2.attr1,
a2.key1,
first_value(a1.agg_data) as final_data
FROM agg1 a1
INNER JOIN agg2 a2 ON a1.join_key = a2.join_key
GROUP BY a1.extracted_id, a2.attr1, a2.key1
),
unioned AS (
SELECT key2, key1, value FROM left_union_source
UNION ALL
SELECT key2, key1, value FROM right_union_source
),
final_joined AS (
SELECT
u.key2,
u.key1,
u.value,
j.attr1,
j.final_data
FROM unioned u
LEFT JOIN joined_aggs j
ON u.key1 = j.key1
AND u.key2 = j.extracted_id
)
SELECT
CONCAT(key1, '_', COALESCE(attr1, 'NULL'), '_', key2) as composite_key,
first_value(key2) as key2,
first_value(value) as value,
first_value(final_data) as final_data,
first_value(attr1) as attr1
FROM final_joined
GROUP BY CONCAT(key1, '_', COALESCE(attr1, 'NULL'), '_', key2)
";
println!("Running query with {} rows...", num_rows);
let df = ctx.sql(query).await?;
println!("\nChecking for UnknownColumn in physical plan...");
let plan = df.clone().create_physical_plan().await?;
let plan_str = format!("{:#?}", plan);
println!("{}", plan_str);
let unknown_col_count = plan_str.matches("UnKnownColumn").count();
println!("Found {} instances of UnKnownColumn", unknown_col_count);
Ok(())
}
```
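As a side note, the following is a hedged sketch (not part of the original
script) of an extra inspection step: printing the physical plan as an indented
tree via DataFusion's `displayable` helper, which is easier to scan than the
Debug dump when tracing where the affected projections sit.
```rust
use std::sync::Arc;

use datafusion::physical_plan::{displayable, ExecutionPlan};

// Sketch of an optional extra inspection step (not in the original script):
// print the physical plan as an indented tree, which can be easier to read
// than the Debug dump used above for counting UnKnownColumn.
fn print_plan_tree(plan: &Arc<dyn ExecutionPlan>) {
    println!("{}", displayable(plan.as_ref()).indent(true));
}
```
In the script above it could be called right after `create_physical_plan`,
e.g. `print_plan_tree(&plan);`.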
### Expected behavior
With `target_partitions` set to 16 (or any value greater than 1 that I've
tried so far) I see this output:
`Found 8 instances of UnKnownColumn`
but with `target_partitions` set to 1 I see:
`Found 0 instances of UnKnownColumn`
I believe this is the source of the row loss we're seeing. I apologize for not
being able to provide a repro of that behavior, but I believe the
`UnKnownColumn` instances are an issue in their own right.
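For completeness, this is roughly the sanity check behind the row-loss claim
(a sketch, not part of the script above): with a unique grouping key, the
final GROUP BY should not shrink the result.
```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Count the rows the query actually produces; with a unique grouping key this
// should match the input row count instead of collapsing it.
async fn count_result_rows(df: DataFrame) -> Result<usize> {
    let batches = df.collect().await?;
    Ok(batches.iter().map(|batch| batch.num_rows()).sum())
}
```
In the real pipeline this is where we observe the drop from 8.7 million rows
to 28 thousand.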
### Additional context
_No response_