zhuqi-lucas commented on issue #16836:
URL: https://github.com/apache/datafusion/issues/16836#issuecomment-3113815429

   I tried to reproduce this, but could not. If we can find a way to
reproduce it, I can help with debugging. Thanks!
   
   ```rust
   // src/main.rs
   
   use std::sync::Arc;
   use arrow::array::UInt64Array;
   use futures::{StreamExt, TryStreamExt}; // for try_next()
   use datafusion::arrow::{
       array::{Int64Array, StringArray},
       datatypes::{DataType, Field, Schema},
       record_batch::RecordBatch,
   };
   use datafusion::catalog::TableProvider;
   use datafusion::datasource::MemTable;
   use datafusion::execution::context::{SessionConfig, SessionContext};
   use datafusion::logical_expr::dml::InsertOp;
   
   #[tokio::main]
   async fn main() -> datafusion::common::Result<()> {
       // 1) Create a SessionContext, optionally customizing the number of 
target partitions
       let config = SessionConfig::new();
       let ctx = SessionContext::new_with_config(config);
   
       // 2) Build a RecordBatch with two columns (i BIGINT, s VARCHAR), 
containing 100 rows
       let schema = Arc::new(Schema::new(vec![
           Field::new("i", DataType::Int64, false),
           Field::new("s", DataType::Utf8, false),
       ]));
       let i_array = Int64Array::from_iter_values(0_i64..100);
       let s_array = StringArray::from(vec!["hello"; 100]);
       let batch = RecordBatch::try_new(schema.clone(), vec![
           Arc::new(i_array),
           Arc::new(s_array),
       ])?;
   
       // 3) Register the source MemTable, containing a single batch
       let input_table = MemTable::try_new(schema.clone(), 
vec![vec![batch.clone()]])?;
       ctx.register_table("input", Arc::new(input_table))?;
   
       // 4) Register the cache MemTable (initially empty)
       let empty_table = MemTable::try_new(schema.clone(), vec![vec![]])?;
       let cache_provider = Arc::new(empty_table);
       ctx.register_table("cache", cache_provider.clone())?;
   
       // 5) Read the DataFrame from input and print its row count
       let df_input = ctx.table("input").await?;
       let orig_count = df_input.clone().count().await?;
       println!("Row count in source table: {}", orig_count);  // Should print 
100
   
       // 6) Build a physical INSERT plan using MemTable::insert_into
       let plan = cache_provider
           .insert_into(
               &ctx.state(),
               df_input.create_physical_plan().await?,
               InsertOp::Append,
           )
           .await?;
   
       // 7) Execute the insert and accumulate the number of inserted rows
       let mut stream = plan.execute(0, ctx.task_ctx())?;
   
       // 8) Read the `count` column from the resulting status batch
       let inserted_rows = if let Some(batch) = stream.try_next().await? {
           let col = batch
               .column_by_name("count")
               .expect("Missing `count` column");
           // First try unsigned integer array
           if let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() {
               arr.value(0)
           }
           // Then try signed integer array for compatibility
           else if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
               arr.value(0) as u64
           }
           else {
               panic!(
                   "`count` column is neither UInt64Array nor Int64Array, 
actual type: {:?}",
                   col.data_type()
               );
           }
       } else {
           0
       };
   
       println!("Number of rows reported inserted: {}", inserted_rows);
   
       // 9) Scan the cache table again and verify its row count
       let df_cache = ctx.table("cache").await?;
       let cache_count = df_cache.count().await?;
       println!("Row count from scanning cache table: {}", cache_count);
   
       Ok(())
   }
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to