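You can create the connection inside the update function, once per partition, like this:

import org.apache.spark.TaskContext

val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
  // createDbConnection() is a placeholder for however you open your database connection
  val dbConnection = createDbConnection()
  // iterator.flatMap is lazy, so register the cleanup first: the connection is
  // closed only when the task has finished consuming the returned iterator
  TaskContext.get().addTaskCompletionListener(_ => dbConnection.disconnect())
  // the flatMap result is the function's return value, Iterator[(String, Int)]
  iterator.flatMap { case (key, values, stateOption) =>
    if (values.isEmpty) {
      // don't access the database; keep the old state
      stateOption.map(state => (key, state))
    } else {
      // compute the new state from values and stateOption,
      // save it with dbConnection, and return Some((key, newState))
      ???
    }
  }
}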
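To check the per-partition logic in isolation, here is a minimal sketch that does not depend on Spark. `StateStore`, `UpdateFuncSketch`, `makeUpdateFunc`, `openStore` and `registerCleanup` are hypothetical names, and the running sum is only a placeholder for the real state update. The returned function has the shape expected by the iterator-based `updateStateByKey` overload (`Iterator[(K, Seq[V], Option[S])] => Iterator[(K, S)]`). In a real job, `registerCleanup` would presumably be something like `c => TaskContext.get().addTaskCompletionListener(_ => c())`.

```scala
// Hypothetical stand-in for whatever database client you use; not a Spark API.
trait StateStore {
  def save(key: String, state: Int): Unit
  def close(): Unit
}

object UpdateFuncSketch {
  type Row = (String, Seq[Int], Option[Int])

  /** Builds a per-partition update function.
    *
    * openStore:       opens one connection per partition (hypothetical).
    * registerCleanup: registers code to run once the task is done; in Spark this
    *                  would be a TaskContext completion listener (assumption).
    */
  def makeUpdateFunc(
      openStore: () => StateStore,
      registerCleanup: (() => Unit) => Unit
  ): Iterator[Row] => Iterator[(String, Int)] =
    (iterator: Iterator[Row]) => {
      val store = openStore()
      // flatMap below is lazy, so the store must stay open until the iterator
      // has been consumed; closing it here directly would be too early.
      registerCleanup(() => store.close())
      iterator.flatMap { case (key, values, stateOption) =>
        if (values.isEmpty) {
          // No new values in this batch: keep the old state, skip the database.
          stateOption.map(s => (key, s))
        } else {
          // New values arrived: compute the new state (a running sum, purely
          // for illustration) and persist only this key.
          val newState = stateOption.getOrElse(0) + values.sum
          store.save(key, newState)
          Some((key, newState))
        }
      }
    }
}
```

Because `flatMap` is lazy, nothing is saved until the returned iterator is consumed, which is why closing the connection is deferred to task completion instead of happening at the end of the function body. Keys without new values are passed through from `stateOption` without touching the database.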
Best Regards,
Shixiong Zhu

2015-09-24 17:42 GMT+08:00 Bin Wang <wbi...@gmail.com>:

> It seems like a workaround, but I don't know how to get a database
> connection on the worker nodes.
>
> On Thu, Sep 24, 2015 at 5:37 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:
>
>> Could you write your update func like this?
>>
>> val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
>>   iterator.flatMap { case (key, values, stateOption) =>
>>     if (values.isEmpty) {
>>       // don't access the database
>>     } else {
>>       // update to the new state and save it to the database
>>     }
>>     // return the new state
>>   }
>> }
>>
>> and use this overload:
>>
>> def updateStateByKey[S: ClassTag](
>>     updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
>>     partitioner: Partitioner,
>>     rememberPartitioner: Boolean
>>   ): DStream[(K, S)]
>>
>> There is a JIRA, https://issues.apache.org/jira/browse/SPARK-2629, but it
>> doesn't have a doc yet...
>>
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-09-24 17:26 GMT+08:00 Bin Wang <wbi...@gmail.com>:
>>
>>> Data that are not updated should already have been saved earlier: when
>>> data is added to the DStream for the first time, it should be considered
>>> updated. So saving the same data again is a waste.
>>>
>>> What is the community doing? Is there any doc or discussion that I can
>>> look at? Thanks.
>>>
>>> On Thu, Sep 24, 2015 at 4:27 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:
>>>
>>>> For data that are not updated, where do you save them? Or do you only
>>>> want to avoid accessing the database for those that are not updated?
>>>>
>>>> Besides, the community is working on optimizing the performance of
>>>> "updateStateByKey". Hopefully it will be delivered soon.
>>>>
>>>> Best Regards,
>>>> Shixiong Zhu
>>>>
>>>> 2015-09-24 13:45 GMT+08:00 Bin Wang <wbi...@gmail.com>:
>>>>
>>>>> I've read the source code and it seems to be impossible, but I'd like
>>>>> to confirm it.
>>>>>
>>>>> It is a very useful feature. For example, I need to store the state of
>>>>> a DStream in my database in order to recover it after the next
>>>>> redeploy. But I only need to save the updated keys. Saving all keys to
>>>>> the database is a lot of waste.
>>>>>
>>>>> From the source code, I think it could be added easily: StateDStream
>>>>> can get prevStateRDD, so it can compute a diff. Is there any chance to
>>>>> add this as an API of StateDStream? If so, I can work on this feature.
>>>>>
>>>>> If not, is there any workaround or hack to do this myself?
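As a workaround that needs no new StateDStream API, one option (an assumption of this sketch, not something proposed in the thread) is to carry an "updated in this batch" flag inside the state itself and write only the flagged keys. `DirtyFlagSketch`, `update` and `updatedOnly` are hypothetical names, and the running sum again stands in for the real state logic. `update` has the shape of the simple `(Seq[V], Option[S]) => Option[S]` overload of `updateStateByKey`.

```scala
object DirtyFlagSketch {
  // Per-key state: (current value, updatedInThisBatch).
  // The flag is an assumption of this sketch, not something Spark provides.
  type State = (Int, Boolean)

  /** Per-key update function. A key seen for the first time counts as
    * updated, matching the requirement described in the thread. */
  def update(values: Seq[Int], state: Option[State]): Option[State] = {
    val previous = state.map(_._1)
    if (values.isEmpty) {
      // Nothing new for this key: keep its value, clear the flag.
      previous.map(v => (v, false))
    } else {
      // New or changed key: compute the new value and set the flag.
      Some((previous.getOrElse(0) + values.sum, true))
    }
  }

  /** Keeps only the keys updated in this batch, i.e. the rows worth writing. */
  def updatedOnly(states: Seq[(String, State)]): Seq[(String, Int)] =
    states.collect { case (key, (value, true)) => (key, value) }
}
```

With a hypothetical `pairs: DStream[(String, Int)]`, this would plug in as `pairs.updateStateByKey(DirtyFlagSketch.update _)`, followed by `.filter(_._2._2)` and `foreachRDD` with `foreachPartition` to write only the flagged keys. The trade-off is that Spark still iterates over every key in every batch; the flag only avoids the redundant database writes.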