I just found out that it halts on a line that is 2564 characters (bytes) long:

15070 2005-05-06 14:58:57 42 45.23.4.218 200 TCP_NC_MISS 903 1098 GET http 
sports.espn.go.com /nba/xml/upcomingTV ?sport=nba - DIRECT sports.espn.go.com 
text/html;%20charset=iso-8859-1 "Mozilla/4.0 (compatible; MSIE 6.0; 
Windows NT 5.0; .NET CLR 1.1.4322)" PROXIED Sports/Recreation/Hobbies - 
192.16.170.44 SG-HTTP-Service - none -
15071 2005-05-06 14:58:57 306 45.23.4.218 
200 TCP_MISS 16641 2140 GET http m3.doubleclick.net 
/872526/match2fb_728x90v2.swf 
?clickTag=http%253A//ad.doubleclick.net/click%25253Bh%       
253Dv3%257C3270%257Cf%257C6e%257C%25252a%257Ct%25253B14910813%25253B0-0%25253B0%25253B11166676%25253B3454-728%257C90%25253B9369149%257C9387045%257C1%25253B%25253B%25257Essc
       
s%25253D%25253fhttp%253A//log.go.com/log%253Fsrvc%25253dsz%252526guid%25253d5504B8AD-0FC2-4475-9203-4CE6D2125953%252526drop%25253d0%252526addata%25253d0%253A63%253A188329%2
       
53A65%252526a%25253d1%252526goto%25253dhttp%25253a%25252f%25252fwww.levitra.com/match/levitra_promotions/match/get/forms.jsp%25253Frotation%25253D11166676%252526banner%2525
       
3D14910813&clickTag1=http%253A//log.go.com/log%253Fsrvc%25253dsz%252526guid%25253d5504B8AD-0FC2-4475-9203-4CE6D2125953%252526drop%25253d0%252526addata%25253d0%253A63%253A18
       
8329%253A65%252526a%25253d1%252526goto%25253dhttp%253A//ad.doubleclick.net/click%25253Bh%253Dv3%257C3270%257Cf%257C6e%257C%25252a%257Ct%25253B14910813%25253B0-0%25253B0%252
       
53B11166676%25253B3454-728%257C90%25253B9369149%257C9387045%257C1%25253B%25253B%25257Esscs%25253D%25253fhttp%253A//log.go.com/log%253Fsrvc%25253dsz%252526guid%25253d5504B8A
       
D-0FC2-4475-9203-4CE6D2125953%252526drop%25253d0%252526addata%25253d0%253A63%253A188329%253A65%252526a%25253d1%252526goto%25253dhttp%25253a%25252f%25252fwww.levitra.com/mat
       
ch/levitra_promotions/match/get/forms.jsp%25253Frotation%25253D11166676%252526banner%25253D14910813&clickTag2=http%253A//log.go.com/log%253Fsrvc%25253dsz%252526guid%25253d5
       
504B8AD-0FC2-4475-9203-4CE6D2125953%252526drop%25253d0%252526addata%25253d0%253A63%253A188329%253A65%252526a%25253d1%252526goto%25253dhttp%253A//ad.doubleclick.net/click%25
       
253Bh%253Dv3%257C3270%257Cf%257C6e%257C%25252a%257Ct%25253B14910813%25253B0-0%25253B0%25253B11166676%25253B3454-728%257C90%25253B9369149%257C9387045%257C1%25253B%25253B%252
       
57Esscs%25253D%25253fhttp%253A//log.go.com/log%253Fsrvc%25253dsz%252526guid%25253d5504B8AD-0FC2-4475-9203-4CE6D2125953%252526drop%25253d0%252526addata%25253d0%253A63%253A18
       
8329%253A65%252526a%25253d1%252526goto%25253dhttp%253A//www.levitra.com/consumer/about_levitra/levitra_side_effects.htm%253Frotation%253D11166676%2526banner%253D14910813&cl
       ickTag3=&clickTag4=&clickTag5= - DIRECT m3.doubleclick.net 
application/x-shockwave-flash "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 
5.0; .NET CLR 1.1.4322)" PROXIED Web%20Advertisements - 192.16.170.44 
SG-HTTP-Service - none -

and

$ wc -l out.log 
   15070 out.log

So the stream stops at line 15071, which is 2564 bytes long, well over the 
maximumFrameLength of 1500 that I configured.

But why does it not throw an exception? How do I handle this?
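
To check the line-length theory independently of the stream, here is a minimal 
sketch in plain Scala (no Akka) that lists every line longer than a given byte 
limit. The names LongLines and longLines are made up for illustration, and the 
sketch assumes lines are delimited by '\n', which is what System.lineSeparator 
returns on macOS.

```scala
import java.io.{BufferedInputStream, FileInputStream, InputStream}

object LongLines {

  /** Returns (1-based line number, length in bytes) for every line longer
    * than `limit` bytes. Lines are delimited by '\n' (an assumption); the
    * delimiter itself is not counted. */
  def longLines(in: InputStream, limit: Int): Vector[(Int, Int)] = {
    val buffered = new BufferedInputStream(in)
    val found = Vector.newBuilder[(Int, Int)]
    var lineNo = 1
    var length = 0
    var b = buffered.read()
    while (b != -1) {
      if (b == '\n') {
        if (length > limit) found += ((lineNo, length))
        lineNo += 1
        length = 0
      } else {
        length += 1
      }
      b = buffered.read()
    }
    // A final line without a trailing newline still counts.
    if (length > limit) found += ((lineNo, length))
    found.result()
  }

  // Usage: LongLines <path-to-log> <limit-in-bytes>
  def main(args: Array[String]): Unit = {
    val in = new FileInputStream(args(0))
    try longLines(in, args(1).toInt).foreach { case (n, len) =>
      println(s"line $n: $len bytes")
    } finally in.close()
  }
}
```

Running it against the log file with a limit of 1500 (the value of 
maxBytesPerLine) should show whether line 15071 is the first one over the 
limit, and the largest length reported gives a lower bound for a 
maximumFrameLength that fits every line.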

On Saturday, October 10, 2015 at 5:11:34 PM UTC-7, Harit Himanshu wrote:
>
> Hello there!
>
> I am trying to build a processing pipeline for a log file where log lines 
> look like  
>
> 2005-05-06 14:58:57 1 45.23.4.218 304 TCP_HIT 542 1109 GET http 
> sports.espn.go.com /crossdomain.xml - - DIRECT 199.181.132.141 text/xml 
> "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0; .NET CLR 1.1.4322)" 
> PROXIED Sports/Recreation/Hobbies - 192.16.170.44 SG-HTTP-Service - none -
>
> For this specific log file I have around *193705* log lines that I want 
> to process.
>
> Harits-MacBook-Pro-2:bluecoat_proxy_big harit$ wc -l Demo_log_004.log 
>   195765 Demo_log_004.log
> Harits-MacBook-Pro-2:bluecoat_proxy_big harit$ grep -E "GET|POST|CONNECT" 
> Demo_log_004.log | wc -l
>   192197
> Harits-MacBook-Pro-2:bluecoat_proxy_big harit$ wc -l Demo_log_004.log 
>   195765 Demo_log_004.log
> Harits-MacBook-Pro-2:bluecoat_proxy_big harit$ grep -v "^#" 
> Demo_log_004.log | wc -l
>   193705
> Harits-MacBook-Pro-2:bluecoat_proxy_big harit$ 
>
>
> I created a flow graph that initially looked like  
>
>       source ~> byteStringToString ~> filterComments ~> splitLogLine ~> 
> broadcast ~> transformEvent ~> sinkEvents
>                                                                         
> broadcast ~> obfuscateIpAddress ~> sinkAssets
>
> But then I realized that far fewer lines were reaching the sink:
> $ java -cp 
> processor/target/lib:processor/target/processor-1.0-SNAPSHOT.jar 
> com.learner.processor.LogFile | tee out.log
>
> $ wc -l out.log 
>    15070 out.log
>
> So I made my graph linear to check whether all lines pass through the 
> pipeline. My current code looks like  
>
>
> import java.io.File
>
> import akka.actor.ActorSystem
> import akka.stream.ActorMaterializer
> import akka.stream.io.Implicits.AddSynchronousFileSource
> import akka.stream.scaladsl.FlowGraph.Builder
> import akka.stream.scaladsl._
> import akka.util.ByteString
> import com.learner.messages.BlueCoatEvent
> import com.learner.processor.Flows.byteStringToString
> import com.typesafe.scalalogging.Logger
> import org.slf4j.LoggerFactory
>
> import scala.concurrent.Future
> import scala.util.hashing.MurmurHash3.stringHash
>
> object LogFile {
>   val maxBytesPerLine = 1500
>   implicit val system = ActorSystem("system")
>
>   def apply(file: File) = new LogFile(file)
>
>   def main(args: Array[String]) {
>     LogFile(new 
> File("/Users/harit/Downloads/bluecoat_proxy_big/Demo_log_004.log")).processGraph()
>   }
> }
>
> class LogFile(file: File)(implicit val system: ActorSystem) {
>   Predef.assert(file.exists(), "log file must exist")
>
>   implicit val materializer = ActorMaterializer()
>   val logger = Logger(LoggerFactory.getLogger(getClass))
>
>   val source: Source[ByteString, Future[Long]] = 
> Source.synchronousFile(file)
>
>
>   def processGraph() = {
>     val sinkEvents = Sink.foreach(println)
>     val sinkAssets = Sink.ignore
>
>     val filterComments = Flow[String].filter(!_.startsWith("#"))
>     val splitLogLine = Flow[String].map(_.split("\\s").toList)
>     val transformEvent = Flow[List[String]].map(tokens => 
> BlueCoatEvent(tokens))
>     val obfuscateIpAddress = Flow[List[String]].map(tokens => Map[String, 
> String](tokens(3) -> stringHash(tokens(3)).toString))
>
>
>     FlowGraph.closed() { implicit builder: Builder[Unit] =>
>       import FlowGraph.Implicits._
>
>       source ~> byteStringToString ~> sinkEvents
>     }.run()
>
>   }
> }
>
> and
>
>   val byteStringToString: Flow[ByteString, String, Unit] = Flow[ByteString]
>     .via(Framing.delimiter(ByteString(System.lineSeparator), 
> maximumFrameLength = LogFile.maxBytesPerLine, allowTruncation = true))
>     .map(_.utf8String)
>
>
> When I ran my program again, it produced roughly the same number of lines 
> as before (though not exactly the same).
>
> I am confused and would appreciate some help here:
>
>    1. The stream does not fail or throw exceptions, yet it produces far 
>    fewer output lines than expected. How do I debug this?
>    2. The maximumFrameLength is 1500, but some log lines can be longer 
>    than that. Could that be the issue? How do I confirm it?
>    3. As per my code, I do not get back the Materializer when I call 
>    run(), so I am unable to shut down the ActorSystem. What am I missing there?
>
>
> Thank you
> + Harit Himanshu
>
>

You received this message because you are subscribed to the Google Groups "Akka 
User List" group.