Repository: spark
Updated Branches:
refs/heads/master 1f86e795b -> 37112fcfc
[SPARK-19666][SQL] Skip a property without getter in Java schema inference and
allow empty bean in encoder creation
## What changes were proposed in this pull request?
This PR proposes to fix two issues:
**Skip a property without a getter in beans**
Currently, if we use a JavaBean without the getter as below:
```java
public static class BeanWithoutGetter implements Serializable {
private String a;
public void setA(String a) {
this.a = a;
}
}
BeanWithoutGetter bean = new BeanWithoutGetter();
List<BeanWithoutGetter> data = Arrays.asList(bean);
spark.createDataFrame(data, BeanWithoutGetter.class).show();
```
- Before
It throws an exception as below:
```
java.lang.NullPointerException
at org.spark_project.guava.reflect.TypeToken.method(TypeToken.java:465)
at
org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:126)
at
org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:125)
```
- After
```
++
||
++
||
++
```
**Supports empty bean in encoder creation**
```java
public static class EmptyBean implements Serializable {}
EmptyBean bean = new EmptyBean();
List<EmptyBean> data = Arrays.asList(bean);
spark.createDataset(data, Encoders.bean(EmptyBean.class)).show();
```
- Before
It throws an exception as below:
```
java.lang.UnsupportedOperationException: Cannot infer type for class EmptyBean
because it is not bean-compliant
at
org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$serializerFor(JavaTypeInference.scala:436)
at
org.apache.spark.sql.catalyst.JavaTypeInference$.serializerFor(JavaTypeInference.scala:341)
```
- After
```
++
||
++
||
++
```
## How was this patch tested?
Unit tests in `JavaDataFrameSuite` and `JavaDatasetSuite`.
Author: hyukjinkwon <[email protected]>
Closes #17013 from HyukjinKwon/SPARK-19666.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37112fcf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37112fcf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37112fcf
Branch: refs/heads/master
Commit: 37112fcfcd64db8f84f437e5c54cc3ea039c68f6
Parents: 1f86e79
Author: hyukjinkwon <[email protected]>
Authored: Wed Feb 22 12:42:23 2017 -0800
Committer: Wenchen Fan <[email protected]>
Committed: Wed Feb 22 12:42:23 2017 -0800
----------------------------------------------------------------------
.../spark/sql/catalyst/JavaTypeInference.scala | 45 +++++++++-----------
.../scala/org/apache/spark/sql/SQLContext.scala | 6 +--
.../org/apache/spark/sql/SparkSession.scala | 7 +--
.../apache/spark/sql/JavaDataFrameSuite.java | 17 ++++++++
.../org/apache/spark/sql/JavaDatasetSuite.java | 11 +++++
5 files changed, 54 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/37112fcf/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
index 8b53d98..e9d9508 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala
@@ -117,11 +117,10 @@ object JavaTypeInference {
val (valueDataType, nullable) = inferDataType(valueType)
(MapType(keyDataType, valueDataType, nullable), true)
- case _ =>
+ case other =>
// TODO: we should only collect properties that have getter and
setter. However, some tests
// pass in scala case class as java bean class which doesn't have
getter and setter.
- val beanInfo = Introspector.getBeanInfo(typeToken.getRawType)
- val properties = beanInfo.getPropertyDescriptors.filterNot(_.getName
== "class")
+ val properties = getJavaBeanReadableProperties(other)
val fields = properties.map { property =>
val returnType =
typeToken.method(property.getReadMethod).getReturnType
val (dataType, nullable) = inferDataType(returnType)
@@ -131,10 +130,15 @@ object JavaTypeInference {
}
}
- private def getJavaBeanProperties(beanClass: Class[_]):
Array[PropertyDescriptor] = {
+ def getJavaBeanReadableProperties(beanClass: Class[_]):
Array[PropertyDescriptor] = {
val beanInfo = Introspector.getBeanInfo(beanClass)
- beanInfo.getPropertyDescriptors
- .filter(p => p.getReadMethod != null && p.getWriteMethod != null)
+ beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
+ .filter(_.getReadMethod != null)
+ }
+
+ private def getJavaBeanReadableAndWritableProperties(
+ beanClass: Class[_]): Array[PropertyDescriptor] = {
+ getJavaBeanReadableProperties(beanClass).filter(_.getWriteMethod != null)
}
private def elementType(typeToken: TypeToken[_]): TypeToken[_] = {
@@ -298,9 +302,7 @@ object JavaTypeInference {
keyData :: valueData :: Nil)
case other =>
- val properties = getJavaBeanProperties(other)
- assert(properties.length > 0)
-
+ val properties = getJavaBeanReadableAndWritableProperties(other)
val setters = properties.map { p =>
val fieldName = p.getName
val fieldType = typeToken.method(p.getReadMethod).getReturnType
@@ -417,21 +419,16 @@ object JavaTypeInference {
)
case other =>
- val properties = getJavaBeanProperties(other)
- if (properties.length > 0) {
- CreateNamedStruct(properties.flatMap { p =>
- val fieldName = p.getName
- val fieldType = typeToken.method(p.getReadMethod).getReturnType
- val fieldValue = Invoke(
- inputObject,
- p.getReadMethod.getName,
- inferExternalType(fieldType.getRawType))
- expressions.Literal(fieldName) :: serializerFor(fieldValue,
fieldType) :: Nil
- })
- } else {
- throw new UnsupportedOperationException(
- s"Cannot infer type for class ${other.getName} because it is not
bean-compliant")
- }
+ val properties = getJavaBeanReadableAndWritableProperties(other)
+ CreateNamedStruct(properties.flatMap { p =>
+ val fieldName = p.getName
+ val fieldType = typeToken.method(p.getReadMethod).getReturnType
+ val fieldValue = Invoke(
+ inputObject,
+ p.getReadMethod.getName,
+ inferExternalType(fieldType.getRawType))
+ expressions.Literal(fieldName) :: serializerFor(fieldValue,
fieldType) :: Nil
+ })
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/37112fcf/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index dbe5509..234ef2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1090,14 +1090,14 @@ object SQLContext {
*/
private[sql] def beansToRows(
data: Iterator[_],
- beanInfo: BeanInfo,
+ beanClass: Class[_],
attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
val extractors =
- beanInfo.getPropertyDescriptors.filterNot(_.getName ==
"class").map(_.getReadMethod)
+
JavaTypeInference.getJavaBeanReadableProperties(beanClass).map(_.getReadMethod)
val methodsToConverts = extractors.zip(attrs).map { case (e, attr) =>
(e, CatalystTypeConverters.createToCatalystConverter(attr.dataType))
}
- data.map{ element =>
+ data.map { element =>
new GenericInternalRow(
methodsToConverts.map { case (e, convert) =>
convert(e.invoke(element)) }
): InternalRow
http://git-wip-us.apache.org/repos/asf/spark/blob/37112fcf/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 72af55c..afc1827 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql
-import java.beans.Introspector
import java.io.Closeable
import java.util.concurrent.atomic.AtomicReference
@@ -347,8 +346,7 @@ class SparkSession private(
val className = beanClass.getName
val rowRdd = rdd.mapPartitions { iter =>
// BeanInfo is not serializable so we must rediscover it remotely for each
partition.
- val localBeanInfo =
Introspector.getBeanInfo(Utils.classForName(className))
- SQLContext.beansToRows(iter, localBeanInfo, attributeSeq)
+ SQLContext.beansToRows(iter, Utils.classForName(className), attributeSeq)
}
Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd)(self))
}
@@ -374,8 +372,7 @@ class SparkSession private(
*/
def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame
= {
val attrSeq = getSchema(beanClass)
- val beanInfo = Introspector.getBeanInfo(beanClass)
- val rows = SQLContext.beansToRows(data.asScala.iterator, beanInfo, attrSeq)
+ val rows = SQLContext.beansToRows(data.asScala.iterator, beanClass,
attrSeq)
Dataset.ofRows(self, LocalRelation(attrSeq, rows.toSeq))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/37112fcf/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
index c3b94a4..a8f814b 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
@@ -397,4 +397,21 @@ public class JavaDataFrameSuite {
Assert.assertTrue(filter4.mightContain(i * 3));
}
}
+
+ public static class BeanWithoutGetter implements Serializable {
+ private String a;
+
+ public void setA(String a) {
+ this.a = a;
+ }
+ }
+
+ @Test
+ public void testBeanWithoutGetter() {
+ BeanWithoutGetter bean = new BeanWithoutGetter();
+ List<BeanWithoutGetter> data = Arrays.asList(bean);
+ Dataset<Row> df = spark.createDataFrame(data, BeanWithoutGetter.class);
+ Assert.assertEquals(df.schema().length(), 0);
+ Assert.assertEquals(df.collectAsList().size(), 1);
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/37112fcf/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 577672c..4581c6e 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -1276,4 +1276,15 @@ public class JavaDatasetSuite implements Serializable {
spark.createDataset(data,
Encoders.bean(NestedComplicatedJavaBean.class));
ds.collectAsList();
}
+
+ public static class EmptyBean implements Serializable {}
+
+ @Test
+ public void testEmptyBean() {
+ EmptyBean bean = new EmptyBean();
+ List<EmptyBean> data = Arrays.asList(bean);
+ Dataset<EmptyBean> df = spark.createDataset(data,
Encoders.bean(EmptyBean.class));
+ Assert.assertEquals(df.schema().length(), 0);
+ Assert.assertEquals(df.collectAsList().size(), 1);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]