This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new ba2fe43 Centralized table properties management (#388)
ba2fe43 is described below
commit ba2fe437c825943478307bf8ae431d9b8de0cb07
Author: Honah J <[email protected]>
AuthorDate: Thu Feb 8 00:50:40 2024 -0800
Centralized table properties management (#388)
* Add TableProperties and PropertyUtil to centralize table property keys and defaults
* Fix lint errors
* Revert unrelated change
---------
Co-authored-by: Fokko Driesprong <[email protected]>
---
pyiceberg/catalog/hive.py | 4 +--
pyiceberg/io/pyarrow.py | 67 ++++++++++++++++++++---------------------
pyiceberg/table/__init__.py | 47 +++++++++++++++++++++++++++--
pyiceberg/table/name_mapping.py | 2 --
4 files changed, 80 insertions(+), 40 deletions(-)
diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py
index 8069321..d81404f 100644
--- a/pyiceberg/catalog/hive.py
+++ b/pyiceberg/catalog/hive.py
@@ -67,7 +67,7 @@ from pyiceberg.io import FileIO, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table,
update_table_metadata
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table,
TableProperties, update_table_metadata
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT
@@ -155,7 +155,7 @@ PROP_EXTERNAL = "EXTERNAL"
PROP_TABLE_TYPE = "table_type"
PROP_METADATA_LOCATION = "metadata_location"
PROP_PREVIOUS_METADATA_LOCATION = "previous_metadata_location"
-DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'}
+DEFAULT_PROPERTIES = {TableProperties.PARQUET_COMPRESSION:
TableProperties.PARQUET_COMPRESSION_DEFAULT}
def _construct_parameters(metadata_location: str, previous_metadata_location:
Optional[str] = None) -> Dict[str, Any]:
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 1b14771..ee8f63f 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -124,7 +124,7 @@ from pyiceberg.schema import (
visit,
visit_with_partner,
)
-from pyiceberg.table import WriteTask
+from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
from pyiceberg.table.name_mapping import NameMapping
from pyiceberg.transforms import TruncateTransform
from pyiceberg.typedef import EMPTY_DICT, Properties, Record
@@ -1389,19 +1389,12 @@ class MetricModeTypes(Enum):
FULL = "full"
-DEFAULT_METRICS_MODE_KEY = "write.metadata.metrics.default"
-COLUMN_METRICS_MODE_KEY_PREFIX = "write.metadata.metrics.column"
-
-
@dataclass(frozen=True)
class MetricsMode(Singleton):
type: MetricModeTypes
length: Optional[int] = None
-_DEFAULT_METRICS_MODE = MetricsMode(MetricModeTypes.TRUNCATE,
DEFAULT_TRUNCATION_LENGTH)
-
-
def match_metrics_mode(mode: str) -> MetricsMode:
sanitized_mode = mode.strip().lower()
if sanitized_mode.startswith("truncate"):
@@ -1435,12 +1428,14 @@ class
PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]
_field_id: int = 0
_schema: Schema
_properties: Dict[str, str]
- _default_mode: Optional[str]
+ _default_mode: str
def __init__(self, schema: Schema, properties: Dict[str, str]):
self._schema = schema
self._properties = properties
- self._default_mode = self._properties.get(DEFAULT_METRICS_MODE_KEY)
+ self._default_mode = self._properties.get(
+ TableProperties.DEFAULT_WRITE_METRICS_MODE,
TableProperties.DEFAULT_WRITE_METRICS_MODE_DEFAULT
+ )
def schema(self, schema: Schema, struct_result: Callable[[],
List[StatisticsCollector]]) -> List[StatisticsCollector]:
return struct_result()
@@ -1475,12 +1470,9 @@ class
PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]
if column_name is None:
return []
- metrics_mode = _DEFAULT_METRICS_MODE
-
- if self._default_mode:
- metrics_mode = match_metrics_mode(self._default_mode)
+ metrics_mode = match_metrics_mode(self._default_mode)
- col_mode =
self._properties.get(f"{COLUMN_METRICS_MODE_KEY_PREFIX}.{column_name}")
+ col_mode =
self._properties.get(f"{TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX}.{column_name}")
if col_mode:
metrics_mode = match_metrics_mode(col_mode)
@@ -1767,33 +1759,40 @@ def write_file(table: Table, tasks:
Iterator[WriteTask]) -> Iterator[DataFile]:
return iter([data_file])
-def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
- def _get_int(key: str, default: Optional[int] = None) -> Optional[int]:
- if value := table_properties.get(key):
- try:
- return int(value)
- except ValueError as e:
- raise ValueError(f"Could not parse table property {key} to an
integer: {value}") from e
- else:
- return default
+ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
+PYARROW_UNCOMPRESSED_CODEC = "none"
+
+def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
for key_pattern in [
- "write.parquet.row-group-size-bytes",
- "write.parquet.page-row-limit",
- "write.parquet.bloom-filter-max-bytes",
- "write.parquet.bloom-filter-enabled.column.*",
+ TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+ TableProperties.PARQUET_PAGE_ROW_LIMIT,
+ TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES,
+ f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*",
]:
if unsupported_keys := fnmatch.filter(table_properties, key_pattern):
raise NotImplementedError(f"Parquet writer option(s)
{unsupported_keys} not implemented")
- compression_codec =
table_properties.get("write.parquet.compression-codec", "zstd")
- compression_level = _get_int("write.parquet.compression-level")
- if compression_codec == "uncompressed":
- compression_codec = "none"
+ compression_codec =
table_properties.get(TableProperties.PARQUET_COMPRESSION,
TableProperties.PARQUET_COMPRESSION_DEFAULT)
+ compression_level = PropertyUtil.property_as_int(
+ properties=table_properties,
+ property_name=TableProperties.PARQUET_COMPRESSION_LEVEL,
+ default=TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT,
+ )
+ if compression_codec == ICEBERG_UNCOMPRESSED_CODEC:
+ compression_codec = PYARROW_UNCOMPRESSED_CODEC
return {
"compression": compression_codec,
"compression_level": compression_level,
- "data_page_size": _get_int("write.parquet.page-size-bytes"),
- "dictionary_pagesize_limit": _get_int("write.parquet.dict-size-bytes",
default=2 * 1024 * 1024),
+ "data_page_size": PropertyUtil.property_as_int(
+ properties=table_properties,
+ property_name=TableProperties.PARQUET_PAGE_SIZE_BYTES,
+ default=TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT,
+ ),
+ "dictionary_pagesize_limit": PropertyUtil.property_as_int(
+ properties=table_properties,
+ property_name=TableProperties.PARQUET_DICT_SIZE_BYTES,
+ default=TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT,
+ ),
}
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index 1feffc6..8670bc2 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -85,7 +85,6 @@ from pyiceberg.table.metadata import (
TableMetadataUtil,
)
from pyiceberg.table.name_mapping import (
- SCHEMA_NAME_MAPPING_DEFAULT,
NameMapping,
create_mapping_from_schema,
parse_mapping_from_json,
@@ -134,6 +133,50 @@ TABLE_ROOT_ID = -1
_JAVA_LONG_MAX = 9223372036854775807
+class TableProperties:
+ PARQUET_ROW_GROUP_SIZE_BYTES = "write.parquet.row-group-size-bytes"
+ PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024 # 128 MB
+
+ PARQUET_PAGE_SIZE_BYTES = "write.parquet.page-size-bytes"
+ PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024 # 1 MB
+
+ PARQUET_PAGE_ROW_LIMIT = "write.parquet.page-row-limit"
+ PARQUET_PAGE_ROW_LIMIT_DEFAULT = 20000
+
+ PARQUET_DICT_SIZE_BYTES = "write.parquet.dict-size-bytes"
+ PARQUET_DICT_SIZE_BYTES_DEFAULT = 2 * 1024 * 1024 # 2 MB
+
+ PARQUET_COMPRESSION = "write.parquet.compression-codec"
+ PARQUET_COMPRESSION_DEFAULT = "zstd"
+
+ PARQUET_COMPRESSION_LEVEL = "write.parquet.compression-level"
+ PARQUET_COMPRESSION_LEVEL_DEFAULT = None
+
+ PARQUET_BLOOM_FILTER_MAX_BYTES = "write.parquet.bloom-filter-max-bytes"
+ PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024
+
+ PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX =
"write.parquet.bloom-filter-enabled.column"
+
+ DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
+ DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"
+
+ METRICS_MODE_COLUMN_CONF_PREFIX = "write.metadata.metrics.column"
+
+ DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
+
+
+class PropertyUtil:
+ @staticmethod
+ def property_as_int(properties: Dict[str, str], property_name: str,
default: Optional[int] = None) -> Optional[int]:
+ if value := properties.get(property_name):
+ try:
+ return int(value)
+ except ValueError as e:
+ raise ValueError(f"Could not parse table property
{property_name} to an integer: {value}") from e
+ else:
+ return default
+
+
class Transaction:
_table: Table
_updates: Tuple[TableUpdate, ...]
@@ -921,7 +964,7 @@ class Table:
def name_mapping(self) -> NameMapping:
"""Return the table's field-id NameMapping."""
- if name_mapping_json :=
self.properties.get(SCHEMA_NAME_MAPPING_DEFAULT):
+ if name_mapping_json :=
self.properties.get(TableProperties.DEFAULT_NAME_MAPPING):
return parse_mapping_from_json(name_mapping_json)
else:
return create_mapping_from_schema(self.schema())
diff --git a/pyiceberg/table/name_mapping.py b/pyiceberg/table/name_mapping.py
index 84a295f..ffe9635 100644
--- a/pyiceberg/table/name_mapping.py
+++ b/pyiceberg/table/name_mapping.py
@@ -34,8 +34,6 @@ from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.typedef import IcebergBaseModel, IcebergRootModel
from pyiceberg.types import ListType, MapType, NestedField, PrimitiveType,
StructType
-SCHEMA_NAME_MAPPING_DEFAULT = "schema.name-mapping.default"
-
class MappedField(IcebergBaseModel):
field_id: int = Field(alias="field-id")