This is an automated email from the ASF dual-hosted git repository.

slawrence pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/daffodil.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e7b8e5cb Ensure layers work well with suspensions and bitOrder
0e7b8e5cb is described below

commit 0e7b8e5cb8d3e7aa0d4a746548861e0a117a2af8
Author: Steve Lawrence <[email protected]>
AuthorDate: Fri Sep 16 07:31:36 2022 -0400

    Ensure layers work well with suspensions and bitOrder
    
    Prior to these changes, the combination of layers and suspensions could
    cause issues with bit order checking, which could lead to suspensions
    trying to create new suspensions, which isn't allowed.
    
    - When creating layers, rather than writing directly to the current DOS
      stream, we instead create a new buffered DOS just for this layer. This
      ensures that the layer is written starting with a zero offset and no
      fragments or bitOreder state to cause confusion.
    - The original DOS is marked as finished, allowing the new layer DOS to
      be flushed to it when marked as finished. If the original DOS was
      direct, it would make this new layer DOS direct and so buffered data
      should be minimized.
    - We must create a clone of the UState that is passed into the layer
      creation functions as well as used to finish the original DOS. This
      ensures the new layer DOS and original DOS use the correct state
      snapshot if suspensions cause their evaluation and flushing of data to
      be delayed.
    
    DAFFODIL-2726
---
 .../io/DirectOrBufferedDataOutputStream.scala      | 16 ++++++
 .../unparsers/LayeredSequenceUnparser.scala        | 57 ++++++++++++++++++----
 .../apache/daffodil/layers/LayerTransformer.scala  |  4 +-
 3 files changed, 65 insertions(+), 12 deletions(-)

diff --git 
a/daffodil-io/src/main/scala/org/apache/daffodil/io/DirectOrBufferedDataOutputStream.scala
 
