mbutrovich commented on code in PR #2584:
URL: https://github.com/apache/iceberg-rust/pull/2584#discussion_r3414612059
##########
crates/iceberg/src/arrow/reader/pipeline.rs:
##########
@@ -638,6 +675,222 @@ mod tests {
}
}
+ fn write_encrypted_parquet(
+ path: &str,
+ batch: &RecordBatch,
+ key: &[u8],
+ aad_prefix: Option<&[u8]>,
+ ) {
+ let mut builder = FileEncryptionProperties::builder(key.to_vec());
+ if let Some(aad) = aad_prefix {
+ builder = builder.with_aad_prefix(aad.to_vec());
+ }
+ let encryption_properties = builder.build().unwrap();
+
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .with_file_encryption_properties(encryption_properties)
+ .build();
+
+ let file = File::create(path).unwrap();
+ let mut writer = ArrowWriter::try_new(file, batch.schema(),
Some(props)).unwrap();
+ writer.write(batch).expect("Writing batch");
+ writer.close().unwrap();
+ }
+
+ #[tokio::test]
+ async fn test_read_encrypted_parquet() {
+ let encryption_key = b"0123456789abcdef";
+ let aad_prefix = b"my-table-uuid!!";
+
+ let schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+ ]));
+
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ let file_io = FileIO::new_with_fs();
+
+ let id_data = Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef;
+ let batch = RecordBatch::try_new(arrow_schema.clone(),
vec![id_data]).unwrap();
+
+ let file_path = format!("{table_location}/encrypted.parquet");
+ write_encrypted_parquet(&file_path, &batch, encryption_key,
Some(aad_prefix));
+
+ let key_metadata =
crate::encryption::StandardKeyMetadata::new(encryption_key)
+ .with_aad_prefix(aad_prefix)
+ .encode()
+ .unwrap();
+
+ let reader = ArrowReaderBuilder::new(file_io,
Runtime::current()).build();
+
+ let task = FileScanTask::builder()
+
.with_file_size_in_bytes(std::fs::metadata(&file_path).unwrap().len())
+ .with_start(0)
+ .with_length(0)
+ .with_data_file_path(file_path)
+ .with_data_file_format(DataFileFormat::Parquet)
+ .with_schema(schema)
+ .with_project_field_ids(vec![1])
+ .with_case_sensitive(false)
+ .with_key_metadata(Some(key_metadata))
+ .build();
+
+ let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as
FileScanTaskStream;
+ let batches: Vec<RecordBatch> = reader
+ .read(tasks)
+ .unwrap()
+ .stream()
+ .try_collect()
+ .await
+ .unwrap();
+
+ assert_eq!(batches.len(), 1);
+ let ids = batches[0]
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(ids.values(), &[10, 20, 30]);
+ }
+
+ #[tokio::test]
+ async fn test_read_encrypted_parquet_without_key_metadata_fails() {
+ let encryption_key = b"0123456789abcdef";
+
+ let schema = Arc::new(
+ Schema::builder()
+ .with_schema_id(1)
+ .with_fields(vec![
+ NestedField::required(1, "id",
Type::Primitive(PrimitiveType::Int)).into(),
+ ])
+ .build()
+ .unwrap(),
+ );
+
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![
+ Field::new("id", DataType::Int32,
false).with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+ ]));
+
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().to_str().unwrap().to_string();
+ let file_io = FileIO::new_with_fs();
+
+ let id_data = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
+ let batch = RecordBatch::try_new(arrow_schema.clone(),
vec![id_data]).unwrap();
+
+ let file_path = format!("{table_location}/encrypted_no_key.parquet");
+ write_encrypted_parquet(&file_path, &batch, encryption_key, None);
+
+ let reader = ArrowReaderBuilder::new(file_io,
Runtime::current()).build();
+
+ let task = FileScanTask::builder()
+
.with_file_size_in_bytes(std::fs::metadata(&file_path).unwrap().len())
+ .with_start(0)
+ .with_length(0)
+ .with_data_file_path(file_path)
+ .with_data_file_format(DataFileFormat::Parquet)
+ .with_schema(schema)
+ .with_project_field_ids(vec![1])
+ .with_case_sensitive(false)
+ .build();
+
+ let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as
FileScanTaskStream;
+ let result: Result<Vec<RecordBatch>, _> =
+ reader.read(tasks).unwrap().stream().try_collect().await;
+
+ let err = result.unwrap_err();
+ assert_eq!(err.kind(), crate::ErrorKind::Unexpected);
+ let err_str = format!("{err}");
+ assert!(
+ err_str.contains("encrypted footer"),
Review Comment:
This assertion matches on the wording of the `parquet` error message, so we
would need to update it if `parquet` ever words that error differently.
Nothing worth changing now, just noting it as a future maintenance spot.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]