This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 0422f5be3ba [SPARK-41706][CONNECT][PYTHON] `pyspark_types_to_proto_types` should support `MapType`
0422f5be3ba is described below
commit 0422f5be3ba5975a43942d4405ebb36c69059b7e
Author: Jiaan Geng <[email protected]>
AuthorDate: Tue Dec 27 14:34:49 2022 +0800
[SPARK-41706][CONNECT][PYTHON] `pyspark_types_to_proto_types` should support `MapType`
### What changes were proposed in this pull request?
Currently, `pyspark_types_to_proto_types` is used to transform PySpark data types into protobuf data types, but it does not yet support `MapType`. Many Connect APIs, such as `createDataFrame` and `DataFrame.to`, need to transform the PySpark map type into the protobuf map type.
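For illustration, a minimal sketch of the conversion this patch enables, assuming a PySpark build that includes this change (the assertions simply restate the fields set in the diff below):

```python
from pyspark.sql.connect.types import pyspark_types_to_proto_types
from pyspark.sql.types import IntegerType, MapType, StringType

# Convert a PySpark MapType into its protobuf DataType counterpart.
proto = pyspark_types_to_proto_types(MapType(StringType(), IntegerType(), False))

# The proto message carries the key type, the value type, and the
# value_contains_null flag, mirroring MapType's constructor arguments.
assert proto.map.HasField("key_type")
assert proto.map.HasField("value_type")
assert not proto.map.value_contains_null
```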
### Why are the changes needed?
This PR lets `pyspark_types_to_proto_types` support `MapType`.
### Does this PR introduce _any_ user-facing change?
No. This is a new feature.
### How was this patch tested?
New tests.
Closes #39213 from beliefer/SPARK-41706.
Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
python/pyspark/sql/connect/types.py | 4 ++++
python/pyspark/sql/tests/connect/test_connect_basic.py | 18 +++++++++++++++++-
.../sql/tests/connect/test_connect_plan_only.py | 3 ++-
3 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/python/pyspark/sql/connect/types.py b/python/pyspark/sql/connect/types.py
index e6c179de8cb..0ef2e51b6e2 100644
--- a/python/pyspark/sql/connect/types.py
+++ b/python/pyspark/sql/connect/types.py
@@ -95,6 +95,10 @@ def pyspark_types_to_proto_types(data_type: DataType) -> pb2.DataType:
             struct_field.data_type.CopyFrom(pyspark_types_to_proto_types(field.dataType))
             struct_field.nullable = field.nullable
             ret.struct.fields.append(struct_field)
+    elif isinstance(data_type, MapType):
+        ret.map.key_type.CopyFrom(pyspark_types_to_proto_types(data_type.keyType))
+        ret.map.value_type.CopyFrom(pyspark_types_to_proto_types(data_type.valueType))
+        ret.map.value_contains_null = data_type.valueContainsNull
     else:
         raise Exception(f"Unsupported data type {data_type}")
     return ret
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index ab05efc5e5d..94341822383 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -20,7 +20,7 @@ import tempfile

 from pyspark.testing.sqlutils import SQLTestUtils
 from pyspark.sql import SparkSession, Row
-from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType
+from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType, MapType
 import pyspark.sql.functions
 from pyspark.testing.utils import ReusedPySparkTestCase
 from pyspark.testing.connectutils import should_test_connect, connect_requirement_message
@@ -58,6 +58,7 @@ class SparkConnectSQLTestCase(PandasOnSparkTestCase, ReusedPySparkTestCase, SQLT
         cls.tbl_name = "test_connect_basic_table_1"
         cls.tbl_name2 = "test_connect_basic_table_2"
         cls.tbl_name3 = "test_connect_basic_table_3"
+        cls.tbl_name4 = "test_connect_basic_table_4"
         cls.tbl_name_empty = "test_connect_basic_table_empty"

         # Cleanup test data
@@ -82,6 +83,8 @@ class SparkConnectSQLTestCase(PandasOnSparkTestCase, ReusedPySparkTestCase, SQLT
         df2.write.saveAsTable(cls.tbl_name2)
         df3 = cls.spark.createDataFrame([(x, f"{x}") for x in range(100)], ["id", "test\n_column"])
         df3.write.saveAsTable(cls.tbl_name3)
+        df4 = cls.spark.createDataFrame([(x, {"a": x}) for x in range(100)], ["id", "map_column"])
+        df4.write.saveAsTable(cls.tbl_name4)
         empty_table_schema = StructType(
             [
                 StructField("firstname", StringType(), True),
@@ -449,6 +452,19 @@ class SparkConnectTests(SparkConnectSQLTestCase):
             str(context.exception),
         )

+        # Test map type
+        schema = StructType(
+            [
+                StructField("id", StringType(), True),
+                StructField("my_map", MapType(StringType(), IntegerType(), False), True),
+            ]
+        )
+        cdf = self.connect.read.table(self.tbl_name4).to(schema)
+        df = self.spark.read.table(self.tbl_name4).to(schema)
+
+        self.assertEqual(cdf.schema, df.schema)
+        self.assert_eq(cdf.toPandas(), df.toPandas())
+
     def test_toDF(self):
         # SPARK-41310: test DataFrame.toDF()
         self.assertEqual(
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
index db42c00fa6c..def780ba7e1 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
@@ -30,7 +30,7 @@ if should_test_connect:
     from pyspark.sql.connect.readwriter import DataFrameReader
     from pyspark.sql.connect.function_builder import UserDefinedFunction, udf
     from pyspark.sql.connect.types import pyspark_types_to_proto_types
-    from pyspark.sql.types import StringType, StructType, StructField, IntegerType
+    from pyspark.sql.types import StringType, StructType, StructField, IntegerType, MapType


 @unittest.skipIf(not should_test_connect, connect_requirement_message)
@@ -546,6 +546,7 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture):
             [
                 StructField("col1", IntegerType(), True),
                 StructField("col2", StringType(), True),
+                StructField("map1", MapType(StringType(), IntegerType(), True), True),
             ]
         )
         new_plan = df.to(schema)._plan.to_proto(self.connect)
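As a usage note, the pattern covered by the new tests can be condensed as follows; this is an illustrative sketch, where `spark` is assumed to be an existing Spark Connect session and the toy data is not part of this patch:

```python
from pyspark.sql.types import IntegerType, MapType, StringType, StructField, StructType

schema = StructType(
    [
        StructField("id", StringType(), True),
        StructField("my_map", MapType(StringType(), IntegerType(), False), True),
    ]
)

# With this patch, DataFrame.to(schema) can carry a map column through
# Spark Connect, since the schema now survives pyspark_types_to_proto_types.
df = spark.createDataFrame([("1", {"a": 1})], ["id", "my_map"]).to(schema)
df.printSchema()
```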
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]