During my search I stumbled upon this place:
<https://github.com/adamw/reactive-akka-pres/blob/master/src/main/scala/com/softwaremill/reactive/ParseLinesStage.scala>
where Adam Warski created a new ParseLinesStage. This worked for me, but it failed where there is a check for

if (buffer.size > maximumLineBytes) {
  println("XXX FAIL")
  ctx.fail(new IllegalStateException(
    s"Read ${buffer.size} bytes " +
    s"which is more than $maximumLineBytes without seeing a line terminator"))
}
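To see in isolation why this fires on my file, here is a small, Akka-free sketch of the same guard (the function name and setup are mine, purely for illustration): once more than maximumLineBytes accumulate without a terminator, the stage fails.

```scala
// Illustrative, dependency-free version of the removed guard: it trips as
// soon as the buffered bytes exceed the limit without a line terminator.
object GuardDemo {
  def exceedsLimit(buffered: Array[Byte], maximumLineBytes: Int): Boolean =
    buffered.length > maximumLineBytes && !buffered.contains('\n'.toByte)

  def main(args: Array[String]): Unit = {
    // A 2000-byte chunk with no '\n' anywhere, against a 1500-byte limit
    val longLine = Array.fill[Byte](2000)('x'.toByte)
    println(GuardDemo.exceedsLimit(longLine, 1500))            // true: the failing case
    println(GuardDemo.exceedsLimit("short\n".getBytes("UTF-8"), 1500)) // false
  }
}
```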
So I removed this logic, and the code now looks like this:
// imports are my best guess at the akka-stream 1.0-era packages this compiles against
import java.io.File
import scala.annotation.tailrec
import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.io.SynchronousFileSource
import akka.stream.scaladsl.Source
import akka.stream.stage.{Context, StatefulStage}
import akka.util.ByteString
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory

object LogFile {
  val maxBytesPerLine = 1500
  implicit val system = ActorSystem("system")

  def apply(file: File) = new LogFile(file)

  def main(args: Array[String]): Unit = {
    val file: File = new File("processor/src/main/resources/Demo_log_004.log")
    // LogFile(file).processGraph()
    LogFile(file).process()
    // note: the materializer lives in the LogFile class, so none is needed here
  }
}
class LogFile(file: File)(implicit val system: ActorSystem) {
  // todo (harit): apply more filters to make sure the file is right
  Predef.assert(file.exists(), "log file must exist")

  implicit val materializer = ActorMaterializer()
  val logger = Logger(LoggerFactory.getLogger(getClass))

  val source: Source[ByteString, Future[Long]] =
    SynchronousFileSource(file, LogFile.maxBytesPerLine)

  def process() = {
    source.transform(() => new ParseLinesStage("\n")).runForeach(println)
  }
}
class ParseLinesStage(separator: String) extends StatefulStage[ByteString, String] {
  private val separatorBytes = ByteString(separator)
  private val firstSeparatorByte = separatorBytes.head
  private var buffer = ByteString.empty
  private var nextPossibleMatch = 0

  def initial = new State {
    override def onPush(chunk: ByteString, ctx: Context[String]) = {
      buffer ++= chunk
      emit(doParse(Vector.empty).iterator, ctx)
    }

    @tailrec
    private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
      val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
      if (possibleMatchPos == -1) {
        // No matching character; we need to accumulate more bytes into the buffer
        nextPossibleMatch = buffer.size
        parsedLinesSoFar
      } else if (possibleMatchPos + separatorBytes.size > buffer.size) {
        // We have found a possible match (we found the first character of the
        // terminator sequence) but we don't yet have enough bytes. We remember
        // the position to retry from next time.
        nextPossibleMatch = possibleMatchPos
        parsedLinesSoFar
      } else if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
                   == separatorBytes) {
        // Found a match
        val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
        buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
        nextPossibleMatch -= possibleMatchPos + separatorBytes.size
        doParse(parsedLinesSoFar :+ parsedLine)
      } else {
        nextPossibleMatch += 1
        doParse(parsedLinesSoFar)
      }
    }
  }
}
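In case it helps to see the buffering idea without any Akka machinery, here is a dependency-free sketch of the same line-framing logic (the class and method names are mine, not from the original code; the naive separator scan replaces the nextPossibleMatch optimization):

```scala
// Minimal stand-in for ParseLinesStage: accumulate incoming chunks and emit a
// complete line each time the full separator appears in the buffer.
final class LineFramer(separator: String) {
  private val sepBytes = separator.getBytes("UTF-8")
  private var buffer = Array.emptyByteArray

  // Append a chunk and return every complete line now available.
  def push(chunk: Array[Byte]): Vector[String] = {
    buffer = buffer ++ chunk
    var lines = Vector.empty[String]
    var idx = indexOfSeparator()
    while (idx >= 0) {
      lines :+= new String(buffer.take(idx), "UTF-8")
      buffer = buffer.drop(idx + sepBytes.length)
      idx = indexOfSeparator()
    }
    lines
  }

  // Naive scan; the StatefulStage version avoids rescanning via nextPossibleMatch.
  private def indexOfSeparator(): Int =
    (0 to buffer.length - sepBytes.length).find { i =>
      java.util.Arrays.equals(buffer.slice(i, i + sepBytes.length), sepBytes)
    }.getOrElse(-1)
}

object LineFramerDemo {
  def main(args: Array[String]): Unit = {
    val framer = new LineFramer("\n")
    println(framer.push("hel".getBytes("UTF-8")))      // Vector() — no terminator yet
    println(framer.push("lo\nwor".getBytes("UTF-8")))  // Vector(hello)
    println(framer.push("ld\n".getBytes("UTF-8")))     // Vector(world)
  }
}
```

This makes the state machine easy to poke at in a REPL: a chunk without a terminator buffers silently, and a later chunk can complete a line started several chunks earlier.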
A couple of open questions:

a.) I use SynchronousFileSource(file, 1500). Will changing *chunkSize* make a huge difference in performance? As I understand it (and I could be totally wrong), this is how much is read from the file each time, right? What is the motivation for keeping the defaultChunkSize at 8192?

b.) As I mentioned, I removed the check that the original code had, and I am not sure whether this will have any performance impact. My apologies, I am new and have questions that I cannot find answers to.
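To put rough numbers on question a.), the chunk size determines how many read calls a full pass over the file needs; a quick back-of-the-envelope calculation (illustrative arithmetic only, not a benchmark):

```scala
object ChunkMath {
  // Ceiling division: how many reads of chunkSize cover fileBytes
  def readCalls(fileBytes: Long, chunkSize: Int): Long =
    (fileBytes + chunkSize - 1) / chunkSize

  def main(args: Array[String]): Unit = {
    val oneGiB = 1024L * 1024 * 1024
    println(ChunkMath.readCalls(oneGiB, 1500)) // 715828 reads
    println(ChunkMath.readCalls(oneGiB, 8192)) // 131072 reads
  }
}
```

So a 1 GiB file costs roughly 5.5x more read calls at 1500 bytes than at 8192; I assume per-call overhead is part of why a larger default was chosen, but I would welcome confirmation.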
Thank you
+ Harit Himanshu
On Sun, Oct 11, 2015 at 6:22 PM, Harit Himanshu <
[email protected]> wrote:
> Thanks Konrad
>
> This is insightful. In this case, I am not able to read through the entire
> longline and I asked a question
> <https://groups.google.com/d/msg/akka-user/hfNwkl10dog/xSonpctwCgAJ> where
> I am looking for guidance.
>
> Any help/recommendation is very much appreciated
>
> Thanks again
>
> On Sun, Oct 11, 2015 at 5:24 PM, Konrad Malawski <
> [email protected]> wrote:
>
>> Are there any performance issues with the second approach
>> (scala.io.Source.fromFile(file).getLines())?
>>
>> If I remember correctly it is 5 to 10 times slower than SynchronousFileSource,
>> *and* Source.fromFile used like in the above example leaks open
>> FileInputStreams which you never close.
>>
>> SynchronousFileSource is fast, safe, and uses a dedicated thread-pool for
>> the blocking operations by default – use it instead of hand-rolling file
>> reading.
>>
>>
>> FYI, benchmarks (to be found in akka-bench-jmh-dev on branch
>> release-2.3-dev):
>>
>> [info] Benchmark                                         (bufSize)  Mode  Cnt     Score     Error  Units
>> *[info] FileSourcesBenchmark.fileChannel                      2048  avgt   10   711.195 ±  36.094  ms/op*  // this is SynchronousFileSource
>> [info] FileSourcesBenchmark.fileChannel_noReadAhead          2048  avgt   10  1660.726 ±  49.221  ms/op
>> [info] FileSourcesBenchmark.inputStream                      2048  avgt   10   587.248 ±   9.179  ms/op
>> [info] FileSourcesBenchmark.naive_ioSourceLinesIterator      2048  avgt   10  3794.313 ± 839.539  ms/op
>>
>>
>> -- konrad
>>
>
>