b/daffodil-io/src/main/scala/org/apache/daffodil/io/DirectOrBufferedDataOutputStream.scala
index 0bc073f2a..ff77477a9 100644
--- 
a/daffodil-io/src/main/scala/org/apache/daffodil/io/DirectOrBufferedDataOutputStream.scala
+++ 
b/daffodil-io/src/main/scala/org/apache/daffodil/io/DirectOrBufferedDataOutputStream.scala
@@ -303,6 +303,22 @@ class DirectOrBufferedDataOutputStream private[io] (
     if (_following.isEmpty) this
     else _following.get.lastInChain
 
+  /**
+   * Helpful when debugging suspension or data output stream issues. Outputs
+   * the list of all DataOutputStreams connected to this one, with a marker
+   * pointing to this stream
+   */
+  // $COVERAGE-OFF$
+  def dumpChain(): Unit = {
+    var curDOS = lastInChain
+    while (curDOS != null) {
+      val marker = if (curDOS eq this) " > " else "   "
+      System.err.println(marker + curDOS.toString)
+      curDOS = curDOS.splitFrom
+    }
+  }
+  // $COVERAGE-ON$
+
   /**
    * Provides a new buffered data output stream. Note that this must
    * be completely configured (byteOrder, encoding, bitOrder, etc.)
diff --git 
a/daffodil-runtime1-unparser/src/main/scala/org/apache/daffodil/processors/unparsers/LayeredSequenceUnparser.scala
 
b/daffodil-runtime1-unparser/src/main/scala/org/apache/daffodil/processors/unparsers/LayeredSequenceUnparser.scala
index 97f9829d5..b279e9dbd 100644
--- 
a/daffodil-runtime1-unparser/src/main/scala/org/apache/daffodil/processors/unparsers/LayeredSequenceUnparser.scala
+++ 
b/daffodil-runtime1-unparser/src/main/scala/org/apache/daffodil/processors/unparsers/LayeredSequenceUnparser.scala
@@ -35,29 +35,66 @@ class LayeredSequenceUnparser(ctxt: SequenceRuntimeData,
   override def unparse(state: UState): Unit = {
     val layerTransformer = 
layerTransformerFactory.newInstance(layerRuntimeInfo)
 
-    val originalDOS = state.dataOutputStream // layer will output to the 
original, then finish it upon closing.
+    val originalDOS = state.dataOutputStream
+
+    // create a new buffered DOS that this layer will flush to when the layer
+    // completes unparsing. This ensures that any fragment bits or bitOrder
+    // state in the original DOS does not affect how the layer flushes bytes to
+    // the underlying DOS. Only when this new DOS is delivered to the
+    // originalDOS will the bitOrder checks be done.
+    val layerUnderlyingDOS = originalDOS.addBuffered()
+
+    // clone the UState to be used when the layer flushes its buffered content
+    // to layerUnderlyingDOS. This layer isn't a suspension, but this gives us
+    // an immutable clone that the layer can safely use. This is important
+    // since the flushing of the layer might be delayed due to suspensions. By
+    // getting an immutable state, we ensure that the flushing of the layer
+    // occurs with the state at this point.
+    val formatInfoPre = 
state.asInstanceOf[UStateMain].cloneForSuspension(layerUnderlyingDOS)
+
+    // mark the original DOS as finished--no more data will be unparsed to it.
+    // If known, this will carry bit position forward to the layerUnerlyingDOS,
+    // or could even make that DOS direct. Note that it is important to use the
+    // cloned state from above. This is because the setFinished function stores
+    // the formatInfo (as finishedFormatInfo) to be used when delivering
+    // content to its previous direct DOS. The deliver content call could be
+    // delayed due to suspensions/unfinished DOSs, so it must store an
+    // immutable state that won't change as unparsers are evaluated.
+    originalDOS.setFinished(formatInfoPre)
+
+    // create a new DOS where unparsers following this layer will unparse
+    val layerFollowingDOS = layerUnderlyingDOS.addBuffered()
 
-    val newDOS = originalDOS.addBuffered() // newDOS is where unparsers after 
this one returns will unparse into.
-    //
     // New layerDOS is where the layer will unparse into. Ultimately anything 
written
-    // to layerDOS ends up, post transform, in originalDOS.
-    //
-    val layerDOS = layerTransformer.addLayer(originalDOS, state)
+    // to layerDOS ends up, post transform, in layerUnderlyingDOS
+    val layerDOS = layerTransformer.addLayer(layerUnderlyingDOS, state, 
formatInfoPre)
 
     // unparse the layer body into layerDOS
     state.dataOutputStream = layerDOS
     super.unparse(state)
     layerTransformer.endLayerForUnparse(state)
 
-    // now we're done with the layer, so finalize the layer
-    layerDOS.lastInChain.setFinished(state)
+    // now we're done unparsing the layer, so finalize the last DOS in the
+    // chain. Note that there might be suspensions so some parts of the
+    // layerDOS chain may not be finished. When those suspensions are all
+    // finished, the layerDOS content will be written to the
+    // layerUnderlyingDOS, which will subsequently be finished. Like above, it
+    // is important to pass an immutable UState into the setFinished function
+    // because that state is stored in the DOS (as finishedFormatInfo) and its
+    // use may be delayed to after the actual UState has been changed. The
+    // UState has almost certainly changed since the last cloneForSuspension
+    // call, so we need a new clone to finish this DOS.
+    val layerDOSLast = layerDOS.lastInChain
+    val formatInfoPost = 
state.asInstanceOf[UStateMain].cloneForSuspension(layerDOSLast)
+    layerDOSLast.setFinished(formatInfoPost)
 
     // clean up resources - note however, that due to suspensions, the whole
     // layer stack is potentially still needed, so not clear what can be
     // cleaned up at this point.
-    //
     layerTransformer.removeLayer(layerDOS, state)
-    state.dataOutputStream = newDOS
+
+    // reset the state so subsequent unparsers write to the following DOS
+    state.dataOutputStream = layerFollowingDOS
   }
 
 }
diff --git 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala
 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala
index e4bc1a5c6..376da85a8 100644
--- 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala
+++ 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala
@@ -101,8 +101,8 @@ abstract class LayerTransformer(layerName: String, 
layerRuntimeInfo: LayerRuntim
     // nothing for now
   }
 
-  final def addLayer(s: DataOutputStream, state: UState): 
DirectOrBufferedDataOutputStream = {
-    val jos = wrapJavaOutputStream(s, state)
+  final def addLayer(s: DataOutputStream, state: UState, finfo: FormatInfo): 
DirectOrBufferedDataOutputStream = {
+    val jos = wrapJavaOutputStream(s, finfo)
     val limitedJOS = wrapLimitingStream(state, jos)
     val encodedOutputStream = wrapLayerEncoder(limitedJOS)
     val newDOS = DirectOrBufferedDataOutputStream(

Reply via email to