This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e09fc872a32 [SPARK-55600][PYTHON] Fix pandas to arrow loses row count 
when schema has 0 columns on classic
1e09fc872a32 is described below

commit 1e09fc872a32f41f4b136be9c45970db7cc14c8b
Author: Yicong-Huang <[email protected]>
AuthorDate: Tue Feb 24 13:10:51 2026 -0800

    [SPARK-55600][PYTHON] Fix pandas to arrow loses row count when schema has 0 
columns on classic
    
    ### What changes were proposed in this pull request?
    
    This PR fixes the row count loss issue when creating a Spark DataFrame from 
a pandas DataFrame with 0 columns in classic.
    
    The issue occurs due to PyArrow limitations when creating RecordBatches or 
Tables with 0 columns - row count information is lost.
    
    ### Why are the changes needed?
    
    Before this fix:
    ```python
    import pandas as pd
    from pyspark.sql.types import StructType
    
    pdf = pd.DataFrame(index=range(5))  # 5 rows, 0 columns
    df = spark.createDataFrame(pdf, schema=StructType([]))
    df.count()  # Returns 0 (wrong!)
    ```
    
    After this fix:
    ```python
    df.count()  # Returns 5 (correct!)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Creating a DataFrame from a pandas DataFrame with 0 columns now 
correctly preserves the row count in Classic Spark.
    
    ### How was this patch tested?
    
    Added unit test `test_from_pandas_dataframe_with_zero_columns` in 
`test_creation.py` that tests both Arrow-enabled and Arrow-disabled paths.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #54382 from 
Yicong-Huang/SPARK-55600/fix/pandas-arrow-zero-columns-row-count.
    
    Authored-by: Yicong-Huang <[email protected]>
    Signed-off-by: Takuya Ueshin <[email protected]>
---
 python/pyspark/sql/pandas/conversion.py   | 47 ++++++++++++++++++++-----------
 python/pyspark/sql/tests/test_creation.py | 32 +++++++++++++++++++++
 2 files changed, 62 insertions(+), 17 deletions(-)

diff --git a/python/pyspark/sql/pandas/conversion.py 
b/python/pyspark/sql/pandas/conversion.py
index cdcfcc872bbe..5c4b6d14b24d 100644
--- a/python/pyspark/sql/pandas/conversion.py
+++ b/python/pyspark/sql/pandas/conversion.py
@@ -861,6 +861,12 @@ class SparkConversionMixin:
                         ser.dt.to_pytimedelta(), index=ser.index, 
dtype="object", name=ser.name
                     )
 
+        # Handle the 0-column case separately to preserve row count
+        if len(pdf.columns) == 0:
+            from pyspark.sql import Row
+
+            return [Row()] * len(pdf)
+
         # Convert pandas.DataFrame to list of numpy records
         np_records = pdf.set_axis(
             [f"col_{i}" for i in range(len(pdf.columns))], axis="columns"
@@ -998,16 +1004,21 @@ class SparkConversionMixin:
         step = step if step > 0 else len(pdf)
         pdf_slices = (pdf.iloc[start : start + step] for start in range(0, 
len(pdf), step))
 
-        # Create Arrow batches directly using the standalone function
-        arrow_batches = [
-            create_arrow_batch_from_pandas(
-                [(c, t) for (_, c), t in zip(pdf_slice.items(), spark_types)],
-                timezone=timezone,
-                safecheck=safecheck,
-                prefers_large_types=prefers_large_var_types,
-            )
-            for pdf_slice in pdf_slices
-        ]
+        # Handle the 0-column case separately to preserve row count.
+        # pa.RecordBatch.from_pandas preserves num_rows via pandas index 
metadata.
+        if len(pdf.columns) == 0:
+            arrow_batches = [pa.RecordBatch.from_pandas(pdf_slice) for 
pdf_slice in pdf_slices]
+        else:
+            # Create Arrow batches directly using the standalone function
+            arrow_batches = [
+                create_arrow_batch_from_pandas(
+                    [(c, t) for (_, c), t in zip(pdf_slice.items(), 
spark_types)],
+                    timezone=timezone,
+                    safecheck=safecheck,
+                    prefers_large_types=prefers_large_var_types,
+                )
+                for pdf_slice in pdf_slices
+            ]
 
         jsparkSession = self._jsparkSession
 
@@ -1074,14 +1085,16 @@ class SparkConversionMixin:
         if not isinstance(schema, StructType):
             schema = from_arrow_schema(table.schema, 
prefer_timestamp_ntz=prefer_timestamp_ntz)
 
-        table = _check_arrow_table_timestamps_localize(table, schema, True, 
timezone).cast(
-            to_arrow_schema(
-                schema,
-                error_on_duplicated_field_names_in_struct=True,
-                timezone="UTC",
-                prefers_large_types=prefers_large_var_types,
+        # Skip cast for 0-column tables as it loses row count
+        if len(schema.fields) > 0:
+            table = _check_arrow_table_timestamps_localize(table, schema, 
True, timezone).cast(
+                to_arrow_schema(
+                    schema,
+                    error_on_duplicated_field_names_in_struct=True,
+                    timezone="UTC",
+                    prefers_large_types=prefers_large_var_types,
+                )
             )
-        )
 
         # Chunk the Arrow Table into RecordBatches
         chunk_size = arrow_batch_size
diff --git a/python/pyspark/sql/tests/test_creation.py 
b/python/pyspark/sql/tests/test_creation.py
index 906dab969201..96abd8d1ffba 100644
--- a/python/pyspark/sql/tests/test_creation.py
+++ b/python/pyspark/sql/tests/test_creation.py
@@ -261,6 +261,38 @@ class DataFrameCreationTestsMixin:
                 sdf = self.spark.createDataFrame(data, schema)
                 assertDataFrameEqual(sdf, data)
 
+    @unittest.skipIf(
+        not have_pandas or not have_pyarrow,
+        pandas_requirement_message or pyarrow_requirement_message,
+    )
+    def test_from_pandas_dataframe_with_zero_columns(self):
+        """SPARK-55600: Test that row count is preserved when creating 
DataFrame from
+        pandas with 0 columns but with explicit schema in classic Spark."""
+        import pandas as pd
+
+        # Create a pandas DataFrame with 5 rows but 0 columns
+        pdf = pd.DataFrame(index=range(5))
+        schema = StructType([])
+
+        # Test with Arrow optimization enabled
+        with self.sql_conf(
+            {
+                "spark.sql.execution.arrow.pyspark.enabled": True,
+                "spark.sql.execution.arrow.pyspark.fallback.enabled": False,
+            }
+        ):
+            df = self.spark.createDataFrame(pdf, schema=schema)
+            self.assertEqual(df.schema, schema)
+            self.assertEqual(df.count(), 5)
+            self.assertEqual(len(df.collect()), 5)
+
+        # Test with Arrow optimization disabled
+        with self.sql_conf({"spark.sql.execution.arrow.pyspark.enabled": 
False}):
+            df = self.spark.createDataFrame(pdf, schema=schema)
+            self.assertEqual(df.schema, schema)
+            self.assertEqual(df.count(), 5)
+            self.assertEqual(len(df.collect()), 5)
+
 
 class DataFrameCreationTests(
     DataFrameCreationTestsMixin,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to