This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new a1b954105 [KYUUBI #6207] Support to retrieve Spark UserDefinedType result
a1b954105 is described below
commit a1b95410544fe2c2f0e9691e1999a522564403df
Author: Wang, Fei <[email protected]>
AuthorDate: Mon Mar 25 21:47:54 2024 -0700
[KYUUBI #6207] Support to retrieve Spark UserDefinedType result
# :mag: Description
## Issue References
This pull request fixes #
This fixes the following issue:
```
24/03/25 00:47:10 ERROR SparkTBinaryFrontendService: Error getting result set metadata:
java.lang.IllegalArgumentException: Unrecognized type name: struct<type:tinyint,size:int,indices:array<int>,values:array<double>>
```
<img width="1567" alt="image"
src="https://github.com/apache/kyuubi/assets/6757692/9067d2d2-06d9-4937-b328-71434def34fd">
## Describe Your Solution
Map Spark `UserDefinedType` columns to `TTypeId.USER_DEFINED_TYPE` in the engine's `SchemaHelper`, using a new `SparkDataTypeHelper` under `org.apache.spark.sql.kyuubi` to detect UDTs, and teach the Kyuubi Hive JDBC driver's `JdbcColumn` to map `USER_DEFINED_TYPE` to `java.sql.Types.OTHER` with type name `user_defined`, so result set metadata for UDT columns is returned instead of failing with `Unrecognized type name`.
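For context, a hedged sketch of what a JDBC client sees once the engine stops throwing (the connection URL and table name are placeholders; the `user_defined` type name and string-form values come from the `JdbcColumn` change and the new test below):
```scala
// Hedged sketch: reading a UDT-typed column through the Kyuubi Hive JDBC driver
// after this change. The URL and table name are placeholders.
import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:kyuubi://localhost:10009/default")
try {
  val rs = conn.createStatement().executeQuery("SELECT features FROM udt_table")
  val meta = rs.getMetaData
  // The engine now reports TTypeId.USER_DEFINED_TYPE instead of throwing;
  // JdbcColumn maps it to java.sql.Types.OTHER with type name "user_defined".
  println(s"${meta.getColumnType(1)} ${meta.getColumnTypeName(1)}")
  while (rs.next()) {
    println(rs.getString(1)) // UDT values are retrieved in their string form
  }
} finally {
  conn.close()
}
```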
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
After this PR:
<img width="1728" alt="image"
src="https://github.com/apache/kyuubi/assets/6757692/2d9f4f0b-9ac4-48e9-9e6a-4c0f1616edf9">
---
# Checklist
- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6207 from turboFei/udt.
Closes #6207
b492568e3 [Wang, Fei] Revert "miss union type"
39ac1c42f [Wang, Fei] miss union type
8c32f54af [Wang, Fei] comment
00a469855 [Wang, Fei] getColumnTypeName
d7d291652 [Wang, Fei] ut
edce5cf1f [Wang, Fei] support udt
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../kyuubi/engine/spark/schema/SchemaHelper.scala | 4 +-
.../spark/sql/kyuubi/SparkDataTypeHelper.scala | 29 +++++++++++++
.../org/apache/spark/kyuubi/ExampleValueUDT.scala | 44 ++++++++++++++++++++
.../spark/kyuubi/SparkUDTOperationSuite.scala | 47 ++++++++++++++++++++++
.../org/apache/kyuubi/jdbc/hive/JdbcColumn.java | 3 ++
5 files changed, 126 insertions(+), 1 deletion(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala
index 3da593701..464643122 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/schema/SchemaHelper.scala
@@ -21,6 +21,7 @@ import java.util.Collections
import scala.collection.JavaConverters._
+import org.apache.spark.sql.kyuubi.SparkDataTypeHelper
import org.apache.spark.sql.types._
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
@@ -65,7 +66,8 @@ object SchemaHelper {
case _: ArrayType => TTypeId.ARRAY_TYPE
case _: MapType => TTypeId.MAP_TYPE
case _: StructType => TTypeId.STRUCT_TYPE
- // TODO: it is private now, case udt: UserDefinedType => TTypeId.USER_DEFINED_TYPE
+ // SPARK-7768(fixed in 3.2.0) promoted UserDefinedType to DeveloperApi
+ case _ if SparkDataTypeHelper.isUserDefinedType(typ) => TTypeId.USER_DEFINED_TYPE
case other =>
throw new IllegalArgumentException(s"Unrecognized type name: ${other.catalogString}")
}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDataTypeHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDataTypeHelper.scala
new file mode 100644
index 000000000..11f8be076
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDataTypeHelper.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kyuubi
+
+import org.apache.spark.sql.types.{DataType, UserDefinedType}
+
+object SparkDataTypeHelper {
+ def isUserDefinedType(typ: DataType): Boolean = {
+ typ match {
+ case _: UserDefinedType[_] => true
+ case _ => false
+ }
+ }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ExampleValueUDT.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ExampleValueUDT.scala
new file mode 100644
index 000000000..bfac1a153
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/ExampleValueUDT.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi
+
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, UserDefinedType}
+
+case class ExampleValue(v: Double)
+
+class ExampleValueUDT extends UserDefinedType[ExampleValue] {
+
+ override def sqlType: DataType = ArrayType(DoubleType, false)
+
+ override def pyUDT: String = "pyspark.testing.ExampleValueUDT"
+
+ override def serialize(obj: ExampleValue): GenericArrayData = {
+ new GenericArrayData(Array[Any](obj.v))
+ }
+
+ override def deserialize(datum: Any): ExampleValue = {
+ datum match {
+ case values: ArrayData => new ExampleValue(values.getDouble(0))
+ }
+ }
+
+ override def userClass: Class[ExampleValue] = classOf[ExampleValue]
+
+ override private[spark] def asNullable: ExampleValueUDT = this
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkUDTOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkUDTOperationSuite.scala
new file mode 100644
index 000000000..68da0b5b1
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/kyuubi/SparkUDTOperationSuite.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi
+
+import org.apache.spark.sql.types.UDTRegistration
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+class SparkUDTOperationSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
+ override def withKyuubiConf: Map[String, String] = Map(
+ KyuubiConf.ENGINE_SINGLE_SPARK_SESSION.key -> "true")
+
+ override protected def jdbcUrl: String = getJdbcUrl
+
+ test("retrieve UserDefinedType result") {
+ UDTRegistration.register(classOf[ExampleValue].getName, classOf[ExampleValueUDT].getName)
+ spark.udf.register(
+ "exampleValueUdf",
+ (param: Double) =>
+ {
+ ExampleValue(param)
+ }: ExampleValue)
+
+ withJdbcStatement() { stmt =>
+ val result = stmt.executeQuery("SELECT exampleValueUdf(1.0)")
+ assert(result.next())
+ assert(result.getString(1) == ExampleValue(1.0).toString)
+ }
+ }
+}
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumn.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumn.java
index a6c4a948b..f80a42dab 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumn.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcColumn.java
@@ -178,6 +178,7 @@ public class JdbcColumn {
case INTERVAL_YEAR_MONTH_TYPE:
case INTERVAL_DAY_TIME_TYPE:
case UNION_TYPE:
+ case USER_DEFINED_TYPE:
return OTHER;
case DECIMAL_TYPE:
return DECIMAL;
@@ -240,6 +241,8 @@ public class JdbcColumn {
return "struct";
case NULL_TYPE:
return "void";
+ case USER_DEFINED_TYPE:
+ return "user_defined";
default:
throw new KyuubiSQLException("Invalid column type: " + type);
}