Dan Burkert has posted comments on this change.

Change subject: Kudu Spark Datasource for insertions and updates
......................................................................
Patch Set 5:

(12 comments)

http://gerrit.cloudera.org:8080/#/c/2992/5/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala:

Line 54: BaseRelation = {
Should this be returning KuduRelation? It looks like you can't do much with BaseRelation, since it doesn't have any of the scanner or insert interfaces by default.

Line 66: * @param mode Append will not overwrite existing data, Overwrite will perform update, but will not insert data, use upsert on KuduContext if you require both
Please wrap long doc lines (over 80 or 100 columns).

Line 71: override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
Please wrap (in the same style as the createRelation above).

Line 84: case Ignore => kuduRelation.insert(data, overwrite = false)
This can be combined with the append case:

    case Append | Ignore => kuduRelation.insert(data, overwrite = false)

Line 193: * Inserts data into an existing kudu table.
Please capitalize Kudu.

Line 194: * @param data Dataframe to be inserted into kudu
Please add a link to the DataFrame class by wrapping it in brackets:

    * @param data [[DataFrame]] to be inserted into kudu

http://gerrit.cloudera.org:8080/#/c/2992/5/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala:

Line 137: case _ => throw new RuntimeException(s"No support for Spark SQL type $dt")
This should be an IllegalArgumentException.

Line 147: data.foreachPartition(iterator => new KuduContext(kuduMaster).writeRows(iterator, tableName, overwrite))
This should be doing some error checking. In the version of writeRows below I used IgnoreAllDuplicateRows, which means any errors that come back are actual failures. Here's what I think it should look like:

    /**
     * Inserts or updates rows in Kudu from a [[DataFrame]].
     * @param data `DataFrame` to insert/update
     * @param tableName table to perform insertion on
     * @param overwrite true=update, false=insert
     */
    def writeRows(data: DataFrame, tableName: String, overwrite: Boolean) {
      val schema = data.schema
      data.foreachPartition(iterator => {
        val pendingErrors = writeRows(iterator, schema, tableName, overwrite)
        val errorCount = pendingErrors.getRowErrors.length
        if (errorCount > 0) {
          val errors = pendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString
          throw new RuntimeException(
            s"failed to write $errorCount rows from DataFrame to Kudu; sample errors: $errors")
        }
      })
    }
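To illustrate the payoff (this snippet is not part of the patch, and the `context` and `tableName` field names are hypothetical), the relation's InsertableRelation implementation could then shrink to a one-liner:

    // Sketch only: assumes KuduRelation mixes in
    // org.apache.spark.sql.sources.InsertableRelation and holds a KuduContext
    // and the table name in fields named `context` and `tableName`.
    override def insert(data: DataFrame, overwrite: Boolean): Unit = {
      context.writeRows(data, tableName, overwrite)
    }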
Line 155: def writeRows(rows: Iterator[Row],
I refactored this a bit in order to use row indices everywhere; here is my version:

    /**
     * Saves partitions of a [[DataFrame]] into Kudu.
     * @param rows rows to insert or update
     * @param schema schema of the rows
     * @param tableName table to insert or update on
     * @param performAsUpdate true=update, false=insert
     */
    def writeRows(rows: Iterator[Row],
                  schema: StructType,
                  tableName: String,
                  performAsUpdate: Boolean = false): RowErrorsAndOverflowStatus = {
      val table: KuduTable = syncClient.openTable(tableName)
      val kuduSchema = table.getSchema
      val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({
        case (field, sparkIdx) => sparkIdx -> kuduSchema.getColumnIndex(field.name)
      })
      val session: KuduSession = syncClient.newSession
      session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
      session.setIgnoreAllDuplicateRows(true)
      try {
        for (row <- rows) {
          val operation = if (performAsUpdate) { table.newUpdate() } else { table.newInsert() }
          for ((sparkIdx, kuduIdx) <- indices) {
            if (row.isNullAt(sparkIdx)) {
              operation.getRow.setNull(kuduIdx)
            } else schema.fields(sparkIdx).dataType match {
              case DataTypes.StringType => operation.getRow.addString(kuduIdx, row.getString(sparkIdx))
              case DataTypes.BinaryType => operation.getRow.addBinary(kuduIdx, row.getAs[Array[Byte]](sparkIdx))
              case DataTypes.BooleanType => operation.getRow.addBoolean(kuduIdx, row.getBoolean(sparkIdx))
              case DataTypes.ByteType => operation.getRow.addByte(kuduIdx, row.getByte(sparkIdx))
              case DataTypes.ShortType => operation.getRow.addShort(kuduIdx, row.getShort(sparkIdx))
              case DataTypes.IntegerType => operation.getRow.addInt(kuduIdx, row.getInt(sparkIdx))
              case DataTypes.LongType => operation.getRow.addLong(kuduIdx, row.getLong(sparkIdx))
              case DataTypes.FloatType => operation.getRow.addFloat(kuduIdx, row.getFloat(sparkIdx))
              case DataTypes.DoubleType => operation.getRow.addDouble(kuduIdx, row.getDouble(sparkIdx))
              case DataTypes.TimestampType => operation.getRow.addLong(kuduIdx,
                KuduRelation.timestampToMicros(row.getTimestamp(sparkIdx)))
              case t => throw new IllegalArgumentException(s"No support for Spark SQL type $t")
            }
          }
          session.apply(operation)
        }
      } finally {
        session.close()
      }
      session.getPendingErrors
    }

Line 190: case DataTypes.TimestampType => kuduRow.addLong(fieldName, dfRow.getAs[Timestamp](sparkIdx).getTime * 1000)
This should use DefaultSource.timestampToMicros in order to retain microsecond precision.

Line 201:
Extraneous whitespace.

http://gerrit.cloudera.org:8080/#/c/2992/5/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala
File java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala:

Line 104: Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddresses)).kudu
Please refactor the literal options map into a field of the class. It's repeated in pretty much every test.
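For illustration only (the field name `kuduOptions` is just a suggestion), something like this would do:

    // Hypothetical shared field for DefaultSourceTest; `lazy` defers evaluation
    // until the mini cluster has started and tableName is set.
    lazy val kuduOptions: Map[String, String] =
      Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddresses)

Each test can then pass kuduOptions to the reader instead of repeating the literal map.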
--
To view, visit http://gerrit.cloudera.org:8080/2992
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I74f20a6c17c47f424dfda62854963dc19c3b78c3
Gerrit-PatchSet: 5
Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-Owner: Chris George <[email protected]>
Gerrit-Reviewer: Andy Grove <[email protected]>
Gerrit-Reviewer: Arun Sharma
Gerrit-Reviewer: Dan Burkert <[email protected]>
Gerrit-Reviewer: Kudu Jenkins
Gerrit-HasComments: Yes