This is an automated email from the ASF dual-hosted git repository.

slawrence pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-daffodil.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a390f6  Allow garbage collection of UStateForSuspension's and 
DirectOrBufferedDataOutputStream's
0a390f6 is described below

commit 0a390f67a28f1f65b84b52583917beda34de1a88
Author: Steve Lawrence <[email protected]>
AuthorDate: Tue Feb 9 07:51:00 2021 -0500

    Allow garbage collection of UStateForSuspension's and 
DirectOrBufferedDataOutputStream's
    
    The UStateForSuspension and DirectOrBufferedDataOutputStream classes
    have members that effectively create linked lists. In each of these
    cases, we unknowingly hold onto the head of these linked lists, which
    prevents garbage collection of all UStateForSuspension and
    DirectOrBufferedDataOutputStream instances. This means we essentially
    hold on to all unparse state, which quickly leads to out of memory
    errors for large formats that require many suspensions.
    
    - The first issue is the "prior" member of UStateMain/UStateForSuspensions.
      This member is set so that each UState points to the previous
      UStateForSuspension that has been created, essentially creating a
      linked list of all UStateForSuspensions, with the head in UStateMain.
      This prevents all UStateForSuspensions from being garbage collected,
      as well as all the state they point to (it's a lot).
    
      Fortunately, this member isn't used anywhere anymore. Presumably it
      was once used for debugging suspensions, but is no longer used or
      needed. So we can simply remove this member so these
      UStateForSuspensions can be garbage collected once the Suspensions
      that use them are finished and garbage collected.
    
    - The second issue is related to the "following" member in
      DirectOrBufferedDataOutputStreams. This member is used to keep track
      of the buffered DOS that follows this DOS (and iteratively, all
      following DOS's). When the direct DOS is finished, we make the
      following DOS direct and update pointers accordingly. However, we
      create the very first direct DOS as a local variable in the "unparse"
      function, so a reference to it is held on the stack and it cannot be
      garbage collected until unparse finishes. And because this DOS
      iteratively points to all following DOS's via the "following" member,
      we can never free any DOS's (or any of the buffered data associated
      with them) until the end of unparse.
    
      The solution in this case is to not create the initial direct DOS as
      a local variable in the unparse function, but instead to create it as
      part of UState creation, when we initialize the "dataOutputStream" var.
      This way the only references to the initial DOS are those held by the
      UState or by Suspensions. As the UState mutates and Suspensions
      resolve, we completely lose references to earlier DOS's, so they can
      be garbage collected.
    
    Fixing these two issues allows unparsing very large infosets that
    require buffering, without running into out of memory errors.
    
    DAFFODIL-2468
---
 .../apache/daffodil/processors/DataProcessor.scala | 13 ++------
 .../daffodil/processors/unparsers/UState.scala     | 39 ++++++++++++----------
 2 files changed, 23 insertions(+), 29 deletions(-)

diff --git 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DataProcessor.scala
 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DataProcessor.scala
index 3f8c9f9..50c0692 100644
--- 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DataProcessor.scala
+++ 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DataProcessor.scala
@@ -56,7 +56,6 @@ import org.apache.daffodil.infoset.InfosetOutputter
 import org.apache.daffodil.infoset.TeeInfosetOutputter
 import org.apache.daffodil.infoset.XMLTextInfosetOutputter
 import org.apache.daffodil.io.BitOrderChangeException
-import org.apache.daffodil.io.DirectOrBufferedDataOutputStream
 import org.apache.daffodil.io.FileIOException
 import org.apache.daffodil.io.InputSourceDataInputStream
 import org.apache.daffodil.oolag.ErrorAlreadyHandled
@@ -553,18 +552,10 @@ class DataProcessor private (
   }
 
   def unparse(inputter: InfosetInputter, outStream: java.io.OutputStream) = {
-    val out = DirectOrBufferedDataOutputStream(
-      outStream,
-      null, // null means no other stream created this one.
-      isLayer = false,
-      tunables.outputStreamChunkSizeInBytes,
-      tunables.maxByteArrayOutputStreamBufferSizeInBytes,
-      tunables.tempFilePath)
-
     inputter.initialize(ssrd.elementRuntimeData, getTunables())
     val unparserState =
       UState.createInitialUState(
-        out,
+        outStream,
         this,
         inputter,
         areDebugging)
@@ -575,7 +566,7 @@ class DataProcessor private (
         unparserState.notifyDebugging(true)
       }
       unparserState.dataProc.get.init(unparserState, ssrd.unparser)
-      out.setPriorBitOrder(ssrd.elementRuntimeData.defaultBitOrder)
+      
unparserState.dataOutputStream.setPriorBitOrder(ssrd.elementRuntimeData.defaultBitOrder)
       doUnparse(unparserState)
       unparserState.evalSuspensions(isFinal = true)
       unparserState.unparseResult
diff --git 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/unparsers/UState.scala
 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/unparsers/UState.scala
index 617f484..9604803 100644
--- 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/unparsers/UState.scala
+++ 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/unparsers/UState.scala
@@ -70,7 +70,6 @@ import org.apache.daffodil.util.Maybe.One
 object ENoWarn { EqualitySuppressUnusedImportWarning() }
 
 abstract class UState(
-  dos: DirectOrBufferedDataOutputStream,
   vbox: VariableBox,
   diagnosticsArg: List[Diagnostic],
   dataProcArg: Maybe[DataProcessor],
@@ -102,9 +101,8 @@ abstract class UState(
     "UState(" + elt + " DOS=" + dataOutputStream.toString() + ")"
   }
 
-  var dataOutputStream: DirectOrBufferedDataOutputStream = dos
+  var dataOutputStream: DirectOrBufferedDataOutputStream
 
-  def prior: UStateForSuspension
   def currentInfosetNode: DINode
   def currentInfosetNodeMaybe: Maybe[DINode]
   def escapeSchemeEVCache: MStackOfMaybe[EscapeSchemeUnparserHelper]
@@ -366,16 +364,15 @@ abstract class UState(
  */
 final class UStateForSuspension(
   val mainUState: UStateMain,
-  dos: DirectOrBufferedDataOutputStream,
+  override var dataOutputStream: DirectOrBufferedDataOutputStream,
   vbox: VariableBox,
   override val currentInfosetNode: DINode,
   occursIndex: Long,
   escapeSchemeEVCacheMaybe: Maybe[MStackOfMaybe[EscapeSchemeUnparserHelper]],
   delimiterStackMaybe: Maybe[MStackOf[DelimiterStackUnparseNode]],
-  override val prior: UStateForSuspension,
   tunable: DaffodilTunables,
   areDebugging: Boolean)
-  extends UState(dos, vbox, mainUState.diagnostics, mainUState.dataProc, 
tunable, areDebugging) {
+  extends UState(vbox, mainUState.diagnostics, mainUState.dataProc, tunable, 
areDebugging) {
 
   dState.setMode(UnparserBlocking)
   dState.setCurrentNode(thisElement.asInstanceOf[DINode])
@@ -445,29 +442,37 @@ final class UStateForSuspension(
 
 final class UStateMain private (
   private val inputter: InfosetInputter,
+  outStream: java.io.OutputStream,
   vbox: VariableBox,
   diagnosticsArg: List[Diagnostic],
   dataProcArg: DataProcessor,
-  dos: DirectOrBufferedDataOutputStream,
   tunable: DaffodilTunables,
   areDebugging: Boolean)
-  extends UState(dos, vbox, diagnosticsArg, One(dataProcArg), tunable, 
areDebugging) {
+  extends UState(vbox, diagnosticsArg, One(dataProcArg), tunable, 
areDebugging) {
 
   dState.setMode(UnparserBlocking)
 
   def this(
     inputter: InfosetInputter,
+    outputStream: java.io.OutputStream,
     vmap: VariableMap,
     diagnosticsArg: List[Diagnostic],
     dataProcArg: DataProcessor,
-    dataOutputStream: DirectOrBufferedDataOutputStream,
     tunable: DaffodilTunables,
     areDebugging: Boolean) =
-    this(inputter, new VariableBox(vmap), diagnosticsArg, dataProcArg,
-      dataOutputStream, tunable, areDebugging)
-
-  private var _prior: UStateForSuspension = null
-  override def prior = _prior
+    this(inputter, outputStream, new VariableBox(vmap), diagnosticsArg, 
dataProcArg,
+      tunable, areDebugging)
+
+  override var dataOutputStream: DirectOrBufferedDataOutputStream = {
+    val out = DirectOrBufferedDataOutputStream(
+      outStream,
+      null, // null means no other stream created this one.
+      isLayer = false,
+      tunable.outputStreamChunkSizeInBytes,
+      tunable.maxByteArrayOutputStreamBufferSizeInBytes,
+      tunable.tempFilePath)
+    out
+  }
 
   def cloneForSuspension(suspendedDOS: DirectOrBufferedDataOutputStream): 
UState = {
     val es =
@@ -502,13 +507,11 @@ final class UStateMain private (
       arrayIndexStack.top, // only need the top of the stack, not the whole 
thing
       es,
       ds,
-      prior,
       tunable,
       areDebugging)
 
     clone.setProcessor(processor)
 
-    this._prior = clone
     clone
   }
 
@@ -654,7 +657,7 @@ final class UStateMain private (
 object UState {
 
   def createInitialUState(
-    out: DirectOrBufferedDataOutputStream,
+    outStream: java.io.OutputStream,
     dataProc: DFDL.DataProcessor,
     inputter: InfosetInputter,
     areDebugging: Boolean): UStateMain = {
@@ -669,10 +672,10 @@ object UState {
     val diagnostics = Nil
     val newState = new UStateMain(
         inputter,
+        outStream,
         variables,
         diagnostics,
         dataProc.asInstanceOf[DataProcessor],
-        out,
         dataProc.getTunables(),
         areDebugging)
     newState

Reply via email to