You can create connection like this:

    val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])])
=> {
      val dbConnection = create a db connection
      iterator.flatMap { case (key, values, stateOption) =>
        if (values.isEmpty) {
          // don't access database
        } else {
          // update to new state and save to database
        }
        // return new state
      }
      TaskContext.get().addTaskCompletionListener(_ => db.disconnect())
    }


Best Regards,
Shixiong Zhu

2015-09-24 17:42 GMT+08:00 Bin Wang <wbi...@gmail.com>:

> It seems like a work around. But I don't know how to get the database
> connection from the working nodes.
>
> Shixiong Zhu <zsxw...@gmail.com>于2015年9月24日周四 下午5:37写道:
>
>> Could you write your update func like this?
>>
>>     val updateFunc = (iterator: Iterator[(String, Seq[Int],
>> Option[Int])]) => {
>>       iterator.flatMap { case (key, values, stateOption) =>
>>         if (values.isEmpty) {
>>           // don't access database
>>         } else {
>>           // update to new state and save to database
>>         }
>>         // return new state
>>       }
>>     }
>>
>> and use this overload:
>>
>> def updateStateByKey[S: ClassTag](
>>       updateFunc: (Seq[V], Option[S]) => Option[S],
>>       partitioner: Partitioner
>>     ): DStream[(K, S)]
>>
>> There is a JIRA: https://issues.apache.org/jira/browse/SPARK-2629 but
>> doesn't have a doc now...
>>
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-09-24 17:26 GMT+08:00 Bin Wang <wbi...@gmail.com>:
>>
>>> Data that are not updated should be saved earlier: while the data added
>>> to the DStream at the first time, it should be considered as updated. So
>>> save the same data again is a waste.
>>>
>>> What are the community is doing? Is there any doc or discussion that I
>>> can look for? Thanks.
>>>
>>>
>>>
>>> Shixiong Zhu <zsxw...@gmail.com>于2015年9月24日周四 下午4:27写道:
>>>
>>>> For data that are not updated, where do you save? Or do you only want
>>>> to avoid accessing database for those that are not updated?
>>>>
>>>> Besides,  the community is working on optimizing "updateStateBykey"'s
>>>> performance. Hope it will be delivered soon.
>>>>
>>>> Best Regards,
>>>> Shixiong Zhu
>>>>
>>>> 2015-09-24 13:45 GMT+08:00 Bin Wang <wbi...@gmail.com>:
>>>>
>>>>> I've read the source code and it seems to be impossible, but I'd like
>>>>> to confirm it.
>>>>>
>>>>> It is a very useful feature. For example, I need to store the state of
>>>>> DStream into my database, in order to recovery them from next redeploy. 
>>>>> But
>>>>> I only need to save the updated ones. Save all keys into database is a lot
>>>>> of waste.
>>>>>
>>>>> Through the source code, I think it could be add easily: StateDStream
>>>>> can get prevStateRDD so that it can make a diff. Is there any chance to 
>>>>> add
>>>>> this as an API of StateDStream? If so, I can work on this feature.
>>>>>
>>>>> If not possible, is there any work around or hack to do this by myself?
>>>>>
>>>>
>>>>
>>

Reply via email to