Hi all, I am wondering: has anyone on this list been able to successfully implement Spark on top of Kinesis?
Best,
Vadim

On Sun, Apr 5, 2015 at 1:50 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Hi all,

Below is the output that I am getting. My Kinesis stream has 1 shard, and my Spark cluster on EC2 has 2 slaves (I think that's fine?). I should mention that my Kinesis producer is written in Python, following the example at
http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python

I also wrote a Python consumer, again using the example at the above link, that works fine. But I am unable to display output from my Spark consumer.

I'd appreciate any help.

Thanks,
Vadim

-------------------------------------------
Time: 1428254090000 ms
-------------------------------------------

15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job 1428254090000 ms.0 from job set of time 1428254090000 ms
15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for time 1428254090000 ms (execution: 0.090 s)
15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63
15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62
15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61
15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60
15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59
15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time 1428254090000 ms
***********
15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428254070000 ms)

On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Hi all,

More good news! I was able to use mergeStrategy to assemble my Kinesis consumer into an "uber jar".

Here's what I added to build.sbt:

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
    case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
    case x => old(x)
  }
}

Everything appears to be working fine. Right now my producer is pushing simple strings through Kinesis, which my consumer is trying to print (using Spark's print() method for now).

However, instead of displaying my strings, I get the following:

15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)

Any idea on what might be going on?
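As a sanity check, a minimal sketch (not part of the original message) that assumes the unionStreams DStream built in the consumer code below and would be added just before ssc.start(); it logs how many records each batch actually delivers, independently of print():

/* Hypothetical debugging aid, assuming `unionStreams: DStream[String]`
 * as constructed in the consumer below. Counts the records in each
 * 2-second batch so an empty stream can be told apart from a broken
 * print(). */
unionStreams.foreachRDD { (rdd, time) =>
  println("Batch at " + time + " contains " + rdd.count() + " record(s)")
}

If the counts are always zero, the receiver is not getting data (for example because the producer stopped, or because InitialPositionInStream.LATEST skips records sent before the consumer started); if they are nonzero, the problem is downstream of the receiver.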
Thanks,
Vadim

Here's my consumer code (adapted from the WordCount example):

private object MyConsumer extends Logging {

  def main(args: Array[String]) {
    /* Check that all required args were passed in. */
    if (args.length < 2) {
      System.err.println(
        """
          |Usage: KinesisWordCount <stream-name> <endpoint-url>
          |  <stream-name> is the name of the Kinesis stream
          |  <endpoint-url> is the endpoint of the Kinesis service
          |    (e.g. https://kinesis.us-east-1.amazonaws.com)
        """.stripMargin)
      System.exit(1)
    }

    /* Populate the appropriate variables from the given args */
    val Array(streamName, endpointUrl) = args

    /* Determine the number of shards from the stream */
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
    System.out.println("Num shards: " + numShards)

    /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
    val numStreams = numShards

    /* Set up the SparkConf and StreamingContext */
    /* Spark Streaming batch interval */
    val batchInterval = Milliseconds(2000)
    val sparkConfig = new SparkConf().setAppName("MyConsumer")
    val ssc = new StreamingContext(sparkConfig, batchInterval)

    /* Kinesis checkpoint interval. Same as batchInterval for this example. */
    val kinesisCheckpointInterval = batchInterval

    /* Create the same number of Kinesis DStreams/Receivers as the Kinesis stream's shards */
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
    }

    /* Union all the streams */
    val unionStreams = ssc.union(kinesisStreams).map(byteArray => new String(byteArray))

    unionStreams.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das <t...@databricks.com> wrote:

Just remove "provided" for spark-streaming-kinesis-asl:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Thanks. So how do I fix it?

On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan <jonat...@amazon.com> wrote:

spark-streaming-kinesis-asl is not part of the Spark distribution on your cluster, so you cannot have it be just a "provided" dependency. This is also why the KCL and its dependencies were not included in the assembly (but yes, they should be).

~ Jonathan Kelly

From: Vadim Bichutskiy <vadim.bichuts...@gmail.com>
Date: Friday, April 3, 2015 at 12:26 PM
To: Jonathan Kelly <jonat...@amazon.com>
Cc: "u...@spark.apache.org" <u...@spark.apache.org>
Subject: Re: Spark + Kinesis

Hi all,

Good news!
I was able to create a Kinesis consumer and assemble it into an "uber jar" following
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
and the example at
https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

However, when I try to spark-submit it I get the following exception:

Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider

Do I need to include the KCL dependency in build.sbt? Here's what it looks like currently:

import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided"

assemblySettings
jarName in assembly := "consumer-assembly.jar"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

Any help appreciated.

Thanks,
Vadim

On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan <jonat...@amazon.com> wrote:

It looks like you're attempting to mix Scala versions, so that's going to cause some problems. If you really want to use Scala 2.11.5, you must also use Spark package versions built for Scala 2.11 rather than 2.10. Anyway, that's not quite the correct way to specify Scala dependencies in build.sbt. Instead of placing the Scala version after the artifactId (like "spark-core_2.10"), what you actually want is to use just "spark-core" with two percent signs before it. Using two percent signs will make it use the version of Scala that matches your declared scalaVersion. For example:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

I think that may get you a little closer, though I think you're probably going to run into the same problems I ran into in this thread: https://www.mail-archive.com/user@spark.apache.org/msg23891.html I never really got an answer for that, and I temporarily moved on to other things for now.

~ Jonathan Kelly
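Pulling together the suggestions that appear earlier in this thread (use %% so artifacts match scalaVersion, keep spark-core and spark-streaming as "provided", and drop "provided" from spark-streaming-kinesis-asl so the KCL and AWS SDK land in the assembly), one possible consolidated build.sbt looks roughly like this. This is only a sketch, not something posted in the thread, and the switch to a Scala 2.10.x version is an assumption based on Spark 1.3.0's artifacts being published for Scala 2.10 by default:

import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"

// Assumed: Spark 1.3.0's default artifacts target Scala 2.10,
// so 2.10.4 is used here instead of 2.11.5.
scalaVersion := "2.10.4"

// Provided by the Spark cluster at runtime:
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"

// NOT provided: kinesis-asl (and, transitively, the KCL and AWS SDK)
// must be packaged into the assembly jar.
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

assemblySettings
jarName in assembly := "consumer-assembly.jar"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

The mergeStrategy block from the Apr 4 message would still go in the same build.sbt to resolve duplicate-file conflicts during assembly.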
From: 'Vadim Bichutskiy' <vadim.bichuts...@gmail.com>
Date: Thursday, April 2, 2015 at 9:53 AM
To: "u...@spark.apache.org" <u...@spark.apache.org>
Subject: Spark + Kinesis

Hi all,

I am trying to write an Amazon Kinesis consumer Scala app that processes data in the Kinesis stream. Is this the correct way to specify build.sbt?

-------
import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
  "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
  "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")

assemblySettings
jarName in assembly := "consumer-assembly.jar"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
--------

In project/assembly.sbt I have only the following line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark book.

Thanks,
Vadim