[ 
https://issues.apache.org/jira/browse/SPARK-2629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15189520#comment-15189520
 ] 

Iain Cundy commented on SPARK-2629:
-----------------------------------

I've discovered compaction works if I switch off Kryo.

I was using a workaround to get around mapWithState not supporting Kryo. My 
custom KryoRegistrator Java class has

// workaround until the bug is fixed in Spark 1.6.1
kryo.register(OpenHashMapBasedStateMap.class);
kryo.register(EmptyStateMap.class);
kryo.register(MapWithStateRDDRecord.class);

which certainly made the NullPointerException errors when checkpointing go 
away, but (inexplicably to me) doesn't allow compaction to work.
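
For readers following along, here is a rough sketch of the two serializer setups being compared. It assumes the serializer is configured in spark-defaults.conf; the comment does not say how Kryo was enabled, and com.example.MyKryoRegistrator is a hypothetical class name used only for illustration.

```properties
# Setup with Kryo and a custom registrator (class name is hypothetical):
# spark.serializer        org.apache.spark.serializer.KryoSerializer
# spark.kryo.registrator  com.example.MyKryoRegistrator

# Setup with Kryo switched off: fall back to Spark's default Java serialization
spark.serializer          org.apache.spark.serializer.JavaSerializer
```

The same properties could equally be set on a SparkConf or passed with --conf to spark-submit.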

I wonder whether the "proper" fix in 1.6.1 enables compaction? Has anybody seen 
compaction working with the patch? 

Cheers
Iain 

> Improved state management for Spark Streaming
> ---------------------------------------------
>
>                 Key: SPARK-2629
>                 URL: https://issues.apache.org/jira/browse/SPARK-2629
>             Project: Spark
>          Issue Type: Epic
>          Components: Streaming
>    Affects Versions: 0.9.2, 1.0.2, 1.2.2, 1.3.1, 1.4.1, 1.5.1
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>
> Currently, updateStateByKey provides stateful processing in Spark Streaming. It 
> allows the user to maintain per-key state and manage that state using an 
> updateFunction. The updateFunction is called for each key, and it uses the new 
> data and the existing state of the key to generate an updated state. However, 
> based on community feedback, we have learnt the following lessons:
> - Need for more optimized state management that does not scan every key
> - Need to make it easier to implement common use cases - (a) timeout of idle 
> data, (b) returning items other than state
> The high-level idea that I am proposing is:
> - Introduce a new API, mapWithState (originally named trackStateByKey), that 
> allows the user to update per-key state and emit arbitrary records. The new 
> API is necessary as this will have significantly different semantics than the 
> existing updateStateByKey API. This API will have direct support for timeouts.
> - Internally, the system will keep the state data as a map/list within the 
> partitions of the state RDDs. The new data RDDs will be partitioned 
> appropriately, and for all the key-value data, the system will look up the 
> map/list in the state RDD partition and create a new list/map of updated 
> state data. The new state RDD partition will be created based on the update 
> data and, if necessary, the old data.
> Here is the detailed design doc (*outdated, to be updated*). Please take a 
> look and provide feedback as comments.
> https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#heading=h.ph3w0clkd4em


