This is an automated email from the ASF dual-hosted git repository. fokko pushed a commit to branch pyiceberg-0.6.x in repository https://gitbox.apache.org/repos/asf/iceberg-python.git
commit 86ff66e3f770d9ed0996138503e17533ef1d8290 Author: HonahX <[email protected]> AuthorDate: Sat Apr 13 16:53:42 2024 -0700 fix integration test --- tests/integration/test_writes.py | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index 887a5194..425e4cee 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -20,7 +20,7 @@ import time import uuid from datetime import date, datetime from pathlib import Path -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from urllib.parse import urlparse import pyarrow as pa @@ -170,15 +170,22 @@ def arrow_table_with_only_nulls(pa_schema: pa.Schema) -> pa.Table: return pa.Table.from_pylist([{}, {}], schema=pa_schema) -def _create_table(session_catalog: Catalog, identifier: str, properties: Properties, data: List[pa.Table]) -> Table: +def _create_table( + session_catalog: Catalog, + identifier: str, + properties: Properties, + data: Optional[List[pa.Table]] = None, + schema: Schema = TABLE_SCHEMA, +) -> Table: try: session_catalog.drop_table(identifier=identifier) except NoSuchTableError: pass - tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties=properties) - for d in data: - tbl.append(d) + tbl = session_catalog.create_table(identifier=identifier, schema=schema, properties=properties) + if data: + for d in data: + tbl.append(d) return tbl @@ -425,6 +432,28 @@ def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_w assert [row.deleted_data_files_count for row in rows] == [0, 0, 1, 0, 0] +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_python_writes_special_character_column_with_spark_reads( + spark: SparkSession, session_catalog: Catalog, format_version: int +) -> None: + identifier = "default.python_writes_special_character_column_with_spark_reads" + 
column_name_with_special_character = "letter/abc" + TEST_DATA_WITH_SPECIAL_CHARACTER_COLUMN = { + column_name_with_special_character: ['a', None, 'z'], + } + pa_schema = pa.schema([ + (column_name_with_special_character, pa.string()), + ]) + arrow_table_with_special_character_column = pa.Table.from_pydict(TEST_DATA_WITH_SPECIAL_CHARACTER_COLUMN, schema=pa_schema) + tbl = _create_table(session_catalog, identifier, {"format-version": str(format_version)}, schema=pa_schema) + + tbl.overwrite(arrow_table_with_special_character_column) + spark_df = spark.sql(f"SELECT * FROM {identifier}").toPandas() + pyiceberg_df = tbl.scan().to_pandas() + assert spark_df.equals(pyiceberg_df) + + @pytest.mark.integration @pytest.mark.parametrize("format_version", ["1", "2"]) @pytest.mark.parametrize(
