This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 1d382435d1d2e46d279503ab64225342cf0068ea Author: Grant Henke <[email protected]> AuthorDate: Thu Oct 31 09:05:38 2019 -0500 [spark] Add a test for sink based writing Adds a test to verify writing via the KuduSink, as opposed to the KuduContext, works as expected. Change-Id: Ic1f28be80ad21b0783d8a0889ad7b1847601442b Reviewed-on: http://gerrit.cloudera.org:8080/14603 Tested-by: Kudu Jenkins Reviewed-by: Andrew Wong <[email protected]> --- .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 37 ++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala index 6dca719..b2226e5 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala @@ -20,6 +20,7 @@ import scala.collection.JavaConverters._ import scala.collection.immutable.IndexedSeq import org.apache.spark.sql.Row import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.DataTypes import org.apache.spark.sql.types.StructField @@ -338,6 +339,42 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { } @Test + def testWriteWithSink() { + val df = sqlContext.read.options(kuduOptions).format("kudu").load + val baseDF = df.limit(1) // Filter down to just the first row. + + // Change the c2 string to abc and upsert. + val upsertDF = baseDF.withColumn("c2_s", lit("abc")) + upsertDF.write + .format("kudu") + .option("kudu.master", harness.getMasterAddressesAsString) + .option("kudu.table", tableName) + // Default kudu.operation is upsert. + .mode(SaveMode.Append) + .save() + + // Change the key and insert. + val insertDF = df + .limit(1) + .withColumn("key", df("key").plus(100)) + .withColumn("c2_s", lit("def")) + insertDF.write + .format("kudu") + .option("kudu.master", harness.getMasterAddressesAsString) + .option("kudu.table", tableName) + .option("kudu.operation", "insert") + .mode(SaveMode.Append) + .save() + + // Read the data back. + val newDF = sqlContext.read.options(kuduOptions).format("kudu").load + val collectedUpdate = newDF.filter("key = 0").collect() + assertEquals("abc", collectedUpdate(0).getAs[String]("c2_s")) + val collectedInsert = newDF.filter("key = 100").collect() + assertEquals("def", collectedInsert(0).getAs[String]("c2_s")) + } + + @Test def testUpsertRowsIgnoreNulls() { val nonNullDF = sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
