adamreeve commented on code in PR #8029:
URL: https://github.com/apache/arrow-rs/pull/8029#discussion_r2246622208


##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -402,6 +402,25 @@ impl<W: Write + Send> ArrowWriter<W> {
     pub fn close(mut self) -> Result<crate::format::FileMetaData> {
         self.finish()
     }
+
+    /// Create a new row group writer and return its column writers.
+    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
+        let _ = self.flush();

Review Comment:
   This shouldn't ignore errors
   ```suggestion
           self.flush()?;
   ```



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -402,6 +402,25 @@ impl<W: Write + Send> ArrowWriter<W> {
     pub fn close(mut self) -> Result<crate::format::FileMetaData> {
         self.finish()
     }
+
+    /// Create a new row group writer and return its column writers.
+    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
+        let _ = self.flush();
+        let in_progress = self
+            .row_group_writer_factory
+            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
+        Ok(in_progress.writers)
+    }
+
+    /// Append the given column chunks to the current row group.
+    pub fn append_to_row_groups(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {

Review Comment:
   This name doesn't make sense to me; it sounds like there are multiple row groups. How about:
   ```suggestion
       /// Append the given column chunks as a new row group.
        pub fn append_row_group(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
   ```



##########
parquet/tests/encryption/encryption.rs:
##########
@@ -1101,3 +1109,107 @@ fn read_and_roundtrip_to_encrypted_file(
     // check re-written example data
     verify_encryption_test_file_read(temp_file, decryption_properties);
 }
+
+#[tokio::test]

Review Comment:
   This should probably be moved to `encryption_async.rs`, as it won't build with the async feature disabled.
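   A minimal sketch of the kind of gating involved (the exact module wiring and feature names here are assumptions, not necessarily this repo's actual layout):
   ```rust
   // Sketch only: declare the async tests as a separate module that is
   // compiled only when the relevant features are enabled.
   #[cfg(all(feature = "encryption", feature = "async"))]
   mod encryption_async;
   ```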



##########
parquet/tests/encryption/encryption.rs:
##########
@@ -1101,3 +1109,107 @@ fn read_and_roundtrip_to_encrypted_file(
     // check re-written example data
     verify_encryption_test_file_read(temp_file, decryption_properties);
 }
+
+#[tokio::test]
+async fn test_multi_threaded_encrypted_writing() {
+    // Read example data and set up encryption/decryption properties
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+
+    let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+    let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+
+    let (record_batches, metadata) =
+        read_encrypted_file(&path, decryption_properties.clone()).unwrap();
+    let to_write: Vec<_> = record_batches
+        .iter()
+        .flat_map(|rb| rb.columns().to_vec())
+        .collect();
+    let schema = metadata.schema().clone();
+
+    let props = Some(
+        WriterPropertiesBuilder::default()
+            .with_file_encryption_properties(file_encryption_properties)
+            .build(),
+    );
+
+    // Create a temporary file to write the encrypted data
+    let temp_file = tempfile::tempfile().unwrap();
+    let mut writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props).unwrap();
+
+    // LOW-LEVEL API: Use low level API to write into a file using multiple threads
+
+    // Get column writers
+    let col_writers = writer.get_column_writers().unwrap();
+    let num_columns = col_writers.len();
+
+    // Create a channel for each column writer to send ArrowLeafColumn data to
+    let mut col_writer_tasks = Vec::with_capacity(num_columns);
+    let mut col_array_channels = Vec::with_capacity(num_columns);
+    for mut col_writer in col_writers.into_iter() {
+        let (send_array, mut receive_array) = tokio::sync::mpsc::channel::<ArrowLeafColumn>(100);
+        col_array_channels.push(send_array);
+        let handle = tokio::spawn(async move {
+            while let Some(col) = receive_array.recv().await {
+                col_writer.write(&col).unwrap();
+            }
+            col_writer.close().unwrap()
+        });
+        col_writer_tasks.push(handle);
+    }
+
+    // Send the ArrowLeafColumn data to the respective column writer channels
+    let mut worker_iter = col_array_channels.iter_mut();
+    for (array, field) in to_write.iter().zip(schema.fields()) {
+        for leaves in compute_leaves(field, array).unwrap() {
+            worker_iter.next().unwrap().send(leaves).await.unwrap();
+        }
+    }
+    drop(col_array_channels);
+
+    // Wait for all column writers to finish writing
+    let mut finalized_rg = Vec::with_capacity(num_columns);
+    for task in col_writer_tasks.into_iter() {
+        finalized_rg.push(task.await.unwrap());
+    }
+
+    // Append the finalized row group to the SerializedFileWriter
+    assert!(writer.append_to_row_groups(finalized_rg).is_ok());
+    assert!(writer.flush().is_ok());
+
+    // HIGH-LEVEL API: Write RecordBatches into the file using ArrowWriter
+
+    // Write individual RecordBatches into the file
+    for rb in record_batches {
+        writer.write(&rb).unwrap()
+    }
+    assert!(writer.flush().is_ok());
+
+    // Close the file writer which writes the footer
+    let metadata = writer.finish().unwrap();
+    assert_eq!(metadata.num_rows, 100);
+    assert_eq!(metadata.schema, metadata.schema);
+
+    // Check that the file was written correctly
+    let (read_record_batches, read_metadata) =
+        read_encrypted_file(&path, decryption_properties.clone()).unwrap();
+
+    // TODO: This should be failing since we're writing data twice and
+    // we only seem to be reading one copy out.

Review Comment:
   When you call `read_encrypted_file` above, you pass the path to the original input test file, not the rewritten file. You might want to use `tempfile::Builder::new().tempfile()` to create a `NamedTempFile`, or refactor `read_encrypted_file` to work with a `File`.
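   A minimal sketch of the `NamedTempFile` approach (the `.suffix(...)` value is an assumption; `read_encrypted_file` is the helper used in the test above):
   ```rust
   // Sketch: write to a NamedTempFile so the rewritten file can be re-read
   // by path afterwards.
   let temp_file = tempfile::Builder::new()
       .suffix(".parquet")
       .tempfile()
       .unwrap();
   let mut writer = ArrowWriter::try_new(
       temp_file.reopen().unwrap(),
       metadata.schema().clone(),
       props,
   )
   .unwrap();
   // ... write and finish as above, then verify the rewritten file rather
   // than the original input path:
   let written_path = temp_file.path().to_str().unwrap().to_string();
   let (read_record_batches, read_metadata) =
       read_encrypted_file(&written_path, decryption_properties.clone()).unwrap();
   ```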



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -402,6 +402,25 @@ impl<W: Write + Send> ArrowWriter<W> {
     pub fn close(mut self) -> Result<crate::format::FileMetaData> {
         self.finish()
     }
+
+    /// Create a new row group writer and return its column writers.
+    pub fn get_column_writers(&mut self) -> Result<Vec<ArrowColumnWriter>> {
+        let _ = self.flush();
+        let in_progress = self
+            .row_group_writer_factory
+            .create_row_group_writer(self.writer.flushed_row_groups().len())?;
+        Ok(in_progress.writers)
+    }
+
+    /// Append the given column chunks to the current row group.
+    pub fn append_to_row_groups(&mut self, chunks: Vec<ArrowColumnChunk>) -> Result<()> {
+        let mut row_group_writer = self.writer.next_row_group()?;
+        for chunk in chunks {
+            chunk.append_to_row_group(&mut row_group_writer)?;
+        }
+        let _ = row_group_writer.close();

Review Comment:
   ```suggestion
           row_group_writer.close()?;
   ```



##########
parquet/tests/encryption/encryption.rs:
##########
@@ -1101,3 +1109,107 @@ fn read_and_roundtrip_to_encrypted_file(
     // check re-written example data
     verify_encryption_test_file_read(temp_file, decryption_properties);
 }
+
+#[tokio::test]
+async fn test_multi_threaded_encrypted_writing() {
+    // Read example data and set up encryption/decryption properties
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/encrypt_columns_and_footer.parquet.encrypted");
+
+    let file_encryption_properties = FileEncryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+    let decryption_properties = FileDecryptionProperties::builder(b"0123456789012345".into())
+        .with_column_key("double_field", b"1234567890123450".into())
+        .with_column_key("float_field", b"1234567890123451".into())
+        .build()
+        .unwrap();
+
+    let (record_batches, metadata) =
+        read_encrypted_file(&path, decryption_properties.clone()).unwrap();
+    let to_write: Vec<_> = record_batches
+        .iter()
+        .flat_map(|rb| rb.columns().to_vec())
+        .collect();
+    let schema = metadata.schema().clone();
+
+    let props = Some(
+        WriterPropertiesBuilder::default()
+            .with_file_encryption_properties(file_encryption_properties)
+            .build(),
+    );
+
+    // Create a temporary file to write the encrypted data
+    let temp_file = tempfile::tempfile().unwrap();
+    let mut writer = ArrowWriter::try_new(&temp_file, metadata.schema().clone(), props).unwrap();
+
+    // LOW-LEVEL API: Use low level API to write into a file using multiple threads
+
+    // Get column writers
+    let col_writers = writer.get_column_writers().unwrap();
+    let num_columns = col_writers.len();
+
+    // Create a channel for each column writer to send ArrowLeafColumn data to
+    let mut col_writer_tasks = Vec::with_capacity(num_columns);
+    let mut col_array_channels = Vec::with_capacity(num_columns);
+    for mut col_writer in col_writers.into_iter() {
+        let (send_array, mut receive_array) = tokio::sync::mpsc::channel::<ArrowLeafColumn>(100);
+        col_array_channels.push(send_array);
+        let handle = tokio::spawn(async move {
+            while let Some(col) = receive_array.recv().await {
+                col_writer.write(&col).unwrap();
+            }
+            col_writer.close().unwrap()
+        });
+        col_writer_tasks.push(handle);
+    }
+
+    // Send the ArrowLeafColumn data to the respective column writer channels
+    let mut worker_iter = col_array_channels.iter_mut();
+    for (array, field) in to_write.iter().zip(schema.fields()) {
+        for leaves in compute_leaves(field, array).unwrap() {
+            worker_iter.next().unwrap().send(leaves).await.unwrap();
+        }
+    }
+    drop(col_array_channels);
+
+    // Wait for all column writers to finish writing
+    let mut finalized_rg = Vec::with_capacity(num_columns);
+    for task in col_writer_tasks.into_iter() {
+        finalized_rg.push(task.await.unwrap());
+    }
+
+    // Append the finalized row group to the SerializedFileWriter
+    assert!(writer.append_to_row_groups(finalized_rg).is_ok());
+    assert!(writer.flush().is_ok());

Review Comment:
   There shouldn't be any need to flush, because there should be no in-progress data in the writer, and the row group was already written in the line above.
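   For example, the flush could simply be dropped:
   ```suggestion
       // Append the finalized row group to the SerializedFileWriter
       assert!(writer.append_to_row_groups(finalized_rg).is_ok());
   ```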


