Thanks Chris! I was just looking to get back to Spark + Kinesis integration. Will be in touch shortly.
Vadim

On Sun, May 10, 2015 at 12:14 AM, Chris Fregly <ch...@fregly.com> wrote:

hey vadim-

sorry for the delay.

if you're interested in trying to get Kinesis working one-on-one, shoot me a direct email and we'll get it going off-list. we can circle back and summarize our findings here.

lots of people are using Spark Streaming + Kinesis successfully.

would love to help you through this - albeit a month later! the goal is to have this working out of the box, so i'd like to implement anything i can to make that happen.

lemme know.

btw, Spark 1.4 will have some improvements to the Kinesis Spark Streaming integration. TD and I have been working on this together.

thanks!

-chris

On Tue, Apr 7, 2015 at 6:17 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Hey y'all,

While I haven't been able to get the Spark + Kinesis integration working, I pivoted to plan B: I now push the data to S3 and set up a DStream that monitors the S3 bucket with textFileStream, and that works great.

I <3 Spark!

Best,
Vadim
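For reference, a minimal sketch of the plan B described above (the S3 path, app name, and batch interval are placeholders, not values from the thread):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object S3TextFileStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("S3TextFileStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Any new file that lands under this prefix after the stream starts
    // is read as text and processed in the next batch.
    val lines = ssc.textFileStream("s3n://my-bucket/incoming/")
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Note that textFileStream only picks up files created after the streaming context starts, which matches the push-then-process flow described above.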
On Mon, Apr 6, 2015 at 12:23 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Hi all,

I am wondering, has anyone on this list been able to successfully implement Spark on top of Kinesis?

Best,
Vadim

On Sun, Apr 5, 2015 at 1:50 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Hi all,

Below is the output that I am getting. My Kinesis stream has 1 shard, and my Spark cluster on EC2 has 2 slaves (I think that's fine?). I should mention that my Kinesis producer is written in Python, following the example at
http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python

I also wrote a Python consumer, again using the example at the above link, and that works fine. But I am unable to display any output from my Spark consumer.

I'd appreciate any help.

Thanks,
Vadim

-------------------------------------------
Time: 1428254090000 ms
-------------------------------------------

15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job 1428254090000 ms.0 from job set of time 1428254090000 ms
15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for time 1428254090000 ms (execution: 0.090 s)
15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63
15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62
15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61
15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60
15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59
15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time 1428254090000 ms
***********
15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428254070000 ms)
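The batches above come back empty, so print() has nothing to show between the separators. A quick way to confirm whether the receiver is delivering any records at all (a suggestion, not something from the thread) is to log a per-batch count in the consumer shown further down:

// `unionStreams` is the DStream[String] built in the consumer code below.
unionStreams.foreachRDD { rdd =>
  println("records in this batch: " + rdd.count())
  rdd.take(5).foreach(println)
}

If the count stays at 0, the problem is upstream of print(): the receiver itself is not getting data, which would point at credentials, the endpoint/region, or the LATEST initial position rather than at the output code.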
On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Hi all,

More good news! I was able to use mergeStrategy to assemble my Kinesis consumer into an "uber jar".

Here's what I added to build.sbt:

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
    case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
    case x => old(x)
  }
}

Everything appears to be working fine. Right now my producer is pushing simple strings through Kinesis, which my consumer is trying to print (using Spark's print() method for now).

However, instead of displaying my strings, I get the following:

15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)

Any idea on what might be going on?

Thanks,
Vadim

Here's my consumer code (adapted from the WordCount example):

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

private object MyConsumer extends Logging {

  def main(args: Array[String]) {
    /* Check that all required args were passed in. */
    if (args.length < 2) {
      System.err.println(
        """
          |Usage: KinesisWordCount <stream-name> <endpoint-url>
          |  <stream-name> is the name of the Kinesis stream
          |  <endpoint-url> is the endpoint of the Kinesis service
          |    (e.g. https://kinesis.us-east-1.amazonaws.com)
        """.stripMargin)
      System.exit(1)
    }

    /* Populate the appropriate variables from the given args */
    val Array(streamName, endpointUrl) = args

    /* Determine the number of shards from the stream */
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
    System.out.println("Num shards: " + numShards)

    /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
    val numStreams = numShards

    /* Set up the SparkConf and StreamingContext */
    /* Spark Streaming batch interval */
    val batchInterval = Milliseconds(2000)
    val sparkConfig = new SparkConf().setAppName("MyConsumer")
    val ssc = new StreamingContext(sparkConfig, batchInterval)

    /* Kinesis checkpoint interval. Same as batchInterval for this example. */
    val kinesisCheckpointInterval = batchInterval

    /* Create the same number of Kinesis DStreams/Receivers as the Kinesis stream's shards */
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
    }

    /* Union all the streams */
    val unionStreams = ssc.union(kinesisStreams).map(byteArray => new String(byteArray))

    unionStreams.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das <t...@databricks.com> wrote:

Just remove "provided" for spark-streaming-kinesis-asl:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Thanks. So how do I fix it?

On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan <jonat...@amazon.com> wrote:

spark-streaming-kinesis-asl is not part of the Spark distribution on your cluster, so you cannot have it be just a "provided" dependency. This is also why the KCL and its dependencies were not included in the assembly (but yes, they should be).

~ Jonathan Kelly
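Putting TD's and Jonathan's advice together, the dependency section would presumably end up like this (artifact names and versions as they appear elsewhere in the thread); the mergeStrategy block from the April 4 message above then handles the duplicate files that the extra AWS jars bring into the assembly:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
// Not "provided": this artifact and its KCL / AWS SDK dependencies
// need to end up inside consumer-assembly.jar.
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"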
From: Vadim Bichutskiy <vadim.bichuts...@gmail.com>
Date: Friday, April 3, 2015 at 12:26 PM
To: Jonathan Kelly <jonat...@amazon.com>
Cc: "u...@spark.apache.org" <u...@spark.apache.org>
Subject: Re: Spark + Kinesis

Hi all,

Good news! I was able to create a Kinesis consumer and assemble it into an "uber jar" following
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
and the example
https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

However, when I try to spark-submit it I get the following exception:

Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider

Do I need to include the KCL dependency in build.sbt? Here's what it looks like currently:

import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided"

assemblySettings
jarName in assembly := "consumer-assembly.jar"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false)

Any help appreciated.

Thanks,
Vadim
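Once the assembly actually contains the AWS classes (see the replies above), the submit step mentioned here would look roughly like the following; the master URL, jar path, and stream name are placeholders, while the class name and jar name come from the thread:

spark-submit --class MyConsumer \
  --master spark://<ec2-master>:7077 \
  /path/to/consumer-assembly.jar my-stream https://kinesis.us-east-1.amazonaws.com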
On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan <jonat...@amazon.com> wrote:

It looks like you're attempting to mix Scala versions, so that's going to cause some problems. If you really want to use Scala 2.11.5, you must also use Spark package versions built for Scala 2.11 rather than 2.10. Anyway, that's not quite the correct way to specify Scala dependencies in build.sbt. Instead of placing the Scala version after the artifactId (like "spark-core_2.10"), what you actually want is to use just "spark-core" with two percent signs before it. Using two percent signs will make it use the version of Scala that matches your declared scalaVersion. For example:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

I think that may get you a little closer, though I think you're probably going to run into the same problems I ran into in this thread:
https://www.mail-archive.com/user@spark.apache.org/msg23891.html
I never really got an answer for that, and I temporarily moved on to other things for now.

~ Jonathan Kelly

From: 'Vadim Bichutskiy' <vadim.bichuts...@gmail.com>
Date: Thursday, April 2, 2015 at 9:53 AM
To: "u...@spark.apache.org" <u...@spark.apache.org>
Subject: Spark + Kinesis

Hi all,

I am trying to write an Amazon Kinesis consumer Scala app that processes data in the Kinesis stream. Is this the correct way to specify build.sbt:

-------
import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
  "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
  "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")

assemblySettings
jarName in assembly := "consumer-assembly.jar"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false)
--------

In project/assembly.sbt I have only the following line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark book.

Thanks,
Vadim
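For completeness, a build.sbt consistent with Jonathan's advice above might look like the following sketch; the 2.10.4 Scala version is an assumption (any 2.10.x matching the cluster's prebuilt Spark 1.3.0 should do), and the remaining lines are taken from the thread:

import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.10.4"  // assumption: a 2.10.x release to match the Spark 1.3.0 artifacts

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"  // not "provided", per the replies above

assemblySettings
jarName in assembly := "consumer-assembly.jar"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)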