Hey y'all,

Since I haven't been able to get the Spark + Kinesis integration working, I pivoted to plan B: I now push data to S3 and set up a DStream that monitors the S3 bucket with textFileStream, and that works great.
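
The consumer side boils down to something like this (a minimal sketch; the bucket path and batch interval below are placeholders, not my exact setup):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object S3FileStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("S3FileStream")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Each new file that lands under this prefix becomes a batch of lines.
    // "s3n://my-bucket/incoming/" is a placeholder -- substitute your own bucket/prefix.
    val lines = ssc.textFileStream("s3n://my-bucket/incoming/")
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}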

I <3 Spark!

Best,
Vadim

On Mon, Apr 6, 2015 at 12:23 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Hi all,

I am wondering, has anyone on this list been able to successfully implement Spark on top of Kinesis?

Best,
Vadim

On Sun, Apr 5, 2015 at 1:50 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Hi all,

Below is the output that I am getting. My Kinesis stream has 1 shard, and my Spark cluster on EC2 has 2 slaves (I think that's fine?). I should mention that my Kinesis producer is written in Python, following the example at
http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python

I also wrote a Python consumer, again using the example at the above link, and it works fine. But I am unable to display output from my Spark consumer.

I'd appreciate any help.

Thanks,
Vadim

-------------------------------------------
Time: 1428254090000 ms
-------------------------------------------

15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job 1428254090000 ms.0 from job set of time 1428254090000 ms
15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for time 1428254090000 ms (execution: 0.090 s)
15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63
15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62
15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61
15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60
15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59
15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time 1428254090000 ms

***********

15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428254070000 ms)

On Sat, Apr 4, 2015 at 3:13 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Hi all,

More good news! I was able to use mergeStrategy to assemble my Kinesis consumer into an "uber jar".

Here's what I added to build.sbt:

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
    case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
    case x => old(x)
  }
}

Everything appears to be working fine. Right now my producer is pushing simple strings through Kinesis, which my consumer is trying to print (using Spark's print() method for now).
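
For reference, "pushing simple strings" just means one record per string; my actual producer is the Python script mentioned above, but a rough sketch of the same idea using the AWS Java SDK (the stream name and endpoint below are placeholders) would be:

import java.nio.ByteBuffer
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest

object SimpleKinesisProducer {
  def main(args: Array[String]): Unit = {
    // Placeholder stream name and endpoint -- substitute your own.
    val client = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    client.setEndpoint("https://kinesis.us-east-1.amazonaws.com")

    for (i <- 1 to 100) {
      val request = new PutRecordRequest()
        .withStreamName("my-stream")
        .withPartitionKey(i.toString)
        .withData(ByteBuffer.wrap(s"record $i".getBytes("UTF-8")))
      client.putRecord(request)
      Thread.sleep(200) // throttle a little between puts
    }
  }
}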

However, instead of displaying my strings, I get the following:

15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)

Any idea on what might be going on?

Thanks,
Vadim

Here's my consumer code (adapted from the WordCount example):

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

private object MyConsumer extends Logging {

  def main(args: Array[String]) {
    /* Check that all required args were passed in. */
    if (args.length < 2) {
      System.err.println(
        """
          |Usage: KinesisWordCount <stream-name> <endpoint-url>
          |  <stream-name> is the name of the Kinesis stream
          |  <endpoint-url> is the endpoint of the Kinesis service
          |  (e.g. https://kinesis.us-east-1.amazonaws.com)
        """.stripMargin)
      System.exit(1)
    }

    /* Populate the appropriate variables from the given args */
    val Array(streamName, endpointUrl) = args

    /* Determine the number of shards from the stream */
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
    System.out.println("Num shards: " + numShards)

    /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
    val numStreams = numShards

    /* Set up the SparkConf and StreamingContext */
    /* Spark Streaming batch interval */
    val batchInterval = Milliseconds(2000)
    val sparkConfig = new SparkConf().setAppName("MyConsumer")
    val ssc = new StreamingContext(sparkConfig, batchInterval)

    /* Kinesis checkpoint interval. Same as batchInterval for this example. */
    val kinesisCheckpointInterval = batchInterval

    /* Create the same number of Kinesis DStreams/Receivers as the Kinesis stream's shards */
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
    }

    /* Union all the streams */
    val unionStreams = ssc.union(kinesisStreams).map(byteArray => new String(byteArray))
    unionStreams.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das <t...@databricks.com> wrote:

Just remove "provided" for spark-streaming-kinesis-asl:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Thanks. So how do I fix it?

On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan <jonat...@amazon.com> wrote:

spark-streaming-kinesis-asl is not part of the Spark distribution on your cluster, so you cannot have it be just a "provided" dependency. This is also why the KCL and its dependencies were not included in the assembly (but yes, they should be).

~ Jonathan Kelly

From: Vadim Bichutskiy <vadim.bichuts...@gmail.com>
Date: Friday, April 3, 2015 at 12:26 PM
To: Jonathan Kelly <jonat...@amazon.com>
Cc: "u...@spark.apache.org" <u...@spark.apache.org>
Subject: Re: Spark + Kinesis

Hi all,

Good news! I was able to create a Kinesis consumer and assemble it into an "uber jar" following
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
and the example at
https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

However, when I try to spark-submit it I get the following exception:

Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider

Do I need to include the KCL dependency in build.sbt? Here's what it looks like currently:

import AssemblyKeys._
name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided"

assemblySettings
jarName in assembly := "consumer-assembly.jar"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false)

Any help appreciated.

Thanks,
Vadim

On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan <jonat...@amazon.com> wrote:

It looks like you're attempting to mix Scala versions, so that's going to cause some problems. If you really want to use Scala 2.11.5, you must also use Spark package versions built for Scala 2.11 rather than 2.10. Anyway, that's not quite the correct way to specify Scala dependencies in build.sbt. Instead of placing the Scala version after the artifactId (like "spark-core_2.10"), what you actually want is to use just "spark-core" with two percent signs before it. Using two percent signs will make it use the version of Scala that matches your declared scalaVersion.
For example:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

I think that may get you a little closer, though I think you're probably going to run into the same problems I ran into in this thread:
https://www.mail-archive.com/user@spark.apache.org/msg23891.html
I never really got an answer for that, and I temporarily moved on to other things for now.

~ Jonathan Kelly

From: 'Vadim Bichutskiy' <vadim.bichuts...@gmail.com>
Date: Thursday, April 2, 2015 at 9:53 AM
To: "u...@spark.apache.org" <u...@spark.apache.org>
Subject: Spark + Kinesis

Hi all,

I am trying to write an Amazon Kinesis consumer Scala app that processes data in the Kinesis stream. Is this the correct way to specify build.sbt:

-------
import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
  "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
  "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")

assemblySettings
jarName in assembly := "consumer-assembly.jar"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false)
--------

In project/assembly.sbt I have only the following line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark book.

Thanks,
Vadim
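
Putting the advice in this thread together (use %% so the artifact matches the declared scalaVersion, keep spark-core and spark-streaming "provided", and do not mark spark-streaming-kinesis-asl as provided), a build.sbt along these lines should assemble correctly. The Scala version here assumes a prebuilt Spark 1.3.0 cluster, which uses Scala 2.10 by default; the mergeStrategy block from later in the thread may still be needed to resolve duplicate-file conflicts during assembly:

import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
// Assumes the cluster's Spark build uses Scala 2.10 (the 1.3.0 default).
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided",
  // Not "provided": kinesis-asl (and the KCL it pulls in) must end up in the assembly.
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0")

assemblySettings
jarName in assembly := "consumer-assembly.jar"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)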