timsaucer opened a new issue, #16740:
URL: https://github.com/apache/datafusion/issues/16740
### Is your feature request related to a problem or challenge?
I have two table providers that each produce hash-partitioned data, and I want
to do efficient joins on these partition hashes. When the number of partitions
of one table does not match the other, execution fails with an error
recommending that the user add a `RepartitionExec`.
This problem is not reproducible if the number of partitions is 14 or fewer,
since our optimizer will add a repartition.
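As a rough illustration of why mismatched partition counts break a partition-wise join, the sketch below assumes a modulo-based scheme (`key % num_partitions`) with an identity hash. Both are simplifying assumptions, not necessarily what a given table provider or DataFusion does, and `partition_for` and `misaligned_keys` are hypothetical helper names. Under these assumptions most keys land at different partition indices for 14 and 15 partitions, so pairing partition `i` on the left with partition `i` on the right would miss matches.

```rust
/// Partition index for a key under an assumed modulo-based scheme.
/// The key is used directly as its own hash to keep the sketch deterministic.
fn partition_for(key: u64, num_partitions: u64) -> u64 {
    key % num_partitions
}

/// Keys in `0..limit` whose partition index differs between the two layouts.
fn misaligned_keys(limit: u64, left: u64, right: u64) -> Vec<u64> {
    (0..limit)
        .filter(|k| partition_for(*k, left) != partition_for(*k, right))
        .collect()
}

fn main() {
    let bad = misaligned_keys(30, 14, 15);
    // Keys 0..=13 agree in both layouts; keys 14..=29 do not.
    println!("{} of 30 keys land in different partitions", bad.len());
}
```

With equal partition counts on both sides, `misaligned_keys` returns an empty list, which is the situation the join expects.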
### Describe the solution you'd like
I can think of two ways to handle this. One is to identify during planning
that the partition counts differ and to add a `RepartitionExec` into the plan to
account for this. The other is to update `HashJoinExec` to identify when we
have unmatched input partitions and to identify the correct number of output
partitions and the mapping from inputs to output.
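One possible reading of the second option can be sketched without DataFusion at all. Assuming both inputs place a row in partition `hash % n` using the same hash function (an assumption; custom providers may partition differently), left partition `i` and right partition `j` can share a key only when `i` and `j` agree modulo `gcd(n_left, n_right)`. The helper `compatible_pairs` is a hypothetical name, not part of `HashJoinExec`.

```rust
/// Greatest common divisor via Euclid's algorithm.
fn gcd(a: usize, b: usize) -> usize {
    if b == 0 { a } else { gcd(b, a % b) }
}

/// All (left, right) partition pairs that can contain a common key,
/// assuming each side assigns rows with `hash % partition_count`.
fn compatible_pairs(left: usize, right: usize) -> Vec<(usize, usize)> {
    let g = gcd(left, right);
    let mut pairs = Vec::new();
    for i in 0..left {
        for j in 0..right {
            // A hash h with h % left == i and h % right == j exists
            // only if i and j are congruent modulo gcd(left, right).
            if i % g == j % g {
                pairs.push((i, j));
            }
        }
    }
    pairs
}

fn main() {
    // 14 and 15 are coprime, so every pair is possible.
    println!("14 x 15 -> {} pairs", compatible_pairs(14, 15).len());
    // 4 divides 8, so each left partition maps to only two right partitions.
    println!("4 x 8 -> {:?}", compatible_pairs(4, 8));
}
```

Under this assumption the number of pairs equals the least common multiple of the two counts. For coprime counts such as 14 and 15 that is all 210 combinations, which suggests a mapping inside the join would only pay off when the counts share a large common factor; otherwise a repartition is likely simpler.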
### Describe alternatives you've considered
The only alternative I can think of is to add repartitions into the plan
manually, but this does not seem like something each of our users should have
to do.
### Additional context
Here is a minimal reproducible example for DataFusion 48.0.0. Running this code
produces the error
`Error: Internal("Invalid HashJoinExec, partition count mismatch 14!=15,consider using RepartitionExec")`
```rust
use arrow::array::{ArrayRef, RecordBatch, UInt64Array};
use arrow_schema::SchemaRef;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::catalog::{Session, TableProvider};
use datafusion::datasource::TableType;
use datafusion::error::Result;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use datafusion::prelude::*;
use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::common::exec_err;

#[derive(Debug)]
struct MyTableProvider {
    num_partitions: usize,
}

#[async_trait]
impl TableProvider for MyTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        MyExecPlan::schema()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        _state: &dyn Session,
        _projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(MyExecPlan::new(self.num_partitions)))
    }
}

#[derive(Debug)]
struct MyExecPlan {
    props: PlanProperties,
    num_partitions: usize,
}

impl MyExecPlan {
    pub fn new(num_partitions: usize) -> Self {
        let schema = Self::schema();
        let partition_col =
            datafusion::physical_expr::expressions::col("a", schema.as_ref())
                .unwrap();
        Self {
            props: PlanProperties::new(
                EquivalenceProperties::new(Self::schema()),
                datafusion::physical_expr::Partitioning::Hash(
                    vec![partition_col],
                    num_partitions,
                ),
                EmissionType::Final,
                Boundedness::Bounded,
            ),
            num_partitions,
        }
    }

    pub fn schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("a", DataType::UInt64, false),
            Field::new("b", DataType::UInt64, false),
        ]))
    }
}

impl ExecutionPlan for MyExecPlan {
    fn name(&self) -> &str {
        "MyExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.props
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        if partition >= self.num_partitions {
            return exec_err!("Invalid partition number {}", partition);
        }
        let a: ArrayRef = Arc::new(UInt64Array::from(vec![partition as u64; 5]));
        let b: ArrayRef =
            Arc::new(UInt64Array::from((0..5).collect::<Vec<_>>()));
        let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?;
        let schema = batch.schema();
        let stream = futures::stream::iter(vec![Ok(batch)]);
        let adapter = RecordBatchStreamAdapter::new(schema, stream);
        Ok(Box::pin(adapter))
    }
}

impl DisplayAs for MyExecPlan {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        f.write_str("MyExec")
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let table_provider_1 = Arc::new(MyTableProvider { num_partitions: 14 });
    let table_provider_2 = Arc::new(MyTableProvider { num_partitions: 15 });
    let _ = ctx.register_table("t1", table_provider_1)?;
    let _ = ctx.register_table("t2", table_provider_2)?;
    let df1 = ctx.table("t1").await?;
    let df2 = ctx.table("t2").await?;
    let df = df1.join(df2, JoinType::Inner, &["a"], &["a"], None)?;
    df.show().await?;
    Ok(())
}
```