This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new cee5f7553 Python: Add pyupgrade as a pre-commit hook (#4935)
cee5f7553 is described below
commit cee5f75536e280cfb8bfbe2bcdc33731dd18a4ae
Author: Fokko Driesprong <[email protected]>
AuthorDate: Sun Jun 5 20:17:16 2022 +0200
Python: Add pyupgrade as a pre-commit hook (#4935)
---
python/.pre-commit-config.yaml | 5 ++
python/src/iceberg/catalog/base.py | 40 +++++-----
python/src/iceberg/schema.py | 103 +++++++++++++-------------
python/src/iceberg/table/base.py | 3 +-
python/src/iceberg/transforms.py | 2 +-
python/src/iceberg/types.py | 2 +-
python/src/iceberg/utils/schema_conversion.py | 35 ++++-----
7 files changed, 90 insertions(+), 100 deletions(-)
diff --git a/python/.pre-commit-config.yaml b/python/.pre-commit-config.yaml
index cc30a5aa5..1d0db0ba7 100644
--- a/python/.pre-commit-config.yaml
+++ b/python/.pre-commit-config.yaml
@@ -45,3 +45,8 @@ repos:
hooks:
- id: pycln
args: [--config=python/pyproject.toml]
+ - repo: https://github.com/asottile/pyupgrade
+ rev: v2.32.1
+ hooks:
+ - id: pyupgrade
+ args: [--py38-plus]
diff --git a/python/src/iceberg/catalog/base.py b/python/src/iceberg/catalog/base.py
index 40f2747ab..243f6a8f7 100644
--- a/python/src/iceberg/catalog/base.py
+++ b/python/src/iceberg/catalog/base.py
@@ -18,12 +18,6 @@
from __future__ import annotations
from abc import ABC, abstractmethod
-from typing import (
- List,
- Optional,
- Set,
- Union,
-)
from iceberg.catalog import Identifier, Properties
from iceberg.schema import Schema
@@ -59,11 +53,11 @@ class Catalog(ABC):
@abstractmethod
def create_table(
self,
- identifier: Union[str, Identifier],
+ identifier: str | Identifier,
schema: Schema,
- location: Optional[str] = None,
- partition_spec: Optional[PartitionSpec] = None,
- properties: Optional[Properties] = None,
+ location: str | None = None,
+ partition_spec: PartitionSpec | None = None,
+ properties: Properties | None = None,
) -> Table:
"""Create a table
@@ -82,7 +76,7 @@ class Catalog(ABC):
"""
@abstractmethod
- def load_table(self, identifier: Union[str, Identifier]) -> Table:
+ def load_table(self, identifier: str | Identifier) -> Table:
"""Loads the table's metadata and returns the table instance.
You can also use this method to check for table existence using 'try
catalog.table() except TableNotFoundError'
@@ -99,7 +93,7 @@ class Catalog(ABC):
"""
@abstractmethod
- def drop_table(self, identifier: Union[str, Identifier]) -> None:
+ def drop_table(self, identifier: str | Identifier) -> None:
"""Drop a table.
Args:
@@ -110,7 +104,7 @@ class Catalog(ABC):
"""
@abstractmethod
- def purge_table(self, identifier: Union[str, Identifier]) -> None:
+ def purge_table(self, identifier: str | Identifier) -> None:
"""Drop a table and purge all data and metadata files.
Args:
@@ -121,7 +115,7 @@ class Catalog(ABC):
"""
@abstractmethod
- def rename_table(self, from_identifier: Union[str, Identifier],
to_identifier: Union[str, Identifier]) -> Table:
+ def rename_table(self, from_identifier: str | Identifier, to_identifier:
str | Identifier) -> Table:
"""Rename a fully classified table name
Args:
@@ -136,7 +130,7 @@ class Catalog(ABC):
"""
@abstractmethod
- def create_namespace(self, namespace: Union[str, Identifier], properties:
Optional[Properties] = None) -> None:
+ def create_namespace(self, namespace: str | Identifier, properties:
Properties | None = None) -> None:
"""Create a namespace in the catalog.
Args:
@@ -148,7 +142,7 @@ class Catalog(ABC):
"""
@abstractmethod
- def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
+ def drop_namespace(self, namespace: str | Identifier) -> None:
"""Drop a namespace.
Args:
@@ -160,7 +154,7 @@ class Catalog(ABC):
"""
@abstractmethod
- def list_tables(self, namespace: Optional[Union[str, Identifier]] = None)
-> List[Identifier]:
+ def list_tables(self, namespace: str | Identifier | None = None) ->
list[Identifier]:
"""List tables under the given namespace in the catalog.
If namespace not provided, will list all tables in the catalog.
@@ -176,7 +170,7 @@ class Catalog(ABC):
"""
@abstractmethod
- def list_namespaces(self) -> List[Identifier]:
+ def list_namespaces(self) -> list[Identifier]:
"""List namespaces from the given namespace. If not given, list
top-level namespaces from the catalog.
Returns:
@@ -184,7 +178,7 @@ class Catalog(ABC):
"""
@abstractmethod
- def load_namespace_properties(self, namespace: Union[str, Identifier]) ->
Properties:
+ def load_namespace_properties(self, namespace: str | Identifier) ->
Properties:
"""Get properties for a namespace.
Args:
@@ -199,7 +193,7 @@ class Catalog(ABC):
@abstractmethod
def update_namespace_properties(
- self, namespace: Union[str, Identifier], removals: Optional[Set[str]]
= None, updates: Optional[Properties] = None
+ self, namespace: str | Identifier, removals: set[str] | None = None,
updates: Properties | None = None
) -> None:
"""Removes provided property keys and updates properties for a
namespace.
@@ -214,7 +208,7 @@ class Catalog(ABC):
"""
@staticmethod
- def identifier_to_tuple(identifier: Union[str, Identifier]) -> Identifier:
+ def identifier_to_tuple(identifier: str | Identifier) -> Identifier:
"""Parses an identifier to a tuple.
If the identifier is a string, it is split into a tuple on '.'. If it
is a tuple, it is used as-is.
@@ -228,7 +222,7 @@ class Catalog(ABC):
return identifier if isinstance(identifier, tuple) else
tuple(str.split(identifier, "."))
@staticmethod
- def table_name_from(identifier: Union[str, Identifier]) -> str:
+ def table_name_from(identifier: str | Identifier) -> str:
"""Extracts table name from a table identifier
Args:
@@ -240,7 +234,7 @@ class Catalog(ABC):
return Catalog.identifier_to_tuple(identifier)[-1]
@staticmethod
- def namespace_from(identifier: Union[str, Identifier]) -> Identifier:
+ def namespace_from(identifier: str | Identifier) -> Identifier:
"""Extracts table namespace from a table identifier
Args:
diff --git a/python/src/iceberg/schema.py b/python/src/iceberg/schema.py
index 0b0295580..b0fd8427c 100644
--- a/python/src/iceberg/schema.py
+++ b/python/src/iceberg/schema.py
@@ -25,9 +25,6 @@ from typing import (
Any,
Dict,
Generic,
- List,
- Optional,
- Tuple,
TypeVar,
)
@@ -52,15 +49,15 @@ class Schema:
>>> from iceberg import types
"""
- def __init__(self, *columns: NestedField, schema_id: int,
identifier_field_ids: Optional[List[int]] = None):
+ def __init__(self, *columns: NestedField, schema_id: int,
identifier_field_ids: list[int] | None = None):
self._struct = StructType(*columns)
self._schema_id = schema_id
self._identifier_field_ids = identifier_field_ids or []
- self._name_to_id: Dict[str, int] = index_by_name(self)
- self._name_to_id_lower: Dict[str, int] = {} # Should be accessed
through self._lazy_name_to_id_lower()
- self._id_to_field: Dict[int, NestedField] = {} # Should be accessed
through self._lazy_id_to_field()
- self._id_to_name: Dict[int, str] = {} # Should be accessed through
self._lazy_id_to_name()
- self._id_to_accessor: Dict[int, Accessor] = {} # Should be accessed
through self._lazy_id_to_accessor()
+ self._name_to_id: dict[str, int] = index_by_name(self)
+ self._name_to_id_lower: dict[str, int] = {} # Should be accessed
through self._lazy_name_to_id_lower()
+ self._id_to_field: dict[int, NestedField] = {} # Should be accessed
through self._lazy_id_to_field()
+ self._id_to_name: dict[int, str] = {} # Should be accessed through
self._lazy_id_to_name()
+ self._id_to_accessor: dict[int, Accessor] = {} # Should be accessed
through self._lazy_id_to_accessor()
def __str__(self):
return "table {\n" + "\n".join([" " + str(field) for field in
self.columns]) + "\n}"
@@ -86,7 +83,7 @@ class Schema:
return identifier_field_ids_is_equal and schema_is_equal
@property
- def columns(self) -> Tuple[NestedField, ...]:
+ def columns(self) -> tuple[NestedField, ...]:
"""A list of the top-level fields in the underlying struct"""
return self._struct.fields
@@ -96,10 +93,10 @@ class Schema:
return self._schema_id
@property
- def identifier_field_ids(self) -> List[int]:
+ def identifier_field_ids(self) -> list[int]:
return self._identifier_field_ids
- def _lazy_id_to_field(self) -> Dict[int, NestedField]:
+ def _lazy_id_to_field(self) -> dict[int, NestedField]:
"""Returns an index of field ID to NestedField instance
This is calculated once when called for the first time. Subsequent
calls to this method will use a cached index.
@@ -108,7 +105,7 @@ class Schema:
self._id_to_field = index_by_id(self)
return self._id_to_field
- def _lazy_name_to_id_lower(self) -> Dict[str, int]:
+ def _lazy_name_to_id_lower(self) -> dict[str, int]:
"""Returns an index of lower-case field names to field IDs
This is calculated once when called for the first time. Subsequent
calls to this method will use a cached index.
@@ -117,7 +114,7 @@ class Schema:
self._name_to_id_lower = {name.lower(): field_id for name,
field_id in self._name_to_id.items()}
return self._name_to_id_lower
- def _lazy_id_to_name(self) -> Dict[int, str]:
+ def _lazy_id_to_name(self) -> dict[int, str]:
"""Returns an index of field ID to full name
This is calculated once when called for the first time. Subsequent
calls to this method will use a cached index.
@@ -126,7 +123,7 @@ class Schema:
self._id_to_name = index_name_by_id(self)
return self._id_to_name
- def _lazy_id_to_accessor(self) -> Dict[int, Accessor]:
+ def _lazy_id_to_accessor(self) -> dict[int, Accessor]:
"""Returns an index of field ID to accessor
This is calculated once when called for the first time. Subsequent
calls to this method will use a cached index.
@@ -193,7 +190,7 @@ class Schema:
"""
return self._lazy_id_to_accessor().get(field_id) # type: ignore
- def select(self, names: List[str], case_sensitive: bool = True) ->
"Schema":
+ def select(self, names: list[str], case_sensitive: bool = True) -> Schema:
"""Return a new schema instance pruned to a subset of columns
Args:
@@ -208,12 +205,12 @@ class Schema:
return self._case_insensitive_select(schema=self, names=names)
@classmethod
- def _case_sensitive_select(cls, schema: "Schema", names: List[str]):
+ def _case_sensitive_select(cls, schema: Schema, names: list[str]):
# TODO: Add a PruneColumns schema visitor and use it here
raise NotImplementedError()
@classmethod
- def _case_insensitive_select(cls, schema: "Schema", names: List[str]):
+ def _case_insensitive_select(cls, schema: Schema, names: list[str]):
# TODO: Add a PruneColumns schema visitor and use it here
raise NotImplementedError()
@@ -254,7 +251,7 @@ class SchemaVisitor(Generic[T], ABC):
"""Visit a Schema"""
@abstractmethod
- def struct(self, struct: StructType, field_results: List[T]) -> T:
+ def struct(self, struct: StructType, field_results: list[T]) -> T:
"""Visit a StructType"""
@abstractmethod
@@ -279,7 +276,7 @@ class Accessor:
"""An accessor for a specific position in a container that implements the
StructProtocol"""
position: int
- inner: Optional["Accessor"] = None
+ inner: Accessor | None = None
def __str__(self):
return f"Accessor(position={self.position},inner={self.inner})"
@@ -377,35 +374,35 @@ class _IndexById(SchemaVisitor[Dict[int, NestedField]]):
"""A schema visitor for generating a field ID to NestedField index"""
def __init__(self) -> None:
- self._index: Dict[int, NestedField] = {}
+ self._index: dict[int, NestedField] = {}
- def schema(self, schema: Schema, struct_result) -> Dict[int, NestedField]:
+ def schema(self, schema: Schema, struct_result) -> dict[int, NestedField]:
return self._index
- def struct(self, struct: StructType, field_results) -> Dict[int,
NestedField]:
+ def struct(self, struct: StructType, field_results) -> dict[int,
NestedField]:
return self._index
- def field(self, field: NestedField, field_result) -> Dict[int,
NestedField]:
+ def field(self, field: NestedField, field_result) -> dict[int,
NestedField]:
"""Add the field ID to the index"""
self._index[field.field_id] = field
return self._index
- def list(self, list_type: ListType, element_result) -> Dict[int,
NestedField]:
+ def list(self, list_type: ListType, element_result) -> dict[int,
NestedField]:
"""Add the list element ID to the index"""
self._index[list_type.element.field_id] = list_type.element
return self._index
- def map(self, map_type: MapType, key_result, value_result) -> Dict[int,
NestedField]:
+ def map(self, map_type: MapType, key_result, value_result) -> dict[int,
NestedField]:
"""Add the key ID and value ID as individual items in the index"""
self._index[map_type.key.field_id] = map_type.key
self._index[map_type.value.field_id] = map_type.value
return self._index
- def primitive(self, primitive) -> Dict[int, NestedField]:
+ def primitive(self, primitive) -> dict[int, NestedField]:
return self._index
-def index_by_id(schema_or_type) -> Dict[int, NestedField]:
+def index_by_id(schema_or_type) -> dict[int, NestedField]:
"""Generate an index of field IDs to NestedField instances
Args:
@@ -421,11 +418,11 @@ class _IndexByName(SchemaVisitor[Dict[str, int]]):
"""A schema visitor for generating a field name to field ID index"""
def __init__(self) -> None:
- self._index: Dict[str, int] = {}
- self._short_name_to_id: Dict[str, int] = {}
- self._combined_index: Dict[str, int] = {}
- self._field_names: List[str] = []
- self._short_field_names: List[str] = []
+ self._index: dict[str, int] = {}
+ self._short_name_to_id: dict[str, int] = {}
+ self._combined_index: dict[str, int] = {}
+ self._field_names: list[str] = []
+ self._short_field_names: list[str] = []
def before_list_element(self, element: NestedField) -> None:
"""Short field names omit element when the element is a StructType"""
@@ -448,23 +445,23 @@ class _IndexByName(SchemaVisitor[Dict[str, int]]):
self._field_names.pop()
self._short_field_names.pop()
- def schema(self, schema: Schema, struct_result: Dict[str, int]) ->
Dict[str, int]:
+ def schema(self, schema: Schema, struct_result: dict[str, int]) ->
dict[str, int]:
return self._index
- def struct(self, struct: StructType, field_results: List[Dict[str, int]])
-> Dict[str, int]:
+ def struct(self, struct: StructType, field_results: list[dict[str, int]])
-> dict[str, int]:
return self._index
- def field(self, field: NestedField, field_result: Dict[str, int]) ->
Dict[str, int]:
+ def field(self, field: NestedField, field_result: dict[str, int]) ->
dict[str, int]:
"""Add the field name to the index"""
self._add_field(field.name, field.field_id)
return self._index
- def list(self, list_type: ListType, element_result: Dict[str, int]) ->
Dict[str, int]:
+ def list(self, list_type: ListType, element_result: dict[str, int]) ->
dict[str, int]:
"""Add the list element name to the index"""
self._add_field(list_type.element.name, list_type.element.field_id)
return self._index
- def map(self, map_type: MapType, key_result: Dict[str, int], value_result:
Dict[str, int]) -> Dict[str, int]:
+ def map(self, map_type: MapType, key_result: dict[str, int], value_result:
dict[str, int]) -> dict[str, int]:
"""Add the key name and value name as individual items in the index"""
self._add_field(map_type.key.name, map_type.key.field_id)
self._add_field(map_type.value.name, map_type.value.field_id)
@@ -493,10 +490,10 @@ class _IndexByName(SchemaVisitor[Dict[str, int]]):
short_name = ".".join([".".join(self._short_field_names), name])
self._short_name_to_id[short_name] = field_id
- def primitive(self, primitive) -> Dict[str, int]:
+ def primitive(self, primitive) -> dict[str, int]:
return self._index
- def by_name(self) -> Dict[str, int]:
+ def by_name(self) -> dict[str, int]:
"""Returns an index of combined full and short names
Note: Only short names that do not conflict with full names are
included.
@@ -505,13 +502,13 @@ class _IndexByName(SchemaVisitor[Dict[str, int]]):
combined_index.update(self._index)
return combined_index
- def by_id(self) -> Dict[int, str]:
+ def by_id(self) -> dict[int, str]:
"""Returns an index of ID to full names"""
- id_to_full_name = dict([(value, key) for key, value in
self._index.items()])
+ id_to_full_name = {value: key for key, value in self._index.items()}
return id_to_full_name
-def index_by_name(schema_or_type: Schema | IcebergType) -> Dict[str, int]:
+def index_by_name(schema_or_type: Schema | IcebergType) -> dict[str, int]:
"""Generate an index of field names to field IDs
Args:
@@ -525,7 +522,7 @@ def index_by_name(schema_or_type: Schema | IcebergType) -> Dict[str, int]:
return indexer.by_name()
-def index_name_by_id(schema_or_type: Schema | IcebergType) -> Dict[int, str]:
+def index_name_by_id(schema_or_type: Schema | IcebergType) -> dict[int, str]:
"""Generate an index of field IDs full field names
Args:
@@ -575,13 +572,13 @@ class _BuildPositionAccessors(SchemaVisitor[Dict[Position, Accessor]]):
"""
@staticmethod
- def _wrap_leaves(result: Dict[Position, Accessor], position: Position = 0)
-> Dict[Position, Accessor]:
+ def _wrap_leaves(result: dict[Position, Accessor], position: Position = 0)
-> dict[Position, Accessor]:
return {field_id: Accessor(position, inner=inner) for field_id, inner
in result.items()}
- def schema(self, schema: Schema, struct_result: Dict[Position, Accessor])
-> Dict[Position, Accessor]:
+ def schema(self, schema: Schema, struct_result: dict[Position, Accessor])
-> dict[Position, Accessor]:
return struct_result
- def struct(self, struct: StructType, field_results: List[Dict[Position,
Accessor]]) -> Dict[Position, Accessor]:
+ def struct(self, struct: StructType, field_results: list[dict[Position,
Accessor]]) -> dict[Position, Accessor]:
result = {}
for position, field in enumerate(struct.fields):
@@ -593,22 +590,22 @@ class _BuildPositionAccessors(SchemaVisitor[Dict[Position, Accessor]]):
return result
- def field(self, field: NestedField, field_result: Dict[Position,
Accessor]) -> Dict[Position, Accessor]:
+ def field(self, field: NestedField, field_result: dict[Position,
Accessor]) -> dict[Position, Accessor]:
return field_result
- def list(self, list_type: ListType, element_result: Dict[Position,
Accessor]) -> Dict[Position, Accessor]:
+ def list(self, list_type: ListType, element_result: dict[Position,
Accessor]) -> dict[Position, Accessor]:
return {}
def map(
- self, map_type: MapType, key_result: Dict[Position, Accessor],
value_result: Dict[Position, Accessor]
- ) -> Dict[Position, Accessor]:
+ self, map_type: MapType, key_result: dict[Position, Accessor],
value_result: dict[Position, Accessor]
+ ) -> dict[Position, Accessor]:
return {}
- def primitive(self, primitive: PrimitiveType) -> Dict[Position, Accessor]:
+ def primitive(self, primitive: PrimitiveType) -> dict[Position, Accessor]:
return {}
-def build_position_accessors(schema_or_type: Schema | IcebergType) ->
Dict[int, Accessor]:
+def build_position_accessors(schema_or_type: Schema | IcebergType) ->
dict[int, Accessor]:
"""Generate an index of field IDs to schema position accessors
Args:
diff --git a/python/src/iceberg/table/base.py b/python/src/iceberg/table/base.py
index f14ec6422..1cc8b3906 100644
--- a/python/src/iceberg/table/base.py
+++ b/python/src/iceberg/table/base.py
@@ -18,7 +18,6 @@
from __future__ import annotations
from abc import ABC
-from typing import Union
from iceberg.catalog.base import Identifier
@@ -29,7 +28,7 @@ class Table(ABC):
To be implemented by https://github.com/apache/iceberg/issues/3227
"""
- identifier: Union[str, Identifier]
+ identifier: str | Identifier
class PartitionSpec:
diff --git a/python/src/iceberg/transforms.py b/python/src/iceberg/transforms.py
index 6043d2aef..39ec5d538 100644
--- a/python/src/iceberg/transforms.py
+++ b/python/src/iceberg/transforms.py
@@ -258,7 +258,7 @@ class VoidTransform(Transform):
def __new__(cls): # pylint: disable=W0221
if cls._instance is None:
- cls._instance = super(VoidTransform, cls).__new__(cls)
+ cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
diff --git a/python/src/iceberg/types.py b/python/src/iceberg/types.py
index 22aa6d8a6..6f1ad701c 100644
--- a/python/src/iceberg/types.py
+++ b/python/src/iceberg/types.py
@@ -44,7 +44,7 @@ class Singleton:
def __new__(cls):
if not isinstance(cls._instance, cls):
- cls._instance = super(Singleton, cls).__new__(cls)
+ cls._instance = super().__new__(cls)
return cls._instance
diff --git a/python/src/iceberg/utils/schema_conversion.py b/python/src/iceberg/utils/schema_conversion.py
index e6fba45a0..25edc62ac 100644
--- a/python/src/iceberg/utils/schema_conversion.py
+++ b/python/src/iceberg/utils/schema_conversion.py
@@ -20,12 +20,7 @@
from __future__ import annotations
import logging
-from typing import (
- Any,
- Dict,
- List,
- Tuple,
-)
+from typing import Any
from iceberg.schema import Schema
from iceberg.types import (
@@ -52,7 +47,7 @@ from iceberg.types import (
logger = logging.getLogger(__name__)
-PRIMITIVE_FIELD_TYPE_MAPPING: Dict[str, PrimitiveType] = {
+PRIMITIVE_FIELD_TYPE_MAPPING: dict[str, PrimitiveType] = {
"boolean": BooleanType(),
"bytes": BinaryType(),
"double": DoubleType(),
@@ -63,7 +58,7 @@ PRIMITIVE_FIELD_TYPE_MAPPING: Dict[str, PrimitiveType] = {
"enum": StringType(),
}
-LOGICAL_FIELD_TYPE_MAPPING: Dict[Tuple[str, str], PrimitiveType] = {
+LOGICAL_FIELD_TYPE_MAPPING: dict[tuple[str, str], PrimitiveType] = {
("date", "int"): DateType(),
("time-millis", "int"): TimeType(),
("timestamp-millis", "long"): TimestampType(),
@@ -74,7 +69,7 @@ LOGICAL_FIELD_TYPE_MAPPING: Dict[Tuple[str, str], PrimitiveType] = {
class AvroSchemaConversion:
- def avro_to_iceberg(self, avro_schema: Dict[str, Any]) -> Schema:
+ def avro_to_iceberg(self, avro_schema: dict[str, Any]) -> Schema:
"""Converts an Apache Avro into an Apache Iceberg schema equivalent
This expects to have field id's to be encoded in the Avro schema::
@@ -119,7 +114,7 @@ class AvroSchemaConversion:
"""
return Schema(*[self._convert_field(field) for field in
avro_schema["fields"]], schema_id=1)
- def _resolve_union(self, type_union: Dict | List | str) -> Tuple[str |
Dict[str, Any], bool]:
+ def _resolve_union(self, type_union: dict | list | str) -> tuple[str |
dict[str, Any], bool]:
"""
Converts Unions into their type and resolves if the field is optional
@@ -142,7 +137,7 @@ class AvroSchemaConversion:
Raises:
TypeError: In the case non-optional union types are encountered
"""
- avro_types: Dict | List
+ avro_types: dict | list
if isinstance(type_union, str):
# It is a primitive and required
return type_union, False
@@ -160,7 +155,7 @@ class AvroSchemaConversion:
# Filter the null value and return the type
return list(filter(lambda t: t != "null", avro_types))[0], is_optional
- def _convert_schema(self, avro_type: str | Dict[str, Any]) -> IcebergType:
+ def _convert_schema(self, avro_type: str | dict[str, Any]) -> IcebergType:
"""
Resolves the Avro type
@@ -198,7 +193,7 @@ class AvroSchemaConversion:
else:
raise ValueError(f"Unknown type: {avro_type}")
- def _convert_field(self, field: Dict[str, Any]) -> NestedField:
+ def _convert_field(self, field: dict[str, Any]) -> NestedField:
"""
Converts an Avro field into an Iceberg equivalent field
Args:
@@ -220,7 +215,7 @@ class AvroSchemaConversion:
doc=field.get("doc"),
)
- def _convert_record_type(self, record_type: Dict[str, Any]) -> StructType:
+ def _convert_record_type(self, record_type: dict[str, Any]) -> StructType:
"""
Converts the fields from a record into an Iceberg struct
@@ -274,7 +269,7 @@ class AvroSchemaConversion:
return StructType(*[self._convert_field(field) for field in
record_type["fields"]])
- def _convert_array_type(self, array_type: Dict[str, Any]) -> ListType:
+ def _convert_array_type(self, array_type: dict[str, Any]) -> ListType:
if "element-id" not in array_type:
raise ValueError(f"Cannot convert array-type, missing element-id:
{array_type}")
@@ -286,7 +281,7 @@ class AvroSchemaConversion:
element_is_optional=element_is_optional,
)
- def _convert_map_type(self, map_type: Dict[str, Any]) -> MapType:
+ def _convert_map_type(self, map_type: dict[str, Any]) -> MapType:
"""
Args:
map_type: The dict that describes the Avro map type
@@ -322,7 +317,7 @@ class AvroSchemaConversion:
value_is_optional=value_is_optional,
)
- def _convert_logical_type(self, avro_logical_type: Dict[str, Any]) ->
IcebergType:
+ def _convert_logical_type(self, avro_logical_type: dict[str, Any]) ->
IcebergType:
"""
Convert a schema with a logical type annotation. For the decimal and
map
we need to fetch more keys from the dict, and for the simple ones we
can just
@@ -358,7 +353,7 @@ class AvroSchemaConversion:
else:
raise ValueError(f"Unknown logical/physical type combination:
{avro_logical_type}")
- def _convert_logical_decimal_type(self, avro_type: Dict[str, Any]) ->
DecimalType:
+ def _convert_logical_decimal_type(self, avro_type: dict[str, Any]) ->
DecimalType:
"""
Args:
avro_type: The Avro type
@@ -384,7 +379,7 @@ class AvroSchemaConversion:
"""
return DecimalType(precision=avro_type["precision"],
scale=avro_type["scale"])
- def _convert_logical_map_type(self, avro_type: Dict[str, Any]) -> MapType:
+ def _convert_logical_map_type(self, avro_type: dict[str, Any]) -> MapType:
"""
In the case where a map hasn't a key as a type you can use a logical
map to
still encode this in Avro
@@ -436,7 +431,7 @@ class AvroSchemaConversion:
value_is_optional=value.is_optional,
)
- def _convert_fixed_type(self, avro_type: Dict[str, Any]) -> FixedType:
+ def _convert_fixed_type(self, avro_type: dict[str, Any]) -> FixedType:
"""
https://avro.apache.org/docs/current/spec.html#Fixed