martin-g commented on code in PR #8069:
URL: https://github.com/apache/arrow-rs/pull/8069#discussion_r2532221739
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -1297,6 +1343,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels:
&ArrayLevels) -> Result<usi
write_primitive(typed, array.values(), levels)
}
},
+ ArrowDataType::RunEndEncoded(_run_ends, _values) => {
+ Err(ParquetError::NYI(
+ "Int64ColumnWriter: Attempting to write an Arrow REE
type that is not yet implemented"
Review Comment:
```suggestion
"Int64ColumnWriter: Attempting to write an Arrow
RunEndEncoded type that is not yet implemented"
```
nit: IMO it would be easier for the end user to debug/understand the error
if the message did not use acronyms.
##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -92,6 +114,20 @@ macro_rules! downcast_op {
}
d => unreachable!("cannot downcast {} dictionary value to byte
array", d),
},
+ DataType::RunEndEncoded(run_end, value) => match value.data_type()
{
+ DataType::Utf8 => downcast_ree_op!(run_end, StringArray,
$array, $op$(, $arg)*),
+ DataType::LargeUtf8 => {
+ downcast_ree_op!(run_end, LargeStringArray, $array, $op$(,
$arg)*)
+ }
Review Comment:
```suggestion
            }
            DataType::Utf8View => {
                downcast_ree_op!(run_end, StringViewArray, $array, $op$(, $arg)*)
            }
```
##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -92,6 +114,20 @@ macro_rules! downcast_op {
}
d => unreachable!("cannot downcast {} dictionary value to byte
array", d),
},
+ DataType::RunEndEncoded(run_end, value) => match value.data_type()
{
+ DataType::Utf8 => downcast_ree_op!(run_end, StringArray,
$array, $op$(, $arg)*),
+ DataType::LargeUtf8 => {
+ downcast_ree_op!(run_end, LargeStringArray, $array, $op$(,
$arg)*)
+ }
+ DataType::Binary => downcast_ree_op!(run_end, BinaryArray,
$array, $op$(, $arg)*),
Review Comment:
```suggestion
            DataType::Binary => downcast_ree_op!(run_end, BinaryArray, $array, $op$(, $arg)*),
            DataType::BinaryView => {
                downcast_ree_op!(run_end, BinaryViewArray, $array, $op$(, $arg)*)
            }
```
##########
parquet/src/arrow/schema/primitive.rs:
##########
@@ -102,6 +102,18 @@ fn apply_hint(parquet: DataType, hint: DataType) ->
DataType {
false => hinted,
}
}
+
+ // Potentially preserve run end encoded encoding
Review Comment:
```suggestion
// Potentially preserve run-end encoding
```
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -4481,4 +4539,153 @@ mod tests {
assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
}
+
+ #[test]
+ fn arrow_writer_run_end_encoded_string() {
+ // Create a run array of strings
+ let mut builder = StringRunBuilder::<Int32Type>::new();
+ builder.extend(
+ vec![Some("alpha"); 100000]
+ .into_iter()
+ .chain(vec![Some("beta"); 100000]),
+ );
+ let run_array: RunArray<Int32Type> = builder.finish();
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "ree",
+ run_array.data_type().clone(),
+ run_array.is_nullable(),
+ )]));
+
+ // Write to parquet
+ let mut parquet_bytes: Vec<u8> = Vec::new();
+ let mut writer = ArrowWriter::try_new(&mut parquet_bytes,
schema.clone(), None).unwrap();
+ let batch = RecordBatch::try_new(schema.clone(),
vec![Arc::new(run_array)]).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ // Read back and verify
+ let bytes = Bytes::from(parquet_bytes);
+ let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+
+ // Check if dictionary was used by examining the metadata
+ let metadata = reader.metadata();
+ let row_group = &metadata.row_groups()[0];
+ let col_meta = &row_group.columns()[0];
+
+ // If dictionary encoding worked, we should see RLE_DICTIONARY encoding
+ // and have a dictionary page offset
+ let has_dict_encoding = col_meta.encodings().any(|e| e ==
Encoding::RLE_DICTIONARY);
+ let has_dict_page = col_meta.dictionary_page_offset().is_some();
+
+ // Verify the schema is REE encoded when we read it back
+ let expected_schema = Arc::new(Schema::new(vec![Field::new(
+ "ree",
+ DataType::RunEndEncoded(
+ Arc::new(Field::new("run_ends", arrow_schema::DataType::Int32,
false)),
+ Arc::new(Field::new("values", arrow_schema::DataType::Utf8,
true)),
+ ),
+ false,
+ )]));
+ assert_eq!(&expected_schema, reader.schema());
+
+ // Read the data back
+ let batches: Vec<_> = reader
+ .build()
+ .unwrap()
+ .collect::<ArrowResult<Vec<_>>>()
+ .unwrap();
+ assert_eq!(batches.len(), 196);
+ // Count rows in total
+ let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
+ assert_eq!(total_rows, 200000);
+
+ // Ensure dictionary encoding
+ assert!(has_dict_encoding, "RunArray should be dictionary encoded");
+ assert!(has_dict_page, "RunArray should have dictionary page");
+ }
+
+ #[test]
+ fn arrow_writer_run_end_encoded_int() {
+ // Create a run array of strings
+ let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
+ builder.extend(
+ vec![Some(1); 100000]
+ .into_iter()
+ .chain(vec![Some(2); 100000]),
+ );
+ let run_array: RunArray<Int32Type> = builder.finish();
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "ree",
+ run_array.data_type().clone(),
+ run_array.is_nullable(),
+ )]));
+
+ // Write to parquet
+ let mut parquet_bytes: Vec<u8> = Vec::new();
+ let mut writer = ArrowWriter::try_new(&mut parquet_bytes,
schema.clone(), None).unwrap();
+ let batch = RecordBatch::try_new(schema.clone(),
vec![Arc::new(run_array)]).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ // Read back and verify
+ let bytes = Bytes::from(parquet_bytes);
+ let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+
+ // Check if dictionary was used by examining the metadata
+ let metadata = reader.metadata();
+ let row_group = &metadata.row_groups()[0];
+ let col_meta = &row_group.columns()[0];
+ let has_dict_encoding = col_meta.encodings().any(|e| e ==
Encoding::RLE_DICTIONARY);
+
+ // If dictionary encoding worked, we should see RLE_DICTIONARY encoding
+ // and have a dictionary page offset
+ // let has_dict_encoding =
col_meta.encodings().contains(&Encoding::RLE_DICTIONARY);
+ let has_dict_page = col_meta.dictionary_page_offset().is_some();
+
+ // Verify the schema is REE encoded when we read it back
+ let expected_schema = Arc::new(Schema::new(vec![Field::new(
+ "ree",
+ DataType::RunEndEncoded(
+ Arc::new(Field::new("run_ends", arrow_schema::DataType::Int32,
false)),
+ Arc::new(Field::new("values", arrow_schema::DataType::Int32,
true)),
+ ),
+ false,
+ )]));
+ assert_eq!(&expected_schema, reader.schema());
+
+ // Read the data back
+ let batches: Vec<_> = reader
+ .build()
+ .unwrap()
+ .collect::<ArrowResult<Vec<_>>>()
+ .unwrap();
+ assert_eq!(batches.len(), 196);
+ // Count rows in total
+ let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
+ assert_eq!(total_rows, 200000);
+
+ // Ensure dictionary encoding
+ assert!(has_dict_encoding, "RunArray should be dictionary encoded");
+ assert!(has_dict_page, "RunArray should have dictionary page");
+ }
+
+ #[test]
+ fn arrow_writer_round_trip_run_end_encoded_string() {
+ // Create a run array of strings (cannot have more than 1024 values
per record batch)
+ let mut builder = StringRunBuilder::<Int32Type>::new();
+ builder.extend(
+ vec![Some("alpha"); 512]
+ .into_iter()
+ .chain(vec![Some("beta"); 512]),
+ );
+ let run_array: RunArray<Int32Type> = builder.finish();
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "ree",
+ run_array.data_type().clone(),
+ run_array.is_nullable(),
+ )]));
+
+ let batch = RecordBatch::try_new(schema.clone(),
vec![Arc::new(run_array)]).unwrap();
+ roundtrip(batch, None);
+ }
Review Comment:
Tests for run-end encoded decimal arrays are missing.
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -1371,6 +1423,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels:
&ArrayLevels) -> Result<usi
let array = column.as_primitive::<Float16Type>();
get_float_16_array_slice(array, indices)
}
+ ArrowDataType::RunEndEncoded(_run_ends, _values) => {
+ return Err(ParquetError::NYI(
+ "FixedLenByteArrayColumnWriter: Attempting to write an
Arrow REE type that is not yet implemented"
Review Comment:
```suggestion
"FixedLenByteArrayColumnWriter: Attempting to write
an Arrow RunEndEncoded type that is not yet implemented"
```
nit: IMO it would be easier for the end user to debug/understand the error
if the message did not use acronyms.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]