This is an automated email from the ASF dual-hosted git repository. granthenke pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 5f199c16d46fc6bd6003f00d4e56ba75923f192c
Author: Grant Henke <[email protected]>
AuthorDate: Thu Feb 20 10:56:18 2020 -0600

KUDU-3056: Reduce HdrHistogramAccumulator overhead

This patch makes a few changes to reduce the overhead of the HdrHistogramAccumulator.

It changes from using `SynchronizedHistogram` (long counts) to using `IntCountsHistogram` (int counts), and changes the accumulator's input type from `Long` to `Int`. This significantly reduces the data footprint of the histogram; the narrower input type is safe given write durations will never exceed `Integer.MAX_VALUE` milliseconds. Because thread safety is still important, we synchronize all access to the `IntCountsHistogram` in `HistogramWrapper`.

It also adjusts the `HistogramWrapper` to lazily instantiate an `IntCountsHistogram`. This means that if no values are recorded, the overhead of the `HdrHistogramAccumulator` should be almost zero.

Lastly, it reduces the `numberOfSignificantValueDigits` tracked in the histogram from 3 to 2. The result is relatively similar output in the Spark accumulator with a significantly smaller histogram.

I tested each variant using `getEstimatedFootprintInBytes()`. When the HdrHistogramAccumulator is used, the new implementation is about 90% smaller. When no values are stored, it is 100% smaller, because no histogram is allocated at all. Measured footprints, in bytes:

long w/ precision 3 & max 30000ms: 49664 (current)
long w/ precision 2 & max 30000ms: 9728
long w/ precision 1 & max 30000ms: 2048
int w/ precision 3 & max 30000ms: 25088
int w/ precision 2 & max 30000ms: 5120 (new)
int w/ precision 1 & max 30000ms: 1280

Note: I used a max of 30000ms in these calculations because that is the default operation timeout.

