It seems like a work around. But I don't know how to get the database
connection from the working nodes.

Shixiong Zhu <zsxw...@gmail.com>于2015年9月24日周四 下午5:37写道:

> Could you write your update func like this?
>
>     val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])])
> => {
>       iterator.flatMap { case (key, values, stateOption) =>
>         if (values.isEmpty) {
>           // don't access database
>         } else {
>           // update to new state and save to database
>         }
>         // return new state
>       }
>     }
>
> and use this overload:
>
> def updateStateByKey[S: ClassTag](
>       updateFunc: (Seq[V], Option[S]) => Option[S],
>       partitioner: Partitioner
>     ): DStream[(K, S)]
>
> There is a JIRA: https://issues.apache.org/jira/browse/SPARK-2629 but
> doesn't have a doc now...
>
>
> Best Regards,
> Shixiong Zhu
>
> 2015-09-24 17:26 GMT+08:00 Bin Wang <wbi...@gmail.com>:
>
>> Data that are not updated should be saved earlier: while the data added
>> to the DStream at the first time, it should be considered as updated. So
>> save the same data again is a waste.
>>
>> What are the community is doing? Is there any doc or discussion that I
>> can look for? Thanks.
>>
>>
>>
>> Shixiong Zhu <zsxw...@gmail.com>于2015年9月24日周四 下午4:27写道:
>>
>>> For data that are not updated, where do you save? Or do you only want to
>>> avoid accessing database for those that are not updated?
>>>
>>> Besides,  the community is working on optimizing "updateStateBykey"'s
>>> performance. Hope it will be delivered soon.
>>>
>>> Best Regards,
>>> Shixiong Zhu
>>>
>>> 2015-09-24 13:45 GMT+08:00 Bin Wang <wbi...@gmail.com>:
>>>
>>>> I've read the source code and it seems to be impossible, but I'd like
>>>> to confirm it.
>>>>
>>>> It is a very useful feature. For example, I need to store the state of
>>>> DStream into my database, in order to recovery them from next redeploy. But
>>>> I only need to save the updated ones. Save all keys into database is a lot
>>>> of waste.
>>>>
>>>> Through the source code, I think it could be add easily: StateDStream
>>>> can get prevStateRDD so that it can make a diff. Is there any chance to add
>>>> this as an API of StateDStream? If so, I can work on this feature.
>>>>
>>>> If not possible, is there any work around or hack to do this by myself?
>>>>
>>>
>>>
>

Reply via email to