vegarsti commented on code in PR #8069:
URL: https://github.com/apache/arrow-rs/pull/8069#discussion_r2594649992


##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -4481,4 +4624,408 @@ mod tests {
         assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
         assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
     }
+
+    #[test]
+    fn arrow_writer_run_end_encoded_string() {
+        // Create a run array of strings
+        let mut builder = StringRunBuilder::<Int32Type>::new();
+        builder.extend(
+            vec![Some("alpha"); 100000]
+                .into_iter()
+                .chain(vec![Some("beta"); 100000]),
+        );
+        let run_array: RunArray<Int32Type> = builder.finish();
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            run_array.data_type().clone(),
+            run_array.is_nullable(),
+        )]));
+
+        // Write to parquet
+        let mut parquet_bytes: Vec<u8> = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), None).unwrap();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        // Read back and verify
+        let bytes = Bytes::from(parquet_bytes);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+
+        // Check if dictionary was used by examining the metadata
+        let metadata = reader.metadata();
+        let row_group = &metadata.row_groups()[0];
+        let col_meta = &row_group.columns()[0];
+
+        // If dictionary encoding worked, we should see RLE_DICTIONARY encoding
+        // and have a dictionary page offset
+        let has_dict_encoding = col_meta.encodings().any(|e| e == Encoding::RLE_DICTIONARY);
+        let has_dict_page = col_meta.dictionary_page_offset().is_some();
+
+        // Verify the schema is REE encoded when we read it back
+        let expected_schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            DataType::RunEndEncoded(
+                Arc::new(Field::new("run_ends", arrow_schema::DataType::Int32, 
false)),
+                Arc::new(Field::new("values", arrow_schema::DataType::Utf8, 
true)),
+            ),
+            false,
+        )]));
+        assert_eq!(&expected_schema, reader.schema());
+
+        // Read the data back
+        let batches: Vec<_> = reader
+            .build()
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+        assert_eq!(batches.len(), 196);
+        // Count rows in total
+        let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
+        assert_eq!(total_rows, 200000);
+
+        // Ensure dictionary encoding
+        assert!(has_dict_encoding, "RunArray should be dictionary encoded");
+        assert!(has_dict_page, "RunArray should have dictionary page");
+    }
+
+    #[test]
+    fn arrow_writer_run_end_encoded_int() {
+        // Create a run array of integers
+        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
+        builder.extend(
+            vec![Some(1); 100000]
+                .into_iter()
+                .chain(vec![Some(2); 100000]),
+        );
+        let run_array: RunArray<Int32Type> = builder.finish();
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            run_array.data_type().clone(),
+            run_array.is_nullable(),
+        )]));
+
+        // Write to parquet
+        let mut parquet_bytes: Vec<u8> = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), None).unwrap();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        // Read back and verify
+        let bytes = Bytes::from(parquet_bytes);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+
+        // Check if dictionary was used by examining the metadata
+        let metadata = reader.metadata();
+        let row_group = &metadata.row_groups()[0];
+        let col_meta = &row_group.columns()[0];
+
+        // If dictionary encoding worked, we should see RLE_DICTIONARY encoding
+        // and have a dictionary page offset
+        let has_dict_encoding = col_meta.encodings().any(|e| e == Encoding::RLE_DICTIONARY);
+        let has_dict_page = col_meta.dictionary_page_offset().is_some();
+
+        // Verify the schema is REE encoded when we read it back
+        let expected_schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            DataType::RunEndEncoded(
+                Arc::new(Field::new("run_ends", arrow_schema::DataType::Int32, 
false)),
+                Arc::new(Field::new("values", arrow_schema::DataType::Int32, 
true)),
+            ),
+            false,
+        )]));
+        assert_eq!(&expected_schema, reader.schema());
+
+        // Read the data back
+        let batches: Vec<_> = reader
+            .build()
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+        assert_eq!(batches.len(), 196);
+        // Count rows in total
+        let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
+        assert_eq!(total_rows, 200000);
+
+        // Ensure dictionary encoding
+        assert!(has_dict_encoding, "RunArray should be dictionary encoded");
+        assert!(has_dict_page, "RunArray should have dictionary page");
+    }
+
+    #[test]
+    fn arrow_writer_round_trip_run_end_encoded_string() {
+        // Create a run array of strings (cannot have more than 1024 values per record batch)
+        let mut builder = StringRunBuilder::<Int32Type>::new();
+        builder.extend(
+            vec![Some("alpha"); 512]
+                .into_iter()
+                .chain(vec![Some("beta"); 512]),
+        );
+        let run_array: RunArray<Int32Type> = builder.finish();
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            run_array.data_type().clone(),
+            run_array.is_nullable(),
+        )]));
+
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
+        roundtrip(batch, None);
+    }
+
+    #[test]
+    fn arrow_writer_run_end_encoded_decimal32() {
+        // Create a run array of Decimal32 values
+        let mut builder = PrimitiveRunBuilder::<Int32Type, Decimal32Type>::new();
+        builder.extend(
+            vec![Some(12345i32); 100000]
+                .into_iter()
+                .chain(vec![Some(56789i32); 100000]),
+        );
+        let run_array: RunArray<Int32Type> = builder.finish();
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            run_array.data_type().clone(),
+            run_array.is_nullable(),
+        )]));
+
+        // Write to parquet
+        let mut parquet_bytes: Vec<u8> = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), None).unwrap();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        // Read back and verify
+        let bytes = Bytes::from(parquet_bytes);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+
+        // Check if dictionary was used by examining the metadata
+        let metadata = reader.metadata();
+        let row_group = &metadata.row_groups()[0];
+        let col_meta = &row_group.columns()[0];
+        let has_dict_encoding = col_meta.encodings().any(|e| e == Encoding::RLE_DICTIONARY);
+        let has_dict_page = col_meta.dictionary_page_offset().is_some();
+
+        // Verify the schema is REE encoded when we read it back
+        let expected_schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            DataType::RunEndEncoded(
+                Arc::new(Field::new("run_ends", arrow_schema::DataType::Int32, 
false)),
+                Arc::new(Field::new(
+                    "values",
+                    arrow_schema::DataType::Decimal32(9, 2),
+                    true,
+                )),
+            ),
+            false,
+        )]));
+        assert_eq!(&expected_schema, reader.schema());
+
+        // Read the data back
+        let batches: Vec<_> = reader
+            .build()
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+        assert_eq!(batches.len(), 196);
+        // Count rows in total
+        let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
+        assert_eq!(total_rows, 200000);
+
+        // Ensure dictionary encoding
+        assert!(has_dict_encoding, "RunArray should be dictionary encoded");
+        assert!(has_dict_page, "RunArray should have dictionary page");
+    }
+
+    #[test]
+    fn arrow_writer_run_end_encoded_decimal64() {
+        // Create a run array of Decimal64 values
+        let mut builder = PrimitiveRunBuilder::<Int32Type, Decimal64Type>::new();
+        builder.extend(
+            vec![Some(12345i64); 100000]
+                .into_iter()
+                .chain(vec![Some(56789i64); 100000]),
+        );
+        let run_array: RunArray<Int32Type> = builder.finish();
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            run_array.data_type().clone(),
+            run_array.is_nullable(),
+        )]));
+
+        // Write to parquet
+        let mut parquet_bytes: Vec<u8> = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), None).unwrap();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        // Read back and verify
+        let bytes = Bytes::from(parquet_bytes);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+
+        // Check if dictionary was used by examining the metadata
+        let metadata = reader.metadata();
+        let row_group = &metadata.row_groups()[0];
+        let col_meta = &row_group.columns()[0];
+        let has_dict_encoding = col_meta.encodings().any(|e| e == Encoding::RLE_DICTIONARY);
+        let has_dict_page = col_meta.dictionary_page_offset().is_some();
+
+        // Verify the schema is REE encoded when we read it back
+        let expected_schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            DataType::RunEndEncoded(
+                Arc::new(Field::new("run_ends", arrow_schema::DataType::Int32, 
false)),
+                Arc::new(Field::new(
+                    "values",
+                    arrow_schema::DataType::Decimal64(18, 6),
+                    true,
+                )),
+            ),
+            false,
+        )]));
+        assert_eq!(&expected_schema, reader.schema());
+
+        // Read the data back
+        let batches: Vec<_> = reader
+            .build()
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+        assert_eq!(batches.len(), 196);
+        // Count rows in total
+        let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
+        assert_eq!(total_rows, 200000);
+
+        // Ensure dictionary encoding
+        assert!(has_dict_encoding, "RunArray should be dictionary encoded");
+        assert!(has_dict_page, "RunArray should have dictionary page");
+    }
+
+    #[test]
+    fn arrow_writer_run_end_encoded_decimal128() {
+        // Create a run array of Decimal128 values
+        let mut builder = PrimitiveRunBuilder::<Int32Type, Decimal128Type>::new();
+        builder.extend(
+            vec![Some(12345i128); 100000]
+                .into_iter()
+                .chain(vec![Some(56789i128); 100000]),
+        );
+        let run_array: RunArray<Int32Type> = builder.finish();
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            run_array.data_type().clone(),
+            run_array.is_nullable(),
+        )]));
+
+        // Write to parquet
+        let mut parquet_bytes: Vec<u8> = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), None).unwrap();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        // Read back and verify
+        let bytes = Bytes::from(parquet_bytes);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+
+        // Check if RLE encoding was used by examining the metadata
+        let metadata = reader.metadata();
+        let row_group = &metadata.row_groups()[0];
+        let col_meta = &row_group.columns()[0];
+        let has_rle_encoding = col_meta.encodings().any(|e| e == Encoding::RLE);

Review Comment:
   Note that Decimal128 and Decimal256 do not end up using RLE_DICTIONARY encoding, but rather RLE encoding. I'm not sure why... They use the fixed-length byte array writer rather than the byte array writer (which Decimal32 and Decimal64 use), but I don't yet know why that writer produces RLE rather than RLE_DICTIONARY.
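   
   For anyone who wants to dig further, here is a minimal standalone sketch (reusing the same imports as the tests above; the values and run lengths are arbitrary) that prints the encodings the writer actually chose for a Decimal128 column:
   
   ```rust
   // Sketch only: build a tiny Decimal128 run array and inspect the
   // column chunk metadata for the encodings the writer picked.
   let mut builder = PrimitiveRunBuilder::<Int32Type, Decimal128Type>::new();
   builder.extend(vec![Some(12345i128); 10].into_iter().chain(vec![Some(56789i128); 10]));
   let run_array: RunArray<Int32Type> = builder.finish();
   let schema = Arc::new(Schema::new(vec![Field::new(
       "ree",
       run_array.data_type().clone(),
       run_array.is_nullable(),
   )]));
   
   // Write a single batch to an in-memory buffer.
   let mut buf: Vec<u8> = Vec::new();
   let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
   let batch = RecordBatch::try_new(schema, vec![Arc::new(run_array)]).unwrap();
   writer.write(&batch).unwrap();
   writer.close().unwrap();
   
   // Read back only the metadata and dump the column chunk's encodings.
   let reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buf)).unwrap();
   let col_meta = &reader.metadata().row_groups()[0].columns()[0];
   // Per the observation above: RLE shows up here, RLE_DICTIONARY does not.
   for encoding in col_meta.encodings() {
       println!("{encoding:?}");
   }
   ```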


