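The reduceByKeyAndWindow and other *ByKey operations work only on DStreams of key-value pairs. The import org.apache.spark.streaming.StreamingContext._ that you already have adds these methods only to DStreams whose elements are (key, value) tuples. "words" is a DStream[String], so it does not contain key-value pairs. "words.map(x => (x, 1))" is a DStream[(String, Int)], which does, so you can call reduceByKeyAndWindow on it.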
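To see the point without a cluster, here is a small plain-Scala model (not Spark code). Each inner Seq stands for one 1-second batch, and the hypothetical helpers toPairs and reduceByKeyAndWindow only mimic the semantics of "words.map(x => (x, 1))" and "reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))", with window and slide measured in batches. Because the helper's input type is a sequence of (K, V) tuples, passing the raw words does not compile, which is the same kind of error as in the thread.

```scala
// Plain-Scala model, NOT Spark: each inner Seq is one batch of words.
// toPairs and reduceByKeyAndWindow are hypothetical helpers for illustration.
object PairWindowModel {
  type Batch[T] = Seq[T]

  // Mimics words.map(x => (x, 1)): DStream[String] -> DStream[(String, Int)].
  def toPairs(batches: Seq[Batch[String]]): Seq[Batch[(String, Int)]] =
    batches.map(_.map(w => (w, 1)))

  // Mimics reduceByKeyAndWindow(reduceFunc, window, slide), with window and
  // slide given in batches. It only accepts batches of (K, V) tuples, just as
  // the *ByKey methods only exist for DStreams of pairs.
  def reduceByKeyAndWindow[K, V](
      batches: Seq[Batch[(K, V)]],
      reduceFunc: (V, V) => V,
      windowBatches: Int,
      slideBatches: Int): Seq[Map[K, V]] =
    // One result every slideBatches batches, covering the last windowBatches batches.
    (slideBatches to batches.length by slideBatches).map { end =>
      val inWindow = batches.slice(math.max(0, end - windowBatches), end).flatten
      inWindow.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(reduceFunc) }
    }

  def main(args: Array[String]): Unit = {
    val words: Seq[Batch[String]] = Seq(Seq("a", "b"), Seq("a"), Seq("c", "a"), Seq("b"))
    // reduceByKeyAndWindow(words, ...) would not compile: String is not a (K, V) tuple.
    val counts = reduceByKeyAndWindow(toPairs(words), (x: Int, y: Int) => x + y, 3, 2)
    counts.foreach(println)
  }
}
```

In the real program the fix is the one-line change described above, "words.map(x => (x, 1)).reduceByKeyAndWindow(...)". If the compiler cannot pick among the reduceByKeyAndWindow overloads from "_ + _", writing the function with explicit types, as in "(a: Int, b: Int) => a + b", should resolve it.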
TD

On Wed, Feb 5, 2014 at 8:15 AM, Eduardo Costa Alfaia <e.costaalf...@unibs.it> wrote:
> Hi Tathagata,
>
> I am playing with NetworkWordCount.scala and made some changes like this:
>
>     // Create the context with a 1 second batch size
>     val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
>       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
>     ssc.checkpoint("hdfs://computer8:54310/user/root/INPUT")
>     // Create a socket text stream on target ip:port and count the
>     // words in the input stream of \n delimited text (eg. generated by 'nc')
>     val lines1 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
>     val lines2 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
>     val lines3 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY)
>     val union2 = lines1.union(lines2)
>     val union3 = union2.union(lines3)
>
>     //val words = lines.flatMap(_.split(" "))
>     val words = union3.flatMap(_.split(" "))
>     // val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>     val wordCounts = words.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
>
> However, I got the error below:
>
>     [error] /opt/unibs_test/incubator-spark-tdas/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala:81: value reduceByKeyAndWindow is not a member of org.apache.spark.streaming.dstream.DStream[String]
>     [error]     val wordCounts = words.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
>     [error]                            ^
>     [error] one error found
>     [error] (examples/compile:compile) Compilation failed
>     [error] Total time: 15 s, completed 05-Feb-2014 17:10:38
>
> The classes are imported in the code:
>
>     import org.apache.spark.streaming.{Seconds, StreamingContext}
>     import org.apache.spark.streaming.StreamingContext._
>     import org.apache.spark.storage.StorageLevel
>
> Thanks
>
> On Feb 5, 2014, at 5:22, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> Seems good to me. BTW, it's fine to use MEMORY_ONLY (i.e., without replication) for testing, but you should turn on replication if you want fault tolerance.
>>
>> TD
>>
>> On Mon, Feb 3, 2014 at 3:19 PM, Eduardo Costa Alfaia <e.costaalf...@unibs.it> wrote:
>>
>>> Hi Tathagata,
>>>
>>> You were right when you told me to use Scala instead of Java; Scala is very easy. I have implemented the code you gave me, and I have also added a union, because I am testing with 2 stream sources; my idea is to use 3 or more stream sources and union them.
>>>
>>>     object NetworkWordCount {
>>>       def main(args: Array[String]) {
>>>         if (args.length < 1) {
>>>           System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
>>>             "In local mode, <master> should be 'local[n]' with n > 1")
>>>           System.exit(1)
>>>         }
>>>
>>>         StreamingExamples.setStreamingLogLevels()
>>>
>>>         // Create the context with a 1 second batch size
>>>         val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
>>>           System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
>>>         ssc.checkpoint("hdfs://computer22:54310/user/root/INPUT")
>>>         // Create a socket text stream on target ip:port and count the
>>>         // words in the input stream of \n delimited text (eg. generated by 'nc')
>>>         val lines1 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
>>>         val lines2 = ssc.socketTextStream("localhost", "12345".toInt, StorageLevel.MEMORY_ONLY_SER)
>>>         val union2 = lines1.union(lines2)
>>>         //val words = lines.flatMap(_.split(" "))
>>>         val words = union2.flatMap(_.split(" "))
>>>         val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>>>
>>>         words.count().foreachRDD(rdd => {
>>>           val totalCount = rdd.first()
>>>
>>>           // print to screen
>>>           println(totalCount)
>>>
>>>           // append count to file
>>>           // ...
>>>         })
>>>         //wordCounts.print()
>>>         ssc.start()
>>>         ssc.awaitTermination()
>>>       }
>>>     }
>>>
>>> What do you think? Is my code right?
>>>
>>> I obtained the following result:
>>>
>>>     root@computer8:/opt/unibs_test/incubator-spark-tdas# bin/run-example org.apache.spark.streaming.examples.NetworkWordCount spark://192.168.0.13:7077
>>>     SLF4J: Class path contains multiple SLF4J bindings.
>>>     SLF4J: Found binding in [jar:file:/opt/unibs_test/incubator-spark-tdas/examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>     SLF4J: Found binding in [jar:file:/opt/unibs_test/incubator-spark-tdas/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-SNAPSHOT-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>>     SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>>>     SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>>     14/02/04 00:02:07 INFO StreamingExamples: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
>>>     14/02/04 00:02:07 INFO StreamingExamples: Setting log level to [WARN] for streaming example. To override add a custom log4j.properties to the classpath.
>>>     0
>>>     0
>>>     0
>>>     0
>>>     0
>>>     0
>>>     0
>>>     0
>>>     0
>>>     0
>>>     0
>>>     0
>>>     90715
>>>     1375825
>>>     882490
>>>     941226
>>>     811032
>>>     734399
>>>     804453
>>>     718688
>>>     1058695
>>>     854417
>>>     813263
>>>     798885
>>>     785455
>>>     952804
>>>     780140
>>>     697533
>>>
>>> Thanks, Tathagata.
>>>
>>> Regards
>>>
>>> 2014-01-30 Eduardo Costa Alfaia <e.costaalf...@unibs.it>:
>>>
>>>> Hi Tathagata,
>>>>
>>>> Thank you for your explanations; they will help me understand how this piece of code works so that we can do what we want. We have written a C program that sends a .txt file, for example Don Quixote, as a stream over the network, so we changed the Java code of JavaNetworkWordCount to connect to each source described in the source code. Below is what we inserted: three stream sources.
>>>>
>>>>     JavaDStream<String> lines1 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
>>>>     JavaDStream<String> lines2 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
>>>>     JavaDStream<String> lines3 = ssc1.socketTextStream("localhost", Integer.parseInt("12345"));
>>>>     JavaDStream<String> union2 = lines1.union(lines2);
>>>>     JavaDStream<String> union3 = union2.union(lines3);
>>>>     JavaDStream<String> words = union3.flatMap(new FlatMapFunction<String, String>() {
>>>>
>>>> So I think the second option you gave me is the better one. Sorry, Tathagata, for insisting on this, and thank you for your patience.
>>>>
>>>> Best Regards
>>>>
>>>> 2014-01-30 Tathagata Das <tathagata.das1...@gmail.com>
>>>>
>>>>> Let me first ask for a few clarifications.
>>>>>
>>>>> 1. If you just want to count the words in a single text file like Don Quixote (that is, not a stream of data), you should use only Spark.
>>>>> Then the program to count the frequency of words in a text file would look like the following. If you are not super comfortable with Java, I strongly recommend using the Scala API or PySpark; Scala may be a little tricky to learn if you have absolutely no idea of it, but it is worth it. In Scala, the frequency count would look like this:
>>>>>
>>>>>     import org.apache.spark.SparkContext
>>>>>     import org.apache.spark.SparkContext._ // implicit conversions that add reduceByKey to pair RDDs
>>>>>
>>>>>     val sc = new SparkContext(...)
>>>>>     val linesInFile = sc.textFile("path_to_file")
>>>>>     val words = linesInFile.flatMap(line => line.split(" "))
>>>>>     val frequencies = words.map(word => (word, 1L)).reduceByKey(_ + _)
>>>>>     // collect() is costly if the file is large
>>>>>     println("Word frequencies = " + frequencies.collect().mkString(", "))
>>>>>
>>>>> 2. Let me assume that you want to read a stream of text over the network and then print the total number of words into a file. Note that it is the "total number of words", not the "frequency of each word". The Java version would be something like this:
>>>>>
>>>>>     import org.apache.spark.api.java.JavaRDD;
>>>>>     import org.apache.spark.api.java.function.Function2;
>>>>>     import org.apache.spark.streaming.Time;
>>>>>     import org.apache.spark.streaming.api.java.JavaDStream;
>>>>>
>>>>>     JavaDStream<Long> totalCounts = words.count();
>>>>>
>>>>>     totalCounts.foreachRDD(new Function2<JavaRDD<Long>, Time, Void>() {
>>>>>       @Override
>>>>>       public Void call(JavaRDD<Long> rdd, Time time) throws Exception {
>>>>>         Long totalCount = rdd.first(); // each RDD of words.count() holds one number
>>>>>
>>>>>         // print to screen
>>>>>         System.out.println(totalCount);
>>>>>
>>>>>         // append count to file
>>>>>         // ...
>>>>>         return null;
>>>>>       }
>>>>>     });
>>>>>
>>>>> This counts how many words were received in each batch. The Scala version is much simpler to read:
>>>>>
>>>>>     words.count().foreachRDD(rdd => {
>>>>>       val totalCount = rdd.first()
>>>>>
>>>>>       // print to screen
>>>>>       println(totalCount)
>>>>>
>>>>>       // append count to file
>>>>>       // ...
>>>>>     })
>>>>>
>>>>> Hope this helps! I apologize if the code doesn't compile; I didn't test it for syntax.
>>>>>
>>>>> TD
>>>>>
>>>>> On Thu, Jan 30, 2014 at 8:12 AM, Eduardo Costa Alfaia <e.costaalf...@unibs.it> wrote:
>>>>>
>>>>>> Hi guys,
>>>>>>
>>>>>> I'm not a very good Java programmer, so could anybody help me with this piece of code from JavaNetworkWordCount:
>>>>>>
>>>>>>     JavaPairDStream<String, Integer> wordCounts = words.map(
>>>>>>       new PairFunction<String, String, Integer>() {
>>>>>>         @Override
>>>>>>         public Tuple2<String, Integer> call(String s) throws Exception {
>>>>>>           return new Tuple2<String, Integer>(s, 1);
>>>>>>         }
>>>>>>       }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>>>>>>         @Override
>>>>>>         public Integer call(Integer i1, Integer i2) throws Exception {
>>>>>>           return i1 + i2;
>>>>>>         }
>>>>>>       });
>>>>>>
>>>>>>     JavaPairDStream<String, Integer> counts = wordCounts.reduceByKeyAndWindow(
>>>>>>       new Function2<Integer, Integer, Integer>() {
>>>>>>         public Integer call(Integer i1, Integer i2) { return i1 + i2; }
>>>>>>       },
>>>>>>       new Function2<Integer, Integer, Integer>() {
>>>>>>         public Integer call(Integer i1, Integer i2) { return i1 - i2; }
>>>>>>       },
>>>>>>       new Duration(60 * 5 * 1000),
>>>>>>       new Duration(1 * 1000)
>>>>>>     );
>>>>>>
>>>>>> I would like to find a way of counting, then summing, to get the total number of words in a single file, for example a .txt copy of the book Don Quixote. The counts function gives me a result for each word found, not the total number of words in the file.
>>>>>> Tathagata has sent me a piece of Scala code (thank you, Tathagata, for your attention to my posts; I am very grateful):
>>>>>>
>>>>>>     yourDStream.foreachRDD(rdd => {
>>>>>>
>>>>>>       // Get and print first n elements
>>>>>>       val firstN = rdd.take(n)
>>>>>>       println("First N elements = " + firstN.mkString(", "))
>>>>>>
>>>>>>       // Count the number of elements in each batch
>>>>>>       println("RDD has " + rdd.count() + " elements")
>>>>>>
>>>>>>     })
>>>>>>
>>>>>>     yourDStream.count.print()
>>>>>>
>>>>>> Could anybody help me?
>>>>>>
>>>>>> Thanks, guys
>>>>>>
>>>>>> --
>>>>>>
>>>>>> INFORMATION ON THE PROCESSING OF PERSONAL DATA
>>>>>>
>>>>>> The data used to send this message are processed by the Università degli Studi di Brescia exclusively for institutional purposes. More detailed information, including on the rights of the data subject, can be found in the general privacy notice and in the notices published on the University's website under "Privacy".
>>>>>>
>>>>>> The content of this message is intended solely for the person(s) to whom it is addressed and may contain information whose confidentiality is protected by law. Its reproduction, dissemination and use without the recipient's authorization are prohibited. If you received this message in error, please delete it.
>>>
>>> --
>>> MSc Eduardo Costa Alfaia
>>> PhD Student
>>> Università degli Studi di Brescia
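Eduardo's first Java snippet passes both a reduce function (i1 + i2) and an inverse function (i1 - i2) to reduceByKeyAndWindow, while the replies obtain a per-batch total with "words.count()". The idea behind the inverse function can be shown with a small plain-Scala model (not Spark code; naive and incremental are hypothetical helper names, each element stands for one batch's count, and the window slides by one batch as in the Java snippet): instead of re-summing the whole window, the previous window total is updated by adding the batch that enters and subtracting the batch that leaves. Both versions should produce the same totals.

```scala
// Plain-Scala model, NOT Spark: each element is one batch's word total,
// like the value of words.count() for that batch. The window slides by one
// batch, as with new Duration(1 * 1000) over 1-second batches.
object IncrementalWindowModel {
  // Recompute each window from scratch using only the reduce function (+).
  def naive(batchCounts: Seq[Long], windowBatches: Int): Seq[Long] =
    batchCounts.indices.map { i =>
      batchCounts.slice(math.max(0, i + 1 - windowBatches), i + 1).sum
    }

  // Update the previous window instead: add the batch entering the window
  // (reduce, like i1 + i2) and subtract the batch leaving it (inverse, like i1 - i2).
  def incremental(batchCounts: Seq[Long], windowBatches: Int): Seq[Long] = {
    val reduceFunc = (a: Long, b: Long) => a + b
    val invReduceFunc = (a: Long, b: Long) => a - b
    batchCounts.indices.scanLeft(0L) { (previous, i) =>
      val entering = batchCounts(i)
      val leaving = if (i >= windowBatches) batchCounts(i - windowBatches) else 0L
      invReduceFunc(reduceFunc(previous, entering), leaving)
    }.tail // drop the initial empty window
  }

  def main(args: Array[String]): Unit = {
    val perBatch = Seq(5L, 3L, 0L, 7L, 2L)
    println(naive(perBatch, 3))
    println(incremental(perBatch, 3))
  }
}
```

For a windowed total of words in Spark Streaming itself, DStream also offers countByWindow(windowDuration, slideDuration), which, as far as I know, relies on the same add-and-subtract scheme; like the reduceByKeyAndWindow variant with an inverse function, it needs a checkpoint directory set with ssc.checkpoint(...).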