This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b679d0ce71 Python: Improve Avro read performance (#8075)
b679d0ce71 is described below
commit b679d0ce71920075f4f04cf4659295f179da273b
Author: Rusty Conover <[email protected]>
AuthorDate: Sun Jul 16 13:09:24 2023 -0400
Python: Improve Avro read performance (#8075)
Utilize __slots__ for various Avro classes.
Memoize the key/value reader functions in MapReader.
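For readers unfamiliar with the two techniques, here is a minimal, self-contained sketch (the names are illustrative only, not pyiceberg APIs): __slots__ trades the per-instance __dict__ for fixed attribute storage, and hoisting a bound method into a local variable avoids re-resolving the attribute on every loop iteration.

    # Illustrative sketch only; names below are not pyiceberg APIs.

    class IntReader:
        __slots__ = ()

        def read(self, decoder):
            return next(decoder)

    class PairReader:
        # __slots__ removes the per-instance __dict__: smaller instances and
        # slightly faster attribute access, which adds up when many reader
        # and record objects are created while scanning Avro metadata.
        __slots__ = ("key", "value")

        def __init__(self, key, value):
            self.key = key
            self.value = value

        def read_block(self, decoder, count):
            # Memoize the bound methods once so the hot loop avoids repeated
            # self.key.read / self.value.read attribute lookups.
            key_read = self.key.read
            value_read = self.value.read
            return {key_read(decoder): value_read(decoder) for _ in range(count)}

    # "decoder" here is just an iterator of ints standing in for a real BinaryDecoder.
    decoder = iter(range(6))
    print(PairReader(IntReader(), IntReader()).read_block(decoder, 3))
    # {0: 1, 2: 3, 4: 5}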
---
python/pyiceberg/avro/decoder.py | 1 +
python/pyiceberg/avro/file.py | 16 +++++++++++++++-
python/pyiceberg/avro/reader.py | 11 +++++++++--
python/pyiceberg/avro/resolver.py | 3 +++
python/pyiceberg/manifest.py | 38 ++++++++++++++++++++++++++++++++++++++
python/pyiceberg/typedef.py | 1 +
python/tests/avro/test_file.py | 3 ++-
7 files changed, 69 insertions(+), 4 deletions(-)
diff --git a/python/pyiceberg/avro/decoder.py b/python/pyiceberg/avro/decoder.py
index 366cf7b26e..e680c00bcd 100644
--- a/python/pyiceberg/avro/decoder.py
+++ b/python/pyiceberg/avro/decoder.py
@@ -29,6 +29,7 @@ from pyiceberg.utils.decimal import unscaled_to_decimal
class BinaryDecoder:
"""Read leaf values."""
+ __slots__ = "_input_stream"
_input_stream: InputStream
def __init__(self, input_stream: InputStream) -> None:
diff --git a/python/pyiceberg/avro/file.py b/python/pyiceberg/avro/file.py
index cc63737c25..520eda9ce1 100644
--- a/python/pyiceberg/avro/file.py
+++ b/python/pyiceberg/avro/file.py
@@ -77,6 +77,7 @@ _SCHEMA_KEY = "avro.schema"
class AvroFileHeader(Record):
+ __slots__ = ("magic", "meta", "sync")
magic: bytes
meta: Dict[str, str]
sync: bytes
@@ -128,6 +129,18 @@ class Block(Generic[D]):
class AvroFile(Generic[D]):
+ __slots__ = (
+ "input_file",
+ "read_schema",
+ "read_types",
+ "read_enums",
+ "input_stream",
+ "header",
+ "schema",
+ "reader",
+ "decoder",
+ "block",
+ )
input_file: InputFile
read_schema: Optional[Schema]
read_types: Dict[int, Callable[..., StructProtocol]]
@@ -138,7 +151,7 @@ class AvroFile(Generic[D]):
reader: Reader
decoder: BinaryDecoder
- block: Optional[Block[D]] = None
+ block: Optional[Block[D]]
def __init__(
self,
@@ -151,6 +164,7 @@ class AvroFile(Generic[D]):
self.read_schema = read_schema
self.read_types = read_types
self.read_enums = read_enums
+ self.block = None
def __enter__(self) -> AvroFile[D]:
"""Generates a reader tree for the payload within an avro file.
diff --git a/python/pyiceberg/avro/reader.py b/python/pyiceberg/avro/reader.py
index a1978c6ca3..7d3d8fcec2 100644
--- a/python/pyiceberg/avro/reader.py
+++ b/python/pyiceberg/avro/reader.py
@@ -105,6 +105,7 @@ class NoneReader(Reader):
class DefaultReader(Reader):
+ __slots__ = ("default_value",)
default_value: Any
def __init__(self, default_value: Any) -> None:
@@ -266,6 +267,7 @@ class OptionReader(Reader):
class StructReader(Reader):
+ __slots__ = ("field_readers", "create_struct", "struct")
field_readers: Tuple[Tuple[Optional[int], Reader], ...]
create_struct: Callable[..., StructProtocol]
struct: StructType
@@ -324,6 +326,7 @@ class StructReader(Reader):
@dataclass(frozen=True)
class ListReader(Reader):
+ __slots__ = ("element",)
element: Reader
def read(self, decoder: BinaryDecoder) -> List[Any]:
@@ -344,20 +347,24 @@ class ListReader(Reader):
@dataclass(frozen=True)
class MapReader(Reader):
+ __slots__ = ("key", "value")
key: Reader
value: Reader
def read(self, decoder: BinaryDecoder) -> Dict[Any, Any]:
read_items = {}
block_count = decoder.read_int()
+ key_reader = self.key.read
+ value_reader = self.value.read
+
while block_count != 0:
if block_count < 0:
block_count = -block_count
# We ignore the block size for now
_ = decoder.read_int()
for _ in range(block_count):
- key = self.key.read(decoder)
- read_items[key] = self.value.read(decoder)
+ key = key_reader(decoder)
+ read_items[key] = value_reader(decoder)
block_count = decoder.read_int()
return read_items
diff --git a/python/pyiceberg/avro/resolver.py b/python/pyiceberg/avro/resolver.py
index 51505301ea..efd99936c5 100644
--- a/python/pyiceberg/avro/resolver.py
+++ b/python/pyiceberg/avro/resolver.py
@@ -217,6 +217,8 @@ def resolve(
class EnumReader(Reader):
"""An Enum reader to wrap primitive values into an Enum."""
+ __slots__ = ("enum", "reader")
+
enum: Callable[..., Enum]
reader: Reader
@@ -232,6 +234,7 @@ class EnumReader(Reader):
class SchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
+ __slots__ = ("read_types", "read_enums", "context")
read_types: Dict[int, Callable[..., StructProtocol]]
read_enums: Dict[int, Callable[..., Enum]]
context: List[int]
diff --git a/python/pyiceberg/manifest.py b/python/pyiceberg/manifest.py
index fbc013f50c..ce0e47fc24 100644
--- a/python/pyiceberg/manifest.py
+++ b/python/pyiceberg/manifest.py
@@ -164,6 +164,25 @@ DATA_FILE_TYPE = StructType(
class DataFile(Record):
+ __slots__ = (
+ "content",
+ "file_path",
+ "file_format",
+ "partition",
+ "record_count",
+ "file_size_in_bytes",
+ "column_sizes",
+ "value_counts",
+ "null_value_counts",
+ "nan_value_counts",
+ "lower_bounds",
+ "upper_bounds",
+ "key_metadata",
+ "split_offsets",
+ "equality_ids",
+ "sort_order_id",
+ "spec_id",
+ )
content: DataFileContent
file_path: str
file_format: FileFormat
@@ -214,6 +233,7 @@ MANIFEST_ENTRY_SCHEMA = Schema(
class ManifestEntry(Record):
+ __slots__ = ("status", "snapshot_id", "data_sequence_number",
"file_sequence_number", "data_file")
status: ManifestEntryStatus
snapshot_id: Optional[int]
data_sequence_number: Optional[int]
@@ -233,6 +253,7 @@ PARTITION_FIELD_SUMMARY_TYPE = StructType(
class PartitionFieldSummary(Record):
+ __slots__ = ("contains_null", "contains_nan", "lower_bound", "upper_bound")
contains_null: bool
contains_nan: Optional[bool]
lower_bound: Optional[bytes]
@@ -266,6 +287,23 @@ POSITIONAL_DELETE_SCHEMA = Schema(
class ManifestFile(Record):
+ __slots__ = (
+ "manifest_path",
+ "manifest_length",
+ "partition_spec_id",
+ "content",
+ "sequence_number",
+ "min_sequence_number",
+ "added_snapshot_id",
+ "added_files_count",
+ "existing_files_count",
+ "deleted_files_count",
+ "added_rows_count",
+ "existing_rows_count",
+ "deleted_rows_count",
+ "partitions",
+ "key_metadata",
+ )
manifest_path: str
manifest_length: int
partition_spec_id: int
diff --git a/python/pyiceberg/typedef.py b/python/pyiceberg/typedef.py
index 2e4d1b938f..51495acdf0 100644
--- a/python/pyiceberg/typedef.py
+++ b/python/pyiceberg/typedef.py
@@ -128,6 +128,7 @@ class IcebergBaseModel(BaseModel):
class Record(StructProtocol):
+ __slots__ = ("_position_to_field_name",)
_position_to_field_name: Dict[int, str]
def __init__(self, *data: Any, struct: Optional[StructType] = None, **named_data: Any) -> None:
diff --git a/python/tests/avro/test_file.py b/python/tests/avro/test_file.py
index 53d0216ab0..101e676c90 100644
--- a/python/tests/avro/test_file.py
+++ b/python/tests/avro/test_file.py
@@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+import inspect
from enum import Enum
from tempfile import TemporaryDirectory
from typing import Any
@@ -90,7 +91,7 @@ def todict(obj: Any) -> Any:
elif hasattr(obj, "__iter__") and not isinstance(obj, str) and not
isinstance(obj, bytes):
return [todict(v) for v in obj]
elif hasattr(obj, "__dict__"):
- return {key: todict(value) for key, value in obj.__dict__.items() if not callable(value) and not key.startswith("_")}
+ return {key: todict(value) for key, value in inspect.getmembers(obj) if not callable(value) and not key.startswith("_")}
else:
return obj