Repository: kudu Updated Branches: refs/heads/master fc50b98aa -> 542ba4ed7
KUDU-1824. KuduRDD.collect fails because of NotSerializableException The internal KuduRow class has been removed, and instead we copy into a serializable Spark row format. This also fixes a few style issues. Change-Id: I42618188003d2eef66088f3101803d1750e4134b Reviewed-on: http://gerrit.cloudera.org:8080/5636 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6db54007 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6db54007 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6db54007 Branch: refs/heads/master Commit: 6db540070a9326190c30996851207dfa0fb8066d Parents: fc50b98 Author: Dan Burkert <[email protected]> Authored: Fri Jan 6 16:26:03 2017 -0800 Committer: Todd Lipcon <[email protected]> Committed: Fri Mar 10 00:51:33 2017 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/spark/kudu/KuduRDD.scala | 34 +++++++------------- .../apache/kudu/spark/kudu/KuduRDDTest.scala | 32 ++++++++++++++++++ 2 files changed, 44 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/6db54007/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala index c771337..8a5d859 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala @@ -18,13 +18,12 @@ package org.apache.kudu.spark.kudu import scala.collection.JavaConverters._ +import org.apache.kudu.client._ +import org.apache.kudu.{Type, client} import org.apache.spark.rdd.RDD import 
org.apache.spark.sql.Row import org.apache.spark.{Partition, SparkContext, TaskContext} -import org.apache.kudu.client._ -import org.apache.kudu.{Type, client} - /** * A Resilient Distributed Dataset backed by a Kudu table. * @@ -59,7 +58,7 @@ class KuduRDD private[kudu] (val kuduContext: KuduContext, val client: KuduClient = kuduContext.syncClient val partition: KuduPartition = part.asInstanceOf[KuduPartition] val scanner = KuduScanToken.deserializeIntoScanner(partition.scanToken, client) - new RowResultIteratorScala(scanner) + new RowIterator(scanner) } override def getPreferredLocations(partition: Partition): Seq[String] = { @@ -70,15 +69,15 @@ class KuduRDD private[kudu] (val kuduContext: KuduContext, /** * A Spark SQL [[Partition]] which wraps a [[KuduScanToken]]. */ -private[spark] class KuduPartition(val index: Int, - val scanToken: Array[Byte], - val locations: Array[String]) extends Partition {} +private class KuduPartition(val index: Int, + val scanToken: Array[Byte], + val locations: Array[String]) extends Partition {} /** * A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]]. * @param scanner the wrapped scanner */ -private[spark] class RowResultIteratorScala(private val scanner: KuduScanner) extends Iterator[Row] { +private class RowIterator(private val scanner: KuduScanner) extends Iterator[Row] { private var currentIterator: RowResultIterator = null @@ -90,17 +89,7 @@ private[spark] class RowResultIteratorScala(private val scanner: KuduScanner) ex currentIterator.hasNext } - override def next(): Row = new KuduRow(currentIterator.next()) -} - -/** - * A Spark SQL [[Row]] which wraps a Kudu [[RowResult]]. 
- * @param rowResult the wrapped row result - */ -private[spark] class KuduRow(private val rowResult: RowResult) extends Row { - override def length: Int = rowResult.getColumnProjection.getColumnCount - - override def get(i: Int): Any = { + private def get(rowResult: RowResult, i: Int): Any = { if (rowResult.isNull(i)) null else rowResult.getColumnType(i) match { case Type.BOOL => rowResult.getBoolean(i) @@ -116,7 +105,8 @@ private[spark] class KuduRow(private val rowResult: RowResult) extends Row { } } - override def copy(): Row = Row.fromSeq(Range(0, length).map(get)) - - override def toString(): String = rowResult.toString + override def next(): Row = { + val rowResult = currentIterator.next() + Row.fromSeq(Range(0, rowResult.getColumnProjection.getColumnCount).map(get(rowResult, _))) + } } http://git-wip-us.apache.org/repos/asf/kudu/blob/6db54007/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala ---------------------------------------------------------------------- diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala new file mode 100644 index 0000000..d609e41 --- /dev/null +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kudu.spark.kudu + +import org.junit.runner.RunWith +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.junit.JUnitRunner + +@RunWith(classOf[JUnitRunner]) +class KuduRDDTest extends FunSuite with TestContext with BeforeAndAfter { + + test("collect rows") { + insertRows(100) + val rdd = kuduContext.kuduRDD(sc, tableName, List("key")) + assert(rdd.collect.length == 100) + } +}
