mbeckerle commented on a change in pull request #77: Modifications to IO layer 
to support streaming input data
URL: https://github.com/apache/incubator-daffodil/pull/77#discussion_r201048198
 
 

 ##########
 File path: 
daffodil-io/src/main/scala/org/apache/daffodil/io/InputSourceDataInputStream.scala
 ##########
 @@ -0,0 +1,840 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.daffodil.io
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+import java.nio.CharBuffer
+import java.nio.LongBuffer
+
+import passera.unsigned.ULong
+
+import org.apache.daffodil.equality._
+import org.apache.daffodil.exceptions.Assert
+import org.apache.daffodil.schema.annotation.props.gen.BitOrder
+import org.apache.daffodil.schema.annotation.props.gen.ByteOrder
+import org.apache.daffodil.util.Bits
+import org.apache.daffodil.util.Maybe
+import org.apache.daffodil.util.MaybeULong
+import org.apache.daffodil.util.MStackOf
+import org.apache.daffodil.util.Pool
+
+/**
+ * Factory methods for [[InputSourceDataInputStream]].
+ *
+ * Only the core input kinds are accepted: an in-memory byte array or
+ * ByteBuffer, or a java.io.InputStream. How other sources (for example a
+ * File) become one of these -- mapped into a ByteBuffer or streamed as an
+ * InputStream -- is deliberately left to the caller, who knows the data
+ * better than we do.
+ */
+object InputSourceDataInputStream {
+
+  /** Wraps the array (no copy is made) and delegates to the ByteBuffer overload. */
+  def apply(byteArray: Array[Byte]): InputSourceDataInputStream =
+    apply(ByteBuffer.wrap(byteArray))
+
+  /** Backs the stream with a ByteBufferInputSource over `byteBuffer`. */
+  def apply(byteBuffer: ByteBuffer): InputSourceDataInputStream = {
+    val source = new ByteBufferInputSource(byteBuffer)
+    new InputSourceDataInputStream(source)
+  }
+
+  /** Backs the stream with a BucketingInputSource that reads from `in`. */
+  def apply(in: InputStream): InputSourceDataInputStream = {
+    val source = new BucketingInputSource(in)
+    new InputSourceDataInputStream(source)
+  }
+}
+
+/**
+ * Snapshot of the stream state that mark() saves and reset() restores: the
+ * common state from DataStreamCommonState plus the bit position, the bit
+ * limit, and the character-iterator state.
+ */
+final class MarkState()
+  extends DataStreamCommonState with DataInputStream.Mark {
+
+  // Marks are compared by identity only: a mark equals nothing but itself.
+  override def equals(other: Any): Boolean = other match {
+    case ref: AnyRef if ref eq this => true
+    case _ => false
+  }
+
+  // NOTE: every member added here must also be copied in assignFrom below.
+  var bitPos0b: Long = 0
+  var bitLimit0b: MaybeULong = MaybeULong.Nope
+  val charIteratorState = new InputSourceDataInputStreamCharIteratorState
+
+  /** Byte position containing the current bit (bitPos0b / 8, rounded down). */
+  @inline
+  def bytePos0b: Long = bitPos0b >> 3
+
+  /** Copies all state from `other` into this instance. */
+  def assignFrom(other: MarkState): Unit = {
+    super.assignFrom(other)
+    bitPos0b = other.bitPos0b
+    bitLimit0b = other.bitLimit0b
+    charIteratorState.assignFrom(other.charIteratorState)
+  }
+
+}
+
+/** Pool that supplies (and takes back) MarkState instances for mark/reset. */
+private[io] class MarkPool() extends Pool[MarkState] {
+  override def allocate: MarkState = new MarkState
+}
+
+/**
+ * DataInputStream implementation that reads from an [[InputSource]].
+ *
+ * The InputSource supplies all of the input bytes; this class keeps the bit
+ * position, the optional bit limit, and the mark/reset bookkeeping on top of
+ * it.
+ */
+final class InputSourceDataInputStream private (val inputSource: InputSource)
+  extends DataInputStreamImplMixin {
+  import DataInputStream._
+
+  override def toString = {
+    val limitText =
+      if (bitLimit0b.isDefined) bitLimit0b.get.toString
+      else "none"
+    s"DataInputStream(bitPos=${bitPos0b}, bitLimit=${limitText})"
+  }
+
+  // Live stream state; mark() copies it and reset() restores it.
+  override final val cst = new MarkState
+  // Outstanding marks, most recent on top.
+  val markStack = new MStackOf[MarkState]
+  // Supplies and recycles the MarkState objects handed out by mark().
+  val markPool = new MarkPool()
+
+  /** Current position, in bits from the start of the data (0-based). */
+  @inline
+  override final def bitPos0b: Long = cst.bitPos0b
+
+  /** Current bit limit; Nope means no limit is imposed. */
+  @inline
+  override final def bitLimit0b: MaybeULong = cst.bitLimit0b
+
+  /**
+   * Moves to an absolute bit position and positions the InputSource at the
+   * byte that contains that bit.
+   */
+  def setBitPos0b(newBitPos0b: Long): Unit = {
+    // threadCheck()
+    Assert.invariant(newBitPos0b >= 0)
+    Assert.invariant(bitLimit0b.isEmpty || newBitPos0b <= bitLimit0b.get)
+
+    val newBytePos0b = newBitPos0b >> 3
+    inputSource.position(newBytePos0b)
+    cst.bitPos0b = newBitPos0b
+  }
+
+  /**
+   * Sets the bit limit, which may only shrink or be cleared (Nope). A defined
+   * limit greater than the current defined limit is rejected: the state is
+   * left unchanged and false is returned.
+   */
+  override def setBitLimit0b(newBitLimit0b: MaybeULong): Boolean = {
+    // threadCheck()
+    Assert.invariant(newBitLimit0b.isEmpty || newBitLimit0b.get >= 0)
+    val exceedsCurrentLimit =
+      bitLimit0b.isDefined && newBitLimit0b.isDefined &&
+        newBitLimit0b.get > bitLimit0b.get
+    if (exceedsCurrentLimit) {
+      false
+    } else {
+      cst.bitLimit0b = newBitLimit0b
+      true
+    }
+  }
+
+  /** Unconditionally restores a previously saved bit limit. */
+  def resetBitLimit0b(savedBitLimit0b: MaybeULong): Unit =
+    cst.bitLimit0b = savedBitLimit0b
+
+  /**
+   * Reads `bitLengthFrom1` bits into a new byte array of exactly
+   * (bitLengthFrom1 + 7) / 8 bytes, laid out big endian with the most
+   * significant bit first, and advances past them.
+   *
+   * @throws DataInputStream.NotEnoughDataException if the bits are not
+   *   available (beyond the bit limit or the end of the data)
+   */
+  def getByteArray(bitLengthFrom1: Int, finfo: FormatInfo): Array[Byte] = {
+    // threadCheck()
+    if (!isDefinedForLength(bitLengthFrom1)) {
+      throw DataInputStream.NotEnoughDataException(bitLengthFrom1)
+    }
+    val result = new Array[Byte]((bitLengthFrom1 + 7) / 8)
+    fillByteArray(result, bitLengthFrom1, finfo)
+    setBitPos0b(bitPos0b + bitLengthFrom1)
+    result
+  }
+
+  /**
+   * Fills `array` with `bitLengthFrom1` bits read from the current position,
+   * in big endian byte order and most-significant-bit-first bit order: the
+   * most significant byte goes to index 0. Only the first
+   * (bitLengthFrom1 + 7) / 8 entries are written; any further entries are
+   * left untouched and should be ignored by the caller.
+   *
+   * The InputSource is read from, but bitPos0b is not updated here; callers
+   * call setBitPos0b afterwards.
+   */
+  private def fillByteArray(
+    array: Array[Byte],
+    bitLengthFrom1: Int,
+    finfo: FormatInfo): Unit = {
+    val wholeAlignedBytes = isAligned(8) && (bitLengthFrom1 % 8 == 0)
+    if (wholeAlignedBytes) fillByteArrayAlignedNoFragment(array, bitLengthFrom1, finfo)
+    else fillByteArrayUnalignedOrFragment(array, bitLengthFrom1, finfo)
+  }
+
+  /**
+   * Fast path of fillByteArray for a byte-aligned position and a whole number
+   * of bytes: whole bytes are copied from the InputSource into
+   * array(0 until bitLengthFrom1 / 8), either in stream order or reversed.
+   */
+  private def fillByteArrayAlignedNoFragment(array: Array[Byte], 
bitLengthFrom1: Int, finfo: FormatInfo): Unit = {
+    // threadCheck()
+    Assert.usage(isAligned(8))
+    Assert.usage(bitLengthFrom1 % 8 == 0)
+
+    val bytesToFill = bitLengthFrom1 / 8
+    Assert.invariant(array.size >= bytesToFill)
+
+    // A single byte is a very common case and needs no reordering, so it is
+    // tested first to avoid retrieving byteOrder and bitOrder at all.
+    if (bytesToFill == 1 ||
+      (finfo.byteOrder == ByteOrder.BigEndian && finfo.bitOrder == 
BitOrder.MostSignificantBitFirst)) {
+      // bits & bytes are already in order, read them straight into the array
+      inputSource.get(array, 0, bytesToFill)
+    } else {
+      // Every other case: byteOrder is LittleEndian (with either bitOrder),
+      // or BigEndian with LSBF. The bytes are stored in reverse so the result
+      // is BigEndian/MSBF; the bits within each whole byte need no change.
+      // NOTE(review): for BigEndian & LSBF this reverses the bytes, whereas
+      // fillByteArrayUnalignedOrFragment stores BigEndian bytes in stream
+      // order regardless of bitOrder. Presumably unreachable if LSBF always
+      // implies LittleEndian -- confirm before relying on that combination.
+      var i = bytesToFill - 1
+      while (i >= 0) {
+        array(i) = inputSource.get().toByte
+        i -= 1
+      }
+    }
+  }
+
+  /**
+   * General case of fillByteArray: the position is not byte aligned and/or
+   * bitLengthFrom1 is not a multiple of 8. Each output byte is composed from
+   * two adjacent input bytes (priorByte and curByte) according to
+   * finfo.bitOrder. The partial ("fragment") byte of nFragmentBits bits is
+   * produced first for BigEndian and last for LittleEndian; in both cases it
+   * ends up at array index 0, because LittleEndian bytes are stored in
+   * reverse.
+   *
+   * The InputSource position afterwards need not match the new bit position;
+   * callers call setBitPos0b, which repositions it.
+   */
+  private def fillByteArrayUnalignedOrFragment(array: Array[Byte], 
bitLengthFrom1: Int, finfo: FormatInfo): Unit = {
+    // threadCheck()
+    Assert.usage(!isAligned(8) || bitLengthFrom1 % 8 != 0)
+
+    val bytesToFill = (bitLengthFrom1 + 7) / 8
+    val bitOffset0b = bitPos0b % 8
+    Assert.invariant(array.size >= bytesToFill)
+    val nFragmentBits = bitLengthFrom1 % 8
+
+    // The byte containing the current bit; always needed, so read it up front.
+    var priorByte = Bits.asUnsignedByte(inputSource.get())
+    // Count of output bytes produced so far.
+    var i = 0
+
+    @inline // see the comment inside as to why this nested function is marked @inline.
+    def addFragmentByte() = {
+      // This function is used at either the beginning or the end of the
+      // enclosing method to read a fragment byte and store it in the correct
+      // location in the output array. It modifies variables outside of the
+      // function, such as the array, the array index (i), and potentially
+      // priorByte, thus making it a closure. It is a nested function so that
+      // it can access those local variables. Normally this would lead to an
+      // allocation, which is bad in this critical code; marking it @inline is
+      // meant to prevent that. This consolidates duplicate code, keeps the
+      // code somewhat clean, and allows access to local variables without
+      // the allocation overhead of closures.
+      // NOTE(review): whether @inline actually avoids the allocation (and the
+      // boxing of the captured vars i and priorByte) depends on the Scala
+      // compiler's optimizer settings -- confirm against the build.
+      val bitsLeftInPriorByte = 8 - bitOffset0b
+      val fragmentByte =
+        if (nFragmentBits <= bitsLeftInPriorByte) {
+          // all nFragmentBits can come from the prior byte
+          val composedByte =
+            finfo.bitOrder match {
+              case BitOrder.MostSignificantBitFirst => ((priorByte << 
bitOffset0b) & 0xFF) >>> (8 - nFragmentBits)
+              case BitOrder.LeastSignificantBitFirst => ((priorByte << 
(bitsLeftInPriorByte - nFragmentBits)) & 0xFF) >>> (8 - nFragmentBits)
+            }
+          composedByte
+        } else {
+          // we need all the remaining bits of the prior byte plus some bits
+          // from the current byte, which then becomes the new prior byte
+          val bitsToGetFromCur = nFragmentBits - bitsLeftInPriorByte
+          val curByte = Bits.asUnsignedByte(inputSource.get)
+          val composedByte =
+            finfo.bitOrder match {
+              case BitOrder.MostSignificantBitFirst => {
+                val priorContribution = ((priorByte << bitOffset0b) & 0xFF) 
>>> (bitOffset0b - bitsToGetFromCur)
+                val curContribution = curByte >>> (8 - bitsToGetFromCur)
+                priorContribution | curContribution
+              }
+              case BitOrder.LeastSignificantBitFirst => {
+                val priorContribution = (priorByte >>> bitOffset0b)
+                val curContribution = ((curByte << (8 - bitsToGetFromCur)) & 
0xFF) >>> (8 - (bitsToGetFromCur + bitsLeftInPriorByte))
+                priorContribution | curContribution
+              }
+            }
+          priorByte = curByte
+          composedByte
+        }
+      // LittleEndian output is stored back-to-front so the array ends up
+      // big endian.
+      if (finfo.byteOrder =:= ByteOrder.LittleEndian) {
+        array(bytesToFill - 1 - i) = Bits.asSignedByte(fragmentByte)
+      } else {
+        array(i) = Bits.asSignedByte(fragmentByte)
+      }
+      i += 1
+    }
+
+    // BigEndian: the fragment byte is the most significant, so consume it
+    // first; that shifts the bit offset used for the remaining whole bytes.
+    val newBitOffset0b =
+      if (finfo.byteOrder =:= ByteOrder.BigEndian && nFragmentBits > 0) {
+        addFragmentByte()
+        (bitOffset0b + nFragmentBits) % 8
+      } else {
+        bitOffset0b
+      }
+
+    if (newBitOffset0b == 0) {
+      finfo.byteOrder match {
+        case ByteOrder.BigEndian => {
+          // we just parsed a bigEndian fragment byte and it put us back on a
+          // byte boundary, so we just need to read the rest of the full
+          // aligned bytes
+          inputSource.get(array, 1, bytesToFill - 1)
+        }
+        case ByteOrder.LittleEndian => {
+          // we're starting on a byte boundary, so let's just consume all the
+          // whole bytes quickly, reversing positions in the array. We'll not
+          // parse the last fragment byte and let that be done later
+          while (i < bytesToFill - 1) {
+            // we already got the first byte as a prior Byte, so just set it as
+            // the whole byte and get the next priorByte
+            array(bytesToFill - i - 1) = Bits.asSignedByte(priorByte)
+            // this byte will now either be used in the next iteration of this
+            // loop, or as the prior byte in the first iteration of consuming
+            // the fragment byte below
+            priorByte = Bits.asUnsignedByte(inputSource.get())
+            i += 1
+          }
+        }
+      }
+    } else {
+      // Each whole output byte takes its high (MSBF) or low (LSBF) part from
+      // the remaining bits of priorByte and the rest from curByte.
+      val priorShift = newBitOffset0b
+      val curShift = 8 - priorShift
+
+      // If the byteOrder is BE, then we have already consumed a fragment byte
+      // above (if there is one), so fill in the rest of the array, i.e. stop
+      // at the array size. If the byteOrder is LE, then we only want to fill
+      // the number of full bytes, and then we'll consume that last fragment
+      // byte afterwards
+      val stopBytePosition =
+        if (finfo.byteOrder =:= ByteOrder.BigEndian) bytesToFill
+        else (bitLengthFrom1 / 8)
+
+      // consume full bytes
+      while (i < stopBytePosition) {
+        val curByte = Bits.asUnsignedByte(inputSource.get())
+        val composedByte =
+          finfo.bitOrder match {
+            case BitOrder.MostSignificantBitFirst => ((priorByte << 
priorShift) & 0xFF) | ((curByte >>> curShift) & 0xFF)
+            case BitOrder.LeastSignificantBitFirst => ((priorByte >>> 
priorShift) & 0xFF) | ((curByte << curShift) & 0xFF)
+          }
+        if (finfo.byteOrder =:= ByteOrder.LittleEndian) {
+          array(bytesToFill - 1 - i) = Bits.asSignedByte(composedByte)
+        } else {
+          array(i) = Bits.asSignedByte(composedByte)
+        }
+        priorByte = curByte
+        i += 1
+      }
+    }
+
+    // LittleEndian: the fragment byte is the most significant, so it is
+    // consumed last (and lands at array index 0).
+    if (finfo.byteOrder =:= ByteOrder.LittleEndian && nFragmentBits > 0) {
+      addFragmentByte()
+    }
+  }
+
+  /** Reads 64 bits and reinterprets them as an IEEE 754 double. */
+  def getBinaryDouble(finfo: FormatInfo): Double =
+    java.lang.Double.longBitsToDouble(getSignedLong(64, finfo))
+
+  /** Reads 32 bits and reinterprets them as an IEEE 754 float. */
+  def getBinaryFloat(finfo: FormatInfo): Float =
+    java.lang.Float.intBitsToFloat(getSignedLong(32, finfo).toInt)
+
+  /**
+   * Reads 1 to 64 bits as an unsigned value, then sign-extends it from
+   * bitLengthFrom1To64 bits via Bits.signExtend.
+   */
+  def getSignedLong(bitLengthFrom1To64: Int, finfo: FormatInfo): Long = {
+    // threadCheck()
+    Assert.usage(bitLengthFrom1To64 >= 1)
+    Assert.usage(bitLengthFrom1To64 <= 64)
+
+    val unsignedBits = getUnsignedLong(bitLengthFrom1To64, finfo).longValue
+    Bits.signExtend(unsignedBits, bitLengthFrom1To64)
+  }
+
+  // Scratch buffer reused by getUnsignedLong for up to 8 bytes of data.
+  private val longArray = new Array[Byte](8)
+
+  /**
+   * Reads 1 to 64 bits as an unsigned value and advances past them.
+   *
+   * @throws DataInputStream.NotEnoughDataException if the bits are not available
+   */
+  def getUnsignedLong(bitLengthFrom1To64: Int, finfo: FormatInfo): ULong = {
+    Assert.usage(bitLengthFrom1To64 >= 1)
+    Assert.usage(bitLengthFrom1To64 <= 64)
+
+    if (!isDefinedForLength(bitLengthFrom1To64)) {
+      throw DataInputStream.NotEnoughDataException(bitLengthFrom1To64)
+    }
+
+    // fillByteArray writes only the first numBytes entries of longArray,
+    // most significant byte first.
+    fillByteArray(longArray, bitLengthFrom1To64, finfo)
+
+    val numBytes = (bitLengthFrom1To64 + 7) / 8
+    var acc = 0L
+    var idx = 0
+    while (idx < numBytes) {
+      acc = (acc << 8) | Bits.asUnsignedByte(longArray(idx))
+      idx += 1
+    }
+
+    setBitPos0b(bitPos0b + bitLengthFrom1To64)
+    ULong(acc)
+  }
+
+  /**
+   * Reads a two's complement integer of any length. Lengths up to 64 bits go
+   * through getSignedLong; longer ones are read as a byte array.
+   */
+  def getSignedBigInt(bitLengthFrom1: Int, finfo: FormatInfo): BigInt = {
+    // threadCheck()
+    Assert.usage(bitLengthFrom1 >= 1)
+
+    if (bitLengthFrom1 <= 64) {
+      BigInt(getSignedLong(bitLengthFrom1, finfo))
+    } else {
+      val bytes = getByteArray(bitLengthFrom1, finfo)
+      val fragmentBits = bitLengthFrom1 % 8
+      if (fragmentBits > 0) {
+        // Byte 0 holds only fragmentBits significant bits, right-aligned.
+        // Shift them to the top of the byte and back with an arithmetic
+        // shift so the sign bit fills the unused high bits; BigInt(Array)
+        // takes its sign from the top bit of byte 0.
+        val unusedBits = 8 - fragmentBits
+        bytes(0) = ((bytes(0) << unusedBits).toByte >> unusedBits).toByte
+      }
+      BigInt(bytes)
+    }
+  }
+
+  /** Reads `bitLengthFrom1` bits as a non-negative integer of any length. */
+  def getUnsignedBigInt(bitLengthFrom1: Int, finfo: FormatInfo): BigInt = {
+    Assert.usage(bitLengthFrom1 >= 1)
+    BigInt(1, getByteArray(bitLengthFrom1, finfo))
+  }
+
+  /**
+   * Determines whether `nBits` more bits are available: they must fit within
+   * the bit limit, if there is one, and the InputSource must have enough
+   * bytes to cover them.
+   *
+   * Does not advance the position.
+   */
+  final def isDefinedForLength(nBits: Long): Boolean = {
+    val newBitPos0b = bitPos0b + nBits
+    val beyondLimit = bitLimit0b.isDefined && newBitPos0b > bitLimit0b.get
+    !beyondLimit && {
+      val bytesNeeded = Bits.roundUpToByte(newBitPos0b) - inputSource.position()
+      inputSource.areBytesAvailable(bytesNeeded)
+    }
+  }
+
+  /**
+   * Advances `nBits` if they are available and returns true; otherwise
+   * returns false without moving. `finfo` is not used.
+   */
+  def skip(nBits: Long, finfo: FormatInfo): Boolean = {
+    // threadCheck()
+    Assert.usage(nBits <= Int.MaxValue)
+    val available = this.isDefinedForLength(nBits)
+    if (available) setBitPos0b(bitPos0b + nBits)
+    available
+  }
+
+  /**
+   * Snapshots the current state into a pooled MarkState, pushes it on the
+   * mark stack, and locks the mark's byte position in the InputSource.
+   */
+  def mark(requestorID: String): DataInputStream.Mark = {
+    val snapshot = markPool.getFromPool(requestorID)
+    snapshot.assignFrom(cst)
+    markStack.push(snapshot)
+    inputSource.lockPosition(snapshot.bytePos0b)
+    snapshot
+  }
+
+  // Releases the InputSource position held for a mark and returns the mark to the pool.
+  private def releaseAndRecycle(m: MarkState): Unit = {
+    inputSource.releasePosition(m.bytePos0b)
+    markPool.returnToPool(m)
+  }
+
+  /**
+   * Pops marks until `mark` is reached, releasing and recycling every newer
+   * mark on the way. Returns `mark` itself, popped but not yet released or
+   * recycled; the caller finishes with it.
+   */
+  private def releaseUntilMark(mark: DataInputStream.Mark) = {
+    // threadCheck()
+    Assert.usage(!markStack.isEmpty)
+    Assert.usage(mark != null)
+    var current = markStack.pop
+    while (!markStack.isEmpty && (current ne mark)) {
+      releaseAndRecycle(current)
+      current = markStack.pop
+    }
+    Assert.invariant(current eq mark) // holds unless markStack was empty
+    current
+  }
+
+  /** Restores the state saved by `mark`, discarding it and any newer marks. */
+  def reset(mark: DataInputStream.Mark): Unit = {
+    val current = releaseUntilMark(mark)
+    Assert.invariant(current eq mark)
+    cst.assignFrom(current)
+    setBitPos0b(cst.bitPos0b)
+    releaseAndRecycle(current)
+  }
+
+  /** Drops `mark` and any newer marks without changing the current state. */
+  def discard(mark: DataInputStream.Mark): Unit = {
+    val current = releaseUntilMark(mark)
+    Assert.invariant(current eq mark)
+    releaseAndRecycle(current)
+  }
+
+  // Lightweight position-only mark: just the bit position.
+  override def markPos: MarkPos = bitPos0b
+  override def resetPos(m: MarkPos): Unit = setBitPos0b(m)
+
+  /** Final consistency check for this stream; delegates to markPool.finalCheck. */
+  def validateFinalStreamState: Unit = {
+    // threadCheck()
+    markPool.finalCheck
+  }
+
+  /**
+   * Decodes exactly `nChars` characters after aligning to the encoding's
+   * mandatory alignment. Returns Nope if alignment fails, or if fewer than
+   * `nChars` characters could be decoded (then the bit position is restored
+   * to where it was before aligning).
+   */
+  final def getString(nChars: Long, finfo: FormatInfo): Maybe[String] =
+    getStringImpl(nChars, finfo, requireAllChars = true)
+
+  /**
+   * Like getString, but succeeds if at least one (and at most `nChars`)
+   * characters could be decoded.
+   */
+  final def getSomeString(nChars: Long, finfo: FormatInfo): Maybe[String] =
+    getStringImpl(nChars, finfo, requireAllChars = false)
+
+  // Shared body of getString/getSomeString; only the success test differs.
+  private def getStringImpl(
+    nChars: Long,
+    finfo: FormatInfo,
+    requireAllChars: Boolean): Maybe[String] = {
+    val startingBitPos = bitPos0b
+    if (!align(finfo.encodingMandatoryAlignmentInBits, finfo)) {
+      Maybe.Nope
+    } else {
+      withLocalCharBuffer { lcb =>
+        val cb = lcb.getBuf(nChars)
+        val numDecoded = finfo.decoder.decode(this, finfo, cb)
+        val decodedEnough =
+          if (requireAllChars) numDecoded == nChars
+          else numDecoded > 0
+        if (decodedEnough) {
+          Maybe(cb.flip.toString)
+        } else {
+          setBitPos0b(startingBitPos)
+          Maybe.Nope
+        }
+      }
+    }
+  }
+
+  // Scratch buffer that skipChars decodes into (and discards), 32 chars at a time.
+  lazy val skipCharBuffer = CharBuffer.allocate(32)
+
+  /**
+   * Skips exactly `nChars` characters after aligning to the encoding's
+   * mandatory alignment. Returns false if alignment fails, or if decoding
+   * stops before `nChars` characters; in the latter case the bit position is
+   * restored to where it was before aligning.
+   */
+  def skipChars(nChars: Long, finfo: FormatInfo): Boolean = {
+    // threadCheck()
+    val startingBitPos = bitPos0b
+
+    if (!align(finfo.encodingMandatoryAlignmentInBits, finfo)) {
+      false
+    } else {
+      var remaining = nChars
+      var madeProgress = true
+      while (madeProgress && remaining > 0) {
+        val chunk = Math.min(remaining, skipCharBuffer.capacity).toInt
+        skipCharBuffer.position(0)
+        skipCharBuffer.limit(chunk)
+        val decoded = finfo.decoder.decode(this, finfo, skipCharBuffer)
+        remaining -= decoded
+        madeProgress = decoded != 0
+      }
+
+      val skippedAll = remaining == 0
+      if (!skippedAll) {
+        // could not skip every char; go back to where we started
+        setBitPos0b(startingBitPos)
+      }
+      skippedAll
+    }
+  }
+
+  def lookingAt(matcher: java.util.regex.Matcher, finfo: FormatInfo): Boolean 
= {
 
 Review comment:
   Did this code change much? The fundamental algorithm is still to gradually
enlarge the buffer until it is big enough to determine whether there is a match.
   
   I can't see the deltas because the diff tool isn't relating this to the
prior existing code.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to