djanderson commented on PR #14286:
URL: https://github.com/apache/datafusion/pull/14286#issuecomment-2655439836

   I checked out this PR and tried to add a test to reproduce the issue. I'm including it here, although it's definitely too dependency-heavy for a unit test:
   
   <details>
   
       #[tokio::test]
       async fn test_parquet_sink_io_doesnt_panic() {
           use arrow::array as arrow_array; // needed for record_batch! macro
           use datafusion::datasource::file_format::parquet::ParquetSink;
           use datafusion::datasource::listing::ListingTableUrl;
           use datafusion::datasource::physical_plan::FileSink;
           use datafusion::datasource::physical_plan::FileSinkConfig;
           use datafusion::execution::object_store::ObjectStoreUrl;
           use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
           use datafusion::physical_plan::SendableRecordBatchStream;
           use datafusion::prelude::SessionContext;
           use datafusion_expr::dml::InsertOp;
   
           // DedicatedExecutorBuilder, MockStore, and wrap_object_store_for_io
           // come from this PR's changes.
           let exec = DedicatedExecutorBuilder::new().build();
           let store = MockStore::create().await;
           let store = exec.wrap_object_store_for_io(store);
   
           let batch = record_batch!(("col", Int32, vec![1, 2, 3])).unwrap();
           let schema = batch.schema();
           // Build a single-batch input stream.
           let stream_adapter = RecordBatchStreamAdapter::new(
               schema.clone(),
               futures::stream::iter(vec![Ok(batch)]),
           );
           let stream: SendableRecordBatchStream = Box::pin(stream_adapter);
   
           let ctx = SessionContext::new();
           let object_store_url = ObjectStoreUrl::local_filesystem();
           ctx.register_object_store(object_store_url.as_ref(), store);
   
           // Configure sink
           let file_sink_config = FileSinkConfig {
               object_store_url,
               file_groups: vec![],
               table_paths: vec![ListingTableUrl::parse("/test.parquet").unwrap()],
               output_schema: schema,
               table_partition_cols: vec![],
               insert_op: InsertOp::Overwrite,
               keep_partition_by_columns: false,
               file_extension: String::from("parquet"),
           };
           let table_options = Default::default();
           let data_sink = ParquetSink::new(file_sink_config, table_options);
   
           // Execute write on dedicated runtime
           exec.spawn(async move {
               data_sink.write_all(stream, &ctx.task_ctx()).await.unwrap()
           })
           .await
           .unwrap();
   
           exec.join().await
       }
   
   </details>
   
   In the end it did not reproduce the issue I'm seeing locally. I noticed that @mertak-synnada recently refactored `ParquetSink::write_all()`, so I updated my flightsql server to DataFusion 45.0.0 to pull in those changes, but... no joy.
   
   I'll keep investigating, but I have no clear leads right now, other than that I'm running my object store against a local MinIO container.
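   
   For reference, registering a MinIO-backed store looks roughly like this (a sketch only, not my exact setup; the endpoint, bucket name, and credentials are placeholders, and `wrap_object_store_for_io` is the wrapper this PR adds):
   
       use std::sync::Arc;
   
       use datafusion::execution::object_store::ObjectStoreUrl;
       use datafusion::prelude::SessionContext;
       use object_store::aws::AmazonS3Builder;
   
       // Placeholder endpoint and credentials; adjust to match the container.
       let minio = AmazonS3Builder::new()
           .with_endpoint("http://localhost:9000")
           .with_allow_http(true) // local MinIO runs without TLS
           .with_bucket_name("test-bucket")
           .with_access_key_id("minioadmin")
           .with_secret_access_key("minioadmin")
           .with_region("us-east-1")
           .build()
           .unwrap();
   
       // Route the store's IO through this PR's dedicated executor wrapper.
       let store = exec.wrap_object_store_for_io(Arc::new(minio));
   
       let ctx = SessionContext::new();
       let url = ObjectStoreUrl::parse("s3://test-bucket").unwrap();
       ctx.register_object_store(url.as_ref(), store);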

