Repository: kudu
Updated Branches:
  refs/heads/master d13ac79a4 -> efb60241b


KUDU-1581: Fix DataFrame read failure when a table has a BINARY column

For BINARY columns, kudu-spark was returning a ByteBuffer object where Spark
expects an Array[Byte]; this change returns a copy of the byte array instead.
Modified the test cases to add the missing column types and to verify the
values read back.

Change-Id: I3e3926d85fab7efb407325c9992c3fccdab04bad
Reviewed-on: http://gerrit.cloudera.org:8080/4145
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <[email protected]>
Reviewed-by: Adar Dembo <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/efb60241
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/efb60241
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/efb60241

Branch: refs/heads/master
Commit: efb60241bbc1caf1bcda47b2605f6773d2d0dded
Parents: d13ac79
Author: Ram Mettu <[email protected]>
Authored: Sat Aug 27 14:01:48 2016 -0400
Committer: Adar Dembo <[email protected]>
Committed: Fri Sep 2 18:50:22 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/spark/kudu/KuduRDD.scala    |  2 +-
 .../kudu/spark/kudu/DefaultSourceTest.scala     | 16 +++----
 .../kudu/spark/kudu/KuduContextTest.scala       | 46 ++++++++++++++++----
 .../apache/kudu/spark/kudu/TestContext.scala    | 13 ++++--
 4 files changed, 57 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/efb60241/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index bfd0b55..763ec96 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -123,7 +123,7 @@ private[spark] class KuduRow(private val rowResult: RowResult) extends Row {
       case Type.FLOAT => rowResult.getFloat(i)
       case Type.DOUBLE => rowResult.getDouble(i)
       case Type.STRING => rowResult.getString(i)
-      case Type.BINARY => rowResult.getBinary(i)
+      case Type.BINARY => rowResult.getBinaryCopy(i)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/efb60241/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 2dfce76..ed8d627 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -55,7 +55,7 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter {
 
   val rowCount = 10
   var sqlContext : SQLContext = _
-  var rows : IndexedSeq[(Int, Int, String)] = _
+  var rows : IndexedSeq[(Int, Int, String, Long)] = _
   var kuduOptions : Map[String, String] = _
 
   before {
@@ -206,36 +206,36 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter {
   }
 
   test("table scan with projection and predicate double") {
-    assertEquals(rows.count { case (key, i, s) => i != null && i > 5 },
+    assertEquals(rows.count { case (key, i, s, ts) => i != null && i > 5 },
                  sqlContext.sql(s"""SELECT key, c3_double FROM $tableName 
where c3_double > "5.0"""").count())
   }
 
   test("table scan with projection and predicate long") {
-    assertEquals(rows.count { case (key, i, s) => i != null && i > 5 },
+    assertEquals(rows.count { case (key, i, s, ts) => i != null && i > 5 },
                  sqlContext.sql(s"""SELECT key, c4_long FROM $tableName where 
c4_long > "5"""").count())
 
   }
   test("table scan with projection and predicate bool") {
-    assertEquals(rows.count { case (key, i, s) => i != null && i%2==0 },
+    assertEquals(rows.count { case (key, i, s, ts) => i != null && i%2==0 },
                  sqlContext.sql(s"""SELECT key, c5_bool FROM $tableName where 
c5_bool = true""").count())
 
   }
   test("table scan with projection and predicate short") {
-    assertEquals(rows.count { case (key, i, s) => i != null && i > 5},
+    assertEquals(rows.count { case (key, i, s, ts) => i != null && i > 5},
                  sqlContext.sql(s"""SELECT key, c6_short FROM $tableName where 
c6_short > 5""").count())
 
   }
   test("table scan with projection and predicate float") {
-    assertEquals(rows.count { case (key, i, s) => i != null && i > 5},
+    assertEquals(rows.count { case (key, i, s, ts) => i != null && i > 5},
                  sqlContext.sql(s"""SELECT key, c7_float FROM $tableName where 
c7_float > 5""").count())
 
   }
 
   test("table scan with projection and predicate ") {
-    assertEquals(rows.count { case (key, i, s) => s != null && s > "5" },
+    assertEquals(rows.count { case (key, i, s, ts) => s != null && s > "5" },
       sqlContext.sql(s"""SELECT key FROM $tableName where c2_s > 
"5"""").count())
 
-    assertEquals(rows.count { case (key, i, s) => s != null },
+    assertEquals(rows.count { case (key, i, s, ts) => s != null },
       sqlContext.sql(s"""SELECT key, c2_s FROM $tableName where c2_s IS NOT 
NULL""").count())
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/efb60241/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
index f020919..484e8f2 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala
@@ -16,20 +16,50 @@
  */
 package org.apache.kudu.spark.kudu
 
+import java.sql.Timestamp
+
+import org.apache.spark.sql.SQLContext
 import org.junit.runner.RunWith
-import org.scalatest.FunSuite
+import org.scalatest.{FunSuite, Matchers}
 import org.scalatest.junit.JUnitRunner
+import org.apache.spark.sql.functions.decode
 
 @RunWith(classOf[JUnitRunner])
-class KuduContextTest extends FunSuite with TestContext {
+class KuduContextTest extends FunSuite with TestContext with Matchers {
+  val rowCount = 10
+
   test("Test basic kuduRDD") {
-    val rowCount = 10
+    val rows = insertRows(rowCount)
+    val scanList = kuduContext.kuduRDD(sc, "test", Seq("key", "c1_i", "c2_s", 
"c3_double",
+        "c4_long", "c5_bool", "c6_short", "c7_float", "c8_binary", 
"c9_timestamp", "c10_byte"))
+      .map(r => r.toSeq).collect()
+    scanList.foreach(r => {
+      val index = r.apply(0).asInstanceOf[Int]
+      assert(r.apply(0).asInstanceOf[Int] == rows.apply(index)._1)
+      assert(r.apply(1).asInstanceOf[Int] == rows.apply(index)._2)
+      assert(r.apply(2).asInstanceOf[String] == rows.apply(index)._3)
+      assert(r.apply(3).asInstanceOf[Double] == rows.apply(index)._2.toDouble)
+      assert(r.apply(4).asInstanceOf[Long] == rows.apply(index)._2.toLong)
+      assert(r.apply(5).asInstanceOf[Boolean] == (rows.apply(index)._2%2==1))
+      assert(r.apply(6).asInstanceOf[Short] == rows.apply(index)._2.toShort)
+      assert(r.apply(7).asInstanceOf[Float] == rows.apply(index)._2.toFloat)
+      val binaryBytes = s"bytes ${rows.apply(index)._2}".getBytes().toSeq
+      assert(r.apply(8).asInstanceOf[Array[Byte]].toSeq == binaryBytes)
+      assert(r.apply(9).asInstanceOf[Timestamp] ==
+        KuduRelation.microsToTimestamp(rows.apply(index)._4))
+      assert(r.apply(10).asInstanceOf[Byte] == rows.apply(index)._2.toByte)
+    })
+  }
 
+  test("Test kudu-spark DataFrame") {
     insertRows(rowCount)
-
-    val scanRdd = kuduContext.kuduRDD(sc, "test", Seq("key"))
-
-    val scanList = scanRdd.map(r => r.getInt(0)).collect()
-    assert(scanList.length == rowCount)
+    val sqlContext = new SQLContext(sc)
+    val dataDF = sqlContext.read.options(Map("kudu.master" -> 
miniCluster.getMasterAddresses,
+      "kudu.table" -> "test")).kudu
+    dataDF.sort("key").select("c8_binary").first.get(0)
+      .asInstanceOf[Array[Byte]].shouldBe("bytes 0".getBytes)
+    // decode the binary to string and compare
+    dataDF.sort("key").withColumn("c8_binary", decode(dataDF("c8_binary"), 
"UTF-8"))
+      .select("c8_binary").first.get(0).shouldBe("bytes 0")
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/efb60241/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
index f4af644..47aba37 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
@@ -49,7 +49,10 @@ trait TestContext extends BeforeAndAfterAll { self: Suite =>
       new ColumnSchemaBuilder("c4_long", Type.INT64).build(),
       new ColumnSchemaBuilder("c5_bool", Type.BOOL).build(),
       new ColumnSchemaBuilder("c6_short", Type.INT16).build(),
-      new ColumnSchemaBuilder("c7_float", Type.FLOAT).build())
+      new ColumnSchemaBuilder("c7_float", Type.FLOAT).build(),
+      new ColumnSchemaBuilder("c8_binary", Type.BINARY).build(),
+      new ColumnSchemaBuilder("c9_timestamp", Type.TIMESTAMP).build(),
+      new ColumnSchemaBuilder("c10_byte", Type.INT8).build())
     new Schema(columns)
   }
 
@@ -93,7 +96,7 @@ trait TestContext extends BeforeAndAfterAll { self: Suite =>
     kuduSession.apply(delete)
   }
 
-  def insertRows(rowCount: Integer): IndexedSeq[(Int, Int, String)] = {
+  def insertRows(rowCount: Integer): IndexedSeq[(Int, Int, String, Long)] = {
     val kuduSession = kuduClient.newSession()
 
     val rows = Range(0, rowCount).map { i =>
@@ -106,6 +109,10 @@ trait TestContext extends BeforeAndAfterAll { self: Suite =>
       row.addBoolean(5, i%2==1)
       row.addShort(6, i.toShort)
       row.addFloat(7, i.toFloat)
+      row.addBinary(8, s"bytes ${i}".getBytes())
+      val ts = System.currentTimeMillis() * 1000
+      row.addLong(9, ts)
+      row.addByte(10, i.toByte)
 
       // Sprinkling some nulls so that queries see them.
       val s = if (i % 2 == 0) {
@@ -117,7 +124,7 @@ trait TestContext extends BeforeAndAfterAll { self: Suite =>
       }
 
       kuduSession.apply(insert)
-      (i, i, s)
+      (i, i, s, ts)
     }
     rows
   }

Reply via email to