Fokko commented on code in PR #7831:
URL: https://github.com/apache/iceberg/pull/7831#discussion_r1267878112
##########
python/tests/io/test_pyarrow.py:
##########
@@ -1330,3 +1342,411 @@ def test_pyarrow_wrap_fsspec(example_task:
FileScanTask, table_schema_simple: Sc
bar: [[1,2,3]]
baz: [[true,false,null]]"""
)
+
+
+def construct_test_table() -> pa.Buffer:
+ table_metadata = {
+ "format-version": 2,
+ "location": "s3://bucket/test/location",
+ "last-column-id": 7,
+ "current-schema-id": 0,
+ "schemas": [
+ {
+ "type": "struct",
+ "schema-id": 0,
+ "fields": [
+ {"id": 1, "name": "strings", "required": False, "type":
"string"},
+ {"id": 2, "name": "floats", "required": False, "type":
"float"},
+ {
+ "id": 3,
+ "name": "list",
+ "required": False,
+ "type": {"type": "list", "element-id": 5, "element":
"long", "element-required": False},
+ },
+ {
+ "id": 4,
+ "name": "maps",
+ "required": False,
+ "type": {
+ "type": "map",
+ "key-id": 6,
+ "key": "long",
+ "value-id": 7,
+ "value": "long",
+ "value-required": False,
+ },
+ },
+ ],
+ },
+ ],
+ "default-spec-id": 0,
+ "partition-specs": [{"spec-id": 0, "fields": []}],
+ "properties": {},
+ }
+
+ table_metadata = TableMetadataUtil.parse_obj(table_metadata)
+ arrow_schema = schema_to_pyarrow(table_metadata.schemas[0])
+
+ _strings = ["zzzzzzzzzzzzzzzzzzzz", "rrrrrrrrrrrrrrrrrrrr", None,
"aaaaaaaaaaaaaaaaaaaa"]
+
+ _floats = [3.14, math.nan, 1.69, 100]
+
+ _list = [[1, 2, 3], [4, 5, 6], None, [7, 8, 9]]
+
+ _maps: List[Optional[Dict[int, int]]] = [
+ {1: 2, 3: 4},
+ None,
+ {5: 6},
+ {},
+ ]
+
+ table = pa.Table.from_pydict(
+ {
+ "strings": _strings,
+ "floats": _floats,
+ "list": _list,
+ "maps": _maps,
+ },
+ schema=arrow_schema,
+ )
+ f = pa.BufferOutputStream()
+
+ metadata_collector: List[Any] = []
+ writer = pq.ParquetWriter(f, table.schema,
metadata_collector=metadata_collector)
+
+ writer.write_table(table)
+ writer.close()
+
+ return f.getvalue(), metadata_collector[0], table_metadata
+
+
+def test_record_count() -> None:
+ (file_bytes, metadata, table_metadata) = construct_test_table()
+
+ datafile = DataFile()
+ fill_parquet_file_metadata(datafile, metadata, len(file_bytes),
table_metadata)
+
+ assert datafile.record_count == 4
+
+
+def test_file_size() -> None:
+ (file_bytes, metadata, table_metadata) = construct_test_table()
+
+ datafile = DataFile()
+ fill_parquet_file_metadata(datafile, metadata, len(file_bytes),
table_metadata)
+
+ assert datafile.file_size_in_bytes == len(file_bytes)
+
+
+def test_value_counts() -> None:
+ (file_bytes, metadata, table_metadata) = construct_test_table()
+
+ datafile = DataFile()
+ fill_parquet_file_metadata(datafile, metadata, len(file_bytes),
table_metadata)
+
+ assert len(datafile.value_counts) == 5
+ assert datafile.value_counts[1] == 4
+ assert datafile.value_counts[2] == 4
+ assert datafile.value_counts[5] == 10 # 3 lists with 3 items and a None
value
+ assert datafile.value_counts[6] == 5
+ assert datafile.value_counts[7] == 5
+
+
+def test_column_sizes() -> None:
+ (file_bytes, metadata, table_metadata) = construct_test_table()
+
+ datafile = DataFile()
+ fill_parquet_file_metadata(datafile, metadata, len(file_bytes),
table_metadata)
+
+ assert len(datafile.column_sizes) == 5
+ # these values are an artifact of how the write_table encodes the columns
+ assert datafile.column_sizes[1] == 116
+ assert datafile.column_sizes[2] == 89
+ assert datafile.column_sizes[5] == 151
+ assert datafile.column_sizes[6] == 117
+ assert datafile.column_sizes[7] == 117
+
+
+def test_null_and_nan_counts() -> None:
+ (file_bytes, metadata, table_metadata) = construct_test_table()
+
+ datafile = DataFile()
+ fill_parquet_file_metadata(datafile, metadata, len(file_bytes),
table_metadata)
+
+ assert len(datafile.null_value_counts) == 5
+ assert datafile.null_value_counts[1] == 1
+ assert datafile.null_value_counts[2] == 0
+ assert datafile.null_value_counts[5] == 1
+ assert datafile.null_value_counts[6] == 2
+ assert datafile.null_value_counts[7] == 2
+
+ # #arrow does not include this in the statistics
+ # assert len(datafile.nan_value_counts) == 3
+ # assert datafile.nan_value_counts[1] == 0
+ # assert datafile.nan_value_counts[2] == 1
+ # assert datafile.nan_value_counts[3] == 0
+
+
+def test_bounds() -> None:
+ (file_bytes, metadata, table_metadata) = construct_test_table()
+
+ datafile = DataFile()
+ fill_parquet_file_metadata(datafile, metadata, len(file_bytes),
table_metadata)
+
+ assert len(datafile.lower_bounds) == 5
+ assert datafile.lower_bounds[1].decode() ==
"aaaaaaaaaaaaaaaaaaaa"[:DEFAULT_TRUNCATION_LENGHT]
+ assert datafile.lower_bounds[2] == STRUCT_FLOAT.pack(1.69)
+ assert datafile.lower_bounds[5] == STRUCT_INT64.pack(1)
+ assert datafile.lower_bounds[6] == STRUCT_INT64.pack(1)
+ assert datafile.lower_bounds[7] == STRUCT_INT64.pack(2)
+
+ assert len(datafile.upper_bounds) == 5
+ assert datafile.upper_bounds[1].decode() ==
"zzzzzzzzzzzzzzzzzzzz"[:DEFAULT_TRUNCATION_LENGHT]
+ assert datafile.upper_bounds[2] == STRUCT_FLOAT.pack(100)
+ assert datafile.upper_bounds[5] == STRUCT_INT64.pack(9)
+ assert datafile.upper_bounds[6] == STRUCT_INT64.pack(5)
+ assert datafile.upper_bounds[7] == STRUCT_INT64.pack(6)
+
+
+def test_metrics_mode_parsing() -> None:
+ assert match_metrics_mode("none") == MetricsMode(MetricModeTypes.NONE)
+ assert match_metrics_mode("nOnE") == MetricsMode(MetricModeTypes.NONE)
+ assert match_metrics_mode("counts") == MetricsMode(MetricModeTypes.COUNTS)
+ assert match_metrics_mode("Counts") == MetricsMode(MetricModeTypes.COUNTS)
+ assert match_metrics_mode("full") == MetricsMode(MetricModeTypes.FULL)
+ assert match_metrics_mode("FuLl") == MetricsMode(MetricModeTypes.FULL)
+
+ with pytest.raises(AssertionError) as exc_info:
+ match_metrics_mode(" Full")
+ assert "Unsupported metrics mode Full" in str(exc_info.value)
+
+ assert match_metrics_mode("truncate(16)") ==
MetricsMode(MetricModeTypes.TRUNCATE, 16)
+ assert match_metrics_mode("trUncatE(16)") ==
MetricsMode(MetricModeTypes.TRUNCATE, 16)
+ assert match_metrics_mode("trUncatE(7)") ==
MetricsMode(MetricModeTypes.TRUNCATE, 7)
+ assert match_metrics_mode("trUncatE(07)") ==
MetricsMode(MetricModeTypes.TRUNCATE, 7)
+
+ with pytest.raises(AssertionError) as exc_info:
+ match_metrics_mode("trUncatE(-7)")
+ assert "Unsupported metrics mode trUncatE(-7)" in str(exc_info.value)
+
+ with pytest.raises(AssertionError) as exc_info:
+ match_metrics_mode("trUncatE(0)")
+ assert "Truncation length must be larger than 0" in str(exc_info.value)
+
+
+def test_metrics_mode_none() -> None:
+ (file_bytes, metadata, table_metadata) = construct_test_table()
+
+ datafile = DataFile()
+ table_metadata.properties["write.metadata.metrics.default"] = "none"
+ fill_parquet_file_metadata(
+ datafile,
+ metadata,
+ len(file_bytes),
+ table_metadata,
+ )
+
+ assert len(datafile.value_counts) == 0
+ assert len(datafile.null_value_counts) == 0
+ assert len(datafile.nan_value_counts) == 0
+ assert len(datafile.lower_bounds) == 0
+ assert len(datafile.upper_bounds) == 0
+
+
+def test_metrics_mode_counts() -> None:
+ (file_bytes, metadata, table_metadata) = construct_test_table()
+
+ datafile = DataFile()
+ table_metadata.properties["write.metadata.metrics.default"] = "counts"
+ fill_parquet_file_metadata(
+ datafile,
+ metadata,
+ len(file_bytes),
+ table_metadata,
+ )
+
+ assert len(datafile.value_counts) == 5
+ assert len(datafile.null_value_counts) == 5
+ assert len(datafile.nan_value_counts) == 0
+ assert len(datafile.lower_bounds) == 0
+ assert len(datafile.upper_bounds) == 0
+
+
+def test_metrics_mode_full() -> None:
+ (file_bytes, metadata, table_metadata) = construct_test_table()
+
+ datafile = DataFile()
+ table_metadata.properties["write.metadata.metrics.default"] = "full"
+ fill_parquet_file_metadata(
+ datafile,
+ metadata,
+ len(file_bytes),
+ table_metadata,
+ )
+
+ assert len(datafile.value_counts) == 5
+ assert len(datafile.null_value_counts) == 5
+ assert len(datafile.nan_value_counts) == 0
+
+ assert len(datafile.lower_bounds) == 5
+ assert datafile.lower_bounds[1].decode() == "aaaaaaaaaaaaaaaaaaaa"
+ assert datafile.lower_bounds[2] == STRUCT_FLOAT.pack(1.69)
+ assert datafile.lower_bounds[5] == STRUCT_INT64.pack(1)
+ assert datafile.lower_bounds[6] == STRUCT_INT64.pack(1)
+ assert datafile.lower_bounds[7] == STRUCT_INT64.pack(2)
+
+ assert len(datafile.upper_bounds) == 5
+ assert datafile.upper_bounds[1].decode() == "zzzzzzzzzzzzzzzzzzzz"
+ assert datafile.upper_bounds[2] == STRUCT_FLOAT.pack(100)
+ assert datafile.upper_bounds[5] == STRUCT_INT64.pack(9)
+ assert datafile.upper_bounds[6] == STRUCT_INT64.pack(5)
+ assert datafile.upper_bounds[7] == STRUCT_INT64.pack(6)
+
+
+def test_metrics_mode_non_default_trunc() -> None:
+ (file_bytes, metadata, table_metadata) = construct_test_table()
+
+ datafile = DataFile()
+ table_metadata.properties["write.metadata.metrics.default"] = "truncate(2)"
+ fill_parquet_file_metadata(
+ datafile,
+ metadata,
+ len(file_bytes),
+ table_metadata,
+ )
+
+ assert len(datafile.value_counts) == 5
+ assert len(datafile.null_value_counts) == 5
+ assert len(datafile.nan_value_counts) == 0
+
+ assert len(datafile.lower_bounds) == 5
+ assert datafile.lower_bounds[1].decode() == "aa"
+ assert datafile.lower_bounds[2] == STRUCT_FLOAT.pack(1.69)[:2]
+ assert datafile.lower_bounds[5] == STRUCT_INT64.pack(1)[:2]
+ assert datafile.lower_bounds[6] == STRUCT_INT64.pack(1)[:2]
+ assert datafile.lower_bounds[7] == STRUCT_INT64.pack(2)[:2]
+
+ assert len(datafile.upper_bounds) == 5
+ assert datafile.upper_bounds[1].decode() == "zz"
+ assert datafile.upper_bounds[2] == STRUCT_FLOAT.pack(100)[:2]
+ assert datafile.upper_bounds[5] == STRUCT_INT64.pack(9)[:2]
+ assert datafile.upper_bounds[6] == STRUCT_INT64.pack(5)[:2]
+ assert datafile.upper_bounds[7] == STRUCT_INT64.pack(6)[:2]
+
+
+def test_column_metrics_mode() -> None:
+ (file_bytes, metadata, table_metadata) = construct_test_table()
+
+ datafile = DataFile()
+ table_metadata.properties["write.metadata.metrics.default"] = "truncate(2)"
+ table_metadata.properties["write.metadata.metrics.column.strings"] = "none"
+ table_metadata.properties["write.metadata.metrics.column.list.element"] =
"counts"
+ fill_parquet_file_metadata(
+ datafile,
+ metadata,
+ len(file_bytes),
+ table_metadata,
+ )
+
+ assert len(datafile.value_counts) == 4
+ assert len(datafile.null_value_counts) == 4
+ assert len(datafile.nan_value_counts) == 0
+
+ assert len(datafile.lower_bounds) == 3
+ assert datafile.lower_bounds[2] == STRUCT_FLOAT.pack(1.69)[:2]
+ assert 5 not in datafile.lower_bounds
+ assert datafile.lower_bounds[6] == STRUCT_INT64.pack(1)[:2]
+ assert datafile.lower_bounds[7] == STRUCT_INT64.pack(2)[:2]
+
+ assert len(datafile.upper_bounds) == 3
+ assert datafile.upper_bounds[2] == STRUCT_FLOAT.pack(100)[:2]
+ assert 5 not in datafile.upper_bounds
+ assert datafile.upper_bounds[6] == STRUCT_INT64.pack(5)[:2]
+ assert datafile.upper_bounds[7] == STRUCT_INT64.pack(6)[:2]
+
+
+def test_offsets() -> None:
+ (file_bytes, metadata, table_metadata) = construct_test_table()
+
+ datafile = DataFile()
+ fill_parquet_file_metadata(datafile, metadata, len(file_bytes),
table_metadata)
+
+ assert datafile.split_offsets is not None
+ assert len(datafile.split_offsets) == 1
+ assert datafile.split_offsets[0] == 4
+
Review Comment:
Thanks for all the tests, this is great @maxdebayser!
Could you add these two tests as well, for my sanity:
```python
def test_write_and_read_stats_schema(table_schema_nested: Schema):
tbl = pa.Table.from_pydict({
"foo": ["a", "b"],
"bar": [1, 2],
"baz": [False, True],
"qux": [["a", "b"], ["c", "d"]],
"quux": [[("a", (("aa", 1), ("ab", 2)))], [("b", (("ba", 3), ("bb",
4)))]],
"location": [[(52.377956, 4.897070), (4.897070, -122.431297)],
[(43.618881, -116.215019), (41.881832, -87.623177)]],
"person": [("Fokko", 33), ("Max", 42)] # Possible data quality issue
},
schema=schema_to_pyarrow(table_schema_nested)
)
stats_columns = pre_order_visit(table_schema_nested,
PyArrowStatisticsCollector(table_schema_nested, {}))
visited_paths = []
def file_visitor(written_file: Any) -> None:
visited_paths.append(written_file)
with tempfile.TemporaryDirectory() as tmpdir:
pq.write_to_dataset(tbl, tmpdir, file_visitor=file_visitor)
assert visited_paths[0].metadata.num_columns == len(stats_columns)
def test_stats_types(table_schema_nested: Schema):
stats_columns = pre_order_visit(table_schema_nested,
PyArrowStatisticsCollector(table_schema_nested, {}))
# the field-ids should be sorted
assert all(stats_columns[i].field_id <= stats_columns[i + 1].field_id
for i in range(len(stats_columns)-1))
assert [col.iceberg_type for col in stats_columns] == [
StringType(),
IntegerType(),
BooleanType(),
StringType(),
StringType(),
StringType(),
IntegerType(),
FloatType(),
FloatType(),
StringType(),
IntegerType(),
]
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]