[ 
https://issues.apache.org/jira/browse/ARROW-10844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Leitão updated ARROW-10844:
---------------------------------
    Description: 
The complete failing test:

 
{code:java}
use std::sync::Arc;

use arrow::{array::{Int32Array, StringArray}, record_batch::RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};

use datafusion::{datasource::MemTable, prelude::JoinType};
use datafusion::error::Result;

use datafusion::execution::context::ExecutionContext;

#[tokio::test]
async fn join() -> Result<()> {
    let schema1 = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("b", DataType::Int32, false),
    ]));
    let schema2 = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Utf8, false),
        Field::new("c", DataType::Int32, false),
    ]));

    // define data.
    let batch1 = RecordBatch::try_new(
        schema1.clone(),
        vec![
            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
            Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
        ],
    )?;
    // define data.
    let batch2 = RecordBatch::try_new(
        schema2.clone(),
        vec![
            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
            Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
        ],
    )?;

    let mut ctx = ExecutionContext::new();

    let table1 = MemTable::new(schema1, vec![vec![batch1]])?;
    let table2 = MemTable::new(schema2, vec![vec![batch2]])?;

    ctx.register_table("aa", Box::new(table1));

    let df1 = ctx.table("aa")?;

    ctx.register_table("aaa", Box::new(table2));

    let df2 = ctx.table("aaa")?;

    let a = df1.join(df2, JoinType::Inner, &["a"], &["a"])?;

    let batches = a.collect().await?;
    assert_eq!(batches.len(), 1);

    Ok(())
}
{code}
 

When the create dataframes via `ctx.table`, they receive a clone of the 
{{ExecutionContextState. }}If at a later state the context receives a new 
table, that table will not be part of the state on the first DataFrame. On a 
Join op, the left DataFrame's state is passed to the newly created DataFrame, 
which is then used in collect(). Because the right side has a table not in the 
state of the left, the execution fails.

 

We may need an Arc<Mutex<{{ExecutionContextState}}>> to share a common mutable 
state across multiple DataFrames. Alternatively, not require tables to be 
registered in the context to be used by DataFrames.

Note that the current example in `DataFrame::join` docs works because the table 
is registered for both DataFrames.

  was:
The complete test:

 
{code:java}
{code}
 

currently does not work because we clone the {{ExecutionContextState}} from the 
context to the `df`, causing the left and right to share a different context 
state. In particular, `left` will not have the table registered on the right, 
which means that its `collect` will fail.

We may need an Arc<Mutex<{{ExecutionContextState}}>> to share a common mutable 
state across multiple DataFrames. Alternatively, not require tables to be 
registered in the context to be used by DataFrames.

Note that the current example in `DataFrame::join` docs works because it shares 
the same table. This won't happen if e.g. we use in-memory tables.

 

 


> [Rust] [DataFusion] join of two DataFrames is not possible
> ----------------------------------------------------------
>
>                 Key: ARROW-10844
>                 URL: https://issues.apache.org/jira/browse/ARROW-10844
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Rust, Rust - DataFusion
>    Affects Versions: 3.0.0
>            Reporter: Jorge Leitão
>            Priority: Blocker
>             Fix For: 3.0.0
>
>
> The complete failing test:
>  
> {code:java}
> use std::sync::Arc;
> use arrow::{array::{Int32Array, StringArray}, record_batch::RecordBatch};
> use arrow::datatypes::{DataType, Field, Schema};
> use datafusion::{datasource::MemTable, prelude::JoinType};
> use datafusion::error::Result;
> use datafusion::execution::context::ExecutionContext;
> #[tokio::test]
> async fn join() -> Result<()> {
>     let schema1 = Arc::new(Schema::new(vec![
>         Field::new("a", DataType::Utf8, false),
>         Field::new("b", DataType::Int32, false),
>     ]));
>     let schema2 = Arc::new(Schema::new(vec![
>         Field::new("a", DataType::Utf8, false),
>         Field::new("c", DataType::Int32, false),
>     ]));
>     // define data.
>     let batch1 = RecordBatch::try_new(
>         schema1.clone(),
>         vec![
>             Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
>             Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
>         ],
>     )?;
>     // define data.
>     let batch2 = RecordBatch::try_new(
>         schema2.clone(),
>         vec![
>             Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
>             Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
>         ],
>     )?;
>     let mut ctx = ExecutionContext::new();
>     let table1 = MemTable::new(schema1, vec![vec![batch1]])?;
>     let table2 = MemTable::new(schema2, vec![vec![batch2]])?;
>     ctx.register_table("aa", Box::new(table1));
>     let df1 = ctx.table("aa")?;
>     ctx.register_table("aaa", Box::new(table2));
>     let df2 = ctx.table("aaa")?;
>     let a = df1.join(df2, JoinType::Inner, &["a"], &["a"])?;
>     let batches = a.collect().await?;
>     assert_eq!(batches.len(), 1);
>     Ok(())
> }
> {code}
>  
> When the create dataframes via `ctx.table`, they receive a clone of the 
> {{ExecutionContextState. }}If at a later state the context receives a new 
> table, that table will not be part of the state on the first DataFrame. On a 
> Join op, the left DataFrame's state is passed to the newly created DataFrame, 
> which is then used in collect(). Because the right side has a table not in 
> the state of the left, the execution fails.
>  
> We may need an Arc<Mutex<{{ExecutionContextState}}>> to share a common 
> mutable state across multiple DataFrames. Alternatively, not require tables 
> to be registered in the context to be used by DataFrames.
> Note that the current example in `DataFrame::join` docs works because the 
> table is registered for both DataFrames.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to