This is an automated email from the ASF dual-hosted git repository.

honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 35d4648f Read: fetch file_schema directly from pyarrow_to_schema (#597)
35d4648f is described below

commit 35d4648f38c1ae924dbf9597d707e44f7c53225a
Author: Honah J <[email protected]>
AuthorDate: Fri Apr 12 17:53:22 2024 -0700

    Read: fetch file_schema directly from pyarrow_to_schema (#597)
---
 pyiceberg/io/pyarrow.py                      | 10 ++--------
 tests/conftest.py                            |  4 +++-
 tests/integration/test_writes/test_writes.py | 22 ++++++++++++++++++++++
 tests/integration/test_writes/utils.py       |  7 ++++---
 tests/io/test_pyarrow.py                     |  6 +++---
 5 files changed, 34 insertions(+), 15 deletions(-)

diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 74692f85..67ebaa81 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -122,7 +122,6 @@ from pyiceberg.schema import (
     pre_order_visit,
     promote,
     prune_columns,
-    sanitize_column_names,
     visit,
     visit_with_partner,
 )
@@ -966,12 +965,7 @@ def _task_to_table(
     with fs.open_input_file(path) as fin:
         fragment = arrow_format.make_fragment(fin)
         physical_schema = fragment.physical_schema
-        schema_raw = None
-        if metadata := physical_schema.metadata:
-            schema_raw = metadata.get(ICEBERG_SCHEMA)
-        file_schema = (
-            Schema.model_validate_json(schema_raw) if schema_raw is not None else pyarrow_to_schema(physical_schema, name_mapping)
-        )
+        file_schema = pyarrow_to_schema(physical_schema, name_mapping)
 
         pyarrow_filter = None
         if bound_row_filter is not AlwaysTrue():
@@ -979,7 +973,7 @@ def _task_to_table(
             bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
             pyarrow_filter = expression_to_pyarrow(bound_file_filter)
 
-        file_project_schema = sanitize_column_names(prune_columns(file_schema, projected_field_ids, select_full_types=False))
+        file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)
 
         if file_schema is None:
             raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
diff --git a/tests/conftest.py b/tests/conftest.py
index 7da0a0a8..e0d82910 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1931,9 +1931,11 @@ def data_file(table_schema_simple: Schema, tmp_path: str) -> str:
     import pyarrow as pa
     from pyarrow import parquet as pq
 
+    from pyiceberg.io.pyarrow import schema_to_pyarrow
+
     table = pa.table(
         {"foo": ["a", "b", "c"], "bar": [1, 2, 3], "baz": [True, False, None]},
-        metadata={"iceberg.schema": table_schema_simple.model_dump_json()},
+        schema=schema_to_pyarrow(table_schema_simple),
     )
 
     file_path = f"{tmp_path}/0000-data.parquet"
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index e1526d2a..775a6f9d 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -271,6 +271,28 @@ def test_python_writes_with_spark_snapshot_reads(
     assert tbl.current_snapshot().snapshot_id == get_current_snapshot_id(identifier)  # type: ignore
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_python_writes_special_character_column_with_spark_reads(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = "default.python_writes_special_character_column_with_spark_reads"
+    column_name_with_special_character = "letter/abc"
+    TEST_DATA_WITH_SPECIAL_CHARACTER_COLUMN = {
+        column_name_with_special_character: ['a', None, 'z'],
+    }
+    pa_schema = pa.schema([
+        (column_name_with_special_character, pa.string()),
+    ])
+    arrow_table_with_special_character_column = pa.Table.from_pydict(TEST_DATA_WITH_SPECIAL_CHARACTER_COLUMN, schema=pa_schema)
+    tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, schema=pa_schema)
+
+    tbl.overwrite(arrow_table_with_special_character_column)
+    spark_df = spark.sql(f"SELECT * FROM {identifier}").toPandas()
+    pyiceberg_df = tbl.scan().to_pandas()
+    assert spark_df.equals(pyiceberg_df)
+
+
 @pytest.mark.integration
 def test_write_bin_pack_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.write_bin_pack_data_files"
diff --git a/tests/integration/test_writes/utils.py b/tests/integration/test_writes/utils.py
index 792e2518..742b1e14 100644
--- a/tests/integration/test_writes/utils.py
+++ b/tests/integration/test_writes/utils.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint:disable=redefined-outer-name
-from typing import List, Optional
+from typing import List, Optional, Union
 
 import pyarrow as pa
 
@@ -65,6 +65,7 @@ def _create_table(
     properties: Properties,
     data: Optional[List[pa.Table]] = None,
     partition_spec: Optional[PartitionSpec] = None,
+    schema: Union[Schema, "pa.Schema"] = TABLE_SCHEMA,
 ) -> Table:
     try:
         session_catalog.drop_table(identifier=identifier)
@@ -73,10 +74,10 @@ def _create_table(
 
     if partition_spec:
         tbl = session_catalog.create_table(
-            identifier=identifier, schema=TABLE_SCHEMA, properties=properties, partition_spec=partition_spec
+            identifier=identifier, schema=schema, properties=properties, partition_spec=partition_spec
         )
     else:
-        tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties=properties)
+        tbl = session_catalog.create_table(identifier=identifier, schema=schema, properties=properties)
 
     if data:
         for d in data:
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 46ece778..ef2c4cec 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -1373,7 +1373,7 @@ def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simp
         str(with_deletes)
         == """pyarrow.Table
 foo: string
-bar: int64 not null
+bar: int32 not null
 baz: bool
 ----
 foo: [["a","c"]]
@@ -1411,7 +1411,7 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_
         str(with_deletes)
         == """pyarrow.Table
 foo: string
-bar: int64 not null
+bar: int32 not null
 baz: bool
 ----
 foo: [["a","c"]]
@@ -1442,7 +1442,7 @@ def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Sc
         str(projection)
         == """pyarrow.Table
 foo: string
-bar: int64 not null
+bar: int32 not null
 baz: bool
 ----
 foo: [["a","b","c"]]

Reply via email to