This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9da2dd4  [HUDI-3719] High performance costs of AvroSerizlizer in 
DataSource wr… (#5137)
9da2dd4 is described below

commit 9da2dd416ec76ec68393e38f8e1e94c7de141eb0
Author: xiarixiaoyao <[email protected]>
AuthorDate: Mon Mar 28 02:01:43 2022 +0800

    [HUDI-3719] High performance costs of AvroSerizlizer in DataSource wr… 
(#5137)
    
    * [HUDI-3719] High performance costs of AvroSerizlizer in DataSource writing
    
    * add benchmark framework which modify from spark
    add avroSerDerBenchmark
---
 .../org/apache/hudi/AvroConversionUtils.scala      |   9 +-
 .../spark/hudi/benchmark/HoodieBenchmark.scala     | 239 +++++++++++++++++++++
 .../spark/hudi/benchmark/HoodieBenchmarkBase.scala |  87 ++++++++
 .../spark/hudi/benchmark/HoodieBenchmarks.scala    | 143 ++++++++++++
 .../execution/benchmark/AvroSerDerBenchmark.scala  |  99 +++++++++
 5 files changed, 574 insertions(+), 3 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 0844c73..df878d7 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -62,10 +62,12 @@ object AvroConversionUtils {
    * @param rootCatalystType Catalyst [[StructType]] to be transformed into
    * @return converter accepting Avro payload and transforming it into a 
Catalyst one (in the form of [[InternalRow]])
    */
-  def createAvroToInternalRowConverter(rootAvroType: Schema, rootCatalystType: 
StructType): GenericRecord => Option[InternalRow] =
-    record => sparkAdapter.createAvroDeserializer(rootAvroType, 
rootCatalystType)
+  def createAvroToInternalRowConverter(rootAvroType: Schema, rootCatalystType: 
StructType): GenericRecord => Option[InternalRow] = {
+    val deserializer = sparkAdapter.createAvroDeserializer(rootAvroType, 
rootCatalystType)
+    record => deserializer
       .deserialize(record)
       .map(_.asInstanceOf[InternalRow])
+  }
 
   /**
    * Creates converter to transform Catalyst payload into Avro one
@@ -76,7 +78,8 @@ object AvroConversionUtils {
    * @return converter accepting Catalyst payload (in the form of 
[[InternalRow]]) and transforming it into an Avro one
    */
   def createInternalRowToAvroConverter(rootCatalystType: StructType, 
rootAvroType: Schema, nullable: Boolean): InternalRow => GenericRecord = {
-    row => sparkAdapter.createAvroSerializer(rootCatalystType, rootAvroType, 
nullable)
+    val serializer = sparkAdapter.createAvroSerializer(rootCatalystType, 
rootAvroType, nullable)
+    row => serializer
       .serialize(row)
       .asInstanceOf[GenericRecord]
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmark.scala
new file mode 100644
index 0000000..6d4317a
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmark.scala
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.hudi.benchmark
+
+
+import java.io.{OutputStream, PrintStream}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.util.Try
+
+import org.apache.commons.io.output.TeeOutputStream
+import org.apache.commons.lang3.SystemUtils
+
+import org.apache.spark.util.Utils
+
+/**
+ * Adapted from Apache Spark's benchmark utility.
+ * Utility class to benchmark components. An example of how to use this is:
+ * {{{
+ *   val benchmark = new HoodieBenchmark("My Benchmark", valuesPerIteration)
+ *   benchmark.addCase("V1")(<function>)
+ *   benchmark.addCase("V2")(<function>)
+ *   benchmark.run()
+ * }}}
+ * For every case this prints the best, average and standard deviation of the iteration time,
+ * the best rate, the time per row and the speed relative to the first case.
+ *
+ * The benchmark function takes one argument that is the iteration that's being run.
+ *
+ * @param name name of this benchmark.
+ * @param valuesPerIteration number of values used in the test case, used to compute rows/s.
+ * @param minNumIters the min number of iterations that will be run per case, not counting warm-up.
+ * @param warmupTime amount of time to spend running dummy case iterations for JIT warm-up.
+ * @param minTime further iterations will be run for each case until this time is used up.
+ * @param outputPerIteration if true, the timing for each run will be printed to stdout.
+ * @param output optional output stream to write benchmark results to
+ */
+class HoodieBenchmark(
+     name: String,
+     valuesPerIteration: Long,
+     minNumIters: Int = 2,
+     warmupTime: FiniteDuration = 2.seconds,
+     minTime: FiniteDuration = 2.seconds,
+     outputPerIteration: Boolean = false,
+     output: Option[OutputStream] = None) {
+  import HoodieBenchmark._
+
+  /** Cases registered so far, in the order in which `run()` executes them. */
+  val benchmarks = mutable.ArrayBuffer.empty[HoodieBenchmark.Case]
+
+  /** Sink for the result table: stdout, additionally teed into `output` when one is given. */
+  val out = if (output.isDefined) {
+    new PrintStream(new TeeOutputStream(System.out, output.get))
+  } else {
+    System.out
+  }
+
+  /**
+   * Adds a case to run when run() is called. The given function will be run for several
+   * iterations to collect timing statistics. The whole call of `f` is timed.
+   *
+   * @param name of the benchmark case
+   * @param numIters if non-zero, forces exactly this many iterations to be run
+   * @param f the benchmarked function; it receives the iteration number
+   */
+  def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
+    addTimerCase(name, numIters) { timer =>
+      timer.startTiming()
+      f(timer.iteration)
+      timer.stopTiming()
+    }
+  }
+
+  /**
+   * Adds a case with manual timing control. When the function is run, timing does not start
+   * until timer.startTiming() is called within the given function. The corresponding
+   * timer.stopTiming() method must be called before the function returns.
+   *
+   * @param name of the benchmark case
+   * @param numIters if non-zero, forces exactly this many iterations to be run
+   * @param f the benchmarked function; it receives the timer it has to start and stop
+   */
+  def addTimerCase(name: String, numIters: Int = 0)(f: HoodieBenchmark.Timer => Unit): Unit = {
+    benchmarks += HoodieBenchmark.Case(name, f, numIters)
+  }
+
+  /**
+   * Runs the benchmark and outputs the results to stdout (and to `output`, if set). This should
+   * be copied and added as a comment with the benchmark. Although the results vary from machine
+   * to machine, it should provide some baseline.
+   *
+   * Fails with an IllegalArgumentException when no case has been added.
+   */
+  def run(): Unit = {
+    require(benchmarks.nonEmpty, "At least one benchmark case must be added before calling run().")
+    // scalastyle:off
+    println("Running benchmark: " + name)
+
+    val results = benchmarks.map { c =>
+      println("  Running case: " + c.name)
+      measure(valuesPerIteration, c.numIters)(c.fn)
+    }
+    println
+
+    // The first case is the baseline of the "Relative" column.
+    val firstBest = results.head.bestMs
+    // The results are going to be processor specific so it is useful to include that.
+    out.println(HoodieBenchmark.getJVMOSInfo())
+    out.println(HoodieBenchmark.getProcessorName())
+    // The name column is at least 40 characters wide and grows with the longest name.
+    val nameLen = Math.max(40, Math.max(name.length, benchmarks.map(_.name.length).max))
+    out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
+      name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Rate(M/s)", "Per Row(ns)", "Relative")
+    out.println("-" * (nameLen + 80))
+    results.zip(benchmarks).foreach { case (result, benchmark) =>
+      out.printf(s"%-${nameLen}s %14s %14s %11s %12s %13s %10s\n",
+        benchmark.name,
+        "%5.0f" format result.bestMs,
+        "%4.0f" format result.avgMs,
+        "%5.0f" format result.stdevMs,
+        "%10.1f" format result.bestRate,
+        "%6.1f" format (1000 / result.bestRate),
+        "%3.1fX" format (firstBest / result.bestMs))
+    }
+    out.println
+    // scalastyle:on
+  }
+
+  /**
+   * Runs a single function `f` repeatedly and returns its timing statistics.
+   *
+   * `f` is first run without being recorded until `warmupTime` has elapsed. Recorded iterations
+   * then continue until both the minimum iteration count and the minimum total time are reached.
+   *
+   * @param num number of values processed per iteration, used to compute the rate
+   * @param overrideNumIters if non-zero, exactly this many recorded iterations are run and
+   *                         `minTime` is ignored
+   * @param f the function to measure; it must start and stop the timer it is given
+   * @return average, best and standard deviation of the iteration time in milliseconds, plus
+   *         the best rate in values per microsecond
+   */
+  def measure(num: Long, overrideNumIters: Int)(f: Timer => Unit): Result = {
+    System.gc()  // ensures garbage from previous cases don't impact this one
+    val warmupDeadline = warmupTime.fromNow
+    while (!warmupDeadline.isOverdue) {
+      f(new HoodieBenchmark.Timer(-1))
+    }
+    val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
+    val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos
+    val runTimes = ArrayBuffer[Long]()
+    var totalTime = 0L
+    var i = 0
+    while (i < minIters || totalTime < minDuration) {
+      val timer = new HoodieBenchmark.Timer(i)
+      f(timer)
+      val runTime = timer.totalTime()
+      runTimes += runTime
+      totalTime += runTime
+
+      if (outputPerIteration) {
+        // scalastyle:off
+        println(s"Iteration $i took ${NANOSECONDS.toMicros(runTime)} microseconds")
+        // scalastyle:on
+      }
+      i += 1
+    }
+    // scalastyle:off
+    println(s"  Stopped after $i iterations, ${NANOSECONDS.toMillis(totalTime)} ms")
+    // scalastyle:on
+    assert(runTimes.nonEmpty)
+    val best = runTimes.min
+    val count = runTimes.size
+    // The statistics are computed in Double: squaring a nanosecond deviation of a few seconds
+    // does not fit into a Long, and integer division would truncate the mean and the variance.
+    val avg = totalTime.toDouble / count
+    val stdev = if (count > 1) {
+      val squaredDeviations = runTimes.map { time =>
+        val deviation = time - avg
+        deviation * deviation
+      }
+      math.sqrt(squaredDeviations.sum / (count - 1))
+    } else {
+      0.0
+    }
+    // Times are converted from nanoseconds to milliseconds; the rate is values per microsecond.
+    Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0, stdev / 1000000.0)
+  }
+}
+
+object HoodieBenchmark {
+
+  /**
+   * Object available to benchmark code to control timing e.g. to exclude set-up time.
+   * Time is accumulated over every startTiming()/stopTiming() pair.
+   *
+   * @param iteration specifies this is the nth iteration of running the benchmark case
+   */
+  class Timer(val iteration: Int) {
+    private var accumulatedTime: Long = 0L
+    // A value of 0 means that the timer is currently not running.
+    private var timeStart: Long = 0L
+
+    /** Starts the clock; fails an assertion if it is already running. */
+    def startTiming(): Unit = {
+      assert(timeStart == 0L, "Already started timing.")
+      timeStart = System.nanoTime
+    }
+
+    /** Stops the clock and adds the elapsed time; fails an assertion if it is not running. */
+    def stopTiming(): Unit = {
+      assert(timeStart != 0L, "Have not started timing.")
+      accumulatedTime += System.nanoTime - timeStart
+      timeStart = 0L
+    }
+
+    /** Returns the accumulated nanoseconds; fails an assertion while the clock is running. */
+    def totalTime(): Long = {
+      assert(timeStart == 0L, "Have not stopped timing.")
+      accumulatedTime
+    }
+  }
+
+  /** A registered benchmark case: its name, the timed function and the forced iteration count. */
+  case class Case(name: String, fn: Timer => Unit, numIters: Int)
+
+  /** Timing statistics of one benchmark case. */
+  case class Result(avgMs: Double, bestRate: Double, bestMs: Double, stdevMs: Double)
+
+  /**
+   * This should return a user helpful processor information. Getting at this depends on the OS.
+   * This should return something like "Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz"
+   *
+   * Returns "Unknown processor" when the lookup fails on Linux, or when the
+   * PROCESSOR_IDENTIFIER environment variable is not set on other systems.
+   */
+  def getProcessorName(): String = {
+    if (SystemUtils.IS_OS_MAC_OSX) {
+      Utils.executeAndGetOutput(Seq("/usr/sbin/sysctl", "-n", "machdep.cpu.brand_string"))
+        .stripLineEnd
+    } else if (SystemUtils.IS_OS_LINUX) {
+      Try {
+        val grepPath = Utils.executeAndGetOutput(Seq("which", "grep")).stripLineEnd
+        // Keep only the value of the first "model name" line, whatever whitespace surrounds
+        // the colon.
+        Utils.executeAndGetOutput(Seq(grepPath, "-m", "1", "model name", "/proc/cpuinfo"))
+          .stripLineEnd.replaceFirst("model name\\s*:\\s*", "")
+      }.getOrElse("Unknown processor")
+    } else {
+      // getenv returns null for an unset variable; avoid printing "null" in the report.
+      Option(System.getenv("PROCESSOR_IDENTIFIER")).getOrElse("Unknown processor")
+    }
+  }
+
+  /**
+   * This should return a user helpful JVM & OS information.
+   * This should return something like
+   * "OpenJDK 64-Bit Server VM 1.8.0_65-b17 on Linux 4.1.13-100.fc21.x86_64"
+   */
+  def getJVMOSInfo(): String = {
+    val vmName = System.getProperty("java.vm.name")
+    val runtimeVersion = System.getProperty("java.runtime.version")
+    val osName = System.getProperty("os.name")
+    val osVersion = System.getProperty("os.version")
+    s"${vmName} ${runtimeVersion} on ${osName} ${osVersion}"
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarkBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarkBase.scala
new file mode 100644
index 0000000..ff4f0bc
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarkBase.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.hudi.benchmark
+
+import java.io.{File, FileOutputStream, OutputStream}
+
+/**
+ * Adapted from Apache Spark.
+ * A base class for generating benchmark results to a file.
+ * For JDK9+, JDK major version number is added to the file names to distinguish the results.
+ */
+abstract class HoodieBenchmarkBase {
+  /** Result file stream; only set by `main` when result files are being regenerated. */
+  var output: Option[OutputStream] = None
+
+  /**
+   * Main process of the whole benchmark.
+   * Implementations of this method are supposed to use the wrapper method `runBenchmark`
+   * for each benchmark scenario.
+   */
+  def runBenchmarkSuite(mainArgs: Array[String]): Unit
+
+  /**
+   * Runs one benchmark scenario, framing what it writes to `output` with a header that carries
+   * `benchmarkName` and with a trailing newline.
+   *
+   * @param benchmarkName title written into the header
+   * @param func the scenario to run; its result is discarded
+   */
+  final def runBenchmark(benchmarkName: String)(func: => Any): Unit = {
+    val separator = "=" * 96
+    val testHeader = (separator + '\n' + benchmarkName + '\n' + separator + '\n' + '\n').getBytes
+    output.foreach(_.write(testHeader))
+    func
+    output.foreach(_.write('\n'))
+  }
+
+  /**
+   * Entry point. Runs the suite and, when the environment variable
+   * SPARK_GENERATE_BENCHMARK_FILES is "1", also writes the results to
+   * `benchmarks/<class name><jdk suffix><suffix>-results.txt` under the current project root.
+   */
+  def main(args: Array[String]): Unit = {
+    val regenerateBenchmarkFiles: Boolean = System.getenv("SPARK_GENERATE_BENCHMARK_FILES") == "1"
+    if (regenerateBenchmarkFiles) {
+      // Leading number of java.version, e.g. 1 for "1.8.0_92" and 11 for "11.0.2".
+      val version = System.getProperty("java.version").split("\\D+")(0).toInt
+      val jdkString = if (version > 8) s"-jdk$version" else ""
+      val resultFileName =
+        s"${this.getClass.getSimpleName.replace("$", "")}${jdkString}${suffix}-results.txt"
+      val prefix = HoodieBenchmarks.currentProjectRoot.map(_ + "/").getOrElse("")
+      val dir = new File(s"${prefix}benchmarks/")
+      if (!dir.exists()) {
+        // scalastyle:off println
+        println(s"Creating ${dir.getAbsolutePath} for benchmark results.")
+        // scalastyle:on println
+        dir.mkdirs()
+      }
+      val file = new File(dir, resultFileName)
+      if (!file.exists()) {
+        file.createNewFile()
+      }
+      output = Some(new FileOutputStream(file))
+    }
+
+    try {
+      runBenchmarkSuite(args)
+    } finally {
+      // Release the result file and run the shutdown hook even when the suite fails.
+      try {
+        output.foreach { o =>
+          if (o != null) {
+            o.close()
+          }
+        }
+      } finally {
+        afterAll()
+      }
+    }
+  }
+
+  /** Extra text appended to the result file name, after the JDK marker. */
+  def suffix: String = ""
+
+  /**
+   * Any shutdown code to ensure a clean shutdown
+   */
+  def afterAll(): Unit = {}
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarks.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarks.scala
new file mode 100644
index 0000000..8729910
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/hudi/benchmark/HoodieBenchmarks.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.hudi.benchmark
+
+import java.io.File
+import java.lang.reflect.Modifier
+import java.nio.file.{FileSystems, Paths}
+import java.util.Locale
+import scala.collection.JavaConverters._
+import scala.util.Try
+import org.apache.hbase.thirdparty.com.google.common.reflect.ClassPath
+
+/**
+ * Adapted from Apache Spark.
+ * Run all benchmarks. To run this benchmark, you should build Spark with 
either Maven or SBT.
+ * After that, you can run as below:
+ *
+ * {{{
+ *   1. with spark-submit
+ *      bin/spark-submit --class <this class>
+ *        --jars <all spark test jars>,<spark external package jars>
+ *        <spark core test jar> <glob pattern for class> <extra arguments>
+ *   2. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class <this class>
+ *        --jars <all spark test jars>,<spark external package jars>
+ *        <spark core test jar> <glob pattern for class> <extra arguments>
+ *      Results will be written to all corresponding files under "benchmarks/".
+ *      Notice that it detects the sub-project's directories from jar's paths 
so the provided jars
+ *      should be properly placed under target (Maven build) or target/scala-* 
(SBT) when you
+ *      generate the files.
+ * }}}
+ *
+ * You can use a command as below to find all the test jars.
+ * Make sure not to select duplicate jars created by different versions of builds or tools.
+ * {{{
+ *   find . -name '*-SNAPSHOT-tests.jar' | paste -sd ',' -
+ * }}}
+ *
+ * The example below runs all benchmarks and generates the results:
+ * {{{
+ *   SPARK_GENERATE_BENCHMARK_FILES=1 bin/spark-submit --class \
+ *     org.apache.spark.benchmark.Benchmarks --jars \
+ *     "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | 
paste -sd ',' -`" \
+ *     "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
+ *     "*"
+ * }}}
+ *
+ * The example below runs all benchmarks under 
"org.apache.spark.sql.execution.datasources"
+ * {{{
+ *   bin/spark-submit --class \
+ *     org.apache.spark.benchmark.Benchmarks --jars \
+ *     "`find . -name '*-SNAPSHOT-tests.jar' -o -name '*avro*-SNAPSHOT.jar' | 
paste -sd ',' -`" \
+ *     "`find . -name 'spark-core*-SNAPSHOT-tests.jar'`" \
+ *     "org.apache.spark.sql.execution.datasources.*"
+ * }}}
+ */
+
+object HoodieBenchmarks {
+  /** Project root of the benchmark that is currently running; set by `main` before each run. */
+  var currentProjectRoot: Option[String] = None
+
+  /**
+   * Runs the `main` method of every benchmark class on the classpath whose name matches the
+   * glob pattern in `args.head`; the remaining arguments are handed to each benchmark.
+   *
+   * Environment variables read:
+   *  - SPARK_BENCHMARK_FAILFAST: when false, a failing benchmark is reported and skipped
+   *    instead of aborting the run (default: true)
+   *  - SPARK_BENCHMARK_NUM_SPLITS: number of splits the benchmarks are divided into (default: 1)
+   *  - SPARK_BENCHMARK_CUR_SPLIT: 1-based number of the split to run (default: 1)
+   *
+   * Fails with an IllegalArgumentException when no pattern is given and with a RuntimeException
+   * when no benchmark was run.
+   */
+  def main(args: Array[String]): Unit = {
+    // Validate up front: the pattern is read from args.head below.
+    require(args.length > 0, "Benchmark class to run should be specified.")
+    val isFailFast = sys.env.get(
+      "SPARK_BENCHMARK_FAILFAST").map(_.toLowerCase(Locale.ROOT).trim.toBoolean).getOrElse(true)
+    val numOfSplits = sys.env.get(
+      "SPARK_BENCHMARK_NUM_SPLITS").map(_.toLowerCase(Locale.ROOT).trim.toInt).getOrElse(1)
+    val currentSplit = sys.env.get(
+      "SPARK_BENCHMARK_CUR_SPLIT").map(_.toLowerCase(Locale.ROOT).trim.toInt - 1).getOrElse(0)
+    var numBenchmark = 0
+
+    var isBenchmarkFound = false
+    val benchmarkClasses = ClassPath.from(
+      Thread.currentThread.getContextClassLoader
+    ).getTopLevelClassesRecursive("org.apache.spark").asScala.toArray
+    val matcher = FileSystems.getDefault.getPathMatcher(s"glob:${args.head}")
+
+    benchmarkClasses.foreach { info =>
+      // Lazy, so a class is only loaded once the cheaper name checks have passed.
+      lazy val clazz = info.load
+      lazy val runBenchmark = clazz.getMethod("main", classOf[Array[String]])
+      // isAssignableFrom seems not working with the reflected class from Guava's
+      // getTopLevelClassesRecursive.
+      if (
+        info.getName.endsWith("Benchmark") &&
+          // TODO(SPARK-34927): Support TPCDSQueryBenchmark in Benchmarks
+          !info.getName.endsWith("TPCDSQueryBenchmark") &&
+          matcher.matches(Paths.get(info.getName)) &&
+          Try(runBenchmark).isSuccess && // Does this has a main method?
+          !Modifier.isAbstract(clazz.getModifiers) // Is this a regular class?
+      ) {
+        numBenchmark += 1
+        // Only the benchmarks that fall into the requested split are run.
+        if (numBenchmark % numOfSplits == currentSplit) {
+          isBenchmarkFound = true
+
+          val targetDirOrProjDir =
+            new File(clazz.getProtectionDomain.getCodeSource.getLocation.toURI)
+              .getParentFile.getParentFile
+
+          // The root path to be referred in each benchmark.
+          currentProjectRoot = Some {
+            if (targetDirOrProjDir.getName == "target") {
+              // SBT build
+              targetDirOrProjDir.getParentFile.getCanonicalPath
+            } else {
+              // Maven build
+              targetDirOrProjDir.getCanonicalPath
+            }
+          }
+
+          // scalastyle:off println
+          println(s"Running ${clazz.getName}:")
+          // scalastyle:on println
+          // Force GC to minimize the side effect.
+          System.gc()
+          try {
+            runBenchmark.invoke(null, args.tail.toArray)
+          } catch {
+            // With fail-fast enabled the guard does not match and the failure propagates.
+            case e: Throwable if !isFailFast =>
+              // scalastyle:off println
+              println(s"${clazz.getName} failed with the exception below:")
+              // scalastyle:on println
+              e.printStackTrace()
+          }
+        }
+      }
+    }
+
+    if (!isBenchmarkFound) throw new RuntimeException("No benchmark found to run.")
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
new file mode 100644
index 0000000..5e092bd
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.{AvroConversionUtils, HoodieSparkUtils}
+import org.apache.spark.hudi.benchmark.{HoodieBenchmark, HoodieBenchmarkBase}
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+/**
+ * Benchmark to measure Avro SerDer performance: turning a DataFrame into Avro records and
+ * turning Avro records back into Catalyst rows.
+ */
+object AvroSerDerBenchmark extends HoodieBenchmarkBase {
+  protected val spark: SparkSession = getSparkSession
+
+  /** Builds (or reuses) a single-threaded local session named after this object. */
+  def getSparkSession: SparkSession = {
+    val builder = SparkSession
+      .builder()
+      .master("local[1]")
+      .config("spark.driver.memory", "8G")
+      .appName(this.getClass.getCanonicalName)
+    builder.getOrCreate()
+  }
+
+  /**
+   * Creates the test data: an `id` column ranging over [0, numbers) plus three constant
+   * columns (a string, a double and an int).
+   */
+  def getDataFrame(numbers: Long): DataFrame = {
+    val ids = spark.range(0, numbers).toDF("id")
+    val withString = ids.withColumn("c1", lit("AvroSerDerBenchmark"))
+    val withDouble = withString.withColumn("c2", lit(12.99d))
+    withDouble.withColumn("c3", lit(1))
+  }
+
+  /**
+   * Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Windows 10 10.0
+   * Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
+   * perf avro serializer for hoodie:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+   * ------------------------------------------------------------------------------------------------------------------------
+   * serialize internalRow to avro Record               6391           6683         413          7.8         127.8       1.0X
+   */
+  private def avroSerializerBenchmark: Unit = {
+    val rowCount = 50000000L
+    val serializerBenchmark = new HoodieBenchmark("perf avro serializer for hoodie", rowCount)
+    serializerBenchmark.addCase("serialize internalRow to avro Record") { _ =>
+      val frame = getDataFrame(rowCount)
+      val recordSchema =
+        AvroConversionUtils.convertStructTypeToAvroSchema(frame.schema, "record", "my")
+      // NOTE(review): SparkContext.getConf presumably hands out a copy of the configuration, in
+      // which case this registration has no effect on the running context - confirm.
+      spark.sparkContext.getConf.registerAvroSchemas(recordSchema)
+      val records = HoodieSparkUtils.createRdd(frame, "record", "my", Some(recordSchema))
+      // The no-op action forces every record to be produced.
+      records.foreach(rec => rec)
+    }
+    serializerBenchmark.run()
+  }
+
+  /**
+   * Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Windows 10 10.0
+   * Intel64 Family 6 Model 94 Stepping 3, GenuineIntel
+   * perf avro deserializer for hoodie:        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+   * ------------------------------------------------------------------------------------------------------------------------
+   * deserialize avro Record to internalRow             1340           1360          27          7.5         134.0       1.0X
+   */
+  private def avroDeserializerBenchmark: Unit = {
+    val rowCount = 10000000L
+    val deserializerBenchmark =
+      new HoodieBenchmark("perf avro deserializer for hoodie", rowCount)
+    val frame = getDataFrame(rowCount)
+    val structType = frame.schema
+    val recordSchema =
+      AvroConversionUtils.convertStructTypeToAvroSchema(frame.schema, "record", "my")
+    val cachedRecords = HoodieSparkUtils.createRdd(frame, "record", "my", Some(recordSchema))
+    // Cache and materialize the input once, before any case is registered and timed.
+    cachedRecords.cache()
+    cachedRecords.foreach(rec => rec)
+    // NOTE(review): see the serializer benchmark - this registration may be a no-op; confirm.
+    spark.sparkContext.getConf.registerAvroSchemas(recordSchema)
+    deserializerBenchmark.addCase("deserialize avro Record to internalRow") { _ =>
+      val rows = cachedRecords.mapPartitions { partition =>
+        // The schema and the converter are built once per partition and reused for each record.
+        val partitionSchema =
+          AvroConversionUtils.convertStructTypeToAvroSchema(structType, "record", "my")
+        val toInternalRow =
+          AvroConversionUtils.createAvroToInternalRowConverter(partitionSchema, structType)
+        partition.map(record => toInternalRow(record.asInstanceOf[GenericRecord]).get)
+      }
+      rows.foreach(row => row)
+    }
+    deserializerBenchmark.run()
+  }
+
+  /** Stops the Spark session once the suite has finished. */
+  override def afterAll(): Unit = {
+    spark.stop()
+  }
+
+  /** Runs the serializer benchmark followed by the deserializer benchmark. */
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    avroSerializerBenchmark
+    avroDeserializerBenchmark
+  }
+}

Reply via email to