devinjdangelo commented on code in PR #9548:
URL: https://github.com/apache/arrow-datafusion/pull/9548#discussion_r1520647339


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -1789,4 +1816,182 @@ mod tests {
         let format = ParquetFormat::default();
         scan_format(state, &format, &testdata, file_name, projection, 
limit).await
     }
+
+    fn build_ctx(store_url: &url::Url) -> Arc<TaskContext> {
+        let tmp_dir = tempfile::TempDir::new().unwrap();
+        let local = Arc::new(
+            LocalFileSystem::new_with_prefix(&tmp_dir)
+                .expect("should create object store"),
+        );
+
+        let mut session = SessionConfig::default();
+        let mut parquet_opts = ParquetOptions::default();
+        parquet_opts.allow_single_file_parallelism = true;
+        session.options_mut().execution.parquet = parquet_opts;
+
+        let runtime = RuntimeEnv::default();
+        runtime
+            .object_store_registry
+            .register_store(store_url, local);
+
+        Arc::new(
+            TaskContext::default()
+                .with_session_config(session)
+                .with_runtime(Arc::new(runtime)),
+        )
+    }
+
+    #[tokio::test]
+    async fn parquet_sink_write() -> Result<()> {
+        let field_a = Field::new("a", DataType::Utf8, false);
+        let field_b = Field::new("b", DataType::Utf8, false);
+        let schema = Arc::new(Schema::new(vec![field_a, field_b]));
+        let object_store_url = ObjectStoreUrl::local_filesystem();
+
+        let file_sink_config = FileSinkConfig {
+            object_store_url: object_store_url.clone(),
+            file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
+            table_paths: vec![ListingTableUrl::parse("file:///")?],
+            output_schema: schema.clone(),
+            table_partition_cols: vec![],
+            overwrite: true,
+            file_type_writer_options: FileTypeWriterOptions::Parquet(
+                ParquetWriterOptions::new(WriterProperties::default()),
+            ),
+        };
+        let parquet_sink = Arc::new(ParquetSink::new(file_sink_config));
+
+        // create data
+        let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
+        let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
+        let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", 
col_b)]).unwrap();
+
+        // write stream
+        parquet_sink
+            .write_all(
+                Box::pin(RecordBatchStreamAdapter::new(
+                    schema,
+                    futures::stream::iter(vec![Ok(batch)]),
+                )),
+                &build_ctx(object_store_url.as_ref()),
+            )
+            .await
+            .unwrap();
+
+        // assert written
+        let mut written = parquet_sink.written();
+        let written = written.drain();
+        assert_eq!(
+            written.len(),
+            1,
+            "expected a single parquet files to be written, instead found {}",
+            written.len()
+        );
+
+        // check the file metadata
+        for (
+            path,
+            FileMetaData {
+                num_rows, schema, ..
+            },

Review Comment:
   I think getting rid of the for loop and doing:
   
   ```rust
   let (path, FileMetaData { num_rows, schema, ..}) = written.iter().next().unwrap();
   ```
   
   would make the intention clearer: we are just getting the one and only
   element.



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -541,6 +542,8 @@ async fn fetch_statistics(
 pub struct ParquetSink {
     /// Config options for writing data
     config: FileSinkConfig,
+    /// File metadata from successfully produced parquet files.

Review Comment:
   ```suggestion
   /// File metadata from successfully produced parquet files. The Mutex is only used to allow inserting to HashMap from behind borrowed reference in DataSink::write_all.
   ```
   
   The use of a Mutex here is confusing without the context of this PR, so I
   think it would be a good idea to leave a comment explaining it.
   
   I think this is a fine temporary workaround, but I'm sure we can find a way 
to return FileMetaData in a new public interface without breaking changes to 
DataSink or using locks.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to