djanderson commented on PR #14286: URL: https://github.com/apache/datafusion/pull/14286#issuecomment-2655439836
I checked out this PR and tried to add a test to reproduce the issue. I'm including it here, although it's definitely too dependency-heavy for a unit test:

<details>

```rust
#[tokio::test]
async fn test_parquet_sink_io_doesnt_panic() {
    use arrow::array as arrow_array; // needed for record_batch! macro
    use datafusion::datasource::file_format::parquet::ParquetSink;
    use datafusion::datasource::listing::ListingTableUrl;
    use datafusion::datasource::physical_plan::FileSink;
    use datafusion::datasource::physical_plan::FileSinkConfig;
    use datafusion::execution::object_store::ObjectStoreUrl;
    use datafusion::prelude::SessionContext;
    use datafusion_expr::dml::InsertOp;

    // Dedicated executor plus a mock store wrapped for IO
    let exec = DedicatedExecutorBuilder::new().build();
    let store = MockStore::create().await;
    let store = exec.wrap_object_store_for_io(store);

    // Single-batch input stream
    let batch = record_batch!(("col", Int32, vec![1, 2, 3])).unwrap();
    let schema = batch.schema();
    let stream_adapter = RecordBatchStreamAdapter::new(
        batch.schema(),
        futures::stream::iter(vec![Ok(batch)]),
    );
    let stream: SendableRecordBatchStream = Box::pin(stream_adapter);

    // Register the wrapped store under the local filesystem URL
    let ctx = SessionContext::new();
    let object_store_url = ObjectStoreUrl::local_filesystem();
    ctx.register_object_store(object_store_url.as_ref(), store);

    // Configure sink
    let file_sink_config = FileSinkConfig {
        object_store_url,
        file_groups: vec![],
        table_paths: vec![ListingTableUrl::parse("/test.parquet").unwrap()],
        output_schema: schema,
        table_partition_cols: vec![],
        insert_op: InsertOp::Overwrite,
        keep_partition_by_columns: false,
        file_extension: String::from("parquet"),
    };
    let table_options = Default::default();
    let data_sink = ParquetSink::new(file_sink_config, table_options);

    // Execute write on dedicated runtime
    exec.spawn(async move {
        data_sink.write_all(stream, &ctx.task_ctx()).await.unwrap()
    })
    .await
    .unwrap();

    exec.join().await
}
```

</details>

In the end, it did not reproduce the issue I'm seeing locally.
I noticed that @mertak-synnada recently refactored `ParquetSink::write_all()`, so I updated my Flight SQL server to DataFusion 45.0.0 to pull in his changes, but... no joy. I'll keep investigating, but I have no clear leads right now; the only notable difference in my setup is that my object store runs against a local MinIO container.