This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
The following commit(s) were added to refs/heads/main by this push:
new a983935 Bug fix for writing empty df or null only columns (#350)
a983935 is described below
commit a983935645fdf6937badb49087b4b6de3b91baa0
Author: Sung Yun <[email protected]>
AuthorDate: Thu Feb 1 17:23:02 2024 -0500
Bug fix for writing empty df or null only columns (#350)
* bug fix for writing empty df or null only cols
* overwrite
---
pyiceberg/io/pyarrow.py | 5 +-
pyiceberg/table/__init__.py | 17 ++--
tests/integration/test_writes.py | 162 ++++++++++++++++++++++++++++++++++++++-
3 files changed, 174 insertions(+), 10 deletions(-)
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 9e05e2d..38d7d4a 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1339,7 +1339,10 @@ class StatsAggregator:
def update_max(self, val: Any) -> None:
self.current_max = val if self.current_max is None else max(val,
self.current_max)
- def min_as_bytes(self) -> bytes:
+ def min_as_bytes(self) -> Optional[bytes]:
+ if self.current_min is None:
+ return None
+
return self.serialize(
self.current_min
if self.trunc_length is None
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index e3975dc..7692bb0 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -943,10 +943,13 @@ class Table:
if len(self.spec().fields) > 0:
raise ValueError("Cannot write to partitioned tables")
- data_files = _dataframe_to_data_files(self, df=df)
merge = _MergingSnapshotProducer(operation=Operation.APPEND,
table=self)
- for data_file in data_files:
- merge.append_data_file(data_file)
+
+ # skip writing data files if the dataframe is empty
+ if df.shape[0] > 0:
+ data_files = _dataframe_to_data_files(self, df=df)
+ for data_file in data_files:
+ merge.append_data_file(data_file)
merge.commit()
@@ -973,14 +976,16 @@ class Table:
if len(self.spec().fields) > 0:
raise ValueError("Cannot write to partitioned tables")
- data_files = _dataframe_to_data_files(self, df=df)
merge = _MergingSnapshotProducer(
operation=Operation.OVERWRITE if self.current_snapshot() is not
None else Operation.APPEND,
table=self,
)
- for data_file in data_files:
- merge.append_data_file(data_file)
+ # skip writing data files if the dataframe is empty
+ if df.shape[0] > 0:
+ data_files = _dataframe_to_data_files(self, df=df)
+ for data_file in data_files:
+ merge.append_data_file(data_file)
merge.commit()
diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py
index 17dc997..c8552a2 100644
--- a/tests/integration/test_writes.py
+++ b/tests/integration/test_writes.py
@@ -119,9 +119,8 @@ def session_catalog() -> Catalog:
@pytest.fixture(scope="session")
-def arrow_table_with_null() -> pa.Table:
- """PyArrow table with all kinds of columns"""
- pa_schema = pa.schema([
+def pa_schema() -> pa.Schema:
+ return pa.schema([
("bool", pa.bool_()),
("string", pa.string()),
("string_long", pa.string()),
@@ -139,9 +138,26 @@ def arrow_table_with_null() -> pa.Table:
("binary", pa.binary()),
("fixed", pa.binary(16)),
])
+
+
+@pytest.fixture(scope="session")
+def arrow_table_with_null(pa_schema: pa.Schema) -> pa.Table:
+ """PyArrow table with all kinds of columns"""
return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
+@pytest.fixture(scope="session")
+def arrow_table_without_data(pa_schema: pa.Schema) -> pa.Table:
+ """PyArrow table with all kinds of columns"""
+ return pa.Table.from_pylist([], schema=pa_schema)
+
+
+@pytest.fixture(scope="session")
+def arrow_table_with_only_nulls(pa_schema: pa.Schema) -> pa.Table:
+ """PyArrow table with all kinds of columns"""
+ return pa.Table.from_pylist([{}, {}], schema=pa_schema)
+
+
@pytest.fixture(scope="session", autouse=True)
def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null:
pa.Table) -> None:
identifier = "default.arrow_table_v1_with_null"
@@ -157,6 +173,36 @@ def table_v1_with_null(session_catalog: Catalog,
arrow_table_with_null: pa.Table
assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
+@pytest.fixture(scope="session", autouse=True)
+def table_v1_without_data(session_catalog: Catalog, arrow_table_without_data:
pa.Table) -> None:
+ identifier = "default.arrow_table_v1_without_data"
+
+ try:
+ session_catalog.drop_table(identifier=identifier)
+ except NoSuchTableError:
+ pass
+
+ tbl = session_catalog.create_table(identifier=identifier,
schema=TABLE_SCHEMA, properties={'format-version': '1'})
+ tbl.append(arrow_table_without_data)
+
+ assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
+
+
+@pytest.fixture(scope="session", autouse=True)
+def table_v1_with_only_nulls(session_catalog: Catalog,
arrow_table_with_only_nulls: pa.Table) -> None:
+ identifier = "default.arrow_table_v1_with_only_nulls"
+
+ try:
+ session_catalog.drop_table(identifier=identifier)
+ except NoSuchTableError:
+ pass
+
+ tbl = session_catalog.create_table(identifier=identifier,
schema=TABLE_SCHEMA, properties={'format-version': '1'})
+ tbl.append(arrow_table_with_only_nulls)
+
+ assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
+
+
@pytest.fixture(scope="session", autouse=True)
def table_v1_appended_with_null(session_catalog: Catalog,
arrow_table_with_null: pa.Table) -> None:
identifier = "default.arrow_table_v1_appended_with_null"
@@ -189,6 +235,36 @@ def table_v2_with_null(session_catalog: Catalog,
arrow_table_with_null: pa.Table
assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
+@pytest.fixture(scope="session", autouse=True)
+def table_v2_without_data(session_catalog: Catalog, arrow_table_without_data:
pa.Table) -> None:
+ identifier = "default.arrow_table_v2_without_data"
+
+ try:
+ session_catalog.drop_table(identifier=identifier)
+ except NoSuchTableError:
+ pass
+
+ tbl = session_catalog.create_table(identifier=identifier,
schema=TABLE_SCHEMA, properties={'format-version': '2'})
+ tbl.append(arrow_table_without_data)
+
+ assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
+
+
+@pytest.fixture(scope="session", autouse=True)
+def table_v2_with_only_nulls(session_catalog: Catalog,
arrow_table_with_only_nulls: pa.Table) -> None:
+ identifier = "default.arrow_table_v2_with_only_nulls"
+
+ try:
+ session_catalog.drop_table(identifier=identifier)
+ except NoSuchTableError:
+ pass
+
+ tbl = session_catalog.create_table(identifier=identifier,
schema=TABLE_SCHEMA, properties={'format-version': '2'})
+ tbl.append(arrow_table_with_only_nulls)
+
+ assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
+
+
@pytest.fixture(scope="session", autouse=True)
def table_v2_appended_with_null(session_catalog: Catalog,
arrow_table_with_null: pa.Table) -> None:
identifier = "default.arrow_table_v2_appended_with_null"
@@ -279,6 +355,26 @@ def test_query_filter_null(spark: SparkSession, col: str,
format_version: int) -
assert df.where(f"{col} is not null").count() == 2, f"Expected 2 rows for
{col}"
+@pytest.mark.integration
+@pytest.mark.parametrize("col", TEST_DATA_WITH_NULL.keys())
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_query_filter_without_data(spark: SparkSession, col: str,
format_version: int) -> None:
+ identifier = f"default.arrow_table_v{format_version}_without_data"
+ df = spark.table(identifier)
+ assert df.where(f"{col} is null").count() == 0, f"Expected 0 row for {col}"
+ assert df.where(f"{col} is not null").count() == 0, f"Expected 0 rows for
{col}"
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("col", TEST_DATA_WITH_NULL.keys())
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_query_filter_only_nulls(spark: SparkSession, col: str,
format_version: int) -> None:
+ identifier = f"default.arrow_table_v{format_version}_with_only_nulls"
+ df = spark.table(identifier)
+ assert df.where(f"{col} is null").count() == 2, f"Expected 2 row for {col}"
+ assert df.where(f"{col} is not null").count() == 0, f"Expected 0 rows for
{col}"
+
+
@pytest.mark.integration
@pytest.mark.parametrize("col", TEST_DATA_WITH_NULL.keys())
@pytest.mark.parametrize("format_version", [1, 2])
@@ -409,3 +505,63 @@ def test_invalid_arguments(spark: SparkSession,
session_catalog: Catalog, arrow_
with pytest.raises(ValueError, match="Expected PyArrow table, got: not a
df"):
tbl.append("not a df")
+
+
+@pytest.mark.integration
+def test_summaries_with_only_nulls(
+ spark: SparkSession, session_catalog: Catalog, arrow_table_without_data:
pa.Table, arrow_table_with_only_nulls: pa.Table
+) -> None:
+ identifier = "default.arrow_table_summaries_with_only_nulls"
+
+ try:
+ session_catalog.drop_table(identifier=identifier)
+ except NoSuchTableError:
+ pass
+ tbl = session_catalog.create_table(identifier=identifier,
schema=TABLE_SCHEMA, properties={'format-version': '1'})
+
+ tbl.append(arrow_table_without_data)
+ tbl.append(arrow_table_with_only_nulls)
+ tbl.overwrite(arrow_table_without_data)
+
+ rows = spark.sql(
+ f"""
+ SELECT operation, summary
+ FROM {identifier}.snapshots
+ ORDER BY committed_at ASC
+ """
+ ).collect()
+
+ operations = [row.operation for row in rows]
+ assert operations == ['append', 'append', 'overwrite']
+
+ summaries = [row.summary for row in rows]
+
+ assert summaries[0] == {
+ 'total-data-files': '0',
+ 'total-delete-files': '0',
+ 'total-equality-deletes': '0',
+ 'total-files-size': '0',
+ 'total-position-deletes': '0',
+ 'total-records': '0',
+ }
+
+ assert summaries[1] == {
+ 'added-data-files': '1',
+ 'added-files-size': '4045',
+ 'added-records': '2',
+ 'total-data-files': '1',
+ 'total-delete-files': '0',
+ 'total-equality-deletes': '0',
+ 'total-files-size': '4045',
+ 'total-position-deletes': '0',
+ 'total-records': '2',
+ }
+
+ assert summaries[0] == {
+ 'total-data-files': '0',
+ 'total-delete-files': '0',
+ 'total-equality-deletes': '0',
+ 'total-files-size': '0',
+ 'total-position-deletes': '0',
+ 'total-records': '0',
+ }