Another big change in that patch is switching the default write behavior
from insert (ignoring duplicate rows) to upsert.
I've been thinking about this a bit, and I think it would be nice if the API
let us do upserts, inserts, updates, or deletes, with upsert as the default.
Toward that end I put together a patch on top of Will's that makes the
operation a configurable parameter.
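To give a sense of the shape of that parameter, here is a rough sketch of how
the connector could map the "kudu.operation" option string to an operation.
The names below (WriteOp, parseOperation) are mine, not the patch's actual API:

// Hypothetical sketch only -- these names are not taken from the patch.
sealed trait WriteOp
case object Insert extends WriteOp
case object Update extends WriteOp
case object Upsert extends WriteOp
case object Delete extends WriteOp

def parseOperation(parameters: Map[String, String]): WriteOp =
  parameters.getOrElse("kudu.operation", "upsert").toLowerCase match {
    case "insert" => Insert
    case "update" => Update
    case "upsert" => Upsert   // upsert stays the default
    case "delete" => Delete
    case other    => throw new IllegalArgumentException(s"Unknown kudu.operation: $other")
  }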
This example starts with an empty Kudu table t, created as:
CREATE TABLE t (a INT32 NOT NULL, b INT32 NOT NULL, c INT32 NOT NULL)
PRIMARY KEY (a)
DISTRIBUTE BY HASH (a) INTO 4 BUCKETS
WITH 1 REPLICA;
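(The DDL above is Impala syntax; if you'd rather create the same table straight
from the Kudu client, something along these lines should work. Treat it as a
sketch: it assumes the org.apache.kudu package layout and a placeholder master
address of "kudu-master:7051", so adjust for your setup.)

import scala.collection.JavaConverters._
import org.apache.kudu.{ColumnSchema, Schema, Type}
import org.apache.kudu.client.{CreateTableOptions, KuduClient}

// Sketch: create table t with the Kudu client directly (master address is a placeholder).
val client = new KuduClient.KuduClientBuilder("kudu-master:7051").build()
val columns = Seq(
  new ColumnSchema.ColumnSchemaBuilder("a", Type.INT32).key(true).build(),
  new ColumnSchema.ColumnSchemaBuilder("b", Type.INT32).build(),
  new ColumnSchema.ColumnSchemaBuilder("c", Type.INT32).build()
).asJava
val options = new CreateTableOptions()
  .addHashPartitions(Seq("a").asJava, 4)  // DISTRIBUTE BY HASH (a) INTO 4 BUCKETS
  .setNumReplicas(1)                      // WITH 1 REPLICA
client.createTable("t", new Schema(columns), options)
client.shutdown()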
Spark Shell output:
scala> sqlContext.read.option("kudu.table", "t").kudu.show
+---+---+---+
| a| b| c|
+---+---+---+
+---+---+---+
// Define some data frames for writing into the table
scala> val insertDF = Seq((1, 1, 1), (2, 2, 2)).toDF(Array("a", "b", "c"): _*)
insertDF: org.apache.spark.sql.DataFrame = [a: int, b: int, c: int]
scala> val upsertDF = Seq((1, 101, 101), (3, 3, 3)).toDF(Array("a", "b", "c"): _*)
upsertDF: org.apache.spark.sql.DataFrame = [a: int, b: int, c: int]
scala> val deleteDF = sqlContext.range(1, 2).map(_.getLong(0).toInt).toDF("a")
deleteDF: org.apache.spark.sql.DataFrame = [a: int]
// Do some inserts/upserts/deletes
scala> insertDF.write.option("kudu.table", "t").option("kudu.operation", "insert").mode("append").kudu
scala> sqlContext.read.option("kudu.table", "t").kudu.show
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 1| 1|
| 2| 2| 2|
+---+---+---+
// Attempting the insert again will fail due to duplicate key constraint violations
scala> insertDF.write.option("kudu.table", "t").option("kudu.operation", "insert").mode("append").kudu
16/08/11 18:03:45 ERROR Executor: Exception in task 7.0 in stage 34.0 (TID 131)
java.lang.RuntimeException: failed to write 1 rows from DataFrame to Kudu; sample errors: Already present: key already present (error 0)
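(If you'd rather handle that failure in the driver than let the job blow up,
something like this should work -- assuming, as I believe is the case, that the
executor-side RuntimeException gets rethrown to the driver wrapped in a
SparkException:)

import org.apache.spark.SparkException
try {
  insertDF.write.option("kudu.table", "t").option("kudu.operation", "insert").mode("append").kudu
} catch {
  // Sketch: the wrapping in SparkException is an assumption on my part.
  case e: SparkException => println(s"insert failed: ${e.getMessage}")
}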
scala> upsertDF.write.option("kudu.table", "t").option("kudu.operation", "upsert").mode("append").kudu
scala> sqlContext.read.option("kudu.table", "t").kudu.show
+---+---+---+
| a| b| c|
+---+---+---+
| 1|101|101|
| 2| 2| 2|
| 3| 3| 3|
+---+---+---+
scala> deleteDF.write.option("kudu.table", "t").option("kudu.operation", "delete").mode("append").kudu
scala> sqlContext.read.option("kudu.table", "t").kudu.show
+---+---+---+
| a| b| c|
+---+---+---+
| 2| 2| 2|
| 3| 3| 3|
+---+---+---+
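An update should follow the same pattern. I didn't include one in the session
above, and the "update" value for kudu.operation is my assumption from the
patch description, so treat this as a sketch:

// Sketch: update row a=2 in place ("update" as the operation value is assumed).
val updateDF = Seq((2, 202, 202)).toDF("a", "b", "c")
updateDF.write.option("kudu.table", "t").option("kudu.operation", "update").mode("append").kudu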
The patch is here
<https://github.com/danburkert/kudu/commit/1970bc7e490eabc02844a0761e5b90bb759b7470>.
Comments welcome.
- Dan
On Thu, Aug 11, 2016 at 5:41 PM, Dan Burkert <[email protected]> wrote:
> Hi all,
>
> Will Berkeley has been working on some proposed updates to the
> Kudu Spark connector (see the code in review here
> <https://gerrit.cloudera.org/#/c/3871>). The high level overview of the
> patch is
>
> * Explicit support for inserting, updating, upserting, and deleting rows
> from DataFrames into Kudu tables (via methods on KuduContext).
>
> * Possibly dropping the implementation of CreatableRelationProvider. I
> originally suggested this because the Spark connector can't create a table
> without specifying many table properties that aren't part of the
> interface. Since then, I've been convinced by Chris George that this is a
> bad idea in terms of usability, and that we should just limit the
> implementation to only support the "append" save mode, and error if the
> table doesn't exist.
>
> If you use the Kudu Spark connector, please take a look at the patch and
> weigh in! We're trying to get this in for the upcoming 0.10 release.
>
> - Dan
>