[spark] KUDU-2371: Add KuduWriteOptions class and ignoreNull option

This patch adds the KuduWriteOptions class to allow configuration of
writes to the Kudu table when writing with Spark. Using a class keeps
the write API extensible: more fields can be added in the future. An
instance of this class is passed to the write functions
(insert/delete/upsert/update) in KuduContext.

KuduWriteOptions is also supported in the DefaultSource APIs. Clients can
set write options through the data source parameters in Spark SQL
(kudu.ignoreNull and kudu.ignoreDuplicateRowErrors).

This patch also adds the ignoreNull write option so that users can
upsert/update only non-NULL columns and leave the rest of the columns
unchanged.

For example, this feature is useful when users use Spark Streaming to
process JSON and upsert to Kudu, because missing column values from
JSON are set to NULL, resulting in some existing row values being
overwritten with NULL, which is not desired.

Change-Id: Ide908ea29f572849eca0ba850ee197c1b22a07c8
Reviewed-on: http://gerrit.cloudera.org:8080/9834
Reviewed-by: Dan Burkert <danburk...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c25b7cbd
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c25b7cbd
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c25b7cbd

Branch: refs/heads/master
Commit: c25b7cbd8d2c2746effc4b6866ea5764bcdef157
Parents: bf835b0
Author: fwang29 <fw...@cloudera.com>
Authored: Tue Mar 27 16:43:05 2018 -0700
Committer: Dan Burkert <danburk...@apache.org>
Committed: Wed Apr 18 23:32:00 2018 +0000

----------------------------------------------------------------------
 .../apache/kudu/spark/kudu/DefaultSource.scala  |  18 ++-
 .../apache/kudu/spark/kudu/KuduContext.scala    |  59 ++++++--
 .../kudu/spark/kudu/KuduWriteOptions.scala      |  35 +++++
 .../apache/kudu/spark/kudu/OperationType.scala  |   5 -
 .../kudu/spark/kudu/DefaultSourceTest.scala     | 136 ++++++++++++++++++-
 .../apache/kudu/spark/kudu/TestContext.scala    |  10 ++
 6 files changed, 237 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/c25b7cbd/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 58beec7..761fd79 100644
--- 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -51,6 +51,8 @@ class DefaultSource extends RelationProvider with 
CreatableRelationProvider
   val OPERATION = "kudu.operation"
   val FAULT_TOLERANT_SCANNER = "kudu.faultTolerantScan"
   val SCAN_LOCALITY = "kudu.scanLocality"
+  val IGNORE_NULL = "kudu.ignoreNull"
+  val IGNORE_DUPLICATE_ROW_ERRORS = "kudu.ignoreDuplicateRowErrors"
 
   def defaultMasterAddrs: String = 
InetAddress.getLocalHost.getCanonicalHostName
 
@@ -72,9 +74,13 @@ class DefaultSource extends RelationProvider with 
CreatableRelationProvider
     val faultTolerantScanner = 
Try(parameters.getOrElse(FAULT_TOLERANT_SCANNER, "false").toBoolean)
       .getOrElse(false)
     val scanLocality = getScanLocalityType(parameters.getOrElse(SCAN_LOCALITY, 
"closest_replica"))
+    val ignoreDuplicateRowErrors = 
Try(parameters(IGNORE_DUPLICATE_ROW_ERRORS).toBoolean).getOrElse(false) ||
+      Try(parameters(OPERATION) == "insert-ignore").getOrElse(false)
+    val ignoreNull = Try(parameters.getOrElse(IGNORE_NULL, 
"false").toBoolean).getOrElse(false)
+    val writeOptions = new KuduWriteOptions(ignoreDuplicateRowErrors, 
ignoreNull)
 
     new KuduRelation(tableName, kuduMaster, faultTolerantScanner,
-      scanLocality, operationType, None)(sqlContext)
+      scanLocality, operationType, None, writeOptions)(sqlContext)
   }
 
   /**
@@ -83,7 +89,7 @@ class DefaultSource extends RelationProvider with 
CreatableRelationProvider
     * @param sqlContext
     * @param mode Only Append mode is supported. It will upsert or insert data
     *             to an existing table, depending on the upsert parameter
-    * @param parameters Necessary parameters for kudu.table and kudu.master
+    * @param parameters Necessary parameters for kudu.table, kudu.master, etc.
     * @param data Dataframe to save into kudu
     * @return returns populated base relation
     */
