mbeckerle commented on a change in pull request #539:
URL: https://github.com/apache/daffodil/pull/539#discussion_r620566258
##########
File path:
daffodil-io/src/main/scala/org/apache/daffodil/io/InputSourceDataInputStream.scala
##########
@@ -117,6 +118,57 @@ final class InputSourceDataInputStream private (val
inputSource: InputSource)
@inline
override final def bitLimit0b: MaybeULong = cst.bitLimit0b
+ /**
+ * Tells us if the underlying input source has detected end-of-data
+ * (the read(...) call returned -1.
+ *
+ * But this does NOT tell us we are positioned at the end, only whether
+ * in the course of reading, we encountered the end of data. If we
+ * backtracked we could have seen the end of data, but backed up in
+ * the data to an earlier position.
+ */
+ def hasReachedEndOfData: Boolean = inputSource.hasReachedEndOfData
+
+ /**
+ * Determine if we're positioned at the end of data without
+ * doing any additional blocking operation such as reading more
+ * data to test if there is any.
+ *
+ * This depends on the underlying inputSource keeping track of
+ * whether it has previously hit the end of data or not.
+ *
+ * @return
+ */
+ final def isAtEnd(): Boolean = {
+ if (bitLimit0b.isDefined) {
+ bitPos0b == bitLimit0b.get
+ } else {
+ // There is no bit limit, so whether we're at the end
+ // requires that we've hit then end-of-data on a read
+ // operation, AND that we've not backtracked since then,
+ // meaning we're positioned at the end of the last byte
+ // read.
+ hasReachedEndOfData && {
Review comment:
Correct. isAtEnd used to do another blocking read to force it to get the
EOD at the end, but that behavior of course causes it to wait for the sender
side of the network stream to either send some data, or close the stream.
Should I put back in the old behavior now that Daffodil itself doesn't use
it anywhere?
##########
File path:
daffodil-io/src/main/scala/org/apache/daffodil/io/InputSourceDataInputStream.scala
##########
@@ -117,6 +118,57 @@ final class InputSourceDataInputStream private (val
inputSource: InputSource)
@inline
override final def bitLimit0b: MaybeULong = cst.bitLimit0b
+ /**
+ * Tells us if the underlying input source has detected end-of-data
+ * (the read(...) call returned -1.
+ *
+ * But this does NOT tell us we are positioned at the end, only whether
+ * in the course of reading, we encountered the end of data. If we
+ * backtracked we could have seen the end of data, but backed up in
+ * the data to an earlier position.
+ */
+ def hasReachedEndOfData: Boolean = inputSource.hasReachedEndOfData
+
+ /**
+ * Determine if we're positioned at the end of data without
+ * doing any additional blocking operation such as reading more
+ * data to test if there is any.
+ *
+ * This depends on the underlying inputSource keeping track of
+ * whether it has previously hit the end of data or not.
+ *
+ * @return
+ */
+ final def isAtEnd(): Boolean = {
+ if (bitLimit0b.isDefined) {
+ bitPos0b == bitLimit0b.get
+ } else {
+ // There is no bit limit, so whether we're at the end
+ // requires that we've hit then end-of-data on a read
+ // operation, AND that we've not backtracked since then,
+ // meaning we're positioned at the end of the last byte
+ // read.
+ hasReachedEndOfData && {
+ //
+ // No real input source can end in the middle of a byte.
+ // The only way that can happen is with the bitLimit having been
+ // set, and we've already dealt with that.
+ //
+ // So we just have to know that we're positioned at the
+ // end of the data.
+ //
+ val inputStreamBytePos0b = inputSource.position()
+ val isAtFinalByte = bytePos0b == inputStreamBytePos0b
Review comment:
Ok. I get your reasoning.
Not sure the fix for this. (This code is now gone, but the question is still
relevant.)
The scenario is we parsed forward, possibly ran into EOD, possibly not, but
then we backtracked, and are no longer at the extreme end of where we reached
with the parser.
We need to recognize that case, and DataInputStream needs a way to support
this, perhaps by keeping track of the bytePos of the last byte of the last
filled bucket?
##########
File path:
daffodil-io/src/test/scala/org/apache/daffodil/layers/TestBase64.scala
##########
@@ -66,7 +66,7 @@
W1vdXMgcXVvdGVzIG9yIHNvbmcgbHlyaWNzIG9yIGFueXRoaW5nIGxpa2UgdGhhdCBpbnRyb2R1Y
pairs.foreach {
case (exp, act) => {
if (exp != act) {
- println("differ at character %s (0-based). Expected '%s' got
'%s'.".format(i, exp, act))
+ // println("differ at character %s (0-based). Expected '%s' got
'%s'.".format(i, exp, act))
Review comment:
Sure. Will do
##########
File path: daffodil-japi/src/main/scala/org/apache/daffodil/japi/Daffodil.scala
##########
@@ -490,7 +490,11 @@ class DataLocation private[japi] (dl: SDataLocation) {
override def toString() = dl.toString
/**
- * Determine if this data location is at the end of the input data
+ * Determine if this data location is known to be at the end of the data
stream.
+ *
+ * Note that this can be false, yet all bits consumed. This occurs if no
+ * subsequent input read operation from the data occurred causing the
+ * input stream to detect end-of-data/EOF.
Review comment:
I suggest we deprecate isAtEnd, but give it the bad-old functionality.
##########
File path:
daffodil-core/src/test/scala/org/apache/daffodil/api/TestParseIndividualMessages.scala
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.daffodil.api
+
+import org.apache.daffodil.Implicits.intercept
+import org.apache.daffodil.io.SocketPairTestRig
+import org.apache.daffodil.util.SchemaUtils
+import org.apache.daffodil.util.TestUtils
+import org.junit.Assert._
+import org.junit.Test
+
+import java.io.InputStream
+import java.io.OutputStream
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration.DurationInt
+import scala.xml.Node
+
+
+/**
+ * Shows that we can parse exactly 1 message from a TCP network socket
+ * without blocking for bytes past the end of the messsage.
+ *
+ * This only works for DFDL schemas of formats that are specified length.
+ */
+class TestParseIndividualMessages {
+
+ import SocketPairTestRig._
+
+ //
+ // DFDL schema for element e1 which occupies exactly 4 bytes.
+ //
+ val exactly4ByteSch = SchemaUtils.dfdlTestSchema(
+ <xs:include
schemaLocation="org/apache/daffodil/xsd/DFDLGeneralFormat.dfdl.xsd"/>,
+ <dfdl:format representation="binary" byteOrder="bigEndian"
binaryNumberRep="binary" ref="tns:GeneralFormat"/>,
+ <xs:element name="e1" type="xs:string" dfdl:lengthKind="explicit"
dfdl:length="4"/>)
+
+ /**
+ * Test shows that at least for simple fixed-length data, Daffodil parse
returns
+ * without requiring more bytes to be read than the exact length required.
+ */
+ @Test def testDaffodilParseFromNetwork1(): Unit = {
+ val sptr = new SocketPairTestRig {
+ override def test(pos: OutputStream, cis: InputStream): Unit = {
+
+ val dp = TestUtils.compileSchema(exactly4ByteSch)
+
+ //
+ // Write exactly 4 bytes to producer network stream
+ //
+ pos.write("1234".getBytes)
+ pos.flush()
+
+ //
+ // Daffodil parse element e1 from consumer input stream
+ //
+ // If we need more than 4 bytes to successfully parse (we shouldn't
for this schema)
+ // then this will hang, because only 4 bytes are in fact available.
+ //
+ // Caution: if debugging, this will timeout if you stop inside here!
+ //
+ val (pr: DFDL.ParseResult, xml: Node) =
+ withTimeout("Daffodil parse") {
+ TestUtils.runDataProcessorOnInputStream(dp, cis, areTracing = false)
+ }
+
+ assertFalse(pr.isError)
+ assertEquals("1234", xml.text)
+ }
+ }
+ sptr.run()
+ }
+
+ /**
+ * Test shows that the isAtEnd method does not block looking for additional
data.
+ * This was DAFFODIL-2502
+ */
+ @Test def testDaffodilParseFromNetworkAtEnd1(): Unit = {
+ val sptr = new SocketPairTestRig {
+ override def test(pos: OutputStream, cis: InputStream): Unit = {
+
+ //
+ // compile first separately because we don't want to be timing that.
+ //
+ val dp = TestUtils.compileSchema(exactly4ByteSch)
+
+ //
+ // Write exactly 4 bytes to producer network stream
+ //
+ pos.write("1234".getBytes)
+ pos.flush()
+
+ //
+ // Daffodil parse element e1 from consumer input stream
+ //
+ // If we need more than 4 bytes to successfully parse (we shouldn't
for this schema)
+ // then this will hang, because only 4 bytes are in fact available.
+ //
+ // Because this may hang, we run it in a Future so we can timeout on
it.
+ //
+ val (pr: DFDL.ParseResult, xml: Node) = withTimeout("Daffodil parse") {
+ TestUtils.runDataProcessorOnInputStream(dp, cis, areTracing = false)
+ }
+
+ assertFalse(pr.isError)
+ assertEquals("1234", xml.text)
+ assertEquals(33, pr.resultState.currentLocation.bitPos1b)
+ }
+ }
+ sptr.run()
+ }
+
+ //
+ // DFDL schema for delimited element.
+ //
+ private def delimitedSchema(term: String) = SchemaUtils.dfdlTestSchema(
+ <xs:include
schemaLocation="org/apache/daffodil/xsd/DFDLGeneralFormat.dfdl.xsd"/>,
+ <dfdl:format representation="text" ref="tns:GeneralFormat"/>,
+ <xs:element name="e1" type="xs:string" dfdl:lengthKind="delimited"
+ dfdl:terminator={ term } />)
+
+ /**
+ * Helper so we can test various delimiter-oriented scenarios.
+ *
+ * @param stringData Data to write. Should be small enough that the
parse will block.
+ * @param terminator String to use as terminating delimiter of
element. Can be more than one delimiter.
+ * @param followingDataString Data to write which should unblock the parse.
+ */
+ private def testHelperDaffodilParseDelimitedFromNetwork(
+ data: String,
+ terminator: String,
+ followingDataString: String) = {
+ val sptr = new SocketPairTestRig {
+ override def test(pos: OutputStream, cis: InputStream): Unit = {
+
+ val dp = TestUtils.compileSchema(delimitedSchema(terminator))
+
+ // Write the data. Should be too short to satisfy the parse.
+ //
+ pos.write(data.getBytes)
+ pos.flush()
+
+ val fut = Future {
+ TestUtils.runDataProcessorOnInputStream(dp, cis, areTracing = false)
+ }
+
+ Thread.sleep(100)
+ assertFalse(fut.isCompleted)
+ //
+ // Writing additional character(s) should unblock the parse.
+ //
+ pos.write(followingDataString.getBytes)
+ pos.flush()
+ val (pr, xml) = Await.result(fut, 100.milliseconds)
+ if (!pr.isError) {
+ assertEquals("1234", xml.text)
+ println("Successful parse.")
Review comment:
Remove println from test
##########
File path:
daffodil-japi/src/main/scala/org/apache/daffodil/japi/io/InputSourceDataInputStream.scala
##########
@@ -43,5 +43,13 @@ class InputSourceDataInputStream private[japi] (private
[japi] val dis: SInputSo
/**
* Create an InputSourceDataInputStream from a byte array
*/
- def this(arr: Array[Byte]) = this(SInputSourceDataInputStream(arr))
+ def this(arr: Array[Byte]) = this(SInputSourceDataInputStream(arr))
+
+ /**
+ * Used to see if there is more data or not after a parse operation.
+ * Whether more data is a left-over-data error, or just more data to
+ * parse in a subsequent parse call depends on the intention of the
+ * code calling the parse operation.
+ */
Review comment:
I will update the comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]