This is an automated email from the ASF dual-hosted git repository.

mbeckerle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/daffodil.git


The following commit(s) were added to refs/heads/master by this push:
     new fcb4dc0  Deprecated isAtEnd. Added hasData() and test rig for network 
streams
fcb4dc0 is described below

commit fcb4dc0a0722b1fea7e98c58f88464f45b07abf9
Author: Michael Beckerle <[email protected]>
AuthorDate: Fri Apr 23 11:55:47 2021 -0400

    Deprecated isAtEnd. Added hasData() and test rig for network streams
    
    Removed all usage of isAtEnd. Updated javadoc/scaladoc to
    reflect message-streaming parsing that doesn't use isAtEnd.
    
    Added hasData() to JAPI/SAPI InputSourceDataInputStream objects because 
this is needed
    to determine if there is data before invoking parse() now.
    
    Basically, daffodil doesn't know whether we're at end really unless 
something tries
    to read *beyond* the end of the data. Many parses will consume exactly N 
bits, and
    not do any further reads of the data stream, so Daffodil really does not 
know whether
    we are at end of data or not until another read is attempted.
    
    We do NOT want to do that additional read in the parse() call because if so 
then
    when a network TCP connection is the data stream, we'll be doing a blocking 
read
    past the end of the data, which will mean we cannot return a message given 
exactly
    and only the bytes of that message.
    
    So it becomes the job of the caller of parse() to ask the
    InputSourceDataInputStream object whether there is any more data or not.
    Typically, BEFORE calling parse().
    
    The TDML runner works because data for TDML tests always has a 
bitLengthLimit,
    and it checks that.
    
    Fixed Runtime2 so that it computes the bitPos/bytePos after
    a parse (was just returning 0) which was tripping up TDML
    runner which wants to use those values to detect if
    all data is consumed.
    
    Cleanups: found some @deprecated(...) annotations with the args backward.
    So I flipped them.
    
    Added test rig and tests showing isAtEnd hangs on network
    TCP stream when no bytes are available at that time.
    
    Cleanup: InputSource now throws a very specific exception in the case
    where an InputStream provided by caller misbehaves.
    
    Cleanup: Exception class derivations improved to
    allow cause or message, not only messages.
    
    Cleanup: Coroutines improved to allow the kinds of objects passed to, and
    returned (from resume or resumeFinal) to be different for
    different coroutines.
    
    Cleanup: Missing tests - improve coverage.
    Coroutines unit tests added back (these had been missing).
    
    Cleanup: Layering - fixed bad available() implementation.
    Shouldn't be called.
    
    Cleanups: removed println from some tests per review comments.
    
    Improve Codecov results.
    
    Partly this was done by putting coverage suppression in a few places.
    Partly this was adding to tests to cover a few things.
    
    DAFFODIL-2502, DAFFODIL-2503, DAFFODIL-2504
---
 build.sbt                                          |   4 +-
 .../src/main/scala/org/apache/daffodil/Main.scala  |  12 +-
 .../daffodil/api/TestParseIndividualMessages.scala | 213 ++++++++++++++++++
 .../apache/daffodil/dsom/TestBinaryInput_01.scala  |  32 +--
 .../scala/org/apache/daffodil/util/TestUtils.scala |  43 ++--
 .../org/apache/daffodil/io/DataInputStream.scala   |  27 ++-
 .../scala/org/apache/daffodil/io/InputSource.scala |  65 +++++-
 .../daffodil/io/InputSourceDataInputStream.scala   |  46 +++-
 .../io/StringDataInputStreamForUnparse.scala       |   7 +-
 .../apache/daffodil/io/FormatInfoForUnitTest.scala |  25 +++
 .../org/apache/daffodil/io/SocketPairTestRig.scala | 246 +++++++++++++++++++++
 .../io/TestInputSourceDataInputStream.scala        |   6 +
 .../io/TestInputSourceDataInputStream2.scala       |   2 +-
 .../io/TestInputSourceDataInputStream8.scala       | 170 ++++++++++++++
 .../org/apache/daffodil/layers/TestBase64.scala    |   4 +-
 .../apache/daffodil/layers/TestJavaIOStreams.scala |   2 -
 .../layers/TestLimitingJavaIOStreams.scala         |   1 -
 .../org/apache/daffodil/japi/package-info.java     |   8 +-
 .../scala/org/apache/daffodil/japi/Daffodil.scala  |  13 +-
 .../japi/io/InputSourceDataInputStream.scala       |  28 ++-
 .../org/apache/daffodil/example/TestJavaAPI.java   |  46 ++--
 .../scala/org/apache/daffodil/api/Diagnostic.scala |   3 +
 .../org/apache/daffodil/exceptions/Assert.scala    |  19 +-
 .../org/apache/daffodil/util/Coroutines.scala      |  46 ++--
 .../org/apache/daffodil/util/TestCoroutines.scala  | 173 +++++++++++++++
 .../apache/daffodil/api/DFDLParserUnparser.scala   |  14 +-
 .../apache/daffodil/dsom/CompiledExpression1.scala |   2 +-
 .../apache/daffodil/layers/LayerTransformer.scala  |   2 +-
 .../org/apache/daffodil/processors/DataLoc.scala   |  19 +-
 .../daffodil/processors/parsers/PState.scala       |   6 +-
 .../daffodil/processors/unparsers/UState.scala     |   6 +-
 .../daffodil/runtime2/Runtime2DataProcessor.scala  |  37 ++--
 .../scala/org/apache/daffodil/sapi/Daffodil.scala  |  13 +-
 .../sapi/io/InputSourceDataInputStream.scala       |  23 +-
 .../scala/org/apache/daffodil/sapi/package.scala   |   8 +-
 .../org/apache/daffodil/example/TestScalaAPI.scala |  41 ++--
 .../org/apache/daffodil/tdml/TDMLRunner.scala      |  24 +-
 .../tdml/processor/DaffodilTDMLDFDLProcessor.scala |  17 +-
 .../section12/lengthKind/ExplicitTests.tdml        |   2 +-
 39 files changed, 1250 insertions(+), 205 deletions(-)

diff --git a/build.sbt b/build.sbt
index 3172067..658b8f6 100644
--- a/build.sbt
+++ b/build.sbt
@@ -15,8 +15,8 @@
  * limitations under the License.
  */
 
-import sbt.io.Path.flatRebase
 import sbtcc._
+
 import scala.collection.immutable.ListSet
 
 // Silence an errant sbt linter warning about unused sbt settings. For some
