etseidl commented on code in PR #6649:
URL: https://github.com/apache/arrow-rs/pull/6649#discussion_r1823063529
##########
parquet/src/record/reader.rs:
##########
@@ -138,7 +138,15 @@ impl TreeBuilder {
.column_descr_ptr();
let col_reader = row_group_reader.get_column_reader(orig_index)?;
let column = TripletIter::new(col_descr, col_reader,
self.batch_size);
- Reader::PrimitiveReader(field, Box::new(column))
+ let reader = Reader::PrimitiveReader(field.clone(),
Box::new(column));
+ if repetition == Repetition::REPEATED && path.len() == 1 {
+ if curr_def_level != 1 || curr_rep_level != 1 {
+ return Err(ParquetError::General(format!("Top level
REPEATED primitve field {} should have definition and repetition levels of 1",
field.name())));
Review Comment:
I just did a test of
```
message spark_schema {
REPEATED group my_struct {
REQUIRED INT32 key;
OPTIONAL INT32 value;
}
}
```
and this also worked without the patch in this PR. A second test with
```
message spark_schema {
REPEATED INT32 key;
REPEATED INT32 value;
}
```
fails without the patch.
<details>
<summary>test code</summary>
```rust
/// Round-trips a `REPEATED group` that carries no `LIST` annotation through the
/// row-based record reader.
///
/// The file is written to an in-memory buffer with two leaf columns
/// (`my_struct.key` and `my_struct.value`) and then read back with
/// `get_row_iter`. Each of the three expected rows holds `my_struct` as a list
/// of three `{key, value}` groups.
///
/// Reported in the accompanying review as passing both with and without the
/// change under review.
#[test]
fn test_repeated_struct_no_annotation() {
    let schema = "
message spark_schema {
REPEATED group my_struct {
REQUIRED INT32 key;
OPTIONAL INT32 value;
}
}
";
    let schema = Arc::new(parse_message_type(schema).unwrap());

    // Write the Parquet file into an in-memory buffer; nothing touches disk.
    let mut buffer: Vec<u8> = Vec::new();
    let mut file_writer =
        SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
    let mut row_group_writer = file_writer.next_row_group().unwrap();

    // Column `my_struct.key`: nine values, every definition level is 1, and a
    // repetition level of 0 on every third value starts a new row.
    let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
    column_writer
        .typed::<Int32Type>()
        .write_batch(
            &[1, 2, 3, 4, 5, 6, 7, 8, 9],
            Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]),
            Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]),
        )
        .unwrap();
    column_writer.close().unwrap();

    // Column `my_struct.value`: eight values for nine level slots. The single
    // definition level of 1 (eighth slot) is the entry expected to read back
    // as `Field::Null` next to key 8.
    let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
    column_writer
        .typed::<Int32Type>()
        .write_batch(
            &[42, 7, 6, 99, 100, 2, 7, 9],
            Some(&[2, 2, 2, 2, 2, 2, 2, 1, 2]),
            Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]),
        )
        .unwrap();
    column_writer.close().unwrap();

    // Finalize the Parquet file.
    row_group_writer.close().unwrap();
    file_writer.close().unwrap();
    assert_eq!(&buffer[0..4], b"PAR1");

    // Read the Parquet file back from the buffer.
    let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
    let rows: Vec<_> = file_reader
        .get_row_iter(None)
        .unwrap()
        .map(|row| row.unwrap())
        .collect();

    let expected_rows = vec![
        row![(
            "my_struct".to_string(),
            list![
                group![
                    ("key".to_string(), Field::Int(1)),
                    ("value".to_string(), Field::Int(42))
                ],
                group![
                    ("key".to_string(), Field::Int(2)),
                    ("value".to_string(), Field::Int(7))
                ],
                group![
                    ("key".to_string(), Field::Int(3)),
                    ("value".to_string(), Field::Int(6))
                ]
            ]
        )],
        row![(
            "my_struct".to_string(),
            list![
                group![
                    ("key".to_string(), Field::Int(4)),
                    ("value".to_string(), Field::Int(99))
                ],
                group![
                    ("key".to_string(), Field::Int(5)),
                    ("value".to_string(), Field::Int(100))
                ],
                group![
                    ("key".to_string(), Field::Int(6)),
                    ("value".to_string(), Field::Int(2))
                ]
            ]
        )],
        row![(
            "my_struct".to_string(),
            list![
                group![
                    ("key".to_string(), Field::Int(7)),
                    ("value".to_string(), Field::Int(7))
                ],
                group![
                    ("key".to_string(), Field::Int(8)),
                    ("value".to_string(), Field::Null)
                ],
                group![
                    ("key".to_string(), Field::Int(9)),
                    ("value".to_string(), Field::Int(9))
                ]
            ]
        )],
    ];
    assert_eq!(rows, expected_rows);
}
/// Round-trips two top-level `REPEATED INT32` fields that carry no `LIST`
/// annotation through the row-based record reader.
///
/// The file is written to an in-memory buffer with two leaf columns (`key`
/// and `value`) and then read back with `get_row_iter`. Each of the three
/// expected rows holds both fields as lists; in the last row `value` is
/// expected to be an empty list.
///
/// Reported in the accompanying review as failing without the change under
/// review.
#[test]
fn test_repeated_primitive_no_annotation() {
    let schema = "
message spark_schema {
REPEATED INT32 key;
REPEATED INT32 value;
}
";
    let schema = Arc::new(parse_message_type(schema).unwrap());

    // Write the Parquet file into an in-memory buffer; nothing touches disk.
    let mut buffer: Vec<u8> = Vec::new();
    let mut file_writer =
        SerializedFileWriter::new(&mut buffer, schema, Default::default()).unwrap();
    let mut row_group_writer = file_writer.next_row_group().unwrap();

    // Column `key`: nine values, every definition level is 1, and a
    // repetition level of 0 on every third value starts a new row.
    let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
    column_writer
        .typed::<Int32Type>()
        .write_batch(
            &[1, 2, 3, 4, 5, 6, 7, 8, 9],
            Some(&[1, 1, 1, 1, 1, 1, 1, 1, 1]),
            Some(&[0, 1, 1, 0, 1, 1, 0, 1, 1]),
        )
        .unwrap();
    column_writer.close().unwrap();

    // Column `value`: six values for seven level slots. The final slot has
    // definition level 0 and repetition level 0, which is the row whose
    // `value` is expected to read back as an empty list.
    let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
    column_writer
        .typed::<Int32Type>()
        .write_batch(
            &[42, 7, 6, 99, 100, 2],
            Some(&[1, 1, 1, 1, 1, 1, 0]),
            Some(&[0, 1, 1, 0, 1, 1, 0]),
        )
        .unwrap();
    column_writer.close().unwrap();

    // Finalize the Parquet file.
    row_group_writer.close().unwrap();
    file_writer.close().unwrap();
    assert_eq!(&buffer[0..4], b"PAR1");

    // Read the Parquet file back from the buffer.
    let file_reader = SerializedFileReader::new(Bytes::from(buffer)).unwrap();
    let rows: Vec<_> = file_reader
        .get_row_iter(None)
        .unwrap()
        .map(|row| row.unwrap())
        .collect();

    let expected_rows = vec![
        row![
            (
                "key".to_string(),
                list![Field::Int(1), Field::Int(2), Field::Int(3)]
            ),
            (
                "value".to_string(),
                list![Field::Int(42), Field::Int(7), Field::Int(6)]
            ),
        ],
        row![
            (
                "key".to_string(),
                list![Field::Int(4), Field::Int(5), Field::Int(6)]
            ),
            (
                "value".to_string(),
                list![Field::Int(99), Field::Int(100), Field::Int(2)]
            ),
        ],
        row![
            (
                "key".to_string(),
                list![Field::Int(7), Field::Int(8), Field::Int(9)]
            ),
            (
                "value".to_string(),
                list![]
            ),
        ],
    ];
    assert_eq!(rows, expected_rows);
}
```
</details>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]