Dan Burkert has posted comments on this change.

Change subject: Kudu Spark Datasource for insertions and updates
......................................................................


Patch Set 3:

(9 comments)

Added some feedback, but as we discussed on Slack I'm hoping we can fix the 
Java client to not require the in-order column shenanigans; that would make 
this patch a lot simpler.

http://gerrit.cloudera.org:8080/#/c/2992/3/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala:

Line 75:     val tableName = parameters.get(TABLE_KEY)
I think these lines would be more scala-esque as:

val tableName = parameters.getOrElse(TABLE_KEY,
  throw new IllegalArgumentException(
    s"Kudu table name must be specified in create options using key '$TABLE_KEY'"))


Then the tableName.get calls below can just reference tableName directly.

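(For the record, this works because `throw` is an expression of type Nothing in Scala and getOrElse's default parameter is by-name, so the exception is only raised when the key is absent. A self-contained sketch with placeholder values:)

```scala
// Standalone sketch of the getOrElse-with-throw idiom; TABLE_KEY and
// parameters here are placeholders for the real ones in DefaultSource.scala.
val TABLE_KEY = "kudu.table"
val parameters = Map(TABLE_KEY -> "my_table")

// getOrElse's default is evaluated lazily, and throw has type Nothing,
// so this type-checks as String and only throws when the key is missing.
val tableName = parameters.getOrElse(TABLE_KEY,
  throw new IllegalArgumentException(
    s"Kudu table name must be specified in create options using key '$TABLE_KEY'"))
```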

Line 109: with InsertableRelation with Serializable {
What is prompting the addition of Serializable?  As far as I can tell, none of 
the other traits require it, and I can't think of a reason KuduRelation would 
need to be instantiated in each task.


Line 205:     * @param overwrite If True it will update existing records, but 
will not perform inserts. Use upsert on kudu context if you require both
Please wrap the long line.  Is this supposed to reference savePartition?  
It might be nice to use a scaladoc link.
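Something along these lines (wording hypothetical, just to show the wrapping and the scaladoc link syntax):

```scala
/**
  * Saves the rows of this DataFrame into an existing Kudu table.
  *
  * @param overwrite if true, existing records are updated but no inserts
  *                  are performed; use upsert on [[KuduContext]] if you
  *                  require both
  */
```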


http://gerrit.cloudera.org:8080/#/c/2992/3/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala:

Line 88:   def savePartition(rows: Iterator[Row], tableName: String, 
performAsUpdate : Boolean = false, errorsCallback: 
(scala.collection.mutable.Buffer[OperationResponse]) => Unit = null): Unit = {
please wrap the arguments like with `kuduRDD` above.

I think the errorsCallback should take a Buffer of RowError instead of 
OperationResponse.  The OperationResponse can be recovered from the RowError if 
necessary.
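i.e. something like (a sketch only; parameter names taken from the patch):

```scala
// Sketch of the suggested shape: arguments wrapped like kuduRDD above,
// and the callback taking RowErrors rather than OperationResponses.
def savePartition(rows: Iterator[Row],
                  tableName: String,
                  performAsUpdate: Boolean = false,
                  errorsCallback: mutable.Buffer[RowError] => Unit = null): Unit
```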


Line 114:             case DataTypes.LongType => kuduRow.addLong(field, 
dfRow.getAs[Long](field))
missing Timestamp type.
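Assuming the client takes timestamps as microseconds since the epoch via addLong (an assumption on my part), the missing case could convert java.sql.Timestamp along these lines; timestampToMicros is a hypothetical helper:

```scala
import java.sql.Timestamp

// Hypothetical helper: java.sql.Timestamp -> microseconds since the epoch,
// keeping sub-millisecond precision from the nanos field.
def timestampToMicros(t: Timestamp): Long =
  t.getTime / 1000 * 1000000 + t.getNanos / 1000

// Then in the match:
// case DataTypes.TimestampType =>
//   kuduRow.addLong(field, timestampToMicros(dfRow.getAs[Timestamp](field)))
```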


Line 117:             case _ => throw new RuntimeException(s"No support for 
Spark SQL type $dt")
This should probably be an IllegalArgumentException rather than a RuntimeException.


Line 122:         val operationResponse = session.close()
It's hard to tell, but I think this is happening inside the while { } block?
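If so, the usual shape is to apply each operation inside the loop and close the session once afterwards (a sketch, not the patch's code; buildOperation is a hypothetical helper):

```scala
// Sketch: apply each operation inside the loop, close once afterwards.
while (rows.hasNext) {
  val operation = buildOperation(rows.next())  // hypothetical helper
  session.apply(operation)
}
val responses = session.close()  // flushes and returns responses, once
```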


http://gerrit.cloudera.org:8080/#/c/2992/3/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala:

Line 32:     * Adds a method, `kudu`, to DataFrameWriter that allows you to 
write avro files using
s/avro files/to Kudu/


http://gerrit.cloudera.org:8080/#/c/2992/3/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala
File 
java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala:

Line 68:   test("insertion") {
Add a test that inserts more than a single row, I think it will blow up because 
of the scope issue with closing the session.


-- 
To view, visit http://gerrit.cloudera.org:8080/2992
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I74f20a6c17c47f424dfda62854963dc19c3b78c3
Gerrit-PatchSet: 3
Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-Owner: Chris George <[email protected]>
Gerrit-Reviewer: Andy Grove <[email protected]>
Gerrit-Reviewer: Dan Burkert <[email protected]>
Gerrit-Reviewer: Kudu Jenkins
Gerrit-HasComments: Yes
