This is an automated email from the ASF dual-hosted git repository.

slawrence pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/daffodil.git


The following commit(s) were added to refs/heads/main by this push:
     new e6b76fbc8 Refactor CLI performance command to reduce overhead
e6b76fbc8 is described below

commit e6b76fbc8bed5a794e7b2826bff81968114f2d2b
Author: Steve Lawrence <[email protected]>
AuthorDate: Thu Jan 15 12:45:03 2026 -0500

    Refactor CLI performance command to reduce overhead
    
    In the CLI performance command, instead of creating one Future per
    parse/unparse, this now creates one Future per thread, reducing the
    number of Futures from potentially hundreds of thousands to a small
    handful. Test data is now added to a ConcurrentLinkedQueue, and each
    Future reads from this queue until it is empty. Not only does this avoid
    a large number of Future allocations, it also more closely mimics common
    real-world usage, where there is a single serial producer and multiple
    parallel consumers.
    
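    Also stop storing the results of each parse/unparse run, to avoid
    allocations. Instead, just keep track of the number of files processed
    and the total latency, and use those numbers to calculate the average
    latency at the end. As a result, the output now reports average
    throughput and average latency instead of per-run, min, and max rates.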
    
    Also remove the Either allocation, the match/case, and the Timer
    function calls from the critical loop.
    
    Reports vary on whether all of these changes improve performance
    numbers, but they do not seem to make performance worse, and they do
    make profiling results clearer by reducing allocations.
    
    DAFFODIL-3064
---
 .../main/scala/org/apache/daffodil/cli/Main.scala  | 165 ++++++++++-----------
 .../daffodil/cli/cliTest/TestCLIPerformance.scala  |  30 ++--
 2 files changed, 92 insertions(+), 103 deletions(-)

diff --git a/daffodil-cli/src/main/scala/org/apache/daffodil/cli/Main.scala 
b/daffodil-cli/src/main/scala/org/apache/daffodil/cli/Main.scala
index 99b1dc54d..6cb4db47a 100644
--- a/daffodil-cli/src/main/scala/org/apache/daffodil/cli/Main.scala
+++ b/daffodil-cli/src/main/scala/org/apache/daffodil/cli/Main.scala
@@ -30,14 +30,13 @@ import java.nio.file.Files
 import java.nio.file.Paths
 import java.nio.file.StandardOpenOption
 import java.util.Scanner
-import java.util.concurrent.Executors
+import java.util.concurrent.ConcurrentLinkedQueue
 import javax.xml.parsers.SAXParserFactory
 import javax.xml.transform.TransformerException
 import javax.xml.transform.TransformerFactory
 import javax.xml.transform.stream.StreamResult
-import scala.collection.immutable.ArraySeq
 import scala.concurrent.Await
-import scala.concurrent.ExecutionContext
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
 import scala.jdk.CollectionConverters.*
@@ -1456,104 +1455,98 @@ class Main(
               forPerformance = true
             )
 
-            val dataSeq: Seq[Either[AnyRef, Array[Byte]]] =
-              ArraySeq
-                .unsafeWrapArray(files)
-                .map { filePath =>
-                  // For performance testing, we want everything in memory so 
as to
-                  // remove I/O from consideration. Additionally, for both 
parse
-                  // and unparse we need immutable inputs since we could parse 
the
-                  // same input data multiple times in different performance 
runs.
-                  // So read the file data into an Array[Byte], and use that 
for
-                  // everything.
-                  val input = (new FileInputStream(filePath))
-                  val dataSize = filePath.length()
-                  val bytes = new Array[Byte](dataSize.toInt)
-                  input.read(bytes)
-                  val data = performanceOpts.unparse() match {
-                    case true => Left(infosetHandler.dataToInfoset(bytes))
-                    case false => Right(bytes)
+            val isUnparse = performanceOpts.unparse()
+
+            val dataArray: Array[AnyRef] = files
+              .map { filePath =>
+                // For performance testing, we want everything in memory so as 
to
+                // remove I/O from consideration. Additionally, for both parse
+                // and unparse we need immutable inputs since we could parse 
the
+                // same input data multiple times in different performance 
runs.
+                // So read the file data into an Array[Byte], and use that for
+                // everything.
+                val input = (new FileInputStream(filePath))
+                val dataSize = filePath.length()
+                val bytes = new Array[Byte](dataSize.toInt)
+                input.read(bytes)
+                val data =
+                  if (isUnparse) {
+                    infosetHandler.dataToInfoset(bytes)
+                  } else {
+                    bytes
                   }
-                  data
-                }
-
-            val inputs = (0 until performanceOpts.number()).map { n =>
-              val index = n % dataSeq.length
-              dataSeq(index)
-            }
-            val inputsWithIndex = inputs.zipWithIndex
-
-            implicit val executionContext = new ExecutionContext {
-              val threadPool = 
Executors.newFixedThreadPool(performanceOpts.threads())
-
-              def execute(runnable: Runnable): Unit = {
-                threadPool.submit(runnable)
+                data
               }
 
-              def reportFailure(t: Throwable): Unit = {
-                // do nothing
-              }
+            val inputQueue = new ConcurrentLinkedQueue[AnyRef]()
+            (0 until performanceOpts.number()).foreach { n =>
+              val index = n % dataArray.length
+              inputQueue.add(dataArray(index))
             }
 
             val nullChannelForUnparse = 
Channels.newChannel(NullOutputStream.INSTANCE)
             val nullOutputStreamForParse = NullOutputStream.INSTANCE
 
-            val NSConvert = 1000000000.0
-            val (totalTime, results) = Timer.getTimeResult({
-              val tasks = inputsWithIndex.map { case (inData, n) =>
-                val task: Future[(Int, Long, Boolean)] = Future {
-                  val (time, result) = inData match {
-                    case Left(anyRef) =>
-                      Timer.getTimeResult({
-                        val unparseResult =
-                          infosetHandler.unparse(anyRef, nullChannelForUnparse)
-                        unparseResult
-                      })
-                    case Right(bytes) =>
-                      Timer.getTimeResult({
-                        Using.resource(InputSourceDataInputStream(bytes)) { 
input =>
-                          val infosetResult =
-                            infosetHandler.parse(input, 
nullOutputStreamForParse)
-                          val parseResult = infosetResult.parseResult
-                          parseResult
-                        }
-                      })
-                  }
+            val totalTimeStart = System.nanoTime
 
-                  (n, time, result.isError)
+            val tasks = (0 until performanceOpts.threads()).map { id =>
+              Future {
+                var threadLatencyNS: Long = 0
+                var threadFiles: Long = 0
+
+                var input: AnyRef = null
+                while ({ input = inputQueue.poll(); input != null }) {
+                  val latencyTimeStart = System.nanoTime
+                  if (isUnparse) {
+                    val unparseResult = infosetHandler.unparse(input, 
nullChannelForUnparse)
+                    if (unparseResult.isError) throw Exception("Failed to 
unparse")
+                  } else {
+                    val isdis = 
InputSourceDataInputStream(input.asInstanceOf[Array[Byte]])
+                    val infosetResult = infosetHandler.parse(isdis, 
nullOutputStreamForParse)
+                    if (infosetResult.parseResult.isError)
+                      throw new Exception("Failed to parse")
+                  }
+                  val latencyTimeEnd = System.nanoTime
+                  threadLatencyNS += latencyTimeEnd - latencyTimeStart
+                  threadFiles += 1
                 }
-                task
+
+                (threadLatencyNS, threadFiles)
               }
-              val results = tasks.map { Await.result(_, Duration.Inf) }
-              results
-            })
-
-            val rates = results.map { results =>
-              val (runNum: Int, nsTime: Long, error: Boolean) = results
-              val rate = 1 / (nsTime / NSConvert)
-              val status = if (error) "fail" else "pass"
-              Logger.log.info(
-                s"run: ${runNum}, seconds: ${nsTime / NSConvert}, rate: 
${rate}, status: ${status}"
-              )
-              rate
             }
 
-            val numFailures = results.map { _._3 }.filter { e => e }.length
-            if (numFailures > 0) {
-              Logger.log.error(s"${numFailures} failures found\n")
-            }
+            val results =
+              try {
+                tasks.map { Await.result(_, Duration.Inf) }
+              } catch {
+                // if a future throws an exception, we return an empty seq to 
indicate failure
+                case _: Exception => Seq()
+              }
 
-            val sec = totalTime / NSConvert
-            val action = performanceOpts.unparse() match {
-              case true => "unparse"
-              case false => "parse"
-            }
-            STDOUT.println(s"total $action time (sec): $sec")
-            STDOUT.println(s"min rate (files/sec): ${rates.min.toFloat}")
-            STDOUT.println(s"max rate (files/sec): ${rates.max.toFloat}")
-            STDOUT.println(s"avg rate (files/sec): ${(performanceOpts.number() 
/ sec).toFloat}")
+            val totalTimeEnd = System.nanoTime
 
-            if (numFailures == 0) ExitCode.Success else 
ExitCode.PerformanceTestError
+            if (results.length == 0) {
+              Logger.log.error("one or more files failed to parse/unparse")
+              ExitCode.PerformanceTestError
+            } else {
+              val action = performanceOpts.unparse() match {
+                case true => "unparse"
+                case false => "parse"
+              }
+              val (totalLatencyNS, totalFiles) = results.reduce { case ((a1, 
a2), (b1, b2)) =>
+                (a1 + b1, a2 + b2)
+              }
+              val totalTimeNS = totalTimeEnd - totalTimeStart
+              val totalTimeSec = totalTimeNS.toDouble / 1_000_000_000
+              val averageThroughputFilesPerSec = (totalFiles.toDouble / 
totalTimeSec)
+              val averageLatencyMS = (totalLatencyNS.toDouble / totalFiles / 
1_000_000)
+              STDOUT.println(f"total $action time (sec): $totalTimeSec%.3f")
+              STDOUT.println(
+                f"average throughput (files/sec): 
$averageThroughputFilesPerSec%.3f"
+              )
+              STDOUT.println(f"average latency (ms): $averageLatencyMS%.3f")
+              ExitCode.Success
+            }
           }
 
         }
diff --git 
a/daffodil-cli/src/test/scala/org/apache/daffodil/cli/cliTest/TestCLIPerformance.scala
 
b/daffodil-cli/src/test/scala/org/apache/daffodil/cli/cliTest/TestCLIPerformance.scala
index 488ada0c8..8fbb507e6 100644
--- 
a/daffodil-cli/src/test/scala/org/apache/daffodil/cli/cliTest/TestCLIPerformance.scala
+++ 
b/daffodil-cli/src/test/scala/org/apache/daffodil/cli/cliTest/TestCLIPerformance.scala
@@ -32,7 +32,7 @@ class TestCLIPerformance {
 
     runCLI(args"performance -N 2 -t 2 -s $schema -r matrix $input") { cli =>
       cli.expect("total parse time (sec):")
-      cli.expect("avg rate (files/sec):")
+      cli.expect("average throughput (files/sec):")
     }(ExitCode.Success)
   }
 
@@ -44,7 +44,7 @@ class TestCLIPerformance {
 
     runCLI(args"performance -I sax -N 2 -t 2 -s $schema -r matrix $input") { 
cli =>
       cli.expect("total parse time (sec):")
-      cli.expect("avg rate (files/sec):")
+      cli.expect("average throughput (files/sec):")
     }(ExitCode.Success)
   }
 
@@ -56,7 +56,7 @@ class TestCLIPerformance {
 
     runCLI(args"performance -I exi -N 2 -t 2 -s $schema -r matrix $input") { 
cli =>
       cli.expect("total parse time (sec):")
-      cli.expect("avg rate (files/sec):")
+      cli.expect("average throughput (files/sec):")
     }(ExitCode.Success)
   }
 
@@ -68,7 +68,7 @@ class TestCLIPerformance {
 
     runCLI(args"performance -I exisa -N 2 -t 2 -s $schema -r matrix $input") { 
cli =>
       cli.expect("total parse time (sec):")
-      cli.expect("avg rate (files/sec):")
+      cli.expect("average throughput (files/sec):")
     }(ExitCode.Success)
   }
 
@@ -80,7 +80,7 @@ class TestCLIPerformance {
 
     runCLI(args"performance -N 20 -t 3 -s $schema -r matrix $input") { cli =>
       cli.expect("total parse time (sec):")
-      cli.expect("avg rate (files/sec):")
+      cli.expect("average throughput (files/sec):")
     }(ExitCode.Success)
   }
 
@@ -92,7 +92,7 @@ class TestCLIPerformance {
 
     runCLI(args"performance -N 50 -t 5 -s $schema -r Item2 $input") { cli =>
       cli.expect("total parse time (sec):")
-      cli.expect("avg rate (files/sec):")
+      cli.expect("average throughput (files/sec):")
     }(ExitCode.Success)
   }
 
@@ -103,8 +103,6 @@ class TestCLIPerformance {
     val input = 
path("daffodil-cli/src/test/resources/org/apache/daffodil/cli/input/input5.txt")
 
     runCLI(args"performance -N 2 -t 2 -s $schema $input") { cli =>
-      cli.expect("total parse time (sec):")
-      cli.expect("avg rate (files/sec):")
       cli.expectErr("error")
     }(ExitCode.PerformanceTestError)
   }
@@ -119,7 +117,7 @@ class TestCLIPerformance {
 
     runCLI(args"performance --unparse -N 2 -t 2 -s $schema -r e3 $input") { 
cli =>
       cli.expect("total unparse time (sec):")
-      cli.expect("avg rate (files/sec):")
+      cli.expect("average throughput (files/sec):")
     }(ExitCode.Success)
   }
 
@@ -133,7 +131,7 @@ class TestCLIPerformance {
 
     runCLI(args"performance --unparse -I sax -N 2 -t 2 -s $schema -r e3 
$input") { cli =>
       cli.expect("total unparse time (sec):")
-      cli.expect("avg rate (files/sec):")
+      cli.expect("average throughput (files/sec):")
     }(ExitCode.Success)
   }
 
@@ -147,7 +145,7 @@ class TestCLIPerformance {
 
     runCLI(args"performance --unparse -I exi -N 2 -t 2 -s $schema -r e3 
$input") { cli =>
       cli.expect("total unparse time (sec):")
-      cli.expect("avg rate (files/sec):")
+      cli.expect("average throughput (files/sec):")
     }(ExitCode.Success)
   }
 
@@ -161,7 +159,7 @@ class TestCLIPerformance {
 
     runCLI(args"performance --unparse -I exisa -N 2 -t 2 -s $schema -r e3 
$input") { cli =>
       cli.expect("total unparse time (sec):")
-      cli.expect("avg rate (files/sec):")
+      cli.expect("average throughput (files/sec):")
     }(ExitCode.Success)
   }
 
@@ -175,7 +173,7 @@ class TestCLIPerformance {
 
     runCLI(args"performance --unparse -I null -N 2 -t 2 -s $schema -r e3 
$input") { cli =>
       cli.expect("total unparse time (sec):")
-      cli.expect("avg rate (files/sec):")
+      cli.expect("average throughput (files/sec):")
     }(ExitCode.Success)
   }
 
@@ -189,7 +187,7 @@ class TestCLIPerformance {
 
     runCLI(args"performance --unparse -N 20 -t 3 -s $schema -r e3 $input") { 
cli =>
       cli.expect("total unparse time (sec):")
-      cli.expect("avg rate (files/sec):")
+      cli.expect("average throughput (files/sec):")
     }(ExitCode.Success)
   }
 
@@ -203,7 +201,7 @@ class TestCLIPerformance {
 
     runCLI(args"performance --unparse -N 50 -t 5 -s $schema -r e3 $input") { 
cli =>
       cli.expect("total unparse time (sec):")
-      cli.expect("avg rate (files/sec):")
+      cli.expect("average throughput (files/sec):")
     }(ExitCode.Success)
   }
 
@@ -216,8 +214,6 @@ class TestCLIPerformance {
     )
 
     runCLI(args"performance --unparse -N 2 -t 2 -s $schema $input") { cli =>
-      cli.expect("total unparse time (sec):")
-      cli.expect("avg rate (files/sec):")
       cli.expectErr("error")
     }(ExitCode.PerformanceTestError)
   }

Reply via email to