This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 926802ba848dc571a087eb3a92249fa500fd0da7
Author: Brian McDevitt <[email protected]>
AuthorDate: Thu May 7 16:02:06 2020 -0500

    [KUDU-3116] Enhance KuduContext row operation metrics
    
    Adds the ability to track operation counts per table. Introduces the
    MapAccumulator to track these metrics in a single accumulator per
    operation type.
    
    Change-Id: Ie66dab95041310c27ef62dacccbcc0977a84857e
    Reviewed-on: http://gerrit.cloudera.org:8080/15882
    Tested-by: Kudu Jenkins
    Reviewed-by: Grant Henke <[email protected]>
    Reviewed-by: Andrew Wong <[email protected]>
---
 .../org/apache/kudu/spark/kudu/KuduContext.scala   | 43 ++++++-----
 .../apache/kudu/spark/kudu/MapAccumulator.scala    | 83 ++++++++++++++++++++++
 .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 65 +++++++++++++----
 3 files changed, 158 insertions(+), 33 deletions(-)

diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index a829073..de37ac1 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.AccumulatorV2
 import org.apache.spark.util.CollectionAccumulator
-import org.apache.spark.util.LongAccumulator
 import org.apache.yetus.audience.InterfaceAudience
 import org.apache.yetus.audience.InterfaceStability
 import org.slf4j.Logger
@@ -75,18 +74,26 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
   /**
    * A collection of accumulator metrics describing the usage of a KuduContext.
    */
-  private[kudu] val numInserts: LongAccumulator = sc.longAccumulator("kudu.num_inserts")
-  private[kudu] val numUpserts: LongAccumulator = sc.longAccumulator("kudu.num_upserts")
-  private[kudu] val numUpdates: LongAccumulator = sc.longAccumulator("kudu.num_updates")
-  private[kudu] val numDeletes: LongAccumulator = sc.longAccumulator("kudu.num_deletes")
-
-  // Increments the appropriate metric given an OperationType and a count.
-  private def addForOperation(count: Long, opType: OperationType): Unit = {
+  private[kudu] val numInserts: MapAccumulator[String, Long] =
+    new MapAccumulator[String, Long](Math.addExact)
+  private[kudu] val numUpserts: MapAccumulator[String, Long] =
+    new MapAccumulator[String, Long](Math.addExact)
+  private[kudu] val numUpdates: MapAccumulator[String, Long] =
+    new MapAccumulator[String, Long](Math.addExact)
+  private[kudu] val numDeletes: MapAccumulator[String, Long] =
+    new MapAccumulator[String, Long](Math.addExact)
+  sc.register(numInserts, "kudu.num_inserts")
+  sc.register(numUpserts, "kudu.num_upserts")
+  sc.register(numUpdates, "kudu.num_updates")
+  sc.register(numDeletes, "kudu.num_deletes")
+
+  // Increments the appropriate metric given an OperationType and a count per table.
+  private def addForOperation(count: Long, opType: OperationType, tableName: String): Unit = {
     opType match {
-      case Insert => numInserts.add(count)
-      case Upsert => numUpserts.add(count)
-      case Update => numUpdates.add(count)
-      case Delete => numDeletes.add(count)
+      case Insert => numInserts.add((tableName, count))
+      case Upsert => numUpserts.add((tableName, count))
+      case Update => numUpdates.add((tableName, count))
+      case Delete => numDeletes.add((tableName, count))
     }
   }
 
@@ -254,7 +261,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
       writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = {
     log.info(s"inserting into table '$tableName'")
     writeRows(data, tableName, Insert, writeOptions)
-    log.info(s"inserted ${numInserts.value} rows into table '$tableName'")
+    log.info(s"inserted ${numInserts.value.get(tableName)} rows into table '$tableName'")
   }
 
   /**
@@ -276,7 +283,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
     val writeOptions = KuduWriteOptions(ignoreDuplicateRowErrors = true)
     log.info(s"inserting into table '$tableName'")
     writeRows(data, tableName, Insert, writeOptions)
-    log.info(s"inserted ${numInserts.value} rows into table '$tableName'")
+    log.info(s"inserted ${numInserts.value.get(tableName)} rows into table '$tableName'")
   }
 
   /**
@@ -292,7 +299,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
       writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = {
     log.info(s"upserting into table '$tableName'")
     writeRows(data, tableName, Upsert, writeOptions)
-    log.info(s"upserted ${numUpserts.value} rows into table '$tableName'")
+    log.info(s"upserted ${numUpserts.value.get(tableName)} rows into table '$tableName'")
   }
 
   /**
@@ -308,7 +315,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
       writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = {
     log.info(s"updating rows in table '$tableName'")
     writeRows(data, tableName, Update, writeOptions)
-    log.info(s"updated ${numUpdates.value} rows in table '$tableName'")
+    log.info(s"updated ${numUpdates.value.get(tableName)} rows in table '$tableName'")
   }
 
   /**
@@ -325,7 +332,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
       writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = {
     log.info(s"deleting rows from table '$tableName'")
     writeRows(data, tableName, Delete, writeOptions)
-    log.info(s"deleted ${numDeletes.value} rows from table '$tableName'")
+    log.info(s"deleted ${numDeletes.value.get(tableName)} rows from table '$tableName'")
   }
 
   private[kudu] def writeRows(
@@ -480,7 +487,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
       // Update timestampAccumulator with the client's last propagated
       // timestamp on each executor.
       timestampAccumulator.add(syncClient.getLastPropagatedTimestamp)
-      addForOperation(numRows, opType)
+      addForOperation(numRows, opType, tableName)
       val elapsedTime = (System.currentTimeMillis() - startTime).toInt
       durationHistogram.add(elapsedTime)
       log.info(s"applied $numRows ${opType}s to table '$tableName' in ${elapsedTime}ms")
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/MapAccumulator.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/MapAccumulator.scala
new file mode 100644
index 0000000..c0fea78
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/MapAccumulator.scala
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.spark.kudu
+
+import java.util.Collections
+import java.util.function.BiConsumer
+import java.util.function.BiFunction
+
+import org.apache.spark.util.AccumulatorV2
+
+/**
+ * Spark accumulator implementation that takes 2-tuples as input and
+ * Map[K, V] as output. The accumulator requires a merge function
+ * to handle updates to existing entries in the map.
+ *
+ * @param mergeFn a function applied to two values for the same Map key
+ * @tparam K type of the map key
+ * @tparam V type of the map value
+ */
+class MapAccumulator[K, V](mergeFn: (V, V) => V)
+    extends AccumulatorV2[(K, V), java.util.Map[K, V]] {
+  import MapAccumulator._
+
+  private val map = Collections.synchronizedMap(new java.util.HashMap[K, V]())
+  private val mergeFunc = new SerializableBiFunction[V, V, V] {
+    override def apply(t: V, u: V): V = mergeFn(t, u)
+  }
+
+  override def isZero: Boolean = map.isEmpty
+
+  override def copy(): AccumulatorV2[(K, V), java.util.Map[K, V]] = {
+    val newAcc = new MapAccumulator[K, V](mergeFn)
+    map.synchronized {
+      newAcc.map.putAll(map)
+    }
+    newAcc
+  }
+
+  override def reset(): Unit = map.clear()
+
+  override def add(v: (K, V)): Unit = {
+    map.merge(v._1, v._2, mergeFunc)
+  }
+
+  override def merge(other: AccumulatorV2[(K, V), java.util.Map[K, V]]): Unit = {
+    other match {
+      case o: MapAccumulator[K, V] =>
+        map.synchronized {
+          o.map.forEach(new BiConsumer[K, V]() {
+            override def accept(k: K, v: V): Unit = {
+              add((k, v))
+            }
+          })
+        }
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+    }
+  }
+
+  override def value: java.util.Map[K, V] = map.synchronized {
+    java.util.Collections.unmodifiableMap[K, V](new java.util.HashMap[K, V](map))
+  }
+}
+
+object MapAccumulator {
+  abstract class SerializableBiFunction[T, U, R] extends BiFunction[T, U, R] with Serializable
+}
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index c6baf83..0ef1e7d 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -93,9 +93,9 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
       new CreateTableOptions()
         .setRangePartitionColumns(List("key").asJava)
         .setNumReplicas(1))
