This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new ba2fe43  Centralized table properties management (#388)
ba2fe43 is described below

commit ba2fe437c825943478307bf8ae431d9b8de0cb07
Author: Honah J <[email protected]>
AuthorDate: Thu Feb 8 00:50:40 2024 -0800

    Centralized table properties management (#388)
    
    * add TableProperties and PropertyUtil
    
    * fix lint
    
    * Revert unrelated change
    
    ---------
    
    Co-authored-by: Fokko Driesprong <[email protected]>
---
 pyiceberg/catalog/hive.py       |  4 +--
 pyiceberg/io/pyarrow.py         | 67 ++++++++++++++++++++---------------------
 pyiceberg/table/__init__.py     | 47 +++++++++++++++++++++++++++--
 pyiceberg/table/name_mapping.py |  2 --
 4 files changed, 80 insertions(+), 40 deletions(-)

diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py
index 8069321..d81404f 100644
--- a/pyiceberg/catalog/hive.py
+++ b/pyiceberg/catalog/hive.py
@@ -67,7 +67,7 @@ from pyiceberg.io import FileIO, load_file_io
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema, SchemaVisitor, visit
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, 
update_table_metadata
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, 
TableProperties, update_table_metadata
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import EMPTY_DICT
@@ -155,7 +155,7 @@ PROP_EXTERNAL = "EXTERNAL"
 PROP_TABLE_TYPE = "table_type"
 PROP_METADATA_LOCATION = "metadata_location"
 PROP_PREVIOUS_METADATA_LOCATION = "previous_metadata_location"
-DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'}
+DEFAULT_PROPERTIES = {TableProperties.PARQUET_COMPRESSION: 
TableProperties.PARQUET_COMPRESSION_DEFAULT}
 
 
 def _construct_parameters(metadata_location: str, previous_metadata_location: 
Optional[str] = None) -> Dict[str, Any]:
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 1b14771..ee8f63f 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -124,7 +124,7 @@ from pyiceberg.schema import (
     visit,
     visit_with_partner,
 )
-from pyiceberg.table import WriteTask
+from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
 from pyiceberg.table.name_mapping import NameMapping
 from pyiceberg.transforms import TruncateTransform
 from pyiceberg.typedef import EMPTY_DICT, Properties, Record
@@ -1389,19 +1389,12 @@ class MetricModeTypes(Enum):
     FULL = "full"
 
 
-DEFAULT_METRICS_MODE_KEY = "write.metadata.metrics.default"
-COLUMN_METRICS_MODE_KEY_PREFIX = "write.metadata.metrics.column"
-
-
 @dataclass(frozen=True)
 class MetricsMode(Singleton):
     type: MetricModeTypes
     length: Optional[int] = None
 
 
-_DEFAULT_METRICS_MODE = MetricsMode(MetricModeTypes.TRUNCATE, 
DEFAULT_TRUNCATION_LENGTH)
-
-
 def match_metrics_mode(mode: str) -> MetricsMode:
     sanitized_mode = mode.strip().lower()
     if sanitized_mode.startswith("truncate"):
@@ -1435,12 +1428,14 @@ class 
PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]
     _field_id: int = 0
     _schema: Schema
     _properties: Dict[str, str]
-    _default_mode: Optional[str]
+    _default_mode: str
 
     def __init__(self, schema: Schema, properties: Dict[str, str]):
         self._schema = schema
         self._properties = properties
-        self._default_mode = self._properties.get(DEFAULT_METRICS_MODE_KEY)
+        self._default_mode = self._properties.get(
+            TableProperties.DEFAULT_WRITE_METRICS_MODE, 
TableProperties.DEFAULT_WRITE_METRICS_MODE_DEFAULT
+        )
 
     def schema(self, schema: Schema, struct_result: Callable[[], 
List[StatisticsCollector]]) -> List[StatisticsCollector]:
         return struct_result()
@@ -1475,12 +1470,9 @@ class 
PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]
         if column_name is None:
             return []
 
-        metrics_mode = _DEFAULT_METRICS_MODE
-
-        if self._default_mode:
-            metrics_mode = match_metrics_mode(self._default_mode)
+        metrics_mode = match_metrics_mode(self._default_mode)
 
-        col_mode = 
self._properties.get(f"{COLUMN_METRICS_MODE_KEY_PREFIX}.{column_name}")
+        col_mode = 
self._properties.get(f"{TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX}.{column_name}")
         if col_mode:
             metrics_mode = match_metrics_mode(col_mode)
 
@@ -1767,33 +1759,40 @@ def write_file(table: Table, tasks: 
Iterator[WriteTask]) -> Iterator[DataFile]:
     return iter([data_file])
 
 
