Hey Vadim, sorry for the delay.
If you're interested in trying to get Kinesis working one-on-one, shoot me a direct email and we'll get it going off-list. We can circle back and summarize our findings here. Lots of people are using Spark Streaming + Kinesis successfully.

Would love to help you through this, albeit a month later! The goal is to have this working out of the box, so I'd like to implement anything I can to make that happen. Let me know.

BTW, Spark 1.4 will have some improvements to the Kinesis Spark Streaming integration. TD and I have been working on this together.

Thanks!

-chris

On Tue, Apr 7, 2015 at 6:17 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

> Hey y'all,
>
> While I haven't been able to get Spark + Kinesis integration working, I
> pivoted to plan B: I now push data to S3, where I set up a DStream to
> monitor an S3 bucket with textFileStream, and that works great.
>
> I <3 Spark!
>
> Best,
> Vadim
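For anyone following along, a minimal sketch of that plan B, assuming an s3n:// input path of newline-delimited text files; the bucket, path, and app name are placeholders, not from the thread:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object S3TextMonitor {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("S3TextMonitor")
        val ssc = new StreamingContext(conf, Seconds(10))

        // textFileStream only picks up files created in the directory after
        // the stream starts, so each producer batch should land in a new key.
        // AWS credentials for s3n must be configured (e.g. in the Hadoop conf).
        val lines = ssc.textFileStream("s3n://my-bucket/incoming/")
        lines.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }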
> On Mon, Apr 6, 2015 at 12:23 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:
>
>> Hi all,
>>
>> I am wondering, has anyone on this list been able to successfully
>> implement Spark on top of Kinesis?
>>
>> Best,
>> Vadim
>>
>> On Sun, Apr 5, 2015 at 1:50 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Below is the output that I am getting. My Kinesis stream has 1 shard,
>>> and my Spark cluster on EC2 has 2 slaves (I think that's fine?).
>>> I should mention that my Kinesis producer is written in Python, following the example at
>>> http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python
>>>
>>> I also wrote a Python consumer, again using the example at the above
>>> link, and that works fine. But I am unable to display output from my
>>> Spark consumer.
>>>
>>> I'd appreciate any help.
>>>
>>> Thanks,
>>> Vadim
>>>
>>> -------------------------------------------
>>> Time: 1428254090000 ms
>>> -------------------------------------------
>>>
>>> 15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job 1428254090000 ms.0 from job set of time 1428254090000 ms
>>> 15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for time 1428254090000 ms (execution: 0.090 s)
>>> 15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence list
>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63
>>> 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from persistence list
>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62
>>> 15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from persistence list
>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61
>>> 15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list
>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60
>>> 15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list
>>> 15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59
>>> 15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time 1428254090000 ms
>>>
>>> ***********
>>>
>>> 15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428254070000 ms)
>>>
>>> On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> More good news! I was able to use mergeStrategy to assemble my
>>>> Kinesis consumer into an "uber jar". Here's what I added to build.sbt:
>>>>
>>>> mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
>>>>   {
>>>>     case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
>>>>     case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
>>>>     case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
>>>>     case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
>>>>     case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
>>>>     case x => old(x)
>>>>   }
>>>> }
>>>>
>>>> Everything appears to be working fine. Right now my producer is pushing
>>>> simple strings through Kinesis, which my consumer is trying to print
>>>> (using Spark's print() method for now).
>>>>
>>>> However, instead of displaying my strings, I get the following:
>>>>
>>>> 15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)
>>>>
>>>> Any idea what might be going on?
>>>>
>>>> Thanks,
>>>> Vadim
>>>>
>>>> Here's my consumer code (adapted from the WordCount example):
>>>>
>>>> private object MyConsumer extends Logging {
>>>>   def main(args: Array[String]) {
>>>>     /* Check that all required args were passed in. */
>>>>     if (args.length < 2) {
>>>>       System.err.println(
>>>>         """
>>>>           |Usage: KinesisWordCount <stream-name> <endpoint-url>
>>>>           |  <stream-name> is the name of the Kinesis stream
>>>>           |  <endpoint-url> is the endpoint of the Kinesis service
>>>>           |  (e.g. https://kinesis.us-east-1.amazonaws.com)
>>>>         """.stripMargin)
>>>>       System.exit(1)
>>>>     }
>>>>
>>>>     /* Populate the appropriate variables from the given args */
>>>>     val Array(streamName, endpointUrl) = args
>>>>
>>>>     /* Determine the number of shards from the stream */
>>>>     val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
>>>>     kinesisClient.setEndpoint(endpointUrl)
>>>>     val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
>>>>     System.out.println("Num shards: " + numShards)
>>>>
>>>>     /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
>>>>     val numStreams = numShards
>>>>
>>>>     /* Set up the SparkConf and StreamingContext */
>>>>     /* Spark Streaming batch interval */
>>>>     val batchInterval = Milliseconds(2000)
>>>>     val sparkConfig = new SparkConf().setAppName("MyConsumer")
>>>>     val ssc = new StreamingContext(sparkConfig, batchInterval)
>>>>
>>>>     /* Kinesis checkpoint interval. Same as batchInterval for this example. */
>>>>     val kinesisCheckpointInterval = batchInterval
>>>>
>>>>     /* Create the same number of Kinesis DStreams/Receivers as the Kinesis stream's shards */
>>>>     val kinesisStreams = (0 until numStreams).map { i =>
>>>>       KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
>>>>         InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
>>>>     }
>>>>
>>>>     /* Union all the streams */
>>>>     val unionStreams = ssc.union(kinesisStreams).map(byteArray => new String(byteArray))
>>>>
>>>>     unionStreams.print()
>>>>
>>>>     ssc.start()
>>>>     ssc.awaitTermination()
>>>>   }
>>>> }
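Since print() still emits the "Time: ... ms" header when a batch is empty, the log above is consistent with no records arriving at all. A hedged diagnostic sketch, not part of the original code, that could be added just before ssc.start() in the consumer above to tell an empty stream apart from a printing problem:

    // Diagnostic only (hypothetical addition): log how many records each
    // batch actually contains; a steady 0 means the receiver gets no data.
    unionStreams.foreachRDD { rdd =>
      println("Batch record count: " + rdd.count())
    }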
>>>>
>>>> On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>
>>>>> Just remove "provided" for spark-streaming-kinesis-asl:
>>>>>
>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"
>>>>>
>>>>> On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:
>>>>>
>>>>>> Thanks. So how do I fix it?
>>>>>>
>>>>>> On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan <jonat...@amazon.com> wrote:
>>>>>>
>>>>>>> spark-streaming-kinesis-asl is not part of the Spark distribution
>>>>>>> on your cluster, so you cannot have it be just a "provided" dependency.
>>>>>>> This is also why the KCL and its dependencies were not included in the
>>>>>>> assembly (but yes, they should be).
>>>>>>>
>>>>>>> ~ Jonathan Kelly
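Putting TD's fix together with Jonathan's explanation, the dependency section presumably ends up as below; a sketch combining the lines quoted in this thread, with only spark-streaming-kinesis-asl changing:

    // spark-core and spark-streaming ship with the cluster, so they stay
    // "provided"; kinesis-asl does not, so it must be bundled into the assembly.
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"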
>>>>>>>
>>>>>>> From: Vadim Bichutskiy <vadim.bichuts...@gmail.com>
>>>>>>> Date: Friday, April 3, 2015 at 12:26 PM
>>>>>>> To: Jonathan Kelly <jonat...@amazon.com>
>>>>>>> Cc: "u...@spark.apache.org" <u...@spark.apache.org>
>>>>>>> Subject: Re: Spark + Kinesis
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> Good news! I was able to create a Kinesis consumer and assemble it
>>>>>>> into an "uber jar" following
>>>>>>> http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
>>>>>>> and the example at
>>>>>>> https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
>>>>>>>
>>>>>>> However, when I try to spark-submit it I get the following exception:
>>>>>>>
>>>>>>> Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider
>>>>>>>
>>>>>>> Do I need to include the KCL dependency in build.sbt? Here's what it
>>>>>>> looks like currently:
>>>>>>>
>>>>>>> import AssemblyKeys._
>>>>>>>
>>>>>>> name := "Kinesis Consumer"
>>>>>>> version := "1.0"
>>>>>>> organization := "com.myconsumer"
>>>>>>> scalaVersion := "2.11.5"
>>>>>>>
>>>>>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
>>>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
>>>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided"
>>>>>>>
>>>>>>> assemblySettings
>>>>>>> jarName in assembly := "consumer-assembly.jar"
>>>>>>> assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false)
>>>>>>>
>>>>>>> Any help appreciated.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Vadim
>>>>>>>
>>>>>>> On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan <jonat...@amazon.com> wrote:
>>>>>>>
>>>>>>>> It looks like you're attempting to mix Scala versions, so that's
>>>>>>>> going to cause some problems. If you really want to use Scala 2.11.5,
>>>>>>>> you must also use Spark package versions built for Scala 2.11 rather
>>>>>>>> than 2.10. Anyway, that's not quite the correct way to specify Scala
>>>>>>>> dependencies in build.sbt. Instead of placing the Scala version after
>>>>>>>> the artifactId (like "spark-core_2.10"), what you actually want is to
>>>>>>>> use just "spark-core" with two percent signs before it. Using two
>>>>>>>> percent signs will make it use the version of Scala that matches your
>>>>>>>> declared scalaVersion. For example:
>>>>>>>>
>>>>>>>> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
>>>>>>>>
>>>>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
>>>>>>>>
>>>>>>>> libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"
>>>>>>>>
>>>>>>>> I think that may get you a little closer, though I think you're
>>>>>>>> probably going to run into the same problems I ran into in this thread:
>>>>>>>> https://www.mail-archive.com/user@spark.apache.org/msg23891.html
>>>>>>>> I never really got an answer for that, and I temporarily moved on to
>>>>>>>> other things for now.
>>>>>>>>
>>>>>>>> ~ Jonathan Kelly
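To make the %% point concrete, a small sketch; the 2.10.4 scalaVersion is an assumption chosen to match the _2.10 Spark 1.3.0 artifacts discussed above:

    scalaVersion := "2.10.4"

    // %% appends the Scala binary version from scalaVersion to the artifact
    // name, so this line:
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0"
    // resolves to the same artifact as this explicitly suffixed one:
    libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.3.0"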
>>>>>>>>
>>>>>>>> From: 'Vadim Bichutskiy' <vadim.bichuts...@gmail.com>
>>>>>>>> Date: Thursday, April 2, 2015 at 9:53 AM
>>>>>>>> To: "u...@spark.apache.org" <u...@spark.apache.org>
>>>>>>>> Subject: Spark + Kinesis
>>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I am trying to write an Amazon Kinesis consumer Scala app that
>>>>>>>> processes data in the Kinesis stream. Is this the correct way to
>>>>>>>> specify build.sbt:
>>>>>>>>
>>>>>>>> -------
>>>>>>>> import AssemblyKeys._
>>>>>>>>
>>>>>>>> name := "Kinesis Consumer"
>>>>>>>> version := "1.0"
>>>>>>>> organization := "com.myconsumer"
>>>>>>>> scalaVersion := "2.11.5"
>>>>>>>>
>>>>>>>> libraryDependencies ++= Seq(
>>>>>>>>   "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
>>>>>>>>   "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
>>>>>>>>   "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")
>>>>>>>>
>>>>>>>> assemblySettings
>>>>>>>> jarName in assembly := "consumer-assembly.jar"
>>>>>>>> assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false)
>>>>>>>> --------
>>>>>>>>
>>>>>>>> In project/assembly.sbt I have only the following line:
>>>>>>>>
>>>>>>>> addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
>>>>>>>>
>>>>>>>> I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark book.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Vadim