@@ -86,7 +86,7 @@ lazy val runtime2         = Project("daffodil-runtime2", 
file("daffodil-runtime2
                               )
 
 lazy val core             = Project("daffodil-core", 
file("daffodil-core")).configs(IntegrationTest)
-                              .dependsOn(runtime1Unparser, udf, lib % 
"test->test", runtime1 % "test->test")
+                              .dependsOn(runtime1Unparser, udf, lib % 
"test->test", runtime1 % "test->test", io % "test->test")
                               .settings(commonSettings)
 
 lazy val japi             = Project("daffodil-japi", 
file("daffodil-japi")).configs(IntegrationTest)
diff --git a/daffodil-cli/src/main/scala/org/apache/daffodil/Main.scala 
b/daffodil-cli/src/main/scala/org/apache/daffodil/Main.scala
index 46abfea..ad52ce3 100644
--- a/daffodil-cli/src/main/scala/org/apache/daffodil/Main.scala
+++ b/daffodil-cli/src/main/scala/org/apache/daffodil/Main.scala
@@ -976,14 +976,15 @@ object Main extends Logging {
                 }
                 output.flush()
 
-                if (loc.isAtEnd) {
+                if (!inStream.hasData()) {
+                  // not even 1 more bit is available.
                   // do not try to keep parsing, nothing left to parse
                   keepParsing = false
                   error = false
                 } else {
-                  // remaining data exists
-
+                  // There is more data available.
                   if (parseOpts.stream.toOption.get) {
+                    // Streaming mode
                     if (lastParseBitPosition == loc.bitPos0b) {
                       // this parse consumed no data, that means this would get
                       // stuck in an infinite loop if we kept trying to stream,
@@ -998,13 +999,16 @@ object Main extends Logging {
                       keepParsing = false
                       error = true
                     } else {
+                      // last parse did consume data, and we know there is more
+                      // data to come, so try to parse again.
                       lastParseBitPosition = loc.bitPos0b
                       keepParsing = true
                       error = false
                       output.write(0) // NUL-byte separates streams
                     }
                   } else {
-                    // not streaming, show left over data warning
+                    // not streaming mode, and there is more data available,
+                    // so show left over data warning
                     val Dump = new DataDumper
                     val bitsAlreadyConsumed = loc.bitPos0b % 8
                     val firstByteString = if (bitsAlreadyConsumed != 0) {
diff --git 
a/daffodil-core/src/test/scala/org/apache/daffodil/api/TestParseIndividualMessages.scala
 
b/daffodil-core/src/test/scala/org/apache/daffodil/api/TestParseIndividualMessages.scala
new file mode 100644
index 0000000..ad1171d
--- /dev/null
+++ 
b/daffodil-core/src/test/scala/org/apache/daffodil/api/TestParseIndividualMessages.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.daffodil.api
+
+import org.apache.daffodil.Implicits.intercept
+import org.apache.daffodil.io.SocketPairTestRig
+import org.apache.daffodil.util.SchemaUtils
+import org.apache.daffodil.util.TestUtils
+import org.junit.Assert._
+import org.junit.Test
+
+import java.io.InputStream
+import java.io.OutputStream
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration.DurationInt
+import scala.xml.Node
+
+
+/**
+ * Shows that we can parse exactly 1 message from a TCP network socket
+ * without blocking for bytes past the end of the message.
+ *
+ * This only works for DFDL schemas of formats that are specified length.
+ */
+class TestParseIndividualMessages {
+
+  //
+  // DFDL schema for element e1 which occupies exactly 4 bytes.
+  //
+  val exactly4ByteSch = SchemaUtils.dfdlTestSchema(
+      <xs:include 
schemaLocation="org/apache/daffodil/xsd/DFDLGeneralFormat.dfdl.xsd"/>,
+      <dfdl:format representation="binary" byteOrder="bigEndian" 
binaryNumberRep="binary" ref="tns:GeneralFormat"/>,
+      <xs:element name="e1" type="xs:string" dfdl:lengthKind="explicit" 
dfdl:length="4"/>)
+
+  /**
+   * Test shows that at least for simple fixed-length data, Daffodil parse 
returns
+   * without requiring more bytes to be read than the exact length required.
+   */
+  @Test def testDaffodilParseFromNetwork1(): Unit = {
+    val sptr = new SocketPairTestRig {
+      override def test(pos: OutputStream, cis: InputStream): Unit = {
+
+        val dp = TestUtils.compileSchema(exactly4ByteSch)
+
+        //
+        // Write exactly 4 bytes to producer network stream
+        //
+        pos.write("1234".getBytes)
+        pos.flush()
+
+        //
+        // Daffodil parse element e1 from consumer input stream
+        //
+        // If we need more than 4 bytes to successfully parse (we shouldn't 
for this schema)
+        // then this will hang, because only 4 bytes are in fact available.
+        //
+        // Caution: if debugging, this will timeout if you stop inside here!
+        //
+        val (pr: DFDL.ParseResult, xml: Node) =
+        SocketPairTestRig.withTimeout("Daffodil parse") {
+          TestUtils.runDataProcessorOnInputStream(dp, cis, areTracing = false)
+        }
+
+        assertFalse(pr.isError)
+        assertEquals("1234", xml.text)
+        assertEquals(33, pr.resultState.currentLocation.bitPos1b)
+      }
+    }
+    sptr.run()
+  }
+
+  //
+  // DFDL schema for delimited element.
+  //
+  private def delimitedSchema(term: String) = SchemaUtils.dfdlTestSchema(
+      <xs:include 
schemaLocation="org/apache/daffodil/xsd/DFDLGeneralFormat.dfdl.xsd"/>,
+      <dfdl:format representation="text" ref="tns:GeneralFormat"/>,
+      <xs:element name="e1" type="xs:string" dfdl:lengthKind="delimited"
+                  dfdl:terminator={ term } />)
+
+  /**
+   * Helper so we can test various delimiter-oriented scenarios.
+   *
+   * @param stringData          Data to write. Should be small enough that the 
parse will block.
+   * @param terminator          String to use as terminating delimiter of 
element. Can be more than one delimiter.
+   * @param followingDataString Data to write which should unblock the parse.
+   */
+  private def testHelperDaffodilParseDelimitedFromNetwork(
+    data: String,
+    terminator: String,
+    followingDataString: String) = {
+    val sptr = new SocketPairTestRig {
+      override def test(pos: OutputStream, cis: InputStream): Unit = {
+
+        val dp = TestUtils.compileSchema(delimitedSchema(terminator))
+
+        // Write the data. Should be too short to satisfy the parse.
+        //
+        pos.write(data.getBytes)
+        pos.flush()
+
+        val fut = Future {
+          TestUtils.runDataProcessorOnInputStream(dp, cis, areTracing = false)
+        }
+
+        Thread.sleep(100)
+        assertFalse(fut.isCompleted)
+        //
+        // Writing additional character(s) should unblock the parse.
+        //
+        pos.write(followingDataString.getBytes)
+        pos.flush()
+        val (pr, xml) = Await.result(fut, 100.milliseconds)
+        if (!pr.isError) {
+          assertEquals("1234", xml.text)
+        } else {
+          //parse failed.
+          val diagString = pr.getDiagnostics.map {
+            _.getMessage()
+          }.mkString("\n")
+          fail("Parse failed, but did not time-out.\n" + diagString)
+        }
+      }
+    }
+    sptr.run()
+  }
+
+  /**
+   * This test fails (and so is commented out) but *should* pass.
+   *
+   * Test (when it works) shows that for delimited data, we block seeking more 
data,
+   * but once the need for a terminator with longest match is satisfied
+   * the parse is unblocked.
+   */
+  // @Test // DAFFODIL-2504 - this fails looking for more data than is needed 
for terminator.
+  def testDaffodilParseFromNetworkDelimited1(): Unit = {
+    intercept[TimeoutException] {
+      testHelperDaffodilParseDelimitedFromNetwork("1234", "$", "$")
+    }
+    fail("if we get here, then we intercepted a TimeoutException, which means 
Daffodil was hung.")
+  }
+
+  /**
+   * This test works, and it should work.
+   *
+   * It characterizes that currently we need to be able to read
+   * not only the terminator, but 7 more characters, in order for the reads 
associated with the delimiter
+   * scanning to be satisfied.
+   *
+   * We're not supposed to need 7 characters though. Only 1 character should 
do it.
+   */
+  @Test // DAFFODIL-2504
+  def testDaffodilParseFromNetworkDelimited1b(): Unit = {
+    testHelperDaffodilParseDelimitedFromNetwork("1234", "$",
+      "$1234567")
+  }
+
+  /**
+   * This test fails (and so is commented out) but *should* pass.
+   *
+   * Test (when it works) shows that for delimited data, we block seeking more 
data,
+   * but once the need for a terminator with longest match is satisfied
+   * the parse is unblocked.
+   *
+   * This variant has 2 options for terminator, $ or $$, and one is a prefix 
of the other.
+   * So this test ensures that getting a shorter delimiter match doesn't 
unblock
+   * when getting more data might match a longer delimiter.
+   *
+   * The test then subsequently provides the character for that longer 
delimiter
+   * which should unblock daffodil.
+   */
+  // @Test // DAFFODIL-2504 - this fails looking for more data than is needed 
for terminator.
+  def testDaffodilParseFromNetworkDelimited2(): Unit = {
+    intercept[TimeoutException] {
+      testHelperDaffodilParseDelimitedFromNetwork("1234$", "$ $$", "$")
+    }
+    fail("If we get here, we intercepted a TimeoutException, which means 
Daffodil is hung.")
+  }
+
+  /**
+   * This test works, and it should work.
+   *
+   * It characterizes that currently we need to be able to read
+   * not only the longest possible terminator, but 7 more characters,
+   * in order for the reads associated with the delimiter
+   * scanning to be satisfied.
+   *
+   * We're not supposed to need 7 characters though. Only 1 character should 
do it.
+   */
+  @Test // DAFFODIL-2504 - this shouldn't require 7 more characters.
+  def testDaffodilParseFromNetworkDelimited2b(): Unit = {
+    testHelperDaffodilParseDelimitedFromNetwork("1234$", "$ $$", "$1234567")
+  }
+
+}
diff --git 
a/daffodil-core/src/test/scala/org/apache/daffodil/dsom/TestBinaryInput_01.scala
 
b/daffodil-core/src/test/scala/org/apache/daffodil/dsom/TestBinaryInput_01.scala
index 46ef12f..64dba00 100644
--- 
a/daffodil-core/src/test/scala/org/apache/daffodil/dsom/TestBinaryInput_01.scala
+++ 
b/daffodil-core/src/test/scala/org/apache/daffodil/dsom/TestBinaryInput_01.scala
@@ -17,51 +17,23 @@
 
 package org.apache.daffodil.dsom
 
-import java.math.{ BigInteger => JBigInt }
+import java.math.{BigInteger => JBigInt}
 import java.nio.ByteBuffer
-import java.nio.CharBuffer
-import java.nio.LongBuffer
-
-import org.apache.daffodil.api.DaffodilTunables
 import org.apache.daffodil.io.DataInputStream
+import org.apache.daffodil.io.FakeFormatInfo
 import org.apache.daffodil.io.FormatInfo
 import org.apache.daffodil.io.InputSourceDataInputStream
-import org.apache.daffodil.processors.charset.BitsCharsetDecoder
-import org.apache.daffodil.processors.charset.BitsCharsetEncoder
-import org.apache.daffodil.schema.annotation.props.gen.BinaryFloatRep
 import org.apache.daffodil.schema.annotation.props.gen.BitOrder
 import org.apache.daffodil.schema.annotation.props.gen.ByteOrder
-import org.apache.daffodil.schema.annotation.props.gen.EncodingErrorPolicy
-import org.apache.daffodil.schema.annotation.props.gen.UTF16Width
-import org.apache.daffodil.util.Maybe
-import org.apache.daffodil.util.MaybeInt
 import org.apache.daffodil.util.Misc
 import org.junit.After
 import org.junit.Test
-
 import org.junit.Assert.assertEquals
 
 // Do no harm number 16 of 626 fail in regression, 154 in total of 797
 
 class TestBinaryInput_01 {
 
-  class FakeFormatInfo(val bitOrder: BitOrder, val byteOrder: ByteOrder) 
extends FormatInfo {
-    def encoder: BitsCharsetEncoder = ???
-    def decoder: BitsCharsetDecoder = ???
-    def reportingDecoder: BitsCharsetDecoder = ???
-    def replacingDecoder: BitsCharsetDecoder = ???
-    def fillByte: Byte = ???
-
-    def binaryFloatRep: BinaryFloatRep = ???
-    def maybeCharWidthInBits: MaybeInt = ???
-    def maybeUTF16Width: Maybe[UTF16Width] = ???
-    def encodingMandatoryAlignmentInBits: Int = ???
-    def encodingErrorPolicy: EncodingErrorPolicy = ???
-    def tunable: DaffodilTunables = ???
-    def regexMatchBuffer: CharBuffer = ???
-    def regexMatchBitPositionBuffer: LongBuffer = ???
-  }
-
   var startOver: DataInputStream.Mark = null
   var dis: DataInputStream = null
 
diff --git 
a/daffodil-core/src/test/scala/org/apache/daffodil/util/TestUtils.scala 
b/daffodil-core/src/test/scala/org/apache/daffodil/util/TestUtils.scala
index 0e3df6a..ca53fe0 100644
--- a/daffodil-core/src/test/scala/org/apache/daffodil/util/TestUtils.scala
+++ b/daffodil-core/src/test/scala/org/apache/daffodil/util/TestUtils.scala
@@ -22,15 +22,13 @@ import java.io.FileNotFoundException
 import java.nio.channels.Channels
 import java.nio.channels.ReadableByteChannel
 import java.nio.channels.WritableByteChannel
-
 import scala.util.Try
 import scala.xml._
-
 import org.apache.commons.io.output.NullOutputStream
-
 import org.junit.Assert.assertEquals
+import org.apache.daffodil.Implicits._
 
-import org.apache.daffodil.Implicits._; object INoWarnU2 { 
ImplicitsSuppressUnusedImportWarning() }
+import java.io.InputStream
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api._
 import org.apache.daffodil.compiler.Compiler
@@ -48,6 +46,8 @@ import org.apache.daffodil.processors.VariableMap
 import org.apache.daffodil.xml.XMLUtils
 import org.apache.daffodil.xml._
 
+object INoWarnU2 { ImplicitsSuppressUnusedImportWarning() }
+
 /*
  * This is not a file of tests.
  *
@@ -82,7 +82,7 @@ object TestUtils {
   }
 
   def testString(testSchema: Node, data: String, areTracing: Boolean = false) 
= {
-    runSchemaOnData(testSchema, Misc.stringToReadableByteChannel(data), 
areTracing)
+    runSchemaOnRBC(testSchema, Misc.stringToReadableByteChannel(data), 
areTracing)
   }
 
   def testBinary(testSchema: Node, hexData: String, areTracing: Boolean = 
false): (DFDL.ParseResult, Node) = {
@@ -92,11 +92,11 @@ object TestUtils {
 
   def testBinary(testSchema: Node, data: Array[Byte], areTracing: Boolean): 
(DFDL.ParseResult, Node) = {
     val rbc = Misc.byteArrayToReadableByteChannel(data)
-    runSchemaOnData(testSchema, rbc, areTracing)
+    runSchemaOnRBC(testSchema, rbc, areTracing)
   }
 
   def testFile(testSchema: Node, fileName: String) = {
-    runSchemaOnData(testSchema, Misc.fileToReadableByteChannel(new 
java.io.File(fileName)))
+    runSchemaOnRBC(testSchema, Misc.fileToReadableByteChannel(new 
java.io.File(fileName)))
   }
 
   val useSerializedProcessor = true
@@ -182,7 +182,7 @@ object TestUtils {
     } else p
   }
 
-  def runSchemaOnData(testSchema: Node, data: ReadableByteChannel, areTracing: 
Boolean = false) = {
+  def compileSchema(testSchema: Node) = {
     val compiler = Compiler()
     val pf = compiler.compileNode(testSchema)
     val isError = pf.isError
@@ -191,19 +191,34 @@ object TestUtils {
     if (isError) {
       throw new Exception(msgs)
     }
-    var p = saveAndReload(pf.onPath("/").asInstanceOf[DataProcessor])
+    val p = saveAndReload(pf.onPath("/").asInstanceOf[DataProcessor])
     val pIsError = p.isError
     if (pIsError) {
       val msgs = pf.getDiagnostics.map(_.getMessage()).mkString("\n")
       throw new Exception(msgs)
     }
-    p = if (areTracing) {
-      p.withDebugger(builtInTracer).withDebugging(true)
-    } else p
-    p = p.withValidationMode(ValidationMode.Limited)
+    p
+  }
+
+  def runSchemaOnRBC(testSchema: Node, data: ReadableByteChannel, areTracing: 
Boolean = false): (DFDL.ParseResult, Node) = {
+    runSchemaOnInputStream(testSchema, Channels.newInputStream(data), 
areTracing)
+  }
+
+  def runSchemaOnInputStream(testSchema: Node, is: InputStream, areTracing: 
Boolean = false): (DFDL.ParseResult, Node) = {
+    val p = compileSchema(testSchema)
+    runDataProcessorOnInputStream(p, is, areTracing)
+  }
+
+  def runDataProcessorOnInputStream(dp: DataProcessor, is: InputStream, 
areTracing: Boolean = false): (DFDL.ParseResult, Node) = {
+    val p1 =
+      if (areTracing) {
+        dp.withDebugger(builtInTracer).withDebugging(true)
+      } else dp
+
+    val p = p1.withValidationMode(ValidationMode.Limited)
 
     val outputter = new ScalaXMLInfosetOutputter()
-    val input = InputSourceDataInputStream(Channels.newInputStream(data))
+    val input = InputSourceDataInputStream(is)
     val actual = p.parse(input, outputter)
     if (actual.isProcessingError) {
       val diags = actual.getDiagnostics
diff --git 
a/daffodil-io/src/main/scala/org/apache/daffodil/io/DataInputStream.scala 
b/daffodil-io/src/main/scala/org/apache/daffodil/io/DataInputStream.scala
index 690798d..010e33d 100644
--- a/daffodil-io/src/main/scala/org/apache/daffodil/io/DataInputStream.scala
+++ b/daffodil-io/src/main/scala/org/apache/daffodil/io/DataInputStream.scala
@@ -263,11 +263,36 @@ trait DataInputStream
   /**
    * Determines whether the input stream has this much more data.
    *
-   * Does not advance the position
+   * Does not advance the position.
+   *
+   * On a network input stream, this may block to determine if the stream
+   * contains enough data or is at end-of-data.
    */
   def isDefinedForLength(nBits: Long): Boolean
 
   /**
+   * Returns true if the input stream has at least 1 bit of data.
+   *
+   * Does not advance the position.
+   *
+   * Returns true immediately if the input stream has available data that
+   * has not yet been consumed.
+   *
+   * On a network input stream, this may block to determine if the stream
+   * contains data or is at end-of-data.
+   *
+   * This is used when parsing multiple elements from a stream to see if there
+   * is data or not before calling parse().
+   *
+   * It may also be used after a parse() operation that is intended to consume
+   * the entire data stream (such as for a file) to determine if all data has
+   * been consumed or some data is left-over.
+   *
+   * This is equivalent to calling isDefinedForLength(1)
+   */
+  def hasData(): Boolean
+
+  /**
    * Returns a byte array containing the bits between the current bit position
    * and that position plus bitLengthFrom1.
    * <p>
diff --git 
a/daffodil-io/src/main/scala/org/apache/daffodil/io/InputSource.scala 
b/daffodil-io/src/main/scala/org/apache/daffodil/io/InputSource.scala
index 0c6586f..af18c16 100644
--- a/daffodil-io/src/main/scala/org/apache/daffodil/io/InputSource.scala
+++ b/daffodil-io/src/main/scala/org/apache/daffodil/io/InputSource.scala
@@ -18,11 +18,12 @@
 package org.apache.daffodil.io
 
 import java.nio.ByteBuffer
-
 import scala.collection.mutable.ArrayBuffer
-
 import org.apache.daffodil.exceptions.Assert
 import org.apache.daffodil.exceptions.ThinException
+import org.apache.daffodil.exceptions.UnsuppressableException
+
+import java.io.InputStream
 
 /**
  * There is a finite limit to the distance one can backtrack which is given by
@@ -54,6 +55,27 @@ case class BacktrackingException(position: Long, 
maxBacktrackLength: Int)
   extends ThinException("Attempted to backtrack to byte %d, which exceeds 
maximum backtrack length of %d", position, maxBacktrackLength)
 
 /**
+ * Thrown in the specific case where a java.io.InputStream is not properly
+ * implemented and is returning 0 from the read(buf, off, len) call when len > 
0.
+ *
+ * This non-blocking behavior is not supported by java.io.InputStream's 
contract
+ * and Daffodil depends on InputStreams having only blocking behavior.
+ *
+ * To properly place blame for this error on the InputStream (and not 
Daffodil's
+ * I/O layer built on top of it) we throw this very specific, informative
+ * exception in this case.
+ * @param inputStream The stream that is misbehaving.
+ */
+class InputStreamReadZeroError(inputStream: InputStream)
+extends UnsuppressableException(
+  s"""InputStream ${ inputStream.toString } illegally
+  | returned 0 from a call to read(buf, off, len).
+  | This is illegal behavior from a java.io.InputStream instance, as 
InputStream is a blocking API.
+  | This is not a Daffodil bug, but a problem with the InputStream supplied
+  | to Daffodil as a data source.""".stripMargin){
+}
+
+/**
  * The InputSource class is really just a mechanism to provide bytes an
  * InputSourceDataInputStream, which does the heavily lift about converter
  * bits/bytes to numbers and characters. This class does not need to know
@@ -68,6 +90,15 @@ case class BacktrackingException(position: Long, 
maxBacktrackLength: Int)
 abstract class InputSource {
 
   /**
+   * Determine if the InputSource has encountered the end-of-data.
+   *
+   * This does NOT perform a read operation (which would be blocking), but just
+   * answers the question of whether prior read operations in fact encountered
+   * the -1 indicating end-of-data
+   */
+  def hasReachedEndOfData: Boolean
+
+  /**
    * Determine whether the underlying data has the specified number of bytes
    * available starting at the current byte position. This function must block
    * until either nBytes are known to be available or end-of-file is reached.
@@ -245,6 +276,12 @@ class BucketingInputSource(
    */
   private var bytesFilledInLastBucket: Int = 0
 
+  private var hasMoreData = true
+
+  override def hasReachedEndOfData: Boolean =
+    !hasMoreData
+
+
   /**
    * Adds new buckets to the buckets array until either we run out of data or
    * we fill up to byteIndex bytes in the bucketIndex. This modifies
@@ -255,7 +292,7 @@ class BucketingInputSource(
    */
   private def fillBucketsToIndex(goalBucketIndex: Long, bytesNeededInBucket: 
Long): Boolean = {
     var lastBucketIndex = buckets.length - 1
-    var hasMoreData = true
+
     var needsMoreData = goalBucketIndex > lastBucketIndex || (goalBucketIndex 
== lastBucketIndex && bytesNeededInBucket > bytesFilledInLastBucket)
 
     while (needsMoreData && hasMoreData) {
@@ -263,10 +300,21 @@ class BucketingInputSource(
       // actually needed
       val emptyBytesInLastBucket = bucketSize - bytesFilledInLastBucket
 
+      // we never call read passing len of 0.
+      Assert.invariant(emptyBytesInLastBucket > 0)
+
       // Try to read enough bytes to fill the rest of this bucket. Note that
       // the .read() function could hit EOF (returns -1) or could return
-      // anywhere from zero to emptyBytesInLastBucket bytes
-      val bytesRead = inputStream.read(buckets(lastBucketIndex).bytes, 
bytesFilledInLastBucket, emptyBytesInLastBucket)
+      // anywhere from 1 to emptyBytesInLastBucket bytes
+      val bytesRead =
+        inputStream.read(
+          buckets(lastBucketIndex).bytes,
+          bytesFilledInLastBucket,
+          emptyBytesInLastBucket)
+
+      // check for bad inputStream behavior. It's not our fault!
+      if (bytesRead == 0)
+        throw new InputStreamReadZeroError(inputStream)
 
       if (bytesRead == -1) {
         // Needed more data but hit EOF, break out with error
@@ -550,4 +598,11 @@ class ByteBufferInputSource(byteBuffer: ByteBuffer)
     // since those are only used to prevent memory segments from being
     // compacted.
   }
+
+  /**
+   * Determine if the InputSource has encountered the end-of-data.
+   *
+   * For a byte buffer, this is always true.
+   */
+  override def hasReachedEndOfData = true
 }
diff --git 
a/daffodil-io/src/main/scala/org/apache/daffodil/io/InputSourceDataInputStream.scala
 
b/daffodil-io/src/main/scala/org/apache/daffodil/io/InputSourceDataInputStream.scala
index af2c078..f6137be 100644
--- 
a/daffodil-io/src/main/scala/org/apache/daffodil/io/InputSourceDataInputStream.scala
+++ 
b/daffodil-io/src/main/scala/org/apache/daffodil/io/InputSourceDataInputStream.scala
@@ -18,7 +18,7 @@
 package org.apache.daffodil.io
 
 import java.io.InputStream
-import java.math.{ BigInteger => JBigInt }
+import java.math.{BigInteger => JBigInt}
 import java.nio.ByteBuffer
 import java.nio.CharBuffer
 import java.nio.LongBuffer
@@ -94,8 +94,9 @@ private[io] class MarkPool() extends Pool[MarkState] {
  *
  * Underlying representation is an InputSource containing all input data.
  */
-final class InputSourceDataInputStream private (val inputSource: InputSource)
+final class InputSourceDataInputStream private(val inputSource: InputSource)
   extends DataInputStreamImplMixin {
+
   import DataInputStream._
 
   override def toString = {
@@ -117,6 +118,38 @@ final class InputSourceDataInputStream private (val 
inputSource: InputSource)
   @inline
   override final def bitLimit0b: MaybeULong = cst.bitLimit0b
 
+  /**
+   * Tells us if the underlying input source has detected end-of-data
+   * (the read(...) call returned -1).
+   *
+   * But this does NOT tell us we are positioned at the end, only whether
+   * in the course of reading, we encountered the end of data. If we
+   * backtracked we could have seen the end of data, but backed up in
+   * the data to an earlier position.
+   */
+  def hasReachedEndOfData: Boolean = inputSource.hasReachedEndOfData
+
+  /**
+   * Determine if we're positioned at the end of data.
+   *
+   * Blocks until either one byte of data can be read, or end-of-data
+   * is encountered.
+   *
+   * It is generally not advised to use this on network TCP data streams
+   * as it will block waiting for the sender of data to provide more data
+   * or close the stream.
+   *
+   * @return boolean indicating whether we are known to be positioned at
+   *         the end of data.
+   */
+  @deprecated(
+    "Use bitPos0b or bitPos1b to compare with expected position (possibly 
bitLimit0b).",
+    "3.1.0")
+  final def isAtEnd(): Boolean = {
+    !hasData() && hasReachedEndOfData
+  }
+
+
   def setBitPos0b(newBitPos0b: Long): Unit = {
     // threadCheck()
     Assert.invariant(newBitPos0b >= 0)
@@ -507,7 +540,10 @@ final class InputSourceDataInputStream private (val 
inputSource: InputSource)
   /**
    * Determines whether the input stream has this much more data.
    *
-   * Does not advance the position
+   * Does not advance the position.
+   *
+   * This operation will block until either n bytes are read or end-of-data
+   * is hit.
    */
   final def isDefinedForLength(nBits: Long): Boolean = {
     val newBitPos0b = bitPos0b + nBits
@@ -519,6 +555,8 @@ final class InputSourceDataInputStream private (val 
inputSource: InputSource)
     }
   }
 
+  final def hasData() = isDefinedForLength(1)
+
   def skip(nBits: Long, finfo: FormatInfo): Boolean = {
     // threadCheck()
     if (!this.isDefinedForLength(nBits)) return false
@@ -565,6 +603,7 @@ final class InputSourceDataInputStream private (val 
inputSource: InputSource)
   }
 
   override def markPos: MarkPos = bitPos0b
+
   override def resetPos(m: MarkPos): Unit = {
     setBitPos0b(m)
   }
@@ -735,6 +774,7 @@ final class InputSourceDataInputStream private (val 
inputSource: InputSource)
   }
 
   private val charIterator = new InputSourceDataInputStreamCharIterator(this)
+
   def asIteratorChar: CharIterator = {
     val ci = charIterator
     ci.reset()
diff --git 
a/daffodil-io/src/main/scala/org/apache/daffodil/io/StringDataInputStreamForUnparse.scala
 
b/daffodil-io/src/main/scala/org/apache/daffodil/io/StringDataInputStreamForUnparse.scala
index 86624e3..275a11d 100644
--- 
a/daffodil-io/src/main/scala/org/apache/daffodil/io/StringDataInputStreamForUnparse.scala
+++ 
b/daffodil-io/src/main/scala/org/apache/daffodil/io/StringDataInputStreamForUnparse.scala
@@ -47,8 +47,6 @@ final class StringDataInputStreamForUnparse
     dis = InputSourceDataInputStream(ba)
   }
 
-  private def doNotUse = Assert.usageError("Not to be called on " + 
Misc.getNameFromClass(this))
-
   override def asIteratorChar: DataInputStream.CharIterator = {
     Assert.usage(dis != null, "Must call reset(str) before any other method.")
     dis.asIteratorChar
@@ -66,6 +64,9 @@ final class StringDataInputStreamForUnparse
   override def getSomeString(nChars: Long,finfo: FormatInfo): Maybe[String] = 
dis.getSomeString(nChars, finfo)
   override def getString(nChars: Long,finfo: FormatInfo): Maybe[String] = 
dis.getString(nChars, finfo)
 
+  // $COVERAGE-OFF$ Nothing should be calling these.
+  private def doNotUse = Assert.usageError("Not to be called on " + 
Misc.getNameFromClass(this))
+
   override def futureData(nBytesRequested: Int): java.nio.ByteBuffer = doNotUse
   override def getBinaryDouble(finfo: FormatInfo): Double = doNotUse
   override def getBinaryFloat(finfo: FormatInfo): Float = doNotUse
@@ -79,7 +80,9 @@ final class StringDataInputStreamForUnparse
   override def setBitLimit0b(bitLimit0b: MaybeULong): Boolean = doNotUse
   override def setDebugging(setting: Boolean): Unit = doNotUse
   override def isDefinedForLength(length: Long): Boolean = doNotUse
+  override def hasData: Boolean = doNotUse
   override def skip(nBits: Long, finfo: FormatInfo): Boolean = doNotUse
   override def resetBitLimit0b(savedBitLimit0b: MaybeULong): Unit = doNotUse
+  // $COVERAGE-ON$
   override def validateFinalStreamState: Unit = {} // does nothing
 }
diff --git 
a/daffodil-io/src/test/scala/org/apache/daffodil/io/FormatInfoForUnitTest.scala 
b/daffodil-io/src/test/scala/org/apache/daffodil/io/FormatInfoForUnitTest.scala
index 233c9e0..754a2f8 100644
--- 
a/daffodil-io/src/test/scala/org/apache/daffodil/io/FormatInfoForUnitTest.scala
+++ 
b/daffodil-io/src/test/scala/org/apache/daffodil/io/FormatInfoForUnitTest.scala
@@ -89,3 +89,28 @@ class FormatInfoForUnitTest private ()
     }
   }
 }
+
+/**
+ * Supplies only bitOrder and byteOrder. Everything else unimplemented.
+ * @param bitOrder
+ * @param byteOrder
+ */
+class FakeFormatInfo(val bitOrder: BitOrder, val byteOrder: ByteOrder) extends 
FormatInfo {
+  def encoder: BitsCharsetEncoder = ???
+  def decoder: BitsCharsetDecoder = ???
+  def reportingDecoder: BitsCharsetDecoder = ???
+  def replacingDecoder: BitsCharsetDecoder = ???
+  def fillByte: Byte = ???
+
+  def binaryFloatRep: BinaryFloatRep = ???
+  def maybeCharWidthInBits: MaybeInt = ???
+  def maybeUTF16Width: Maybe[UTF16Width] = ???
+  def encodingMandatoryAlignmentInBits: Int = ???
+  def encodingErrorPolicy: EncodingErrorPolicy = ???
+  def tunable: DaffodilTunables = ???
+  def regexMatchBuffer: CharBuffer = ???
+  def regexMatchBitPositionBuffer: LongBuffer = ???
+}
+
+object FakeFormatInfo_MSBF_BE extends 
FakeFormatInfo(BitOrder.MostSignificantBitFirst, ByteOrder.BigEndian)
+object FakeFormatInfo_LSBF_LE extends 
FakeFormatInfo(BitOrder.LeastSignificantBitFirst, ByteOrder.LittleEndian)
diff --git 
a/daffodil-io/src/test/scala/org/apache/daffodil/io/SocketPairTestRig.scala 
b/daffodil-io/src/test/scala/org/apache/daffodil/io/SocketPairTestRig.scala
new file mode 100644
index 0000000..635a8f4
--- /dev/null
+++ b/daffodil-io/src/test/scala/org/apache/daffodil/io/SocketPairTestRig.scala
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.daffodil.io
+
+import org.apache.daffodil.Implicits.intercept
+import org.apache.daffodil.io.SocketPairTestRig.timeLimit
+import org.apache.daffodil.io.SocketPairTestRig.withTimeout
+import org.junit.Assert.assertEquals
+import org.junit.Assert.fail
+import org.junit.Test
+
+import java.io.InputStream
+import java.io.OutputStream
+import java.net.ServerSocket
+import java.net.Socket
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.concurrent.TimeoutException
+import scala.concurrent.duration.DurationInt
+import scala.concurrent.duration.FiniteDuration
+import scala.util.Try
+
+
+/**
+ * Test rig sets up TCP socket unidirectional between a producer
+ * output stream, and a consumer input stream.
+ */
+abstract class SocketPairTestRig {
+
+  /**
+   * Override with test logic method. This logic can read and write from the 
argument
+   * streams. The fact that these are socket connections means there is some 
buffering
+   * there. But you cannot write an unlimited amount of data to the producer
+   * without eventually consuming all the operating-system buffering, and once
+   * that happens the write will block.
+   *
+   * @param producerOutputStream
+   * @param consumerInputStream
+   */
+  protected def test(producerOutputStream: OutputStream, consumerInputStream: 
InputStream): Unit
+
+  def run(): Unit = {
+    val serverSocket = new ServerSocket(0) // 0 means to allocate an unused 
port
+    val port = serverSocket.getLocalPort()
+    val csf = Future {
+      serverSocket.accept() // start accepting connections for consumer
+    }
+    val psf = Future {
+      val useLoopBack: String = null
+      new Socket(useLoopBack, port) // connect producer
+    }
+    val (producerSocket, consumerSocket) = Await.result(psf zip csf, 
1000.milliseconds)
+    assert(producerSocket.isConnected)
+    assert(consumerSocket.isConnected)
+    try {
+      val producerOutputStream = producerSocket.getOutputStream()
+      val consumerInputStream = consumerSocket.getInputStream()
+
+      // one way connection (no reverse flow control)
+      // NOTE: Can't do this. It seems to cause the other direction
+      // to also close.
+      // producerSocket.getInputStream().close()
+      // consumerSocket.getOutputStream().close()
+
+      test(producerOutputStream, consumerInputStream)
+    } catch {
+      case th: Throwable =>
+        throw th // good place for a breakpoint - but keep in mind tests use 
timeouts
+    } finally {
+      producerSocket.close()
+      consumerSocket.close()
+      serverSocket.close() // assumption is that this frees up the port
+    }
+  }
+}
+
+object SocketPairTestRig {
+
+  val timeLimit = 1000.milliseconds
+
+  /**
+   * Runs test code with a timeout. Intended for use with tests that
+   * could hang if there are bugs where Daffodil does a blocking read
+   * that it shouldn't.
+   *
+   * @param whatTimedOutDescription Description of what we're testing. Ex: 
"Daffodil parse"
+   * @param testThunk               The test code that could hang. Shouldn't, 
but could.
+   * @tparam T Return type of the testThunk
+   * @return the result of the testThunk
+   */
+  def withTimeout[T](
+    whatTimedOutDescription: String)(testThunk: => T): T = {
+    Try(withTimeout(testThunk)).recover {
+      case e: TimeoutException =>
+        fail(whatTimedOutDescription + " timed out.")
+        ??? // ??? because scala doesn't know fail never returns.
+    }.get
+  }
+
+  /**
+   * Doesn't have the fail(...) built in, so we can
+   * write tests that expect a timeout rather than fail on a timeout.
+   * @param testThunk
+   * @tparam T
+   * @return
+   */
+  def withTimeout[T](testThunk: => T): T =
+    withTimeout(timeLimit)(testThunk)
+
+  /**
+   * Gives control over the timeout limit
+   */
+  def withTimeout[T](limitMillis: FiniteDuration)(testThunk: => T): T =
+    Await.result(Future(testThunk), limitMillis)
+
+}
+
+
+/**
+ * Shows that we can read from a TCP network socket
+ * without blocking for bytes past the end of what we read.
+ */
+class TestSocketPairTestRig {
+
+  /**
+   * Test the test rig. Make sure we can send a byte, and receive that byte,
+   * and if we close the producer stream, we get EOD on the
+   * consumer.
+   */
+  @Test def testSocketPairTestRig1(): Unit = {
+    val sptr = new SocketPairTestRig {
+      override def test(pos: OutputStream, cis: InputStream): Unit = {
+        pos.write(0x31)
+        pos.close()
+        val i: Int = cis.read()
+        assertEquals(0x31, i)
+        val j = cis.read()
+        assertEquals(-1, j)
+      }
+    }
+    sptr.run()
+  }
+
+  /**
+   * Test showing that when a blocking read on the consumer input stream
+   * in fact blocks, that we can detect this via timeout.
+   */
+  @Test def testHangDetection1(): Unit = {
+    val sptr = new SocketPairTestRig {
+      override def test(pos: OutputStream, cis: InputStream): Unit = {
+
+        //
+        // Write exactly 4 bytes to producer network stream
+        //
+        pos.write("1234".getBytes)
+        pos.flush()
+
+        //
+        // read 4 bytes. Should not hang.
+        //
+        // Caution: if debugging, this will timeout if you stop inside here!
+        //
+        val nRead = withTimeout("Read 4 bytes") {
+          val buf = new Array[Byte](4)
+          cis.read(buf, 0, 4)
+          assertEquals("1234".getBytes.toSeq, buf.toSeq)
+        }
+        //
+        // read 1 more byte. This will hang, and timeout.
+        //
+        intercept[TimeoutException] {
+          withTimeout(100.milliseconds) {
+            val buf = new Array[Byte](1)
+            cis.read(buf, 0, 1)
+          }
+          fail("Failed to throw exception")
+        }
+      }
+    }
+    sptr.run()
+  }
+
+  /**
+   * Test showing that a blocking read that hangs on the consumer stream
+   * will eventually be satisfied by something else writing to the producer 
stream.
+   */
+  @Test def testHangDetection2(): Unit = {
+    val sptr = new SocketPairTestRig {
+      override def test(pos: OutputStream, cis: InputStream): Unit = {
+
+        //
+        // Write exactly 3 bytes to producer network stream
+        //
+        pos.write("123".getBytes)
+        pos.flush()
+
+        var buf = new Array[Byte](4)
+        var nRead = withTimeout("Read 4 bytes") {
+          //
+          // This won't hang
+          //
+          cis.read(buf, 0, 4)
+        }
+        assertEquals(3, nRead)
+        assertEquals("123".getBytes.toSeq, buf.toSeq.slice(0, 3))
+
+        //
+        // In parallel, wait a bit, then write another byte.
+        //
+        Future {
+          Thread.sleep(timeLimit.toMillis / 10) // wait 1/10 of the normal 
timeout
+          pos.write("4".getBytes)
+          pos.flush()
+        }
+        //
+        // This won't hang. We'll wait long enough that our
+        // blocking read will get satisfied when the write above
+        // finally happens.
+        //
+        buf = new Array[Byte](1)
+        nRead = withTimeout {
+          cis.read(buf, 0, 1)
+        }
+        assertEquals('4'.toByte, buf(0))
+        assertEquals(1, nRead)
+      }
+    }
+    sptr.run()
+  }
+}
diff --git 
a/daffodil-io/src/test/scala/org/apache/daffodil/io/TestInputSourceDataInputStream.scala
 
b/daffodil-io/src/test/scala/org/apache/daffodil/io/TestInputSourceDataInputStream.scala
index 010b459..c0a30fd 100644
--- 
a/daffodil-io/src/test/scala/org/apache/daffodil/io/TestInputSourceDataInputStream.scala
+++ 
b/daffodil-io/src/test/scala/org/apache/daffodil/io/TestInputSourceDataInputStream.scala
@@ -62,6 +62,12 @@ class TestInputSourceDataInputStream {
   def assertEqualsTyped(expected: Float, actual: Float, threshold: Float) = 
assertEquals(expected, actual, threshold)
   def assertEqualsTyped(expected: Double, actual: Double, threshold: Double) = 
assertEquals(expected, actual, threshold)
 
+  @Test def testByteBufferInputSource1(): Unit = {
+    val dis = InputSourceDataInputStream(ten)
+    // provides codecov for this method for ByteBufferInputSource
+    assertTrue(dis.hasReachedEndOfData)
+  }
+
   @Test def testBitAndBytePos0: Unit = {
     val dis = InputSourceDataInputStream(ten)
     0L assertEqualsTyped (dis.bitPos0b)
diff --git 
a/daffodil-io/src/test/scala/org/apache/daffodil/io/TestInputSourceDataInputStream2.scala
 
b/daffodil-io/src/test/scala/org/apache/daffodil/io/TestInputSourceDataInputStream2.scala
index da1afdf..d82df29 100644
--- 
a/daffodil-io/src/test/scala/org/apache/daffodil/io/TestInputSourceDataInputStream2.scala
+++ 
b/daffodil-io/src/test/scala/org/apache/daffodil/io/TestInputSourceDataInputStream2.scala
@@ -74,7 +74,7 @@ class TestInputSourceDataInputStream2 {
     assertEquals(81, dis.bitLimit1b.get)
     assertEquals(10, dis.bytePos0b)
     dis.reset(m1)
-    assertFalse(dis.isDefinedForLength(1))
+    assertFalse(dis.hasData())
     dis.asInstanceOf[InputSourceDataInputStream].resetBitLimit0b(MaybeULong(10 
* 8))
     arr = dis.getByteArray(5 * 8, finfo)
     assertEquals(5, arr.size)
diff --git 
a/daffodil-io/src/test/scala/org/apache/daffodil/io/TestInputSourceDataInputStream8.scala
 
b/daffodil-io/src/test/scala/org/apache/daffodil/io/TestInputSourceDataInputStream8.scala
new file mode 100644
index 0000000..020a12e
--- /dev/null
+++ 
b/daffodil-io/src/test/scala/org/apache/daffodil/io/TestInputSourceDataInputStream8.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.daffodil.io
+
+
+import org.apache.daffodil.Implicits.intercept
+import org.apache.daffodil.util.MaybeULong
+import org.apache.daffodil.util.Misc
+import org.junit.Assert._
+import org.junit.Test
+
+import java.io.ByteArrayInputStream
+import java.io.InputStream
+import java.io.OutputStream
+
+/**
+ * Tests about detecting end-of-data, and
+ * ensuring proper behavior for InputSourceDataInputStream
+ * with respect to reading unnecessary data.
+ *
+ * These cover the fix for DAFFODIL-2502, where users observed
+ * Daffodil would hang because of calls to isAtEnd.
+ */
+class TestInputSourceDataInputStream8 {
+
+  /**
+   * Test provides coverage of special detection in I/O layer
+   * of an incorrectly behaving InputStream.
+   */
+  @Test def testReadZeroBytesDetected(): Unit = {
+    //
+    // an InputStream where the read(buf, off, len) call returns 0
+    // meaning no data available temporarily.
+    // This is a badly behaving input stream. The API
+    // is supposed to be BLOCKING.
+    //
+    val badIS = new InputStream {
+      override def read() = ???
+      override def read(buf: Array[Byte], off: Int, len: Int): Int = {
+        assertTrue(len > 0)
+        0
+      }
+    }
+    val isdis = InputSourceDataInputStream(badIS)
+    val e = intercept[InputStreamReadZeroError] {
+      isdis.hasData()
+    }
+    assertTrue(e.getMessage().toLowerCase.contains("illegal"))
+  }
+
+  @deprecated("Tests isAtEnd", "3.1.0")
+  @Test def testIsAtEndEmpty1(): Unit = {
+    val emptyIS = new InputStream {
+      override def read() = ???
+      override def read(buf: Array[Byte], off: Int, len: Int): Int = {
+        assertTrue(len > 0)
+        -1 // nothing here
+      }
+    }
+    val isdis = InputSourceDataInputStream(emptyIS)
+    //
+    // before we attempt to read anything, we're not at end.
+    assertFalse(isdis.hasReachedEndOfData)
+    // this must attempt to read, and will get end-of-data
+    assertTrue(isdis.isAtEnd())
+    // so now we know we're at the end-of-data
+    assertTrue(isdis.hasReachedEndOfData)
+  }
+
+  @deprecated("Tests isAtEnd", "3.1.0")
+  @Test def testIsAtEndWholeByte1(): Unit = {
+    val oneByteIS = new ByteArrayInputStream("A".getBytes)
+    val isdis = InputSourceDataInputStream(oneByteIS)
+    //
+    // before we attempt to read anything, we're not at end.
+    assertFalse(isdis.hasReachedEndOfData)
+    // this must attempt to read, and will find data available (not end-of-data)
+    assertTrue(isdis.hasData())
+    assertEquals(0, isdis.bitPos0b)
+    assertEquals(0,isdis.getUnsignedLong(1,FakeFormatInfo_MSBF_BE).toLong)
+    // we've only consumed 1 bit meaning
+    // we should have only fetched 1 byte and this will NOT encounter end of 
data yet.
+    assertFalse(isdis.hasReachedEndOfData)
+    // now retrieve next 7 bits. We still won't be at end of data
+    assertEquals('A',isdis.getUnsignedLong(7,FakeFormatInfo_MSBF_BE).toChar)
+    assertFalse(isdis.hasReachedEndOfData)
+    // try to read one more bit. That will not succeed, but now we'll be at 
end.
+    assertTrue(isdis.isAtEnd())
+    assertFalse(isdis.hasData())
+    assertTrue(isdis.hasReachedEndOfData)
+  }
+
+  @Test def testIsAtEndPartialByte1(): Unit = {
+    val oneByteIS = new ByteArrayInputStream(Misc.hex2Bytes("FF"))
+    val isdis = InputSourceDataInputStream(oneByteIS)
+    isdis.setBitLimit0b(MaybeULong(1))
+    //
+    // before we attempt to read anything, we're not at end.
+    assertFalse(isdis.hasReachedEndOfData)
+    assertTrue(isdis.isDefinedForLength(1)) // matches the bit limit set above
+    assertEquals(0, isdis.bitPos0b)
+    assertEquals(1,isdis.getUnsignedLong(1,FakeFormatInfo_MSBF_BE).toLong)
+    // we've only consumed 1 bit meaning
+    // we should have only fetched 1 byte and this will NOT encounter end of 
data yet.
+    assertFalse(isdis.hasReachedEndOfData)
+    assertEquals(1, isdis.bitPos0b)
+    // try to read 7 more bits.
+    // it will not succeed due to the bit limit, but
+    // it is still not enough for us to have hit end of data
+    assertFalse(isdis.isDefinedForLength(7))
+    assertFalse(isdis.hasReachedEndOfData)
+    // but, if we try to read 8 more bits, that puts us past the end of data
+    // except, since there's a bit limit, we'll never touch the underlying
+    // input source, so we still won't have gotten an end-of-data
+    assertFalse(isdis.isDefinedForLength(8))
+    assertFalse(isdis.hasReachedEndOfData)
+    // if we remove the bit limit however, ...
+    isdis.setBitLimit0b(MaybeULong.Nope)
+    // 7 more bits of the first byte are available
+    assertTrue(isdis.isDefinedForLength(7))
+    // but if we even test for bits beyond that, we'll encounter the 
end-of-data
+    assertFalse(isdis.isDefinedForLength(8))
+    assertTrue(isdis.hasReachedEndOfData)
+  }
+
+  /**
+   * This test shows that for a TCP stream, if 1 byte is sent,
+   * then if you attempt to read more than that many bytes, it will
+   * return just the 1 byte.
+   *
+   * This isn't testing Daffodil code, it's a test that assures us
+   * that the underlying I/O layer of the JVM/Java/Scala has the
+   * behavior we depend on.
+   *
+   * Our I/O layer depends on this behavior to avoid blocking
+   * on reads that attempt to fill a bucket (for efficiency) when
+   * there is temporarily less than that much data on a network
+   * TCP stream.
+   */
+  @Test def networkReadPartial1(): Unit = {
+    val sptr = new SocketPairTestRig {
+      override def test(pos: OutputStream, cis: InputStream): Unit = {
+        assertEquals(0, cis.available())
+        pos.write(0x31)
+        pos.flush()
+        val buf = new Array[Byte](4)
+        val nRead: Int = cis.read(buf, 0, 4)
+        assertEquals(1, nRead)
+        assertEquals(0x31, buf(0))
+      }
+    }
+    sptr.run()
+  }
+
+}
diff --git 
a/daffodil-io/src/test/scala/org/apache/daffodil/layers/TestBase64.scala 
b/daffodil-io/src/test/scala/org/apache/daffodil/layers/TestBase64.scala
index 7aa2cd8..46d7025 100644
--- a/daffodil-io/src/test/scala/org/apache/daffodil/layers/TestBase64.scala
+++ b/daffodil-io/src/test/scala/org/apache/daffodil/layers/TestBase64.scala
@@ -66,7 +66,6 @@ 
W1vdXMgcXVvdGVzIG9yIHNvbmcgbHlyaWNzIG9yIGFueXRoaW5nIGxpa2UgdGhhdCBpbnRyb2R1Y
     pairs.foreach {
       case (exp, act) => {
         if (exp != act) {
-          println("differ at character %s (0-based). Expected '%s' got 
'%s'.".format(i, exp, act))
           failed = true
         }
         i += 1
@@ -92,7 +91,7 @@ 
W1vdXMgcXVvdGVzIG9yIHNvbmcgbHlyaWNzIG9yIGFueXRoaW5nIGxpa2UgdGhhdCBpbnRyb2R1Y
     val data = b64Text.dropRight(3)
 
     intercept[IllegalArgumentException] {
-      println(new String(java.util.Base64.getMimeDecoder.decode(data)))
+      java.util.Base64.getMimeDecoder.decode(data)
     }
 
   }
@@ -203,7 +202,6 @@ 
W1vdXMgcXVvdGVzIG9yIHNvbmcgbHlyaWNzIG9yIGFueXRoaW5nIGxpa2UgdGhhdCBpbnRyb2R1Y
     pairs.foreach {
       case (exp, act) => {
         if (exp != act) {
-          println("differ at character %s (0-based). Expected '%s' got 
'%s'.".format(i, exp, act))
           failed = true
         }
         i += 1
diff --git 
a/daffodil-io/src/test/scala/org/apache/daffodil/layers/TestJavaIOStreams.scala 
b/daffodil-io/src/test/scala/org/apache/daffodil/layers/TestJavaIOStreams.scala
index d597d7f..a3edd5f 100644
--- 
a/daffodil-io/src/test/scala/org/apache/daffodil/layers/TestJavaIOStreams.scala
+++ 
b/daffodil-io/src/test/scala/org/apache/daffodil/layers/TestJavaIOStreams.scala
@@ -140,7 +140,6 @@ ZSBzaG9ydCB2ZWhlbWVuY2Ugb2YgYW55IGNhcm5hbCBwbGVhc3VyZS4="""
     val gzipBufferSize = 1
     val decodedStream = new java.util.zip.GZIPInputStream(inputStream, 
gzipBufferSize)
     val lines = IOUtils.readLines(decodedStream, 
StandardCharsets.ISO_8859_1).asScala.toSeq
-    lines.foreach { println }
     assertEquals(1, lines.length)
     assertEquals(expected, lines(0))
     val additionalLines = IOUtils.readLines(inputStream, 
StandardCharsets.ISO_8859_1).asScala.toSeq
@@ -172,7 +171,6 @@ ZSBzaG9ydCB2ZWhlbWVuY2Ugb2YgYW55IGNhcm5hbCBwbGVhc3VyZS4="""
     val gzipBufferSize = 1
     val decodedStream = new java.util.zip.GZIPInputStream(inputStream, 
gzipBufferSize)
     val lines = IOUtils.readLines(decodedStream, 
StandardCharsets.ISO_8859_1).asScala.toSeq
-    lines.foreach { println }
     assertEquals(1, lines.length)
     assertEquals(expected, lines(0))
     val additionalLines = IOUtils.readLines(inputStream, 
StandardCharsets.ISO_8859_1).asScala.toSeq
diff --git 
a/daffodil-io/src/test/scala/org/apache/daffodil/layers/TestLimitingJavaIOStreams.scala
 
b/daffodil-io/src/test/scala/org/apache/daffodil/layers/TestLimitingJavaIOStreams.scala
index 19d880d..e3160cf 100644
--- 
a/daffodil-io/src/test/scala/org/apache/daffodil/layers/TestLimitingJavaIOStreams.scala
+++ 
b/daffodil-io/src/test/scala/org/apache/daffodil/layers/TestLimitingJavaIOStreams.scala
@@ -133,7 +133,6 @@ ZSBzaG9ydCB2ZWhlbWVuY2Ugb2YgYW55IGNhcm5hbCBwbGVhc3VyZS4="""
 
     val decodedStream = new java.util.zip.GZIPInputStream(limitedStream, 5)
     val lines = IOUtils.readLines(decodedStream, iso8859).asScala.toSeq
-    lines.foreach { println }
     assertEquals(1, lines.length)
     assertEquals(expected, lines(0))
     val additionalLines = IOUtils.readLines(inputStream, iso8859).asScala.toSeq
diff --git 
a/daffodil-japi/src/main/java/org/apache/daffodil/japi/package-info.java 
b/daffodil-japi/src/main/java/org/apache/daffodil/japi/package-info.java
index 1eaa6a0..16bfcc7 100644
--- a/daffodil-japi/src/main/java/org/apache/daffodil/japi/package-info.java
+++ b/daffodil-japi/src/main/java/org/apache/daffodil/japi/package-info.java
@@ -151,11 +151,11 @@
  * InputSourceDataInputStream is = new InputSourceDataInputStream(dataStream);
  * JDOMInfosetOutputter jdomOutputter = new JDOMInfosetOutputter();
  * boolean keepParsing = true;
- * while (keepParsing) {
+ * while (keepParsing && is.hasData()) {
  *   jdomOutputter.reset();
  *   ParseResult pr = dp.parse(is, jdomOutputter);
  *   ...
- *   keepParsing = !pr.location().isAtEnd() && !pr.isError();
+ *   keepParsing = !pr.isError();
  * }
  * }</pre>
  *
@@ -214,12 +214,12 @@
  * SAXHandler contentHandler = new SAXHandler();
  * xmlReader.setContentHandler(contentHandler);
  * Boolean keepParsing = true;
- * while (keepParsing) {
+ * while (keepParsing && is.hasData()) {
  *   contentHandler.reset();
  *   xmlReader.parse(is);
  *   val pr = 
xmlReader.getProperty(DaffodilParseXMLReader.DAFFODIL_SAX_URN_PARSERESULT());
  *   ...
- *   keepParsing = !pr.location().isAtEnd() && !pr.isError();
+ *   keepParsing = !pr.isError();
  * }
  * }
  * </pre>
diff --git 
a/daffodil-japi/src/main/scala/org/apache/daffodil/japi/Daffodil.scala 
b/daffodil-japi/src/main/scala/org/apache/daffodil/japi/Daffodil.scala
index ba48c30..2a30efc 100644
--- a/daffodil-japi/src/main/scala/org/apache/daffodil/japi/Daffodil.scala
+++ b/daffodil-japi/src/main/scala/org/apache/daffodil/japi/Daffodil.scala
@@ -490,10 +490,19 @@ class DataLocation private[japi] (dl: SDataLocation) {
   override def toString() = dl.toString
 
   /**
-   * Determine if this data location is at the end of the input data
+   * Determine if we're positioned at the end of data.
    *
-   * @return true if this represents the end of the input data, false otherwise
+   * Blocks until either one byte of data can be read, or end-of-data
+   * is encountered.
+   *
+   * It is generally not advised to use this on network TCP data streams
+   * as it will block waiting for the sender of data to provide more data
+   * or close the stream.
+   *
+   * @return boolean indicating whether we are known to be positioned at
+   *         the end of data.
    */
+  @deprecated("Use comparison of bitPos1b() with expected position instead.", 
"3.1.0")
   def isAtEnd() = dl.isAtEnd
 
   /**
diff --git 
a/daffodil-japi/src/main/scala/org/apache/daffodil/japi/io/InputSourceDataInputStream.scala
 
b/daffodil-japi/src/main/scala/org/apache/daffodil/japi/io/InputSourceDataInputStream.scala
index 76984a5..76bdf7d 100644
--- 
a/daffodil-japi/src/main/scala/org/apache/daffodil/japi/io/InputSourceDataInputStream.scala
+++ 
b/daffodil-japi/src/main/scala/org/apache/daffodil/japi/io/InputSourceDataInputStream.scala
@@ -20,7 +20,7 @@ package org.apache.daffodil.japi.io
 import java.io.InputStream
 import java.nio.ByteBuffer
 
-import org.apache.daffodil.io.{ InputSourceDataInputStream => 
SInputSourceDataInputStream }
+import org.apache.daffodil.io.{InputSourceDataInputStream => 
SInputSourceDataInputStream}
 
 /**
  * Provides Daffodil with byte data from an InputStream, ByteBuffer, or byte
@@ -28,7 +28,7 @@ import org.apache.daffodil.io.{ InputSourceDataInputStream => 
SInputSourceDataIn
  *
  * @param dis the underlying Scala InputSourceDataInputStream
  */
-class InputSourceDataInputStream private[japi] (private [japi] val dis: 
SInputSourceDataInputStream) {
+class InputSourceDataInputStream private[japi](private[japi] val dis: 
SInputSourceDataInputStream) {
 
   /**
    * Create an InputSourceDataInputStream from a java.io.InputStream
@@ -38,10 +38,30 @@ class InputSourceDataInputStream private[japi] (private 
[japi] val dis: SInputSo
   /**
    * Create an InputSourceDataInputStream from a java.nio.ByteBuffer
    */
-  def this(bb: ByteBuffer) = this(SInputSourceDataInputStream(bb)) 
+  def this(bb: ByteBuffer) = this(SInputSourceDataInputStream(bb))
 
   /**
    * Create an InputSourceDataInputStream from a byte array
    */
-  def this(arr: Array[Byte]) = this(SInputSourceDataInputStream(arr)) 
+  def this(arr: Array[Byte]) = this(SInputSourceDataInputStream(arr))
+
+  /**
+   * Returns true if the input stream has at least 1 bit of data.
+   *
+   * Does not advance the position.
+   *
+   * Returns true immediately if the input stream has available data that
+   * has not yet been consumed.
+   *
+   * On a network input stream, this may block to determine if the stream
+   * contains data or is at end-of-data.
+   *
+   * This is used when parsing multiple elements from a stream to see if there
+   * is data or not before calling parse().
+   *
+   * It may also be used after a parse() operation that is intended to consume
+   * the entire data stream (such as for a file) to determine if all data has
+   * been consumed or some data is left-over.
+   */
+  def hasData(): Boolean = dis.isDefinedForLength(1)
 }
diff --git 
a/daffodil-japi/src/test/java/org/apache/daffodil/example/TestJavaAPI.java 
b/daffodil-japi/src/test/java/org/apache/daffodil/example/TestJavaAPI.java
index 64aa384..1b52140 100644
--- a/daffodil-japi/src/test/java/org/apache/daffodil/example/TestJavaAPI.java
+++ b/daffodil-japi/src/test/java/org/apache/daffodil/example/TestJavaAPI.java
@@ -26,13 +26,18 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.daffodil.japi.*;
 import org.apache.daffodil.japi.infoset.XMLTextInfosetOutputter;
 import org.jdom2.output.Format;
@@ -109,7 +114,6 @@ public class TestJavaAPI {
         ParseResult res = dp.parse(dis, outputter);
         boolean err = res.isError();
         assertFalse(err);
-        assertTrue(res.location().isAtEnd());
         assertEquals(0, lw.errors.size());
         assertEquals(0, lw.warnings.size());
         assertTrue(lw.others.size() > 0);
@@ -157,14 +161,17 @@ public class TestJavaAPI {
         parser = parser.withDebuggerRunner(debugger);
         parser = parser.withDebugging(true);
 
-        java.io.File file = getResource("/test/japi/myData.dat");
-        java.io.FileInputStream fis = new java.io.FileInputStream(file);
-        InputSourceDataInputStream dis = new InputSourceDataInputStream(fis);
+        File data = getResource("/test/japi/myData.dat");
+        // This test uses a byte array here, just so as to be sure to exercise
+        // the constructor for creating an InputSourceDataInputStream from a 
byte array
+        // and byte buffer.
+        byte[] ba = FileUtils.readFileToByteArray(data);
+        ByteBuffer bb = ByteBuffer.wrap(ba);
+        InputSourceDataInputStream dis = new InputSourceDataInputStream(bb);
         JDOMInfosetOutputter outputter = new JDOMInfosetOutputter();
         ParseResult res = parser.parse(dis, outputter);
         boolean err = res.isError();
         assertFalse(err);
-        assertTrue(res.location().isAtEnd());
         assertEquals(0, lw.errors.size());
         assertEquals(0, lw.warnings.size());
         assertTrue(lw.others.size() > 0);
@@ -238,8 +245,11 @@ public class TestJavaAPI {
         dp = reserializeDataProcessor(dp);
 
         java.io.File file = getResource("/test/japi/myDataBroken.dat");
-        java.io.FileInputStream fis = new java.io.FileInputStream(file);
-        InputSourceDataInputStream dis = new InputSourceDataInputStream(fis);
+        // This test uses a byte array here, just so as to be sure to exercise
+        // the constructor for creating an InputSourceDataInputStream from a 
byte array
+        // and byte buffer.
+        byte[] ba = FileUtils.readFileToByteArray(file);
+        InputSourceDataInputStream dis = new InputSourceDataInputStream(ba);
         JDOMInfosetOutputter outputter = new JDOMInfosetOutputter();
         ParseResult res = dp.parse(dis, outputter);
 
@@ -295,7 +305,6 @@ public class TestJavaAPI {
         ParseResult res = dp.parse(dis, outputter);
         boolean err = res.isError();
         assertFalse(err);
-        assertFalse(res.location().isAtEnd());
         assertEquals(2, res.location().bytePos1b());
         assertEquals(9, res.location().bitPos1b());
 
@@ -336,7 +345,6 @@ public class TestJavaAPI {
         ParseResult res = parser.parse(dis, outputter);
         boolean err = res.isError();
         assertFalse(err);
-        assertFalse(res.location().isAtEnd());
         assertEquals(2, res.location().bytePos1b());
         assertEquals(9, res.location().bitPos1b());
 
@@ -365,7 +373,6 @@ public class TestJavaAPI {
         ParseResult res = dp.parse(dis, outputter);
         boolean err = res.isError();
         assertFalse(err);
-        assertFalse(res.location().isAtEnd());
         assertEquals(5, res.location().bytePos1b());
         assertEquals(33, res.location().bitPos1b());
 
@@ -394,7 +401,6 @@ public class TestJavaAPI {
         ParseResult res = dp.parse(dis, outputter);
         boolean err = res.isError();
         assertFalse(err);
-        assertTrue(!res.location().isAtEnd());
         assertEquals(5, res.location().bytePos1b());
         assertEquals(33, res.location().bitPos1b());
 
@@ -468,7 +474,6 @@ public class TestJavaAPI {
         ParseResult res = dp.parse(dis, outputter);
         boolean err = res.isError();
         assertFalse(err);
-        assertTrue(res.location().isAtEnd());
 
         java.io.ByteArrayOutputStream bos = new 
java.io.ByteArrayOutputStream();
         java.nio.channels.WritableByteChannel wbc = 
java.nio.channels.Channels.newChannel(bos);
@@ -511,7 +516,6 @@ public class TestJavaAPI {
         ParseResult res = dp.parse(dis, outputter);
         boolean err = res.isError();
         assertFalse(err);
-        assertTrue(res.location().isAtEnd());
 
         java.io.ByteArrayOutputStream bos = new 
java.io.ByteArrayOutputStream();
         java.nio.channels.WritableByteChannel wbc = 
java.nio.channels.Channels.newChannel(bos);
@@ -550,7 +554,6 @@ public class TestJavaAPI {
         ParseResult res = dp.parse(dis, outputter);
         boolean err = res.isError();
         assertFalse(err);
-        assertTrue(res.location().isAtEnd());
 
         org.jdom2.Document doc1 = outputter.getResult();
 
@@ -602,7 +605,6 @@ public class TestJavaAPI {
         org.jdom2.Element rootNode = doc.getRootElement();
         org.jdom2.Element hidden = rootNode.getChild("hiddenElement", 
rootNode.getNamespace());
         assertTrue(null == hidden);
-        assertTrue(res.location().isAtEnd());
     }
 
     /**
@@ -643,7 +645,6 @@ public class TestJavaAPI {
         assertTrue(null == rootE2);
         org.jdom2.Element rootE3 = rootNode.getChild("e3", null);
         assertTrue(null == rootE3);
-        assertTrue(res.location().isAtEnd());
     }
 
     @Test
@@ -671,7 +672,6 @@ public class TestJavaAPI {
         ParseResult res = dp.parse(dis, outputter);
         boolean err = res.isError();
         assertFalse(err);
-        assertTrue(res.location().isAtEnd());
 
         assertEquals(0, lw2.errors.size());
         assertEquals(0, lw2.warnings.size());
@@ -763,7 +763,6 @@ public class TestJavaAPI {
         boolean containsVar1Value = docString.contains("externallySet");
         assertTrue(containsVar1);
         assertTrue(containsVar1Value);
-        assertTrue(res.location().isAtEnd());
 
         assertEquals(0, lw.errors.size());
         assertEquals(0, lw.warnings.size());
@@ -836,7 +835,6 @@ public class TestJavaAPI {
         assertTrue(res.isError());
         assertFalse(res.isProcessingError());
         assertTrue(res.isValidationError());
-        assertTrue(res.location().isAtEnd());
 
         java.util.List<Diagnostic> diags = res.getDiagnostics();
         assertEquals(1, diags.size());
@@ -863,7 +861,8 @@ public class TestJavaAPI {
         assertTrue(res.isError());
         assertFalse(res.isProcessingError());
         assertTrue(res.isValidationError());
-        assertTrue(res.location().isAtEnd());
+        long actualLength = res.location().bytePos1b() - 1;
+        assertEquals(file.length(), actualLength);
 
         java.util.List<Diagnostic> diags = res.getDiagnostics();
         assertEquals(3, diags.size());
@@ -905,7 +904,6 @@ public class TestJavaAPI {
       res = dp.parse(input, outputter);
       err = res.isError();
       assertFalse(err);
-      assertFalse(res.location().isAtEnd());
       assertEquals(5, res.location().bytePos1b());
       assertEquals("data", outputter.getResult().getRootElement().getText());
 
@@ -913,7 +911,6 @@ public class TestJavaAPI {
       res = dp.parse(input, outputter);
       err = res.isError();
       assertFalse(err);
-      assertFalse(res.location().isAtEnd());
       assertEquals(9, res.location().bytePos1b());
       assertEquals("left", outputter.getResult().getRootElement().getText());
 
@@ -921,7 +918,7 @@ public class TestJavaAPI {
       res = dp.parse(input, outputter);
       err = res.isError();
       assertFalse(err);
-      assertTrue(res.location().isAtEnd());
+      assertFalse(input.hasData());
       assertEquals(13, res.location().bytePos1b());
       assertEquals("over", outputter.getResult().getRootElement().getText());
     }
@@ -999,7 +996,6 @@ public class TestJavaAPI {
         String infosetSAXString = new  
org.jdom2.output.XMLOutputter(pretty).outputString(contentHandler.getDocument());
 
         assertFalse(err);
-        assertTrue(resSAX.location().isAtEnd());
         assertTrue(diags.isEmpty());
         assertEquals(infosetDPString, infosetSAXString);
 
@@ -1112,8 +1108,6 @@ public class TestJavaAPI {
         assertTrue(containsVar1);
         assertTrue(containsVar1Value);
 
-        assertTrue(res.location().isAtEnd());
-
         assertEquals(0, lw.errors.size());
         assertEquals(0, lw.warnings.size());
         assertTrue(lw.others.size() > 0);
diff --git 
a/daffodil-lib/src/main/scala/org/apache/daffodil/api/Diagnostic.scala 
b/daffodil-lib/src/main/scala/org/apache/daffodil/api/Diagnostic.scala
index d9d8ad9..108dcd5 100644
--- a/daffodil-lib/src/main/scala/org/apache/daffodil/api/Diagnostic.scala
+++ b/daffodil-lib/src/main/scala/org/apache/daffodil/api/Diagnostic.scala
@@ -224,7 +224,10 @@ abstract class Diagnostic protected (
  */
 trait DataLocation {
   def toString: String
+
+  @deprecated("Use comparison of bitPos1b with expected position instead.", 
"3.1.0")
   def isAtEnd: Boolean
+
   def bitPos1b: Long
   def bytePos1b: Long
 }
diff --git 
a/daffodil-lib/src/main/scala/org/apache/daffodil/exceptions/Assert.scala 
b/daffodil-lib/src/main/scala/org/apache/daffodil/exceptions/Assert.scala
index b123b50..e096b07 100644
--- a/daffodil-lib/src/main/scala/org/apache/daffodil/exceptions/Assert.scala
+++ b/daffodil-lib/src/main/scala/org/apache/daffodil/exceptions/Assert.scala
@@ -45,13 +45,17 @@ abstract class ThinException protected (dummy: Int, cause: 
Throwable, fmt: Strin
   def this(cause: Throwable) = this(1, cause, null)
 }
 
-abstract class UnsuppressableException(m: String) extends Exception(m) {
-  def this() = this("") // no arg constructor also.
+// $COVERAGE-OFF$ These exception objects should never be created by tests.
+abstract class UnsuppressableException(m: String, th: Throwable) extends 
Exception(m, th) {
+  def this(msg: String) = this(msg, null)
+  def this(th: Throwable) = this(null, th)
 }
+
 class UsageException(m: String) extends UnsuppressableException(m)
 class NotYetImplementedException(m: String) extends 
UnsuppressableException("Not yet implemented: " + m)
-class Abort(m: String) extends UnsuppressableException(m) {
-  def this(th: Throwable) = this(th.getMessage())
+class Abort(m: String, th: Throwable) extends UnsuppressableException(m, th) {
+  def this(th: Throwable) = this(null, th)
+  def this(m: String) = this(m, null)
 }
 
 class Assert {
@@ -64,6 +68,7 @@ class Assert {
     throw x
   }
 }
+// $COVERAGE-ON$
 
 object Assert extends Assert {
 
@@ -102,9 +107,12 @@ object Assert extends Assert {
   /**
    * Conditional behavior for NYIs
    */
-  def notYetImplemented(): Nothing = macro AssertMacros.notYetImplementedMacro0
   def notYetImplemented(testThatWillThrowIfTrue: Boolean): Unit = macro 
AssertMacros.notYetImplementedMacro1
   def notYetImplemented(testThatWillThrowIfTrue: Boolean, msg: String): Unit = 
macro AssertMacros.notYetImplementedMacro2
+
+  // $COVERAGE-OFF$ These unconditional assertions should never get executed 
by tests.
+
+  def notYetImplemented(): Nothing = macro AssertMacros.notYetImplementedMacro0
   //
   // Throughout this file, specifying return type Nothing
   // gets rid of many spurious (scala compiler bug) dead code
@@ -174,5 +182,6 @@ object Assert extends Assert {
   def invariantFailed(msg: String = "") = {
     abort("Invariant broken. " + msg)
   }
+  // $COVERAGE-ON$
 
 }
diff --git 
a/daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala 
b/daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala
index 4b06b8f..3dca4c1 100644
--- a/daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala
+++ b/daffodil-lib/src/main/scala/org/apache/daffodil/util/Coroutines.scala
@@ -17,14 +17,12 @@
 
  package org.apache.daffodil.util
 
- import java.util.concurrent.ArrayBlockingQueue
+ import org.apache.daffodil.exceptions.UnsuppressableException
 
- import scala.util.Try
- import scala.util.Success
+ import java.util.concurrent.ArrayBlockingQueue
  import scala.util.Failure
-
- import org.apache.daffodil.exceptions.Assert
- import org.apache.daffodil.exceptions.UnsuppressableException
+ import scala.util.Success
+ import scala.util.Try
 
  /**
   * General purpose Co-routines.
@@ -73,8 +71,12 @@
     * Call when a co-routine resumes another (to provide a result of some sort)
     * and then terminates. The coroutine calling this must return from the 
run()
     * method immediately after calling this.
+    *
+    * @tparam R The type of objects transmitted to the other coroutine. These 
need
+    *           not be the same type as the kind transmitted back to this 
calling
+    *           coroutine.
     */
-   final def resumeFinal(coroutine: Coroutine[T], in: T): Unit = {
+   final def resumeFinal[R](coroutine: Coroutine[R], in: R): Unit = {
      coroutine.init()
      coroutine.inboundQueue.put(in) // allows other to run final
    }
@@ -84,8 +86,11 @@
     * argument value to it.
     *
     * The current co-routine will be suspended until it is resumed later.
+    * @tparam R The type of objects transmitted to the other coroutine. These 
need
+    *           not be the same type as the kind transmitted back to this 
calling
+    *           coroutine.
     */
-   final def resume(coroutine: Coroutine[T], in: T): T = {
+   final def resume[R](coroutine: Coroutine[R], in: R): T = {
      resumeFinal(coroutine, in)
      val res = waitForResume() // blocks until it is resumed
      res
@@ -99,6 +104,21 @@
  }
 
  /**
+  * Convenience class, since many coroutine systems have this as
+  * the main coroutine definition.
+  * @tparam T The value type returned to this main coroutine when it is
+  *           resumed by other coroutines.
+  */
+ class MainCoroutine[T] extends Coroutine[T] {
+   final override def isMain = true
+   // $COVERAGE-OFF$
+   override protected def run(): Unit = {
+     throw new Error("Main thread co-routine run method should not be called.")
+   }
+   // $COVERAGE-ON$
+ }
+
+ /**
   * Convert something that has callbacks (e.g., SAX-like parser that calls 
back on events)
   * into a pull-style API aka Iterator.
   *
@@ -118,7 +138,9 @@
   * 
https://scalaenthusiast.wordpress.com/2013/06/12/transform-a-callback-function-to-an-iteratorlist-in-scala/
   */
 
- final class InvertControl[S](body: => Unit) extends Iterator[S] with 
Coroutine[Try[S]] {
+ final class InvertControl[S](body: => Unit)
+   extends MainCoroutine[Try[S]]
+     with Iterator[S] {
 
    private object EndMarker extends Throwable
    private val EndOfData = Failure(EndMarker)
@@ -155,8 +177,6 @@
 
    private val producer = new Producer(this)
 
-   override def isMain = true
-
    private var failed = false
 
    private val dummy: Try[S] = Success(null.asInstanceOf[S])
@@ -183,8 +203,4 @@
      else iterator.next()
    }
 
-   override def run(): Unit= {
-     Assert.invariantFailed("Main thread co-routine run method should not be 
called.")
-   }
-
  }
diff --git 
a/daffodil-lib/src/test/scala/org/apache/daffodil/util/TestCoroutines.scala 
b/daffodil-lib/src/test/scala/org/apache/daffodil/util/TestCoroutines.scala
new file mode 100644
index 0000000..444e926
--- /dev/null
+++ b/daffodil-lib/src/test/scala/org/apache/daffodil/util/TestCoroutines.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.daffodil.util
+
+import org.junit.Test
+import org.junit.Assert._
+import org.apache.daffodil.Implicits._
+
+private class TestException(e: String) extends Exception(e)
+
+private class TestInvertControl {
+
+  /**
+   * We need JInt because Int is not subtype of AnyRef
+   * but Coroutines requires T <: AnyRef
+   */
+  type JInt = java.lang.Integer
+
+  def sprintln(s: String): Unit = {
+    /*
+     * Uncomment this for verbose messaging so you can see what is going on.
+     */
+    // println(s)
+  }
+
+  /**
+   * Simulates some library that takes a callback function
+   *
+   * Note that this library is allowed to be NON THREAD SAFE.
+   */
+  class NotThreadSafeLibrary[T]() {
+    def doIt(l: Seq[T]) : Unit = {
+      sprintln("NotThreadSafeLibrary running")
+      l.foreach { x =>
+        sprintln("NotThreadSafeLibrary calling back with " + x)
+        handleEvent(x)
+      }
+      sprintln("NotThreadSafeLibrary done (normal)")
+    }
+
+    private var handleEvent: T => Any = _
+
+    def setHandler(f: T => Any) : Unit = {
+      handleEvent = f
+    }
+  }
+
+  /**
+   * Illustrates how to use the InvertControl class
+   */
+  @Test def test1() : Unit = {
+
+    val cb = new NotThreadSafeLibrary[JInt]() // some non-safe library that 
gets a callback handler
+    //
+    // Wrap the initial call that starts the library in the
+    // InvertControl class.
+    //
+    val iter = new InvertControl[JInt](
+      // this argument is the code to call to run the library
+      cb.doIt(List(1, 2, 3))
+      // not executed until we start iterating
+    )
+    assertTrue(iter.isMain)
+    //
+    // define a handler as the library requires
+    //
+    def handler(i: JInt) : Unit = {
+      // sprintln("handler called on: " + i)
+      //
+      // handler must call the callback function of the InvertControl
+      // instance
+      //
+      iter.setNext(i)
+    }
+    //
+    // Don't forget to tell the library that this is your handler
+    //
+    cb.setHandler(handler) // setting callback handler.
+
+    sprintln("asking for first element")
+    var i = iter.next() // library runs until first callback.
+    sprintln("got first element")
+    assertEquals(1, i)
+    sprintln("asking for second element")
+    i = iter.next()
+    sprintln("got second element")
+    assertEquals(2, i)
+    sprintln("asking for third element")
+    i = iter.next()
+    sprintln("got third element")
+    assertEquals(3, i)
+    assertFalse(iter.hasNext)
+    sprintln("done")
+  }
+
+  /**
+   * Illustrates exception in generator ends up back on the consumer.
+   */
+  @Test def test2() : Unit = {
+
+    val cb = new NotThreadSafeLibrary[JInt]() // some non-safe library that 
gets a callback handler
+    //
+    // Wrap the initial call that starts the library in the
+    // InvertControl class.
+    //
+    lazy val iter: InvertControl[JInt] = new InvertControl[JInt]({
+      // this argument is the code to call to run the library
+      var wasThrown = false
+      try {
+        cb.doIt(List(1, 2, 3)) // not executed until we start iterating
+      } catch {
+        case e: TestException =>
+          // iter.setFinal(e)
+          wasThrown = true
+      }
+      assertTrue(wasThrown)
+      sprintln("NotThreadSafeLibrary exiting")
+    })
+
+    //
+    // define a handler as the library requires
+    //
+    def handler(i: JInt) : Unit = {
+      sprintln("handler called on: " + i)
+      //
+      // handler must call the callback function of the InvertControl
+      // instance
+      //
+      if (i == 3) {
+        val e = new TestException("you had to give me a three?")
+        sprintln("NotThreadSafeLibrary throwing :" + e)
+        throw e
+      }
+      iter.setNext(i)
+    }
+    //
+    // Don't forget to tell the library that this is your handler
+    //
+    cb.setHandler(handler) // setting callback handler.
+
+    sprintln("asking for first element")
+    var i = iter.next() // library runs until first callback.
+    sprintln("got first element")
+    assertEquals(1, i)
+    sprintln("asking for second element")
+    i = iter.next()
+    sprintln("got second element")
+    assertEquals(2, i)
+    sprintln("asking for third element")
+    val e = intercept[NoSuchElementException] {
+      i = iter.next()
+      fail()
+    }
+    sprintln("consumer caught exception: " + e)
+    assertFalse(iter.hasNext)
+    sprintln("consumer done")
+  }
+
+}
diff --git 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/api/DFDLParserUnparser.scala
 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/api/DFDLParserUnparser.scala
index 88138b9..ecb9468 100644
--- 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/api/DFDLParserUnparser.scala
+++ 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/api/DFDLParserUnparser.scala
@@ -19,8 +19,8 @@ package org.apache.daffodil.api
 
 import org.apache.daffodil.processors.ProcessorResult
 import org.apache.daffodil.processors.Success
-import java.io.File
 
+import java.io.File
 import org.apache.daffodil.processors.VariableMap
 import org.apache.daffodil.externalvars.Binding
 import org.apache.daffodil.infoset.InfosetInputter
@@ -29,6 +29,7 @@ import org.apache.daffodil.infoset.InfosetOutputter
 import org.apache.daffodil.processors.Failure
 import org.apache.daffodil.io.InputSourceDataInputStream
 import org.apache.daffodil.util.Coroutine
+import org.apache.daffodil.util.MainCoroutine
 import org.apache.daffodil.util.Maybe
 import org.apache.daffodil.util.Maybe.Nope
 import org.xml.sax.SAXException
@@ -246,7 +247,9 @@ object DFDL {
     def parse(ab: Array[Byte]): Unit
   }
 
-  trait DaffodilUnparseContentHandler extends org.xml.sax.ContentHandler with 
ProducerCoroutine {
+  trait DaffodilUnparseContentHandler
+    extends  ProducerCoroutine
+    with org.xml.sax.ContentHandler {
     def getUnparseResult: UnparseResult
     def enableInputterResolutionOfRelativeInfosetBlobURIs(): Unit
   }
@@ -316,12 +319,7 @@ object DFDL {
     }
   }
 
-  trait ProducerCoroutine extends Coroutine[Array[SAXInfosetEvent]] {
-    override def isMain = true
-    override protected def run(): Unit = {
-      throw new Error("Main thread co-routine run method should not be 
called.")
-    }
-  }
+  trait ProducerCoroutine extends MainCoroutine[Array[SAXInfosetEvent]]
 
   trait ConsumerCoroutine extends Coroutine[Array[SAXInfosetEvent]]
 
diff --git 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/dsom/CompiledExpression1.scala
 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/dsom/CompiledExpression1.scala
index 0006c4b..3c6fc32 100644
--- 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/dsom/CompiledExpression1.scala
+++ 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/dsom/CompiledExpression1.scala
@@ -102,7 +102,7 @@ abstract class CompiledExpression[+T <: AnyRef](
    *
    * isConstant must be true or this will throw.
    */
-  @deprecated("2016-02-18", "Code should just call evaluate(...) on an 
Evaluatable object.")
+  @deprecated("Code should just call evaluate(...) on an Evaluatable object.", 
"2016-02-18")
   def constant: T
   def isConstant: Boolean
 
diff --git 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala
 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala
index 6e59e32..2d4bc4b 100644
--- 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala
+++ 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/layers/LayerTransformer.scala
@@ -216,7 +216,7 @@ class JavaIOInputStream(s: InputSourceDataInputStream, 
finfo: FormatInfo)
     }
   }
 
-  override def available(): Int = 1
+  override def available(): Int = 0
 
   override def close(): Unit = {
     // do nothing
diff --git 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DataLoc.scala 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DataLoc.scala
index 6260473..a31608d 100644
--- 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DataLoc.scala
+++ 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/DataLoc.scala
@@ -17,8 +17,7 @@
 
 package org.apache.daffodil.processors
 
-import org.apache.daffodil.Implicits._; object INoWarn { 
ImplicitsSuppressUnusedImportWarning() }
-import org.apache.daffodil.util._
+import org.apache.daffodil.io.InputSourceDataInputStream
 import org.apache.daffodil.exceptions._
 import org.apache.daffodil.schema.annotation.props.gen.BitOrder
 import org.apache.daffodil.util.Maybe
@@ -33,9 +32,23 @@ import org.apache.daffodil.util.MaybeULong
 import org.apache.daffodil.api.DataLocation
 import org.apache.daffodil.processors.parsers.PState
 
-class DataLoc(val bitPos1b: Long, bitLimit1b: MaybeULong, val isAtEnd: 
Boolean, eitherStream: Either[DataOutputStream, DataInputStream],
+class DataLoc(
+  val bitPos1b: Long,
+  bitLimit1b: MaybeULong,
+  eitherStream: Either[DataOutputStream, DataInputStream],
   val maybeERD: Maybe[ElementRuntimeData]) extends DataLocation {
 
+  // $COVERAGE-OFF$
+  @deprecated("Use bitPos1b to compare with expected position (possibly 
bitLimit1b).", "3.1.0")
+  override def isAtEnd = {
+    eitherStream match {
+      case Right(isdis: InputSourceDataInputStream) => isdis.isAtEnd
+      case Left(_) => Assert.usageError("isAtEnd not defined for unparsing.")
+      case Right(s) => Assert.invariantFailed("Unknown kind of data stream: " 
+ s)
+    }
+  }
+  // $COVERAGE-ON$
+
   // override def toString = "DataLoc(bitPos1b='%s', 
bitLimit1b='%s')".format(bitPos1b, bitLimit1b)
   override def toString() = {
     "byte " + bitPos1b / 8 + (if (bitLimit1b.isDefined) " limit(bytes) " + 
bitLimit1b.get / 8 else "")
diff --git 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/parsers/PState.scala
 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/parsers/PState.scala
index c7352b4..e02cfac 100644
--- 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/parsers/PState.scala
+++ 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/parsers/PState.scala
@@ -20,10 +20,7 @@ package org.apache.daffodil.processors.parsers
 import java.nio.channels.Channels
 import java.nio.file.Files
 import java.nio.file.Path
-
-import scala.Right
 import scala.collection.mutable
-
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DaffodilTunables
 import org.apache.daffodil.api.DataLocation
@@ -288,8 +285,7 @@ final class PState private (
   }
 
   def currentLocation: DataLocation = {
-    val isAtEnd = !dataInputStream.isDefinedForLength(1)
-    new DataLoc(bitPos1b, bitLimit1b, isAtEnd, Right(dataInputStream), 
Maybe(thisElement.runtimeData))
+    new DataLoc(bitPos1b, bitLimit1b, Right(dataInputStream), 
Maybe(thisElement.runtimeData))
   }
 
   def bitPos0b = dataInputStream.bitPos0b
diff --git 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/unparsers/UState.scala
 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/unparsers/UState.scala
index fa21f25..6a1dd69 100644
--- 
a/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/unparsers/UState.scala
+++ 
b/daffodil-runtime1/src/main/scala/org/apache/daffodil/processors/unparsers/UState.scala
@@ -20,9 +20,6 @@ package org.apache.daffodil.processors.unparsers
 import java.io.ByteArrayOutputStream
 import java.nio.CharBuffer
 import java.nio.LongBuffer
-
-import scala.Left
-
 import org.apache.daffodil.api.DFDL
 import org.apache.daffodil.api.DaffodilTunables
 import org.apache.daffodil.api.DataLocation
@@ -166,8 +163,7 @@ abstract class UState(
   def currentLocation: DataLocation = {
     val m = maybeCurrentInfosetElement
     val mrd = if (m.isDefined) Maybe(m.value.runtimeData) else Nope
-    val isAtEnd = false // TODO: this isn't right, but what does it mean to be 
at the end? Nothing appears to use this value when unparsing
-    new DataLoc(bitPos1b, bitLimit1b, isAtEnd, Left(dataOutputStream), mrd)
+    new DataLoc(bitPos1b, bitLimit1b, Left(dataOutputStream), mrd)
   }
 
   lazy val unparseResult = new UnparseResult(dataProc.get, this)
diff --git 
a/daffodil-runtime2/src/main/scala/org/apache/daffodil/runtime2/Runtime2DataProcessor.scala
 
b/daffodil-runtime2/src/main/scala/org/apache/daffodil/runtime2/Runtime2DataProcessor.scala
index efd1b0e..9c35d18 100644
--- 
a/daffodil-runtime2/src/main/scala/org/apache/daffodil/runtime2/Runtime2DataProcessor.scala
+++ 
b/daffodil-runtime2/src/main/scala/org/apache/daffodil/runtime2/Runtime2DataProcessor.scala
@@ -25,6 +25,7 @@ import org.apache.daffodil.api.DaffodilTunables
 import org.apache.daffodil.api.DataLocation
 import org.apache.daffodil.api.ValidationMode
 import org.apache.daffodil.api.ValidationResult
+import org.apache.daffodil.exceptions.Assert
 import org.apache.daffodil.externalvars.Binding
 import org.apache.daffodil.processors.Failure
 import org.apache.daffodil.processors.ProcessorResult
@@ -88,12 +89,12 @@ class Runtime2DataProcessor(executableFile: os.Path) 
extends DFDL.DataProcessorB
       os.write(infile, input)
       val result = os.proc(executableFile, "parse", "-I", "xml", "-o", 
outfile, infile).call(cwd = tempDir, stderr = os.Pipe)
       if (result.out.text.isEmpty && result.err.text.isEmpty) {
-        val parseResult = new ParseResult(outfile, Success)
+        val parseResult = new ParseResult(outfile, Success, infile)
         parseResult
       } else {
         val msg = s"Unexpected daffodil output on stdout: ${result.out.text} 
on stderr: ${result.err.text}"
         val parseError = new ParseError(Nope, Nope, Nope, Maybe(msg))
-        val parseResult = new ParseResult(outfile, Failure(parseError))
+        val parseResult = new ParseResult(outfile, Failure(parseError), infile)
         parseResult.addDiagnostic(parseError)
         parseResult
       }
@@ -105,7 +106,7 @@ class Runtime2DataProcessor(executableFile: os.Path) 
extends DFDL.DataProcessorB
           val msg = s"${e.getMessage} with stdout: ${e.result.out.text} and 
stderr: ${e.result.err.text}"
           new ParseError(Nope, Nope, Nope, Maybe(msg))
         }
-        val parseResult = new ParseResult(outfile, Failure(parseError))
+        val parseResult = new ParseResult(outfile, Failure(parseError), infile)
         parseResult.addDiagnostic(parseError)
         parseResult
     } finally {
@@ -154,28 +155,35 @@ class Runtime2DataProcessor(executableFile: os.Path) 
extends DFDL.DataProcessorB
 }
 
 object Runtime2DataLocation {
-  class Runtime2DataLocation(_isAtEnd: Boolean,
-                             _bitPos1b: Long,
+  class Runtime2DataLocation( _bitPos1b: Long,
                              _bytePos1b: Long) extends DataLocation {
-    override def isAtEnd: Boolean = _isAtEnd
     override def bitPos1b: Long = _bitPos1b
     override def bytePos1b: Long = _bytePos1b
+
+    // $COVERAGE-OFF$
+    @deprecated("Use comparison of bitPos1b with expected position instead.", 
"3.1.0")
+    override def isAtEnd: Boolean = Assert.usageError("isAtEnd is deprecated 
and not implemented in Runtime2.")
+    // $COVERAGE-ON$
   }
 
-  def apply(isAtEnd: Boolean = true,
-            bitPos1b: Long = 0L,
-            bytePos1b: Long = 0L): DataLocation = {
-    new Runtime2DataLocation(isAtEnd, bitPos1b, bytePos1b)
+  def apply(bitPos1b: Long,
+            bytePos1b: Long): DataLocation = {
+    new Runtime2DataLocation(bitPos1b, bytePos1b)
   }
 }
 
 final class ParseResult(outfile: os.Path,
                         override val processorStatus: ProcessorResult,
-                        loc: DataLocation = Runtime2DataLocation())
+                        infile: os.Path)
   extends DFDL.ParseResult
     with DFDL.State
     with WithDiagnosticsImpl {
 
+  val loc: DataLocation = {
+    val infileLengthInBytes = infile.toIO.length()
+    Runtime2DataLocation(infileLengthInBytes * 8, infileLengthInBytes)
+  }
+
   override def resultState: DFDL.State = this
 
   override def validationResult(): Option[ValidationResult] = None
@@ -188,11 +196,14 @@ final class ParseResult(outfile: os.Path,
 }
 
 final class UnparseResult(val finalBitPos0b: Long,
-                          override val processorStatus: ProcessorResult,
-                          loc: DataLocation = Runtime2DataLocation())
+                          override val processorStatus: ProcessorResult)
   extends DFDL.UnparseResult
     with DFDL.State
     with WithDiagnosticsImpl {
+
+  // Note DataLocation uses 1-based bit/byte positions, so we have to add 1.
+  val loc: DataLocation = Runtime2DataLocation(finalBitPos0b + 1, finalBitPos0b / 8 + 1)
+
   /**
    * Data is 'scannable' if it consists entirely of textual data, and that data
    * is all in the same encoding.
diff --git 
a/daffodil-sapi/src/main/scala/org/apache/daffodil/sapi/Daffodil.scala 
b/daffodil-sapi/src/main/scala/org/apache/daffodil/sapi/Daffodil.scala
index bd3c2a9..41bc891 100644
--- a/daffodil-sapi/src/main/scala/org/apache/daffodil/sapi/Daffodil.scala
+++ b/daffodil-sapi/src/main/scala/org/apache/daffodil/sapi/Daffodil.scala
@@ -456,10 +456,19 @@ class DataLocation private[sapi] (dl: SDataLocation) {
   override def toString() = dl.toString
 
   /**
-   * Determine if this data location is at the end of the input data
+   * Determine if we're positioned at the end of data.
    *
-   * @return true if this represents the end of the input data, false otherwise
+   * Blocks until either one byte of data can be read, or end-of-data
+   * is encountered.
+   *
+   * It is generally not advised to use this on network TCP data streams
+   * as it will block waiting for the sender of data to provide more data
+   * or close the stream.
+   *
+   * @return boolean indicating whether we are known to be positioned at
+   *         the end of data.
    */
+  @deprecated("Use comparison of bitPos1b() with expected position instead.", 
"3.1.0")
   def isAtEnd() = dl.isAtEnd
 
   /**
diff --git 
a/daffodil-sapi/src/main/scala/org/apache/daffodil/sapi/io/InputSourceDataInputStream.scala
 
b/daffodil-sapi/src/main/scala/org/apache/daffodil/sapi/io/InputSourceDataInputStream.scala
index f6f4fc1..20618f8 100644
--- 
a/daffodil-sapi/src/main/scala/org/apache/daffodil/sapi/io/InputSourceDataInputStream.scala
+++ 
b/daffodil-sapi/src/main/scala/org/apache/daffodil/sapi/io/InputSourceDataInputStream.scala
@@ -43,5 +43,26 @@ class InputSourceDataInputStream private[sapi] (private 
[sapi] val dis: SInputSo
   /**
    * Create an InputSourceDataInputStream from a byte array
    */
-  def this(arr: Array[Byte]) = this(SInputSourceDataInputStream(arr)) 
+  def this(arr: Array[Byte]) = this(SInputSourceDataInputStream(arr))
+
+
+  /**
+   * Returns true if the input stream has at least 1 bit of data.
+   *
+   * Does not advance the position.
+   *
+   * Returns true immediately if the input stream has available data that
+   * has not yet been consumed.
+   *
+   * On a network input stream, this may block to determine if the stream
+   * contains data or is at end-of-data.
+   *
+   * This is used when parsing multiple elements from a stream to see if there
+   * is data or not before calling parse().
+   *
+   * It may also be used after a parse() operation that is intended to consume
+   * the entire data stream (such as for a file) to determine if all data has
+   * been consumed or some data is left-over.
+   */
+  def hasData(): Boolean = dis.isDefinedForLength(1)
 }
diff --git 
a/daffodil-sapi/src/main/scala/org/apache/daffodil/sapi/package.scala 
b/daffodil-sapi/src/main/scala/org/apache/daffodil/sapi/package.scala
index 0a5076c..d09770a 100644
--- a/daffodil-sapi/src/main/scala/org/apache/daffodil/sapi/package.scala
+++ b/daffodil-sapi/src/main/scala/org/apache/daffodil/sapi/package.scala
@@ -134,11 +134,11 @@ package org.apache.daffodil
  * val is = new InputSourceDataInputStream(dataStream)
  * val scalaOutputter = new ScalaXMLInfosetOutputter()
 * var keepParsing = true
- * while (keepParsing) {
+ * while (keepParsing && is.hasData()) {
  *   scalaOutputter.reset()
 *   val pr = dp.parse(is, scalaOutputter)
  *   ...
- *   keepParsing = !pr.location().isAtEnd() && !pr.isError()
+ *   keepParsing = !pr.isError()
  * }
  * }}}
  *
@@ -193,12 +193,12 @@ package org.apache.daffodil
  * val contentHandler = new SAXHandler()
  * xmlReader.setContentHandler(contentHandler)
 * var keepParsing = true
- * while (keepParsing) {
+ * while (keepParsing && is.hasData()) {
  *   contentHandler.reset()
  *   xmlReader.parse(is)
  *   val pr = 
xmlReader.getProperty(DaffodilParseXMLReader.DAFFODIL_SAX_URN_PARSERESULT)
  *   ...
- *   keepParsing = !pr.location().isAtEnd() && !pr.isError()
+ *   keepParsing = !pr.isError()
  * }
  * }}}
  *
diff --git 
a/daffodil-sapi/src/test/scala/org/apache/daffodil/example/TestScalaAPI.scala 
b/daffodil-sapi/src/test/scala/org/apache/daffodil/example/TestScalaAPI.scala
index 745d4e2..4b58959 100644
--- 
a/daffodil-sapi/src/test/scala/org/apache/daffodil/example/TestScalaAPI.scala
+++ 
b/daffodil-sapi/src/test/scala/org/apache/daffodil/example/TestScalaAPI.scala
@@ -17,10 +17,12 @@
 
 package org.apache.daffodil.example
 
+import org.apache.commons.io.FileUtils
 import org.junit.Assert.assertEquals
 import org.junit.Assert.assertFalse
 import org.junit.Assert.assertTrue
 import org.junit.Assert.fail
+
 import java.io.ByteArrayInputStream
 import java.io.ByteArrayOutputStream
 import java.io.ObjectInputStream
@@ -28,7 +30,6 @@ import java.io.ObjectOutputStream
 import java.io.File
 import java.nio.channels.Channels
 import java.nio.file.Paths
-
 import org.junit.Test
 import org.apache.daffodil.sapi.Daffodil
 import org.apache.daffodil.sapi.DataProcessor
@@ -47,6 +48,8 @@ import 
org.apache.daffodil.sapi.DaffodilUnparseErrorSAXException
 import org.apache.daffodil.sapi.SAXErrorHandlerForSAPITest
 import org.apache.daffodil.sapi.infoset.XMLTextInfosetOutputter
 
+import java.nio.ByteBuffer
+
 class TestScalaAPI {
 
   lazy val SAX_NAMESPACES_FEATURE = "http://xml.org/sax/features/namespaces";
@@ -133,7 +136,6 @@ class TestScalaAPI {
     val res = dp.parse(input, outputter)
     val err = res.isError()
     assertFalse(err)
-    assertTrue(res.location().isAtEnd())
     assertEquals(0, lw.errors.size)
     assertEquals(0, lw.warnings.size)
 
@@ -185,13 +187,16 @@ class TestScalaAPI {
     .withDebugging(true)
     .withValidationMode(ValidationMode.Off)
     val file = getResource("/test/sapi/myData.dat")
-    val fis = new java.io.FileInputStream(file)
-    val input = new InputSourceDataInputStream(fis)
+    // This test uses a byte array here, just so as to be sure to exercise
+    // the constructor for creating an InputSourceDataInputStream from a byte 
array
+    // and byte buffer.
+    val ba = FileUtils.readFileToByteArray(file)
+    val bb = ByteBuffer.wrap(ba)
+    val input = new InputSourceDataInputStream(bb)
     val outputter = new ScalaXMLInfosetOutputter()
     val res = parser.parse(input, outputter)
     val err = res.isError()
     assertFalse(err)
-    assertTrue(res.location().isAtEnd())
 
     lw.errors.foreach(println)
     lw.warnings.foreach(println)
@@ -232,8 +237,11 @@ class TestScalaAPI {
     val dp = reserializeDataProcessor(dp1)
 
     val file = getResource("/test/sapi/myDataBroken.dat")
-    val fis = new java.io.FileInputStream(file)
-    val input = new InputSourceDataInputStream(fis)
+    // This test uses a byte array here, just so as to be sure to exercise
+    // the constructor for creating an InputSourceDataInputStream from a byte 
array
+    // and byte buffer.
+    val ba = FileUtils.readFileToByteArray(file)
+    val input = new InputSourceDataInputStream(ba)
     val outputter = new ScalaXMLInfosetOutputter()
     val res = dp.parse(input, outputter)
 
@@ -282,7 +290,6 @@ class TestScalaAPI {
     val res = dp.parse(input, outputter)
     val err = res.isError()
     assertFalse(err)
-    assertFalse(res.location().isAtEnd())
     assertEquals(2, res.location().bytePos1b())
     assertEquals(9, res.location().bitPos1b())
 
@@ -323,7 +330,6 @@ class TestScalaAPI {
     val res = parser.parse(input, outputter)
     val err = res.isError()
     assertFalse(err)
-    assertFalse(res.location().isAtEnd())
     assertEquals(2, res.location().bytePos1b())
     assertEquals(9, res.location().bitPos1b())
 
@@ -352,7 +358,6 @@ class TestScalaAPI {
     val res = dp.parse(input, outputter)
     val err = res.isError()
     assertFalse(err)
-    assertFalse(res.location().isAtEnd())
     assertEquals(5, res.location().bytePos1b())
     assertEquals(33, res.location().bitPos1b())
 
@@ -383,7 +388,6 @@ class TestScalaAPI {
     val res = dp.parse(input, outputter)
     val err = res.isError()
     assertFalse(err)
-    assertTrue(!res.location().isAtEnd())
     assertEquals(5, res.location().bytePos1b())
     assertEquals(33, res.location().bitPos1b())
 
@@ -456,7 +460,6 @@ class TestScalaAPI {
     val res = dp.parse(input, outputter)
     val err = res.isError()
     assertFalse(err)
-    assertTrue(res.location().isAtEnd())
 
     val bos = new java.io.ByteArrayOutputStream()
     val wbc = java.nio.channels.Channels.newChannel(bos)
@@ -500,7 +503,6 @@ class TestScalaAPI {
     val res = dp.parse(input, outputter)
     val err = res.isError()
     assertFalse(err)
-    assertTrue(res.location().isAtEnd())
 
     val bos = new java.io.ByteArrayOutputStream()
     val wbc = java.nio.channels.Channels.newChannel(bos)
@@ -589,7 +591,6 @@ class TestScalaAPI {
     val node = outputter.getResult
     val hidden = node \\ "hiddenElement"
     assertTrue(hidden.isEmpty)
-    assertTrue(res.location().isAtEnd())
   }
 
   /**
@@ -623,7 +624,6 @@ class TestScalaAPI {
     assertTrue(rootE2.isEmpty)
     val rootE3 = rootNode \ "e3"
     assertTrue(rootE3.isEmpty)
-    assertTrue(res.location().isAtEnd())
   }
 
   @Test
@@ -652,7 +652,6 @@ class TestScalaAPI {
     val res = dp.parse(input, outputter)
     val err = res.isError()
     assertFalse(err)
-    assertTrue(res.location().isAtEnd())
 
     lw2.errors.foreach(println)
     lw2.warnings.foreach(println)
@@ -745,7 +744,6 @@ class TestScalaAPI {
     assertTrue(var1ValueNode.size == 1)
     val var1ValueText = var1ValueNode.text
     assertTrue(var1ValueText == "externallySet")
-    assertTrue(res.location().isAtEnd())
 
     lw.errors.foreach(println)
     lw.warnings.foreach(println)
@@ -857,7 +855,6 @@ class TestScalaAPI {
     assertTrue(res.isError())
     assertFalse(res.isProcessingError())
     assertTrue(res.isValidationError())
-    assertTrue(res.location().isAtEnd())
 
     val diags = res.getDiagnostics
     assertEquals(1, diags.size)
@@ -883,7 +880,8 @@ class TestScalaAPI {
     assertTrue(res.isError())
     assertFalse(res.isProcessingError())
     assertTrue(res.isValidationError())
-    assertTrue(res.location().isAtEnd())
+    val actualLength = res.location.bytePos1b - 1
+    assertEquals(file.length, actualLength)
 
     val diags = res.getDiagnostics
     assertEquals(3, diags.size)
@@ -924,7 +922,6 @@ class TestScalaAPI {
     res = dp.parse(input, outputter)
     err = res.isError()
     assertFalse(err)
-    assertFalse(res.location().isAtEnd())
     assertEquals(5, res.location().bytePos1b())
     assertEquals("data", outputter.getResult.text)
 
@@ -932,7 +929,6 @@ class TestScalaAPI {
     res = dp.parse(input, outputter)
     err = res.isError()
     assertFalse(err)
-    assertFalse(res.location().isAtEnd())
     assertEquals(9, res.location().bytePos1b())
     assertEquals("left", outputter.getResult.text)
 
@@ -940,7 +936,7 @@ class TestScalaAPI {
     res = dp.parse(input, outputter)
     err = res.isError()
     assertFalse(err)
-    assertTrue(res.location().isAtEnd())
+    assertFalse(input.hasData())
     assertEquals(13, res.location().bytePos1b())
     assertEquals("over", outputter.getResult.text)
   }
@@ -1021,7 +1017,6 @@ class TestScalaAPI {
     val infosetSAXString = new 
org.jdom2.output.XMLOutputter(pretty).outputString(infosetSAX)
 
     assertFalse(err)
-    assertTrue(resSAX.location().isAtEnd())
     assertTrue(diags.isEmpty)
     assertEquals(infosetDPString, infosetSAXString)
 
diff --git 
a/daffodil-tdml-lib/src/main/scala/org/apache/daffodil/tdml/TDMLRunner.scala 
b/daffodil-tdml-lib/src/main/scala/org/apache/daffodil/tdml/TDMLRunner.scala
index 28f52ae..5acb095 100644
--- a/daffodil-tdml-lib/src/main/scala/org/apache/daffodil/tdml/TDMLRunner.scala
+++ b/daffodil-tdml-lib/src/main/scala/org/apache/daffodil/tdml/TDMLRunner.scala
@@ -194,7 +194,7 @@ class DFDLTestSuite private[tdml] (
   // That avoids creating the test suites repeatedly, but also leaks memory 
unless
   // you have an @AfterClass shutdown method in the object that calls 
runner.reset() at end.
   //
-  // @deprecated("2016-12-30", "Use Runner(...) instead.")
+  // @deprecated("Use Runner(...) instead.", "2016-12-30")
   def this(
     aNodeFileOrURL: Any,
     validateTDMLFile: Boolean = true,
@@ -837,7 +837,9 @@ case class ParserTestCase(ptc: NodeSeq, parentArg: 
DFDLTestSuite)
     roundTrip: RoundTrip,
     implString: Option[String]) = {
 
+    Assert.usage(optLengthLimitInBits.isDefined, "TDML tests should always 
have a length limit.")
     val nBits = optLengthLimitInBits.get
+
     val dataToParse = optDataToParse.get
 
     (optExpectedInfoset, optExpectedErrors) match {
@@ -911,8 +913,10 @@ case class ParserTestCase(ptc: NodeSeq, parentArg: 
DFDLTestSuite)
             //
             val loc: DataLocation = actual.currentLocation
 
-            if (!loc.isAtEnd) {
-              val leftOverMsg = "Left over data. Consumed %s bit(s) with %s 
bit(s) remaining.".format(loc.bitPos1b - 1, lengthLimitInBits - (loc.bitPos1b - 
1))
+            if (loc.bitPos1b <= lengthLimitInBits) {
+              val leftOverMsg =
+                "Left over data. Consumed %s bit(s) with %s bit(s) 
remaining.".format(
+                  loc.bitPos1b - 1, lengthLimitInBits - (loc.bitPos1b - 1))
               actual.addDiagnostic(new TDMLDiagnostic(leftOverMsg, implString))
               true
             } else {
@@ -969,7 +973,7 @@ case class ParserTestCase(ptc: NodeSeq, parentArg: 
DFDLTestSuite)
   private def verifyLeftOverData(actual: TDMLParseResult, lengthLimitInBits: 
Long, implString: Option[String]) = {
     val loc: DataLocation = actual.currentLocation
 
-    val leftOverException = if (!loc.isAtEnd) {
+    val leftOverException = if (loc.bitPos1b <= lengthLimitInBits) {
       val leftOverMsg = "Left over data. Consumed %s bit(s) with %s bit(s) 
remaining.".format(
         loc.bitPos1b - 1, lengthLimitInBits - (loc.bitPos1b - 1))
       Some(TDMLException(leftOverMsg, implString))
@@ -1363,9 +1367,15 @@ case class UnparserTestCase(ptc: NodeSeq, parentArg: 
DFDLTestSuite)
       }
       val loc: DataLocation = parseActual.currentLocation
 
-      val leftOverException = if (!loc.isAtEnd) {
-        val leftOverMsg = "Left over data. Consumed %s bit(s) with %s bit(s) 
remaining.".format(loc.bitPos1b - 1, testDataLength - (loc.bitPos1b - 1))
-        println(leftOverMsg)
+      val leftOverException = if (loc.bitPos1b <= testDataLength) {
+        //
+        // For this to happen (and have test coverage) we need an 
unparserTestCase
+        // which is roundTrip onePass, and where the parse doesn't consume all
+        // the data.
+        //
+        val leftOverMsg =
+          "Left over data. Consumed %s bit(s) with %s bit(s) 
remaining.".format(
+            loc.bitPos1b - 1, testDataLength - (loc.bitPos1b - 1))
         Some(TDMLException(leftOverMsg, implString))
       } else None
 
diff --git 
a/daffodil-tdml-processor/src/main/scala/org/apache/daffodil/tdml/processor/DaffodilTDMLDFDLProcessor.scala
 
b/daffodil-tdml-processor/src/main/scala/org/apache/daffodil/tdml/processor/DaffodilTDMLDFDLProcessor.scala
index 2ab8437..20b313e 100644
--- 
a/daffodil-tdml-processor/src/main/scala/org/apache/daffodil/tdml/processor/DaffodilTDMLDFDLProcessor.scala
+++ 
b/daffodil-tdml-processor/src/main/scala/org/apache/daffodil/tdml/processor/DaffodilTDMLDFDLProcessor.scala
@@ -298,6 +298,13 @@ class DaffodilTDMLDFDLProcessor private (private var dp: 
DataProcessor) extends
 
   def doParseWithBothApis(dpInputStream: java.io.InputStream, saxInputStream: 
java.io.InputStream,
     lengthLimitInBits: Long): TDMLParseResult = {
+    //
+    // TDML Tests MUST have a length limit. Otherwise they cannot determine if
+    // there is left-over-data or not without doing more reading from the 
input stream
+    // so as to be sure to hit end-of-data.
+    //
+    Assert.usage(lengthLimitInBits >= 0)
+
     val outputter = new TDMLInfosetOutputter()
     outputter.setBlobAttributes(blobDir, blobPrefix, blobSuffix)
 
@@ -313,13 +320,9 @@ class DaffodilTDMLDFDLProcessor private (private var dp: 
DataProcessor) extends
 
     val dis = InputSourceDataInputStream(dpInputStream)
     val sis = InputSourceDataInputStream(saxInputStream)
-    if (lengthLimitInBits >= 0 && lengthLimitInBits % 8 != 0) {
-      // Only set the bit limit if the length is not a multiple of 8. In that
-      // case, we aren't expected to consume all the data and need a bitLimit
-      // to prevent messages about left over bits.
-      dis.setBitLimit0b(MaybeULong(lengthLimitInBits))
-      sis.setBitLimit0b(MaybeULong(lengthLimitInBits))
-    }
+
+    dis.setBitLimit0b(MaybeULong(lengthLimitInBits))
+    sis.setBitLimit0b(MaybeULong(lengthLimitInBits))
 
     val actual = dp.parse(dis, outputter)
     xri.parse(sis)
diff --git 
a/daffodil-test/src/test/resources/org/apache/daffodil/section12/lengthKind/ExplicitTests.tdml
 
b/daffodil-test/src/test/resources/org/apache/daffodil/section12/lengthKind/ExplicitTests.tdml
index 801096d..5fa82cc 100644
--- 
a/daffodil-test/src/test/resources/org/apache/daffodil/section12/lengthKind/ExplicitTests.tdml
+++ 
b/daffodil-test/src/test/resources/org/apache/daffodil/section12/lengthKind/ExplicitTests.tdml
@@ -598,7 +598,7 @@
       <tdml:documentPart type="text"><![CDATA[000118Ridgewood Circle    
Rochester           NY123]]></tdml:documentPart>
     </tdml:document>
     <tdml:errors>
-      <tdml:error>6467715096</tdml:error>
+      <tdml:error>6467715464</tdml:error>
       <tdml:error>insufficient</tdml:error>
       <tdml:error>parse error</tdml:error>
     </tdml:errors>

Reply via email to