Another big change in that patch is switching the default write behavior
from insert (with duplicate keys ignored) to upsert.
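
If I'm reading that right, a plain write with no operation specified would
then behave as an upsert; roughly (for some DataFrame df matching the
table's schema):

scala> df.write.option("kudu.table", "t").mode("append").kudu  // under that change, this upserts rather than inserts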

I've been thinking about this a bit, and I think it would be nice if we
could use the API to do inserts, upserts, updates, or deletes (with upsert
as the default).  Toward that end I put together a patch on top of Will's
that makes the operation a configurable parameter.
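
Under the hood the idea is just to map the value of that option onto the
corresponding Kudu write operation; roughly something like this sketch
(illustrative only, not the actual patch code, and assuming the 0.10-era
org.kududb.client Java API):

import org.kududb.client.{KuduTable, Operation}

// Illustrative sketch, not the patch code: pick the Kudu operation to
// apply for each row based on the configured operation name.
def newOperation(table: KuduTable, operation: String): Operation =
  operation.toLowerCase match {
    case "insert" => table.newInsert()
    case "upsert" => table.newUpsert()
    case "update" => table.newUpdate()
    case "delete" => table.newDelete()
    case other    => throw new IllegalArgumentException(s"unsupported kudu.operation: $other")
  }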

This example starts with an empty kudu table t, created as:

CREATE TABLE t (a INT32 NOT NULL, b INT32 NOT NULL, c INT32 NOT NULL)
PRIMARY KEY (a)
DISTRIBUTE BY HASH (a) INTO 4 BUCKETS
WITH 1 REPLICA;
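
For reference, the same table can also be created through the Kudu Java
client API; a rough sketch (assuming the 0.10-era org.kududb packages and
a placeholder master address):

import scala.collection.JavaConverters._
import org.kududb.ColumnSchema.ColumnSchemaBuilder
import org.kududb.{Schema, Type}
import org.kududb.client.{CreateTableOptions, KuduClient}

// Placeholder master address -- substitute your cluster's.
val client = new KuduClient.KuduClientBuilder("kudu-master:7051").build()

val schema = new Schema(List(
  new ColumnSchemaBuilder("a", Type.INT32).key(true).build(),
  new ColumnSchemaBuilder("b", Type.INT32).build(),
  new ColumnSchemaBuilder("c", Type.INT32).build()).asJava)

val options = new CreateTableOptions()
  .addHashPartitions(List("a").asJava, 4)  // DISTRIBUTE BY HASH (a) INTO 4 BUCKETS
  .setNumReplicas(1)                       // WITH 1 REPLICA

client.createTable("t", schema, options)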

Spark Shell output:

scala> sqlContext.read.option("kudu.table", "t").kudu.show
+---+---+---+
|  a|  b|  c|
+---+---+---+
+---+---+---+

// Define some data frames for writing into the table

scala> val insertDF = Seq((1, 1, 1), (2, 2, 2)).toDF(Array("a", "b", "c"): _*)
insertDF: org.apache.spark.sql.DataFrame = [a: int, b: int, c: int]

scala> val upsertDF = Seq((1, 101, 101), (3, 3, 3)).toDF(Array("a", "b", "c"): _*)
upsertDF: org.apache.spark.sql.DataFrame = [a: int, b: int, c: int]

scala> val deleteDF = sqlContext.range(1, 2).map(_.getLong(0).toInt).toDF("a")
deleteDF: org.apache.spark.sql.DataFrame = [a: int]

// Do some inserts/upserts/deletes

scala> insertDF.write.option("kudu.table", "t").option("kudu.operation", "insert").mode("append").kudu

scala> sqlContext.read.option("kudu.table", "t").kudu.show
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  1|  1|
|  2|  2|  2|
+---+---+---+

// Attempting the insert again will fail due to duplicate key constraint violations

scala> insertDF.write.option("kudu.table", "t").option("kudu.operation", "insert").mode("append").kudu
16/08/11 18:03:45 ERROR Executor: Exception in task 7.0 in stage 34.0 (TID 131)
java.lang.RuntimeException: failed to write 1 rows from DataFrame to Kudu; sample errors: Already present: key already present (error 0)
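
If the old insert-and-ignore-duplicates behavior is still useful, it could
presumably be exposed as just another operation value; the "insert-ignore"
value below is hypothetical, not something in the patch:

scala> insertDF.write.option("kudu.table", "t").option("kudu.operation", "insert-ignore").mode("append").kudu  // hypothetical operation value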

scala> upsertDF.write.option("kudu.table", "t").option("kudu.operation", "upsert").mode("append").kudu

scala> sqlContext.read.option("kudu.table", "t").kudu.show
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|101|101|
|  2|  2|  2|
|  3|  3|  3|
+---+---+---+

scala> deleteDF.write.option("kudu.table", "t").option("kudu.operation", "delete").mode("append").kudu

scala> sqlContext.read.option("kudu.table", "t").kudu.show
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  2|  2|  2|
|  3|  3|  3|
+---+---+---+
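
An update against existing keys should look just the same; for
illustration (not part of the session above), something like:

scala> val updateDF = Seq((2, 202, 202)).toDF(Array("a", "b", "c"): _*)

scala> updateDF.write.option("kudu.table", "t").option("kudu.operation", "update").mode("append").kudu  // like insert, this should error if the key doesn't exist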

The patch is here
<https://github.com/danburkert/kudu/commit/1970bc7e490eabc02844a0761e5b90bb759b7470>.
Comments welcome.

- Dan

On Thu, Aug 11, 2016 at 5:41 PM, Dan Burkert <[email protected]> wrote:

> Hi all,
>
> Will Berkeley has been working on some proposed updates to the
> Kudu Spark connector (see the code in review here
> <https://gerrit.cloudera.org/#/c/3871>).  The high level overview of the
> patch is
>
> * Explicit support for inserting, updating, upserting, and deleting rows
> from DataFrames into Kudu tables (via methods on KuduContext).
>
> * Possibly dropping the implementation of CreatableRelationProvider.  I
> originally suggested this because the Spark connector can't create a table
> without specifying many table properties that aren't part of the
> interface.  Since then, I've been convinced by Chris George that this is a
> bad idea in terms of usability, and that we should just limit the
> implementation to only support the "append" save mode, and error if the
> table doesn't exist.
>
> If you use the Kudu Spark connector, please take a look at the patch and
> weigh in!  We're trying to get this in for the upcoming 0.10 release.
>
> - Dan
>
