This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new a983935  Bug fix for writing empty df or null only columns (#350)
a983935 is described below

commit a983935645fdf6937badb49087b4b6de3b91baa0
Author: Sung Yun <[email protected]>
AuthorDate: Thu Feb 1 17:23:02 2024 -0500

    Bug fix for writing empty df or null only columns (#350)
    
    * bug fix for writing empty df or null only cols
    
    * overwrite
---
 pyiceberg/io/pyarrow.py          |   5 +-
 pyiceberg/table/__init__.py      |  17 ++--
 tests/integration/test_writes.py | 162 ++++++++++++++++++++++++++++++++++++++-
 3 files changed, 174 insertions(+), 10 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 9e05e2d..38d7d4a 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1339,7 +1339,10 @@ class StatsAggregator:
     def update_max(self, val: Any) -> None:
         self.current_max = val if self.current_max is None else max(val, 
self.current_max)
 
-    def min_as_bytes(self) -> bytes:
+    def min_as_bytes(self) -> Optional[bytes]:
+        if self.current_min is None:
+            return None
+
         return self.serialize(
             self.current_min
             if self.trunc_length is None
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index e3975dc..7692bb0 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -943,10 +943,13 @@ class Table:
         if len(self.spec().fields) > 0:
             raise ValueError("Cannot write to partitioned tables")
 
-        data_files = _dataframe_to_data_files(self, df=df)
         merge = _MergingSnapshotProducer(operation=Operation.APPEND, 
table=self)
-        for data_file in data_files:
-            merge.append_data_file(data_file)
+
+        # skip writing data files if the dataframe is empty
+        if df.shape[0] > 0:
+            data_files = _dataframe_to_data_files(self, df=df)
+            for data_file in data_files:
+                merge.append_data_file(data_file)
 
         merge.commit()
 
@@ -973,14 +976,16 @@ class Table:
         if len(self.spec().fields) > 0:
             raise ValueError("Cannot write to partitioned tables")
 
-        data_files = _dataframe_to_data_files(self, df=df)
         merge = _MergingSnapshotProducer(
             operation=Operation.OVERWRITE if self.current_snapshot() is not 
None else Operation.APPEND,
             table=self,
         )
 
-        for data_file in data_files:
-            merge.append_data_file(data_file)
+        # skip writing data files if the dataframe is empty
+        if df.shape[0] > 0:
+            data_files = _dataframe_to_data_files(self, df=df)
+            for data_file in data_files:
+                merge.append_data_file(data_file)
 
         merge.commit()
 
diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py
index 17dc997..c8552a2 100644
--- a/tests/integration/test_writes.py
+++ b/tests/integration/test_writes.py
@@ -119,9 +119,8 @@ def session_catalog() -> Catalog:
 
 
 @pytest.fixture(scope="session")
-def arrow_table_with_null() -> pa.Table:
-    """PyArrow table with all kinds of columns"""
-    pa_schema = pa.schema([
+def pa_schema() -> pa.Schema:
+    return pa.schema([
         ("bool", pa.bool_()),
         ("string", pa.string()),
         ("string_long", pa.string()),
@@ -139,9 +138,26 @@ def arrow_table_with_null() -> pa.Table:
         ("binary", pa.binary()),
         ("fixed", pa.binary(16)),
     ])
+
+
[email protected](scope="session")
+def arrow_table_with_null(pa_schema: pa.Schema) -> pa.Table:
+    """PyArrow table with all kinds of columns"""
     return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)
 
 
[email protected](scope="session")
+def arrow_table_without_data(pa_schema: pa.Schema) -> pa.Table:
+    """PyArrow table with all kinds of columns"""
+    return pa.Table.from_pylist([], schema=pa_schema)
+
+
[email protected](scope="session")
+def arrow_table_with_only_nulls(pa_schema: pa.Schema) -> pa.Table:
+    """PyArrow table with all kinds of columns"""
+    return pa.Table.from_pylist([{}, {}], schema=pa_schema)
+
+
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: 
pa.Table) -> None:
     identifier = "default.arrow_table_v1_with_null"
@@ -157,6 +173,36 @@ def table_v1_with_null(session_catalog: Catalog, 
arrow_table_with_null: pa.Table
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
 
[email protected](scope="session", autouse=True)
+def table_v1_without_data(session_catalog: Catalog, arrow_table_without_data: 
pa.Table) -> None:
+    identifier = "default.arrow_table_v1_without_data"
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '1'})
+    tbl.append(arrow_table_without_data)
+
+    assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
+
+
[email protected](scope="session", autouse=True)
+def table_v1_with_only_nulls(session_catalog: Catalog, 
arrow_table_with_only_nulls: pa.Table) -> None:
+    identifier = "default.arrow_table_v1_with_only_nulls"
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '1'})
+    tbl.append(arrow_table_with_only_nulls)
+
+    assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
+
+
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_appended_with_null(session_catalog: Catalog, 
arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v1_appended_with_null"
@@ -189,6 +235,36 @@ def table_v2_with_null(session_catalog: Catalog, 
arrow_table_with_null: pa.Table
     assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
 
 
[email protected](scope="session", autouse=True)
+def table_v2_without_data(session_catalog: Catalog, arrow_table_without_data: 
pa.Table) -> None:
+    identifier = "default.arrow_table_v2_without_data"
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '2'})
+    tbl.append(arrow_table_without_data)
+
+    assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
+
+
[email protected](scope="session", autouse=True)
+def table_v2_with_only_nulls(session_catalog: Catalog, 
arrow_table_with_only_nulls: pa.Table) -> None:
+    identifier = "default.arrow_table_v2_with_only_nulls"
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '2'})
+    tbl.append(arrow_table_with_only_nulls)
+
+    assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
+
+
 @pytest.fixture(scope="session", autouse=True)
 def table_v2_appended_with_null(session_catalog: Catalog, 
arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v2_appended_with_null"
@@ -279,6 +355,26 @@ def test_query_filter_null(spark: SparkSession, col: str, 
format_version: int) -
     assert df.where(f"{col} is not null").count() == 2, f"Expected 2 rows for 
{col}"
 
 
[email protected]
[email protected]("col", TEST_DATA_WITH_NULL.keys())
[email protected]("format_version", [1, 2])
+def test_query_filter_without_data(spark: SparkSession, col: str, 
format_version: int) -> None:
+    identifier = f"default.arrow_table_v{format_version}_without_data"
+    df = spark.table(identifier)
+    assert df.where(f"{col} is null").count() == 0, f"Expected 0 row for {col}"
+    assert df.where(f"{col} is not null").count() == 0, f"Expected 0 rows for 
{col}"
+
+
[email protected]
[email protected]("col", TEST_DATA_WITH_NULL.keys())
[email protected]("format_version", [1, 2])
+def test_query_filter_only_nulls(spark: SparkSession, col: str, 
format_version: int) -> None:
+    identifier = f"default.arrow_table_v{format_version}_with_only_nulls"
+    df = spark.table(identifier)
+    assert df.where(f"{col} is null").count() == 2, f"Expected 2 row for {col}"
+    assert df.where(f"{col} is not null").count() == 0, f"Expected 0 rows for 
{col}"
+
+
 @pytest.mark.integration
 @pytest.mark.parametrize("col", TEST_DATA_WITH_NULL.keys())
 @pytest.mark.parametrize("format_version", [1, 2])
@@ -409,3 +505,63 @@ def test_invalid_arguments(spark: SparkSession, 
session_catalog: Catalog, arrow_
 
     with pytest.raises(ValueError, match="Expected PyArrow table, got: not a 
df"):
         tbl.append("not a df")
+
+
[email protected]
+def test_summaries_with_only_nulls(
+    spark: SparkSession, session_catalog: Catalog, arrow_table_without_data: 
pa.Table, arrow_table_with_only_nulls: pa.Table
+) -> None:
+    identifier = "default.arrow_table_summaries_with_only_nulls"
+
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+    tbl = session_catalog.create_table(identifier=identifier, 
schema=TABLE_SCHEMA, properties={'format-version': '1'})
+
+    tbl.append(arrow_table_without_data)
+    tbl.append(arrow_table_with_only_nulls)
+    tbl.overwrite(arrow_table_without_data)
+
+    rows = spark.sql(
+        f"""
+        SELECT operation, summary
+        FROM {identifier}.snapshots
+        ORDER BY committed_at ASC
+    """
+    ).collect()
+
+    operations = [row.operation for row in rows]
+    assert operations == ['append', 'append', 'overwrite']
+
+    summaries = [row.summary for row in rows]
+
+    assert summaries[0] == {
+        'total-data-files': '0',
+        'total-delete-files': '0',
+        'total-equality-deletes': '0',
+        'total-files-size': '0',
+        'total-position-deletes': '0',
+        'total-records': '0',
+    }
+
+    assert summaries[1] == {
+        'added-data-files': '1',
+        'added-files-size': '4045',
+        'added-records': '2',
+        'total-data-files': '1',
+        'total-delete-files': '0',
+        'total-equality-deletes': '0',
+        'total-files-size': '4045',
+        'total-position-deletes': '0',
+        'total-records': '2',
+    }
+
+    assert summaries[0] == {
+        'total-data-files': '0',
+        'total-delete-files': '0',
+        'total-equality-deletes': '0',
+        'total-files-size': '0',
+        'total-position-deletes': '0',
+        'total-records': '0',
+    }

Reply via email to