RE: RE: Re: Spark Kinesis Connector SSL issue
Hi,

The issue is that the KCL inside the Spark Streaming connector does not provide a way to pass KCL configuration in, which means we can't supply configuration to disable SSL certificate checks. In a typical (non-Spark Streaming) KCL application, we can instantiate the KCL via:

Worker worker = new Worker.Builder()
    .config(kclConfig)                         // we can specify disabling SSL cert checks here
    .kinesisClient(kinesisClient)
    .recordProcessorFactory(processorFactory)
    .build();

However, with the Kinesis Spark Streaming consumer we do not have the ability to set this. Our only interface with Kinesis is:

KinesisUtils.createStream(context, appName, streamName, serviceEndpoint, regionName,
    InitialPositionInStream.TRIM_HORIZON, checkpoint, StorageLevel.MEMORY_AND_DISK_2())

and so we can't pass configuration anywhere for the KCL to read.

Regards,
Ben
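One workaround worth trying, sketched below and not verified against this connector: the AWS SDK for Java v1 can disable certificate checking globally via the system property com.amazonaws.sdk.disableCertChecking. Since no KCL configuration object can be passed through KinesisUtils.createStream, setting that property on the driver and executors is about the only hook left; whether the shaded SDK bundled inside spark-streaming-kinesis-asl still honours the property (or a relocated name) is an assumption that would need testing.

// Hedged sketch only; the property name is from the unshaded AWS SDK v1.
System.setProperty("com.amazonaws.sdk.disableCertChecking", "true") // driver side

// For the executors (and the driver in cluster mode), the same flag can be
// passed as a JVM option through spark-submit:
//   --conf spark.driver.extraJavaOptions=-Dcom.amazonaws.sdk.disableCertChecking=true
//   --conf spark.executor.extraJavaOptions=-Dcom.amazonaws.sdk.disableCertChecking=true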
RE: Re: Spark Kinesis Connector SSL issue
Any chance you can share a minimal example to replicate the issue?
RE: Re: Spark Kinesis Connector SSL issue
Hi Valdes,

Thank you for your response. To answer your question: yes, I can. @Ben: correct me if I am wrong.

Cheers,
Shashi
Re: Spark Kinesis Connector SSL issue
Can you call this service with regular code (no Spark)?
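A minimal non-Spark reproduction sketch along those lines, reusing the same AWS SDK v1 calls that appear elsewhere in this archive (AmazonKinesisClient, setEndpoint, describeStream); the endpoint and stream name are placeholders:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient

object KinesisSslCheck {
  def main(args: Array[String]): Unit = {
    // A plain SDK call against the custom endpoint; if the certificate does not
    // match the hostname, this should fail with the same SSLPeerUnverifiedException
    // seen from the Spark connector.
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint("https://<kinesis-endpoint>") // placeholder
    val shards = kinesisClient.describeStream("<stream-name>").getStreamDescription().getShards()
    println("Num shards: " + shards.size())
  }
}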
Re: Spark Kinesis Connector SSL issue
Hi team, please help; we are kind of blocked here.

Cheers,
Shashi
Spark Kinesis Connector SSL issue
Hi Team,

We are trying to access the endpoint through the library mentioned below and we get the SSL error shown underneath; I think internally it uses the KCL library. If I have to skip certificate verification, is that possible through a KCL utils call? I do not find any provision to set no-verify within the Spark Streaming Kinesis library the way we can with the KCL directly. Can you please help me with this?

compile("org.apache.spark:spark-streaming-kinesis-asl_2.11:2.3.0") {
    exclude group: 'org.apache.spark', module: 'spark-streaming_2.11'
}

Caused by: javax.net.ssl.SSLPeerUnverifiedException: Certificate for <kinesis-endpoint> doesn't match any of the subject alternative names: [kinesis-fips.us-east-1.amazonaws.com, *.kinesis.us-east-1.vpce.amazonaws.com, kinesis.us-east-1.amazonaws.com]
    at org.apache.http.conn.ssl.SSLConnectionSocketFactory.verifyHostname(SSLConnectionSocketFactory.java:467)
    at org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:397)
    at org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:355)
    at shade.com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:132)
    at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:373)
    at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at shade.com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
    at shade.com.amazonaws.http.conn.$Proxy18.connect(Unknown Source)
    at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
    at shade.com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
    at shade.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1238)
    at shade.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
    ... 20 more

Shashi
Spark Kinesis Checkpointing/Processing Delay
Hi! Sorry if this is a repost.

I'm using Spark + Kinesis ASL to process and persist stream data to ElasticSearch. For the most part it works nicely. There is a subtle issue I'm running into about how failures are handled.

For example's sake, let's say I am processing a Kinesis stream that produces 400 records per second, continuously. Kinesis provides a 24-hour buffer of data, and I'm setting my Kinesis DStream consumer to use TRIM_HORIZON, meaning: go as far back as possible and process the stream data as quickly as possible until you catch up to the tip of the stream. This means that for some period of time Spark will pull in data from Kinesis as quickly as it can, let's say at 5000 records per second. In my specific case, ElasticSearch can gracefully handle 400 writes per second, but is NOT happy to process 5000 writes per second; let's say it only handles 2000 wps. This means that the processing time will exceed the batch time, scheduling delay in the stream will rise consistently, and batches of data will get backlogged for some period of time.

In normal circumstances this is fine. When the Spark consumers catch up to real time, the data input rate slows to 400 rps and the backlogged batches eventually get flushed to ES. The system stabilizes.

However! It appears to me that the Kinesis consumer actively submits checkpoints, even though the records may not have been processed yet (since they are backlogged). If for some reason there is processing delay and the Spark process crashes, the checkpoint will have advanced too far, and if I resume the Spark Streaming process there is essentially a gap in my ElasticSearch data.

In principle I understand the reason for this, but is there a way to adjust this behavior? Or is there another way to handle this specific problem? Ideally I would be able to configure the process to submit Kinesis checkpoints only after my data is successfully written to ES.

Thanks,
Phil
Re: Spark Kinesis Checkpointing/Processing Delay
You are correct. The earlier Kinesis receiver (as of Spark 1.4) was not saving checkpoints correctly and was in general not reliable (even with WAL enabled). We have improved this in Spark 1.5 with an updated Kinesis receiver that keeps track of the Kinesis sequence numbers as part of Spark Streaming's DStream checkpointing, and the KCL checkpoint is updated only after the sequence number has been written to the DStream checkpoints. This allows a recovered streaming program (that is, one restarted from checkpoint) to recover the sequence numbers from the checkpoint information and reprocess the corresponding records (those which had not been successfully processed). This will give better guarantees. If you are interested in learning more, see the JIRA: https://issues.apache.org/jira/browse/SPARK-9215

Related to this, for your scenario you should be setting rate limits (spark.streaming.rateLimit) to prevent Spark from receiving data faster than it can process it.
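For concreteness, a minimal sketch of setting such a rate limit on the consumer; note this assumes the setting is the one documented as spark.streaming.receiver.maxRate (records per second per receiver), plus the backpressure switch added in Spark 1.5, rather than the exact name quoted above. The application name and batch interval are illustrative.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("KinesisToES")
  // Cap each receiver at roughly the rate the downstream sink can absorb.
  .set("spark.streaming.receiver.maxRate", "400")
  // Spark 1.5+: let Spark adapt the ingestion rate to the observed processing rate.
  .set("spark.streaming.backpressure.enabled", "true")

val ssc = new StreamingContext(conf, Seconds(2))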
Re: Spark Kinesis Checkpointing/Processing Delay
Hi,

I actually ran into the same problem, although our endpoint is not ElasticSearch. When the Spark job dies, we lose some data because the Kinesis checkpoint is already beyond the last point that Spark has processed.

Currently, our workaround is to use Spark's checkpoint mechanism together with the write-ahead log (WAL):

https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
https://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications

Using checkpointing comes with some disadvantages, like the application code not being upgradable, etc. I believe there is some work to fix this problem along the lines of the Kafka direct API. Not sure if this is it: https://issues.apache.org/jira/browse/SPARK-9215

Thanks,
Patanachai
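A minimal sketch of that workaround (metadata checkpointing plus the receiver write-ahead log); the checkpoint directory, application name, and batch interval are illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/kinesis-app" // illustrative path

def createContext(): StreamingContext = {
  val conf = new SparkConf()
    .setAppName("KinesisWithWAL")
    // Persist received blocks to the WAL so they can be replayed after a driver failure.
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(2))
  ssc.checkpoint(checkpointDir)
  // ... build the Kinesis DStream and the output operations here ...
  ssc
}

// Recover from the checkpoint if one exists, otherwise create a fresh context.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()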
Spark Kinesis Checkpointing/Processing Delay
Hi!

I'm using Spark + Kinesis ASL to process and persist stream data to ElasticSearch. For the most part it works nicely. There is a subtle issue I'm running into about how failures are handled.

For example's sake, let's say I am processing a Kinesis stream that produces 400 records per second, continuously. Kinesis provides a 24-hour buffer of data, and I'm setting my Kinesis DStream consumer to use TRIM_HORIZON, meaning: go as far back as possible and process the stream data as quickly as possible until you catch up to the tip of the stream. This means that for some period of time Spark will pull in data from Kinesis as quickly as it can, let's say at 5000 records per second. In my specific case, ElasticSearch can gracefully handle 400 writes per second, but is NOT happy to process 5000 writes per second; let's say it only handles 2000 wps. This means that the processing time will exceed the batch time, scheduling delay in the stream will rise consistently, and batches of data will get backlogged for some period of time.

In normal circumstances this is fine. When the Spark consumers catch up to real time, the data input rate slows to 400 rps and the backlogged batches eventually get flushed to ES. The system stabilizes.

However! It appears to me that the Kinesis consumer actively submits checkpoints, even though the records may not have been processed yet (since they are backlogged). If for some reason there is processing delay and the Spark process crashes, the checkpoint will have advanced too far, and if I resume the Spark Streaming process there is essentially a gap in my ElasticSearch data.

In principle I understand the reason for this, but is there a way to adjust this behavior? Or is there another way to handle this specific problem? Ideally I would be able to configure the process to submit Kinesis checkpoints only after my data is successfully written to ES.

Thanks,
Phil
Re: Spark + Kinesis + Stream Name + Cache?
hey mike-

as you pointed out here from my docs, changing the stream name is sometimes problematic due to the way the Kinesis Client Library manages leases, checkpoints, etc. in DynamoDB. I noticed this directly while developing the Kinesis connector, which is why I highlighted the issue in the docs.

is wiping out the DynamoDB table a suitable workaround for now? usually in production you wouldn't be changing stream names often, so hopefully that's ok for dev.

otherwise, can you share the relevant spark streaming logs that are generated when you do this? I saw a lot of "lease not owned by this Kinesis Client" type of errors, from what I remember.

lemme know!

-Chris
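For reference, a minimal sketch of wiping that lease table programmatically with the AWS SDK v1 (deleting it from the DynamoDB console works just as well); it assumes the table name equals the Kinesis application name and, per the docs quoted later in this thread, lives in us-east-1. The application name is hypothetical.

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.regions.{Region, Regions}
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient

// The KCL lease/checkpoint table is named after the Kinesis application name.
val dynamo = new AmazonDynamoDBClient(new DefaultAWSCredentialsProviderChain())
dynamo.setRegion(Region.getRegion(Regions.US_EAST_1))
dynamo.deleteTable("my-kinesis-app-name") // hypothetical application name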
Re: Spark + Kinesis + Stream Name + Cache?
Hey Chris!

I was happy to see the documentation outlining that issue :-) However, I must have gotten into a pretty terrible state, because I had to delete and recreate the Kinesis streams as well as the DynamoDB tables.

Thanks for the reply, everything is sorted.

Mike
Re: Spark + Kinesis + Stream Name + Cache?
- [Kinesis stream name]: The Kinesis stream that this streaming application receives from
- The application name used in the streaming context becomes the Kinesis application name
- The application name must be unique for a given account and region.
- The Kinesis backend automatically associates the application name to the Kinesis stream using a DynamoDB table (always in the us-east-1 region) created during Kinesis Client Library initialization.
- Changing the application name or stream name can lead to Kinesis errors in some cases. If you see errors, you may need to manually delete the DynamoDB table.
Spark + Kinesis + Stream Name + Cache?
Hi All,

I am submitting the assembled fat jar file with the command:

bin/spark-submit --jars /spark-streaming-kinesis-asl_2.10-1.3.0.jar --class com.xxx.Consumer -0.1-SNAPSHOT.jar

It reads the data from Kinesis using the stream name defined in a configuration file. It turns out that it reads the data perfectly fine for one stream name (i.e. the default); however, if I switch the stream name and re-submit the jar, it no longer reads the data. From CloudWatch, I can see that there is data put into the stream and Spark is actually sending get requests as well. However, it doesn't seem to be outputting the data.

Has anyone else encountered a similar issue? Does Spark cache the stream name somewhere? I also have checkpointing enabled.

Thanks, Mike.
Re: Spark + Kinesis
Hey y'all,

While I haven't been able to get Spark + Kinesis integration working, I pivoted to plan B: I now push data to S3, where I set up a DStream to monitor an S3 bucket with textFileStream, and that works great. I <3 Spark!

Best,
Vadim

On Mon, Apr 6, 2015 at 12:23 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:

Hi all, I am wondering, has anyone on this list been able to successfully implement Spark on top of Kinesis? Best, Vadim

On Sun, Apr 5, 2015 at 1:50 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:

Hi all, Below is the output that I am getting. My Kinesis stream has 1 shard, and my Spark cluster on EC2 has 2 slaves (I think that's fine?). I should mention that my Kinesis producer is written in Python, where I followed the example at http://blogs.aws.amazon.com/bigdata/post/Tx2Z24D4T99AN35/Snakes-in-the-Stream-Feeding-and-Eating-Amazon-Kinesis-Streams-with-Python. I also wrote a Python consumer, again using the example at the above link, that works fine. But I am unable to display output from my Spark consumer. I'd appreciate any help.

Thanks, Vadim

--- Time: 142825409 ms ---
15/04/05 17:14:50 INFO scheduler.JobScheduler: Finished job streaming job 142825409 ms.0 from job set of time 142825409 ms
15/04/05 17:14:50 INFO scheduler.JobScheduler: Total delay: 0.099 s for time 142825409 ms (execution: 0.090 s)
15/04/05 17:14:50 INFO rdd.ShuffledRDD: Removing RDD 63 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 63
15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 62 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 62
15/04/05 17:14:50 INFO rdd.MapPartitionsRDD: Removing RDD 61 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 61
15/04/05 17:14:50 INFO rdd.UnionRDD: Removing RDD 60 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 60
15/04/05 17:14:50 INFO rdd.BlockRDD: Removing RDD 59 from persistence list
15/04/05 17:14:50 INFO storage.BlockManager: Removing RDD 59
15/04/05 17:14:50 INFO dstream.PluggableInputDStream: Removing blocks of RDD BlockRDD[59] at createStream at MyConsumer.scala:56 of time 142825409 ms
15/04/05 17:14:50 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(142825407 ms)
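For reference, a minimal sketch of that plan B; the bucket path, application name, and batch interval are placeholders, and textFileStream simply picks up new files that appear under the monitored prefix:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("S3Monitor"), Seconds(10))
// Watch an S3 prefix for newly written objects (placeholder bucket/prefix).
val lines = ssc.textFileStream("s3n://my-bucket/incoming/")
lines.print()
ssc.start()
ssc.awaitTermination()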
Re: Spark + Kinesis
Hi all,

More good news! I was able to use mergeStrategy to assemble my Kinesis consumer into an uber jar. Here's what I added to build.sbt:

mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("com", "esotericsoftware", "minlog", xs @ _*) => MergeStrategy.first
    case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "commons", xs @ _*) => MergeStrategy.last
    case PathList("org", "apache", "hadoop", xs @ _*) => MergeStrategy.first
    case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
    case x => old(x)
  }
}

Everything appears to be working fine. Right now my producer is pushing simple strings through Kinesis, which my consumer is trying to print (using Spark's print() method for now). However, instead of displaying my strings, I get the following:

15/04/04 18:57:32 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1428173848000 ms)

Any idea on what might be going on?

Thanks,
Vadim

Here's my consumer code (adapted from the WordCount example):

private object MyConsumer extends Logging {

  def main(args: Array[String]) {
    /* Check that all required args were passed in. */
    if (args.length < 2) {
      System.err.println(
        """
          |Usage: KinesisWordCount <stream-name> <endpoint-url>
          |    <stream-name> is the name of the Kinesis stream
          |    <endpoint-url> is the endpoint of the Kinesis service
          |                   (e.g. https://kinesis.us-east-1.amazonaws.com)
        """.stripMargin)
      System.exit(1)
    }

    /* Populate the appropriate variables from the given args */
    val Array(streamName, endpointUrl) = args

    /* Determine the number of shards from the stream */
    val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
    kinesisClient.setEndpoint(endpointUrl)
    val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size()
    System.out.println("Num shards: " + numShards)

    /* In this example, we're going to create 1 Kinesis Worker/Receiver/DStream for each shard. */
    val numStreams = numShards

    /* Set up the SparkConfig and StreamingContext */
    /* Spark Streaming batch interval */
    val batchInterval = Milliseconds(2000)
    val sparkConfig = new SparkConf().setAppName("MyConsumer")
    val ssc = new StreamingContext(sparkConfig, batchInterval)

    /* Kinesis checkpoint interval. Same as batchInterval for this example. */
    val kinesisCheckpointInterval = batchInterval

    /* Create the same number of Kinesis DStreams/Receivers as the Kinesis stream's shards */
    val kinesisStreams = (0 until numStreams).map { i =>
      KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
        InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
    }

    /* Union all the streams */
    val unionStreams = ssc.union(kinesisStreams).map(byteArray => new String(byteArray))
    unionStreams.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Re: Spark + Kinesis
spark-streaming-kinesis-asl is not part of the Spark distribution on your cluster, so you cannot have it be just a "provided" dependency. This is also why the KCL and its dependencies were not included in the assembly (but yes, they should be).

~ Jonathan Kelly

From: Vadim Bichutskiy vadim.bichuts...@gmail.com
Date: Friday, April 3, 2015 at 12:26 PM
To: Jonathan Kelly jonat...@amazon.com
Cc: user@spark.apache.org
Subject: Re: Spark + Kinesis

Hi all,

Good news! I was able to create a Kinesis consumer and assemble it into an uber jar following http://spark.apache.org/docs/latest/streaming-kinesis-integration.html and the example https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala. However, when I try to spark-submit it I get the following exception:

Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider

Do I need to include the KCL dependency in build.sbt? Here's what it looks like currently:

import AssemblyKeys._
name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided"
assemblySettings
jarName in assembly := "consumer-assembly.jar"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

Any help appreciated.

Thanks,
Vadim

On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan jonat...@amazon.com wrote:

It looks like you're attempting to mix Scala versions, so that's going to cause some problems. If you really want to use Scala 2.11.5, you must also use Spark package versions built for Scala 2.11 rather than 2.10.

Anyway, that's not quite the correct way to specify Scala dependencies in build.sbt. Instead of placing the Scala version after the artifactId (like "spark-core_2.10"), what you actually want is to use just "spark-core" with two percent signs before it. Using two percent signs will make it use the version of Scala that matches your declared scalaVersion. For example:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

I think that may get you a little closer, though I think you're probably going to run into the same problems I ran into in this thread: https://www.mail-archive.com/user@spark.apache.org/msg23891.html. I never really got an answer for that, and I temporarily moved on to other things for now.

~ Jonathan Kelly

From: 'Vadim Bichutskiy' vadim.bichuts...@gmail.com
Date: Thursday, April 2, 2015 at 9:53 AM
To: user@spark.apache.org
Subject: Spark + Kinesis

Hi all,

I am trying to write an Amazon Kinesis consumer Scala app that processes data in the Kinesis stream. Is this the correct way to specify build.sbt:

import AssemblyKeys._
name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"
libraryDependencies ++= Seq("org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
  "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
  "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")
assemblySettings
jarName in assembly := "consumer-assembly.jar"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

In project/assembly.sbt I have only the following line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark book.

Thanks,
Vadim
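Putting the advice in this thread together, a hedged sketch of what the build.sbt could end up looking like: %% so the artifacts follow the declared scalaVersion, a Scala version matching the Spark 1.3.0 artifacts, spark-core and spark-streaming left as "provided", and spark-streaming-kinesis-asl bundled into the assembly as Tathagata Das suggests elsewhere in this thread. Versions and names mirror the ones used above; the exact Scala patch version is an assumption.

import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.10.4" // match the Scala version the Spark 1.3.0 artifacts were built for

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
// Not "provided": kinesis-asl (and the KCL it pulls in) must end up in the assembly.
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

assemblySettings
jarName in assembly := "consumer-assembly.jar"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)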
Re: Spark + Kinesis
Remove provided and got the following error: [error] (*:assembly) deduplicate: different file contents found in the following: [error] /Users/vb/.ivy2/cache/com.esotericsoftware.kryo/kryo/bundles/kryo-2.21.jar:com/esotericsoftware/minlog/Log$Logger.class [error] /Users/vb/.ivy2/cache/com.esotericsoftware.minlog/minlog/jars/minlog-1.2.jar:com/esotericsoftware/minlog/Log$Logger.class ᐧ On Fri, Apr 3, 2015 at 3:48 PM, Tathagata Das t...@databricks.com wrote: Just remove provided for spark-streaming-kinesis-asl libraryDependencies += org.apache.spark %% spark-streaming-kinesis-asl % 1.3.0 On Fri, Apr 3, 2015 at 12:45 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote: Thanks. So how do I fix it? ᐧ On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan jonat...@amazon.com wrote: spark-streaming-kinesis-asl is not part of the Spark distribution on your cluster, so you cannot have it be just a provided dependency. This is also why the KCL and its dependencies were not included in the assembly (but yes, they should be). ~ Jonathan Kelly From: Vadim Bichutskiy vadim.bichuts...@gmail.com Date: Friday, April 3, 2015 at 12:26 PM To: Jonathan Kelly jonat...@amazon.com Cc: user@spark.apache.org user@spark.apache.org Subject: Re: Spark + Kinesis Hi all, Good news! I was able to create a Kinesis consumer and assemble it into an uber jar following http://spark.apache.org/docs/latest/streaming-kinesis-integration.html and example https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala . However when I try to spark-submit it I get the following exception: *Exception in thread main java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider* Do I need to include KCL dependency in *build.sbt*, here's what it looks like currently: import AssemblyKeys._ name := Kinesis Consumer version := 1.0 organization := com.myconsumer scalaVersion := 2.11.5 libraryDependencies += org.apache.spark %% spark-core % 1.3.0 % provided libraryDependencies += org.apache.spark %% spark-streaming % 1.3.0 % provided libraryDependencies += org.apache.spark %% spark-streaming-kinesis-asl % 1.3.0 % provided assemblySettings jarName in assembly := consumer-assembly.jar assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala=false) Any help appreciated. Thanks, Vadim On Thu, Apr 2, 2015 at 1:15 PM, Kelly, Jonathan jonat...@amazon.com wrote: It looks like you're attempting to mix Scala versions, so that's going to cause some problems. If you really want to use Scala 2.11.5, you must also use Spark package versions built for Scala 2.11 rather than 2.10. Anyway, that's not quite the correct way to specify Scala dependencies in build.sbt. Instead of placing the Scala version after the artifactId (like spark-core_2.10), what you actually want is to use just spark-core with two percent signs before it. Using two percent signs will make it use the version of Scala that matches your declared scalaVersion. 
Re: Spark + Kinesis
Thanks. So how do I fix it?

On Fri, Apr 3, 2015 at 3:43 PM, Kelly, Jonathan <jonat...@amazon.com> wrote: spark-streaming-kinesis-asl is not part of the Spark distribution on your cluster, so you cannot have it be just a provided dependency. This is also why the KCL and its dependencies were not included in the assembly (but yes, they should be). ~ Jonathan Kelly

From: Vadim Bichutskiy <vadim.bichuts...@gmail.com> Date: Friday, April 3, 2015 at 12:26 PM To: Jonathan Kelly <jonat...@amazon.com> Cc: user@spark.apache.org Subject: Re: Spark + Kinesis

Hi all, Good news! I was able to create a Kinesis consumer and assemble it into an uber jar following http://spark.apache.org/docs/latest/streaming-kinesis-integration.html and the example https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala. However, when I try to spark-submit it I get the following exception:

Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider

Do I need to include the KCL dependency in build.sbt? Here's what it looks like currently:

import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0" % "provided"

assemblySettings
jarName in assembly := "consumer-assembly.jar"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

Any help appreciated. Thanks, Vadim
Re: Spark + Kinesis
Just remove provided from the end of the line where you specify the spark-streaming-kinesis-asl dependency. That will cause that package and all of its transitive dependencies (including the KCL and the AWS Java SDK libraries) to be included in your uber jar. They all must be in there because they are not part of the Spark distribution on your cluster. However, as I mentioned before, I think making this change might cause you to run into the same problems I spoke of in the thread I linked below (https://www.mail-archive.com/user@spark.apache.org/msg23891.html), and unfortunately I haven't solved that yet. ~ Jonathan Kelly
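Concretely, the dependency block after that change would look something like the sketch below. This assumes Spark 1.3.0 artifacts exist for the project's declared scalaVersion (the prebuilt Spark 1.3.0 distribution uses Scala 2.10); only the Kinesis connector loses the provided scope:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0" % "provided",       // supplied by the cluster at runtime
  "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided",  // supplied by the cluster at runtime
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0")   // bundled into the uber jar, along with the KCL and AWS SDK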
Re: Spark + Kinesis
Assembly settings have an option to exclude jars. You need something similar to:

assemblyExcludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
  val excludes = Set("minlog-1.2.jar")
  cp filter { jar => excludes(jar.data.getName) }
}

in your build file (may need to be refactored into a .scala file).
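For reference, the same exclusion can also be written in the := / .value style that sbt 0.13 and sbt-assembly 0.13.x accept directly in build.sbt. A sketch, assuming minlog-1.2.jar is the only jar that needs to be dropped:

assemblyExcludedJars in assembly := {
  val cp = (fullClasspath in assembly).value
  // drop the standalone minlog jar; kryo already bundles these classes
  cp filter { jar => jar.data.getName == "minlog-1.2.jar" }
}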
Re: Spark + Kinesis
Just remove provided for spark-streaming-kinesis-asl:

libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"
Spark + Kinesis
Hi all, I am trying to write an Amazon Kinesis consumer Scala app that processes data in the Kinesis stream. Is this the correct way to specify build.sbt?

import AssemblyKeys._

name := "Kinesis Consumer"
version := "1.0"
organization := "com.myconsumer"
scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % "1.3.0" % "provided",
  "org.apache.spark" % "spark-streaming_2.10" % "1.3.0",
  "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.3.0")

assemblySettings
jarName in assembly := "consumer-assembly.jar"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

In project/assembly.sbt I have only the following line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

I am using sbt 0.13.7. I adapted Example 7.7 in the Learning Spark book. Thanks, Vadim
Re: Spark + Kinesis
It looks like you're attempting to mix Scala versions, so that's going to cause some problems. If you really want to use Scala 2.11.5, you must also use Spark package versions built for Scala 2.11 rather than 2.10. Anyway, that's not quite the correct way to specify Scala dependencies in build.sbt. Instead of placing the Scala version after the artifactId (like spark-core_2.10), what you actually want is to use just spark-core with two percent signs before it. Using two percent signs will make it use the version of Scala that matches your declared scalaVersion. For example:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

I think that may get you a little closer, though I think you're probably going to run into the same problems I ran into in this thread: https://www.mail-archive.com/user@spark.apache.org/msg23891.html I never really got an answer for that, and I temporarily moved on to other things for now. ~ Jonathan Kelly
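Putting both points together, a minimal sketch of the corrected scalaVersion and dependency lines, assuming the project stays on the Scala 2.10 builds that the prebuilt Spark 1.3.0 packages ship with (the exact 2.10.x patch release shown here is illustrative):

scalaVersion := "2.10.4"  // match the Scala version your Spark 1.3.0 artifacts were built for

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.3.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "1.3.0"

With %% and a 2.10.x scalaVersion, sbt resolves spark-core_2.10 and the other _2.10 artifacts automatically, so the suffix never needs to be spelled out in the artifactId.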
Re: Spark + Kinesis
Thanks Jonathan. Helpful. VB