Seems good to me. BTW, it's fine to use MEMORY_ONLY (i.e., without replication) for testing, but you should turn on replication (e.g., MEMORY_ONLY_SER_2) if you want fault tolerance.
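A plain-Java sketch (no Spark dependency) may help show what the quoted program computes in each batch interval. It assumes each batch from a socket source can be modeled as a list of text lines. The class `UnionBatchCount` and its methods are hypothetical and are not part of Spark. The mapping to the quoted program is:

- `union` gathers the lines that both sources delivered in the same interval.
- Splitting on single spaces mirrors `flatMap(_.split(" "))`.
- The single number per batch corresponds to what `words.count()` produces and `rdd.first()` prints.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of streaming batch intervals; Spark is not used.
final class UnionBatchCount {

    // Models lines1.union(lines2) within one batch interval:
    // the merged batch holds every line delivered by either source.
    static List<String> union(List<String> batch1, List<String> batch2) {
        List<String> merged = new ArrayList<>(batch1);
        merged.addAll(batch2);
        return merged;
    }

    // Models flatMap(_.split(" ")) followed by count() for one batch.
    // Like the Scala code, split(" ") turns an empty line into one empty token.
    static long countWords(List<String> lines) {
        long total = 0;
        for (String line : lines) {
            total += line.split(" ").length;
        }
        return total;
    }

    public static void main(String[] args) {
        // Two batch intervals per source; the second interval is empty for both.
        List<List<String>> source1 = List.of(List.of("en un lugar"), List.<String>of());
        List<List<String>> source2 = List.of(List.of("de la mancha"), List.<String>of());
        for (int i = 0; i < source1.size(); i++) {
            // One number per batch, like println(rdd.first()) inside foreachRDD.
            System.out.println(countWords(union(source1.get(i), source2.get(i))));
        }
    }
}
```

Running `main` prints 6 and then 0. An interval in which neither source delivered any lines yields a count of 0.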
TD

On Mon, Feb 3, 2014 at 3:19 PM, Eduardo Costa Alfaia <e.costaalf...@unibs.it> wrote:

> Hi Tathagata,
>
> You were right when you told me to use Scala instead of Java; Scala is very
> easy. I have implemented the code you gave me (the words.count().foreachRDD
> block), and I have also added a union (lines1.union(lines2)) because I am
> testing with 2 stream sources. My idea is to use 3 or more stream sources
> and union them all.
>
> package org.apache.spark.streaming.examples
>
> import org.apache.spark.storage.StorageLevel
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.StreamingContext._  // pair functions such as reduceByKey
>
> object NetworkWordCount {
>   def main(args: Array[String]) {
>     if (args.length < 1) {
>       System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
>         "In local mode, <master> should be 'local[n]' with n > 1")
>       System.exit(1)
>     }
>
>     StreamingExamples.setStreamingLogLevels()
>
>     // Create the context with a 1 second batch size
>     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
>       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
>     ssc.checkpoint("hdfs://computer22:54310/user/root/INPUT")
>     // Create a socket text stream on target ip:port and count the
>     // words in the input stream of \n delimited text (e.g., generated by 'nc')
>     val lines1 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
>     val lines2 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
>     val union2 = lines1.union(lines2)
>     //val words = lines.flatMap(_.split(" "))
>     val words = union2.flatMap(_.split(" "))
>     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>
>     words.count().foreachRDD(rdd => {
>       val totalCount = rdd.first()
>
>       // print to screen
>       println(totalCount)
>
>       // append count to file
>       // ...
>     })
>     //wordCounts.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
>
> What do you think? Is my code right?
>
> I obtained the following result:
>
> root@computer8:/opt/unibs_test/incubator-spark-tdas# bin/run-example org.apache.spark.streaming.examples.NetworkWordCount spark://192.168.0.13:7077
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in [jar:file:/opt/unibs_test/incubator-spark-tdas/examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/opt/unibs_test/incubator-spark-tdas/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 14/02/04 00:02:07 INFO StreamingExamples: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
> 14/02/04 00:02:07 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 0
> 90715
> 1375825
> 882490
> 941226
> 811032
> 734399
> 804453
> 718688
> 1058695
> 854417
> 813263
> 798885
> 785455
> 952804
> 780140
> 697533
>
>
> Thanks, Tathagata.
>
> Regards
>
>
> 2014-01-30 Eduardo Costa Alfaia <e.costaalf...@unibs.it>:
>
> > Hi Tathagata,
> >
> > Thank you for your explanations. They will help me understand how this
> > piece of code works so that we can do what we want. We have written a C
> > program that sends a text file, for example Don Quixote, as a stream over
> > the network. We therefore changed the Java code from JavaNetworkWordCount
> > to connect to each source described in the source code. Below is what we
> > inserted: three stream sources.
> >
> > JavaDStream<String> lines1 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
> > JavaDStream<String> lines2 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
> > JavaDStream<String> lines3 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
> > JavaDStream<String> union2 = lines1.union(lines2);
> > JavaDStream<String> union3 = union2.union(lines3);
> > JavaDStream<String> words = union3.flatMap(new FlatMapFunction<String, String>() {
> >
> > So I think the second option you gave me is the better one.
> > Sorry, Tathagata, for insisting on this, and thank you for your patience.
> >
> > Best Regards
> >
> >
> > 2014-01-30 Tathagata Das <tathagata.das1...@gmail.com>
> >
> >> Let me first ask for a few clarifications.
> >>
> >> 1. If you just want to count the words in a single text file like Don
> >> Quixote (that is, not a stream of data), you should use only Spark, not
> >> Spark Streaming. The program to count the frequency of words in a text
> >> file could be written in Java. However, if you are not super-comfortable
> >> with Java, I strongly recommend using the Scala API or PySpark. Scala may
> >> be a little tricky to learn if you have absolutely no experience with it,
> >> but it is worth it. In Scala, the frequency count would look like this:
> >>
> >> import org.apache.spark.SparkContext
> >> import org.apache.spark.SparkContext._  // pair-RDD functions such as reduceByKey
> >>
> >> val sc = new SparkContext(...)
> >> val linesInFile = sc.textFile("path_to_file")
> >> val words = linesInFile.flatMap(line => line.split(" "))
> >> val frequencies = words.map(word => (word, 1L)).reduceByKey(_ + _)
> >> // collect() brings every (word, count) pair to the driver, so it is costly if the file is large
> >> println("Word frequencies = " + frequencies.collect().mkString(", "))
> >>
> >>
> >> 2. Let me assume that you want to read a stream of text over the network
> >> and then write the total number of words to a file. Note that this is
> >> the "total number of words", not the "frequency of each word". The Java
> >> version would be something like this:
> >>
> >> import org.apache.spark.api.java.JavaRDD;
> >> import org.apache.spark.api.java.function.Function2;
> >> import org.apache.spark.streaming.Time;
> >> import org.apache.spark.streaming.api.java.JavaDStream;
> >>
> >> // count() yields one element per batch: the number of words in that batch
> >> JavaDStream<Long> totalCounts = words.count();
> >>
> >> totalCounts.foreachRDD(new Function2<JavaRDD<Long>, Time, Void>() {
> >>   @Override
> >>   public Void call(JavaRDD<Long> rdd, Time time) throws Exception {
> >>     Long totalCount = rdd.first();
> >>
> >>     // print to screen
> >>     System.out.println(totalCount);
> >>
> >>     // append count to file
> >>     // ...
> >>     return null;
> >>   }
> >> });
> >>
> >> This counts how many words have been received in each batch. The Scala
> >> version would be much simpler to read:
> >>
> >> words.count().foreachRDD(rdd => {
> >>   val totalCount = rdd.first()
> >>
> >>   // print to screen
> >>   println(totalCount)
> >>
> >>   // append count to file
> >>   // ...
> >> })
> >>
> >> Hope this helps! I apologize if the code doesn't compile; I didn't test it
> >> for syntax and such.
> >>
> >> TD
> >>
> >>
> >>
> >> On Thu, Jan 30, 2014 at 8:12 AM, Eduardo Costa Alfaia <e.costaalf...@unibs.it> wrote:
> >>
> >>> Hi Guys,
> >>>
> >>> I'm not a very good Java programmer. Could anybody help me with this
> >>> piece of code from JavaNetworkWordCount?
> >>>
> >>> JavaPairDStream<String, Integer> wordCounts = words.map(
> >>>     new PairFunction<String, String, Integer>() {
> >>>       @Override
> >>>       public Tuple2<String, Integer> call(String s) throws Exception {
> >>>         return new Tuple2<String, Integer>(s, 1);
> >>>       }
> >>>     }).reduceByKey(new Function2<Integer, Integer, Integer>() {
> >>>       @Override
> >>>       public Integer call(Integer i1, Integer i2) throws Exception {
> >>>         return i1 + i2;
> >>>       }
> >>>     });
> >>>
> >>> JavaPairDStream<String, Integer> counts =
> >>>     wordCounts.reduceByKeyAndWindow(
> >>>       // reduce function: add counts of batches entering the window
> >>>       new Function2<Integer, Integer, Integer>() {
> >>>         public Integer call(Integer i1, Integer i2) { return i1 + i2; }
> >>>       },
> >>>       // inverse reduce function: subtract counts of batches leaving the window
> >>>       new Function2<Integer, Integer, Integer>() {
> >>>         public Integer call(Integer i1, Integer i2) { return i1 - i2; }
> >>>       },
> >>>       new Duration(60 * 5 * 1000),  // window length: 5 minutes
> >>>       new Duration(1 * 1000)        // slide interval: 1 second
> >>>     );
> >>>
> >>> I would like to find a way to count the words and then sum them to get
> >>> the total number of words in a single file, for example the book Don
> >>> Quixote as a .txt file. The counts stream gives me the count of each word
> >>> found, not the total number of words in the file.
> >>> Tathagata sent me a piece of Scala code. Thank you, Tathagata, for your
> >>> attention to my posts; I am very thankful.
> >>>
> >>> yourDStream.foreachRDD(rdd => {
> >>>
> >>>   // Get and print first n elements
> >>>   val firstN = rdd.take(n)
> >>>   println("First N elements = " + firstN.mkString(", "))
> >>>
> >>>   // Count the number of elements in each batch
> >>>   println("RDD has " + rdd.count() + " elements")
> >>>
> >>> })
> >>>
> >>> yourDStream.count.print()
> >>>
> >>> Could anybody help me?
> >>>
> >>>
> >>> Thanks Guys
> >>>
> >>> --
> >>>
> >>> PRIVACY NOTICE ON THE PROCESSING OF PERSONAL DATA
> >>>
> >>> The data used to send this message are processed by the Università
> >>> degli Studi di Brescia exclusively for institutional purposes. More
> >>> detailed information, including information on the rights of the data
> >>> subject, is available in the general privacy notice and in the notices
> >>> published on the University's website in the "Privacy" section.
> >>>
> >>> The content of this message is intended solely for the person to whom
> >>> it is addressed and may contain information whose confidentiality is
> >>> protected by law. Reproduction, distribution, and use without the
> >>> recipient's authorization are prohibited. If you have received this
> >>> message in error, please delete it.
> >>>
> >>
> >>
> >
> >
>
>
> --
> MSc Eduardo Costa Alfaia
> PhD Student
> Università degli Studi di Brescia
>
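A small plain-Java sketch, which runs without Spark, can illustrate the question at the bottom of this thread. Eduardo wants a total word count rather than a count per word, and TD distinguishes the "frequency of each word" from the "total number of words". The sketch assumes the input is a small in-memory list of lines rather than an RDD or DStream. The class `WordTotals` and its methods are hypothetical:

- `frequencies` mirrors `map(word => (word, 1L)).reduceByKey(_ + _)`.
- `total` mirrors `count()` on the words.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java illustration of "frequency of each word" versus
// "total number of words"; it does not use Spark.
final class WordTotals {

    // Like flatMap(line => line.split(" ")): split every line on single spaces.
    static List<String> words(List<String> lines) {
        List<String> result = new ArrayList<>();
        for (String line : lines) {
            result.addAll(Arrays.asList(line.split(" ")));
        }
        return result;
    }

    // Like map(word => (word, 1L)).reduceByKey(_ + _): one count per distinct word.
    static Map<String, Long> frequencies(List<String> words) {
        Map<String, Long> freq = new TreeMap<>();
        for (String w : words) {
            freq.merge(w, 1L, Long::sum);
        }
        return freq;
    }

    // Like count(): a single number, the total of all words.
    static long total(List<String> words) {
        return words.size();
    }

    public static void main(String[] args) {
        List<String> lines = List.of("to be or not to be", "that is the question");
        List<String> ws = words(lines);
        System.out.println("Word frequencies = " + frequencies(ws));
        System.out.println("Total words = " + total(ws));
    }
}
```

Summing the values of the frequency map gives the same total (10 for the two sample lines). So when only the total is needed, calling `count()` on the words directly is the simpler route.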