During my search, I stumbled upon this place:
<https://github.com/adamw/reactive-akka-pres/blob/master/src/main/scala/com/softwaremill/reactive/ParseLinesStage.scala>
where Adam Warski created a new ParseLinesStage. This worked for me, but it
failed where there is a check for

if (buffer.size > maximumLineBytes) {
  println("XXX FAIL")
  ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +
    s"which is more than $maximumLineBytes without seeing a line terminator"))
}

So, I removed this logic, and the code now looks like this:

object LogFile {
  val maxBytesPerLine = 1500
  implicit val system = ActorSystem("system")

  def apply(file: File) = new LogFile(file)

  def main(args: Array[String]): Unit = {
    val file: File = new File("processor/src/main/resources/Demo_log_004.log")
    //    LogFile(file).processGraph()
    LogFile(file).process() // LogFile creates its own ActorMaterializer
  }
}

class LogFile(file: File)(implicit val system: ActorSystem) {
  // todo (harit): apply more filters to make sure the file is right
  Predef.assert(file.exists(), "log file must exist")

  implicit val materializer = ActorMaterializer()
  val logger = Logger(LoggerFactory.getLogger(getClass))

  val source: Source[ByteString, Future[Long]] =
    SynchronousFileSource(file, 1500)

  def process() = {
    source.transform(() => new ParseLinesStage("\n")).runForeach(println)
  }
}


class ParseLinesStage(separator: String) extends StatefulStage[ByteString, String] {
  private val separatorBytes = ByteString(separator)
  private val firstSeparatorByte = separatorBytes.head
  private var buffer = ByteString.empty
  private var nextPossibleMatch = 0

  def initial = new State {
    override def onPush(chunk: ByteString, ctx: Context[String]) = {
      buffer ++= chunk
      emit(doParse(Vector.empty).iterator, ctx)
    }

    @tailrec
    private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
      val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
      if (possibleMatchPos == -1) {
        // No matching character; we need to accumulate more bytes into the buffer
        nextPossibleMatch = buffer.size
        parsedLinesSoFar
      } else {
        if (possibleMatchPos + separatorBytes.size > buffer.size) {
          // We have found a possible match (we found the first character of the
          // terminator sequence) but we don't yet have enough bytes. We remember
          // the position to retry from next time.
          nextPossibleMatch = possibleMatchPos
          parsedLinesSoFar
        } else {
          if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
              == separatorBytes) {
            // Found a match
            val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
            buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
            nextPossibleMatch -= possibleMatchPos + separatorBytes.size
            doParse(parsedLinesSoFar :+ parsedLine)
          } else {
            nextPossibleMatch += 1
            doParse(parsedLinesSoFar)
          }
        }
      }
    }
  }
}
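To convince myself the scanning logic above is correct without materializing a stream, I extracted the same algorithm into a plain function over byte arrays (a test-only sketch; `ParseLinesCheck` is my own name, and resetting the scan position to 0 after each match is a simplification of the stage's nextPossibleMatch bookkeeping):

```scala
object ParseLinesCheck {
  // Same scan as ParseLinesStage.doParse, as a pure function for testing:
  // returns the complete lines found plus the unconsumed tail of the buffer.
  def parseLines(input: Array[Byte], separator: Array[Byte]): (Vector[String], Array[Byte]) = {
    var buffer = input
    var lines = Vector.empty[String]
    var next = 0
    var done = false
    while (!done) {
      val pos = buffer.indexOf(separator.head, next)
      if (pos == -1) {
        done = true                                   // wait for more bytes
      } else if (pos + separator.length > buffer.length) {
        done = true                                   // partial separator at the end
      } else if (buffer.slice(pos, pos + separator.length).sameElements(separator)) {
        lines :+= new String(buffer.take(pos), "UTF-8")
        buffer = buffer.drop(pos + separator.length)  // consume line + separator
        next = 0
      } else {
        next = pos + 1                                // first byte matched, rest didn't
      }
    }
    (lines, buffer)
  }

  def main(args: Array[String]): Unit = {
    // Multi-byte separator with a trailing partial line left in the buffer.
    val (lines, rest) = parseLines("one\r\ntwo\r\npartial".getBytes, "\r\n".getBytes)
    assert(lines == Vector("one", "two"))
    assert(new String(rest) == "partial")
    println("ok")
  }
}
```

This checks the two tricky cases: a multi-byte terminator and a buffer ending mid-line.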

A couple of open questions:

a.) I use SynchronousFileSource(file, 1500). Would changing *chunkSize*
make a huge difference in performance? As I understand it (and I could be
totally wrong), this is how much you read from the file each time, right?
What is the motivation for keeping the defaultChunkSize at 8192?
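One way I tried to build intuition (plain java.io, nothing akka-specific; `ChunkSizeSketch` is just scratch code): the chunk size only changes how many read calls are issued, not what is read, which is presumably why a larger default like 8192 amortizes per-read overhead:

```scala
import java.io.{File, FileInputStream, FileOutputStream}

object ChunkSizeSketch {
  // Read the whole file in chunkSize-byte reads; return total bytes read.
  def readAll(file: File, chunkSize: Int): Long = {
    val in = new FileInputStream(file)
    try {
      val buf = new Array[Byte](chunkSize)
      var total = 0L
      var n = in.read(buf)
      while (n != -1) { total += n; n = in.read(buf) }
      total
    } finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val f = File.createTempFile("chunk", ".log")
    f.deleteOnExit()
    val out = new FileOutputStream(f)
    try out.write(Array.fill[Byte](100000)('a')) finally out.close()

    // Same total regardless of chunk size; only the number of read()
    // calls changes (100000/512 vs 100000/8192 and so on).
    for (size <- Seq(512, 1500, 8192))
      assert(readAll(f, size) == 100000L)
    println("ok")
  }
}
```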

b.) As I mentioned, I removed the check that the original code had, and I am
not sure if this will have any performance hits. My apologies, I am new and
have questions that I cannot find answers to.

Thank you
+ Harit Himanshu

On Sun, Oct 11, 2015 at 6:22 PM, Harit Himanshu <
[email protected]> wrote:

> Thanks Konrad
>
> This is insightful. In this case, I am not able to read through the entire
> longline and I asked a question
> <https://groups.google.com/d/msg/akka-user/hfNwkl10dog/xSonpctwCgAJ> where
> I am looking for guidance.
>
> Any help/recommendation is very much appreciated
>
> Thanks again
>
> On Sun, Oct 11, 2015 at 5:24 PM, Konrad Malawski <
> [email protected]> wrote:
>
>> Are there any performance issues with the second approach
>> (scala.io.Source.fromFile(file).getLines())?
>>
>> If I remember correctly 5 to 10 times slower than SynchronousFileSource,
>> *and* Source.fromFile, used like in the example above, leaks open
>> FileInputStreams which you never close.
>>
>> SynchronousFileSource is fast, safe, and uses a dedicated thread-pool for
>> the blocking operations by default – use it instead of hand-rolling file
>> reading.
>>
>>
>> FYI, benchmarks (to be found in akka-bench-jmh-dev on branch
>> release-2.3-dev):
>>
>> [info] Benchmark                                         (bufSize)  Mode  Cnt     Score     Error  Units
>> [info] FileSourcesBenchmark.fileChannel                       2048  avgt   10   711.195 ±  36.094  ms/op  // this is SynchronousFileSource
>> [info] FileSourcesBenchmark.fileChannel_noReadAhead           2048  avgt   10  1660.726 ±  49.221  ms/op
>> [info] FileSourcesBenchmark.inputStream                       2048  avgt   10   587.248 ±   9.179  ms/op
>> [info] FileSourcesBenchmark.naive_ioSourceLinesIterator       2048  avgt   10  3794.313 ± 839.539  ms/op
>>
>>
>> -- konrad
>>
>
>
