Repository: spark
Updated Branches:
  refs/heads/branch-1.6 b0954b532 -> f383c5a55


[SPARK-11833][SQL] Add Java tests for Kryo/Java Dataset encoders

Also added some nicer error messages for incompatible types (private types and
primitive types) for the Kryo/Java encoders.

Author: Reynold Xin <[email protected]>

Closes #9823 from rxin/SPARK-11833.

(cherry picked from commit e61367b9f9bfc8e123369d55d7ca5925568b98a7)
Signed-off-by: Reynold Xin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f383c5a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f383c5a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f383c5a5

Branch: refs/heads/branch-1.6
Commit: f383c5a55f3aae091653eb20342603930aa81db5
Parents: b0954b5
Author: Reynold Xin <[email protected]>
Authored: Wed Nov 18 18:34:36 2015 -0800
Committer: Reynold Xin <[email protected]>
Committed: Wed Nov 18 18:34:45 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/Encoder.scala    | 69 ++++++++++++------
 .../encoders/EncoderErrorMessageSuite.scala     | 40 +++++++++++
 .../catalyst/encoders/FlatEncoderSuite.scala    | 22 ++----
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 75 +++++++++++++++++++-
 4 files changed, 166 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f383c5a5/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index 1ed5111..d54f285 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import java.lang.reflect.Modifier
+
 import scala.reflect.{ClassTag, classTag}
 
 import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor}
