[ https://issues.apache.org/jira/browse/ARROW-10844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jorge Leitão updated ARROW-10844:
---------------------------------
    Description: 
The complete failing test:

{code:java}
use std::sync::Arc;

use arrow::{array::{Int32Array, StringArray}, record_batch::RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};

use datafusion::{datasource::MemTable, prelude::JoinType};
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;

#[tokio::test]
async fn join() -> Result<()> {
    let schema1 = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Int32, false),
    ]));
    let schema2 = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("c", DataType::Int32, false),
    ]));

    // define data for the left table.
    let batch1 = RecordBatch::try_new(
        schema1.clone(),
        vec![
            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
            Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
        ],
    )?;
    // define data for the right table.
    let batch2 = RecordBatch::try_new(
        schema2.clone(),
        vec![
            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
            Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
        ],
    )?;

    let mut ctx = ExecutionContext::new();

    let table1 = MemTable::new(schema1, vec![vec![batch1]])?;
    let table2 = MemTable::new(schema2, vec![vec![batch2]])?;

    // df1 receives a clone of the state, which at this point only contains "aa".
    ctx.register_table("aa", Box::new(table1));
    let df1 = ctx.table("aa")?;

    // "aaa" is registered after df1 was created, so df1's state never sees it.
    ctx.register_table("aaa", Box::new(table2));
    let df2 = ctx.table("aaa")?;

    // The joined DataFrame inherits df1's state, so collect() cannot find "aaa".
    let a = df1.join(df2, JoinType::Inner, &["a"], &["a"])?;

    let batches = a.collect().await?;

    assert_eq!(batches.len(), 1);

    Ok(())
}
{code}

When we create DataFrames via {{ctx.table}}, each one receives a clone of the {{ExecutionContextState}}. If the context later receives a new table, that table will not be part of the state held by the earlier DataFrame. On a join, the left DataFrame's state is passed to the newly created DataFrame, which then uses it in {{collect()}}. Because the right side reads a table that is not in the left side's state, the execution fails.
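A minimal sketch of this failure mode, independent of DataFusion. It assumes the state is nothing more than a registry of table names; {{State}}, {{SnapshotFrame}}, {{SharedFrame}} and their methods are hypothetical stand-ins, not DataFusion's API. {{SnapshotFrame}} models the behavior described above (each DataFrame holds a clone of the state, and a join keeps the left one), while {{SharedFrame}} models the {{Arc<Mutex<...>>}} alternative suggested in this issue.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `ExecutionContextState`: only the set of
/// registered table names is modeled.
#[derive(Clone, Default)]
struct State {
    tables: HashSet<String>,
}

impl State {
    /// Fails if any table the plan reads is missing from this state.
    fn resolve(&self, plan: &[String]) -> Result<(), String> {
        for t in plan {
            if !self.tables.contains(t) {
                return Err(format!("table '{}' not found", t));
            }
        }
        Ok(())
    }
}

/// Behavior described in the issue: each frame owns a clone (snapshot).
struct SnapshotFrame {
    state: State,
    plan: Vec<String>, // names of the tables this frame reads
}

impl SnapshotFrame {
    /// The joined frame keeps only the *left* frame's state.
    fn join(self, right: SnapshotFrame) -> SnapshotFrame {
        let mut plan = self.plan;
        plan.extend(right.plan);
        SnapshotFrame { state: self.state, plan }
    }
    fn collect(&self) -> Result<(), String> {
        self.state.resolve(&self.plan)
    }
}

/// Hypothetical alternative: all frames share one `Arc<Mutex<State>>`.
struct SharedFrame {
    state: Arc<Mutex<State>>,
    plan: Vec<String>,
}

impl SharedFrame {
    fn join(self, right: SharedFrame) -> SharedFrame {
        let mut plan = self.plan;
        plan.extend(right.plan);
        SharedFrame { state: self.state, plan }
    }
    fn collect(&self) -> Result<(), String> {
        // Reads the *current* registry, including later registrations.
        self.state.lock().unwrap().resolve(&self.plan)
    }
}

/// Mirrors the test: register "aa", take df1, register "aaa", take df2, join.
fn snapshot_demo() -> Result<(), String> {
    let mut ctx = State::default();
    ctx.tables.insert("aa".to_string());
    let df1 = SnapshotFrame { state: ctx.clone(), plan: vec!["aa".to_string()] };
    ctx.tables.insert("aaa".to_string());
    let df2 = SnapshotFrame { state: ctx.clone(), plan: vec!["aaa".to_string()] };
    df1.join(df2).collect()
}

/// Same sequence of steps, but every frame points at the same state.
fn shared_demo() -> Result<(), String> {
    let ctx = Arc::new(Mutex::new(State::default()));
    ctx.lock().unwrap().tables.insert("aa".to_string());
    let df1 = SharedFrame { state: Arc::clone(&ctx), plan: vec!["aa".to_string()] };
    ctx.lock().unwrap().tables.insert("aaa".to_string());
    let df2 = SharedFrame { state: Arc::clone(&ctx), plan: vec!["aaa".to_string()] };
    df1.join(df2).collect()
}

fn main() {
    println!("snapshot: {:?}", snapshot_demo()); // Err("table 'aaa' not found")
    println!("shared:   {:?}", shared_demo()); // Ok(())
}
```

With the shared registry, the joined frame can resolve "aaa" even though it was registered after {{df1}} was created. The trade-off in this sketch is that a DataFrame's view of the catalog is no longer fixed at creation time, and every resolution has to take the lock.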
We may need an {{Arc<Mutex<ExecutionContextState>>}} so that multiple DataFrames share a common mutable state. Alternatively, DataFrames could stop requiring tables to be registered in the context before they can be used.

Note that the current example in the {{DataFrame::join}} docs works because the table it uses is registered in the state of both DataFrames.

  was:
The complete test:

{code:java}
{code}

currently does not work because we clone the {{ExecutionContextState}} from the context into the {{df}}, so the left and right DataFrames end up with different context states. In particular, {{left}} will not have the table registered on the right, which means that its {{collect}} will fail.

We may need an {{Arc<Mutex<ExecutionContextState>>}} so that multiple DataFrames share a common mutable state. Alternatively, DataFrames could stop requiring tables to be registered in the context before they can be used.

Note that the current example in the {{DataFrame::join}} docs works because it shares the same table. This won't happen if, e.g., we use in-memory tables.

> [Rust] [DataFusion] join of two DataFrames is not possible
> ----------------------------------------------------------
>
>                 Key: ARROW-10844
>                 URL: https://issues.apache.org/jira/browse/ARROW-10844
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Rust, Rust - DataFusion
>    Affects Versions: 3.0.0
>            Reporter: Jorge Leitão
>            Priority: Blocker
>             Fix For: 3.0.0
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)