Dan Burkert has posted comments on this change.

Change subject: Kudu Spark Datasource for insertions and updates
......................................................................


Patch Set 5:

(12 comments)

http://gerrit.cloudera.org:8080/#/c/2992/5/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala:

Line 54:   BaseRelation = {
Should this be returning KuduRelation?  It looks like you can't do much with 
BaseRelation, since it doesn't have any of the scanner or insert interfaces by 
default.
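
For illustration, a minimal self-contained sketch of why the narrower return type matters (all names here are stand-ins, not the actual Spark SQL or Kudu types): with only the base type, callers would have to downcast before they could insert anything.

```scala
object RelationSketch {
  // Stand-ins for Spark SQL's BaseRelation / InsertableRelation traits.
  trait BaseRelation
  trait InsertableRelation {
    def insert(rows: Seq[String], overwrite: Boolean): Unit
  }

  // Stand-in for the patch's KuduRelation: mixes insert behaviour into the base type.
  class KuduRelationSketch extends BaseRelation with InsertableRelation {
    var written: Vector[String] = Vector.empty
    def insert(rows: Seq[String], overwrite: Boolean): Unit = {
      written = if (overwrite) rows.toVector else written ++ rows
    }
  }

  // Declaring the concrete type keeps insert() callable directly;
  // a BaseRelation return type would force an asInstanceOf at every call site.
  def createRelation(): KuduRelationSketch = new KuduRelationSketch
}
```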


Line 66:     * @param mode Append will not overwrite existing data, Overwrite will perform update, but will not insert data, use upsert on KuduContext if you require both
please wrap long doc lines (over 80 or 100 columns).


Line 71:   override def createRelation(sqlContext: SQLContext, mode: SaveMode, parameters: Map[String, String], data: DataFrame): BaseRelation = {
please wrap (in same style as createRelation above)


Line 84:       case Ignore => kuduRelation.insert(data, overwrite = false)
This can be combined with the append case:

case Append =>
case Ignore => kuduRelation.insert(data, overwrite = false)


Line 193:     * Inserts data into an existing kudu table.
please capitalize Kudu


Line 194:     * @param data Dataframe to be inserted into kudu
Please add a link to the DataFrame class by wrapping it in brackets:

    * @param data [[DataFrame]] to be inserted into kudu


http://gerrit.cloudera.org:8080/#/c/2992/5/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
File java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala:

Line 137:     case _ => throw new RuntimeException(s"No support for Spark SQL type $dt")
This should be an IllegalArgumentException.


Line 147:     data.foreachPartition(iterator => new KuduContext(kuduMaster).writeRows(iterator, tableName, overwrite))
This should be doing some error checking.  In the version of writeRows below I used IgnoreAllDuplicateRows, which means any errors that come back are actual failures.  Here's what I think it should look like:

  /**
    * Inserts or updates rows in Kudu from a [[DataFrame]].
    * @param data `DataFrame` to insert/update
    * @param tableName table to perform insertion on
    * @param overwrite true=update, false=insert
    */
  def writeRows(data: DataFrame, tableName: String, overwrite: Boolean) {
    val schema = data.schema
    data.foreachPartition(iterator => {
      val pendingErrors = writeRows(iterator, schema, tableName, overwrite)
      val errorCount = pendingErrors.getRowErrors.length
      if (errorCount > 0) {
        val errors = pendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString
        throw new RuntimeException(
          s"failed to write $errorCount rows from DataFrame to Kudu; sample errors: $errors")
      }
    })
  }


Line 155:   def writeRows(rows: Iterator[Row],
I refactored this a bit in order to use row indices everywhere; here is my version:

  /**
    * Saves partitions of a [[DataFrame]] into Kudu.
    * @param rows rows to insert or update
    * @param tableName table to insert or update on
    */
  def writeRows(rows: Iterator[Row],
                schema: StructType,
                tableName: String,
                performAsUpdate: Boolean = false): RowErrorsAndOverflowStatus = {
    val table: KuduTable = syncClient.openTable(tableName)
    val kuduSchema = table.getSchema

    val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({ case (field, sparkIdx) =>
      sparkIdx -> kuduSchema.getColumnIndex(field.name)
    })

    val session: KuduSession = syncClient.newSession
    session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
    session.setIgnoreAllDuplicateRows(true)
    try {
      for (row <- rows) {
        val operation = if (performAsUpdate) { table.newUpdate() } else { table.newInsert() }
        for ((sparkIdx, kuduIdx) <- indices) {
          if (row.isNullAt(sparkIdx)) {
            operation.getRow.setNull(kuduIdx)
          } else schema.fields(sparkIdx).dataType match {
            case DataTypes.StringType => operation.getRow.addString(kuduIdx, row.getString(sparkIdx))
            case DataTypes.BinaryType => operation.getRow.addBinary(kuduIdx, row.getAs[Array[Byte]](sparkIdx))
            case DataTypes.BooleanType => operation.getRow.addBoolean(kuduIdx, row.getBoolean(sparkIdx))
            case DataTypes.ByteType => operation.getRow.addByte(kuduIdx, row.getByte(sparkIdx))
            case DataTypes.ShortType => operation.getRow.addShort(kuduIdx, row.getShort(sparkIdx))
            case DataTypes.IntegerType => operation.getRow.addInt(kuduIdx, row.getInt(sparkIdx))
            case DataTypes.LongType => operation.getRow.addLong(kuduIdx, row.getLong(sparkIdx))
            case DataTypes.FloatType => operation.getRow.addFloat(kuduIdx, row.getFloat(sparkIdx))
            case DataTypes.DoubleType => operation.getRow.addDouble(kuduIdx, row.getDouble(sparkIdx))
            case DataTypes.TimestampType => operation.getRow.addLong(kuduIdx, KuduRelation.timestampToMicros(row.getTimestamp(sparkIdx)))
            case t => throw new IllegalArgumentException(s"No support for Spark SQL type $t")
          }
        }
        session.apply(operation)
      }
    } finally {
      session.close()
    }
    session.getPendingErrors
  }


Line 190:             case DataTypes.TimestampType => kuduRow.addLong(fieldName, dfRow.getAs[Timestamp](sparkIdx).getTime * 1000)
This should use DefaultSource.timestampToMicros in order to retain microsecond precision.
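
To make the precision point concrete, here is a hedged sketch of what a microsecond-preserving conversion looks like (the real helper is the patch's timestampToMicros, which may differ in detail; `TimestampSketch` is a name invented for this example):

```scala
import java.sql.Timestamp

object TimestampSketch {
  def timestampToMicros(timestamp: Timestamp): Long = {
    // getTime truncates to millisecond precision; getNanos still carries the
    // full sub-second component, so fold its sub-millisecond remainder back in.
    timestamp.getTime * 1000 + (timestamp.getNanos % 1000000) / 1000
  }
}
```

Multiplying `getTime` by 1000 alone, as in the quoted line, silently drops everything below the millisecond.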


Line 201: 
extraneous whitespace


http://gerrit.cloudera.org:8080/#/c/2992/5/java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala
File java/kudu-spark/src/test/scala/org/kududb/spark/kudu/DefaultSourceTest.scala:

Line 104:       Map("kudu.table" -> tableName, "kudu.master" -> miniCluster.getMasterAddresses)).kudu
Please refactor the literal options map into a field of the class.  It's repeated in pretty much every test.
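
Something along these lines (a sketch with placeholder values; in the real test class `tableName` and the master address would come from the test fixture and `miniCluster.getMasterAddresses`):

```scala
class KuduTestOptionsSketch {
  val tableName = "test_table"            // placeholder
  val masterAddresses = "127.0.0.1:7051"  // placeholder for miniCluster.getMasterAddresses

  // Built once, shared by every test that creates a Kudu DataFrame.
  lazy val kuduOptions: Map[String, String] =
    Map("kudu.table" -> tableName, "kudu.master" -> masterAddresses)
}
```

A `lazy val` also sidesteps initialization-order problems if the mini cluster is only started in a before-all hook.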


-- 
To view, visit http://gerrit.cloudera.org:8080/2992
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I74f20a6c17c47f424dfda62854963dc19c3b78c3
Gerrit-PatchSet: 5
Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-Owner: Chris George <[email protected]>
Gerrit-Reviewer: Andy Grove <[email protected]>
Gerrit-Reviewer: Arun Sharma
Gerrit-Reviewer: Dan Burkert <[email protected]>
Gerrit-Reviewer: Kudu Jenkins
Gerrit-HasComments: Yes
