This is an automated email from the ASF dual-hosted git repository.
olabusayo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-daffodil.git
The following commit(s) were added to refs/heads/master by this push:
new c38a551 Add SAX Unparse Event Batching
c38a551 is described below
commit c38a551f353aed53479c60d38c4929743f82daf7
Author: olabusayoT <[email protected]>
AuthorDate: Fri Nov 6 17:04:55 2020 -0500
Add SAX Unparse Event Batching
- add saxUnparseEventBatchSize tunable with default 100
- update coroutine to not expect a generic type wrapped in a Try, but any
generic event; the implementation can then pass in whatever they wish
- Add tests for tunables and batching tests
DAFFODIL-2383
---
.../processor/TestSAXParseUnparseAPI.scala | 7 +-
.../daffodil/processor/TestSAXUnparseAPI.scala | 24 +++
.../org/apache/daffodil/util/Coroutines.scala | 26 ++--
.../resources/org/apache/daffodil/xsd/dafext.xsd | 12 ++
.../apache/daffodil/api/DFDLParserUnparser.scala | 17 ++-
.../daffodil/infoset/SAXInfosetInputter.scala | 125 ++++++++-------
.../processors/DaffodilUnparseContentHandler.scala | 170 ++++++++++++++-------
.../section00/general/testUnparserSAX.tdml | 157 +++++++++++++++++++
.../section00/general/TestUnparserSAX.scala | 39 +++++
9 files changed, 449 insertions(+), 128 deletions(-)
diff --git
a/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXParseUnparseAPI.scala
b/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXParseUnparseAPI.scala
index 4126cf2..388fc5f 100644
---
a/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXParseUnparseAPI.scala
+++
b/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXParseUnparseAPI.scala
@@ -53,10 +53,11 @@ object TestSAXParseUnparseAPI {
val testInfosetString: String = testInfoset.toString()
val testData = "910"
- lazy val dp: DataProcessor = testDataprocessor(testSchema)
+ lazy val dp: DataProcessor = testDataProcessor(testSchema)
+
+ def testDataProcessor(testSchema: scala.xml.Elem, tunablesArg: Map[String,
String] = Map.empty): DataProcessor = {
+ val schemaCompiler = Compiler().withTunables(tunablesArg)
- def testDataprocessor(testSchema: scala.xml.Elem): DataProcessor = {
- val schemaCompiler = Compiler()
val pf = schemaCompiler.compileNode(testSchema)
if (pf.isError) {
val msgs = pf.getDiagnostics.map { _.getMessage() }.mkString("\n")
diff --git
a/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXUnparseAPI.scala
b/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXUnparseAPI.scala
index 198fd51..de6f263 100644
---
a/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXUnparseAPI.scala
+++
b/daffodil-core/src/test/scala/org/apache/daffodil/processor/TestSAXUnparseAPI.scala
@@ -20,7 +20,10 @@ package org.apache.daffodil.processor
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
+import scala.xml.SAXException
+
import javax.xml.parsers.SAXParserFactory
+import org.apache.daffodil.Implicits.intercept
import org.apache.daffodil.xml.XMLUtils
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
@@ -46,6 +49,27 @@ class TestSAXUnparseAPI {
assertEquals(testData, bao.toString)
}
+ /**
+ * Test the case when a user supplies 0 as the batch size. Minimum batch size
must be 1
+ */
+ @Test def testUnparseContentHandler_unparse_saxUnparseEventBatchSize_0():
Unit = {
+ val dpT = testDataProcessor(testSchema, Map("saxUnparseEventBatchSize" ->
"0"))
+ val xmlReader: XMLReader =
SAXParserFactory.newInstance.newSAXParser.getXMLReader
+ val bao = new ByteArrayOutputStream()
+ val wbc = java.nio.channels.Channels.newChannel(bao)
+ val unparseContentHandler = dpT.newContentHandlerInstance(wbc)
+ xmlReader.setContentHandler(unparseContentHandler)
+ xmlReader.setFeature(XMLUtils.SAX_NAMESPACES_FEATURE, true)
+ xmlReader.setFeature(XMLUtils.SAX_NAMESPACE_PREFIXES_FEATURE, true)
+ val bai = new ByteArrayInputStream(testInfosetString.getBytes)
+ val e = intercept[SAXException] {
+ xmlReader.parse(new InputSource(bai))
+ }
+ val eMsg = e.getMessage
+ assertTrue(eMsg.contains("invalid saxUnparseEventBatchSize"))
+ assertTrue(eMsg.contains("minimum value is 1"))
+ }
+
@Test def testUnparseContentHandler_unparse_namespace_feature(): Unit = {
val xmlReader: XMLReader =
SAXParserFactory.newInstance.newSAXParser.getXMLReader
val bao = new ByteArrayOutputStream()
diff --git
a/daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala
b/daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala
index b652915..4b06b8f 100644
--- a/daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala
+++ b/daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala
@@ -34,11 +34,16 @@
*
* Definition of Coroutine - separate stacks, but NO CONCURRENCY. Only one
* of a set of coroutines is running at any given time.
+ *
+ * The queueCapacity being set to 1 ensures the NO CONCURRENCY property of
this trait. Reducing
+ * the context-switching overhead is implementation specific, and can be done
by passing in a
+ * larger data structure containing multiple events for the Coroutine generic
type, rather than
+ * enlarging the queue size
*/
trait Coroutine[T] {
private val queueCapacity: Int = 1
- private val inboundQueue = new ArrayBlockingQueue[Try[T]](queueCapacity)
+ private val inboundQueue = new ArrayBlockingQueue[T](queueCapacity)
private val self = this
@@ -69,25 +74,25 @@
* and then terminates. The coroutine calling this must return from the
run()
* method immediately after calling this.
*/
- final def resumeFinal(coroutine: Coroutine[T], in: Try[T]): Unit = {
+ final def resumeFinal(coroutine: Coroutine[T], in: T): Unit = {
coroutine.init()
- coroutine.inboundQueue.put(in) // allows other to run final
+ coroutine.inboundQueue.put(in) // allows other to run final
}
/**
- * Call when one co-routine wants to resume another, tranmitting a
+ * Call when one co-routine wants to resume another, transmitting an
* argument value to it.
*
* The current co-routine will be suspended until it is resumed later.
*/
- final def resume(coroutine: Coroutine[T], in: Try[T]): Try[T] = {
+ final def resume(coroutine: Coroutine[T], in: T): T = {
resumeFinal(coroutine, in)
val res = waitForResume() // blocks until it is resumed
res
}
- final def waitForResume(): Try[T] = {
- inboundQueue.take
+ final def waitForResume(): T = {
+ inboundQueue.take()
}
protected def run(): Unit
@@ -113,7 +118,7 @@
*
https://scalaenthusiast.wordpress.com/2013/06/12/transform-a-callback-function-to-an-iteratorlist-in-scala/
*/
- final class InvertControl[S](body: => Unit) extends Iterator[S] with
Coroutine[S] {
+ final class InvertControl[S](body: => Unit) extends Iterator[S] with
Coroutine[Try[S]] {
private object EndMarker extends Throwable
private val EndOfData = Failure(EndMarker)
@@ -127,7 +132,7 @@
* After the last value is produced, the consumer is resumed with EndOfData
* and the producer terminates.
*/
- class Producer(val consumer: Coroutine[S]) extends Coroutine[S] {
+ class Producer(val consumer: Coroutine[Try[S]]) extends Coroutine[Try[S]] {
override final def run(): Unit = {
try {
waitForResume()
@@ -171,8 +176,7 @@
private lazy val iterator = gen.toIterator
override def hasNext: Boolean = {
- if (failed) false
- else iterator.hasNext
+ !failed && iterator.hasNext
}
override def next(): S = {
if (failed) throw new IllegalStateException()
diff --git
a/daffodil-propgen/src/main/resources/org/apache/daffodil/xsd/dafext.xsd
b/daffodil-propgen/src/main/resources/org/apache/daffodil/xsd/dafext.xsd
index f7540c7..6a7548c 100644
--- a/daffodil-propgen/src/main/resources/org/apache/daffodil/xsd/dafext.xsd
+++ b/daffodil-propgen/src/main/resources/org/apache/daffodil/xsd/dafext.xsd
@@ -390,6 +390,18 @@
</xs:documentation>
</xs:annotation>
</xs:element>
+ <xs:element name="saxUnparseEventBatchSize" type="xs:int"
default="100" minOccurs="0">
+ <xs:annotation>
+ <xs:documentation>
+ Daffodil's SAX Unparse API allows events to be batched in memory
to minimize the
+ frequency of context switching between the SAXInfosetInputter
thread that processes
+ the events, and the DaffodilUnparseContentHandler thread that
generates the events.
+ Setting this value to a low number will increase the frequency
of context switching,
+ but will reduce the memory footprint. Setting it to a high
number will decrease the
+ frequency of context switching, but increase the memory
footprint.
+ </xs:documentation>
+ </xs:annotation>
+ </xs:element>
<xs:element name="suppressSchemaDefinitionWarnings"
type="daf:TunableSuppressSchemaDefinitionWarnings"
default="emptyElementParsePolicyError" minOccurs="0">
<xs:annotation>
<xs:documentation>
diff --git
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/api/DFDLParserUnparser.scala
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/api/DFDLParserUnparser.scala
index a2b03a3..b0f27ca 100644
---
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/api/DFDLParserUnparser.scala
+++
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/api/DFDLParserUnparser.scala
@@ -269,14 +269,27 @@ object DFDL {
}
}
- trait ProducerCoroutine extends Coroutine[SAXInfosetEvent] {
+ object SAXInfosetEvent {
+ def copyEvent(source: DFDL.SAXInfosetEvent, dest: DFDL.SAXInfosetEvent):
Unit = {
+ if (source == null) dest.clear()
+ else {
+ dest.eventType = source.eventType
+ dest.namespaceURI = source.namespaceURI
+ dest.localName = source.localName
+ dest.nilValue = source.nilValue
+ dest.simpleText = source.simpleText
+ }
+ }
+ }
+
+ trait ProducerCoroutine extends Coroutine[Array[SAXInfosetEvent]] {
override def isMain = true
override protected def run(): Unit = {
throw new Error("Main thread co-routine run method should not be
called.")
}
}
- trait ConsumerCoroutine extends Coroutine[SAXInfosetEvent]
+ trait ConsumerCoroutine extends Coroutine[Array[SAXInfosetEvent]]
trait ParseResult extends Result with WithDiagnostics {
def resultState: State
diff --git
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
index 433e474..da3f45d 100644
---
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
+++
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala
@@ -20,11 +20,10 @@ package org.apache.daffodil.infoset
import java.net.URI
import java.net.URISyntaxException
-import scala.util.Try
-
import org.apache.daffodil.api.DFDL
import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
+import org.apache.daffodil.api.DFDL.SAXInfosetEvent
import org.apache.daffodil.dpath.NodeInfo
import org.apache.daffodil.exceptions.Assert
import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
@@ -35,25 +34,30 @@ import org.apache.daffodil.xml.XMLUtils
/**
* The SAXInfosetInputter consumes SAXInfosetEvent objects from the
DaffodilUnparseContentHandler
- * class and converts them to events that the DataProcessor unparse can use.
This class contains two
- * SAXInfosetEvent objects, the current event the unparse method is processing
and the next event
- * to be processed later.
+ * class and converts them to events that the DataProcessor unparse can use.
This class contains an
+ * array of batched SAXInfosetEvent objects that it receives from the
contentHandler and the index
+ * of the current element being processed.
*
- * This class together with the DaffodilUnparseContentHandler use coroutines
to ensure that only one event,
- * at a time, is passed between the two classes. The following is the general
process:
+ * This class, together with the SAXInfosetInputter, uses coroutines to ensure
that a batch of events
+ * (based on the tunable saxUnparseEventBatchSize) can be passed from the
former to the latter.
+ * The following is the general process:
*
- * - the run method is called, with a StartDocument event already loaded on
the inputter's queue.
- * This is collected and stored in the currentEvent member
+ * - the run method is called, with the first batch of events, starting with
the StartDocument event,
+ * already loaded on the inputter's queue.
+ * This is collected and stored in the batchedInfosetEvents member, and the
currentIndex is set to 0
* - The dp.unparse method is called, and it calls hasNext to make sure an
event exists to be
- * processed and then queries the currentEvent. The hasNext call also queues
the nextEvent by
- * transferring control to the contentHandler so it can load the next event.
- * - After it is done with the currentEvent, it calls inputter.next to get the
next event, and that
- * copies the queued nextEvent into the currentEvent
- * - This process continues until the currentEvent contains an EndDocument
event, at which point, the
- * nextEvent is cleared, endDocumentReceived is set to true and hasNext will
return false
- * - This ends the unparse process, and the unparseResult and/or any Errors
are set on the event. We
- * call resumeFinal passing along that element, terminating this thread and
resuming the
- * contentHandler for the last time.
+ * processed and then queries the event at currentIndex. The hasNext call also
checks that there is
+ * a next event to be processed (currentIndex+1), and if not, queues the next
batch of events by
+ * transferring control to the contentHandler so it can load them.
+ * - After it is done with the current event, it calls inputter.next to get
the next event, and that
+ * increments the currentIndex and cleans out the event at the previous index
+ * - This process continues until the event at currentIndex either contains an
EndDocument event or
+ * the currentIndex is the last in the batch. If it is the former, the
endDocumentReceived flag is
+ * set to true and hasNext will return false. If it is the latter, the next
batch of events will be
+ * queued by transferring control to the contentHandler so it can load them.
+ * - This ends the unparse process, and the unparseResult and/or any Errors
are set on a single element
+ * array containing response events. We call resumeFinal passing along that
array, terminating this
+ * thread and resuming the contentHandler for the last time.
*
* @param unparseContentHandler producer coroutine that sends the
SAXInfosetEvent to this class
* @param dp dataprocessor that we use to kickstart the unparse process and
that consumes the
@@ -76,18 +80,19 @@ class SAXInfosetInputter(
private var resolveRelativeInfosetBlobURIs: Boolean = false
private var endDocumentReceived = false
- private lazy val currentEvent: DFDL.SAXInfosetEvent = new
DFDL.SAXInfosetEvent
- private lazy val nextEvent: DFDL.SAXInfosetEvent = new DFDL.SAXInfosetEvent
+ private var currentIndex: Int = 0
+ private var batchedInfosetEvents: Array[SAXInfosetEvent] = _
+ private lazy val returnedInfosetEvent: Array[SAXInfosetEvent] = new
Array[SAXInfosetEvent](1)
- override def getEventType(): InfosetInputterEventType =
currentEvent.eventType.orNull
+ override def getEventType(): InfosetInputterEventType =
batchedInfosetEvents(currentIndex).eventType.orNull
- override def getLocalName(): String = currentEvent.localName.orNull
+ override def getLocalName(): String =
batchedInfosetEvents(currentIndex).localName.orNull
- override def getNamespaceURI(): String = currentEvent.namespaceURI.orNull
+ override def getNamespaceURI(): String =
batchedInfosetEvents(currentIndex).namespaceURI.orNull
override def getSimpleText(primType: NodeInfo.Kind): String = {
- val res = if (currentEvent.simpleText.isDefined) {
- currentEvent.simpleText.get
+ val res = if (batchedInfosetEvents(currentIndex).simpleText.isDefined) {
+ batchedInfosetEvents(currentIndex).simpleText.get
} else {
throw new NonTextFoundInSimpleContentException(getLocalName())
}
@@ -104,8 +109,8 @@ class SAXInfosetInputter(
}
override def isNilled(): MaybeBoolean = {
- val _isNilled = if (currentEvent.nilValue.isDefined) {
- val nilValue = currentEvent.nilValue.get
+ val _isNilled = if (batchedInfosetEvents(currentIndex).nilValue.isDefined)
{
+ val nilValue = batchedInfosetEvents(currentIndex).nilValue.get
if (nilValue == "true" || nilValue == "1") {
MaybeBoolean(true)
} else if (nilValue == "false" || nilValue == "0") {
@@ -121,38 +126,45 @@ class SAXInfosetInputter(
}
override def hasNext(): Boolean = {
- if (endDocumentReceived) false
- else if (!nextEvent.isEmpty) true
- else {
- val event = this.resume(unparseContentHandler, Try(currentEvent))
- copyEvent(source = event, dest = nextEvent)
+ val nextIndex = currentIndex + 1
+ if (endDocumentReceived) {
+ // if the current Element is EndDocument, then there is no valid next
+ false
+ } else if (batchedInfosetEvents != null && nextIndex <
batchedInfosetEvents.length) {
+ // if we have not yet reached the end of the array and endDocument has
not yet been received
+ true
+ } else {
+ // there is no nextEvent or it was empty, but we still have no
EndDocument. So load the next
+ // batch from the contentHandler
+ returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+ batchedInfosetEvents = this.resume(unparseContentHandler,
returnedInfosetEvent)
+ // we reset the index to 0 to guarantee that the last element we were
looking at when hasNext
+ // was called is still the event we'll be looking at, when we leave this
function. This is
+ // guaranteed because the DaffodilUnparseContentHandler moves the last
element into the first
+ // index when it is resumed.
+ currentIndex = 0
true
}
}
override def next(): Unit = {
if (hasNext()) {
- copyEvent(source = Try(nextEvent), dest = currentEvent)
- nextEvent.clear()
- if (currentEvent.eventType.contains(EndDocument)) endDocumentReceived =
true
+ // clear element at current index as we're done with it, except in the
case we just loaded the
+ // new elements, then do nothing
+ batchedInfosetEvents(currentIndex).clear()
+
+ // increment current index to the next index
+ currentIndex += 1
+
+ // check if new current index is EndDocument
+ if (batchedInfosetEvents(currentIndex).eventType.contains(EndDocument)) {
+ endDocumentReceived = true
+ }
} else {
- // we should never call next() if hasNext() is false
Assert.abort()
}
}
- def copyEvent(source: Try[DFDL.SAXInfosetEvent], dest:
DFDL.SAXInfosetEvent): Unit= {
- if (source.isFailure) dest.clear()
- else {
- val src = source.get
- dest.eventType = src.eventType
- dest.namespaceURI = src.namespaceURI
- dest.localName = src.localName
- dest.nilValue = src.nilValue
- dest.simpleText = src.simpleText
- }
- }
-
def enableResolutionOfRelativeInfosetBlobURIs(): Unit =
resolveRelativeInfosetBlobURIs = true
/**
@@ -183,22 +195,23 @@ class SAXInfosetInputter(
override protected def run(): Unit = {
try {
- // startDocument kicks off this entire process, so it should be on the
queue so the
- // waitForResume call can grab it. That is set to our current event, so
when hasNext is called
- // the nextEvent after the StartDocument can be queued
- copyEvent(source = this.waitForResume(), dest = currentEvent)
+ // startDocument kicks off this entire process, so the first batch of
events, of which
+ // startDocument is first, should be on the queue so the waitForResume
call can grab it.
+ // This populates the inputter.batchedInfosetEvents var for use by the
rest of the Inputter
+ batchedInfosetEvents = this.waitForResume()
val unparseResult = dp.unparse(this, output)
- currentEvent.unparseResult = One(unparseResult)
+ batchedInfosetEvents(currentIndex).unparseResult = One(unparseResult)
if (unparseResult.isError) {
// unparseError is contained within unparseResult
- currentEvent.causeError = One(new
DaffodilUnparseErrorSAXException(unparseResult))
+ batchedInfosetEvents(currentIndex).causeError = One(new
DaffodilUnparseErrorSAXException(unparseResult))
}
} catch {
case e: Exception => {
- currentEvent.causeError = One(new
DaffodilUnhandledSAXException(e.getMessage, e))
+ batchedInfosetEvents(currentIndex).causeError = One(new
DaffodilUnhandledSAXException(e.getMessage, e))
}
} finally {
- this.resumeFinal(unparseContentHandler, Try(currentEvent))
+ returnedInfosetEvent(0) = batchedInfosetEvents(currentIndex)
+ this.resumeFinal(unparseContentHandler, returnedInfosetEvent)
}
}
}
diff --git
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
index 88b3554..b0257d3 100644
---
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
+++
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala
@@ -17,13 +17,14 @@
package org.apache.daffodil.processors
-import scala.util.Try
import scala.xml.NamespaceBinding
import javax.xml.XMLConstants
import org.apache.daffodil.api.DFDL
import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
+import org.apache.daffodil.api.DFDL.SAXInfosetEvent
+import org.apache.daffodil.exceptions.Assert
import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
@@ -39,26 +40,29 @@ import org.xml.sax.Locator
/**
* DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the
SAXInfosetInputter to
- * consume and convert to an event that the Dataprocessor unparse can use. The
SAXInfosetEvent object
+ * consume and convert to events that the Dataprocessor unparse can use. The
SAXInfosetEvent object
* is built from information that is passed to the ContentHandler from an
XMLReader parser. In
* order to receive the uri and prefix information from the XMLReader, the
XMLReader must have
* support for XML Namespaces
*
- * This class, together with the SAXInfosetInputter, uses coroutines to ensure
that only one event,
- * at a time, is passed between the two classes. The following is the general
process:
+ * This class, together with the SAXInfosetInputter, uses coroutines to ensure
that a batch of events
+ * (based on the tunable saxUnparseEventBatchSize) can be passed from the
former to the latter.
+ * The following is the general process:
*
* - an external call is made to parse an XML Document
- * - this class receives a StartDocument call, which is the first
SAXInfosetEvent that is sent to
- * the SAXInfosetInputter. That event is put on the inputter's queue, this
thread is paused, and
- * that inputter's thread is run
- * - when the SAXInfosetInputter is done processing an event and is ready for
a new event, it
- * sends the completed event via the coroutine system, and loads it on the
contentHandler's
- * queue, which restarts this thread and pauses that one. In the expected
case, the events will
- * contain no new information, until the unparse is completed, otherwise it
will contain error
- * information
- * - this process continues until the EndDocument method is called. Once that
SAXInfosetEvent is
- * sent to the inputter, it signals the end of events coming from the
contentHandler. This
- * ends the unparseProcess and returns the event with the unparseResult and/or
any error
+ * - this class receives a StartDocument call, which is the first
SAXInfosetEvent that should be
+ * sent to the SAXInfosetInputter. That event is put onto an array of
SAXInfosetEvents of size the
+ * saxUnparseEventBatchSize tunable. Once the array is full, it is put on the
inputter's queue,
+ * this thread is paused, and that inputter's thread is run
+ * - when the SAXInfosetInputter is done processing that batch and is ready
for a new batch, it
+ * sends a 1 element array with the last completed event via the coroutine
system, which loads it on
+ * the contentHandler's queue, which restarts this thread and pauses that one.
In the expected case,
+ the single element array will contain no new information until the unparse is
complete. In the case of
+ * an unexpected error though, it will contain error information
+ * - this process continues until the EndDocument SAXInfosetEvent is loaded
into the batch.
+ * Once that SAXInfosetEvent is processed by the SAXInfosetInputter, it
signals the end of batched
+ * events coming from the contentHandler. This ends the unparseProcess and
returns the event with
+ * the unparseResult and/or any error
* information
*
* @param dp dataprocessor object that will be used to call the parse
@@ -70,10 +74,48 @@ class DaffodilUnparseContentHandler(
extends DFDL.DaffodilUnparseContentHandler {
private lazy val inputter = new SAXInfosetInputter(this, dp, output)
private var unparseResult: DFDL.UnparseResult = _
- private lazy val infosetEvent: DFDL.SAXInfosetEvent = new
DFDL.SAXInfosetEvent
private lazy val characterData = new StringBuilder
private var prefixMapping: NamespaceBinding = _
private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
+
+ private lazy val tunablesBatchSize =
dp.getTunables().saxUnparseEventBatchSize
+
+ /**
+ * we always have an extra buffer in the array that we use for the
inputter.hasNext call. For each
+ * element, we need to know if it has a viable next; if it doesn't, it will
trigger the context
+ * switch to DaffodilUnparseContentHandler. So for example, if the user
provides 1 as the
+ * batchSize, under the hood we'll batch [event1, event2].
+ *
+ * - DataProcessor.unparse will call hasNext and getEventType for the
initialization call
+ * - hasNext will check if nextIndex (i.e currentIndex + 1) is non-empty.
Since currentIndex is 0,
+ * it will return true since event2 exists.
+ * - getEventType (which signifies our processing step) is called for the
event at currentIndex
+ * - After the initialization step, subsequent calls will be a loop of
next(), ...some processing
+ * of the current event ..., and hasNext()
+ * - For our scenario, next() will clear out the contents at currentIndex,
increment the currentIndex,
+ * and our event2 will be processed, then hasNext will check if there is a
viable index 2, as
+ * there is not, it will perform the context switch so
DaffodilUnparseContentHandler can batch
+ * more events
+ * - DaffodilUnparseContentHandler copies the last event into the first so
the currentEvent stays
+ * the same for the inputter until it decides to change it so we end up with
[event2, event3]
+ * - When we context switch back to inputter.hasNext, it resets the
currentIndex to 0, and our loop
+ * begins again with a call to next
+ *
+ * Without us having the extra buffer, things would happen like this:
+ * user provides 1 as the batchSize, under the hood we'll have [event1]
batched.
+ *
+ * DataProcessor.unparse will call hasNext and getEventType for the
initialization call, and that
+ * hasNext will check if nextIndex (i.e. currentIndex + 1) is non-empty. As
currentIndex is 0, and
+ * it is the maximum index, there is no index 1. It will context switch to
get a new batched event,
+ * which would overwrite event1 before we get to process it.
+ */
+ private lazy val actualBatchSize = tunablesBatchSize + 1
+ private lazy val batchedInfosetEvents: Array[SAXInfosetEvent] = {
+ Assert.invariant(tunablesBatchSize > 0, "invalid saxUnparseEventBatchSize;
minimum value is 1")
+ Array.fill[SAXInfosetEvent](actualBatchSize)(new SAXInfosetEvent)
+ }
+ private var currentIndex: Int = 0
+
/**
* This is a flag that is set to true when startPrefixMapping is called.
When true, we make
* the assumption that we don't need to use the Attributes parameter from
startElement to get the
@@ -94,13 +136,13 @@ class DaffodilUnparseContentHandler(
}
override def startDocument(): Unit = {
- infosetEvent.eventType = One(StartDocument)
- sendToInputter()
+ batchedInfosetEvents(currentIndex).eventType = One(StartDocument)
+ maybeSendToInputter()
}
override def endDocument(): Unit = {
- infosetEvent.eventType = One(EndDocument)
- sendToInputter()
+ batchedInfosetEvents(currentIndex).eventType = One(EndDocument)
+ maybeSendToInputter()
}
override def startPrefixMapping(prefix: String, uri: String): Unit = {
@@ -164,14 +206,14 @@ class DaffodilUnparseContentHandler(
mapPrefixMappingFromAttributesImpl(atts)
}
- if (!infosetEvent.isEmpty && infosetEvent.localName.isDefined) {
+ if (!batchedInfosetEvents(currentIndex).isEmpty &&
batchedInfosetEvents(currentIndex).localName.isDefined) {
// we started another element while we were in the process of building a
startElement
// this means the first element was complex and we are ready for the
inputter queue
- sendToInputter()
+ maybeSendToInputter()
}
// use Attributes to determine xsi:nil value
val nilIn = atts.getIndex(XMLConstants.W3C_XML_SCHEMA_INSTANCE_NS_URI,
"nil")
- infosetEvent.nilValue = if (nilIn >= 0) {
+ batchedInfosetEvents(currentIndex).nilValue = if (nilIn >= 0) {
val nilValue = atts.getValue(nilIn)
One(nilValue)
} else {
@@ -181,62 +223,78 @@ class DaffodilUnparseContentHandler(
// set localName and namespaceURI
setLocalNameAndNamespaceUri(uri, localName, qName)
- infosetEvent.eventType = One(StartElement)
+ batchedInfosetEvents(currentIndex).eventType = One(StartElement)
}
override def endElement(uri: String, localName: String, qName: String): Unit
= {
// if infosetEvent is a startElement, send that first
- if (infosetEvent.eventType.contains(StartElement)) {
+ if (batchedInfosetEvents(currentIndex).eventType.contains(StartElement)) {
// any characterData that exists at this point is valid data as padding
data has been
// taken care of in startElement
val maybeNewStr = One(characterData.toString)
- infosetEvent.simpleText = maybeNewStr
+ batchedInfosetEvents(currentIndex).simpleText = maybeNewStr
characterData.setLength(0)
- sendToInputter()
+ maybeSendToInputter()
}
+ batchedInfosetEvents(currentIndex).eventType = One(EndElement)
+
// set localName and namespaceURI
setLocalNameAndNamespaceUri(uri, localName, qName)
- infosetEvent.eventType = One(EndElement)
-
if (!contentHandlerPrefixMappingUsed) {
// always pops
prefixMapping = prefixMappingTrackingStack.pop
}
- sendToInputter()
+ maybeSendToInputter()
}
override def characters(ch: Array[Char], start: Int, length: Int): Unit = {
characterData.appendAll(ch, start, length)
}
- private def sendToInputter(): Unit = {
- val infosetEventWithResponse = this.resume(inputter, Try(infosetEvent))
- infosetEvent.clear()
- // if event is wrapped in a Try failure, we will not have an
unparseResult, so we only set
- // unparseResults for events wrapped in Try Success, including those
events that have expected
- // errors
- if (infosetEventWithResponse.isSuccess &&
infosetEventWithResponse.get.unparseResult.isDefined) {
- unparseResult = infosetEventWithResponse.get.unparseResult.get
- }
- // the exception from events wrapped in Try failures and events wrapped in
Try Successes
- // (with an unparse error state i.e unparseResult.isError) are collected
and thrown to stop
- // the execution of the xmlReader
- if (infosetEventWithResponse.isFailure ||
infosetEventWithResponse.get.isError) {
- val causeError = if(infosetEventWithResponse.isFailure) {
- infosetEventWithResponse.failed.get
- } else {
- infosetEventWithResponse.get.causeError.get
+ /**
+ * we only context switch to the InfosetInputter if batchedInfosetEvents is
full or we hit an
+ * EndDocument event
+ */
+ private def maybeSendToInputter(): Unit = {
+ val nextIndex = currentIndex + 1
+ if (nextIndex < actualBatchSize &&
+ !batchedInfosetEvents(currentIndex).eventType.contains(EndDocument)) {
+ // if we have room left on the batchedInfosetEvents array and the
current element != EndDocument
+ currentIndex += 1
+ // at this point where we're loading the contents of the array, it
should have been cleared
+ // mostly by the InfosetInputter, and the last element by us.
+ Assert.invariant(batchedInfosetEvents(currentIndex).isEmpty)
+ } else {
+ // ready to send it off
+ val infosetEventWithResponse = this.resume(inputter,
batchedInfosetEvents).head
+ // we only ever return a one element array
+
+ // it is possible for unparseResult to be null, in the case of a
DaffodilUnhandledSAXException
+ if (infosetEventWithResponse.unparseResult.isDefined) {
+ unparseResult = infosetEventWithResponse.unparseResult.get
}
- causeError match {
- case unparseError: DaffodilUnparseErrorSAXException =>
- // although this is an expected error, we need to throw it so we can
stop the xmlReader
- // parse and this thread
- throw unparseError
- case unhandled: DaffodilUnhandledSAXException => throw unhandled
- case unknown => throw new DaffodilUnhandledSAXException("Unknown
exception: ", new Exception(unknown))
+ // any exception is collected and thrown to stop the execution of the
xmlReader
+ if (infosetEventWithResponse.isError) {
+ val causeError = infosetEventWithResponse.causeError.get
+ causeError match {
+ case unparseError: DaffodilUnparseErrorSAXException =>
+ // although this is an expected error, we need to throw it so we
can stop the xmlReader
+ // parse and this thread
+ throw unparseError
+ case unhandled: DaffodilUnhandledSAXException => throw unhandled
+ case unknown => throw new DaffodilUnhandledSAXException("Unknown
exception: ",
+ new Exception(unknown))
+ }
}
+ // copy the last element into the first for use by inputter because that
last element was
+ // its current element when we did the context switch. When done clear
the last element,
+ since the InfosetInputter clears all elements except the last one,
then set the index to
+ // 1 so we can start to load elements starting at the second element
+ SAXInfosetEvent.copyEvent(batchedInfosetEvents(currentIndex),
batchedInfosetEvents(0))
+ batchedInfosetEvents(currentIndex).clear()
+ currentIndex = 1
}
}
@@ -276,8 +334,8 @@ class DaffodilUnparseContentHandler(
Nope
}
- infosetEvent.localName = maybelocalName
- infosetEvent.namespaceURI = maybeNamespaceURI
+ batchedInfosetEvents(currentIndex).localName = maybelocalName
+ batchedInfosetEvents(currentIndex).namespaceURI = maybeNamespaceURI
}
override def ignorableWhitespace(ch: Array[Char], start: Int, length: Int):
Unit = {
diff --git
a/daffodil-test/src/test/resources/org/apache/daffodil/section00/general/testUnparserSAX.tdml
b/daffodil-test/src/test/resources/org/apache/daffodil/section00/general/testUnparserSAX.tdml
new file mode 100644
index 0000000..05b481c
--- /dev/null
+++
b/daffodil-test/src/test/resources/org/apache/daffodil/section00/general/testUnparserSAX.tdml
@@ -0,0 +1,157 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<tdml:testSuite xmlns:tdml="http://www.ibm.com/xmlns/dfdl/testData"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dfdl="http://www.ogf.org/dfdl/dfdl-1.0/"
+ xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:ex="http://example.com"
+ xmlns:daf="urn:ogf:dfdl:2013:imp:daffodil.apache.org:2018:ext"
+ suiteName="saxUnparserTests">
+
+ <tdml:defineSchema name="saxUnparseSchema.embedded.dfdl.xsd">
+ <xs:include
schemaLocation="org/apache/daffodil/xsd/DFDLGeneralFormat.dfdl.xsd" />
+ <dfdl:format ref="ex:GeneralFormat" />
+
+ <xs:element name="record" type="ex:itemType"/>
+ <xs:complexType name="itemType">
+ <xs:sequence>
+ <xs:element name="item" type="xs:string" dfdl:lengthKind="explicit"
dfdl:length="1" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ </tdml:defineSchema>
+
+ <tdml:defineConfig name="cfg_saxUnparseEventBatchSize_1">
+ <daf:tunables xmlns="http://www.w3.org/2001/XMLSchema"
+ xmlns:xs="http://www.w3.org/2001/XMLSchema">
+ <daf:saxUnparseEventBatchSize>1</daf:saxUnparseEventBatchSize>
+ </daf:tunables>
+ </tdml:defineConfig>
+
+ <tdml:defineConfig name="cfg_saxUnparseEventBatchSize_5">
+ <daf:tunables xmlns="http://www.w3.org/2001/XMLSchema"
+ xmlns:xs="http://www.w3.org/2001/XMLSchema">
+ <daf:saxUnparseEventBatchSize>5</daf:saxUnparseEventBatchSize>
+ </daf:tunables>
+ </tdml:defineConfig>
+
+ <tdml:defineConfig name="cfg_saxUnparseEventBatchSize_1000">
+ <daf:tunables xmlns="http://www.w3.org/2001/XMLSchema"
+ xmlns:xs="http://www.w3.org/2001/XMLSchema">
+ <daf:saxUnparseEventBatchSize>1000</daf:saxUnparseEventBatchSize>
+ </daf:tunables>
+ </tdml:defineConfig>
+
+ <tdml:unparserTestCase name="test_saxUnparseBatchSize_1" root="record"
+ model="saxUnparseSchema.embedded.dfdl.xsd"
+ roundTrip="true"
+ config="cfg_saxUnparseEventBatchSize_1">
+
+ <tdml:infoset>
+ <tdml:dfdlInfoset>
+ <record xmlns="http://example.com">
+ <item>H</item>
+ <item>e</item>
+ <item>l</item>
+ <item>l</item>
+ <item>o</item>
+ <item>!</item>
+ <item>-</item>
+ <item>W</item>
+ <item>o</item>
+ <item>r</item>
+ <item>l</item>
+ <item>d</item>
+ <item>.</item>
+ <item>1</item>
+ <item>2</item>
+ <item>3</item>
+ </record>
+ </tdml:dfdlInfoset>
+ </tdml:infoset>
+
+
+ <tdml:document>Hello!-World.123</tdml:document>
+
+ </tdml:unparserTestCase>
+
+ <tdml:unparserTestCase name="test_saxUnparseBatchSize_5" root="record"
+ model="saxUnparseSchema.embedded.dfdl.xsd"
+ roundTrip="true"
+ config="cfg_saxUnparseEventBatchSize_5">
+
+ <tdml:infoset>
+ <tdml:dfdlInfoset>
+ <record xmlns="http://example.com">
+ <item>H</item>
+ <item>e</item>
+ <item>l</item>
+ <item>l</item>
+ <item>o</item>
+ <item>!</item>
+ <item>-</item>
+ <item>W</item>
+ <item>o</item>
+ <item>r</item>
+ <item>l</item>
+ <item>d</item>
+ <item>.</item>
+ <item>1</item>
+ <item>2</item>
+ <item>3</item>
+ </record>
+ </tdml:dfdlInfoset>
+ </tdml:infoset>
+
+
+ <tdml:document>Hello!-World.123</tdml:document>
+
+ </tdml:unparserTestCase>
+
+ <tdml:unparserTestCase name="test_saxUnparseBatchSize_1000" root="record"
+ model="saxUnparseSchema.embedded.dfdl.xsd"
+ roundTrip="true"
+ config="cfg_saxUnparseEventBatchSize_1000">
+
+ <tdml:infoset>
+ <tdml:dfdlInfoset>
+ <record xmlns="http://example.com">
+ <item>H</item>
+ <item>e</item>
+ <item>l</item>
+ <item>l</item>
+ <item>o</item>
+ <item>!</item>
+ <item>-</item>
+ <item>W</item>
+ <item>o</item>
+ <item>r</item>
+ <item>l</item>
+ <item>d</item>
+ <item>.</item>
+ <item>1</item>
+ <item>2</item>
+ <item>3</item>
+ </record>
+ </tdml:dfdlInfoset>
+ </tdml:infoset>
+
+ <tdml:document>Hello!-World.123</tdml:document>
+
+ </tdml:unparserTestCase>
+</tdml:testSuite>
+
diff --git
a/daffodil-test/src/test/scala/org/apache/daffodil/section00/general/TestUnparserSAX.scala
b/daffodil-test/src/test/scala/org/apache/daffodil/section00/general/TestUnparserSAX.scala
new file mode 100644
index 0000000..0934ccd
--- /dev/null
+++
b/daffodil-test/src/test/scala/org/apache/daffodil/section00/general/TestUnparserSAX.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.daffodil.section00.general
+
+import org.apache.daffodil.tdml.Runner
+import org.junit.AfterClass
+import org.junit.Test
+
+object TestUnparserSAX {
+ val testDir = "/org/apache/daffodil/section00/general/"
+ val runner2 = Runner(testDir, "testUnparserSAX.tdml")
+
+ @AfterClass def shutDown: Unit = {
+ runner2.reset
+ }
+}
+
+class TestUnparserSAX {
+ import TestUnparserSAX._
+
+ @Test def test_saxUnparseBatchSize_1() = {
runner2.runOneTest("test_saxUnparseBatchSize_1") }
+ @Test def test_saxUnparseBatchSize_5() = {
runner2.runOneTest("test_saxUnparseBatchSize_5") }
+ @Test def test_saxUnparseBatchSize_1000() = {
runner2.runOneTest("test_saxUnparseBatchSize_1000") }
+}