olabusayoT commented on code in PR #1608:
URL: https://github.com/apache/daffodil/pull/1608#discussion_r2666146638
##########
daffodil-cli/src/main/scala/org/apache/daffodil/cli/Main.scala:
##########
@@ -1477,83 +1477,79 @@ class Main(
data
}
- val inputs = (0 until performanceOpts.number()).map { n =>
+ val inputQueue = new ConcurrentLinkedQueue[Either[AnyRef,
Array[Byte]]]()
+ (0 until performanceOpts.number()).foreach { n =>
val index = n % dataSeq.length
- dataSeq(index)
- }
- val inputsWithIndex = inputs.zipWithIndex
-
- implicit val executionContext = new ExecutionContext {
- val threadPool =
Executors.newFixedThreadPool(performanceOpts.threads())
-
- def execute(runnable: Runnable): Unit = {
- threadPool.submit(runnable)
- }
-
- def reportFailure(t: Throwable): Unit = {
- // do nothing
- }
+ inputQueue.add(dataSeq(index))
}
val nullChannelForUnparse =
Channels.newChannel(NullOutputStream.INSTANCE)
val nullOutputStreamForParse = NullOutputStream.INSTANCE
- val NSConvert = 1000000000.0
- val (totalTime, results) = Timer.getTimeResult({
- val tasks = inputsWithIndex.map { case (inData, n) =>
- val task: Future[(Int, Long, Boolean)] = Future {
- val (time, result) = inData match {
- case Left(anyRef) =>
- Timer.getTimeResult({
- val unparseResult =
- infosetHandler.unparse(anyRef, nullChannelForUnparse)
- unparseResult
- })
- case Right(bytes) =>
- Timer.getTimeResult({
- Using.resource(InputSourceDataInputStream(bytes)) {
input =>
+ val (totalTimeNS, results) = Timer.getTimeResult({
+ val tasks = (0 until performanceOpts.threads()).map { id =>
+ Future {
+ var threadLatencyNS: Long = 0
+ var threadFiles: Long = 0
+
+ var input: Either[AnyRef, Array[Byte]] = null
+ while ({ input = inputQueue.poll(); input != null }) {
+ val latency = input match {
+ case Left(unparseInput) => {
+ Timer.getTimeNS({
+ val unparseResult =
+ infosetHandler.unparse(unparseInput,
nullChannelForUnparse)
+ if (unparseResult.isError) throw Exception("Failed
to unparse")
+ })
+ }
+ case Right(parseInput) => {
+ Timer.getTimeNS({
+ val isdis = InputSourceDataInputStream(parseInput)
val infosetResult =
- infosetHandler.parse(input,
nullOutputStreamForParse)
- val parseResult = infosetResult.parseResult
- parseResult
- }
- })
+ infosetHandler.parse(isdis,
nullOutputStreamForParse)
+ if (infosetResult.parseResult.isError)
+ throw new Exception("Failed to parse")
+ })
+ }
+ }
+ threadLatencyNS += latency
+ threadFiles += 1
}
- (n, time, result.isError)
+ (threadLatencyNS, threadFiles)
}
- task
}
- val results = tasks.map { Await.result(_, Duration.Inf) }
- results
- })
- val rates = results.map { results =>
- val (runNum: Int, nsTime: Long, error: Boolean) = results
- val rate = 1 / (nsTime / NSConvert)
- val status = if (error) "fail" else "pass"
- Logger.log.info(
- s"run: ${runNum}, seconds: ${nsTime / NSConvert}, rate:
${rate}, status: ${status}"
- )
- rate
- }
+ val results =
+ try {
+ tasks.map { Await.result(_, Duration.Inf) }
+ } catch {
+ // if a future throws an exception, we return an empty seq
to indicate failure
+ case _: Exception => Seq()
+ }
- val numFailures = results.map { _._3 }.filter { e => e }.length
- if (numFailures > 0) {
- Logger.log.error(s"${numFailures} failures found\n")
- }
+ results
+ })
- val sec = totalTime / NSConvert
- val action = performanceOpts.unparse() match {
- case true => "unparse"
- case false => "parse"
+ if (results.length == 0) {
+ Logger.log.error("one or more files failed to parse/unparse")
+ ExitCode.PerformanceTestError
+ } else {
+ val action = performanceOpts.unparse() match {
+ case true => "unparse"
+ case false => "parse"
+ }
+ val (totalLatencyNS, totalFiles) = results.reduce { case ((a1,
a2), (b1, b2)) =>
+ (a1 + b1, a2 + b2)
+ }
+ val totalTimeSec = totalTimeNS.toDouble / 1_000_000_000
+ val averageThroughput = (totalFiles.toDouble / totalTimeSec)
+ val averageLatency = (totalLatencyNS.toDouble / totalFiles /
1_000_000)
Review Comment:
Would it be clearer to label these averages with their units (Sec/MS)? NBD if not.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]