A possible problem is that Kinesis streams in Spark 1.3 use the
SparkContext app name as the Kinesis application name, which the Kinesis
Client Library (KCL) uses to save checkpoints in DynamoDB. Since both Kinesis
DStreams share the same Kinesis application name (they are in the same
StreamingContext / SparkContext / Spark app), KCL may be writing the
checkpoint information of both Kinesis streams into the same DynamoDB table,
overwriting each other's state. Either way, this is going to be fixed in Spark 1.4.

On Thu, May 14, 2015 at 4:10 PM, Chris Fregly <ch...@fregly.com> wrote:

> have you tried to union the 2 streams per the KinesisWordCountASL example
> <https://github.com/apache/spark/blob/branch-1.3/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L120>
>  where
> 2 streams (against the same Kinesis stream in this case) are created and
> union'd?
>
> it should work the same way - including union() of streams from totally
> different source types (kafka, kinesis, flume).
>
>
>
> On Thu, May 14, 2015 at 2:07 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> What is the error you are seeing?
>>
>> TD
>>
>> On Thu, May 14, 2015 at 9:00 AM, Erich Ess <er...@simplerelevance.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Is it possible to setup streams from multiple Kinesis streams and process
>>> them in a single job?  From what I have read, this should be possible,
>>> however, the Kinesis layer errors out whenever I try to receive from more
>>> than a single Kinesis Stream.
>>>
>>> Here is the code.  Currently, I am focused on just getting receivers set
>>> up and working for the two Kinesis streams; as such, this code just
>>> attempts to print out the contents of both streams:
>>>
>>> implicit val formats = Serialization.formats(NoTypeHints)
>>>
>>> val conf = new SparkConf().setMaster("local[*]").setAppName("test")
>>> val ssc = new StreamingContext(conf, Seconds(1))
>>>
>>> val rawStream = KinesisUtils.createStream(
>>>   ssc,
>>>   "erich-test",
>>>   "kinesis.us-east-1.amazonaws.com",
>>>   Duration(1000),
>>>   InitialPositionInStream.TRIM_HORIZON,
>>>   StorageLevel.MEMORY_ONLY)
>>> rawStream.map(msg => new String(msg)).print
>>>
>>> val loaderStream = KinesisUtils.createStream(
>>>   ssc,
>>>   "dev-loader",
>>>   "kinesis.us-east-1.amazonaws.com",
>>>   Duration(1000),
>>>   InitialPositionInStream.TRIM_HORIZON,
>>>   StorageLevel.MEMORY_ONLY)
>>>
>>> loaderStream.map(msg => new String(msg)).print
>>>
>>> ssc.start()
>>>
>>> Thanks,
>>> -Erich
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-Kinesis-Streams-in-a-single-Streaming-job-tp22889.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
