tuxji commented on code in PR #879:
URL: https://github.com/apache/daffodil/pull/879#discussion_r1027144850


##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala:
##########
@@ -19,98 +19,88 @@ package org.apache.daffodil.infoset
 
 import java.net.URI
 import java.net.URISyntaxException
+import java.util.concurrent.ArrayBlockingQueue
 
-import org.apache.daffodil.api.DFDL
-import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
-import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.dpath.NodeInfo
 import org.apache.daffodil.exceptions.Assert
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
-import org.apache.daffodil.util.Maybe.One
+import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
+import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
+import org.apache.daffodil.util.Maybe
+import org.apache.daffodil.util.Maybe.Nope
 import org.apache.daffodil.util.MaybeBoolean
 import org.apache.daffodil.util.Misc
 import org.apache.daffodil.xml.XMLUtils
 
 /**
- * The SAXInfosetInputter consumes SAXInfosetEvent objects from the 
DaffodilUnparseContentHandler
- * class and converts them to events that the DataProcessor unparse can use. 
This class contains an
- * array of batched SAXInfosetEvent objects that it receives from the 
contentHandler and the index
- * of the current element being processed.
+ * The SAXInfosetInputter consumes SAXInfosetInputterEvent objects from the
+ * DaffodilUnparseContentHandler class and converts them to events that the
+ * DataProcessor unparse can use. When created, the 
DaffodilUnaprseContentHandler
+ * passes in a thread-safe array backed FIFO queue where events are added.
+ * However, there is really no special logic in this class regarding those
+ * evens. All the complexity of the received SAX events and dealing with thread
+ * and deadlocks is all encompassed in the DaffodilUnparseContentHandler--this
+ * just needs to take() the events from the queue as needed and provide the to

Review Comment:
   "the to" -> "them to", and please end the next line with a period.



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the 
SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The 
SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an 
XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the 
XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * Dataprocessor unparse() can use. The SAXInfosetInputterEvents are built base
+ * on information that is passed to this ContentHandler from an XMLReader.
+ *
+ * This class runs the DataProcessor unparse() method in a separate thread,
+ * with a shared ArrayBlockigQueue to provide SAXInfosetInputerEvents. This

Review Comment:
   ArrayBlockigQueue -> ArrayBlockingQueue, SAXInfosetInputerEvents -> 
SAXInfosetInputterEvents



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala:
##########
@@ -192,26 +160,29 @@ class SAXInfosetInputter(
   override def fini(): Unit = {
     // do nothing
   }
+}
+
+class SAXInfosetInputterEvent() {
+  var eventType: Maybe[InfosetInputterEventType] = Nope
+  var localName: Maybe[String] = Nope
+  var namespaceURI: Maybe[String] = Nope
+  var nilValue: Maybe[String] = Nope
+  var simpleText: Maybe[String] = Nope
+  var mixedContent: Maybe[String] = Nope
+
+  def clear(): Unit = {
+    eventType = Nope
+    localName = Nope
+    namespaceURI = Nope
+    nilValue = Nope
+    simpleText = Nope
+    mixedContent = Nope
+  }

Review Comment:
   How does SAXInfosetInputterEvent ever get its fields initialized to other 
values besides Nope?  I've searched this pull request and I don't see a field 
assignment anywhere.  I also don't see any callers of the clear() method in the 
newly added green-colored text, only the removed red-colored text.  It's as if 
I'm not seeing all the changes I should be seeing.  Oh, the GitHub UI was 
hiding the DaffodilUnparseContentHandler.scala diff because it was so large.  
I had to make it display the diffs before I could find the field assignments 
and clear() calls I was looking for.



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the 
SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The 
SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an 
XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the 
XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * Dataprocessor unparse() can use. The SAXInfosetInputterEvents are built base
+ * on information that is passed to this ContentHandler from an XMLReader.
+ *
+ * This class runs the DataProcessor unparse() method in a separate thread,
+ * with a shared ArrayBlockigQueue to provide SAXInfosetInputerEvents. This
+ * queue has a maximum size defined by the saxUnaprseEventBatchSize tunble
  *
- * This class, together with the SAXInfosetInputter, uses coroutines to ensure 
that a batch of events
- * (based on the tunable saxUnparseEventBatchSize) can be passed from the 
former to the latter.
  * The following is the general process:
  *
- * - an external call is made to parse an XML Document
- * - this class receives a StartDocument call, which is the first 
SAXInfosetEvent that should be
- * sent to the SAXInfosetInputter. That event is put onto an array of 
SAXInfosetEvents of size the
- * saxUnparseEventBatchSize tunable. Once the array is full, it is put on the 
inputter's queue,
- * this thread is paused, and that inputter's thread is run
- * - when the SAXInfosetInputter is done processing that batch and is ready 
for a new batch, it
- * sends a 1 element array with the last completed event via the coroutine 
system, which loads it on
- * the contentHandler's queue, which restarts this thread and pauses that one. 
In the expected case,
- * the single element array will contain no new information until the unparse 
complete. In the case of
- * an unexpected error though, it will contain error information
- * - this process continues until the EndDocument SAXInfosetEvent is loaded 
into the batch.
- * Once that SAXInfosetEvent is processed by the SAXInfosetInputter, it 
signals the end of batched
- * events coming from the contentHandler. This ends the unparseProcess and 
returns the event with
- * the unparseResult and/or any error
- * information
+ * - An external call is made to parse from an XMLReader
+ * - This class receives a startDocument() event from the XMLReader. This
+ *   put()'s a StartDocument event on the eventQueue and starts a Future that
+ *   runs the DataProcessor unparse() method with a SAXInfosetInputter in a
+ *   thread
+ * - As additional ContentHandler functions are called from the XMLReader,
+ *   events are created and added to the eventQueue
+ * - This continues until the endDocument() SAX event is reached, at which
+ *   point we create a EndDocument event, put() it on the eventQueue, wait for
+ *   the Future to complete, and determine the result
+ * - While this is all going on, the SAXInfosetInputter take()'s from the
+ *   eventQueue and provides the infoset information necessary to unparse
  *
- * @param dp dataprocessor object that will be used to call the parse
- * @param output outputChannel of choice where the unparsed data is stored
+ * @param dp DataProcessor used to call the unparse() method
+ * @param output Output to write unparsed data
  */
 class DaffodilUnparseContentHandler(
   dp: DFDL.DataProcessor,
   output: DFDL.Output)
   extends DFDL.DaffodilUnparseContentHandler {
-  private lazy val inputter = new SAXInfosetInputter(this, dp, output)
-  private var unparseResult: DFDL.UnparseResult = _
-  private lazy val characterData = new StringBuilder
-  private var prefixMapping: NamespaceBinding = _
-  private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
 
-  private lazy val tunablesBatchSize = 
dp.getTunables().saxUnparseEventBatchSize
+  /**
+   * The Future thread running the DataProcessor.unparse() method
+   */
+  private var unparseTask: Future[Unit] = _
+  
+  /**
+   * The result of the unparseTask. This is None as long as the Future thread
+   * is still running. Oncee the unparseTask completes, this is set to a
+   * Right[UnparseResult] if the unparse completed without error. If this
+   * completed but UnparseResult.isError is true or an unexpected exception was
+   * thrown during unparse, this is set to a Left[SAXException].
+   */
+  private var unparseResult: Option[Either[SAXException, DFDL.UnparseResult]] 
= None
+ 
+  /**
+    * Maximum number of SAXInfosetInputterEvents that this ContentHandler can
+    * put() in the eventQueue at a time. If this queue is full, the
+    * ContentHandler thread is blocked until the unparseTask thread take()'s
+    * from the queue.
+   */
+  private val eventQueueSize = dp.getTunables().saxUnparseEventBatchSize
+
+  /**
+   * A thread-safe array-backed FIFO blocking queue to store events to be used
+   * by the SAXInfosetInputter. This ContentHandler put()'s to this queue as it
+   * receives SAX events, and the unparseTask take()'s from this queue to
+   * unparse().
+   */
+  private val eventQueue = new 
ArrayBlockingQueue[SAXInfosetInputterEvent](eventQueueSize)
 
   /**
-   * we always have an extra buffer in the array that we use for the 
inputter.hasNext call. For each
-   * element, we need to know if it has a viable next, if it doesn't, it will 
triggers the context
-   * switch to DaffodilUnparseContentHandler. So for example, if the user 
provides 1 as the
-   * batchSize, under the hood we'll batch [event1, event2].
-   *
-   * - DataProcessor.unparse will call hasNext and getEventType for the 
initialization call
-   * - hasNext will check if nextIndex (i.e currentIndex + 1) is non-empty. 
Since currentIndex is 0,
-   * it will return true since event2 exists.
-   * - getEventType (which signifies our processing step) is called for the 
event at currentIndex
-   * - After the initialization step, subsequent calls will be a loop of 
next(), ...some processing
-   * of the current event ..., and hasNext()
-   * - For our scenario, next() will clear out the contents at currentIndex, 
increment the currentIndex,
-   * and our event2 will be processed, then hasNext will check if there is a 
viable index 2, as
-   * there is not, it will perform the context switch so 
DaffodilUnparseContentHandler can batch
-   * more events
-   * - DaffodilUnparseContentHandler copies the last event into the first so 
the currentEvent stays
-   * the same for the inputter until it decides to change it so we end up with 
[event2, event3]
-   * - When we context switch back to inputter.hasNext, it resets the 
currentIndex to 0, and our loop
-   * begins again with a call to next
-   *
-   * Without us having the extra buffer, things would happen like this:
-   * user provides 1 as the batchSize, under the hood we'll have [event1] 
batched.
-   *
-   * DataProcessor.unparse will call hasNext and getEventType for the 
initialization call, and that
-   * hasNext will check if cnextIndex (i.e currentIndex + 1) is non-empty. As 
currentIndex is 0, and
-   * it is the maximum index, there is no index 1. It will context switch to 
get a new batched event,
-   * which, would overwrite event1 before we get to process it.
+   * To avoid allocating many SAXInfosetInputterEvents, we instead pre-allocate
+   * events and clear/mutate each event as needed. Note that we need 2 more
+   * pre-allocated events than the queue size. This is because while
+   * processing, there could be a most eventQueueSize events already on the
+   * eventQueue, plus another event that the SAXInfosetInputter may have from
+   * the most recent take(), plus the event that this ContentHander is mutating
+   * in preparation to put() on the queue.
    */
-  private lazy val actualBatchSize = tunablesBatchSize + 1
-  private lazy val batchedInfosetEvents: Array[SAXInfosetEvent] = {
-    Assert.invariant(tunablesBatchSize > 0, "invalid saxUnparseEventBatchSize; 
minimum value is 1")
-    Array.fill[SAXInfosetEvent](actualBatchSize)(new SAXInfosetEvent)
+  private val preAllocatedEvents = {
+    Array.fill(eventQueueSize + 2)(new SAXInfosetInputterEvent)
   }
-  private var currentIndex: Int = 0
 
   /**
-   * This is a flag that is set to true when startPrefixMapping is called. 
When true, we make
-   * the assumption that we don't need to use the Attributes parameter from 
startElement to get the
-   * namespaceURI information and will solely rely on start/endPrefixMapping. 
If false, we will use
-   * Attributes to get the namespaceURI info.
+   * Index into the pre-allocated event array that the content handler is
+   * currently mutating and preparing to put() on the eventQueue(). When all
+   * information for this event is gathered, we put() it on the eventQueue and
+   * increment the index into the pre-allocated events array and start muating
+   * the next event.
    */
-  private var contentHandlerPrefixMappingUsed = false
+  private var currentIndex = 0
 
   /**
-   * returns null in the case of an DaffodilUnhandledSAXException
+   * Variable to make it easier to access the pre-allocated event at
+   * currentIndex. This is updated each time currentIndex is incremented so
+   * that methods in this class should never need to access currentIndex or the
+   * preAllocated events array directly.
    */
-  def getUnparseResult: DFDL.UnparseResult = unparseResult
+  private var currentEvent = preAllocatedEvents(currentIndex)
 
-  def enableInputterResolutionOfRelativeInfosetBlobURIs(): Unit = 
inputter.enableResolutionOfRelativeInfosetBlobURIs()
+  /**
+   * Buffer to accumulate characters received from the SAX characters() 
function
+   */
+  private val characterData = new StringBuilder
 
-  override def setDocumentLocator(locator: Locator): Unit = {
-    // do nothing
-  }
+  /**
+   * Used to store the current in-scope prefix namespace mappings, determined
+   * by the start/endPrefixMapping functions, or from xmlns Attributes in the
+   * startElement function.
+   */
+  private var prefixMapping: NamespaceBinding = TopScope
 
-  override def startDocument(): Unit = {
-    batchedInfosetEvents(currentIndex).eventType = One(StartDocument)
-    maybeSendToInputter()
+  /**
+   * If the XMLReader doesn't use the start/endPrefixMapping() functions to
+   * pass along namespaceMapping information, we must extract the information
+   * from the Attributes parameter in the startElement() function. Doing so can
+   * potentially add multiple namespace mappings that must be removed in the
+   * associated endElement() function. This stack keeps track of which mappings
+   * are added in startElement() so they can easily be removed in endElement()
+   * by calling the pop function.
+   */
+  private val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
+
+  /**
+   * Flag to keep track if the XMLReader is using start/endPrefixMapping() for
+   * namespace mappings or if mappings should be gathered from Attributes in
+   * startElement(). This is set to true for the former and false for the
+   * latter. If true, the prefixMappingTrackingStack is unused.
+   */
+  private var contentHandlerPrefixMappingUsed = false
+
+  /**
+   * Add the current event to the eventQueue. If the eventQueue is full, this
+   * blocks until the SAXInfosetInputter take()'s an event and frees up a slot
+   * in the queue. Once the event is added, we update state to point to the
+   * next SAXInfosetInputterEvent that we should mutate as we receive more SAX
+   * events.
+   */
+  private def addCurrentEventToQueue(): Unit = {
+    // before we put() an event, check to make sure there hasn't been an error
+    // from the SAXInfosetInputter. If there is an error, checkUnparseResult()
+    // throws and exception which bubbles up to the XMLReader to handle. Note

Review Comment:
   and -> an



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/infoset/SAXInfosetInputter.scala:
##########
@@ -19,98 +19,88 @@ package org.apache.daffodil.infoset
 
 import java.net.URI
 import java.net.URISyntaxException
+import java.util.concurrent.ArrayBlockingQueue
 
-import org.apache.daffodil.api.DFDL
-import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
-import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.dpath.NodeInfo
 import org.apache.daffodil.exceptions.Assert
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
-import org.apache.daffodil.util.Maybe.One
+import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
+import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
+import org.apache.daffodil.util.Maybe
+import org.apache.daffodil.util.Maybe.Nope
 import org.apache.daffodil.util.MaybeBoolean
 import org.apache.daffodil.util.Misc
 import org.apache.daffodil.xml.XMLUtils
 
 /**
- * The SAXInfosetInputter consumes SAXInfosetEvent objects from the 
DaffodilUnparseContentHandler
- * class and converts them to events that the DataProcessor unparse can use. 
This class contains an
- * array of batched SAXInfosetEvent objects that it receives from the 
contentHandler and the index
- * of the current element being processed.
+ * The SAXInfosetInputter consumes SAXInfosetInputterEvent objects from the
+ * DaffodilUnparseContentHandler class and converts them to events that the
+ * DataProcessor unparse can use. When created, the 
DaffodilUnaprseContentHandler
+ * passes in a thread-safe array backed FIFO queue where events are added.
+ * However, there is really no special logic in this class regarding those
+ * evens. All the complexity of the received SAX events and dealing with thread
+ * and deadlocks is all encompassed in the DaffodilUnparseContentHandler--this
+ * just needs to take() the events from the queue as needed and provide the to
+ * the unparse() function
  *
- * This class, together with the SAXInfosetInputter, uses coroutines to ensure 
that a batch of events
- * (based on the tunable saxUnparseEventBatchSize) can be passed from the 
former to the latter.
- * The following is the general process:
- *
- * - the run method is called, with the first batch of events, starting with 
the StartDocument event,
- * already loaded on the inputter's queue.
- * This is collected and stored in the batchedInfosetEvents member, and the 
currentIndex is set to 0
- * - The dp.unparse method is called, and it calls hasNext to make sure an 
event exists to be
- * processed and then queries the event at currentIndex. The hasNext call also 
checks that there is
- * a next event to be processed (currentIndex+1), and if not, queues the next 
batch of events by
- * transferring control to the contentHandler so it can load them.
- * - After it is done with the current event, it calls inputter.next to get 
the next event, and that
- * increments the currentIndex and cleans out the event at the previous index
- * - This process continues until the event at currentIndex either contains an 
EndDocument event or
- * the currentIndex is the last in the batch. If it is the former, the 
endDocumentReceived flag is
- * set to true and hasNext will return false. If it is the latter, the next 
batch of events will be
- * queued by transferring control to the contentHandler so it can load them.
- * - This ends the unparse process, and the unparseResult and/or any Errors 
are set on a single element
- * array containing response events. We call resumeFinal passing along that 
array, terminating this
- * thread and resuming the contentHandler for the last time.
- *
- * @param unparseContentHandler producer coroutine that sends the 
SAXInfosetEvent to this class
- * @param dp dataprocessor that we use to kickstart the unparse process and 
that consumes the
- *           currentEvent
- * @param output  outputChannel of choice where the unparsed data is stored
+ * @param eventQueue queue of SAXInfosetInputterEvents to guide the unparse
  */
 class SAXInfosetInputter(
-  unparseContentHandler: DFDL.DaffodilUnparseContentHandler,
-  dp: DFDL.DataProcessor,
-  output: DFDL.Output)
-  extends InfosetInputter with DFDL.ConsumerCoroutine {
+  eventQueue: ArrayBlockingQueue[SAXInfosetInputterEvent])
+  extends InfosetInputter {
 
   /**
-   * allows support for converting relative URIs in Blobs to absolute URIs, 
this is necessary
-   * for running TDML tests as they allow relative URIs. Because Daffodil 
proper only uses
-   * absolute URIs, we hide this functionality behind this flag. It can be set 
to true by calling
-   * the 
unparseContentHandler.enableInputterResolutionOfRelativeInfosetBlobURIs(), 
which calls the
-   * inputter's enableResolutionOfRelativeInfosetBlobURIs() function to set 
the below variable to true
+   * Flag to store if relative blob URI resolution should be enabled in the
+   * infoset inputtter. This is set to true when
+   * enableResolutionOfRelativeInfosetBlobURIs() is called.
    */
-  private var resolveRelativeInfosetBlobURIs: Boolean = false
+  private var enableRelativeBlobURIs = false
 
-  private var endDocumentReceived = false
-  private var currentIndex: Int = 0
-  private var batchedInfosetEvents: Array[SAXInfosetEvent] = _
-  private lazy val returnedInfosetEvent: Array[SAXInfosetEvent] = new 
Array[SAXInfosetEvent](1)
+  /**
+   * Enable the resolution of relative infoset blob URIs. This should only be
+   * used when running TDML tests where relative blob URIs are often used. In
+   * production, blob URIs should always be absolute and this should not be
+   * needed.
+   */
+  def enableResolutionOfRelativeInfosetBlobURIs(): Unit = 
enableRelativeBlobURIs = true
+
+  /**
+   * Keeps track of the current event that SAXInfosetInputter returns
+   * information about. This must be initialized to a StartDocument event
+   */
+  private var currentEvent: SAXInfosetInputterEvent = {
+    val ev = eventQueue.take()
+    Assert.invariant(ev.eventType.contains(StartDocument))
+    ev
+  }

Review Comment:
   Oh, I had to look twice before I understood how this invariant works.  I 
initially misread and thought I was looking at def currentEvent, but now I 
realize this is var currentEvent and the invariant is being checked only after 
the first time eventQueue.take() is called.  



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the 
SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The 
SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an 
XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the 
XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * Dataprocessor unparse() can use. The SAXInfosetInputterEvents are built base
+ * on information that is passed to this ContentHandler from an XMLReader.
+ *
+ * This class runs the DataProcessor unparse() method in a separate thread,
+ * with a shared ArrayBlockigQueue to provide SAXInfosetInputerEvents. This
+ * queue has a maximum size defined by the saxUnaprseEventBatchSize tunble

Review Comment:
   saxUnaprseEventBatchSize -> saxUnparseEventBatchSize 



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the 
SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The 
SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an 
XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the 
XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * Dataprocessor unparse() can use. The SAXInfosetInputterEvents are built base

Review Comment:
   Typo in "built base on": base -> based



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the 
SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The 
SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an 
XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the 
XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * Dataprocessor unparse() can use. The SAXInfosetInputterEvents are built base
+ * on information that is passed to this ContentHandler from an XMLReader.
+ *
+ * This class runs the DataProcessor unparse() method in a separate thread,
+ * with a shared ArrayBlockigQueue to provide SAXInfosetInputerEvents. This
+ * queue has a maximum size defined by the saxUnaprseEventBatchSize tunble
  *
- * This class, together with the SAXInfosetInputter, uses coroutines to ensure 
that a batch of events
- * (based on the tunable saxUnparseEventBatchSize) can be passed from the 
former to the latter.
  * The following is the general process:
  *
- * - an external call is made to parse an XML Document
- * - this class receives a StartDocument call, which is the first 
SAXInfosetEvent that should be
- * sent to the SAXInfosetInputter. That event is put onto an array of 
SAXInfosetEvents of size the
- * saxUnparseEventBatchSize tunable. Once the array is full, it is put on the 
inputter's queue,
- * this thread is paused, and that inputter's thread is run
- * - when the SAXInfosetInputter is done processing that batch and is ready 
for a new batch, it
- * sends a 1 element array with the last completed event via the coroutine 
system, which loads it on
- * the contentHandler's queue, which restarts this thread and pauses that one. 
In the expected case,
- * the single element array will contain no new information until the unparse 
complete. In the case of
- * an unexpected error though, it will contain error information
- * - this process continues until the EndDocument SAXInfosetEvent is loaded 
into the batch.
- * Once that SAXInfosetEvent is processed by the SAXInfosetInputter, it 
signals the end of batched
- * events coming from the contentHandler. This ends the unparseProcess and 
returns the event with
- * the unparseResult and/or any error
- * information
+ * - An external call is made to parse from an XMLReader
+ * - This class receives a startDocument() event from the XMLReader. This
+ *   put()'s a StartDocument event on the eventQueue and starts a Future that
+ *   runs the DataProcessor unparse() method with a SAXInfosetInputter in a
+ *   thread
+ * - As additional ContentHandler functions are called from the XMLReader,
+ *   events are created and added to the eventQueue
+ * - This continues until the endDocument() SAX event is reached, at which
+ *   point we create a EndDocument event, put() it on the eventQueue, wait for
+ *   the Future to complete, and determine the result
+ * - While this is all going on, the SAXInfosetInputter take()'s from the
+ *   eventQueue and provides the infoset information necessary to unparse
  *
- * @param dp dataprocessor object that will be used to call the parse
- * @param output outputChannel of choice where the unparsed data is stored
+ * @param dp DataProcessor used to call the unparse() method
+ * @param output Output to write unparsed data
  */
 class DaffodilUnparseContentHandler(
   dp: DFDL.DataProcessor,
   output: DFDL.Output)
   extends DFDL.DaffodilUnparseContentHandler {
-  private lazy val inputter = new SAXInfosetInputter(this, dp, output)
-  private var unparseResult: DFDL.UnparseResult = _
-  private lazy val characterData = new StringBuilder
-  private var prefixMapping: NamespaceBinding = _
-  private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
 
-  private lazy val tunablesBatchSize = 
dp.getTunables().saxUnparseEventBatchSize
+  /**
+   * The Future thread running the DataProcessor.unparse() method
+   */
+  private var unparseTask: Future[Unit] = _
+  
+  /**
+   * The result of the unparseTask. This is None as long as the Future thread
+   * is still running. Oncee the unparseTask completes, this is set to a
+   * Right[UnparseResult] if the unparse completed without error. If this
+   * completed but UnparseResult.isError is true or an unexpected exception was
+   * thrown during unparse, this is set to a Left[SAXException].
+   */
+  private var unparseResult: Option[Either[SAXException, DFDL.UnparseResult]] 
= None
+ 
+  /**
+    * Maximum number of SAXInfosetInputterEvents that this ContentHandler can
+    * put() in the eventQueue at a time. If this queue is full, the
+    * ContentHandler thread is blocked until the unparseTask thread take()'s
+    * from the queue.
+   */
+  private val eventQueueSize = dp.getTunables().saxUnparseEventBatchSize
+
+  /**
+   * A thread-safe array-backed FIFO blocking queue to store events to be used
+   * by the SAXInfosetInputter. This ContentHandler put()'s to this queue as it
+   * receives SAX events, and the unparseTask take()'s from this queue to
+   * unparse().
+   */
+  private val eventQueue = new 
ArrayBlockingQueue[SAXInfosetInputterEvent](eventQueueSize)
 
   /**
-   * we always have an extra buffer in the array that we use for the 
inputter.hasNext call. For each
-   * element, we need to know if it has a viable next, if it doesn't, it will 
triggers the context
-   * switch to DaffodilUnparseContentHandler. So for example, if the user 
provides 1 as the
-   * batchSize, under the hood we'll batch [event1, event2].
-   *
-   * - DataProcessor.unparse will call hasNext and getEventType for the 
initialization call
-   * - hasNext will check if nextIndex (i.e currentIndex + 1) is non-empty. 
Since currentIndex is 0,
-   * it will return true since event2 exists.
-   * - getEventType (which signifies our processing step) is called for the 
event at currentIndex
-   * - After the initialization step, subsequent calls will be a loop of 
next(), ...some processing
-   * of the current event ..., and hasNext()
-   * - For our scenario, next() will clear out the contents at currentIndex, 
increment the currentIndex,
-   * and our event2 will be processed, then hasNext will check if there is a 
viable index 2, as
-   * there is not, it will perform the context switch so 
DaffodilUnparseContentHandler can batch
-   * more events
-   * - DaffodilUnparseContentHandler copies the last event into the first so 
the currentEvent stays
-   * the same for the inputter until it decides to change it so we end up with 
[event2, event3]
-   * - When we context switch back to inputter.hasNext, it resets the 
currentIndex to 0, and our loop
-   * begins again with a call to next
-   *
-   * Without us having the extra buffer, things would happen like this:
-   * user provides 1 as the batchSize, under the hood we'll have [event1] 
batched.
-   *
-   * DataProcessor.unparse will call hasNext and getEventType for the 
initialization call, and that
-   * hasNext will check if cnextIndex (i.e currentIndex + 1) is non-empty. As 
currentIndex is 0, and
-   * it is the maximum index, there is no index 1. It will context switch to 
get a new batched event,
-   * which, would overwrite event1 before we get to process it.
+   * To avoid allocating many SAXInfosetInputterEvents, we instead pre-allocate
+   * events and clear/mutate each event as needed. Note that we need 2 more
+   * pre-allocated events than the queue size. This is because while
+   * processing, there could be a most eventQueueSize events already on the

Review Comment:
   Typo in "there could be a most": a -> at



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the 
SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The 
SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an 
XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the 
XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * Dataprocessor unparse() can use. The SAXInfosetInputterEvents are built base
+ * on information that is passed to this ContentHandler from an XMLReader.
+ *
+ * This class runs the DataProcessor unparse() method in a separate thread,
+ * with a shared ArrayBlockigQueue to provide SAXInfosetInputerEvents. This
+ * queue has a maximum size defined by the saxUnaprseEventBatchSize tunble
  *
- * This class, together with the SAXInfosetInputter, uses coroutines to ensure 
that a batch of events
- * (based on the tunable saxUnparseEventBatchSize) can be passed from the 
former to the latter.
  * The following is the general process:
  *
- * - an external call is made to parse an XML Document
- * - this class receives a StartDocument call, which is the first 
SAXInfosetEvent that should be
- * sent to the SAXInfosetInputter. That event is put onto an array of 
SAXInfosetEvents of size the
- * saxUnparseEventBatchSize tunable. Once the array is full, it is put on the 
inputter's queue,
- * this thread is paused, and that inputter's thread is run
- * - when the SAXInfosetInputter is done processing that batch and is ready 
for a new batch, it
- * sends a 1 element array with the last completed event via the coroutine 
system, which loads it on
- * the contentHandler's queue, which restarts this thread and pauses that one. 
In the expected case,
- * the single element array will contain no new information until the unparse 
complete. In the case of
- * an unexpected error though, it will contain error information
- * - this process continues until the EndDocument SAXInfosetEvent is loaded 
into the batch.
- * Once that SAXInfosetEvent is processed by the SAXInfosetInputter, it 
signals the end of batched
- * events coming from the contentHandler. This ends the unparseProcess and 
returns the event with
- * the unparseResult and/or any error
- * information
+ * - An external call is made to parse from an XMLReader
+ * - This class receives a startDocument() event from the XMLReader. This
+ *   put()'s a StartDocument event on the eventQueue and starts a Future that
+ *   runs the DataProcessor unparse() method with a SAXInfosetInputter in a
+ *   thread
+ * - As additional ContentHandler functions are called from the XMLReader,
+ *   events are created and added to the eventQueue
+ * - This continues until the endDocument() SAX event is reached, at which
+ *   point we create a EndDocument event, put() it on the eventQueue, wait for
+ *   the Future to complete, and determine the result
+ * - While this is all going on, the SAXInfosetInputter take()'s from the
+ *   eventQueue and provides the infoset information necessary to unparse
  *
- * @param dp dataprocessor object that will be used to call the parse
- * @param output outputChannel of choice where the unparsed data is stored
+ * @param dp DataProcessor used to call the unparse() method
+ * @param output Output to write unparsed data
  */
 class DaffodilUnparseContentHandler(
   dp: DFDL.DataProcessor,
   output: DFDL.Output)
   extends DFDL.DaffodilUnparseContentHandler {
-  private lazy val inputter = new SAXInfosetInputter(this, dp, output)
-  private var unparseResult: DFDL.UnparseResult = _
-  private lazy val characterData = new StringBuilder
-  private var prefixMapping: NamespaceBinding = _
-  private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
 
-  private lazy val tunablesBatchSize = 
dp.getTunables().saxUnparseEventBatchSize
+  /**
+   * The Future thread running the DataProcessor.unparse() method
+   */
+  private var unparseTask: Future[Unit] = _
+  
+  /**
+   * The result of the unparseTask. This is None as long as the Future thread
+   * is still running. Oncee the unparseTask completes, this is set to a
+   * Right[UnparseResult] if the unparse completed without error. If this
+   * completed but UnparseResult.isError is true or an unexpected exception was
+   * thrown during unparse, this is set to a Left[SAXException].
+   */
+  private var unparseResult: Option[Either[SAXException, DFDL.UnparseResult]] 
= None
+ 
+  /**
+    * Maximum number of SAXInfosetInputterEvents that this ContentHandler can
+    * put() in the eventQueue at a time. If this queue is full, the
+    * ContentHandler thread is blocked until the unparseTask thread take()'s
+    * from the queue.
+   */
+  private val eventQueueSize = dp.getTunables().saxUnparseEventBatchSize
+
+  /**
+   * A thread-safe array-backed FIFO blocking queue to store events to be used
+   * by the SAXInfosetInputter. This ContentHandler put()'s to this queue as it
+   * receives SAX events, and the unparseTask take()'s from this queue to
+   * unparse().
+   */
+  private val eventQueue = new 
ArrayBlockingQueue[SAXInfosetInputterEvent](eventQueueSize)
 
   /**
-   * we always have an extra buffer in the array that we use for the 
inputter.hasNext call. For each
-   * element, we need to know if it has a viable next, if it doesn't, it will 
triggers the context
-   * switch to DaffodilUnparseContentHandler. So for example, if the user 
provides 1 as the
-   * batchSize, under the hood we'll batch [event1, event2].
-   *
-   * - DataProcessor.unparse will call hasNext and getEventType for the 
initialization call
-   * - hasNext will check if nextIndex (i.e currentIndex + 1) is non-empty. 
Since currentIndex is 0,
-   * it will return true since event2 exists.
-   * - getEventType (which signifies our processing step) is called for the 
event at currentIndex
-   * - After the initialization step, subsequent calls will be a loop of 
next(), ...some processing
-   * of the current event ..., and hasNext()
-   * - For our scenario, next() will clear out the contents at currentIndex, 
increment the currentIndex,
-   * and our event2 will be processed, then hasNext will check if there is a 
viable index 2, as
-   * there is not, it will perform the context switch so 
DaffodilUnparseContentHandler can batch
-   * more events
-   * - DaffodilUnparseContentHandler copies the last event into the first so 
the currentEvent stays
-   * the same for the inputter until it decides to change it so we end up with 
[event2, event3]
-   * - When we context switch back to inputter.hasNext, it resets the 
currentIndex to 0, and our loop
-   * begins again with a call to next
-   *
-   * Without us having the extra buffer, things would happen like this:
-   * user provides 1 as the batchSize, under the hood we'll have [event1] 
batched.
-   *
-   * DataProcessor.unparse will call hasNext and getEventType for the 
initialization call, and that
-   * hasNext will check if cnextIndex (i.e currentIndex + 1) is non-empty. As 
currentIndex is 0, and
-   * it is the maximum index, there is no index 1. It will context switch to 
get a new batched event,
-   * which, would overwrite event1 before we get to process it.
+   * To avoid allocating many SAXInfosetInputterEvents, we instead pre-allocate
+   * events and clear/mutate each event as needed. Note that we need 2 more
+   * pre-allocated events than the queue size. This is because while
+   * processing, there could be a most eventQueueSize events already on the
+   * eventQueue, plus another event that the SAXInfosetInputter may have from
+   * the most recent take(), plus the event that this ContentHander is mutating
+   * in preparation to put() on the queue.
    */
-  private lazy val actualBatchSize = tunablesBatchSize + 1
-  private lazy val batchedInfosetEvents: Array[SAXInfosetEvent] = {
-    Assert.invariant(tunablesBatchSize > 0, "invalid saxUnparseEventBatchSize; 
minimum value is 1")
-    Array.fill[SAXInfosetEvent](actualBatchSize)(new SAXInfosetEvent)
+  private val preAllocatedEvents = {
+    Array.fill(eventQueueSize + 2)(new SAXInfosetInputterEvent)
   }
-  private var currentIndex: Int = 0
 
   /**
-   * This is a flag that is set to true when startPrefixMapping is called. 
When true, we make
-   * the assumption that we don't need to use the Attributes parameter from 
startElement to get the
-   * namespaceURI information and will solely rely on start/endPrefixMapping. 
If false, we will use
-   * Attributes to get the namespaceURI info.
+   * Index into the pre-allocated event array that the content handler is
+   * currently mutating and preparing to put() on the eventQueue(). When all
+   * information for this event is gathered, we put() it on the eventQueue and
+   * increment the index into the pre-allocated events array and start muating

Review Comment:
   muating -> mutating



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the 
SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The 
SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an 
XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the 
XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * Dataprocessor unparse() can use. The SAXInfosetInputterEvents are built base
+ * on information that is passed to this ContentHandler from an XMLReader.
+ *
+ * This class runs the DataProcessor unparse() method in a separate thread,
+ * with a shared ArrayBlockigQueue to provide SAXInfosetInputerEvents. This
+ * queue has a maximum size defined by the saxUnaprseEventBatchSize tunble
  *
- * This class, together with the SAXInfosetInputter, uses coroutines to ensure 
that a batch of events
- * (based on the tunable saxUnparseEventBatchSize) can be passed from the 
former to the latter.
  * The following is the general process:
  *
- * - an external call is made to parse an XML Document
- * - this class receives a StartDocument call, which is the first 
SAXInfosetEvent that should be
- * sent to the SAXInfosetInputter. That event is put onto an array of 
SAXInfosetEvents of size the
- * saxUnparseEventBatchSize tunable. Once the array is full, it is put on the 
inputter's queue,
- * this thread is paused, and that inputter's thread is run
- * - when the SAXInfosetInputter is done processing that batch and is ready 
for a new batch, it
- * sends a 1 element array with the last completed event via the coroutine 
system, which loads it on
- * the contentHandler's queue, which restarts this thread and pauses that one. 
In the expected case,
- * the single element array will contain no new information until the unparse 
complete. In the case of
- * an unexpected error though, it will contain error information
- * - this process continues until the EndDocument SAXInfosetEvent is loaded 
into the batch.
- * Once that SAXInfosetEvent is processed by the SAXInfosetInputter, it 
signals the end of batched
- * events coming from the contentHandler. This ends the unparseProcess and 
returns the event with
- * the unparseResult and/or any error
- * information
+ * - An external call is made to parse from an XMLReader
+ * - This class receives a startDocument() event from the XMLReader. This
+ *   put()'s a StartDocument event on the eventQueue and starts a Future that
+ *   runs the DataProcessor unparse() method with a SAXInfosetInputter in a
+ *   thread
+ * - As additional ContentHandler functions are called from the XMLReader,
+ *   events are created and added to the eventQueue
+ * - This continues until the endDocument() SAX event is reached, at which
+ *   point we create a EndDocument event, put() it on the eventQueue, wait for
+ *   the Future to complete, and determine the result
+ * - While this is all going on, the SAXInfosetInputter take()'s from the
+ *   eventQueue and provides the infoset information necessary to unparse
  *
- * @param dp dataprocessor object that will be used to call the parse
- * @param output outputChannel of choice where the unparsed data is stored
+ * @param dp DataProcessor used to call the unparse() method
+ * @param output Output to write unparsed data
  */
 class DaffodilUnparseContentHandler(
   dp: DFDL.DataProcessor,
   output: DFDL.Output)
   extends DFDL.DaffodilUnparseContentHandler {
-  private lazy val inputter = new SAXInfosetInputter(this, dp, output)
-  private var unparseResult: DFDL.UnparseResult = _
-  private lazy val characterData = new StringBuilder
-  private var prefixMapping: NamespaceBinding = _
-  private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
 
-  private lazy val tunablesBatchSize = 
dp.getTunables().saxUnparseEventBatchSize
+  /**
+   * The Future thread running the DataProcessor.unparse() method
+   */
+  private var unparseTask: Future[Unit] = _
+  
+  /**
+   * The result of the unparseTask. This is None as long as the Future thread
+   * is still running. Oncee the unparseTask completes, this is set to a
+   * Right[UnparseResult] if the unparse completed without error. If this
+   * completed but UnparseResult.isError is true or an unexpected exception was
+   * thrown during unparse, this is set to a Left[SAXException].
+   */
+  private var unparseResult: Option[Either[SAXException, DFDL.UnparseResult]] 
= None
+ 
+  /**
+    * Maximum number of SAXInfosetInputterEvents that this ContentHandler can
+    * put() in the eventQueue at a time. If this queue is full, the
+    * ContentHandler thread is blocked until the unparseTask thread take()'s
+    * from the queue.
+   */
+  private val eventQueueSize = dp.getTunables().saxUnparseEventBatchSize
+
+  /**
+   * A thread-safe array-backed FIFO blocking queue to store events to be used
+   * by the SAXInfosetInputter. This ContentHandler put()'s to this queue as it
+   * receives SAX events, and the unparseTask take()'s from this queue to
+   * unparse().
+   */
+  private val eventQueue = new 
ArrayBlockingQueue[SAXInfosetInputterEvent](eventQueueSize)
 
   /**
-   * we always have an extra buffer in the array that we use for the 
inputter.hasNext call. For each
-   * element, we need to know if it has a viable next, if it doesn't, it will 
triggers the context
-   * switch to DaffodilUnparseContentHandler. So for example, if the user 
provides 1 as the
-   * batchSize, under the hood we'll batch [event1, event2].
-   *
-   * - DataProcessor.unparse will call hasNext and getEventType for the 
initialization call
-   * - hasNext will check if nextIndex (i.e currentIndex + 1) is non-empty. 
Since currentIndex is 0,
-   * it will return true since event2 exists.
-   * - getEventType (which signifies our processing step) is called for the 
event at currentIndex
-   * - After the initialization step, subsequent calls will be a loop of 
next(), ...some processing
-   * of the current event ..., and hasNext()
-   * - For our scenario, next() will clear out the contents at currentIndex, 
increment the currentIndex,
-   * and our event2 will be processed, then hasNext will check if there is a 
viable index 2, as
-   * there is not, it will perform the context switch so 
DaffodilUnparseContentHandler can batch
-   * more events
-   * - DaffodilUnparseContentHandler copies the last event into the first so 
the currentEvent stays
-   * the same for the inputter until it decides to change it so we end up with 
[event2, event3]
-   * - When we context switch back to inputter.hasNext, it resets the 
currentIndex to 0, and our loop
-   * begins again with a call to next
-   *
-   * Without us having the extra buffer, things would happen like this:
-   * user provides 1 as the batchSize, under the hood we'll have [event1] 
batched.
-   *
-   * DataProcessor.unparse will call hasNext and getEventType for the 
initialization call, and that
-   * hasNext will check if cnextIndex (i.e currentIndex + 1) is non-empty. As 
currentIndex is 0, and
-   * it is the maximum index, there is no index 1. It will context switch to 
get a new batched event,
-   * which, would overwrite event1 before we get to process it.
+   * To avoid allocating many SAXInfosetInputterEvents, we instead pre-allocate
+   * events and clear/mutate each event as needed. Note that we need 2 more
+   * pre-allocated events than the queue size. This is because while
+   * processing, there could be at most eventQueueSize events already on the
+   * eventQueue, plus another event that the SAXInfosetInputter may have from
+   * the most recent take(), plus the event that this ContentHandler is mutating
+   * in preparation to put() on the queue.
    */
-  private lazy val actualBatchSize = tunablesBatchSize + 1
-  private lazy val batchedInfosetEvents: Array[SAXInfosetEvent] = {
-    Assert.invariant(tunablesBatchSize > 0, "invalid saxUnparseEventBatchSize; 
minimum value is 1")
-    Array.fill[SAXInfosetEvent](actualBatchSize)(new SAXInfosetEvent)
+  private val preAllocatedEvents = {
+    Array.fill(eventQueueSize + 2)(new SAXInfosetInputterEvent)
   }
-  private var currentIndex: Int = 0
 
   /**
-   * This is a flag that is set to true when startPrefixMapping is called. 
When true, we make
-   * the assumption that we don't need to use the Attributes parameter from 
startElement to get the
-   * namespaceURI information and will solely rely on start/endPrefixMapping. 
If false, we will use
-   * Attributes to get the namespaceURI info.
+   * Index into the pre-allocated event array that the content handler is
+   * currently mutating and preparing to put() on the eventQueue(). When all
+   * information for this event is gathered, we put() it on the eventQueue and
+   * increment the index into the pre-allocated events array and start mutating
+   * the next event.
    */
-  private var contentHandlerPrefixMappingUsed = false
+  private var currentIndex = 0
 
   /**
-   * returns null in the case of an DaffodilUnhandledSAXException
+   * Variable to make it easier to access the pre-allocated event at
+   * currentIndex. This is updated each time currentIndex is incremented so
+   * that methods in this class should never need to access currentIndex or the
+   * preAllocated events array directly.
    */
-  def getUnparseResult: DFDL.UnparseResult = unparseResult
+  private var currentEvent = preAllocatedEvents(currentIndex)
 
-  def enableInputterResolutionOfRelativeInfosetBlobURIs(): Unit = 
inputter.enableResolutionOfRelativeInfosetBlobURIs()
+  /**
+   * Buffer to accumulate characters received from the SAX characters() 
function
+   */
+  private val characterData = new StringBuilder
 
-  override def setDocumentLocator(locator: Locator): Unit = {
-    // do nothing
-  }
+  /**
+   * Used to store the current in-scope prefix namespace mappings, determined
+   * by the start/endPrefixMapping functions, or from xmlns Attributes in the
+   * startElement function.
+   */
+  private var prefixMapping: NamespaceBinding = TopScope
 
-  override def startDocument(): Unit = {
-    batchedInfosetEvents(currentIndex).eventType = One(StartDocument)
-    maybeSendToInputter()
+  /**
+   * If the XMLReader doesn't use the start/endPrefixMapping() functions to
+   * pass along namespaceMapping information, we must extract the information
+   * from the Attributes parameter in the startElement() function. Doing so can
+   * potentially add multiple namespace mappings that must be removed in the
+   * associated endElement() function. This stack keeps track of which mappings
+   * are added in startElement() so they can easily be removed in endElement()
+   * by calling the pop function.
+   */
+  private val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
+
+  /**
+   * Flag to keep track if the XMLReader is using start/endPrefixMapping() for
+   * namespace mappings or if mappings should be gathered from Attributes in
+   * startElement(). This is set to true for the former and false for the
+   * latter. If true, the prefixMappingTrackingStack is unused.
+   */
+  private var contentHandlerPrefixMappingUsed = false
+
+  /**
+   * Add the current event to the eventQueue. If the eventQueue is full, this
+   * blocks until the SAXInfosetInputter take()'s an event and frees up a slot
+   * in the queue. Once the event is added, we update state to point to the
+   * next SAXInfosetInputterEvent that we should mutate as we receive more SAX
+   * events.
+   */
+  private def addCurrentEventToQueue(): Unit = {
+    // before we put() an event, check to make sure there hasn't been an error
+    // from the SAXInfosetInputter. If there is an error, checkUnparseResult()
+    // throws an exception which bubbles up to the XMLReader to handle. Note
+    // that doing this here avoids a potential deadlock since the
+    // SAXInfosetInputter will no longer take() events, so the queue could
+    // potentially be full and the put() would block forever. By checking for
+    // an error first, we should never reach a deadlocking call to put()
+    checkUnparseResult()
+
+    eventQueue.put(currentEvent)
+    currentIndex = (currentIndex + 1) % preAllocatedEvents.length
+    currentEvent = preAllocatedEvents(currentIndex)
+    currentEvent.clear()
   }
 
-  override def endDocument(): Unit = {
-    batchedInfosetEvents(currentIndex).eventType = One(EndDocument)
-    maybeSendToInputter()
+  /**
+   * Flag to store if relative blob URI resolution should be enabled in the
+   * infoset inputter. This is set to true when
+   * enableResolutionOfRelativeInfosetBlobURIs() is called.
+   */
+  private var enableRelativeBlobURIs = false
+
+  /**
+   * Enable the resolution of relative infoset blob URIs. This should only be
+   * used when running TDML tests where relative blob URIs are often used. In
+   * non test usages, blob URIs should always be absolute and this should not
+   * be needed. This must be called before the startDocument() function is
+   * called for it to have an effect.
+   */
+  def enableResolutionOfRelativeInfosetBlobURIs(): Unit = 
enableRelativeBlobURIs = true
+
+  /**
+   * Start the unparse() thread, potentially reusing a thread from our cached
+   * thread pool, which should help to minimize the overhead with creating new
+   * threads. Note that the Future does not return any or expect to capture any
+   * exceptions. We instead mutate the unparseResult variable depending on the
+   * unparse result. This ensures we can always clear the eventQueue to prevent
+   * deadlocks.
+   */
+  private def startUnparse(): Unit = {
+    unparseTask = Future[Unit] {
+      try {
+        // It is important to create the SAXInfosetInputter in this Future
+        // because during construction it reads from the queue waiting for the
+        // StartDocument event. If we create it outside of the future and in
+        // the same thread as the ContentHandler, then we may end up blocked
+        // since this ContentHandler would not be able to put() the
+        // StartDocument event for the SAXInfosetInputter construction to
+        // take().
+        val input = new SAXInfosetInputter(eventQueue)
+        if (enableRelativeBlobURIs) 
input.enableResolutionOfRelativeInfosetBlobURIs()
+
+        val res = dp.unparse(input, output)
+        if (res.isError) {
+          unparseResult = Some(Left(new DaffodilUnparseErrorSAXException(res)))
+        } else {
+          unparseResult = Some(Right(res))
+        }
+      } catch {
+        //$COVERAGE-OFF$
+        case e: Exception => {
+          unparseResult = Some(Left(new 
DaffodilUnhandledSAXException(e.getMessage, e)))
+        }
+        //$COVERAGE-ON$
+      }
+
+      // We are finished unparsing. At this point, it is possible that the
+      // ContentHandler thread is blocked trying to put() an event on the
+      // eventQueue, but the SAXInfosetInputter thread could have failed in a
+      // way where it won't ever take() an item from the queue, leaving the
+      // ContentHandler deadlocked. Now that unparseResult is defined, we can
+      // clear eventQueue which unblocks the ContentHandler. And since
+      // unparseResult is set, the ContentHandler should not attempt anymore
+      // put()'s and so should never get re-blocked.
+      eventQueue.clear()
+    }(DaffodilUnparseContentHandler.executionContext)
   }
 
-  override def startPrefixMapping(prefix: String, uri: String): Unit = {
-    if (!contentHandlerPrefixMappingUsed) contentHandlerPrefixMappingUsed = 
true
-    val pre = if (prefix == "") null else prefix
-    prefixMapping = NamespaceBinding(pre, uri, prefixMapping)
+  /**
+   * Throw an exception if the unparseTask has finished and signified there was
+   * an error, either from a failed UnparseResult or an unexpected thrown
+   * exception. The XMLReader is expected to catch the thrown SAXException and
+   * handle it accordingly.
+   */
+  private def checkUnparseResult(): Unit = {
+    unparseResult match {
+      case Some(Left(e)) => throw e
+      case _ =>
+    }
   }
 
   /**
-   * XMLReader does not guarantee the order of the prefixes called for this 
function, but it does
-   * guarantee that this method is called after its corresponding endElement, 
which means we can
-   * can just take off the top mappings, because the element that might have 
cared about the order
-   * is already done using the prefixMappings
+   * Return the UnparseResult of the ContentHandler if Daffodil finished
+   * without throwing a DaffodilUnhandledSAXException. Returns null if there is
+   * no unparse result yet (i.e. Daffodil is still unparsing) or if the
+   * Daffodil unparse() function terminated due to an unhandled exception. This
+   * can return an UnparseResult where isError is true, though that is also
+   * thrown as a DaffodilUnparseErrorSAXException.
    */
-  override def endPrefixMapping(prefix: String): Unit = {
-    prefixMapping = if (prefixMapping == null) prefixMapping else 
prefixMapping.parent
+  def getUnparseResult: DFDL.UnparseResult = unparseResult match {
+    case Some(Right(ur)) => ur
+    case Some(Left(DaffodilUnparseErrorSAXException(ur))) => ur
+    case _ => null
   }
 
   /**
-   * Uses Attributes, which is passed in to the startElement callback, to 
extract prefix mappings and
-   * populate the global prefixMapping
+   * Gather prefix namespace mappings from the Attributes passed to
+   * startElement() SAX event. This should only be called if we have detected
+   * that start/endPrefixMapping() functions are not used.
    */
-  def mapPrefixMappingFromAttributesImpl(atts:Attributes): Unit = {
+  private def addPrefixMappingsFromAttributesImpl(atts: Attributes): Unit = {
     var i = 0
-    while (i < atts.getLength) {
+    val numAtts = atts.getLength
+    while (i < numAtts) {
       val qName = atts.getQName(i)
-      val uri =  atts.getValue(i)
       if (qName == "xmlns") {
-        prefixMapping = NamespaceBinding(null, uri, prefixMapping)
+        prefixMapping = NamespaceBinding(null, atts.getValue(i), prefixMapping)
       } else if (qName.startsWith("xmlns:")) {
         val prefix = qName.substring(6)
-        prefixMapping = NamespaceBinding(prefix, uri, prefixMapping)
+        prefixMapping = NamespaceBinding(prefix, atts.getValue(i), 
prefixMapping)
       } else {
         // do nothing, not a namespace mapping attribute
       }
       i += 1
     }
   }
 
-  override def startElement(uri: String, localName: String, qName: String, 
atts: Attributes): Unit = {
-    // we need to check if the characters data is all whitespace, if it is we 
drop the whitespace
-    // data, if it is not, it is an error as starting a new element with 
actual characterData means
-    // we haven't hit an endElement yet, which means we're in a complexElement 
and a complexElement
-    // cannot have character content
-    if (characterData.nonEmpty && !Misc.isAllWhitespace(characterData)) {
-      throw new IllegalContentWhereEventExpected("Non-whitespace characters in 
complex " +
-        "Element: " + characterData.toString
-      )
-    } else {
-      // reset since it was whitespace only
+  /**
+   * Helper function for the start/endElement() SAX events to determine the
+   * localName and namespaceURI values for the current event.
+   */
+  private def setLocalNameAndNamespaceUri(uri: String, localName: String, 
qName: String): Unit = {
+    lazy val prefixColonIndex = qName.indexOf(':')
+
+    currentEvent.localName =
+      if (localName.nonEmpty) {
+        One(localName)
+      } else {
+        Assert.invariant(qName.nonEmpty)
+        if (prefixColonIndex > 0) {
+          One(qName.substring(prefixColonIndex + 1))
+        } else {
+          One(qName)
+        }
+      }
+
+    currentEvent.namespaceURI =
+      if (uri.nonEmpty) {
+        One(uri)
+      } else {
+        Assert.invariant(qName.nonEmpty)
+        val qNamePrefix =
+          if (prefixColonIndex > 0) {
+            qName.substring(0, prefixColonIndex)
+          } else {
+            // if there is no prefix in the qName, then we want the namespace
+            // associated with the "" prefix, which NamespaceBinding represents
+            // as null
+            null
+          }
+        val uri = prefixMapping.getURI(qNamePrefix)
+        Maybe(uri)
+      }
+  }
+
+  /**
+   * This function should be called whenever mixed content (i.e non-whitespace
+   * characters) should be checked for and, since it isn't allowed in a DFDL
+   * infoset, state mutated to indicate the error.
+   *
+   * Note that if mixed content is detected we cannot simply throw a
+   * SAXException. This is because if this ContentHandler thread finishes, the
+   * unparseTask is going to eventually consume all events from the eventQueue
+   * and block trying to take() an event that will never come, leaving a
+   * hanging thread. Instead, we set the mixedContent value of the current
+   * event to the characterData we have accumulated, and the SAXInfosetInputer

Review Comment:
   SAXInfosetInputer -> SAXInfosetInputter



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DaffodilUnparseContentHandler.scala:
##########
@@ -17,327 +17,486 @@
 
 package org.apache.daffodil.processors
 
-import scala.xml.NamespaceBinding
 
+
+import java.util.concurrent.ArrayBlockingQueue
+import java.util.concurrent.Executors
 import javax.xml.XMLConstants
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+import scala.xml.NamespaceBinding
+import scala.xml.TopScope
+
+import org.xml.sax.Attributes
+import org.xml.sax.Locator
+import org.xml.sax.SAXException
+
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DFDL.DaffodilUnhandledSAXException
 import org.apache.daffodil.api.DFDL.DaffodilUnparseErrorSAXException
-import org.apache.daffodil.api.DFDL.SAXInfosetEvent
 import org.apache.daffodil.exceptions.Assert
-import org.apache.daffodil.infoset.IllegalContentWhereEventExpected
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.EndElement
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartDocument
 import org.apache.daffodil.infoset.InfosetInputterEventType.StartElement
 import org.apache.daffodil.infoset.SAXInfosetInputter
+import org.apache.daffodil.infoset.SAXInfosetInputterEvent
 import org.apache.daffodil.util.MStackOf
-import org.apache.daffodil.util.Maybe.Nope
+import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.One
 import org.apache.daffodil.util.Misc
-import org.xml.sax.Attributes
-import org.xml.sax.Locator
 
 /**
- * DaffodilUnparseContentHandler produces SAXInfosetEvent objects for the 
SAXInfosetInputter to
- * consume and convert to events that the Dataprocessor unparse can use. The 
SAXInfosetEvent object
- * is built from information that is passed to the ContentHandler from an 
XMLReader parser. In
- * order to receive the uri and prefix information from the XMLReader, the 
XMLReader must have
- * support for XML Namespaces
+ * Provides a cached thread pool that Scala Futures can use so that SAX
+ * unparse() calls can reuse threads, avoiding overhead related to creating new
+ * threads.
+ */
+object DaffodilUnparseContentHandler {
+  val executionContext = new ExecutionContext {
+    private val threadPool = Executors.newCachedThreadPool()
+    def execute(runnable: Runnable): Unit = threadPool.submit(runnable)
+
+    //$COVERAGE-OFF$
+    def reportFailure(t: Throwable): Unit = {} //do nothing
+    //$COVERAGE-ON$
+  }
+}
+
+/**
+ * DaffodilUnparseContentHandler produces SAXInfosetInputterEvent objects for
+ * the SAXInfosetInputter to consume and convert to events that the
+ * DataProcessor unparse() can use. The SAXInfosetInputterEvents are built based
+ * on information that is passed to this ContentHandler from an XMLReader.
+ *
+ * This class runs the DataProcessor unparse() method in a separate thread,
+ * with a shared ArrayBlockingQueue to provide SAXInfosetInputterEvents. This
+ * queue has a maximum size defined by the saxUnparseEventBatchSize tunable.
  *
- * This class, together with the SAXInfosetInputter, uses coroutines to ensure 
that a batch of events
- * (based on the tunable saxUnparseEventBatchSize) can be passed from the 
former to the latter.
  * The following is the general process:
  *
- * - an external call is made to parse an XML Document
- * - this class receives a StartDocument call, which is the first 
SAXInfosetEvent that should be
- * sent to the SAXInfosetInputter. That event is put onto an array of 
SAXInfosetEvents of size the
- * saxUnparseEventBatchSize tunable. Once the array is full, it is put on the 
inputter's queue,
- * this thread is paused, and that inputter's thread is run
- * - when the SAXInfosetInputter is done processing that batch and is ready 
for a new batch, it
- * sends a 1 element array with the last completed event via the coroutine 
system, which loads it on
- * the contentHandler's queue, which restarts this thread and pauses that one. 
In the expected case,
- * the single element array will contain no new information until the unparse 
complete. In the case of
- * an unexpected error though, it will contain error information
- * - this process continues until the EndDocument SAXInfosetEvent is loaded 
into the batch.
- * Once that SAXInfosetEvent is processed by the SAXInfosetInputter, it 
signals the end of batched
- * events coming from the contentHandler. This ends the unparseProcess and 
returns the event with
- * the unparseResult and/or any error
- * information
+ * - An external call is made to parse from an XMLReader
+ * - This class receives a startDocument() event from the XMLReader. This
+ *   put()'s a StartDocument event on the eventQueue and starts a Future that
+ *   runs the DataProcessor unparse() method with a SAXInfosetInputter in a
+ *   thread
+ * - As additional ContentHandler functions are called from the XMLReader,
+ *   events are created and added to the eventQueue
+ * - This continues until the endDocument() SAX event is reached, at which
+ *   point we create a EndDocument event, put() it on the eventQueue, wait for
+ *   the Future to complete, and determine the result
+ * - While this is all going on, the SAXInfosetInputter take()'s from the
+ *   eventQueue and provides the infoset information necessary to unparse
  *
- * @param dp dataprocessor object that will be used to call the parse
- * @param output outputChannel of choice where the unparsed data is stored
+ * @param dp DataProcessor used to call the unparse() method
+ * @param output Output to write unparsed data
  */
 class DaffodilUnparseContentHandler(
   dp: DFDL.DataProcessor,
   output: DFDL.Output)
   extends DFDL.DaffodilUnparseContentHandler {
-  private lazy val inputter = new SAXInfosetInputter(this, dp, output)
-  private var unparseResult: DFDL.UnparseResult = _
-  private lazy val characterData = new StringBuilder
-  private var prefixMapping: NamespaceBinding = _
-  private lazy val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
 
-  private lazy val tunablesBatchSize = 
dp.getTunables().saxUnparseEventBatchSize
+  /**
+   * The Future thread running the DataProcessor.unparse() method
+   */
+  private var unparseTask: Future[Unit] = _
+  
+  /**
+   * The result of the unparseTask. This is None as long as the Future thread
+   * is still running. Once the unparseTask completes, this is set to a
+   * Right[UnparseResult] if the unparse completed without error. If this
+   * completed but UnparseResult.isError is true or an unexpected exception was
+   * thrown during unparse, this is set to a Left[SAXException].
+   */
+  private var unparseResult: Option[Either[SAXException, DFDL.UnparseResult]] 
= None
+ 
+  /**
+    * Maximum number of SAXInfosetInputterEvents that this ContentHandler can
+    * put() in the eventQueue at a time. If this queue is full, the
+    * ContentHandler thread is blocked until the unparseTask thread take()'s
+    * from the queue.
+   */
+  private val eventQueueSize = dp.getTunables().saxUnparseEventBatchSize
+
+  /**
+   * A thread-safe array-backed FIFO blocking queue to store events to be used
+   * by the SAXInfosetInputter. This ContentHandler put()'s to this queue as it
+   * receives SAX events, and the unparseTask take()'s from this queue to
+   * unparse().
+   */
+  private val eventQueue = new 
ArrayBlockingQueue[SAXInfosetInputterEvent](eventQueueSize)
 
   /**
-   * we always have an extra buffer in the array that we use for the 
inputter.hasNext call. For each
-   * element, we need to know if it has a viable next, if it doesn't, it will 
triggers the context
-   * switch to DaffodilUnparseContentHandler. So for example, if the user 
provides 1 as the
-   * batchSize, under the hood we'll batch [event1, event2].
-   *
-   * - DataProcessor.unparse will call hasNext and getEventType for the 
initialization call
-   * - hasNext will check if nextIndex (i.e currentIndex + 1) is non-empty. 
Since currentIndex is 0,
-   * it will return true since event2 exists.
-   * - getEventType (which signifies our processing step) is called for the 
event at currentIndex
-   * - After the initialization step, subsequent calls will be a loop of 
next(), ...some processing
-   * of the current event ..., and hasNext()
-   * - For our scenario, next() will clear out the contents at currentIndex, 
increment the currentIndex,
-   * and our event2 will be processed, then hasNext will check if there is a 
viable index 2, as
-   * there is not, it will perform the context switch so 
DaffodilUnparseContentHandler can batch
-   * more events
-   * - DaffodilUnparseContentHandler copies the last event into the first so 
the currentEvent stays
-   * the same for the inputter until it decides to change it so we end up with 
[event2, event3]
-   * - When we context switch back to inputter.hasNext, it resets the 
currentIndex to 0, and our loop
-   * begins again with a call to next
-   *
-   * Without us having the extra buffer, things would happen like this:
-   * user provides 1 as the batchSize, under the hood we'll have [event1] 
batched.
-   *
-   * DataProcessor.unparse will call hasNext and getEventType for the 
initialization call, and that
-   * hasNext will check if cnextIndex (i.e currentIndex + 1) is non-empty. As 
currentIndex is 0, and
-   * it is the maximum index, there is no index 1. It will context switch to 
get a new batched event,
-   * which, would overwrite event1 before we get to process it.
+   * To avoid allocating many SAXInfosetInputterEvents, we instead pre-allocate
+   * events and clear/mutate each event as needed. Note that we need 2 more
+   * pre-allocated events than the queue size. This is because while
+   * processing, there could be at most eventQueueSize events already on the
+   * eventQueue, plus another event that the SAXInfosetInputter may have from
+   * the most recent take(), plus the event that this ContentHandler is mutating
+   * in preparation to put() on the queue.
    */
-  private lazy val actualBatchSize = tunablesBatchSize + 1
-  private lazy val batchedInfosetEvents: Array[SAXInfosetEvent] = {
-    Assert.invariant(tunablesBatchSize > 0, "invalid saxUnparseEventBatchSize; 
minimum value is 1")
-    Array.fill[SAXInfosetEvent](actualBatchSize)(new SAXInfosetEvent)
+  private val preAllocatedEvents = {
+    Array.fill(eventQueueSize + 2)(new SAXInfosetInputterEvent)
   }
-  private var currentIndex: Int = 0
 
   /**
-   * This is a flag that is set to true when startPrefixMapping is called. 
When true, we make
-   * the assumption that we don't need to use the Attributes parameter from 
startElement to get the
-   * namespaceURI information and will solely rely on start/endPrefixMapping. 
If false, we will use
-   * Attributes to get the namespaceURI info.
+   * Index into the pre-allocated event array that the content handler is
+   * currently mutating and preparing to put() on the eventQueue(). When all
+   * information for this event is gathered, we put() it on the eventQueue and
+   * increment the index into the pre-allocated events array and start mutating
+   * the next event.
    */
-  private var contentHandlerPrefixMappingUsed = false
+  private var currentIndex = 0
 
   /**
-   * returns null in the case of an DaffodilUnhandledSAXException
+   * Variable to make it easier to access the pre-allocated event at
+   * currentIndex. This is updated each time currentIndex is incremented so
+   * that methods in this class should never need to access currentIndex or the
+   * preAllocated events array directly.
    */
-  def getUnparseResult: DFDL.UnparseResult = unparseResult
+  private var currentEvent = preAllocatedEvents(currentIndex)
 
-  def enableInputterResolutionOfRelativeInfosetBlobURIs(): Unit = 
inputter.enableResolutionOfRelativeInfosetBlobURIs()
+  /**
+   * Buffer to accumulate characters received from the SAX characters() 
function
+   */
+  private val characterData = new StringBuilder
 
-  override def setDocumentLocator(locator: Locator): Unit = {
-    // do nothing
-  }
+  /**
+   * Used to store the current in-scope prefix namespace mappings, determined
+   * by the start/endPrefixMapping functions, or from xmlns Attributes in the
+   * startElement function.
+   */
+  private var prefixMapping: NamespaceBinding = TopScope
 
-  override def startDocument(): Unit = {
-    batchedInfosetEvents(currentIndex).eventType = One(StartDocument)
-    maybeSendToInputter()
+  /**
+   * If the XMLReader doesn't use the start/endPrefixMapping() functions to
+   * pass along namespaceMapping information, we must extract the information
+   * from the Attributes parameter in the startElement() function. Doing so can
+   * potentially add multiple namespace mappings that must be removed in the
+   * associated endElement() function. This stack keeps track of which mappings
+   * are added in startElement() so they can easily be removed in endElement()
+   * by calling the pop function.
+   */
+  private val prefixMappingTrackingStack = new MStackOf[NamespaceBinding]
+
+  /**
+   * Flag to keep track if the XMLReader is using start/endPrefixMapping() for
+   * namespace mappings or if mappings should be gathered from Attributes in
+   * startElement(). This is set to true for the former and false for the
+   * latter. If true, the prefixMappingTrackingStack is unused.
+   */
+  private var contentHandlerPrefixMappingUsed = false
+
+  /**
+   * Add the current event to the eventQueue. If the eventQueue is full, this
+   * blocks until the SAXInfosetInputter take()'s an event and frees up a slot
+   * in the queue. Once the event is added, we update state to point to the
+   * next SAXInfosetInputterEvent that we should mutate as we receive more SAX
+   * events.
+   */
+  private def addCurrentEventToQueue(): Unit = {
+    // before we put() an event, check to make sure there hasn't been an error
+    // from the SAXInfosetInputter. If there is an error, checkUnparseResult()
+    // throws an exception which bubbles up to the XMLReader to handle. Note
+    // that doing this here avoids a potential deadlock since the
+    // SAXInfosetInputter will no longer take() events, so the queue could
+    // potentially be full and the put() would block forever. By checking for
+    // an error first, we should never reach a deadlocking call to put()
+    checkUnparseResult()
+
+    eventQueue.put(currentEvent)
+    currentIndex = (currentIndex + 1) % preAllocatedEvents.length
+    currentEvent = preAllocatedEvents(currentIndex)
+    currentEvent.clear()
   }
 
-  override def endDocument(): Unit = {
-    batchedInfosetEvents(currentIndex).eventType = One(EndDocument)
-    maybeSendToInputter()
+  /**
+   * Flag to store if relative blob URI resolution should be enabled in the
+   * infoset inputter. This is set to true when
+   * enableResolutionOfRelativeInfosetBlobURIs() is called.
+   */
+  private var enableRelativeBlobURIs = false
+
+  /**
+   * Enable the resolution of relative infoset blob URIs. This should only be
+   * used when running TDML tests where relative blob URIs are often used. In
+   * non test usages, blob URIs should always be absolute and this should not
+   * be needed. This must be called before the startDocument() function is
+   * called for it to have an effect.
+   */
+  def enableResolutionOfRelativeInfosetBlobURIs(): Unit = 
enableRelativeBlobURIs = true
+
+  /**
+   * Start the unparse() thread, potentially reusing a thread from our cached
+   * thread pool, which should help to minimize the overhead with creating new
+   * threads. Note that the Future does not return any or expect to capture any
+   * exceptions. We instead mutate the unparseResult variable depending on the
+   * unparse result. This ensures we can always clear the eventQueue to prevent
+   * deadlocks.
+   */
+  private def startUnparse(): Unit = {
+    unparseTask = Future[Unit] {
+      try {
+        // It is important to create the SAXInfosetInputter in this Future
+        // because during construction it reads from the queue waiting for the
+        // StartDocument event. If we create it outside of the future and in
+        // the same thread as the ContentHandler, then we may end up blocked
+        // since this ContentHandler would not be able to put() the
+        // StartDocument event for the SAXInfosetInputter construction to
+        // take().
+        val input = new SAXInfosetInputter(eventQueue)
+        if (enableRelativeBlobURIs) 
input.enableResolutionOfRelativeInfosetBlobURIs()
+
+        val res = dp.unparse(input, output)
+        if (res.isError) {
+          unparseResult = Some(Left(new DaffodilUnparseErrorSAXException(res)))
+        } else {
+          unparseResult = Some(Right(res))
+        }
+      } catch {
+        //$COVERAGE-OFF$
+        case e: Exception => {
+          unparseResult = Some(Left(new 
DaffodilUnhandledSAXException(e.getMessage, e)))
+        }
+        //$COVERAGE-ON$
+      }
+
+      // We are finished unparsing. At this point, it is possible that the
+      // ContentHandler thread is blocked trying to put() an event on the
+      // eventQueue, but the SAXInfosetInputter thread could have failed in a
+      // way where it won't ever take() an item from the queue, leaving the
+      // ContentHandler deadlocked. Now that unparseResult is defined, we can
+      // clear eventQueue which unblocks the ContentHandler. And since
+      // unparseResult is set, the ContentHandler should not attempt anymore

Review Comment:
   anymore -> any more



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to