This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 1f5d78b5952 [SPARK-44634][SQL][3.4] Encoders.bean no longer supports nested beans with type arguments
1f5d78b5952 is described below
commit 1f5d78b5952fcc6c7d36d3338a5594070e3a62dd
Author: Giambattista Bloisi <[email protected]>
AuthorDate: Mon Aug 7 15:11:02 2023 +0200
[SPARK-44634][SQL][3.4] Encoders.bean no longer supports nested beans with type arguments
### What changes were proposed in this pull request?
This is a port of [#42327](https://github.com/apache/spark/pull/42327).
This PR fixes a regression introduced in Spark 3.4.x where Encoders.bean can no longer process nested beans that have type arguments. For example:
```
class A<T> {
  T value;
  // value getter and setter
}

class B {
  A<String> stringHolder;
  // stringHolder getter and setter
}

Encoders.bean(B.class); // throws "SparkUnsupportedOperationException: [ENCODER_NOT_FOUND]..."
```
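With the fix, the same call succeeds. A minimal sketch of the expected behavior (my illustration, not output taken from the patch), assuming the `A`/`B` classes above are compiled as Java beans:
```
import org.apache.spark.sql.Encoders

// Bean inference now resolves A's type variable T to String.
val enc = Encoders.bean(classOf[B])
// Expected schema (sketch): struct<stringHolder: struct<value: string>>
```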
### Why are the changes needed?
The main pattern match in JavaTypeInference.encoderFor does not handle the ParameterizedType and TypeVariable cases. I think this is a regression introduced when the Guava TypeToken usage was removed: [SPARK-42093 SQL Move JavaTypeInference to AgnosticEncoders](https://github.com/apache/spark/commit/18672003513d5a4aa610b6b94dbbc15c33185d3#diff-1191737b908340a2f4c22b71b1c40ebaa0da9d8b40c958089c346a3bda26943b)
hvanhovell cloud-fan
In this PR I leverage the commons-lang3 TypeUtils utilities to resolve ParameterizedType type arguments for classes.
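For illustration, here is a minimal sketch of the commons-lang3 call this PR relies on, assuming the `B` class from the example above (with a `getStringHolder()` getter):
```
import java.lang.reflect.{ParameterizedType, Type, TypeVariable}
import scala.collection.JavaConverters._
import org.apache.commons.lang3.reflect.{TypeUtils => JavaTypeUtils}

// The generic return type of B.getStringHolder() is the
// ParameterizedType A<String>.
val pt = classOf[B].getMethod("getStringHolder")
  .getGenericReturnType.asInstanceOf[ParameterizedType]

// Binds A's type variable T to java.lang.String. encoderFor threads this
// map through its recursion and consults it in the new TypeVariable case.
val typeVariables: Map[TypeVariable[_], Type] =
  JavaTypeUtils.getTypeArguments(pt).asScala.toMap
```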
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests have been extended to check the correct encoding of a nested bean with type arguments.
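Roughly, the added check asserts that both type variables of the new test bean resolve to string encoders; a sketch of the idea (the exact assertion is in the suite diff below):
```
// Sketch: for JavaBeanWithGenerics[String, String], both the "attribute"
// and "value" properties must be encoded with StringEncoder.
val encoder = JavaTypeInference.encoderFor(
  classOf[JavaBeanWithGenerics[String, String]])
```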
Closes #42379 from gbloisi-openaire/spark-44634-branch-3.4.
Authored-by: Giambattista Bloisi <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
---
.../spark/sql/catalyst/JavaTypeInference.scala | 85 +++++-----------------
.../spark/sql/catalyst/JavaBeanWithGenerics.java | 41 +++++++++++
.../sql/catalyst/JavaTypeInferenceSuite.scala | 4 +
3 files changed, 65 insertions(+), 65 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 36b98737a20..75aca3ccbdd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -18,12 +18,14 @@ package org.apache.spark.sql.catalyst
import java.beans.{Introspector, PropertyDescriptor}
import java.lang.reflect.{ParameterizedType, Type, TypeVariable}
-import java.util.{ArrayDeque, List => JList, Map => JMap}
+import java.util.{List => JList, Map => JMap}
import javax.annotation.Nonnull
-import scala.annotation.tailrec
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
+import org.apache.commons.lang3.reflect.{TypeUtils => JavaTypeUtils}
+
import org.apache.spark.SPARK_DOC_ROOT
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, DayTimeIntervalEncoder, DEFAULT_JAVA_DECIMAL_ENCODER, EncoderField, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaEnumEncoder, LocalDateTimeEncoder, MapEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, P [...]
@@ -58,7 +60,8 @@ object JavaTypeInference {
encoderFor(beanType, Set.empty).asInstanceOf[AgnosticEncoder[T]]
}
- private def encoderFor(t: Type, seenTypeSet: Set[Class[_]]): AgnosticEncoder[_] = t match {
+ private def encoderFor(t: Type, seenTypeSet: Set[Class[_]],
+ typeVariables: Map[TypeVariable[_], Type] = Map.empty): AgnosticEncoder[_] = t match {
case c: Class[_] if c == java.lang.Boolean.TYPE => PrimitiveBooleanEncoder
case c: Class[_] if c == java.lang.Byte.TYPE => PrimitiveByteEncoder
@@ -102,18 +105,24 @@ object JavaTypeInference {
UDTEncoder(udt, udt.getClass)
case c: Class[_] if c.isArray =>
- val elementEncoder = encoderFor(c.getComponentType, seenTypeSet)
+ val elementEncoder = encoderFor(c.getComponentType, seenTypeSet, typeVariables)
ArrayEncoder(elementEncoder, elementEncoder.nullable)
- case ImplementsList(c, Array(elementCls)) =>
- val element = encoderFor(elementCls, seenTypeSet)
+ case c: Class[_] if classOf[JList[_]].isAssignableFrom(c) =>
+ val element = encoderFor(c.getTypeParameters.array(0), seenTypeSet, typeVariables)
IterableEncoder(ClassTag(c), element, element.nullable, lenientSerialization = false)
- case ImplementsMap(c, Array(keyCls, valueCls)) =>
- val keyEncoder = encoderFor(keyCls, seenTypeSet)
- val valueEncoder = encoderFor(valueCls, seenTypeSet)
+ case c: Class[_] if classOf[JMap[_, _]].isAssignableFrom(c) =>
+ val keyEncoder = encoderFor(c.getTypeParameters.array(0), seenTypeSet, typeVariables)
+ val valueEncoder = encoderFor(c.getTypeParameters.array(1), seenTypeSet, typeVariables)
MapEncoder(ClassTag(c), keyEncoder, valueEncoder, valueEncoder.nullable)
+ case tv: TypeVariable[_] =>
+ encoderFor(typeVariables(tv), seenTypeSet, typeVariables)
+
+ case pt: ParameterizedType =>
+ encoderFor(pt.getRawType, seenTypeSet, JavaTypeUtils.getTypeArguments(pt).asScala.toMap)
+
case c: Class[_] =>
if (seenTypeSet.contains(c)) {
throw QueryExecutionErrors.cannotHaveCircularReferencesInBeanClassError(c)
@@ -125,7 +134,7 @@ object JavaTypeInference {
// Note that the fields are ordered by name.
val fields = properties.map { property =>
val readMethod = property.getReadMethod
- val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet + c)
+ val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet + c, typeVariables)
// The existence of `javax.annotation.Nonnull`, means this field is not nullable.
val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull])
EncoderField(
@@ -148,59 +157,5 @@ object JavaTypeInference {
.filterNot(_.getName == "declaringClass")
.filter(_.getReadMethod != null)
}
-
- private class ImplementsGenericInterface(interface: Class[_]) {
- assert(interface.isInterface)
- assert(interface.getTypeParameters.nonEmpty)
-
- def unapply(t: Type): Option[(Class[_], Array[Type])] = implementsInterface(t).map { cls =>
- cls -> findTypeArgumentsForInterface(t)
- }
-
- @tailrec
- private def implementsInterface(t: Type): Option[Class[_]] = t match {
- case pt: ParameterizedType => implementsInterface(pt.getRawType)
- case c: Class[_] if interface.isAssignableFrom(c) => Option(c)
- case _ => None
- }
-
- private def findTypeArgumentsForInterface(t: Type): Array[Type] = {
- val queue = new ArrayDeque[(Type, Map[Any, Type])]
- queue.add(t -> Map.empty)
- while (!queue.isEmpty) {
- queue.poll() match {
- case (pt: ParameterizedType, bindings) =>
- // translate mappings...
- val mappedTypeArguments = pt.getActualTypeArguments.map {
- case v: TypeVariable[_] => bindings(v.getName)
- case v => v
- }
- if (pt.getRawType == interface) {
- return mappedTypeArguments
- } else {
- val mappedTypeArgumentMap = mappedTypeArguments
- .zipWithIndex.map(_.swap)
- .toMap[Any, Type]
- queue.add(pt.getRawType -> mappedTypeArgumentMap)
- }
- case (c: Class[_], indexedBindings) =>
- val namedBindings = c.getTypeParameters.zipWithIndex.map {
- case (parameter, index) =>
- parameter.getName -> indexedBindings(index)
- }.toMap[Any, Type]
- val superClass = c.getGenericSuperclass
- if (superClass != null) {
- queue.add(superClass -> namedBindings)
- }
- c.getGenericInterfaces.foreach { iface =>
- queue.add(iface -> namedBindings)
- }
- }
- }
- throw QueryExecutionErrors.unreachableError()
- }
- }
-
- private object ImplementsList extends ImplementsGenericInterface(classOf[JList[_]])
- private object ImplementsMap extends ImplementsGenericInterface(classOf[JMap[_, _]])
}
+
diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/JavaBeanWithGenerics.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/JavaBeanWithGenerics.java
new file mode 100755
index 00000000000..b84a3122cf8
--- /dev/null
+++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/JavaBeanWithGenerics.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst;
+
+class JavaBeanWithGenerics<T,A> {
+ private A attribute;
+
+ private T value;
+
+ public A getAttribute() {
+ return attribute;
+ }
+
+ public void setAttribute(A attribute) {
+ this.attribute = attribute;
+ }
+
+ public T getValue() {
+ return value;
+ }
+
+ public void setValue(T value) {
+ this.value = value;
+ }
+}
+
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/JavaTypeInferenceSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/JavaTypeInferenceSuite.scala
index 35f5bf739bf..64399976097 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/JavaTypeInferenceSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/JavaTypeInferenceSuite.scala
@@ -66,6 +66,7 @@ class LeafBean {
@BeanProperty var period: java.time.Period = _
@BeanProperty var enum: java.time.Month = _
@BeanProperty val readOnlyString = "read-only"
+ @BeanProperty var genericNestedBean: JavaBeanWithGenerics[String, String] = _
var nonNullString: String = "value"
@javax.annotation.Nonnull
@@ -184,6 +185,9 @@ class JavaTypeInferenceSuite extends SparkFunSuite {
encoderField("date", STRICT_DATE_ENCODER),
encoderField("duration", DayTimeIntervalEncoder),
encoderField("enum", JavaEnumEncoder(classTag[java.time.Month])),
+ encoderField("genericNestedBean", JavaBeanEncoder(
+ ClassTag(classOf[JavaBeanWithGenerics[String, String]]),
+ Seq(encoderField("attribute", StringEncoder), encoderField("value",
StringEncoder)))),
encoderField("instant", STRICT_INSTANT_ENCODER),
encoderField("localDate", STRICT_LOCAL_DATE_ENCODER),
encoderField("localDateTime", LocalDateTimeEncoder),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]