-def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
-    def _get_int(key: str, default: Optional[int] = None) -> Optional[int]:
-        if value := table_properties.get(key):
-            try:
-                return int(value)
-            except ValueError as e:
-                raise ValueError(f"Could not parse table property {key} to an 
integer: {value}") from e
-        else:
-            return default
+ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
+PYARROW_UNCOMPRESSED_CODEC = "none"
 
+
+def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
     for key_pattern in [
-        "write.parquet.row-group-size-bytes",
-        "write.parquet.page-row-limit",
-        "write.parquet.bloom-filter-max-bytes",
-        "write.parquet.bloom-filter-enabled.column.*",
+        TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+        TableProperties.PARQUET_PAGE_ROW_LIMIT,
+        TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES,
+        f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*",
     ]:
         if unsupported_keys := fnmatch.filter(table_properties, key_pattern):
             raise NotImplementedError(f"Parquet writer option(s) 
{unsupported_keys} not implemented")
 
-    compression_codec = 
table_properties.get("write.parquet.compression-codec", "zstd")
-    compression_level = _get_int("write.parquet.compression-level")
-    if compression_codec == "uncompressed":
-        compression_codec = "none"
+    compression_codec = 
table_properties.get(TableProperties.PARQUET_COMPRESSION, 
TableProperties.PARQUET_COMPRESSION_DEFAULT)
+    compression_level = PropertyUtil.property_as_int(
+        properties=table_properties,
+        property_name=TableProperties.PARQUET_COMPRESSION_LEVEL,
+        default=TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT,
+    )
+    if compression_codec == ICEBERG_UNCOMPRESSED_CODEC:
+        compression_codec = PYARROW_UNCOMPRESSED_CODEC
 
     return {
         "compression": compression_codec,
         "compression_level": compression_level,
-        "data_page_size": _get_int("write.parquet.page-size-bytes"),
-        "dictionary_pagesize_limit": _get_int("write.parquet.dict-size-bytes", 
default=2 * 1024 * 1024),
+        "data_page_size": PropertyUtil.property_as_int(
+            properties=table_properties,
+            property_name=TableProperties.PARQUET_PAGE_SIZE_BYTES,
+            default=TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT,
+        ),
+        "dictionary_pagesize_limit": PropertyUtil.property_as_int(
+            properties=table_properties,
+            property_name=TableProperties.PARQUET_DICT_SIZE_BYTES,
+            default=TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT,
+        ),
     }
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 1feffc6..8670bc2 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -85,7 +85,6 @@ from pyiceberg.table.metadata import (
     TableMetadataUtil,
 )
 from pyiceberg.table.name_mapping import (
-    SCHEMA_NAME_MAPPING_DEFAULT,
     NameMapping,
     create_mapping_from_schema,
     parse_mapping_from_json,
@@ -134,6 +133,50 @@ TABLE_ROOT_ID = -1
 _JAVA_LONG_MAX = 9223372036854775807
 
 
+class TableProperties:
+    PARQUET_ROW_GROUP_SIZE_BYTES = "write.parquet.row-group-size-bytes"
+    PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024  # 128 MB
+
+    PARQUET_PAGE_SIZE_BYTES = "write.parquet.page-size-bytes"
+    PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024  # 1 MB
+
+    PARQUET_PAGE_ROW_LIMIT = "write.parquet.page-row-limit"
+    PARQUET_PAGE_ROW_LIMIT_DEFAULT = 20000
+
+    PARQUET_DICT_SIZE_BYTES = "write.parquet.dict-size-bytes"
+    PARQUET_DICT_SIZE_BYTES_DEFAULT = 2 * 1024 * 1024  # 2 MB
+
+    PARQUET_COMPRESSION = "write.parquet.compression-codec"
+    PARQUET_COMPRESSION_DEFAULT = "zstd"
+
+    PARQUET_COMPRESSION_LEVEL = "write.parquet.compression-level"
+    PARQUET_COMPRESSION_LEVEL_DEFAULT = None
+
+    PARQUET_BLOOM_FILTER_MAX_BYTES = "write.parquet.bloom-filter-max-bytes"
+    PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024
+
+    PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = 
"write.parquet.bloom-filter-enabled.column"
+
+    DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
+    DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"
+
+    METRICS_MODE_COLUMN_CONF_PREFIX = "write.metadata.metrics.column"
+
+    DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
+
+
+class PropertyUtil:
+    @staticmethod
+    def property_as_int(properties: Dict[str, str], property_name: str, 
default: Optional[int] = None) -> Optional[int]:
+        if value := properties.get(property_name):
+            try:
+                return int(value)
+            except ValueError as e:
+                raise ValueError(f"Could not parse table property 
{property_name} to an integer: {value}") from e
+        else:
+            return default
+
+
 class Transaction:
     _table: Table
     _updates: Tuple[TableUpdate, ...]
@@ -921,7 +964,7 @@ class Table:
 
     def name_mapping(self) -> NameMapping:
         """Return the table's field-id NameMapping."""
-        if name_mapping_json := 
self.properties.get(SCHEMA_NAME_MAPPING_DEFAULT):
+        if name_mapping_json := 
self.properties.get(TableProperties.DEFAULT_NAME_MAPPING):
             return parse_mapping_from_json(name_mapping_json)
         else:
             return create_mapping_from_schema(self.schema())
diff --git a/pyiceberg/table/name_mapping.py b/pyiceberg/table/name_mapping.py
index 84a295f..ffe9635 100644
--- a/pyiceberg/table/name_mapping.py
+++ b/pyiceberg/table/name_mapping.py
@@ -34,8 +34,6 @@ from pyiceberg.schema import Schema, SchemaVisitor, visit
 from pyiceberg.typedef import IcebergBaseModel, IcebergRootModel
 from pyiceberg.types import ListType, MapType, NestedField, PrimitiveType, 
StructType
 
-SCHEMA_NAME_MAPPING_DEFAULT = "schema.name-mapping.default"
-
 
 class MappedField(IcebergBaseModel):
     field_id: int = Field(alias="field-id")

Reply via email to