This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new cdbaf5754e Python: Remove MemoryInputStream in favor of io.BytesIO (#8074)
cdbaf5754e is described below
commit cdbaf5754e0cd2ee7439a1d94cb4cfc92fc29731
Author: Rusty Conover <[email protected]>
AuthorDate: Sat Jul 15 15:52:24 2023 -0400
Python: Remove MemoryInputStream in favor of io.BytesIO (#8074)
* Python: Remove MemoryInputStream in favor of io.BytesIO
* Add pre-commit fixes
---
python/pyiceberg/avro/file.py | 5 +--
python/pyiceberg/io/memory.py | 85 ---------------------------------------
python/tests/avro/test_decoder.py | 46 ++++++++++-----------
python/tests/avro/test_reader.py | 10 ++---
4 files changed, 29 insertions(+), 117 deletions(-)
diff --git a/python/pyiceberg/avro/file.py b/python/pyiceberg/avro/file.py
index 10f7ef7d7d..cc63737c25 100644
--- a/python/pyiceberg/avro/file.py
+++ b/python/pyiceberg/avro/file.py
@@ -46,7 +46,6 @@ from pyiceberg.io import (
OutputFile,
OutputStream,
)
-from pyiceberg.io.memory import MemoryInputStream
from pyiceberg.schema import Schema
from pyiceberg.typedef import EMPTY_DICT, Record, StructProtocol
from pyiceberg.types import (
@@ -193,9 +192,7 @@ class AvroFile(Generic[D]):
if codec := self.header.compression_codec():
block_bytes = codec.decompress(block_bytes)
- self.block = Block(
- reader=self.reader, block_records=block_records, block_decoder=BinaryDecoder(MemoryInputStream(block_bytes))
- )
+ self.block = Block(reader=self.reader, block_records=block_records, block_decoder=BinaryDecoder(io.BytesIO(block_bytes)))
return block_records
def __next__(self) -> D:
diff --git a/python/pyiceberg/io/memory.py b/python/pyiceberg/io/memory.py
deleted file mode 100644
index c76efa570f..0000000000
--- a/python/pyiceberg/io/memory.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from io import SEEK_CUR, SEEK_END, SEEK_SET
-from types import TracebackType
-from typing import Optional, Type
-
-from pyiceberg.io import InputStream
-
-
-class MemoryInputStream(InputStream):
- """
- Simple in memory stream that we use to store decompressed blocks.
-
- Examples:
- >>> stream = MemoryInputStream(b'22memory1925')
- >>> stream.tell()
- 0
- >>> stream.read(2)
- b'22'
- >>> stream.tell()
- 2
- >>> stream.seek(8)
- >>> stream.read(4)
- b'1925'
- >>> stream.close()
- """
-
- buffer: bytes
- len: int
- pos: int
-
- def __init__(self, buffer: bytes):
- self.buffer = buffer
- self.len = len(buffer)
- self.pos = 0
-
- def read(self, size: int = 0) -> bytes:
- b = self.buffer[self.pos : self.pos + size]
- self.pos += size
- return b
-
- def seek(self, offset: int, whence: int = SEEK_SET) -> int:
- if whence == SEEK_SET:
- self.pos = offset
- elif whence == SEEK_CUR:
- self.pos += offset
- elif whence == SEEK_END:
- self.pos = self.len + offset
- else:
- raise ValueError(f"Unknown whence {offset}")
-
- return self.pos
-
- def tell(self) -> int:
- return self.pos
-
- def close(self) -> None:
- del self.buffer
- self.pos = 0
-
- def __enter__(self) -> MemoryInputStream:
- """Provides setup when opening a MemoryInputStream using a 'with' statement."""
- return self
-
- def __exit__(
- self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
- ) -> None:
- """Performs cleanup when exiting the scope of a 'with' statement."""
- self.close()
diff --git a/python/tests/avro/test_decoder.py b/python/tests/avro/test_decoder.py
index 35405c8bad..ee9a5b210e 100644
--- a/python/tests/avro/test_decoder.py
+++ b/python/tests/avro/test_decoder.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import io
from datetime import datetime, timezone
from decimal import Decimal
from io import SEEK_SET
@@ -28,12 +29,11 @@ import pytest
from pyiceberg.avro.decoder import BinaryDecoder
from pyiceberg.avro.resolver import resolve
from pyiceberg.io import InputStream
-from pyiceberg.io.memory import MemoryInputStream
from pyiceberg.types import DoubleType, FloatType
def test_read_decimal_from_fixed() -> None:
- mis = MemoryInputStream(b"\x00\x00\x00\x05\x6A\x48\x1C\xFB\x2C\x7C\x50\x00")
+ mis = io.BytesIO(b"\x00\x00\x00\x05\x6A\x48\x1C\xFB\x2C\x7C\x50\x00")
decoder = BinaryDecoder(mis)
actual = decoder.read_decimal_from_fixed(28, 15, 12)
expected = Decimal("99892.123400000000000")
@@ -41,19 +41,19 @@ def test_read_decimal_from_fixed() -> None:
def test_read_boolean_true() -> None:
- mis = MemoryInputStream(b"\x01")
+ mis = io.BytesIO(b"\x01")
decoder = BinaryDecoder(mis)
assert decoder.read_boolean() is True
def test_read_boolean_false() -> None:
- mis = MemoryInputStream(b"\x00")
+ mis = io.BytesIO(b"\x00")
decoder = BinaryDecoder(mis)
assert decoder.read_boolean() is False
def test_skip_boolean() -> None:
- mis = MemoryInputStream(b"\x00")
+ mis = io.BytesIO(b"\x00")
decoder = BinaryDecoder(mis)
assert mis.tell() == 0
decoder.skip_boolean()
@@ -61,13 +61,13 @@ def test_skip_boolean() -> None:
def test_read_int() -> None:
- mis = MemoryInputStream(b"\x18")
+ mis = io.BytesIO(b"\x18")
decoder = BinaryDecoder(mis)
assert decoder.read_int() == 12
def test_skip_int() -> None:
- mis = MemoryInputStream(b"\x18")
+ mis = io.BytesIO(b"\x18")
decoder = BinaryDecoder(mis)
assert mis.tell() == 0
decoder.skip_int()
@@ -75,7 +75,7 @@ def test_skip_int() -> None:
def test_read_decimal() -> None:
- mis = MemoryInputStream(b"\x18\x00\x00\x00\x05\x6A\x48\x1C\xFB\x2C\x7C\x50\x00")
+ mis = io.BytesIO(b"\x18\x00\x00\x00\x05\x6A\x48\x1C\xFB\x2C\x7C\x50\x00")
decoder = BinaryDecoder(mis)
actual = decoder.read_decimal_from_bytes(28, 15)
expected = Decimal("99892.123400000000000")
@@ -83,7 +83,7 @@ def test_read_decimal() -> None:
def test_decimal_from_fixed_big() -> None:
- mis = MemoryInputStream(b"\x0E\xC2\x02\xE9\x06\x16\x33\x49\x77\x67\xA8\x00")
+ mis = io.BytesIO(b"\x0E\xC2\x02\xE9\x06\x16\x33\x49\x77\x67\xA8\x00")
decoder = BinaryDecoder(mis)
actual = decoder.read_decimal_from_fixed(28, 15, 12)
expected = Decimal("4567335489766.998340000000000")
@@ -91,7 +91,7 @@ def test_decimal_from_fixed_big() -> None:
def test_read_negative_bytes() -> None:
- mis = MemoryInputStream(b"")
+ mis = io.BytesIO(b"")
decoder = BinaryDecoder(mis)
with pytest.raises(ValueError) as exc_info:
@@ -136,13 +136,13 @@ def test_read_single_byte_at_the_time() -> None:
def test_read_float() -> None:
- mis = MemoryInputStream(b"\x00\x00\x9A\x41")
+ mis = io.BytesIO(b"\x00\x00\x9A\x41")
decoder = BinaryDecoder(mis)
assert decoder.read_float() == 19.25
def test_skip_float() -> None:
- mis = MemoryInputStream(b"\x00\x00\x9A\x41")
+ mis = io.BytesIO(b"\x00\x00\x9A\x41")
decoder = BinaryDecoder(mis)
assert mis.tell() == 0
decoder.skip_float()
@@ -150,13 +150,13 @@ def test_skip_float() -> None:
def test_read_double() -> None:
- mis = MemoryInputStream(b"\x00\x00\x00\x00\x00\x40\x33\x40")
+ mis = io.BytesIO(b"\x00\x00\x00\x00\x00\x40\x33\x40")
decoder = BinaryDecoder(mis)
assert decoder.read_double() == 19.25
def test_skip_double() -> None:
- mis = MemoryInputStream(b"\x00\x00\x00\x00\x00\x40\x33\x40")
+ mis = io.BytesIO(b"\x00\x00\x00\x00\x00\x40\x33\x40")
decoder = BinaryDecoder(mis)
assert mis.tell() == 0
decoder.skip_double()
@@ -164,50 +164,50 @@ def test_skip_double() -> None:
def test_read_uuid_from_fixed() -> None:
- mis = MemoryInputStream(b"\x12\x34\x56\x78" * 4)
+ mis = io.BytesIO(b"\x12\x34\x56\x78" * 4)
decoder = BinaryDecoder(mis)
assert decoder.read_uuid_from_fixed() == UUID("{12345678-1234-5678-1234-567812345678}")
def test_read_time_millis() -> None:
- mis = MemoryInputStream(b"\xBC\x7D")
+ mis = io.BytesIO(b"\xBC\x7D")
decoder = BinaryDecoder(mis)
assert decoder.read_time_millis().microsecond == 30000
def test_read_time_micros() -> None:
- mis = MemoryInputStream(b"\xBC\x7D")
+ mis = io.BytesIO(b"\xBC\x7D")
decoder = BinaryDecoder(mis)
assert decoder.read_time_micros().microsecond == 8030
def test_read_timestamp_micros() -> None:
- mis = MemoryInputStream(b"\xBC\x7D")
+ mis = io.BytesIO(b"\xBC\x7D")
decoder = BinaryDecoder(mis)
assert decoder.read_timestamp_micros() == datetime(1970, 1, 1, 0, 0, 0, 8030)
def test_read_timestamptz_micros() -> None:
- mis = MemoryInputStream(b"\xBC\x7D")
+ mis = io.BytesIO(b"\xBC\x7D")
decoder = BinaryDecoder(mis)
assert decoder.read_timestamptz_micros() == datetime(1970, 1, 1, 0, 0, 0, 8030, tzinfo=timezone.utc)
def test_read_bytes() -> None:
- mis = MemoryInputStream(b"\x08\x01\x02\x03\x04")
+ mis = io.BytesIO(b"\x08\x01\x02\x03\x04")
decoder = BinaryDecoder(mis)
actual = decoder.read_bytes()
assert actual == b"\x01\x02\x03\x04"
def test_read_utf8() -> None:
- mis = MemoryInputStream(b"\x04\x76\x6F")
+ mis = io.BytesIO(b"\x04\x76\x6F")
decoder = BinaryDecoder(mis)
assert decoder.read_utf8() == "vo"
def test_skip_utf8() -> None:
- mis = MemoryInputStream(b"\x04\x76\x6F")
+ mis = io.BytesIO(b"\x04\x76\x6F")
decoder = BinaryDecoder(mis)
assert mis.tell() == 0
decoder.skip_utf8()
@@ -215,7 +215,7 @@ def test_skip_utf8() -> None:
def test_read_int_as_float() -> None:
- mis = MemoryInputStream(b"\x00\x00\x9A\x41")
+ mis = io.BytesIO(b"\x00\x00\x9A\x41")
decoder = BinaryDecoder(mis)
reader = resolve(FloatType(), DoubleType())
assert reader.read(decoder) == 19.25
diff --git a/python/tests/avro/test_reader.py b/python/tests/avro/test_reader.py
index b8736dd69b..84a418e1db 100644
--- a/python/tests/avro/test_reader.py
+++ b/python/tests/avro/test_reader.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
# pylint:disable=protected-access
+import io
import json
import pytest
@@ -38,7 +39,6 @@ from pyiceberg.avro.reader import (
UUIDReader,
)
from pyiceberg.avro.resolver import construct_reader
-from pyiceberg.io.memory import MemoryInputStream
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.manifest import MANIFEST_ENTRY_SCHEMA, DataFile, ManifestEntry
from pyiceberg.schema import Schema
@@ -336,7 +336,7 @@ def test_uuid_reader() -> None:
def test_read_struct() -> None:
- mis = MemoryInputStream(b"\x18")
+ mis = io.BytesIO(b"\x18")
decoder = BinaryDecoder(mis)
struct = StructType(NestedField(1, "id", IntegerType(), required=True))
@@ -345,7 +345,7 @@ def test_read_struct() -> None:
def test_read_struct_lambda() -> None:
- mis = MemoryInputStream(b"\x18")
+ mis = io.BytesIO(b"\x18")
decoder = BinaryDecoder(mis)
struct = StructType(NestedField(1, "id", IntegerType(), required=True))
@@ -357,7 +357,7 @@ def test_read_struct_lambda() -> None:
def test_read_not_struct_type() -> None:
- mis = MemoryInputStream(b"\x18")
+ mis = io.BytesIO(b"\x18")
decoder = BinaryDecoder(mis)
struct = StructType(NestedField(1, "id", IntegerType(), required=True))
@@ -368,7 +368,7 @@ def test_read_not_struct_type() -> None:
def test_read_struct_exception_handling() -> None:
- mis = MemoryInputStream(b"\x18")
+ mis = io.BytesIO(b"\x18")
decoder = BinaryDecoder(mis)
def raise_err(struct: StructType) -> None: