Repository: kudu Updated Branches: refs/heads/master d13ac79a4 -> efb60241b
KUDU-1581 Fix DataFrame read failure when table has Binary Col. For Binary Cols, kudu-spark is returning a ByteBuffer object when Spark expects to receive Array[Byte], so the change is to return a copy of the byte array. Modified testcase to add missing col types and verify the values read back. Change-Id: I3e3926d85fab7efb407325c9992c3fccdab04bad Reviewed-on: http://gerrit.cloudera.org:8080/4145 Tested-by: Kudu Jenkins Reviewed-by: Will Berkeley <[email protected]> Reviewed-by: Adar Dembo <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/efb60241 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/efb60241 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/efb60241 Branch: refs/heads/master Commit: efb60241bbc1caf1bcda47b2605f6773d2d0dded Parents: d13ac79 Author: Ram Mettu <[email protected]> Authored: Sat Aug 27 14:01:48 2016 -0400 Committer: Adar Dembo <[email protected]> Committed: Fri Sep 2 18:50:22 2016 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/spark/kudu/KuduRDD.scala | 2 +- .../kudu/spark/kudu/DefaultSourceTest.scala | 16 +++---- .../kudu/spark/kudu/KuduContextTest.scala | 46 ++++++++++++++++---- .../apache/kudu/spark/kudu/TestContext.scala | 13 ++++-- 4 files changed, 57 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/efb60241/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala index bfd0b55..763ec96 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala @@ 
-123,7 +123,7 @@ private[spark] class KuduRow(private val rowResult: RowResult) extends Row { case Type.FLOAT => rowResult.getFloat(i) case Type.DOUBLE => rowResult.getDouble(i) case Type.STRING => rowResult.getString(i) - case Type.BINARY => rowResult.getBinary(i) + case Type.BINARY => rowResult.getBinaryCopy(i) } } http://git-wip-us.apache.org/repos/asf/kudu/blob/efb60241/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala index 2dfce76..ed8d627 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala @@ -55,7 +55,7 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter { val rowCount = 10 var sqlContext : SQLContext = _ - var rows : IndexedSeq[(Int, Int, String)] = _ + var rows : IndexedSeq[(Int, Int, String, Long)] = _ var kuduOptions : Map[String, String] = _ before { @@ -206,36 +206,36 @@ class DefaultSourceTest extends FunSuite with TestContext with BeforeAndAfter { } test("table scan with projection and predicate double") { - assertEquals(rows.count { case (key, i, s) => i != null && i > 5 }, + assertEquals(rows.count { case (key, i, s, ts) => i != null && i > 5 }, sqlContext.sql(s"""SELECT key, c3_double FROM $tableName where c3_double > "5.0"""").count()) } test("table scan with projection and predicate long") { - assertEquals(rows.count { case (key, i, s) => i != null && i > 5 }, + assertEquals(rows.count { case (key, i, s, ts) => i != null && i > 5 }, sqlContext.sql(s"""SELECT key, c4_long FROM $tableName where c4_long > "5"""").count()) } test("table scan with projection and predicate bool") { - assertEquals(rows.count { case 
(key, i, s) => i != null && i%2==0 }, + assertEquals(rows.count { case (key, i, s, ts) => i != null && i%2==0 }, sqlContext.sql(s"""SELECT key, c5_bool FROM $tableName where c5_bool = true""").count()) } test("table scan with projection and predicate short") { - assertEquals(rows.count { case (key, i, s) => i != null && i > 5}, + assertEquals(rows.count { case (key, i, s, ts) => i != null && i > 5}, sqlContext.sql(s"""SELECT key, c6_short FROM $tableName where c6_short > 5""").count()) } test("table scan with projection and predicate float") { - assertEquals(rows.count { case (key, i, s) => i != null && i > 5}, + assertEquals(rows.count { case (key, i, s, ts) => i != null && i > 5}, sqlContext.sql(s"""SELECT key, c7_float FROM $tableName where c7_float > 5""").count()) } test("table scan with projection and predicate ") { - assertEquals(rows.count { case (key, i, s) => s != null && s > "5" }, + assertEquals(rows.count { case (key, i, s, ts) => s != null && s > "5" }, sqlContext.sql(s"""SELECT key FROM $tableName where c2_s > "5"""").count()) - assertEquals(rows.count { case (key, i, s) => s != null }, + assertEquals(rows.count { case (key, i, s, ts) => s != null }, sqlContext.sql(s"""SELECT key, c2_s FROM $tableName where c2_s IS NOT NULL""").count()) } http://git-wip-us.apache.org/repos/asf/kudu/blob/efb60241/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala index f020919..484e8f2 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduContextTest.scala @@ -16,20 +16,50 @@ */ package org.apache.kudu.spark.kudu +import java.sql.Timestamp + +import org.apache.spark.sql.SQLContext import 
org.junit.runner.RunWith -import org.scalatest.FunSuite +import org.scalatest.{FunSuite, Matchers} import org.scalatest.junit.JUnitRunner +import org.apache.spark.sql.functions.decode @RunWith(classOf[JUnitRunner]) -class KuduContextTest extends FunSuite with TestContext { +class KuduContextTest extends FunSuite with TestContext with Matchers { + val rowCount = 10 + test("Test basic kuduRDD") { - val rowCount = 10 + val rows = insertRows(rowCount) + val scanList = kuduContext.kuduRDD(sc, "test", Seq("key", "c1_i", "c2_s", "c3_double", + "c4_long", "c5_bool", "c6_short", "c7_float", "c8_binary", "c9_timestamp", "c10_byte")) + .map(r => r.toSeq).collect() + scanList.foreach(r => { + val index = r.apply(0).asInstanceOf[Int] + assert(r.apply(0).asInstanceOf[Int] == rows.apply(index)._1) + assert(r.apply(1).asInstanceOf[Int] == rows.apply(index)._2) + assert(r.apply(2).asInstanceOf[String] == rows.apply(index)._3) + assert(r.apply(3).asInstanceOf[Double] == rows.apply(index)._2.toDouble) + assert(r.apply(4).asInstanceOf[Long] == rows.apply(index)._2.toLong) + assert(r.apply(5).asInstanceOf[Boolean] == (rows.apply(index)._2%2==1)) + assert(r.apply(6).asInstanceOf[Short] == rows.apply(index)._2.toShort) + assert(r.apply(7).asInstanceOf[Float] == rows.apply(index)._2.toFloat) + val binaryBytes = s"bytes ${rows.apply(index)._2}".getBytes().toSeq + assert(r.apply(8).asInstanceOf[Array[Byte]].toSeq == binaryBytes) + assert(r.apply(9).asInstanceOf[Timestamp] == + KuduRelation.microsToTimestamp(rows.apply(index)._4)) + assert(r.apply(10).asInstanceOf[Byte] == rows.apply(index)._2.toByte) + }) + } + test("Test kudu-spark DataFrame") { insertRows(rowCount) - - val scanRdd = kuduContext.kuduRDD(sc, "test", Seq("key")) - - val scanList = scanRdd.map(r => r.getInt(0)).collect() - assert(scanList.length == rowCount) + val sqlContext = new SQLContext(sc) + val dataDF = sqlContext.read.options(Map("kudu.master" -> miniCluster.getMasterAddresses, + "kudu.table" -> "test")).kudu + 
dataDF.sort("key").select("c8_binary").first.get(0) + .asInstanceOf[Array[Byte]].shouldBe("bytes 0".getBytes) + // decode the binary to string and compare + dataDF.sort("key").withColumn("c8_binary", decode(dataDF("c8_binary"), "UTF-8")) + .select("c8_binary").first.get(0).shouldBe("bytes 0") } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kudu/blob/efb60241/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala index f4af644..47aba37 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala @@ -49,7 +49,10 @@ trait TestContext extends BeforeAndAfterAll { self: Suite => new ColumnSchemaBuilder("c4_long", Type.INT64).build(), new ColumnSchemaBuilder("c5_bool", Type.BOOL).build(), new ColumnSchemaBuilder("c6_short", Type.INT16).build(), - new ColumnSchemaBuilder("c7_float", Type.FLOAT).build()) + new ColumnSchemaBuilder("c7_float", Type.FLOAT).build(), + new ColumnSchemaBuilder("c8_binary", Type.BINARY).build(), + new ColumnSchemaBuilder("c9_timestamp", Type.TIMESTAMP).build(), + new ColumnSchemaBuilder("c10_byte", Type.INT8).build()) new Schema(columns) } @@ -93,7 +96,7 @@ trait TestContext extends BeforeAndAfterAll { self: Suite => kuduSession.apply(delete) } - def insertRows(rowCount: Integer): IndexedSeq[(Int, Int, String)] = { + def insertRows(rowCount: Integer): IndexedSeq[(Int, Int, String, Long)] = { val kuduSession = kuduClient.newSession() val rows = Range(0, rowCount).map { i => @@ -106,6 +109,10 @@ trait TestContext extends BeforeAndAfterAll { self: Suite => row.addBoolean(5, i%2==1) row.addShort(6, i.toShort) row.addFloat(7, i.toFloat) + row.addBinary(8, s"bytes 
${i}".getBytes()) + val ts = System.currentTimeMillis() * 1000 + row.addLong(9, ts) + row.addByte(10, i.toByte) // Sprinkling some nulls so that queries see them. val s = if (i % 2 == 0) { @@ -117,7 +124,7 @@ trait TestContext extends BeforeAndAfterAll { self: Suite => } kuduSession.apply(insert) - (i, i, s) + (i, i, s, ts) } rows }