Below is sample string output from before and after this patch, generated with 1000 random values between 0ms and 500ms.
Before: 0.2%: 0ms, 50.3%: 265ms, 75.1%: 376ms, 87.5%: 437ms, 93.8%: 470ms, 96.9%: 484ms, 98.6%: 493ms, 99.5%: 496ms, 99.8%: 498ms, 100.0%: 499ms, 100.0%: 499ms After: 0.2%: 0ms, 50.3%: 265ms, 75.4%: 377ms, 87.5%: 437ms, 93.9%: 471ms, 97.3%: 485ms, 98.6%: 493ms, 99.5%: 497ms, 100.0%: 499ms, 100.0%: 499ms Note: I used the same seed so as to generate the same values for both strings. Change-Id: Ic7c2a33bc61a2baa38703ea3340a07e06ab39db3 Reviewed-on: http://gerrit.cloudera.org:8080/15254 Reviewed-by: Adar Dembo <[email protected]> Tested-by: Grant Henke <[email protected]> --- .../kudu/spark/kudu/HdrHistogramAccumulator.scala | 97 ++++++++++++++++------ .../org/apache/kudu/spark/kudu/KuduContext.scala | 2 +- 2 files changed, 72 insertions(+), 27 deletions(-) diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/HdrHistogramAccumulator.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/HdrHistogramAccumulator.scala index e42db5f..c775930 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/HdrHistogramAccumulator.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/HdrHistogramAccumulator.scala @@ -17,11 +17,10 @@ package org.apache.kudu.spark.kudu -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.util.AccumulatorV2 -import org.HdrHistogram.HistogramIterationValue -import org.HdrHistogram.SynchronizedHistogram +import org.HdrHistogram.IntCountsHistogram /* * A Spark accumulator that aggregates values into an HDR histogram. 
@@ -37,29 +36,27 @@ import org.HdrHistogram.SynchronizedHistogram * [1]: https://github.com/HdrHistogram/HdrHistogram * [2]: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala#L216 */ -private[kudu] class HdrHistogramAccumulator(val histogram: HistogramWrapper) - extends AccumulatorV2[Long, HistogramWrapper] { - - def this() = this(new HistogramWrapper(new SynchronizedHistogram(3))) +private[kudu] class HdrHistogramAccumulator(histogram: HistogramWrapper = new HistogramWrapper()) + extends AccumulatorV2[Int, HistogramWrapper] { override def isZero: Boolean = { - histogram.inner_histogram.getTotalCount == 0 + histogram.isZero } - override def copy(): AccumulatorV2[Long, HistogramWrapper] = { + override def copy(): AccumulatorV2[Int, HistogramWrapper] = { new HdrHistogramAccumulator(histogram.copy()) } override def reset(): Unit = { - histogram.inner_histogram.reset() + histogram.reset() } - override def add(v: Long): Unit = { - histogram.inner_histogram.recordValue(v) + override def add(v: Int): Unit = { + histogram.add(v) } - override def merge(other: AccumulatorV2[Long, HistogramWrapper]): Unit = { - histogram.inner_histogram.add(other.value.inner_histogram) + override def merge(other: AccumulatorV2[Int, HistogramWrapper]): Unit = { + histogram.add(other.value) } override def value: HistogramWrapper = histogram @@ -68,24 +65,69 @@ private[kudu] class HdrHistogramAccumulator(val histogram: HistogramWrapper) } /* - * A wrapper for a SychronizedHistogram from the HdrHistogram library. See the + * A wrapper for a IntCountsHistogram from the HdrHistogram library. See the * comment on the declaration of the HdrHistogramAccumulator for why this class * exists. * - * A synchronized histogram is used because accumulators may be read from - * multiple threads concurrently. + * synchronized is used because accumulators may be read from multiple threads concurrently. 
+ * + * An option is used for innerHistogram so we can only initialize the histogram if it is used. */ -private[kudu] class HistogramWrapper(val inner_histogram: SynchronizedHistogram) +private[kudu] class HistogramWrapper(var innerHistogram: Option[IntCountsHistogram] = None) extends Serializable { + def isZero: Boolean = { + innerHistogram.synchronized { + innerHistogram.isEmpty + } + } + def copy(): HistogramWrapper = { - new HistogramWrapper(inner_histogram.copy()) + innerHistogram.synchronized { + new HistogramWrapper(innerHistogram.map(_.copy())) + } + } + + def reset(): Unit = { + innerHistogram.synchronized { + if (innerHistogram.isDefined) { + innerHistogram.get.reset() + } + innerHistogram = None + } + } + + def add(v: Int) { + innerHistogram.synchronized { + initializeIfEmpty() + innerHistogram.get.recordValue(v) + } + } + + def add(other: HistogramWrapper) { + innerHistogram.synchronized { + if (other.innerHistogram.isEmpty) { + return + } + initializeIfEmpty() + innerHistogram.get.add(other.innerHistogram.get) + } + } + + private def initializeIfEmpty(): Unit = { + if (innerHistogram.isEmpty) { + innerHistogram = Some(new IntCountsHistogram(2)) + } } override def toString: String = { - inner_histogram.synchronized { - if (inner_histogram.getTotalCount == 1) { - return s"${inner_histogram.getMinValue}ms" + innerHistogram.synchronized { + if (innerHistogram.isEmpty) { + return "0ms" + } + + if (innerHistogram.get.getTotalCount == 1) { + return s"${innerHistogram.get.getMinValue}ms" } // The argument to SynchronizedHistogram#percentiles is the number of // ticks per half distance to 100%. So, a value of 1 produces values for @@ -93,10 +135,13 @@ private[kudu] class HistogramWrapper(val inner_histogram: SynchronizedHistogram) // values have been exhausted. It's a little wonky if there are very few // values in the histogram-- it might print out the same percentile a // couple of times- but it's really nice for larger histograms. 
- val pvs = for (pv: HistogramIterationValue <- inner_histogram.percentiles(1)) yield { - s"${pv.getPercentile}%: ${pv.getValueIteratedTo}ms" - } - pvs.mkString(", ") + innerHistogram.get + .percentiles(1) + .asScala + .map { pv => + s"${pv.getPercentile}%: ${pv.getValueIteratedTo}ms" + } + .mkString(", ") } } } diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala index 85f1416..a829073 100644 --- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala +++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala @@ -481,7 +481,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou // timestamp on each executor. timestampAccumulator.add(syncClient.getLastPropagatedTimestamp) addForOperation(numRows, opType) - val elapsedTime = System.currentTimeMillis() - startTime + val elapsedTime = (System.currentTimeMillis() - startTime).toInt durationHistogram.add(elapsedTime) log.info(s"applied $numRows ${opType}s to table '$tableName' in ${elapsedTime}ms") }
