rok commented on code in PR #8029: URL: https://github.com/apache/arrow-rs/pull/8029#discussion_r2247756798
########## parquet/tests/encryption/encryption.rs: ########## @@ -1101,3 +1109,107 @@ fn read_and_roundtrip_to_encrypted_file( // check re-written example data verify_encryption_test_file_read(temp_file, decryption_properties); } + +#[tokio::test] +async fn test_multi_threaded_encrypted_writing() { + // Read example data and set up encryption/decryption properties + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted"); + + let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into()) + .with_column_key("double_field", b"1234567890123450".into()) + .with_column_key("float_field", b"1234567890123451".into()) + .build() + .unwrap(); + let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into()) + .with_column_key("double_field", b"1234567890123450".into()) + .with_column_key("float_field", b"1234567890123451".into()) + .build() + .unwrap(); + + let (record_batches, metadata) = + read_encrypted_file(&path, decryption_properties.clone()).unwrap(); + let to_write: Vec<_> = record_batches + .iter() + .flat_map(|rb| rb.columns().to_vec()) + .collect(); + let schema = metadata.schema().clone(); + + let props = Some( + WriterPropertiesBuilder::default() + .with_file_encryption_properties(file_encryption_properties) + .build(), + ); + + // Create a temporary file to write the encrypted data + let temp_file = tempfile::tempfile().unwrap(); + let mut writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props).unwrap(); + + // LOW-LEVEL API: Use low level API to write into a file using multiple threads + + // Get column writers + let col_writers = writer.get_column_writers().unwrap(); + let num_columns = col_writers.len(); + + // Create a channel for each column writer to send ArrowLeafColumn data to + let mut col_writer_tasks = Vec::with_capacity(num_columns); + let mut col_array_channels = Vec::with_capacity(num_columns); + for mut col_writer in col_writers.into_iter() { + let (send_array, mut receive_array) = tokio::sync::mpsc::channel::<ArrowLeafColumn>(100); + col_array_channels.push(send_array); + let handle = tokio::spawn(async move { + while let Some(col) = receive_array.recv().await { + col_writer.write(&col).unwrap(); + } + col_writer.close().unwrap() + }); + col_writer_tasks.push(handle); + } + + // Send the ArrowLeafColumn data to the respective column writer channels + let mut worker_iter = col_array_channels.iter_mut(); + for (array, field) in to_write.iter().zip(schema.fields()) { + for leaves in compute_leaves(field, array).unwrap() { + worker_iter.next().unwrap().send(leaves).await.unwrap(); + } + } + drop(col_array_channels); + + // Wait for all column writers to finish writing + let mut finalized_rg = Vec::with_capacity(num_columns); + for task in col_writer_tasks.into_iter() { + finalized_rg.push(task.await.unwrap()); + } + + // Append the finalized row group to the SerializedFileWriter + assert!(writer.append_to_row_groups(finalized_rg).is_ok()); + assert!(writer.flush().is_ok()); + + // HIGH-LEVEL API: Write RecordBatches into the file using ArrowWriter + + // Write individual RecordBatches into the file + for rb in record_batches { + writer.write(&rb).unwrap() + } + assert!(writer.flush().is_ok()); + + // Close the file writer which writes the footer + let metadata = writer.finish().unwrap(); + assert_eq!(metadata.num_rows, 100); + assert_eq!(metadata.schema, metadata.schema); + + // Check that the file was written correctly + let (read_record_batches, read_metadata) = + read_encrypted_file(&path, decryption_properties.clone()).unwrap(); + + // TODO: This should be failing since we're writing data twice and + // we only seem to be reading one copy out. Review Comment: Oh :facepalm: Thanks, that helps immensely. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org