-    val insertsBefore = kuduContext.numInserts.value
+    val insertsBefore = kuduContext.numInserts.value.get(tableName)
     kuduContext.insertRows(df, tableName)
-    assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value)
+    assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value.get(tableName))
 
     // Now use new options to refer to the new table name.
     val newOptions: Map[String, String] =
@@ -133,9 +133,9 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
         .addRangePartition(lower, upper)
         .setNumReplicas(1)
     )
-    val insertsBefore = kuduContext.numInserts.value
+    val insertsBefore = kuduContext.numInserts.value.get(tableName)
     kuduContext.insertRows(df, tableName)
-    assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value)
+    assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value.get(tableName))
 
     // now use new options to refer to the new table name
     val newOptions: Map[String, String] =
@@ -152,14 +152,15 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
 
   @Test
   def testInsertion() {
-    val insertsBefore = kuduContext.numInserts.value
+    val insertsBefore = kuduContext.numInserts.value.get(tableName)
+    println(s"insertsBefore: $insertsBefore")
     val df = sqlContext.read.options(kuduOptions).format("kudu").load
     val changedDF = df
       .limit(1)
       .withColumn("key", df("key").plus(100))
       .withColumn("c2_s", lit("abc"))
     kuduContext.insertRows(changedDF, tableName)
-    assertEquals(insertsBefore + changedDF.count(), 
kuduContext.numInserts.value)
+    assertEquals(insertsBefore + changedDF.count(), kuduContext.numInserts.value.get(tableName))
 
     val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
     val collected = newDF.filter("key = 100").collect()
@@ -170,14 +171,14 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
 
   @Test
   def testInsertionMultiple() {
-    val insertsBefore = kuduContext.numInserts.value
+    val insertsBefore = kuduContext.numInserts.value.get(tableName)
     val df = sqlContext.read.options(kuduOptions).format("kudu").load
     val changedDF = df
       .limit(2)
       .withColumn("key", df("key").plus(100))
       .withColumn("c2_s", lit("abc"))
     kuduContext.insertRows(changedDF, tableName)
-    assertEquals(insertsBefore + changedDF.count(), 
kuduContext.numInserts.value)
+    assertEquals(insertsBefore + changedDF.count(), kuduContext.numInserts.value.get(tableName))
 
     val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
     val collected = newDF.filter("key = 100").collect()
@@ -318,13 +319,13 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     kuduContext.upsertRows(upsertDF, tableName)
 
     // Change the key and insert.
-    val upsertsBefore = kuduContext.numUpserts.value
+    val upsertsBefore = kuduContext.numUpserts.value.get(tableName)
     val insertDF = df
       .limit(1)
       .withColumn("key", df("key").plus(100))
       .withColumn("c2_s", lit("def"))
     kuduContext.upsertRows(insertDF, tableName)
-    assertEquals(upsertsBefore + insertDF.count(), 
kuduContext.numUpserts.value)
+    assertEquals(upsertsBefore + insertDF.count(), kuduContext.numUpserts.value.get(tableName))
 
     // Read the data back.
     val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
@@ -334,15 +335,49 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
     assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
 
     // Restore the original state of the table, and test the numUpdates metric.
-    val updatesBefore = kuduContext.numUpdates.value
+    val updatesBefore = kuduContext.numUpdates.value.get(tableName)
     val updateDF = baseDF.filter("key = 0").withColumn("c2_s", lit("0"))
     val updatesApplied = updateDF.count()
     kuduContext.updateRows(updateDF, tableName)
-    assertEquals(updatesBefore + updatesApplied, kuduContext.numUpdates.value)
+    assertEquals(updatesBefore + updatesApplied, kuduContext.numUpdates.value.get(tableName))
     deleteRow(100)
   }
 
   @Test
+  def testMultipleTableOperationCounts() {
+    val df = sqlContext.read.options(kuduOptions).format("kudu").load
+
+    val tableUpsertsBefore = kuduContext.numUpserts.value.get(tableName)
+    val simpleTableUpsertsBefore = kuduContext.numUpserts.value.get(simpleTableName)
+
+    // Change the key and insert.
+    val insertDF = df
+      .limit(1)
+      .withColumn("key", df("key").plus(100))
+      .withColumn("c2_s", lit("def"))
+    kuduContext.upsertRows(insertDF, tableName)
+
+    // insert new row to simple table
+    val insertSimpleDF = sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val")
+    kuduContext.upsertRows(insertSimpleDF, simpleTableName)
+
+    assertEquals(tableUpsertsBefore + insertDF.count(), kuduContext.numUpserts.value.get(tableName))
+    assertEquals(
+      simpleTableUpsertsBefore + insertSimpleDF.count(),
+      kuduContext.numUpserts.value.get(simpleTableName))
+
+    // Restore the original state of the tables, and test the numDeletes metric.
+    val deletesBefore = kuduContext.numDeletes.value.get(tableName)
+    val simpleDeletesBefore = kuduContext.numDeletes.value.get(simpleTableName)
+    kuduContext.deleteRows(insertDF, tableName)
+    kuduContext.deleteRows(insertSimpleDF, simpleTableName)
+    assertEquals(deletesBefore + insertDF.count(), kuduContext.numDeletes.value.get(tableName))
+    assertEquals(
+      simpleDeletesBefore + insertSimpleDF.count(),
+      kuduContext.numDeletes.value.get(simpleTableName))
+  }
+
+  @Test
   def testWriteWithSink() {
     val df = sqlContext.read.options(kuduOptions).format("kudu").load
     val baseDF = df.limit(1) // Filter down to just the first row.
@@ -514,10 +549,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
   def testDeleteRows() {
     val df = sqlContext.read.options(kuduOptions).format("kudu").load
     val deleteDF = df.filter("key = 0").select("key")
-    val deletesBefore = kuduContext.numDeletes.value
+    val deletesBefore = kuduContext.numDeletes.value.get(tableName)
     val deletesApplied = deleteDF.count()
     kuduContext.deleteRows(deleteDF, tableName)
-    assertEquals(deletesBefore + deletesApplied, kuduContext.numDeletes.value)
+    assertEquals(deletesBefore + deletesApplied, kuduContext.numDeletes.value.get(tableName))
 
     // Read the data back.
     val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
@@ -711,7 +746,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
       "kudu.scanRequestTimeoutMs" -> "66666")
     val dataFrame = sqlContext.read.options(kuduOptions).format("kudu").load
     val kuduRelation = kuduRelationFromDataFrame(dataFrame)
-    assert(kuduRelation.readOptions.scanRequestTimeoutMs == Some(66666))
+    assert(kuduRelation.readOptions.scanRequestTimeoutMs.contains(66666))
   }
 
   @Test

Reply via email to