mbeckerle commented on code in PR #1187:
URL: https://github.com/apache/daffodil/pull/1187#discussion_r1531003052


##########
daffodil-cli/src/test/resources/org/apache/daffodil/layers/xsd/buggyLayer.dfdl.xsd:
##########
@@ -28,17 +28,6 @@
 
   <include schemaLocation="/org/apache/daffodil/xsd/DFDLGeneralFormat.dfdl.xsd" />
 
-  <annotation>
-    <appinfo source="http://www.ogf.org/dfdl/">
-
-      <dfdl:format ref="buggy:GeneralFormat" />
-
-      <dfdl:defineFormat name="buggyFormat">
-        <dfdl:format dfdlx:layerTransform="buggy" dfdlx:layerLengthKind="explicit" dfdlx:layerLengthUnits="bytes"
-                     dfdlx:layerEncoding="ascii" />
-      </dfdl:defineFormat>
-
-    </appinfo>
-  </annotation>
+  <!-- This layer has no parameters nor return variables -->

Review Comment:
   Some layers could well be closed source. If users can't see the 
java/scala code, they depend either on javadoc (which would be unusual for 
a DFDL schema author to consult) or on this file, which is the only other 
thing that contains the layer namespace. So I think the convention should be 
that a layer *always* has a corresponding schema file, if only to document the 
layer's namespace, the name of the layer, and the fact that it has no variables. 
   
   It also serves to suggest a prefix to use for the namespace. 



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/runtime1/layers/ChecksumLayerBase.scala:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.daffodil.runtime1.layers
+
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.io.InputStream
+import java.io.OutputStream
+import java.nio.ByteBuffer
+
+import org.apache.daffodil.lib.exceptions.Assert
+import org.apache.daffodil.runtime1.layers.api.Layer
+
+import org.apache.commons.io.IOUtils
+
+/**
+ * A base class for ChecksumLayer - hides details from the API user who is deriving a
+ * concrete class from ChecksumLayer.
+ *
+ * Suitable only for checksums computed over small sections of data, not large data streams or whole files.
+ */
+abstract class ChecksumLayerBase(
+  localName: String,
+  targetNamespace: String,
+) extends Layer(localName, targetNamespace) {
+
+  private var checksum: Int = -1
+
+  private var length: Int = -1
+  private var isInitialized: Boolean = false
+
+  final def getLength: Int = length
+
+  /**
+   * Must be called to specify the length, or it is a schema definition error.
+   * @param len The fixed length over which the checksum is computed.
+   */
+  final protected def setLength(len: Int): Unit = {
+    this.length = len
+    if (len < 0) throw new Exception(s"The layer length is negative: $len.")
+    isInitialized = true
+  }
+
+  final protected def getChecksum: Int = checksum
+  final private[layers] def setChecksum(checksum: Int): Unit = { this.checksum = checksum }
+
+  def compute(
+    isUnparse: Boolean,
+    byteBuffer: ByteBuffer,
+  ): Int
+
+  final override def wrapLayerInput(jis: InputStream): InputStream = {
+    if (!isInitialized) runtimeSchemaDefinitionError("setLength method was never called.")

Review Comment:
   Changing this to throw an IllegalStateException, which is a RuntimeException, 
so it will be fatal. 
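
   A minimal sketch of that guard, assuming a simplified stand-in class. 
`LengthGuardSketch` and `requireLength` are hypothetical names, not part of 
Daffodil. It only illustrates that `IllegalStateException` is an unchecked 
`RuntimeException`, so a caller that never sets the length fails immediately 
rather than through a declared error path.

```scala
// Hypothetical stand-in for the length guard; not the Daffodil class.
class LengthGuardSketch {
  private var length: Int = -1
  private var isInitialized: Boolean = false

  // Records the fixed length and marks the object as ready for use.
  def setLength(len: Int): Unit = {
    length = len
    isInitialized = true
  }

  // Stands in for wrapLayerInput: refuses to proceed if setLength was never called.
  def requireLength(): Int = {
    if (!isInitialized)
      throw new IllegalStateException("setLength method was never called.")
    length
  }
}
```

   Calling `requireLength()` on a fresh instance throws; after `setLength(4)` it 
returns 4.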



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/runtime1/layers/LayerDriver.scala:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.daffodil.runtime1.layers
+
+import java.io.FilterInputStream
+import java.io.InputStream
+import java.io.OutputStream
+import java.lang.reflect.InvocationTargetException
+
+import org.apache.daffodil.io.DataInputStream.Mark
+import org.apache.daffodil.io.DataOutputStream
+import org.apache.daffodil.io.DirectOrBufferedDataOutputStream
+import org.apache.daffodil.io.InputSourceDataInputStream
+import org.apache.daffodil.lib.exceptions.Assert
+import org.apache.daffodil.lib.exceptions.UnsuppressableException
+import org.apache.daffodil.lib.schema.annotation.props.gen.BitOrder
+import org.apache.daffodil.lib.util.Maybe
+import org.apache.daffodil.lib.util.Maybe.Nope
+import org.apache.daffodil.lib.util.Maybe.One
+import org.apache.daffodil.lib.util.Misc
+import org.apache.daffodil.runtime1.dsom.RuntimeSchemaDefinitionError
+import org.apache.daffodil.runtime1.layers.api.Layer
+import org.apache.daffodil.runtime1.processors.ParseOrUnparseState
+import org.apache.daffodil.runtime1.processors.ProcessingError
+import org.apache.daffodil.runtime1.processors.unparsers.UState
+
+import passera.unsigned.ULong
+
+/**
+ * Mandatory factory to create and initialize the layer driver object.
+ */
+object LayerDriver {
+  def apply(state: ParseOrUnparseState, lrd: LayerRuntimeData): LayerDriver = {
+    val lvr = state.layerRuntimeCompiler.compile(lrd) // could fail if layer not SPI loaded
+    val lr = new LayerRuntime(state, lrd, lvr) // can't fail
+    val newLayer = lvr.constructInstance() // can't fail
+    newLayer.setLayerRuntime(lr) // can't fail
+    val instance = new LayerDriver(newLayer) // can't fail
+    lvr.callParamSetterWithParameterVars(newLayer) // could fail if user's method causes errors
+    instance
+  }
+}
+
+/**
+ * Driving mechanism that incorporates a layer at runtime into the I/O streams
+ * to transform the data stream and optionally receive parameter values from,
+ * and populate result values back into DFDL variables.
+ *
+ * A layer driver is created at runtime as part of a single parse/unparse call.
+ * Hence, they can be stateful without causing thread-safety issues.
+ *
+ * @param layer            The Layer instance to parameterize, drive, and get results back from.
+ */
+class LayerDriver private (val layer: Layer) {
+
+  def layerRuntime: LayerRuntime = layer.getLayerRuntime
+  private def layerVarsRuntime = layerRuntime.layerVarsRuntime
+
+  private def wrapJavaInputStream(s: InputSourceDataInputStream) =
+    new JavaIOInputStream(s, layer)
+
+  private def wrapJavaOutputStream(s: DataOutputStream) =
+    new JavaIOOutputStream(s, layer)
+
+  /**
+   *  Override these if we ever have transformers that don't have these
+   *  requirements.
+   */
+  val mandatoryLayerAlignmentInBits: Int = 8
+
+  final def addInputLayer(s: InputSourceDataInputStream): InputSourceDataInputStream = {
+    val jis = wrapJavaInputStream(s)
+    // This wrapLayerInput could fail via a throw. It's calling layer code.
+    val wrapped =
+      try {
+        layer.wrapLayerInput(jis)
+      } catch {
+        case t: Throwable => escalateThrowFromWrap(t)
+      }
+    // This assured close thing is just in case the user's layer stream doesn't propagate
+    // the close. That would be incorrect of it, but we want to tolerate that not happening.
+    val decodedInputStream = new AssuredCloseInputStream(wrapped, jis)
+    val newDIS = InputSourceDataInputStream(decodedInputStream)
+    // must initialize priorBitOrder
+    newDIS.cst.setPriorBitOrder(BitOrder.MostSignificantBitFirst)
+    newDIS.setDebugging(s.areDebugging)
+    newDIS
+  }
+
+  /**
+   * Parsing works as a tree traversal, so when the parser unwinds from
+   * parsing the layer we can invoke this to handle cleanups, and
+   * finalization issues like assigning the result variables
+   */
+  final def removeInputLayer(s: InputSourceDataInputStream): Unit =
+    try {
+      layerVarsRuntime.callGettersToPopulateResultVars(layer) // could throw.
+    } catch {
+      case ite: InvocationTargetException =>
+        layer.processingError(ite.getCause)

Review Comment:
   I can confirm this is covered. Test testBombGetterThrowEX gets this 
InvocationTargetException. 
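
   For readers unfamiliar with why the catch is on `InvocationTargetException`: 
`java.lang.reflect.Method.invoke` wraps whatever the target method throws, and 
`getCause` recovers the original. A small sketch of that behavior follows. 
`BombGetterSketch` and `ReflectiveGetterSketch` are hypothetical names and do 
not reproduce the actual test.

```scala
import java.lang.reflect.InvocationTargetException

// Hypothetical layer-like class whose result getter fails.
class BombGetterSketch {
  def getResult(): Int = throw new IllegalStateException("getter failed")
}

object ReflectiveGetterSketch {
  // Calls a public no-argument method by name and, on failure,
  // returns the underlying cause rather than the reflective wrapper.
  def callGetter(target: AnyRef, name: String): Either[Throwable, AnyRef] =
    try {
      Right(target.getClass.getMethod(name).invoke(target))
    } catch {
      // Method.invoke wraps anything thrown by the invoked method.
      case ite: InvocationTargetException => Left(ite.getCause)
    }
}
```

   Here `callGetter(new BombGetterSketch, "getResult")` yields a `Left` holding 
the original `IllegalStateException`.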
   



##########
daffodil-cli/src/test/scala/org/apache/daffodil/cli/cliTest/TestCLIParsing.scala:
##########
@@ -938,8 +938,8 @@ class TestCLIParsing {
     )
     runCLI(args"parse -s $schema") { cli =>
       cli.sendLine("0", inputDone = true)
-      cli.expectErr("Unexpected exception in layer transformer 'buggy'")
-    }(ExitCode.LayerExecutionError)

Review Comment:
   An abort is not caught. 
   
   A RuntimeException is converted to a LayerRuntimeException, which then 
propagates to the top level, i.e., it isn't caught.
   
   Any unexpected exception becomes a processingError carrying a 
LayerUnexpectedException that wraps the exception. It's a processing error 
because variables can be populated from expressions, and expression values can 
come from the infoset, which could be full of gibberish since the parser could 
be speculating down a bad path. That's why anything about the variables causes 
a processing error. 
   
   Consider the gzip layer, and the parser speculating down a path which does 
not have gzip data. The gzip layer is going to fail somehow and throw an 
IOException most likely, but it could throw an NPE or any of a variety of things. 
   
   A layer author could, if they choose, call runtimeSchemaDefinitionError from 
their layer code. Or they can throw an Error (which is a Throwable, but not an 
Exception, so it will not be caught).
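
   A rough sketch of those rules as a classification function. It is modeled on 
the match in `handleOutputStreamException`, not taken from Daffodil; 
`LayerThrowSketch`, `Outcome`, and `classify` are hypothetical names.

```scala
object LayerThrowSketch {
  sealed trait Outcome
  case object Propagates extends Outcome             // reaches the top level uncaught
  case object BecomesProcessingError extends Outcome // parser may backtrack

  // Hypothetical classification of something thrown by layer code.
  def classify(t: Throwable): Outcome = t match {
    case _: Error            => Propagates             // a Throwable, but not an Exception
    case _: RuntimeException => Propagates             // rethrown, so it is fatal
    case _: Exception        => BecomesProcessingError // e.g. IOException on non-gzip data
    case _                   => Propagates             // any other bare Throwable
  }
}
```

   Under these assumptions an `IOException` from speculatively parsing non-gzip 
data lets the parser back out of the bad path, while an `Error` or a 
`RuntimeException` ends the run.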



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/runtime1/layers/LayerDriver.scala:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.daffodil.runtime1.layers
+
+import java.io.FilterInputStream
+import java.io.InputStream
+import java.io.OutputStream
+import java.lang.reflect.InvocationTargetException
+
+import org.apache.daffodil.io.DataInputStream.Mark
+import org.apache.daffodil.io.DataOutputStream
+import org.apache.daffodil.io.DirectOrBufferedDataOutputStream
+import org.apache.daffodil.io.InputSourceDataInputStream
+import org.apache.daffodil.lib.exceptions.Assert
+import org.apache.daffodil.lib.exceptions.UnsuppressableException
+import org.apache.daffodil.lib.schema.annotation.props.gen.BitOrder
+import org.apache.daffodil.lib.util.Maybe
+import org.apache.daffodil.lib.util.Maybe.Nope
+import org.apache.daffodil.lib.util.Maybe.One
+import org.apache.daffodil.lib.util.Misc
+import org.apache.daffodil.runtime1.dsom.RuntimeSchemaDefinitionError
+import org.apache.daffodil.runtime1.layers.api.Layer
+import org.apache.daffodil.runtime1.processors.ParseOrUnparseState
+import org.apache.daffodil.runtime1.processors.ProcessingError
+import org.apache.daffodil.runtime1.processors.unparsers.UState
+
+import passera.unsigned.ULong
+
+/**
+ * Mandatory factory to create and initialize the layer driver object.
+ */
+object LayerDriver {
+  def apply(state: ParseOrUnparseState, lrd: LayerRuntimeData): LayerDriver = {
+    val lvr = state.layerRuntimeCompiler.compile(lrd) // could fail if layer not SPI loaded
+    val lr = new LayerRuntime(state, lrd, lvr) // can't fail
+    val newLayer = lvr.constructInstance() // can't fail
+    newLayer.setLayerRuntime(lr) // can't fail
+    val instance = new LayerDriver(newLayer) // can't fail
+    lvr.callParamSetterWithParameterVars(newLayer) // could fail if user's method causes errors
+    instance
+  }
+}
+
+/**
+ * Driving mechanism that incorporates a layer at runtime into the I/O streams
+ * to transform the data stream and optionally receive parameter values from,
+ * and populate result values back into DFDL variables.
+ *
+ * A layer driver is created at runtime as part of a single parse/unparse call.
+ * Hence, they can be stateful without causing thread-safety issues.
+ *
+ * @param layer            The Layer instance to parameterize, drive, and get results back from.
+ */
+class LayerDriver private (val layer: Layer) {
+
+  def layerRuntime: LayerRuntime = layer.getLayerRuntime
+  private def layerVarsRuntime = layerRuntime.layerVarsRuntime
+
+  private def wrapJavaInputStream(s: InputSourceDataInputStream) =
+    new JavaIOInputStream(s, layer)
+
+  private def wrapJavaOutputStream(s: DataOutputStream) =
+    new JavaIOOutputStream(s, layer)
+
+  /**
+   *  Override these if we ever have transformers that don't have these
+   *  requirements.
+   */
+  val mandatoryLayerAlignmentInBits: Int = 8
+
+  final def addInputLayer(s: InputSourceDataInputStream): InputSourceDataInputStream = {
+    val jis = wrapJavaInputStream(s)
+    // This wrapLayerInput could fail via a throw. It's calling layer code.
+    val wrapped =
+      try {
+        layer.wrapLayerInput(jis)
+      } catch {
+        case t: Throwable => escalateThrowFromWrap(t)
+      }
+    // This assured close thing is just in case the user's layer stream doesn't propagate
+    // the close. That would be incorrect of it, but we want to tolerate that not happening.
+    val decodedInputStream = new AssuredCloseInputStream(wrapped, jis)
+    val newDIS = InputSourceDataInputStream(decodedInputStream)
+    // must initialize priorBitOrder
+    newDIS.cst.setPriorBitOrder(BitOrder.MostSignificantBitFirst)
+    newDIS.setDebugging(s.areDebugging)
+    newDIS
+  }
+
+  /**
+   * Parsing works as a tree traversal, so when the parser unwinds from
+   * parsing the layer we can invoke this to handle cleanups, and
+   * finalization issues like assigning the result variables
+   */
+  final def removeInputLayer(s: InputSourceDataInputStream): Unit =
+    try {
+      layerVarsRuntime.callGettersToPopulateResultVars(layer) // could throw.
+    } catch {
+      case ite: InvocationTargetException =>
+        layer.processingError(ite.getCause)
+    } finally {
+      s.close()
+    }
+
+  /**
+   * Escalate processing errors and thrown exceptions to RSDE
+   * @param t a thrown thing: exception, PE, UE, RSDE, etc.
+   */
+  private def escalateThrowFromWrap(t: Throwable): Nothing = {
+    val lr = layer.getLayerRuntime
+    t match {
+      case rsde: RuntimeSchemaDefinitionError =>
+        throw rsde
+      case pe: ProcessingError if pe.maybeCause.isDefined =>
+        lr.runtimeSchemaDefinitionError(pe.maybeCause.get)
+      case pe: ProcessingError if pe.maybeFormatString.isDefined =>
+        lr.runtimeSchemaDefinitionError(pe.maybeFormatString.get)
+      case e: Exception =>
+        lr.runtimeSchemaDefinitionError(e)
+      case _ => throw t
+    }
+  }
+
+  final def addOutputLayer(s: DataOutputStream): DirectOrBufferedDataOutputStream = {
+    val jos = wrapJavaOutputStream(s)
+    val wrappedStream =
+      try {
+        layer.wrapLayerOutput(jos)
+      } catch {
+        case t: Throwable => escalateThrowFromWrap(t)
+      }
+    val encodedOutputStream = new ThrowProtectedAssuredCloseOutputStream(wrappedStream, jos)
+    val newDOS = DirectOrBufferedDataOutputStream(
+      encodedOutputStream,
+      null,
+      isLayer = true,
+      s.chunkSizeInBytes,
+      s.maxBufferSizeInBytes,
+      s.tempDirPath,
+    )
+    newDOS.setPriorBitOrder(BitOrder.MostSignificantBitFirst)
+    newDOS.setAbsStartingBitPos0b(ULong(0L))
+    newDOS.setDebugging(s.areDebugging)
+    newDOS
+  }
+
+  /**
+   * Unparsing is very asynchronous due to suspensions.
+   * So what is done here has to arrange for the layer stream to be closed,
+   * but in the future when everything that has been written to it
+   * but was suspended, has actually been written.
+   */
+  final def removeOutputLayer(s: DirectOrBufferedDataOutputStream, state: UState): Unit = {
+    // do nothing
+  }
+
+  private final class AssuredCloseInputStream(stream: InputStream, inner: InputStream)
+    extends FilterInputStream(stream) {
+    override def close(): Unit = {
+      stream.close()
+      inner.close()
+    }
+  }
+
+  /**
+   * Besides making sure the inner stream gets closed, this also
+   * provides handling of exceptions thrown by the layer-author's code
+   * (or things that that code calls).
+   *
+   * This is needed because unparsing can involve suspensions, so these
+   * calls can happen long after the call stack of unparsers is over; hence,
+   * any nested try/catches around the actual unparser calls may not be
+   * surrounding these actions.
+   *
+   * To get sensible behavior even if these throws happen late due to
+   * suspensions, we have to encapsulate here with our own throw/catches.
+   *
+   * @param stream the stream we're encapsulating
+   * @param inner the stream we're assuring will be closed even if a close on the primary stream doesn't propagate the stream
+   */
+  private final class ThrowProtectedAssuredCloseOutputStream(
+    stream: OutputStream,
+    inner: OutputStream,
+  ) extends OutputStream {
+
+    /**
+     * We hope this isn't get used because a try-catch around every individual write of a byte
+     * is inefficient certainly.
+     * @param b
+     */
+    override def write(b: Int): Unit = {
+      try {
+        stream.write(b)
+      } catch {
+        case e: Exception => handleOutputStreamException(e)

Review Comment:
   This *does* get called. I can put a breakpoint here and it is hit by a test. 
   
   Not sure why codecov doesn't find this. 
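
   As general `java.io` background on how the single-byte `write` can end up 
being exercised (an illustration only, not a diagnosis of this test or of the 
codecov result): the default `OutputStream.write(Array[Byte], Int, Int)` 
forwards each byte to `write(Int)`, so a stream in the chain that overrides 
only the single-byte method turns bulk writes into per-byte calls. 
`CountingStreamSketch` is a hypothetical name.

```scala
import java.io.OutputStream

// Hypothetical stream that overrides only the single-byte write and counts the calls.
class CountingStreamSketch extends OutputStream {
  var singleByteWrites: Int = 0
  override def write(b: Int): Unit = singleByteWrites += 1
}
```

   Writing a three-byte array through the inherited bulk `write` increments 
`singleByteWrites` three times.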
   



##########
daffodil-runtime1/src/main/scala/org/apache/daffodil/runtime1/layers/LayerDriver.scala:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.daffodil.runtime1.layers
+
+import java.io.FilterInputStream
+import java.io.InputStream
+import java.io.OutputStream
+import java.lang.reflect.InvocationTargetException
+
+import org.apache.daffodil.io.DataInputStream.Mark
+import org.apache.daffodil.io.DataOutputStream
+import org.apache.daffodil.io.DirectOrBufferedDataOutputStream
+import org.apache.daffodil.io.InputSourceDataInputStream
+import org.apache.daffodil.lib.exceptions.Assert
+import org.apache.daffodil.lib.exceptions.UnsuppressableException
+import org.apache.daffodil.lib.schema.annotation.props.gen.BitOrder
+import org.apache.daffodil.lib.util.Maybe
+import org.apache.daffodil.lib.util.Maybe.Nope
+import org.apache.daffodil.lib.util.Maybe.One
+import org.apache.daffodil.lib.util.Misc
+import org.apache.daffodil.runtime1.dsom.RuntimeSchemaDefinitionError
+import org.apache.daffodil.runtime1.layers.api.Layer
+import org.apache.daffodil.runtime1.processors.ParseOrUnparseState
+import org.apache.daffodil.runtime1.processors.ProcessingError
+import org.apache.daffodil.runtime1.processors.unparsers.UState
+
+import passera.unsigned.ULong
+
+/**
+ * Mandatory factory to create and initialize the layer driver object.
+ */
+object LayerDriver {
+  def apply(state: ParseOrUnparseState, lrd: LayerRuntimeData): LayerDriver = {
+    val lvr = state.layerRuntimeCompiler.compile(lrd) // could fail if layer not SPI loaded
+    val lr = new LayerRuntime(state, lrd, lvr) // can't fail
+    val newLayer = lvr.constructInstance() // can't fail
+    newLayer.setLayerRuntime(lr) // can't fail
+    val instance = new LayerDriver(newLayer) // can't fail
+    lvr.callParamSetterWithParameterVars(newLayer) // could fail if user's method causes errors
+    instance
+  }
+}
+
+/**
+ * Driving mechanism that incorporates a layer at runtime into the I/O streams
+ * to transform the data stream and optionally receive parameter values from,
+ * and populate result values back into DFDL variables.
+ *
+ * A layer driver is created at runtime as part of a single parse/unparse call.
+ * Hence, they can be stateful without causing thread-safety issues.
+ *
+ * @param layer            The Layer instance to parameterize, drive, and get results back from.
+ */
+class LayerDriver private (val layer: Layer) {
+
+  def layerRuntime: LayerRuntime = layer.getLayerRuntime
+  private def layerVarsRuntime = layerRuntime.layerVarsRuntime
+
+  private def wrapJavaInputStream(s: InputSourceDataInputStream) =
+    new JavaIOInputStream(s, layer)
+
+  private def wrapJavaOutputStream(s: DataOutputStream) =
+    new JavaIOOutputStream(s, layer)
+
+  /**
+   *  Override these if we ever have transformers that don't have these
+   *  requirements.
+   */
+  val mandatoryLayerAlignmentInBits: Int = 8
+
+  final def addInputLayer(s: InputSourceDataInputStream): InputSourceDataInputStream = {
+    val jis = wrapJavaInputStream(s)
+    // This wrapLayerInput could fail via a throw. It's calling layer code.
+    val wrapped =
+      try {
+        layer.wrapLayerInput(jis)
+      } catch {
+        case t: Throwable => escalateThrowFromWrap(t)
+      }
+    // This assured close thing is just in case the user's layer stream doesn't propagate
+    // the close. That would be incorrect of it, but we want to tolerate that not happening.
+    val decodedInputStream = new AssuredCloseInputStream(wrapped, jis)
+    val newDIS = InputSourceDataInputStream(decodedInputStream)
+    // must initialize priorBitOrder
+    newDIS.cst.setPriorBitOrder(BitOrder.MostSignificantBitFirst)
+    newDIS.setDebugging(s.areDebugging)
+    newDIS
+  }
+
+  /**
+   * Parsing works as a tree traversal, so when the parser unwinds from
+   * parsing the layer we can invoke this to handle cleanups, and
+   * finalization issues like assigning the result variables
+   */
+  final def removeInputLayer(s: InputSourceDataInputStream): Unit =
+    try {
+      layerVarsRuntime.callGettersToPopulateResultVars(layer) // could throw.
+    } catch {
+      case ite: InvocationTargetException =>
+        layer.processingError(ite.getCause)
+    } finally {
+      s.close()
+    }
+
+  /**
+   * Escalate processing errors and thrown exceptions to RSDE
+   * @param t a thrown thing: exception, PE, UE, RSDE, etc.
+   */
+  private def escalateThrowFromWrap(t: Throwable): Nothing = {
+    val lr = layer.getLayerRuntime
+    t match {
+      case rsde: RuntimeSchemaDefinitionError =>
+        throw rsde
+      case pe: ProcessingError if pe.maybeCause.isDefined =>
+        lr.runtimeSchemaDefinitionError(pe.maybeCause.get)
+      case pe: ProcessingError if pe.maybeFormatString.isDefined =>
+        lr.runtimeSchemaDefinitionError(pe.maybeFormatString.get)
+      case e: Exception =>
+        lr.runtimeSchemaDefinitionError(e)
+      case _ => throw t
+    }
+  }
+
+  final def addOutputLayer(s: DataOutputStream): DirectOrBufferedDataOutputStream = {
+    val jos = wrapJavaOutputStream(s)
+    val wrappedStream =
+      try {
+        layer.wrapLayerOutput(jos)
+      } catch {
+        case t: Throwable => escalateThrowFromWrap(t)
+      }
+    val encodedOutputStream = new ThrowProtectedAssuredCloseOutputStream(wrappedStream, jos)
+    val newDOS = DirectOrBufferedDataOutputStream(
+      encodedOutputStream,
+      null,
+      isLayer = true,
+      s.chunkSizeInBytes,
+      s.maxBufferSizeInBytes,
+      s.tempDirPath,
+    )
+    newDOS.setPriorBitOrder(BitOrder.MostSignificantBitFirst)
+    newDOS.setAbsStartingBitPos0b(ULong(0L))
+    newDOS.setDebugging(s.areDebugging)
+    newDOS
+  }
+
+  /**
+   * Unparsing is very asynchronous due to suspensions.
+   * So what is done here has to arrange for the layer stream to be closed,
+   * but in the future when everything that has been written to it
+   * but was suspended, has actually been written.
+   */
+  final def removeOutputLayer(s: DirectOrBufferedDataOutputStream, state: UState): Unit = {
+    // do nothing
+  }
+
+  private final class AssuredCloseInputStream(stream: InputStream, inner: InputStream)
+    extends FilterInputStream(stream) {
+    override def close(): Unit = {
+      stream.close()
+      inner.close()
+    }
+  }
+
+  /**
+   * Besides making sure the inner stream gets closed, this also
+   * provides handling of exceptions thrown by the layer-author's code
+   * (or things that that code calls).
+   *
+   * This is needed because unparsing can involve suspensions, so these
+   * calls can happen long after the call stack of unparsers is over; hence,
+   * any nested try/catches around the actual unparser calls may not be
+   * surrounding these actions.
+   *
+   * To get sensible behavior even if these throws happen late due to
+   * suspensions, we have to encapsulate here with our own throw/catches.
+   *
+   * @param stream the stream we're encapsulating
+   * @param inner the stream we're assuring will be closed even if a close on the primary stream doesn't propagate the stream
+   */
+  private final class ThrowProtectedAssuredCloseOutputStream(
+    stream: OutputStream,
+    inner: OutputStream,
+  ) extends OutputStream {
+
+    /**
+     * We hope this isn't get used because a try-catch around every individual write of a byte
+     * is inefficient certainly.
+     * @param b
+     */
+    override def write(b: Int): Unit = {
+      try {
+        stream.write(b)
+      } catch {
+        case e: Exception => handleOutputStreamException(e)
+      }
+    }
+
+    /**
+     * We hope this is the way most writes are done and with fairly large buffers to amortize the
+     * cost of the try/catch logic over many bytes of output.
+     * @param b
+     * @param off
+     * @param len
+     */
+    override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+      try {
+        stream.write(b, off, len)
+      } catch {
+        case e: Exception => handleOutputStreamException(e)
+      }
+    }
+
+    override def close(): Unit = {
+      try {
+        stream.close()
+        inner.close()
+      } catch {
+        case e: Exception =>
+          handleOutputStreamException(e)
+      }
+    }
+
+    private def handleOutputStreamException(exception: Exception) = {
+      exception match {
+        case u: UnsuppressableException =>
+          throw u
+        case rsde: RuntimeSchemaDefinitionError =>
+          throw rsde
+        case pe: ProcessingError =>
+          throw pe
+        case re: RuntimeException =>
+          throw re
+        case e: Exception =>
+          layer.getLayerRuntime.processingError(new LayerUnexpectedException(e))
+      }
+    }
+
+  }
+}
+
+/**
+ * Turns Daffodil's bits-capable InputSourceDataInputStream objects into ordinary
+ * java InputStream API for byte-oriented reads.
+ *
+ * @param s the Daffodil InputSourceDataInputStream
+ * @return a java.io.InputStream which when read from, delegates to the InputSourceDataInputStream
+ */
+class JavaIOInputStream(s: InputSourceDataInputStream, layer: Layer) extends InputStream {
+  private def finfo = layer.getLayerRuntime.finfo
+  private lazy val id = Misc.getNameFromClass(this)
+
+  override def read(): Int = {
+    if (!s.isDefinedForLength(8)) -1
+    else {
+      val ul = s.getUnsignedLong(8, finfo)
+      val byte: Int = ul.toInt & 0xff
+      byte
+    }
+  }
+
+  override def available(): Int = 0
+
+  override def close(): Unit = {
+    // do nothing. Important. We do not want the close to cascade beyond here.
+  }
+
+  /**
+   * @param readlimit Ignored. The limits of daffodil's input system are specified
+   *                  elsewhere. See BucketingInputSource in the daffodil-io module.
+   */
+  override def mark(readlimit: Int): Unit = {
+    maybeSavedMark = One(s.mark(id))
+  }
+
+  private var maybeSavedMark: Maybe[Mark] = Nope
+
+  override def reset(): Unit = {
+    Assert.usage(maybeSavedMark.isDefined)
+    s.reset(maybeSavedMark.get)
+    maybeSavedMark = Nope
+  }
+
+  override def markSupported() = true
+}
+
+/**
+ * Turns a Daffodil bits-capable DataOutputStream into an ordinary java.io.OutputStream.
+ *
+ * @param dos   The DataOutputStream to write the data to.
+ */
+class JavaIOOutputStream(dos: DataOutputStream, layer: Layer) extends OutputStream {
+
+  private def lr = layer.getLayerRuntime
+  private def lvr = lr.layerVarsRuntime
+  private def finfo = lr.finfo
+
+  private var closed = false
+
+  private var nBytes = 0
+
+  override def write(b: Int): Unit = {
+    val wasWritten = dos.putLong(b, 8, finfo)
+    if (wasWritten) nBytes += 1
+  }
+
+  override def close(): Unit = {
+    if (!closed) {
+      dos.setFinished(finfo)
+      //
+      // This is where we can reliably wrap-up the layer processing.
+      //
+      try {
+        lvr.callGettersToPopulateResultVars(layer)
+      } catch {
+        case ite: InvocationTargetException =>
+          layer.processingError(ite.getCause)

Review Comment:
   Test added. There wasn't actually a test exercising this specific way of 
generating an error. 



##########
daffodil-core/src/main/scala/org/apache/daffodil/core/layers/LayerSchemaCompiler.scala:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.daffodil.core.layers
+
+import org.apache.daffodil.core.dsom.SequenceGroupTermBase
+import org.apache.daffodil.lib.exceptions.Assert
+
+object LayerSchemaCompiler {
+
+  /**
+   * Compiles the layer.
+   *
+   * This is mostly checking for errant use of DFDL properties that
+   * can't be used on layers.
+   *
+   * The real compilation - creating runtime data structures based
+   * on constructor signatures and DFDL variables that are in the
+   * layer's namespace, that is called here, but it is part of
+   * the daffodil runtime because that step has to be carried out
+   * *also* at runtime to verify that the dynamically loaded layer
+   * classes are compatible with the variables and their definitions.
+   *
+   * This compilation produces no output object, as the LayerRuntimeData
+   * already contains everything needed. 
+   */
+  def compile(sq: SequenceGroupTermBase): Unit = {
+    val lc = new LayerSchemaCompiler(sq)
+    lc.compile
+  }
+}
+
+private class LayerSchemaCompiler private (sq: SequenceGroupTermBase) {
+
+  Assert.usage(sq.isLayered)
+
+  private def lrd = sq.optionLayerRuntimeData.get
+
+  private val layeredSequenceAllowedProps = Seq("layer") // no other properties are allowed.
+
+  private def checkOnlyAllowedProperties(): Unit = {
+    // need to check that only layering properties are specified
+    val localProps = sq.formatAnnotation.justThisOneProperties
+    val localKeys = localProps.keySet
+    val disallowedKeys = localKeys.filterNot(k => layeredSequenceAllowedProps.contains(k))
+    if (disallowedKeys.nonEmpty)
+      sq.SDE(
+        "Sequence has dfdlx:layer specified, so cannot have non-layering properties: %s",
+        disallowedKeys.mkString(", "),
+      )
+  }
+
+  lazy val compile: Unit = {
+    checkOnlyAllowedProperties()
+    //
+    // This is called only for side-effect of checking that a LayerVarsRuntime
+    // can be constructed. That object is not serialized and saved as part of the
+    // processor. Rather, it will be recreated at runtime with hopefully the
+    // same results, which verifies that the layer class loaded at runtime from
+    // the classpath at least has the same DFDL variables signature as the one on the classpath
+    // when the schema was compiled.
+    //
+    sq.schemaSet.layerRuntimeCompiler.compile(lrd)

Review Comment:
   We have to be careful not to save the result of this runtime compilation 
across schemas. That's why this object is on the schema set, and the runtime 
one is on the pstate/ustate object. 
   
   I was running into lots of bugs due to caching these things across the 
schema compilations of different tests. Each schema compilation can give a 
particular variable a different vmapIndex, so we cannot share these objects 
across different schemas. 
   
   Perhaps these could be stored and cached on the SchemaSetRuntimeData object. 
After we deserialize the runtime data, we could trigger the finding and 
computing of these LayerVarsRuntime objects, and they could be accessed by way 
of the SchemaSetRuntimeData. That would be the best time to find these errors. 
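
   A toy sketch of why the cache has to be scoped to one schema compilation. All 
names here (`PerSchemaCacheSketch`, `VarsRuntime`, `SchemaSet`, 
`varsRuntimeFor`) are hypothetical simplifications, not Daffodil's classes. 
Each schema set owns its own cache, so an index assigned by one compilation is 
never handed to another.

```scala
import scala.collection.mutable

object PerSchemaCacheSketch {
  // Hypothetical: records the slot one particular schema compilation gave a variable.
  final case class VarsRuntime(vmapIndex: Int)

  // Hypothetical schema set that owns its cache; indexOf stands in for
  // the variable-to-index assignment made by that compilation.
  final class SchemaSet(indexOf: Map[String, Int]) {
    private val cache = mutable.Map.empty[String, VarsRuntime]

    // Computes the entry once per schema set, then reuses it.
    def varsRuntimeFor(layerName: String): VarsRuntime =
      cache.getOrElseUpdate(layerName, VarsRuntime(indexOf(layerName)))
  }
}
```

   With two schema sets that assign the same layer variable different indexes, 
each returns its own index. A single cache shared between them would return 
whichever entry was computed first.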
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
