stevedlawrence commented on a change in pull request #453:
URL: https://github.com/apache/incubator-daffodil/pull/453#discussion_r520581742
##########
File path:
daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
}
override def hasNext(): Boolean = {
- if (endDocumentReceived) false
- else if (!nextEvent.isEmpty) true
- else {
- val event = this.resume(unparseContentHandler, Try(currentEvent))
- copyEvent(source = event, dest = nextEvent)
+ val nextIndex = currentIndex + 1
+ if (endDocumentReceived) {
+ // if the current Element is EndDocument, then there is no valid next
+ false
+ } else if (batchedInfosetEvents != null &&
batchedInfosetEvents.lift(nextIndex).nonEmpty
+ && !batchedInfosetEvents(nextIndex).isEmpty) {
+ // if the next element exists and is nonEmpty
Review comment:
In what case can the nextIndex be empty? It seems like the
ContentHandler will either fill up the entire array with events and not resume
the coroutine until it does so, or it will stop and a EndDocument. I wonder if
we can make an assumption in all this logic that there are never any empty
events, as long as we never try to read past EndDocument?
##########
File path:
daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
}
override def hasNext(): Boolean = {
- if (endDocumentReceived) false
- else if (!nextEvent.isEmpty) true
- else {
- val event = this.resume(unparseContentHandler, Try(currentEvent))
- copyEvent(source = event, dest = nextEvent)
+ val nextIndex = currentIndex + 1
+ if (endDocumentReceived) {
+ // if the current Element is EndDocument, then there is no valid next
+ false
+ } else if (batchedInfosetEvents != null &&
batchedInfosetEvents.lift(nextIndex).nonEmpty
+ && !batchedInfosetEvents(nextIndex).isEmpty) {
+ // if the next element exists and is nonEmpty
+ true
+ } else {
+ // there is no nextEvent or it was empty, but we still have no
EndDocument. So load the next
+ // batch from the contentHandler
+ returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+ batchedInfosetEvents = this.resume(unparseContentHandler,
returnedInfosetEvent)
+ // we reset the index once we receive a new batch, but we don't use the
first element from
+ // this point on
+ currentIndex = 0
Review comment:
I think this comment could be a little confusing. It maybe implies that
we're doing something wasteful? The reason this must be set to 0 is because
hasNext isn't allowed to change what we are looking at. Calling resume took the
thing we were last looking at and made it the first thing in the array. So the
currentIndex must now be changed to look at the zeroth thing. Only when next()
is called are we allowed to actually increment currentIndex so we are actually
looking at the next element.
##########
File path:
daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXParseUnparseAPI.scala
##########
@@ -55,8 +55,13 @@ object TestSAXParseUnparseAPI {
lazy val dp: DataProcessor = testDataprocessor(testSchema)
- def testDataprocessor(testSchema: scala.xml.Elem): DataProcessor = {
- val schemaCompiler = Compiler()
+ def testDataprocessor(testSchema: scala.xml.Elem, tunablesArg:
Option[Map[String, String]] = None): DataProcessor = {
+ val schemaCompiler =
+ if (tunablesArg.nonEmpty) {
+ Compiler().withTunables(tunablesArg.get)
+ } else {
+ Compiler()
+ }
Review comment:
Rather than making tunablesArg an ``Option``, how about it's just a
``Map`` and it defaults to ``Map.empty``. The ``withTunables`` function should
work just fine with an empty map, so this can just become
```scala
val schemaCompiler = Compiler.withTuanbles(tunablesArg)
```
So it cleans things up a little bit.
Only downside is It will create a copy of the ``Compiler`` instance even if
there are no tunables, but that doesn't seem like too big of a deal to me. If
we really cared, we could modify the ``withTunables(Map)`` method to not do the
copy if the map is empty, but I'm not sure if that's worth it.
##########
File path:
daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
}
override def hasNext(): Boolean = {
- if (endDocumentReceived) false
- else if (!nextEvent.isEmpty) true
- else {
- val event = this.resume(unparseContentHandler, Try(currentEvent))
- copyEvent(source = event, dest = nextEvent)
+ val nextIndex = currentIndex + 1
+ if (endDocumentReceived) {
+ // if the current Element is EndDocument, then there is no valid next
+ false
+ } else if (batchedInfosetEvents != null &&
batchedInfosetEvents.lift(nextIndex).nonEmpty
+ && !batchedInfosetEvents(nextIndex).isEmpty) {
+ // if the next element exists and is nonEmpty
+ true
+ } else {
+ // there is no nextEvent or it was empty, but we still have no
EndDocument. So load the next
+ // batch from the contentHandler
+ returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+ batchedInfosetEvents = this.resume(unparseContentHandler,
returnedInfosetEvent)
+ // we reset the index once we receive a new batch, but we don't use the
first element from
+ // this point on
+ currentIndex = 0
true
}
}
override def next(): Unit = {
if (hasNext()) {
- copyEvent(source = Try(nextEvent), dest = currentEvent)
- nextEvent.clear()
- if (currentEvent.eventType.contains(EndDocument)) endDocumentReceived =
true
+ // clear element at current index as we're done with it, except in the
case we just loaded the
+ // new elements, then do nothing
+ batchedInfosetEvents(currentIndex).clear()
+
Review comment:
Just so it's clear to me, the contract between the ContenttHandler and
the InofsetInputter is that the ContentHandler fills up the array, and as the
SAXInfosetInputter is done with each event it clears that event. So when we
resume to the infoset inputter it can assume that the entire array has been
cleared and doesn't need to clear anything itself?
##########
File path:
daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
}
override def hasNext(): Boolean = {
- if (endDocumentReceived) false
- else if (!nextEvent.isEmpty) true
- else {
- val event = this.resume(unparseContentHandler, Try(currentEvent))
- copyEvent(source = event, dest = nextEvent)
+ val nextIndex = currentIndex + 1
+ if (endDocumentReceived) {
+ // if the current Element is EndDocument, then there is no valid next
+ false
+ } else if (batchedInfosetEvents != null &&
batchedInfosetEvents.lift(nextIndex).nonEmpty
+ && !batchedInfosetEvents(nextIndex).isEmpty) {
+ // if the next element exists and is nonEmpty
+ true
+ } else {
+ // there is no nextEvent or it was empty, but we still have no
EndDocument. So load the next
+ // batch from the contentHandler
+ returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+ batchedInfosetEvents = this.resume(unparseContentHandler,
returnedInfosetEvent)
+ // we reset the index once we receive a new batch, but we don't use the
first element from
+ // this point on
+ currentIndex = 0
true
}
}
override def next(): Unit = {
if (hasNext()) {
- copyEvent(source = Try(nextEvent), dest = currentEvent)
- nextEvent.clear()
- if (currentEvent.eventType.contains(EndDocument)) endDocumentReceived =
true
+ // clear element at current index as we're done with it, except in the
case we just loaded the
+ // new elements, then do nothing
+ batchedInfosetEvents(currentIndex).clear()
+
+ // increment current index to the next index
+ currentIndex += 1
+
+ // check if new current index is EndDocument
+ if (batchedInfosetEvents.lift(currentIndex).nonEmpty
+ && batchedInfosetEvents(currentIndex).eventType.contains(EndDocument))
{
+ endDocumentReceived = true
+ }
Review comment:
We called hasNext already, so that should mean currentIndex must exist,
so I think this lift call can be removed.
##########
File path:
daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
##########
@@ -121,38 +124,45 @@ class SAXInfosetInputter(
}
override def hasNext(): Boolean = {
- if (endDocumentReceived) false
- else if (!nextEvent.isEmpty) true
- else {
- val event = this.resume(unparseContentHandler, Try(currentEvent))
- copyEvent(source = event, dest = nextEvent)
+ val nextIndex = currentIndex + 1
+ if (endDocumentReceived) {
+ // if the current Element is EndDocument, then there is no valid next
+ false
+ } else if (batchedInfosetEvents != null &&
batchedInfosetEvents.lift(nextIndex).nonEmpty
Review comment:
We might want to avoid the lift function. This allocates an Option,
which we try to avoid where possible. Also, looking at the definition of
Array.lift in scala doc is says:
> This member is added by an implicit conversion from Array[T]
toIndexedSeq[T] performed by method copyArrayToImmutableIndexedSeq in
scala.LowPriorityImplicits2.
So just do do this lift, there's some copying going on. This copy is
hopefully efficient, but probably best to avoid. Especially since it seems to
be just testing if there is a nextIndex. I think this can be done instead with
something like ``nextIndex < batchedInfosetEvents.length``
##########
File path:
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -212,31 +224,39 @@ class DaffodilUnparseContentHandler(
}
private def sendToInputter(): Unit = {
- val infosetEventWithResponse = this.resume(inputter, Try(infosetEvent))
- infosetEvent.clear()
- // if event is wrapped in a Try failure, we will not have an
unparseResult, so we only set
- // unparseResults for events wrapped in Try Success, including those
events that have expected
- // errors
- if (infosetEventWithResponse.isSuccess &&
infosetEventWithResponse.get.unparseResult.isDefined) {
- unparseResult = infosetEventWithResponse.get.unparseResult.get
- }
- // the exception from events wrapped in Try failures and events wrapped in
Try Successes
- // (with an unparse error state i.e unparseResult.isError) are collected
and thrown to stop
- // the execution of the xmlReader
- if (infosetEventWithResponse.isFailure ||
infosetEventWithResponse.get.isError) {
- val causeError = if(infosetEventWithResponse.isFailure) {
- infosetEventWithResponse.failed.get
- } else {
- infosetEventWithResponse.get.causeError.get
+ val nextIndex = currentIndex + 1
+ if (nextIndex < SAX_UNPARSE_EVENT_BATCH_SIZE
+ && batchedInfosetEvents(currentIndex).eventType.get != EndDocument) {
+ // if we have room left on the batchedInfosetEvents array and the
current element != EndDocument
+ currentIndex += 1
+ // if the new currentElement has any existing content, clear it
+ if (!batchedInfosetEvents(currentIndex).isEmpty)
batchedInfosetEvents(currentIndex).clear()
+ } else {
+ // ready to send it off
+ val infosetEventWithResponse = this.resume(inputter,
batchedInfosetEvents).head
+ // we only ever return a one element array
+
+ // it is possible for unparseResult to be null, in the case of an
DaffodilUnhandledSAXException
+ if (infosetEventWithResponse.unparseResult.isDefined) {
+ unparseResult = infosetEventWithResponse.unparseResult.get
}
- causeError match {
- case unparseError: DaffodilUnparseErrorSAXException =>
- // although this is an expected error, we need to throw it so we can
stop the xmlReader
- // parse and this thread
- throw unparseError
- case unhandled: DaffodilUnhandledSAXException => throw unhandled
- case unknown => throw new DaffodilUnhandledSAXException("Unknown
exception: ", new Exception(unknown))
+ // any exception is collected and thrown to stop the execution of the
xmlReader
+ if (infosetEventWithResponse.isError) {
+ val causeError = infosetEventWithResponse.causeError.get
+ causeError match {
+ case unparseError: DaffodilUnparseErrorSAXException =>
+ // although this is an expected error, we need to throw it so we
can stop the xmlReader
+ // parse and this thread
+ throw unparseError
+ case unhandled: DaffodilUnhandledSAXException => throw unhandled
+ case unknown => throw new DaffodilUnhandledSAXException("Unknown
exception: ",
+ new Exception(unknown))
+ }
}
+ // clear the last element, then start to load elements starting at the
second element since
+ // the first element is used as a buffer by the SAXInfosetInputter when
new batches are sent
+ batchedInfosetEvents(currentIndex).clear()
Review comment:
I was saying other calls to clear in the ContentHandler maybe weren't
needed, but I think this one is actually needed. That's because the
InfosetInputter only clears the current element when it calls next(), but it
never calls next when the current element is the last one in the array. So the
InfosetInputter clears all elements except the last one. Might be worth adding
a comment to that affect.
##########
File path:
daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXUnparseAPI.scala
##########
@@ -46,6 +49,24 @@ class TestSAXUnparseAPI {
assertEquals(testData, bao.toString)
}
+ @Test def testUnparseContentHandler_unparse_saxUnparseEventBatchSize_0():
Unit = {
+ val dpT = testDataprocessor(testSchema,
Some(Map("saxUnparseEventBatchSize" -> "0")))
Review comment:
I've also created
[DAFFODIL-2434](https://issues.apache.org/jira/browse/DAFFODIL-2432) as a way
to more throughly ensure our tunables have valid values. That way we can
validate them when they are set rather than when the values are actually used.
##########
File path:
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -212,31 +224,39 @@ class DaffodilUnparseContentHandler(
}
private def sendToInputter(): Unit = {
Review comment:
Suggest renaming this to something like ``maybeSendToInputter`` since it
might not necesarily send to the inputter unless the batch is full?
##########
File path:
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -212,31 +224,39 @@ class DaffodilUnparseContentHandler(
}
private def sendToInputter(): Unit = {
- val infosetEventWithResponse = this.resume(inputter, Try(infosetEvent))
- infosetEvent.clear()
- // if event is wrapped in a Try failure, we will not have an
unparseResult, so we only set
- // unparseResults for events wrapped in Try Success, including those
events that have expected
- // errors
- if (infosetEventWithResponse.isSuccess &&
infosetEventWithResponse.get.unparseResult.isDefined) {
- unparseResult = infosetEventWithResponse.get.unparseResult.get
- }
- // the exception from events wrapped in Try failures and events wrapped in
Try Successes
- // (with an unparse error state i.e unparseResult.isError) are collected
and thrown to stop
- // the execution of the xmlReader
- if (infosetEventWithResponse.isFailure ||
infosetEventWithResponse.get.isError) {
- val causeError = if(infosetEventWithResponse.isFailure) {
- infosetEventWithResponse.failed.get
- } else {
- infosetEventWithResponse.get.causeError.get
+ val nextIndex = currentIndex + 1
+ if (nextIndex < SAX_UNPARSE_EVENT_BATCH_SIZE
+ && batchedInfosetEvents(currentIndex).eventType.get != EndDocument) {
+ // if we have room left on the batchedInfosetEvents array and the
current element != EndDocument
+ currentIndex += 1
+ // if the new currentElement has any existing content, clear it
+ if (!batchedInfosetEvents(currentIndex).isEmpty)
batchedInfosetEvents(currentIndex).clear()
Review comment:
I think this shouldn't be necessary since the the InfosetInputter is
already clearing everything? Perhaps this should just be
``Assert(batchedInfosetEvents(currentIndex).isEmpty)`` if we want to ensure
that the ContentHandler is doing the right hthing?
##########
File path:
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
##########
@@ -212,31 +224,39 @@ class DaffodilUnparseContentHandler(
}
private def sendToInputter(): Unit = {
- val infosetEventWithResponse = this.resume(inputter, Try(infosetEvent))
- infosetEvent.clear()
- // if event is wrapped in a Try failure, we will not have an
unparseResult, so we only set
- // unparseResults for events wrapped in Try Success, including those
events that have expected
- // errors
- if (infosetEventWithResponse.isSuccess &&
infosetEventWithResponse.get.unparseResult.isDefined) {
- unparseResult = infosetEventWithResponse.get.unparseResult.get
- }
- // the exception from events wrapped in Try failures and events wrapped in
Try Successes
- // (with an unparse error state i.e unparseResult.isError) are collected
and thrown to stop
- // the execution of the xmlReader
- if (infosetEventWithResponse.isFailure ||
infosetEventWithResponse.get.isError) {
- val causeError = if(infosetEventWithResponse.isFailure) {
- infosetEventWithResponse.failed.get
- } else {
- infosetEventWithResponse.get.causeError.get
+ val nextIndex = currentIndex + 1
+ if (nextIndex < SAX_UNPARSE_EVENT_BATCH_SIZE
+ && batchedInfosetEvents(currentIndex).eventType.get != EndDocument) {
+ // if we have room left on the batchedInfosetEvents array and the
current element != EndDocument
+ currentIndex += 1
+ // if the new currentElement has any existing content, clear it
+ if (!batchedInfosetEvents(currentIndex).isEmpty)
batchedInfosetEvents(currentIndex).clear()
+ } else {
+ // ready to send it off
+ val infosetEventWithResponse = this.resume(inputter,
batchedInfosetEvents).head
+ // we only ever return a one element array
+
Review comment:
I feel like something is missing somewhere after this.
At this point, the InfosetInputter has consumed all of the events in the the
array, it called hasNext, and we are back at the ContentHandler here to get
more events. That means the InfoseInputter thinks the current event is the last
one in the batch array. When the ContentHander eventually yeilds back to the
InfosetInputter, the InfosetInputter will expect that the event it was looking
at to now be at the first index in the array (that's why it resets currentIndex
back to zero). So I would expect this to copy that last event from the last
index to the first index. And in fact, I think the last current event is the
infosetEventWIthRepponse, right? So I think you can do something like:
```scala
if (infosetEventWithResponse.unparseResult.isEmpty) {
copyEvent(infosetEventWithResponse, batchedInfosetEvents(0))
}
```
Note that I think this technically doesn't matter because anytime we call
hasNext we immediately call next(), so the infoset inputter will never actually
access this zeroth event after hasNext is called. However, someday we might,
and so we want to ensure things work.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]