@@ -116,7 +122,7 @@ class DefaultSource extends RelationProvider with 
CreatableRelationProvider
   private def getOperationType(opParam: String): OperationType = {
     opParam.toLowerCase match {
       case "insert" => Insert
-      case "insert-ignore" => InsertIgnore
+      case "insert-ignore" => Insert
       case "upsert" => Upsert
       case "update" => Update
       case "delete" => Delete
@@ -144,6 +150,7 @@ class DefaultSource extends RelationProvider with 
CreatableRelationProvider
   *                     take place at the closest replica.
   * @param operationType The default operation type to perform when writing to 
the relation
   * @param userSchema A schema used to select columns for the relation
+  * @param writeOptions Kudu write options
   * @param sqlContext SparkSQL context
   */
 @InterfaceStability.Unstable
@@ -152,7 +159,8 @@ class KuduRelation(private val tableName: String,
                    private val faultTolerantScanner: Boolean,
                    private val scanLocality: ReplicaSelection,
                    private val operationType: OperationType,
-                   private val userSchema: Option[StructType])(
+                   private val userSchema: Option[StructType],
+                   private val writeOptions: KuduWriteOptions = new 
KuduWriteOptions)(
                    val sqlContext: SQLContext)
   extends BaseRelation
     with PrunedFilteredScan
@@ -320,7 +328,7 @@ class KuduRelation(private val tableName: String,
     if (overwrite) {
       throw new UnsupportedOperationException("overwrite is not yet supported")
     }
-    context.writeRows(data, tableName, operationType)
+    context.writeRows(data, tableName, operationType, writeOptions)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/c25b7cbd/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 165719a..4d9bb8e 100644
--- 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -235,20 +235,32 @@ class KuduContext(val kuduMaster: String,
     *
     * @param data the data to insert
     * @param tableName the Kudu table to insert into
+    * @param writeOptions the Kudu write options
     */
-  def insertRows(data: DataFrame, tableName: String): Unit = {
-    writeRows(data, tableName, Insert)
+  def insertRows(data: DataFrame,
+                 tableName: String,
+                 writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit 
= {
+    writeRows(data, tableName, Insert, writeOptions)
   }
 
   /**
     * Inserts the rows of a [[DataFrame]] into a Kudu table, ignoring any new
     * rows that have a primary key conflict with existing rows.
     *
+    * This function call is equivalent to the following, which is preferred:
+    * {{{
+    * insertRows(data, tableName, new 
KuduWriteOptions(ignoreDuplicateRowErrors = true))
+    * }}}
+    *
     * @param data the data to insert into Kudu
     * @param tableName the Kudu table to insert into
     */
-  def insertIgnoreRows(data: DataFrame, tableName: String): Unit = {
-    writeRows(data, tableName, InsertIgnore)
+  @deprecated("Use KuduContext.insertRows(data, tableName, new 
KuduWriteOptions(ignoreDuplicateRowErrors = true))")
+  def insertIgnoreRows(data: DataFrame,
+                       tableName: String): Unit = {
+    val writeOptions = new KuduWriteOptions
+    writeOptions.ignoreDuplicateRowErrors = true
+    writeRows(data, tableName, Insert, writeOptions)
   }
 
   /**
@@ -256,9 +268,12 @@ class KuduContext(val kuduMaster: String,
     *
     * @param data the data to upsert into Kudu
     * @param tableName the Kudu table to upsert into
+    * @param writeOptions the Kudu write options
     */
-  def upsertRows(data: DataFrame, tableName: String): Unit = {
-    writeRows(data, tableName, Upsert)
+  def upsertRows(data: DataFrame,
+                 tableName: String,
+                 writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit 
= {
+    writeRows(data, tableName, Upsert, writeOptions)
   }
 
   /**
@@ -266,9 +281,12 @@ class KuduContext(val kuduMaster: String,
     *
     * @param data the data to update into Kudu
     * @param tableName the Kudu table to update
+    * @param writeOptions the Kudu write options
     */
-  def updateRows(data: DataFrame, tableName: String): Unit = {
-    writeRows(data, tableName, Update)
+  def updateRows(data: DataFrame,
+                 tableName: String,
+                 writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit 
= {
+    writeRows(data, tableName, Update, writeOptions)
   }
 
   /**
@@ -277,18 +295,24 @@ class KuduContext(val kuduMaster: String,
     * @param data the data to delete from Kudu
     *             note that only the key columns should be specified for 
deletes
     * @param tableName The Kudu tabe to delete from
+    * @param writeOptions the Kudu write options
     */
-  def deleteRows(data: DataFrame, tableName: String): Unit = {
-    writeRows(data, tableName, Delete)
+  def deleteRows(data: DataFrame,
+                 tableName: String,
+                 writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit 
= {
+    writeRows(data, tableName, Delete, writeOptions)
   }
 
-  private[kudu] def writeRows(data: DataFrame, tableName: String, operation: 
OperationType) {
+  private[kudu] def writeRows(data: DataFrame,
+                              tableName: String,
+                              operation: OperationType,
+                              writeOptions: KuduWriteOptions = new 
KuduWriteOptions) {
     val schema = data.schema
     // Get the client's last propagated timestamp on the driver.
     val lastPropagatedTimestamp = syncClient.getLastPropagatedTimestamp
     data.foreachPartition(iterator => {
       val pendingErrors = writePartitionRows(iterator, schema, tableName, 
operation,
-                                             lastPropagatedTimestamp)
+                                             lastPropagatedTimestamp, 
writeOptions)
       val errorCount = pendingErrors.getRowErrors.length
       if (errorCount > 0) {
         val errors = 
pendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString
@@ -302,7 +326,8 @@ class KuduContext(val kuduMaster: String,
                                  schema: StructType,
                                  tableName: String,
                                  operationType: OperationType,
-                                 lastPropagatedTimestamp: Long): 
RowErrorsAndOverflowStatus = {
+                                 lastPropagatedTimestamp: Long,
+                                 writeOptions: KuduWriteOptions) : 
RowErrorsAndOverflowStatus = {
     // Since each executor has its own KuduClient, update executor's 
propagated timestamp
     // based on the last one on the driver.
     syncClient.updateLastPropagatedTimestamp(lastPropagatedTimestamp)
@@ -312,13 +337,17 @@ class KuduContext(val kuduMaster: String,
     })
     val session: KuduSession = syncClient.newSession
     session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
-    session.setIgnoreAllDuplicateRows(operationType.ignoreDuplicateRowErrors)
+    session.setIgnoreAllDuplicateRows(writeOptions.ignoreDuplicateRowErrors)
     try {
       for (row <- rows) {
         val operation = operationType.operation(table)
         for ((sparkIdx, kuduIdx) <- indices) {
           if (row.isNullAt(sparkIdx)) {
-            operation.getRow.setNull(kuduIdx)
+            if (table.getSchema.getColumnByIndex(kuduIdx).isKey) {
+              val key_name = table.getSchema.getColumnByIndex(kuduIdx).getName
+              throw new IllegalArgumentException(s"Can't set primary key 
column '$key_name' to null")
+            }
+            if (!writeOptions.ignoreNull) operation.getRow.setNull(kuduIdx)
           } else {
             schema.fields(sparkIdx).dataType match {
               case DataTypes.StringType => operation.getRow.addString(kuduIdx, 
row.getString(sparkIdx))

http://git-wip-us.apache.org/repos/asf/kudu/blob/c25b7cbd/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala
----------------------------------------------------------------------
diff --git 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala
 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala
new file mode 100644
index 0000000..7262b75
--- /dev/null
+++ 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduWriteOptions.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kudu.spark.kudu
+
+import org.apache.yetus.audience.InterfaceStability
+
+/**
+  * KuduWriteOptions holds configuration of writes to Kudu tables.
+  *
+  * The instance of this class is passed to KuduContext write functions,
+  * such as insertRows, deleteRows, upsertRows, and updateRows.
+  *
+  * @param ignoreDuplicateRowErrors when inserting, ignore any new rows that
+  *                                 have a primary key conflict with existing rows
+  * @param ignoreNull update only non-NULL columns if set to true
+  */
+@InterfaceStability.Unstable
+class KuduWriteOptions(
+  var ignoreDuplicateRowErrors: Boolean = false,
+  var ignoreNull: Boolean = false) extends Serializable

http://git-wip-us.apache.org/repos/asf/kudu/blob/c25b7cbd/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
----------------------------------------------------------------------
diff --git 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
index fd23d05..a1e9a48 100644
--- 
a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
+++ 
b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/OperationType.scala
@@ -24,15 +24,10 @@ import org.apache.kudu.client.{KuduTable, Operation}
   */
 private[kudu] sealed trait OperationType {
   def operation(table: KuduTable): Operation
-  def ignoreDuplicateRowErrors: Boolean = false
 }
 private[kudu] case object Insert extends OperationType {
   override def operation(table: KuduTable): Operation = table.newInsert()
 }
-private[kudu] case object InsertIgnore extends OperationType {
-  override def operation(table: KuduTable): Operation = table.newInsert()
-  override def ignoreDuplicateRowErrors: Boolean = true
-}
 private[kudu] case object Update extends OperationType {
   override def operation(table: KuduTable): Operation = table.newUpdate()
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/c25b7cbd/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
----------------------------------------------------------------------
diff --git 
a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
 
b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index 3894dd0..4e43a6a 100644
--- 
a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ 
b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -170,6 +170,86 @@ class DefaultSourceTest extends FunSuite with TestContext 
with BeforeAndAfter wi
 
     // change the c2 string to abc and insert
     val updateDF = baseDF.withColumn("c2_s", lit("abc"))
+    val kuduWriteOptions = new KuduWriteOptions
+    kuduWriteOptions.ignoreDuplicateRowErrors = true
+    kuduContext.insertRows(updateDF, tableName, kuduWriteOptions)
+
+    // change the key and insert
+    val insertDF = df.limit(1).withColumn("key", 
df("key").plus(100)).withColumn("c2_s", lit("def"))
+    kuduContext.insertRows(insertDF, tableName, kuduWriteOptions)
+
+    // read the data back
+    val newDF = sqlContext.read.options(kuduOptions).kudu
+    val collectedUpdate = newDF.filter("key = 0").collect()
+    assertEquals("0", collectedUpdate(0).getAs[String]("c2_s"))
+    val collectedInsert = newDF.filter("key = 100").collect()
+    assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
+
+    // restore the original state of the table
+    deleteRow(100)
+  }
+
+  test("insert ignore rows using DefaultSource") {
+    val df = sqlContext.read.options(kuduOptions).kudu
+    val baseDF = df.limit(1) // filter down to just the first row
+
+    // change the c2 string to abc and insert
+    val updateDF = baseDF.withColumn("c2_s", lit("abc"))
+    val newOptions: Map[String, String] = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> miniCluster.getMasterAddresses,
+      "kudu.operation" -> "insert",
+      "kudu.ignoreDuplicateRowErrors" -> "true")
+    updateDF.write.options(newOptions).mode("append").kudu
+
+    // change the key and insert
+    val insertDF = df.limit(1).withColumn("key", 
df("key").plus(100)).withColumn("c2_s", lit("def"))
+    insertDF.write.options(newOptions).mode("append").kudu
+
+    // read the data back
+    val newDF = sqlContext.read.options(kuduOptions).kudu
+    val collectedUpdate = newDF.filter("key = 0").collect()
+    assertEquals("0", collectedUpdate(0).getAs[String]("c2_s"))
+    val collectedInsert = newDF.filter("key = 100").collect()
+    assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
+
+    // restore the original state of the table
+    deleteRow(100)
+  }
+
+  test("insert ignore rows using DefaultSource with 'kudu.operation' = 
'insert-ignore'") {
+    val df = sqlContext.read.options(kuduOptions).kudu
+    val baseDF = df.limit(1) // filter down to just the first row
+
+    // change the c2 string to abc and insert
+    val updateDF = baseDF.withColumn("c2_s", lit("abc"))
+    val newOptions: Map[String, String] = Map(
+      "kudu.table" -> tableName,
+      "kudu.master" -> miniCluster.getMasterAddresses,
+      "kudu.operation" -> "insert-ignore")
+    updateDF.write.options(newOptions).mode("append").kudu
+
+    // change the key and insert
+    val insertDF = df.limit(1).withColumn("key", 
df("key").plus(100)).withColumn("c2_s", lit("def"))
+    insertDF.write.options(newOptions).mode("append").kudu
+
+    // read the data back
+    val newDF = sqlContext.read.options(kuduOptions).kudu
+    val collectedUpdate = newDF.filter("key = 0").collect()
+    assertEquals("0", collectedUpdate(0).getAs[String]("c2_s"))
+    val collectedInsert = newDF.filter("key = 100").collect()
+    assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
+
+    // restore the original state of the table
+    deleteRow(100)
+  }
+
+  test("insert ignore rows with insertIgnoreRows(deprecated)") {
+    val df = sqlContext.read.options(kuduOptions).kudu
+    val baseDF = df.limit(1) // filter down to just the first row
+
+    // change the c2 string to abc and insert
+    val updateDF = baseDF.withColumn("c2_s", lit("abc"))
     kuduContext.insertIgnoreRows(updateDF, tableName)
 
     // change the key and insert
@@ -211,6 +291,58 @@ class DefaultSourceTest extends FunSuite with TestContext 
with BeforeAndAfter wi
     deleteRow(100)
   }
 
+  test("upsert rows ignore nulls") {
+    val nonNullDF = sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", 
"val")
+    kuduContext.insertRows(nonNullDF, simpleTableName)
+
+    val dataDF = sqlContext.read.options(Map("kudu.master" -> 
miniCluster.getMasterAddresses,
+      "kudu.table" -> simpleTableName)).kudu
+
+    val nullDF = sqlContext.createDataFrame(Seq((0, 
null.asInstanceOf[String]))).toDF("key", "val")
+    val kuduWriteOptions = new KuduWriteOptions
+    kuduWriteOptions.ignoreNull = true
+    kuduContext.upsertRows(nullDF, simpleTableName, kuduWriteOptions)
+    assert(dataDF.collect.toList === nonNullDF.collect.toList)
+
+    kuduWriteOptions.ignoreNull = false
+    kuduContext.updateRows(nonNullDF, simpleTableName)
+    kuduContext.upsertRows(nullDF, simpleTableName, kuduWriteOptions)
+    assert(dataDF.collect.toList === nullDF.collect.toList)
+
+    kuduContext.updateRows(nonNullDF, simpleTableName)
+    kuduContext.upsertRows(nullDF, simpleTableName)
+    assert(dataDF.collect.toList === nullDF.collect.toList)
+
+    val deleteDF = dataDF.filter("key = 0").select("key")
+    kuduContext.deleteRows(deleteDF, simpleTableName)
+  }
+
+  test("upsert rows ignore nulls using DefaultSource") {
+    val nonNullDF = sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", 
"val")
+    kuduContext.insertRows(nonNullDF, simpleTableName)
+
+    val dataDF = sqlContext.read.options(Map("kudu.master" -> 
miniCluster.getMasterAddresses,
+      "kudu.table" -> simpleTableName)).kudu
+
+    val nullDF = sqlContext.createDataFrame(Seq((0, 
null.asInstanceOf[String]))).toDF("key", "val")
+    val options_0: Map[String, String] = Map(
+      "kudu.table" -> simpleTableName,
+      "kudu.master" -> miniCluster.getMasterAddresses,
+      "kudu.ignoreNull" -> "true")
+    nullDF.write.options(options_0).mode("append").kudu
+    assert(dataDF.collect.toList === nonNullDF.collect.toList)
+
+    kuduContext.updateRows(nonNullDF, simpleTableName)
+    val options_1: Map[String, String] = Map(
+      "kudu.table" -> simpleTableName,
+      "kudu.master" -> miniCluster.getMasterAddresses)
+    nullDF.write.options(options_1).mode("append").kudu
+    assert(dataDF.collect.toList === nullDF.collect.toList)
+
+    val deleteDF = dataDF.filter("key = 0").select("key")
+    kuduContext.deleteRows(deleteDF, simpleTableName)
+  }
+
   test("delete rows") {
     val df = sqlContext.read.options(kuduOptions).kudu
     val deleteDF = df.filter("key = 0").select("key")
@@ -593,7 +725,9 @@ class DefaultSourceTest extends FunSuite with TestContext 
with BeforeAndAfter wi
                      .withColumn("key", df("key")
                      .plus(100))
                      .withColumn("c2_s", lit("def"))
-    kuduContext.insertIgnoreRows(updateDF, tableName)
+    val kuduWriteOptions = new KuduWriteOptions
+    kuduWriteOptions.ignoreDuplicateRowErrors = true
+    kuduContext.insertRows(updateDF, tableName, kuduWriteOptions)
     assert(kuduContext.syncClient.getLastPropagatedTimestamp > prevTimestamp)
   }
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/c25b7cbd/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
----------------------------------------------------------------------
diff --git 
a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala 
b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
index 62b41cd..414d48e 100644
--- 
a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
+++ 
b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/TestContext.scala
@@ -44,6 +44,7 @@ trait TestContext extends BeforeAndAfterAll { self: Suite =>
   var kuduContext: KuduContext = _
 
   val tableName: String = "test"
+  val simpleTableName: String = "simple-test"
 
   lazy val schema: Schema = {
     val columns = ImmutableList.of(
@@ -73,6 +74,13 @@ trait TestContext extends BeforeAndAfterAll { self: Suite =>
       new Schema(columns)
   }
 
+  lazy val simpleSchema: Schema = {
+    val columns = ImmutableList.of(
+      new ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
+      new ColumnSchemaBuilder("val", Type.STRING).nullable(true).build())
+    new Schema(columns)
+  }
+
   val appID: String = new Date().toString + math.floor(math.random * 
10E4).toLong.toString
 
   val conf: SparkConf = new SparkConf().
@@ -96,6 +104,8 @@ trait TestContext extends BeforeAndAfterAll { self: Suite =>
     val tableOptions = new 
CreateTableOptions().setRangePartitionColumns(List("key").asJava)
                                                .setNumReplicas(1)
     table = kuduClient.createTable(tableName, schema, tableOptions)
+
+    kuduClient.createTable(simpleTableName, simpleSchema, tableOptions)
   }
 
   override def afterAll() {

Reply via email to