@@ -43,30 +45,28 @@ trait Encoder[T] extends Serializable {
  */
 object Encoders {
 
-  /** A way to construct encoders using generic serializers. */
-  private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = {
-    ExpressionEncoder[T](
-      schema = new StructType().add("value", BinaryType),
-      flat = true,
-      toRowExpressions = Seq(
-        EncodeUsingSerializer(
-          BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
-      fromRowExpression =
-        DecodeUsingSerializer[T](
-          BoundReference(0, BinaryType, nullable = true), classTag[T], kryo = useKryo),
-      clsTag = classTag[T]
-    )
-  }
+  def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder(flat = true)
+  def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder(flat = true)
+  def SHORT: Encoder[java.lang.Short] = ExpressionEncoder(flat = true)
+  def INT: Encoder[java.lang.Integer] = ExpressionEncoder(flat = true)
+  def LONG: Encoder[java.lang.Long] = ExpressionEncoder(flat = true)
+  def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder(flat = true)
+  def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true)
+  def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true)
 
   /**
    * (Scala-specific) Creates an encoder that serializes objects of type T using Kryo.
    * This encoder maps T into a single byte array (binary) field.
+   *
+   * T must be publicly accessible.
    */
   def kryo[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = true)
 
   /**
    * Creates an encoder that serializes objects of type T using Kryo.
    * This encoder maps T into a single byte array (binary) field.
+   *
+   * T must be publicly accessible.
    */
   def kryo[T](clazz: Class[T]): Encoder[T] = kryo(ClassTag[T](clazz))
 
@@ -75,6 +75,8 @@ object Encoders {
    * serialization. This encoder maps T into a single byte array (binary) field.
    *
    * Note that this is extremely inefficient and should only be used as the last resort.
+   *
+   * T must be publicly accessible.
    */
   def javaSerialization[T: ClassTag]: Encoder[T] = genericSerializer(useKryo = false)
 
@@ -83,17 +85,40 @@ object Encoders {
    * This encoder maps T into a single byte array (binary) field.
    *
    * Note that this is extremely inefficient and should only be used as the last resort.
+   *
+   * T must be publicly accessible.
    */
   def javaSerialization[T](clazz: Class[T]): Encoder[T] = javaSerialization(ClassTag[T](clazz))
 
-  def BOOLEAN: Encoder[java.lang.Boolean] = ExpressionEncoder(flat = true)
-  def BYTE: Encoder[java.lang.Byte] = ExpressionEncoder(flat = true)
-  def SHORT: Encoder[java.lang.Short] = ExpressionEncoder(flat = true)
-  def INT: Encoder[java.lang.Integer] = ExpressionEncoder(flat = true)
-  def LONG: Encoder[java.lang.Long] = ExpressionEncoder(flat = true)
-  def FLOAT: Encoder[java.lang.Float] = ExpressionEncoder(flat = true)
-  def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true)
-  def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true)
+  /** Throws an exception if T is not a public class. */
+  private def validatePublicClass[T: ClassTag](): Unit = {
+    if (!Modifier.isPublic(classTag[T].runtimeClass.getModifiers)) {
+      throw new UnsupportedOperationException(
+        s"${classTag[T].runtimeClass.getName} is not a public class. " +
+          "Only public classes are supported.")
+    }
+  }
+
+  /** A way to construct encoders using generic serializers. */
+  private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = {
+    if (classTag[T].runtimeClass.isPrimitive) {
+      throw new UnsupportedOperationException("Primitive types are not supported.")
+    }
+
+    validatePublicClass[T]()
+
+    ExpressionEncoder[T](
+      schema = new StructType().add("value", BinaryType),
+      flat = true,
+      toRowExpressions = Seq(
+        EncodeUsingSerializer(
+          BoundReference(0, ObjectType(classOf[AnyRef]), nullable = true), kryo = useKryo)),
+      fromRowExpression =
+        DecodeUsingSerializer[T](
+          BoundReference(0, BinaryType, nullable = true), classTag[T], kryo = useKryo),
+      clsTag = classTag[T]
+    )
+  }
 
   def tuple[T1, T2](
       e1: Encoder[T1],

http://git-wip-us.apache.org/repos/asf/spark/blob/f383c5a5/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
new file mode 100644
index 0000000..0b2a10b
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.encoders
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Encoders
+
+
+class EncoderErrorMessageSuite extends SparkFunSuite {
+
+  // Note: we also test error messages for encoders for private classes in JavaDatasetSuite.
+  // That is done in Java because Scala cannot create truly private classes.
+
+  test("primitive types in encoders using Kryo serialization") {
+    intercept[UnsupportedOperationException] { Encoders.kryo[Int] }
+    intercept[UnsupportedOperationException] { Encoders.kryo[Long] }
+    intercept[UnsupportedOperationException] { Encoders.kryo[Char] }
+  }
+
+  test("primitive types in encoders using Java serialization") {
+    intercept[UnsupportedOperationException] { Encoders.javaSerialization[Int] }
+    intercept[UnsupportedOperationException] { Encoders.javaSerialization[Long] }
+    intercept[UnsupportedOperationException] { Encoders.javaSerialization[Char] }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f383c5a5/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/FlatEncoderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/FlatEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/FlatEncoderSuite.scala
index 6e0322f..07523d4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/FlatEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/FlatEncoderSuite.scala
@@ -74,24 +74,14 @@ class FlatEncoderSuite extends ExpressionEncoderSuite {
     FlatEncoder[Map[Int, Map[String, Int]]], "map of map")
 
   // Kryo encoders
-  encodeDecodeTest(
-    "hello",
-    encoderFor(Encoders.kryo[String]),
-    "kryo string")
-  encodeDecodeTest(
-    new KryoSerializable(15),
-    encoderFor(Encoders.kryo[KryoSerializable]),
-    "kryo object serialization")
+  encodeDecodeTest("hello", encoderFor(Encoders.kryo[String]), "kryo string")
+  encodeDecodeTest(new KryoSerializable(15),
+    encoderFor(Encoders.kryo[KryoSerializable]), "kryo object")
 
   // Java encoders
-  encodeDecodeTest(
-    "hello",
-    encoderFor(Encoders.javaSerialization[String]),
-    "java string")
-  encodeDecodeTest(
-    new JavaSerializable(15),
-    encoderFor(Encoders.javaSerialization[JavaSerializable]),
-    "java object serialization")
+  encodeDecodeTest("hello", encoderFor(Encoders.javaSerialization[String]), "java string")
+  encodeDecodeTest(new JavaSerializable(15),
+    encoderFor(Encoders.javaSerialization[JavaSerializable]), "java object")
 }
 
 /** For testing Kryo serialization based encoder. */

http://git-wip-us.apache.org/repos/asf/spark/blob/f383c5a5/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index d9b2250..ce40dd8 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -24,6 +24,7 @@ import scala.Tuple2;
 import scala.Tuple3;
 import scala.Tuple4;
 import scala.Tuple5;
+
 import org.junit.*;
 
 import org.apache.spark.Accumulator;
@@ -410,8 +411,8 @@ public class JavaDatasetSuite implements Serializable {
       .as(Encoders.tuple(Encoders.STRING(), Encoders.INT(), Encoders.LONG(), Encoders.LONG()));
     Assert.assertEquals(
       Arrays.asList(
-        new Tuple4<String, Integer, Long, Long>("a", 3, 3L, 2L),
-        new Tuple4<String, Integer, Long, Long>("b", 3, 3L, 1L)),
+        new Tuple4<>("a", 3, 3L, 2L),
+        new Tuple4<>("b", 3, 3L, 1L)),
       agged2.collectAsList());
   }
 
@@ -437,4 +438,74 @@ public class JavaDatasetSuite implements Serializable {
       return reduction;
     }
   }
+
+  public static class KryoSerializable {
+    String value;
+
+    KryoSerializable(String value) {
+      this.value = value;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return this.value.equals(((KryoSerializable) other).value);
+    }
+
+    @Override
+    public int hashCode() {
+      return this.value.hashCode();
+    }
+  }
+
+  public static class JavaSerializable implements Serializable {
+    String value;
+
+    JavaSerializable(String value) {
+      this.value = value;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return this.value.equals(((JavaSerializable) other).value);
+    }
+
+    @Override
+    public int hashCode() {
+      return this.value.hashCode();
+    }
+  }
+
+  @Test
+  public void testKryoEncoder() {
+    Encoder<KryoSerializable> encoder = Encoders.kryo(KryoSerializable.class);
+    List<KryoSerializable> data = Arrays.asList(
+      new KryoSerializable("hello"), new KryoSerializable("world"));
+    Dataset<KryoSerializable> ds = context.createDataset(data, encoder);
+    Assert.assertEquals(data, ds.collectAsList());
+  }
+
+  @Test
+  public void testJavaEncoder() {
+    Encoder<JavaSerializable> encoder = Encoders.javaSerialization(JavaSerializable.class);
+    List<JavaSerializable> data = Arrays.asList(
+      new JavaSerializable("hello"), new JavaSerializable("world"));
+    Dataset<JavaSerializable> ds = context.createDataset(data, encoder);
+    Assert.assertEquals(data, ds.collectAsList());
+  }
+
+  /**
+   * For testing error messages when creating an encoder on a private class. This is done
+   * here since we cannot create truly private classes in Scala.
+   */
+  private static class PrivateClassTest { }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testJavaEncoderErrorMessageForPrivateClass() {
+    Encoders.javaSerialization(PrivateClassTest.class);
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testKryoEncoderErrorMessageForPrivateClass() {
+    Encoders.kryo(PrivateClassTest.class);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to