abnobdoss commented on code in PR #3265:
URL: https://github.com/apache/iceberg-python/pull/3265#discussion_r3321758372


##########
pyiceberg/io/pyarrow.py:
##########
@@ -2757,14 +2877,25 @@ def parquet_file_to_data_file(io: FileIO, 
table_metadata: TableMetadata, file_pa
 PYARROW_UNCOMPRESSED_CODEC = "none"
 
 
-def _get_parquet_writer_kwargs(table_properties: Properties) -> dict[str, Any]:
+def _get_parquet_writer_kwargs(table_properties: Properties, file_schema: 
Schema) -> dict[str, Any]:
     from pyiceberg.table import TableProperties
 
-    for key_pattern in [
+    unsupported_key_patterns = [
         TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
         TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES,
-        f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*",
-    ]:
+    ]
+
+    from packaging import version
+
+    MIN_PYARROW_VERSION_SUPPORTING_BLOOM_FILTER_WRITES = "24.0.0"
+    if version.parse(pyarrow.__version__) < 
version.parse(MIN_PYARROW_VERSION_SUPPORTING_BLOOM_FILTER_WRITES):

Review Comment:
   Would it be better to make this explicitly a different error, since it is 
implemented but is gated on the dependency version?



##########
pyiceberg/io/pyarrow.py:
##########
@@ -2613,6 +2731,8 @@ def write_parquet(task: WriteTask) -> DataFile:
         else:
             file_schema = table_schema
 
+        parquet_writer_kwargs = 
_get_parquet_writer_kwargs(table_metadata.properties, file_schema)

Review Comment:
   If I understand correctly, neither input varies per file: 
table_metadata.properties is table-level, and file_schema is derived solely 
from table_metadata.schema() (just sanitized), so it's identical for every 
task. 
   
   Would it make sense to lift the file_schema derivation and this 
_get_parquet_writer_kwargs call back out of write_parquet, computing them once 
per write_file call instead of once per write_parquet call?



##########
tests/integration/test_writes/test_writes.py:
##########
@@ -712,6 +718,27 @@ def test_write_parquet_unsupported_properties(
         tbl.append(arrow_table_with_null)
 
 
+@pytest.mark.integration
+@skip_if_bloom_filter_not_supported
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_write_parquet_bloom_filter_properties(

Review Comment:
   Would it make sense to assert pq.ParquetWriter is called with 
bloom_filter_options?



##########
pyiceberg/io/pyarrow.py:
##########
@@ -2474,6 +2479,120 @@ def parquet_path_to_id_mapping(
     return result
 
 
+def id_to_parquet_path_mapping(schema: Schema) -> dict[int, str]:
+    """
+    Compute the mapping of Iceberg column ID to parquet column path.
+
+    Args:
+        schema (pyiceberg.schema.Schema): The current table schema.
+    """
+    result: dict[int, str] = {}
+    for pair in pre_order_visit(schema, ID2ParquetPathVisitor()):
+        result[pair.field_id] = pair.parquet_path
+    return result
+
+
+@dataclass(frozen=True)
+class BloomFilterOptions:
+    parquet_path: str
+    ndv: int | None
+    fpp: float | None
+
+
+class 
BloomFilterOptionsCollector(PreOrderSchemaVisitor[list[BloomFilterOptions]]):
+    _field_id: int = 0
+    _schema: Schema
+    _properties: dict[str, str]
+
+    def __init__(self, schema: Schema, properties: dict[str, str], 
id_to_parquet_path_mapping: dict[int, str]):
+        self._schema = schema
+        self._properties = properties
+        self._id_to_parquet_path_mapping = id_to_parquet_path_mapping
+
+    def schema(
+        self, schema: Schema, struct_result: Callable[[], 
builtins.list[BloomFilterOptions]]
+    ) -> builtins.list[BloomFilterOptions]:
+        return struct_result()
+
+    def struct(
+        self, struct: StructType, field_results: builtins.list[Callable[[], 
builtins.list[BloomFilterOptions]]]
+    ) -> builtins.list[BloomFilterOptions]:
+        return list(itertools.chain(*[result() for result in field_results]))
+
+    def field(
+        self, field: NestedField, field_result: Callable[[], 
builtins.list[BloomFilterOptions]]
+    ) -> builtins.list[BloomFilterOptions]:
+        self._field_id = field.field_id
+        return field_result()
+
+    def list(
+        self, list_type: ListType, element_result: Callable[[], 
builtins.list[BloomFilterOptions]]
+    ) -> builtins.list[BloomFilterOptions]:
+        self._field_id = list_type.element_id
+        return element_result()
+
+    def map(
+        self,
+        map_type: MapType,
+        key_result: Callable[[], builtins.list[BloomFilterOptions]],
+        value_result: Callable[[], builtins.list[BloomFilterOptions]],
+    ) -> builtins.list[BloomFilterOptions]:
+        self._field_id = map_type.key_id
+        k = key_result()
+        self._field_id = map_type.value_id
+        v = value_result()
+        return k + v
+
+    def primitive(self, primitive: PrimitiveType) -> 
builtins.list[BloomFilterOptions]:
+        from pyiceberg.table import TableProperties
+
+        column_name = self._schema.find_column_name(self._field_id)
+        if column_name is None:
+            return []
+
+        parquet_path = self._id_to_parquet_path_mapping.get(self._field_id)
+        if parquet_path is None:
+            return []
+
+        bloom_filter_enabled = property_as_bool(
+            self._properties, 
f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.{column_name}", 
False
+        )
+        if not bloom_filter_enabled:
+            return []
+
+        bloom_filter_fpp = property_as_float(
+            self._properties, 
f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_FPP_PREFIX}.{column_name}", None
+        )
+        bloom_filter_ndv = property_as_int(
+            self._properties, 
f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_NDV_PREFIX}.{column_name}", None
+        )
+
+        return [BloomFilterOptions(parquet_path=parquet_path, 
ndv=bloom_filter_ndv, fpp=bloom_filter_fpp)]
+
+
+def get_bloom_filter_options(
+    schema: Schema,
+    table_properties: dict[str, str],
+) -> dict[str, dict[str, Any]]:
+    """
+    Get the bloom filter options from the table properties.
+
+    Args:
+        schema (pyiceberg.schema.Schema): The current table schema.
+        table_properties (dict[str, str]): The table properties.
+    """
+    bloom_filter_options = pre_order_visit(

Review Comment:
   I think parquet_path_to_id_mapping(file_schema) already walks this schema 
with ID2ParquetPathVisitor for stats. If I'm reading it right, the bloom path 
adds a couple more passes over the same schema. Would it make sense to do a 
single ID2ParquetPathVisitor pass and derive both the bloom options and the 
stats mapping from it?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to