mbeckerle commented on a change in pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#discussion_r519950055
##########
File path: daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala
##########
@@ -38,7 +38,7 @@
trait Coroutine[T] {
private val queueCapacity: Int = 1
Review comment:
So here the queue size is hard wired to 1. You aren't using the tunable
size. That makes sense because if you enlarge this queue, then the producer and
consumer threads would run concurrently, which is specifically something we're
trying to avoid when using Coroutines.
Worth pointing out this structural issue with coroutines here in a comment: if
the context-switching overhead is too large, the fix is to pass a larger data
structure containing multiple smaller events, not to enlarge this queue.
I like the design choice here to make that the responsibility of the calling
application of the coroutines. Burying a high-overhead abstraction and trying
to buffer it into submission seldom helps.
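To make the point concrete, here is a minimal sketch (hypothetical names, not Daffodil's actual Coroutine API) of the capacity-1 handoff: because the queue holds at most one item, the producer blocks until the consumer takes it, so the two threads alternate rather than run concurrently. To cut context-switch overhead you hand off one larger batch per exchange instead of enlarging the queue.

```scala
import java.util.concurrent.ArrayBlockingQueue

object HandoffSketch {
  def handoff(batch: String): String = {
    // Capacity hard-wired to 1 on purpose: producer and consumer alternate,
    // they never run concurrently on live data.
    val queue = new ArrayBlockingQueue[String](1)
    val producer = new Thread(() => queue.put(batch)) // blocks if queue is full
    producer.start()
    val received = queue.take() // consumer resumes once producer has handed off
    producer.join()
    received
  }

  def main(args: Array[String]): Unit =
    println(handoff("batch-of-events"))
}
```

The batch itself can be an array of many events; the handoff count, not the queue size, is what you tune.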
##########
File path: daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXUnparseAPI.scala
##########
@@ -46,6 +49,24 @@ class TestSAXUnparseAPI {
assertEquals(testData, bao.toString)
}
+ @Test def testUnparseContentHandler_unparse_saxUnparseEventBatchSize_0(): Unit = {
+ val dpT = testDataprocessor(testSchema, Some(Map("saxUnparseEventBatchSize" -> "0")))
Review comment:
Comment: what does batch size 0 test? I would expect min batch size to
be 1.
##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -183,22 +193,23 @@ class SAXInfosetInputter(
override protected def run(): Unit = {
try {
- // startDocument kicks off this entire process, so it should be on the queue so the
- // waitForResume call can grab it. That is set to our current event, so when hasNext is called
- // the nextEvent after the StartDocument can be queued
- copyEvent(source = this.waitForResume(), dest = currentEvent)
+ // startDocument kicks off this entire process, so the first batch of events starting with it
+ // should be on the queue so the waitForResume call can grab it. This populates the
+ // batchedInfosetEvents global var for use by the Inputter
Review comment:
Can't be "global" in the sense of a JVM-wide singleton here. It has to
be per inputter; otherwise you can't run multiple SAX parsers/unparsers at the
same time in one JVM, and we need to be able to do that.
I assume this isn't really "global", so please update the comment to indicate
the real scope of this var, which is, I think, a private var in this
SAXInfosetInputter??
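The scoping being asked for can be illustrated with a toy stand-in (class and member names are hypothetical simplifications, not Daffodil's actual fields): the batch reference is a private var on each inputter instance, never a JVM-wide singleton, so several SAX unparse pipelines can run concurrently in one JVM.

```scala
class InputterScopeSketch {
  // Per-instance scope: each inputter owns its own batch reference.
  private var batchedInfosetEvents: Array[String] = Array.empty
  def receiveBatch(batch: Array[String]): Unit = batchedInfosetEvents = batch
  def batchSize: Int = batchedInfosetEvents.length
}

object InputterScopeDemo {
  def main(args: Array[String]): Unit = {
    val a = new InputterScopeSketch
    val b = new InputterScopeSketch
    a.receiveBatch(Array("startDocument", "endDocument"))
    println(a.batchSize) // a's batch is populated
    println(b.batchSize) // b is unaffected: state is per inputter, not shared
  }
}
```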
##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -70,10 +74,18 @@ class DaffodilUnparseContentHandler(
extends DFDL.DaffodilUnparseContentHandler {
private lazy val inputter = new SAXInfosetInputter(this, dp, output)
private var unparseResult: DFDL.UnparseResult = _
- private lazy val infosetEvent: DFDL.SAXInfosetEvent = new DFDL.SAXInfosetEvent
private lazy val characterData = new StringBuilder
private var prefixMapping: NamespaceBinding = _
private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
+
+ private lazy val tunablesBatchSize = dp.getTunables().saxUnparseEventBatchSize
+ private lazy val SAX_UNPARSE_EVENT_BATCH_SIZE = tunablesBatchSize + 1
Review comment:
Also, why all caps? Seems to me this should just be actualBatchSize.
##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -70,10 +74,18 @@ class DaffodilUnparseContentHandler(
extends DFDL.DaffodilUnparseContentHandler {
private lazy val inputter = new SAXInfosetInputter(this, dp, output)
private var unparseResult: DFDL.UnparseResult = _
- private lazy val infosetEvent: DFDL.SAXInfosetEvent = new DFDL.SAXInfosetEvent
private lazy val characterData = new StringBuilder
private var prefixMapping: NamespaceBinding = _
private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
+
+ private lazy val tunablesBatchSize = dp.getTunables().saxUnparseEventBatchSize
+ private lazy val SAX_UNPARSE_EVENT_BATCH_SIZE = tunablesBatchSize + 1
Review comment:
We really do, for testing purposes, want to be able to make this 1, not 2.
So I suggest this is changed to:
```
if (tunablesBatchSize < 1) 1 else tunablesBatchSize
```
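Extracted into a method (the wrapper is just for illustration; actualBatchSize is the name suggested above), the clamp makes a tunable of 0, or any value below 1, degenerate safely to a batch size of 1:

```scala
object BatchSizeClampSketch {
  // Any tunable value below 1 is treated as 1, so batch size 1 is testable
  // and 0 or negative settings cannot break the batching logic.
  def actualBatchSize(tunablesBatchSize: Int): Int =
    if (tunablesBatchSize < 1) 1 else tunablesBatchSize

  def main(args: Array[String]): Unit = {
    println(actualBatchSize(0))  // clamped up to 1
    println(actualBatchSize(1))  // already valid, unchanged
    println(actualBatchSize(64)) // larger sizes pass through
  }
}
```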
##########
File path: daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXParseUnparseAPI.scala
##########
@@ -55,8 +55,13 @@ object TestSAXParseUnparseAPI {
lazy val dp: DataProcessor = testDataprocessor(testSchema)
- def testDataprocessor(testSchema: scala.xml.Elem): DataProcessor = {
- val schemaCompiler = Compiler()
+ def testDataprocessor(testSchema: scala.xml.Elem, tunablesArg: Option[Map[String, String]] = None): DataProcessor = {
Review comment:
name conventions: testDataProcessor (cap P)
##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -35,25 +34,28 @@ import org.apache.daffodil.xml.XMLUtils
/**
* The SAXInfosetInputter consumes SAXInfosetEvent objects from the DaffodilUnparseContentHandler
- * class and converts them to events that the DataProcessor unparse can use. This class contains two
- * SAXInfosetEvent objects, the current event the unparse method is processing and the next event
- * to be processed later.
+ * class and converts them to events that the DataProcessor unparse can use. This class contains an
+ * array of batched SAXInfosetEvent objects that it receives from the contentHandler and the index
+ * of the current element being processed.
*
- * This class together with the DaffodilUnparseContentHandler use coroutines to ensure that only one event,
- * at a time, is passed between the two classes. The following is the general process:
+ * This class, together with the SAXInfosetInputter, uses coroutines to ensure that a batch of events
+ * (based on the tunable saxUnparseEventBatchSize) can be passed from the former to the latter.
+ * The following is the general process:
*
- * - the run method is called, with a StartDocument event already loaded on the inputter's queue.
- *   This is collected and stored in the currentEvent member
+ * - the run method is called, with the first batch of events, starting with the StartDocument event,
+ *   already loaded on the inputter's queue.
+ *   This is collected and stored in the batchedInfosetEvents member, and the currentIndex is set to 0
* - The dp.unparse method is called, and it calls hasNext to make sure an event exists to be
- *   processed and then queries the currentEvent. The hasNext call also queues the nextEvent by
- *   transferring control to the contentHandler so it can load the next event.
- * - After it is done with the currentEvent, it calls inputter.next to get the next event, and that
- *   copies the queued nextEvent into the currentEvent
- * - This process continues until the currentEvent contains an EndDocument event, at which point, the
- *   nextEvent is cleared, endDocumentReceived is set to true and hasNext will return false
- * - This ends the unparse process, and the unparseResult and/or any Errors are set on the event. We
- *   call resumeFinal passing along that element, terminating this thread and resuming the
- *   contentHandler for the last time.
+ *   processed and then queries the event at currentIndex. The hasNext call also checks that there is
+ *   a next event to be processed (currentIndex+1), and if not, queues the next batch of events by
+ *   transferring control to the contentHandler so it can load them.
+ * - After it is done with the current event, it calls inputter.next to get the next event, and that
+ *   increments the currentIndex and cleans out the event at the previous index
+ * - This process continues until the event at currentIndex contains an EndDocument event, at which point, the
+ *   endDocumentReceived is set to true and hasNext will return false
Review comment:
Add comment: This implies that all these arrays are full of events,
except the last one that contains the EndDocument event which will be the only
"short" array. There is no reason the first array can't also be the last one if
all the events from StartDocument to EndDocument fit in a single array.
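A toy model of that invariant (event types and names here are illustrative, not Daffodil's): every batch is full except possibly the last, which ends with EndDocument and may be "short", and the first batch can itself be the last one when everything fits.

```scala
object BatchIterationSketch {
  sealed trait Event
  case object StartDocument extends Event
  final case class Element(name: String) extends Event
  case object EndDocument extends Event

  // Count events consumed before EndDocument, fetching batches on demand,
  // mirroring the currentIndex walk described in the doc comment.
  def countBeforeEndDocument(batches: Iterator[Array[Event]]): Int = {
    var batch = batches.next()
    var currentIndex = 0
    var count = 0
    while (batch(currentIndex) != EndDocument) {
      count += 1
      if (currentIndex + 1 < batch.length) currentIndex += 1
      else { batch = batches.next(); currentIndex = 0 } // pull the next batch
    }
    count // hasNext would now return false
  }

  def main(args: Array[String]): Unit = {
    val batches = Iterator(
      Array[Event](StartDocument, Element("a"), Element("b")), // full batch
      Array[Event](Element("c"), EndDocument))                 // short final batch
    println(countBeforeEndDocument(batches))
  }
}
```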
##########
File path: daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -183,22 +193,23 @@ class SAXInfosetInputter(
override protected def run(): Unit = {
try {
- // startDocument kicks off this entire process, so it should be on the queue so the
- // waitForResume call can grab it. That is set to our current event, so when hasNext is called
- // the nextEvent after the StartDocument can be queued
- copyEvent(source = this.waitForResume(), dest = currentEvent)
+ // startDocument kicks off this entire process, so the first batch of events starting with it
+ // should be on the queue so the waitForResume call can grab it. This populates the
+ // batchedInfosetEvents global var for use by the Inputter
+ batchedInfosetEvents = this.waitForResume()
val unparseResult = dp.unparse(this, output)
- currentEvent.unparseResult = One(unparseResult)
+ batchedInfosetEvents(currentIndex).unparseResult = One(unparseResult)
if (unparseResult.isError) {
// unparseError is contained within unparseResult
- currentEvent.causeError = One(new DaffodilUnparseErrorSAXException(unparseResult))
+ batchedInfosetEvents(currentIndex).causeError = One(new DaffodilUnparseErrorSAXException(unparseResult))
}
} catch {
case e: Exception => {
- currentEvent.causeError = One(new DaffodilUnhandledSAXException(e.getMessage, e))
+ batchedInfosetEvents(currentIndex).causeError = One(new DaffodilUnhandledSAXException(e.getMessage, e))
Review comment:
DaffodilUnhandledSAXException really should take just the 'e' argument.
There is no reason to grab the string and make it construct the message, etc.,
here. All the information is in the 'e' exception object; this is just wrapping
it in an exception type we know about. getMessage should be called by code that
is planning to display the message somewhere, or put it into a log, perhaps
triggering internationalized translation of the message, etc.
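A sketch of the wrapping style suggested: pass only the original exception as the cause and let Throwable derive the message lazily, instead of calling getMessage at construction time. The class name here mirrors DaffodilUnhandledSAXException but is an illustrative stand-in, not the real class.

```scala
// Exception(cause) sets the message from cause.toString and preserves the
// full original exception, including its stack trace, for later reporting.
class UnhandledSketchException(cause: Exception) extends Exception(cause)

object WrapCauseDemo {
  def main(args: Array[String]): Unit = {
    val e = new IllegalStateException("boom")
    val wrapped = new UnhandledSketchException(e)
    println(wrapped.getCause eq e) // the original exception object survives intact
    println(wrapped.getMessage)    // derived from the cause, not copied eagerly
  }
}
```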
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]