Dan Burkert has posted comments on this change.

Change subject: Kudu Spark Datasource for insertions and updates
......................................................................
Patch Set 3: (9 comments)

Added some feedback, but as we discussed on Slack I'm hoping we can fix the Java client to not require the in-order column shenanigans; it would make this patch a lot simpler.

http://gerrit.cloudera.org:8080/#/c/2992/3/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala:

Line 75: val tableName = parameters.get(TABLE_KEY)
I think these lines would be more Scala-esque as:

    val tableName = parameters.getOrElse(TABLE_KEY,
      throw new IllegalArgumentException(
        s"Kudu table name must be specified in create options using key '$TABLE_KEY'"))

Then the tableName.get calls below can just reference tableName.

Line 109: with InsertableRelation with Serializable {
What is prompting the addition of Serializable? As far as I can tell, none of the other traits require it, and I can't think of a reason KuduRelation would need to be instantiated in each task.

Line 205: * @param overwrite If True it will update existing records, but will not perform inserts. Use upsert on kudu context if you require both
Please wrap the long line. Is this supposed to be referencing savePartition? Maybe it would be nice to use a scaladoc link.

http://gerrit.cloudera.org:8080/#/c/2992/3/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala:

Line 88: def savePartition(rows: Iterator[Row], tableName: String, performAsUpdate : Boolean = false, errorsCallback: (scala.collection.mutable.Buffer[OperationResponse]) => Unit = null): Unit = {
Please wrap the arguments like with `kuduRDD` above. I think the errorsCallback should take a Buffer of RowError instead of OperationResponse. The OperationResponse can be recovered from the RowError if necessary.

Line 114: case DataTypes.LongType => kuduRow.addLong(field, dfRow.getAs[Long](field))
Missing Timestamp type.
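Regarding the missing Timestamp case: a minimal, hypothetical sketch of the microsecond conversion such a branch might need, assuming Kudu timestamp columns are populated as microseconds since the epoch via a long value. The `timestampToMicros` helper and the match arm shown in the comment are illustrative, not the patch's code:

```scala
import java.sql.Timestamp

// Hypothetical helper: convert a java.sql.Timestamp to microseconds since
// the epoch, which a long-backed Kudu timestamp column could then take, e.g.:
//   case DataTypes.TimestampType =>
//     kuduRow.addLong(field, timestampToMicros(dfRow.getAs[Timestamp](field)))
object TimestampConversion {
  def timestampToMicros(ts: Timestamp): Long =
    // getTime already includes whole milliseconds (the millisecond part of
    // getNanos), so only the sub-millisecond remainder is added here.
    ts.getTime * 1000 + (ts.getNanos % 1000000) / 1000
}
```

This keeps the conversion in one place so both the insert and update paths stay consistent.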
Line 117: case _ => throw new RuntimeException(s"No support for Spark SQL type $dt")
Probably should be IllegalArgumentException.

Line 122: val operationResponse = session.close()
It's hard to tell, but I think this is happening inside the while { } block?

http://gerrit.cloudera.org:8080/#/c/2992/3/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala:

Line 32: * Adds a method, `kudu`, to DataFrameWriter that allows you to write avro files using
s/avro files/to Kudu/

http://gerrit.cloudera.org:8080/#/c/2992/3/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala
File java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala:

Line 68: test("insertion") {
Add a test that inserts more than a single row; I think it will blow up because of the scope issue with closing the session.

--
To view, visit http://gerrit.cloudera.org:8080/2992
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I74f20a6c17c47f424dfda62854963dc19c3b78c3
Gerrit-PatchSet: 3
Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-Owner: Chris George <[email protected]>
Gerrit-Reviewer: Andy Grove <[email protected]>
Gerrit-Reviewer: Dan Burkert <[email protected]>
Gerrit-Reviewer: Kudu Jenkins
Gerrit-HasComments: Yes
