Hello there!

I am trying to build a processing pipeline for a log file whose lines look like this:

2005-05-06 14:58:57 1 45.23.4.218 304 TCP_HIT 542 1109 GET http sports.espn.go.com /crossdomain.xml - - DIRECT 199.181.132.141 text/xml "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0; .NET CLR 1.1.4322)" PROXIED Sports/Recreation/Hobbies - 192.16.170.44 SG-HTTP-Service - none -

This log file contains 193705 log lines (excluding comment lines) that I want to process.

Harits-MacBook-Pro-2:bluecoat_proxy_big harit$ wc -l Demo_log_004.log
  195765 Demo_log_004.log
Harits-MacBook-Pro-2:bluecoat_proxy_big harit$ grep -E "GET|POST|CONNECT" Demo_log_004.log | wc -l
  192197
Harits-MacBook-Pro-2:bluecoat_proxy_big harit$ grep -v "^#" Demo_log_004.log | wc -l
  193705
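The same counts can be cross-checked from inside the JVM. Below is a minimal, stdlib-only sketch (the object name `LineStats` is hypothetical, not part of the original program) that reads the file as raw bytes, splits on `\n`, and reports the total line count, the number of lines not starting with `#`, and every line longer than 1500 bytes, which is the `maxBytesPerLine` value the code further down passes to `Framing.delimiter` as `maximumFrameLength`. It measures bytes rather than characters because the framing stage works on `ByteString`s, so a line with multi-byte UTF-8 characters can exceed the limit while having fewer than 1500 characters.

```scala
import java.nio.file.{Files, Paths}

object LineStats {
  // overLimit holds (1-based line number, length in bytes) for each over-long line
  final case class Stats(total: Int, nonComment: Int, overLimit: Seq[(Int, Int)])

  /** Splits raw bytes on '\n' and collects simple per-line statistics. */
  def scan(bytes: Array[Byte], maxBytes: Int): Stats = {
    var total = 0
    var nonComment = 0
    val overLimit = Vector.newBuilder[(Int, Int)]
    var start = 0
    var i = 0
    while (i <= bytes.length) {
      val atEnd = i == bytes.length
      if (atEnd || bytes(i) == '\n'.toByte) {
        val len = i - start
        // An empty segment after the final newline is not a line
        if (!(atEnd && len == 0)) {
          total += 1
          // Same rule as `grep -v "^#"`: empty lines count as non-comment
          if (len == 0 || bytes(start) != '#'.toByte) nonComment += 1
          if (len > maxBytes) overLimit += ((total, len))
        }
        start = i + 1
      }
      i += 1
    }
    Stats(total, nonComment, overLimit.result())
  }

  def main(args: Array[String]): Unit = {
    // Assumption: the whole file fits in memory (fine for a log of this size)
    val stats = scan(Files.readAllBytes(Paths.get(args(0))), maxBytes = 1500)
    println(s"total=${stats.total} nonComment=${stats.nonComment} over1500=${stats.overLimit.size}")
    stats.overLimit.take(10).foreach { case (n, len) => println(s"  line $n: $len bytes") }
  }
}
```

Pass the log file path as the only argument. If the file uses `\r\n` line endings, the trailing `\r` is included in each length, and if the last line has no trailing newline, `total` can be one higher than `wc -l`.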


I created a flow graph that initially looked like this:

      source ~> byteStringToString ~> filterComments ~> splitLogLine ~> broadcast ~> transformEvent ~> sinkEvents
                                                                        broadcast ~> obfuscateIpAddress ~> sinkAssets

But then I realized that far fewer lines than expected were reaching the sink:

$ java -cp processor/target/lib:processor/target/processor-1.0-SNAPSHOT.jar com.learner.processor.LogFile | tee out.log

$ wc -l out.log
   15070 out.log

So I made the graph linear to check whether all lines make it through the pipeline. My current code looks like this:


import java.io.File

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.io.Implicits.AddSynchronousFileSource
import akka.stream.scaladsl.FlowGraph.Builder
import akka.stream.scaladsl._
import akka.util.ByteString
import com.learner.messages.BlueCoatEvent
import com.learner.processor.Flows.byteStringToString
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory

import scala.concurrent.Future
import scala.util.hashing.MurmurHash3.stringHash

object LogFile {
  val maxBytesPerLine = 1500
  implicit val system = ActorSystem("system")

  def apply(file: File) = new LogFile(file)

  def main(args: Array[String]) {
    LogFile(new File("/Users/harit/Downloads/bluecoat_proxy_big/Demo_log_004.log")).processGraph()
  }
}

class LogFile(file: File)(implicit val system: ActorSystem) {
  Predef.assert(file.exists(), "log file must exist")

  implicit val materializer = ActorMaterializer()
  val logger = Logger(LoggerFactory.getLogger(getClass))

  val source: Source[ByteString, Future[Long]] = Source.synchronousFile(file)


  def processGraph() = {
    val sinkEvents = Sink.foreach(println)
    val sinkAssets = Sink.ignore

    val filterComments = Flow[String].filter(!_.startsWith("#"))
    val splitLogLine = Flow[String].map(_.split("\\s").toList)
    val transformEvent = Flow[List[String]].map(tokens => BlueCoatEvent(tokens))
    // tokens(3) is the client IP address in the log format above
    val obfuscateIpAddress = Flow[List[String]].map(tokens =>
      Map[String, String](tokens(3) -> stringHash(tokens(3)).toString))


    FlowGraph.closed() { implicit builder: Builder[Unit] =>
      import FlowGraph.Implicits._

      source ~> byteStringToString ~> sinkEvents
    }.run()

  }
}

and byteStringToString (imported from com.learner.processor.Flows) is:

  val byteStringToString: Flow[ByteString, String, Unit] = Flow[ByteString]
    .via(Framing.delimiter(
      ByteString(System.lineSeparator),
      maximumFrameLength = LogFile.maxBytesPerLine,
      allowTruncation = true))
    .map(_.utf8String)


When I ran the program again, it again produced roughly the same number of output lines as before (not exactly the same).

I am confused and would appreciate some help here:

   1. The stream does not appear to fail or throw exceptions, yet it produces far fewer output lines than expected. How do I debug this?
   2. maximumFrameLength is 1500, but some log lines may be longer than that. Could that be the issue? How do I confirm it?
   3. With my code, run() does not give me back a materialized value, so I have no way to know when the stream has finished and then shut down the ActorSystem. What am I missing there?
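For questions 1 and 3: in processGraph() the graph is built with `FlowGraph.closed() { implicit builder: Builder[Unit] => ... }`, so `run()` returns `Unit` and the `Future` materialized by `Sink.foreach` is discarded. That `Future` is where a stream failure would be reported (for example a framing error if a line exceeds `maximumFrameLength`), so a stream that fails can look like one that simply produced fewer lines. Below is a minimal sketch, assuming the Akka Streams 1.0 API used in the code above, where `Sink.foreach` materializes to `Future[Unit]`. It keeps that value via `runWith`, logs success or failure, and shuts the `ActorSystem` down when the stream ends. The object name `KeepMaterializedValue` and the failing `"boom"` element are hypothetical stand-ins, not part of the original program. For the graph form, passing the sink into the builder, as in `FlowGraph.closed(sinkEvents) { implicit builder => sink => ... }`, should likewise make `run()` return the sink's `Future[Unit]`.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future
import scala.util.{Failure, Success}

object KeepMaterializedValue {
  // Runs `lines` into Sink.foreach and returns the sink's materialized value.
  // Assumes Akka Streams 1.0, where Sink.foreach materializes to Future[Unit];
  // if any upstream stage fails, this Future completes with that failure.
  def printAll(lines: Source[String, _])(implicit mat: Materializer): Future[Unit] =
    lines.runWith(Sink.foreach[String](println))

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("system")
    implicit val materializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for onComplete

    // Hypothetical stand-in for `source.via(byteStringToString)`: the third
    // element throws, similar to how a failing framing stage ends the stream.
    val lines = Source(List("a", "b", "boom", "c")).map { s =>
      if (s == "boom") throw new IllegalStateException("simulated stage failure") else s
    }

    printAll(lines).onComplete { result =>
      result match {
        case Success(_) => println("stream completed")
        case Failure(e) => println(s"stream failed: $e")
      }
      system.shutdown() // Akka 2.3 API; on Akka 2.4+ use system.terminate()
    }
  }
}
```

To use this against the real pipeline, `lines` would be replaced by `source.via(byteStringToString)`; if the `Future` then fails with a framing-related exception, that would point towards question 2.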


Thank you
+ Harit Himanshu

-- 
You received this message because you are subscribed to the Google Groups "Akka User List" group.