This is an automated email from the ASF dual-hosted git repository. awong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 926802ba848dc571a087eb3a92249fa500fd0da7 Author: Brian McDevitt <[email protected]> AuthorDate: Thu May 7 16:02:06 2020 -0500 [KUDU-3116] Enhance KuduContext row operation metrics Adds the ability to track operation counts per table. Introduces the MapAccumulator to track these metrics in a single accumulator per operation type. Change-Id: Ie66dab95041310c27ef62dacccbcc0977a84857e Reviewed-on: http://gerrit.cloudera.org:8080/15882 Tested-by: Kudu Jenkins Reviewed-by: Grant Henke <[email protected]> Reviewed-by: Andrew Wong <[email protected]> --- .../org/apache/kudu/spark/kudu/KuduContext.scala | 43 ++++++----- .../apache/kudu/spark/kudu/MapAccumulator.scala | 83 ++++++++++++++++++++++ .../apache/kudu/spark/kudu/DefaultSourceTest.scala | 65 +++++++++++++---- 3 files changed, 158 insertions(+), 33 deletions(-) diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala index a829073..de37ac1 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.types.DataType import org.apache.spark.sql.types.StructType import org.apache.spark.util.AccumulatorV2 import org.apache.spark.util.CollectionAccumulator -import org.apache.spark.util.LongAccumulator import org.apache.yetus.audience.InterfaceAudience import org.apache.yetus.audience.InterfaceStability import org.slf4j.Logger @@ -75,18 +74,26 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou /** * A collection of accumulator metrics describing the usage of a KuduContext. */ - private[kudu] val numInserts: LongAccumulator = sc.longAccumulator("kudu.num_inserts") - private[kudu] val numUpserts: LongAccumulator = sc.longAccumulator("kudu.num_upserts") - private[kudu] val numUpdates: LongAccumulator = sc.longAccumulator("kudu.num_updates") - private[kudu] val numDeletes: LongAccumulator = sc.longAccumulator("kudu.num_deletes") - - // Increments the appropriate metric given an OperationType and a count. - private def addForOperation(count: Long, opType: OperationType): Unit = { + private[kudu] val numInserts: MapAccumulator[String, Long] = + new MapAccumulator[String, Long](Math.addExact) + private[kudu] val numUpserts: MapAccumulator[String, Long] = + new MapAccumulator[String, Long](Math.addExact) + private[kudu] val numUpdates: MapAccumulator[String, Long] = + new MapAccumulator[String, Long](Math.addExact) + private[kudu] val numDeletes: MapAccumulator[String, Long] = + new MapAccumulator[String, Long](Math.addExact) + sc.register(numInserts, "kudu.num_inserts") + sc.register(numUpserts, "kudu.num_upserts") + sc.register(numUpdates, "kudu.num_updates") + sc.register(numDeletes, "kudu.num_deletes") + + // Increments the appropriate metric given an OperationType and a count per table. + private def addForOperation(count: Long, opType: OperationType, tableName: String): Unit = { opType match { - case Insert => numInserts.add(count) - case Upsert => numUpserts.add(count) - case Update => numUpdates.add(count) - case Delete => numDeletes.add(count) + case Insert => numInserts.add((tableName, count)) + case Upsert => numUpserts.add((tableName, count)) + case Update => numUpdates.add((tableName, count)) + case Delete => numDeletes.add((tableName, count)) } } @@ -254,7 +261,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = { log.info(s"inserting into table '$tableName'") writeRows(data, tableName, Insert, writeOptions) - log.info(s"inserted ${numInserts.value} rows into table '$tableName'") + log.info(s"inserted ${numInserts.value.get(tableName)} rows into table '$tableName'") } /** @@ -276,7 +283,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou val writeOptions = KuduWriteOptions(ignoreDuplicateRowErrors = true) log.info(s"inserting into table '$tableName'") writeRows(data, tableName, Insert, writeOptions) - log.info(s"inserted ${numInserts.value} rows into table '$tableName'") + log.info(s"inserted ${numInserts.value.get(tableName)} rows into table '$tableName'") } /** @@ -292,7 +299,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = { log.info(s"upserting into table '$tableName'") writeRows(data, tableName, Upsert, writeOptions) - log.info(s"upserted ${numUpserts.value} rows into table '$tableName'") + log.info(s"upserted ${numUpserts.value.get(tableName)} rows into table '$tableName'") } /** @@ -308,7 +315,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = { log.info(s"updating rows in table '$tableName'") writeRows(data, tableName, Update, writeOptions) - log.info(s"updated ${numUpdates.value} rows in table '$tableName'") + log.info(s"updated ${numUpdates.value.get(tableName)} rows in table '$tableName'") } /** @@ -325,7 +332,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou writeOptions: KuduWriteOptions = new KuduWriteOptions): Unit = { log.info(s"deleting rows from table '$tableName'") writeRows(data, tableName, Delete, writeOptions) - log.info(s"deleted ${numDeletes.value} rows from table '$tableName'") + log.info(s"deleted ${numDeletes.value.get(tableName)} rows from table '$tableName'") } private[kudu] def writeRows( @@ -480,7 +487,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou // Update timestampAccumulator with the client's last propagated // timestamp on each executor. timestampAccumulator.add(syncClient.getLastPropagatedTimestamp) - addForOperation(numRows, opType) + addForOperation(numRows, opType, tableName) val elapsedTime = (System.currentTimeMillis() - startTime).toInt durationHistogram.add(elapsedTime) log.info(s"applied $numRows ${opType}s to table '$tableName' in ${elapsedTime}ms") diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/MapAccumulator.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/MapAccumulator.scala new file mode 100644 index 0000000..c0fea78 --- /dev/null +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/MapAccumulator.scala @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.kudu.spark.kudu + +import java.util.Collections +import java.util.function.BiConsumer +import java.util.function.BiFunction + +import org.apache.spark.util.AccumulatorV2 + +/** + * Spark accumulator implementation that takes 2-tuples as input and + * Map[K, V] as output. The accumulator requires a merge function + * to handle updates to existing entries in the map. + * + * @param mergeFn a function applied to two values for the same Map key + * @tparam K type of the map key + * @tparam V type of the map value + */ +class MapAccumulator[K, V](mergeFn: (V, V) => V) + extends AccumulatorV2[(K, V), java.util.Map[K, V]] { + import MapAccumulator._ + + private val map = Collections.synchronizedMap(new java.util.HashMap[K, V]()) + private val mergeFunc = new SerializableBiFunction[V, V, V] { + override def apply(t: V, u: V): V = mergeFn(t, u) + } + + override def isZero: Boolean = map.isEmpty + + override def copy(): AccumulatorV2[(K, V), java.util.Map[K, V]] = { + val newAcc = new MapAccumulator[K, V](mergeFn) + map.synchronized { + newAcc.map.putAll(map) + } + newAcc + } + + override def reset(): Unit = map.clear() + + override def add(v: (K, V)): Unit = { + map.merge(v._1, v._2, mergeFunc) + } + + override def merge(other: AccumulatorV2[(K, V), java.util.Map[K, V]]): Unit = { + other match { + case o: MapAccumulator[K, V] => + map.synchronized { + o.map.forEach(new BiConsumer[K, V]() { + override def accept(k: K, v: V): Unit = { + add((k, v)) + } + }) + } + case _ => + throw new UnsupportedOperationException( + s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") + } + } + + override def value: java.util.Map[K, V] = map.synchronized { + java.util.Collections.unmodifiableMap[K, V](new java.util.HashMap[K, V](map)) + } +} + +object MapAccumulator { + abstract class SerializableBiFunction[T, U, R] extends BiFunction[T, U, R] with Serializable +} diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala index c6baf83..0ef1e7d 100644 --- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala +++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala @@ -93,9 +93,9 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { new CreateTableOptions() .setRangePartitionColumns(List("key").asJava) .setNumReplicas(1)) - val insertsBefore = kuduContext.numInserts.value + val insertsBefore = kuduContext.numInserts.value.get(tableName) kuduContext.insertRows(df, tableName) - assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value) + assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value.get(tableName)) // Now use new options to refer to the new table name. val newOptions: Map[String, String] = @@ -133,9 +133,9 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { .addRangePartition(lower, upper) .setNumReplicas(1) ) - val insertsBefore = kuduContext.numInserts.value + val insertsBefore = kuduContext.numInserts.value.get(tableName) kuduContext.insertRows(df, tableName) - assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value) + assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value.get(tableName)) // now use new options to refer to the new table name val newOptions: Map[String, String] = @@ -152,14 +152,15 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { @Test def testInsertion() { - val insertsBefore = kuduContext.numInserts.value + val insertsBefore = kuduContext.numInserts.value.get(tableName) + println(s"insertsBefore: $insertsBefore") val df = sqlContext.read.options(kuduOptions).format("kudu").load val changedDF = df .limit(1) .withColumn("key", df("key").plus(100)) .withColumn("c2_s", lit("abc")) kuduContext.insertRows(changedDF, tableName) - assertEquals(insertsBefore + changedDF.count(), kuduContext.numInserts.value) + assertEquals(insertsBefore + changedDF.count(), kuduContext.numInserts.value.get(tableName)) val newDF = sqlContext.read.options(kuduOptions).format("kudu").load val collected = newDF.filter("key = 100").collect() @@ -170,14 +171,14 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { @Test def testInsertionMultiple() { - val insertsBefore = kuduContext.numInserts.value + val insertsBefore = kuduContext.numInserts.value.get(tableName) val df = sqlContext.read.options(kuduOptions).format("kudu").load val changedDF = df .limit(2) .withColumn("key", df("key").plus(100)) .withColumn("c2_s", lit("abc")) kuduContext.insertRows(changedDF, tableName) - assertEquals(insertsBefore + changedDF.count(), kuduContext.numInserts.value) + assertEquals(insertsBefore + changedDF.count(), kuduContext.numInserts.value.get(tableName)) val newDF = sqlContext.read.options(kuduOptions).format("kudu").load val collected = newDF.filter("key = 100").collect() @@ -318,13 +319,13 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { kuduContext.upsertRows(upsertDF, tableName) // Change the key and insert. - val upsertsBefore = kuduContext.numUpserts.value + val upsertsBefore = kuduContext.numUpserts.value.get(tableName) val insertDF = df .limit(1) .withColumn("key", df("key").plus(100)) .withColumn("c2_s", lit("def")) kuduContext.upsertRows(insertDF, tableName) - assertEquals(upsertsBefore + insertDF.count(), kuduContext.numUpserts.value) + assertEquals(upsertsBefore + insertDF.count(), kuduContext.numUpserts.value.get(tableName)) // Read the data back. val newDF = sqlContext.read.options(kuduOptions).format("kudu").load @@ -334,15 +335,49 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { assertEquals("def", collectedInsert(0).getAs[String]("c2_s")) // Restore the original state of the table, and test the numUpdates metric. - val updatesBefore = kuduContext.numUpdates.value + val updatesBefore = kuduContext.numUpdates.value.get(tableName) val updateDF = baseDF.filter("key = 0").withColumn("c2_s", lit("0")) val updatesApplied = updateDF.count() kuduContext.updateRows(updateDF, tableName) - assertEquals(updatesBefore + updatesApplied, kuduContext.numUpdates.value) + assertEquals(updatesBefore + updatesApplied, kuduContext.numUpdates.value.get(tableName)) deleteRow(100) } @Test + def testMultipleTableOperationCounts() { + val df = sqlContext.read.options(kuduOptions).format("kudu").load + + val tableUpsertsBefore = kuduContext.numUpserts.value.get(tableName) + val simpleTableUpsertsBefore = kuduContext.numUpserts.value.get(simpleTableName) + + // Change the key and insert. + val insertDF = df + .limit(1) + .withColumn("key", df("key").plus(100)) + .withColumn("c2_s", lit("def")) + kuduContext.upsertRows(insertDF, tableName) + + // insert new row to simple table + val insertSimpleDF = sqlContext.createDataFrame(Seq((0, "foo"))).toDF("key", "val") + kuduContext.upsertRows(insertSimpleDF, simpleTableName) + + assertEquals(tableUpsertsBefore + insertDF.count(), kuduContext.numUpserts.value.get(tableName)) + assertEquals( + simpleTableUpsertsBefore + insertSimpleDF.count(), + kuduContext.numUpserts.value.get(simpleTableName)) + + // Restore the original state of the tables, and test the numDeletes metric. + val deletesBefore = kuduContext.numDeletes.value.get(tableName) + val simpleDeletesBefore = kuduContext.numDeletes.value.get(simpleTableName) + kuduContext.deleteRows(insertDF, tableName) + kuduContext.deleteRows(insertSimpleDF, simpleTableName) + assertEquals(deletesBefore + insertDF.count(), kuduContext.numDeletes.value.get(tableName)) + assertEquals( + simpleDeletesBefore + insertSimpleDF.count(), + kuduContext.numDeletes.value.get(simpleTableName)) + } + + @Test def testWriteWithSink() { val df = sqlContext.read.options(kuduOptions).format("kudu").load val baseDF = df.limit(1) // Filter down to just the first row. @@ -514,10 +549,10 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { def testDeleteRows() { val df = sqlContext.read.options(kuduOptions).format("kudu").load val deleteDF = df.filter("key = 0").select("key") - val deletesBefore = kuduContext.numDeletes.value + val deletesBefore = kuduContext.numDeletes.value.get(tableName) val deletesApplied = deleteDF.count() kuduContext.deleteRows(deleteDF, tableName) - assertEquals(deletesBefore + deletesApplied, kuduContext.numDeletes.value) + assertEquals(deletesBefore + deletesApplied, kuduContext.numDeletes.value.get(tableName)) // Read the data back. val newDF = sqlContext.read.options(kuduOptions).format("kudu").load @@ -711,7 +746,7 @@ class DefaultSourceTest extends KuduTestSuite with Matchers { "kudu.scanRequestTimeoutMs" -> "66666") val dataFrame = sqlContext.read.options(kuduOptions).format("kudu").load val kuduRelation = kuduRelationFromDataFrame(dataFrame) - assert(kuduRelation.readOptions.scanRequestTimeoutMs == Some(66666)) + assert(kuduRelation.readOptions.scanRequestTimeoutMs.contains(66666)) } @Test
