zhuqi-lucas commented on issue #16836: URL: https://github.com/apache/datafusion/issues/16836#issuecomment-3113815429
I tried to reproduce this, but it could not be reproduced. If you can reproduce it, I can help with debugging — thanks! ```rust // src/main.rs use std::sync::Arc; use arrow::array::UInt64Array; use futures::{StreamExt, TryStreamExt}; // for try_next() use datafusion::arrow::{ array::{Int64Array, StringArray}, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; use datafusion::catalog::TableProvider; use datafusion::datasource::MemTable; use datafusion::execution::context::{SessionConfig, SessionContext}; use datafusion::logical_expr::dml::InsertOp; #[tokio::main] async fn main() -> datafusion::common::Result<()> { // 1) Create a SessionContext, optionally customizing the number of target partitions let config = SessionConfig::new(); let ctx = SessionContext::new_with_config(config); // 2) Build a RecordBatch with two columns (i BIGINT, s VARCHAR), containing 100 rows let schema = Arc::new(Schema::new(vec![ Field::new("i", DataType::Int64, false), Field::new("s", DataType::Utf8, false), ])); let i_array = Int64Array::from_iter_values(0_i64..100); let s_array = StringArray::from(vec!["hello"; 100]); let batch = RecordBatch::try_new(schema.clone(), vec![ Arc::new(i_array), Arc::new(s_array), ])?; // 3) Register the source MemTable, containing a single batch let input_table = MemTable::try_new(schema.clone(), vec![vec![batch.clone()]])?; ctx.register_table("input", Arc::new(input_table))?; // 4) Register the cache MemTable (initially empty) let empty_table = MemTable::try_new(schema.clone(), vec![vec![]])?; let cache_provider = Arc::new(empty_table); ctx.register_table("cache", cache_provider.clone())?; // 5) Read the DataFrame from input and print its row count let df_input = ctx.table("input").await?; let orig_count = df_input.clone().count().await?; println!("Row count in source table: {}", orig_count); // Should print 100 // 6) Build a physical INSERT plan using MemTable::insert_into let plan = cache_provider .insert_into( &ctx.state(), 
df_input.create_physical_plan().await?, InsertOp::Append, ) .await?; // 7) Execute the insert and accumulate the number of inserted rows let mut stream = plan.execute(0, ctx.task_ctx())?; // 8) Read the `count` column from the resulting status batch let inserted_rows = if let Some(batch) = stream.try_next().await? { let col = batch .column_by_name("count") .expect("Missing `count` column"); // First try unsigned integer array if let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() { arr.value(0) } // Then try signed integer array for compatibility else if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() { arr.value(0) as u64 } else { panic!( "`count` column is neither UInt64Array nor Int64Array, actual type: {:?}", col.data_type() ); } } else { 0 }; println!("Number of rows reported inserted: {}", inserted_rows); // 9) Scan the cache table again and verify its row count let df_cache = ctx.table("cache").await?; let cache_count = df_cache.count().await?; println!("Row count from scanning cache table: {}", cache_count); Ok(()) } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org