timsaucer commented on code in PR #1123:
URL: https://github.com/apache/datafusion-python/pull/1123#discussion_r2145455758
##########
python/datafusion/dataframe.py:
##########
@@ -704,38 +694,135 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None
     def write_parquet(
         self,
         path: str | pathlib.Path,
-        compression: Union[str, Compression] = Compression.ZSTD,
-        compression_level: int | None = None,
+        data_pagesize_limit: int = 1024 * 1024,
+        write_batch_size: int = 1024,
+        writer_version: str = "1.0",
+        skip_arrow_metadata: bool = False,
+        compression: Optional[str] = "zstd(3)",
+        dictionary_enabled: Optional[bool] = True,
+        dictionary_page_size_limit: int = 1024 * 1024,
+        statistics_enabled: Optional[str] = "page",
+        max_row_group_size: int = 1024 * 1024,
+        created_by: str = "datafusion-python",
+        column_index_truncate_length: Optional[int] = 64,
+        statistics_truncate_length: Optional[int] = None,
+        data_page_row_count_limit: int = 20_000,
+        encoding: Optional[str] = None,
+        bloom_filter_on_write: bool = False,
+        bloom_filter_fpp: Optional[float] = None,
+        bloom_filter_ndv: Optional[int] = None,
+        allow_single_file_parallelism: bool = True,
+        maximum_parallel_row_group_writers: int = 1,
+        maximum_buffered_record_batches_per_stream: int = 2,
+        column_specific_options: Optional[dict[str, ParquetColumnOptions]] = None,

Review Comment:
   I think the full parquet writer options are going to be less commonly used for most users. I would suggest we want to have two different versions of the `write_parquet` signature: one that keeps the simple existing compression parameters, and one that looks like

   ```
   def write_parquet(self, path: str | pathlib.Path, options: ParquetWriterOptions)
   ```

   And then we have a `ParquetWriterOptions` class that takes this large set of initialization options. A sketch of that shape appears at the end of this message.


##########
python/tests/test_dataframe.py:
##########
@@ -1560,40 +1588,323 @@ def test_write_compressed_parquet(df, tmp_path, compression, compression_level):
 @pytest.mark.parametrize(
-    ("compression", "compression_level"),
-    [("gzip", 12), ("brotli", 15), ("zstd", 23), ("wrong", 12)],
+    "compression",
+    ["gzip(12)", "brotli(15)", "zstd(23)"],
 )
-def test_write_compressed_parquet_wrong_compression_level(
-    df, tmp_path, compression, compression_level
-):
+def test_write_compressed_parquet_wrong_compression_level(df, tmp_path, compression):
     path = tmp_path
-    with pytest.raises(ValueError):
-        df.write_parquet(
-            str(path),
-            compression=compression,
-            compression_level=compression_level,
-        )
+    with pytest.raises(Exception, match=r"valid compression range .*? exceeded."):
+        df.write_parquet(str(path), compression=compression)


-@pytest.mark.parametrize("compression", ["wrong"])
+@pytest.mark.parametrize("compression", ["wrong", "wrong(12)"])
 def test_write_compressed_parquet_invalid_compression(df, tmp_path, compression):
     path = tmp_path
-    with pytest.raises(ValueError):
+    with pytest.raises(Exception, match="Unknown or unsupported parquet compression"):
         df.write_parquet(str(path), compression=compression)


-# not testing lzo because it it not implemented yet
-# https://github.com/apache/arrow-rs/issues/6970
-@pytest.mark.parametrize("compression", ["zstd", "brotli", "gzip"])
-def test_write_compressed_parquet_default_compression_level(df, tmp_path, compression):
-    # Test write_parquet with zstd, brotli, gzip default compression level,
-    # ie don't specify compression level
-    # should complete without error
-    path = tmp_path
+@pytest.mark.parametrize(
+    ("writer_version", "format_version"),
+    [("1.0", "1.0"), ("2.0", "2.6"), (None, "1.0")],
+)
+def test_write_parquet_writer_version(df, tmp_path, writer_version, format_version):
+    """Test the Parquet writer version. Note that writer_version=2.0 results in
+    format_version=2.6"""
+    if writer_version is None:
+        df.write_parquet(tmp_path)
+    else:
+        df.write_parquet(tmp_path, writer_version=writer_version)

-    df.write_parquet(str(path), compression=compression)
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+        assert metadata["format_version"] == format_version
+
+
+@pytest.mark.parametrize("writer_version", ["1.2.3", "custom-version", "0"])
+def test_write_parquet_wrong_writer_version(df, tmp_path, writer_version):
+    """Test that invalid writer versions in Parquet throw an exception."""
+    with pytest.raises(
+        Exception, match="Unknown or unsupported parquet writer version"
+    ):
+        df.write_parquet(tmp_path, writer_version=writer_version)
+
+
+@pytest.mark.parametrize("dictionary_enabled", [True, False, None])
+def test_write_parquet_dictionary_enabled(df, tmp_path, dictionary_enabled):
+    """Test enabling/disabling the dictionaries in Parquet."""
+    df.write_parquet(tmp_path, dictionary_enabled=dictionary_enabled)
+    # by default, the dictionary is enabled, so None results in True
+    result = dictionary_enabled if dictionary_enabled is not None else True
+
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+
+        for row_group in metadata["row_groups"]:
+            for col in row_group["columns"]:
+                assert col["has_dictionary_page"] == result
+
+
+@pytest.mark.parametrize(
+    ("statistics_enabled", "has_statistics"),
+    [("page", True), ("chunk", True), ("none", False), (None, True)],
+)
+def test_write_parquet_statistics_enabled(
+    df, tmp_path, statistics_enabled, has_statistics
+):
+    """Test configuring the statistics in Parquet. In pyarrow we can only check for
+    column-level statistics, so "page" and "chunk" are tested in the same way."""
+    df.write_parquet(tmp_path, statistics_enabled=statistics_enabled)
+
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+
+        for row_group in metadata["row_groups"]:
+            for col in row_group["columns"]:
+                if has_statistics:
+                    assert col["statistics"] is not None
+                else:
+                    assert col["statistics"] is None
+
+
+@pytest.mark.parametrize("max_row_group_size", [1000, 5000, 10000, 100000])
+def test_write_parquet_max_row_group_size(large_df, tmp_path, max_row_group_size):
+    """Test configuring the max number of rows per group in Parquet. These test cases
+    guarantee that the number of rows for each row group is max_row_group_size, given
+    the total number of rows is a multiple of max_row_group_size."""
+    large_df.write_parquet(tmp_path, max_row_group_size=max_row_group_size)
+
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+        for row_group in metadata["row_groups"]:
+            assert row_group["num_rows"] == max_row_group_size
+
+
+@pytest.mark.parametrize("created_by", ["datafusion", "datafusion-python", "custom"])
+def test_write_parquet_created_by(df, tmp_path, created_by):
+    """Test configuring the created by metadata in Parquet."""
+    df.write_parquet(tmp_path, created_by=created_by)
+
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+        assert metadata["created_by"] == created_by
+
+
+@pytest.mark.parametrize("statistics_truncate_length", [5, 25, 50])
+def test_write_parquet_statistics_truncate_length(
+    df, tmp_path, statistics_truncate_length
+):
+    """Test configuring the truncate limit in Parquet's row-group-level statistics."""
+    ctx = SessionContext()
+    data = {
+        "a": [
+            "a_the_quick_brown_fox_jumps_over_the_lazy_dog",
+            "m_the_quick_brown_fox_jumps_over_the_lazy_dog",
+            "z_the_quick_brown_fox_jumps_over_the_lazy_dog",
+        ],
+        "b": ["a_smaller", "m_smaller", "z_smaller"],
+    }
+    df = ctx.from_arrow(pa.record_batch(data))
+    df.write_parquet(tmp_path, statistics_truncate_length=statistics_truncate_length)
+
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+
+        for row_group in metadata["row_groups"]:
+            for col in row_group["columns"]:
+                statistics = col["statistics"]
+                assert len(statistics["min"]) <= statistics_truncate_length
+                assert len(statistics["max"]) <= statistics_truncate_length
+
+
+def test_write_parquet_default_encoding(tmp_path):
+    """Test that, by default, Parquet files are written with dictionary encoding.
+    Note that dictionary encoding is not used for boolean values, so it is not tested
+    here."""
+    ctx = SessionContext()
+    data = {
+        "a": [1, 2, 3],
+        "b": ["1", "2", "3"],
+        "c": [1.01, 2.02, 3.03],
+    }
+    df = ctx.from_arrow(pa.record_batch(data))
+    df.write_parquet(tmp_path)
+
+    for file in tmp_path.rglob("*.parquet"):
+        parquet = pq.ParquetFile(file)
+        metadata = parquet.metadata.to_dict()
+
+        for row_group in metadata["row_groups"]:
+            for col in row_group["columns"]:
+                assert col["encodings"] == ("PLAIN", "RLE", "RLE_DICTIONARY")
+
+
+@pytest.mark.parametrize(
+    ("encoding", "data_types", "result"),
+    [
+        ("plain", ["int", "float", "str", "bool"], ("PLAIN", "RLE")),
+        ("rle", ["bool"], ("RLE",)),
+        ("delta_binary_packed", ["int"], ("RLE", "DELTA_BINARY_PACKED")),
+        ("delta_length_byte_array", ["str"], ("RLE", "DELTA_LENGTH_BYTE_ARRAY")),
+        ("delta_byte_array", ["str"], ("RLE", "DELTA_BYTE_ARRAY")),
+        ("byte_stream_split", ["int", "float"], ("RLE", "BYTE_STREAM_SPLIT")),
+    ],
+)
+def test_write_parquet_encoding(tmp_path, encoding, data_types, result):
+    """Test different encodings in Parquet in their respective support column types."""
+    ctx = SessionContext()
+
+    data = {}
+    for data_type in data_types:
+        match data_type:
+            case "int":
+                data["int"] = [1, 2, 3]
+            case "float":
+                data["float"] = [1.01, 2.02, 3.03]
+            case "str":
+                data["str"] = ["a", "b", "c"]
+            case "bool":
+                data["bool"] = [True, False, True]

Review Comment:
   This code failed for me because our minimum supported Python is 3.9, and `match` was introduced in 3.10. Python 3.9 doesn't reach end of life until October 2025. A 3.9-compatible rewrite of this block is sketched at the end of this message.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org
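For illustration, here is a minimal sketch of what the `ParquetWriterOptions` object suggested in the first review comment could look like. The field names and defaults simply mirror the keyword arguments in the diff above (only a subset is shown); the dataclass shape, the overload comments, and the usage line are assumptions, not the API the PR actually adopted.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ParquetWriterOptions:
    """Hypothetical container for the advanced Parquet writer settings, so the
    common write_parquet() signature can stay small."""

    data_pagesize_limit: int = 1024 * 1024
    write_batch_size: int = 1024
    writer_version: str = "1.0"
    compression: Optional[str] = "zstd(3)"
    dictionary_enabled: Optional[bool] = True
    statistics_enabled: Optional[str] = "page"
    max_row_group_size: int = 1024 * 1024
    created_by: str = "datafusion-python"
    bloom_filter_on_write: bool = False
    column_specific_options: dict = field(default_factory=dict)


# The two signatures the comment proposes might then look like (illustrative only):
#
#     def write_parquet(self, path: str | pathlib.Path,
#                       compression: str = "zstd(3)") -> None: ...
#
#     def write_parquet(self, path: str | pathlib.Path,
#                       options: ParquetWriterOptions) -> None: ...

# Usage sketch: only callers who need the advanced knobs build the options object.
opts = ParquetWriterOptions(max_row_group_size=100_000, statistics_enabled="chunk")
print(opts)
```

In practice the two forms could be distinguished with `typing.overload` or by accepting a union type for the second argument; either way the long tail of writer settings stays off the common path.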
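And here is one way the data setup flagged in the second review comment could be written without `match`, so the test also runs on Python 3.9. The `data_types` list below is just a stand-in for the test's parametrized value; only the dispatch style changes, the sample values are the ones from the diff.

```python
# Stand-in for the parametrized "data_types" value used by the test.
data_types = ["int", "float", "str", "bool"]

# Plain if/elif dispatch instead of match/case, which requires Python >= 3.10.
data = {}
for data_type in data_types:
    if data_type == "int":
        data["int"] = [1, 2, 3]
    elif data_type == "float":
        data["float"] = [1.01, 2.02, 3.03]
    elif data_type == "str":
        data["str"] = ["a", "b", "c"]
    elif data_type == "bool":
        data["bool"] = [True, False, True]

print(data)
```

A dict of sample values keyed by type name would work just as well and avoids the branching entirely.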