[ https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15447624#comment-15447624 ]

David Yan commented on APEXMALHAR-2130:
---------------------------------------

We are in the process of using [~timothyfarkas]'s spillable data structures to
implement the state storage for the WindowedOperator. We need something that
supports the equivalent of Map<Window, Map<K, V>> and Map<Window, V>, where
Window is an event-time-based window and recovery is based on Apex windows.

There are some gaps between the current implementation and what we need, most
notably:

1. Getting all keys given a window from Map<Window, Map<K, V>>
2. Getting all windows from Map<Window, V>
3. Deleting a window from Map<Window, Map<K, V>> and from Map<Window, V>
4. Deleting a key given a window from Map<Window, Map<K, V>>
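For concreteness, the four operations above could be expressed as a small Java
interface plus a plain in-memory reference implementation whose behavior a
spillable version would have to match. This is only a sketch: the names
WindowedKeyedStore and InMemoryWindowedKeyedStore are hypothetical (not existing
Malhar classes), a window is modeled as its event-time start timestamp, and
there is no spilling, checkpointing or recovery here.

```java
import java.util.*;

/** Hypothetical interface for operations 1-4; not an existing Malhar API. */
interface WindowedKeyedStore<W, K, V> {
  void put(W window, K key, V value);
  V get(W window, K key);
  Set<K> keys(W window);          // 1. all keys given a window
  Set<W> windows();               // 2. all windows
  void removeWindow(W window);    // 3. delete a whole window
  void remove(W window, K key);   // 4. delete one key given a window
}

/** In-memory reference semantics only: everything stays on the heap. */
class InMemoryWindowedKeyedStore<W extends Comparable<W>, K extends Comparable<K>, V>
    implements WindowedKeyedStore<W, K, V> {
  // window -> (key -> value), both levels kept sorted
  private final TreeMap<W, TreeMap<K, V>> data = new TreeMap<>();

  public void put(W window, K key, V value) {
    data.computeIfAbsent(window, w -> new TreeMap<>()).put(key, value);
  }

  public V get(W window, K key) {
    TreeMap<K, V> perWindow = data.get(window);
    return perWindow == null ? null : perWindow.get(key);
  }

  public Set<K> keys(W window) {
    TreeMap<K, V> perWindow = data.get(window);
    return perWindow == null
        ? Collections.<K>emptySet()
        : Collections.unmodifiableSet(perWindow.keySet());
  }

  public Set<W> windows() {
    return Collections.unmodifiableSet(data.keySet());
  }

  public void removeWindow(W window) {
    data.remove(window);
  }

  public void remove(W window, K key) {
    TreeMap<K, V> perWindow = data.get(window);
    if (perWindow != null) {
      perWindow.remove(key);
      if (perWindow.isEmpty()) {
        data.remove(window); // drop windows that no longer hold any key
      }
    }
  }
}
```

The Map<Window, V> case is the same idea without the inner key level.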

Because of these required features, implementing Map<Window, Map<K, V>> with a
SpillableByteMap<Pair<Window, K>, V> in conjunction with a
SpillableArrayListMultimap<Window, K> will not work.

We are considering the following:

To support 1 and 2:
- Add support for getting all keys >= a given key by taking advantage of the
FileAccess.FileReader.seek() and next() methods, and expose this functionality
in the Bucket interface.
- seek() and next() need to take a time bucket. That means that, to support 1,
we need to be able to derive the time bucket from the event-time window, and
SpillableByteMap needs to support a user-provided mapping from key to time
bucket. (If such a mapping is provided, the time bucket will no longer be
assumed to be -1.)
- To support 2, we also need to add functionality for getting the list of all
time buckets.
- Since event time, unlike processing time, is arbitrary, the actual key
representing a time bucket cannot be assumed to form a natural sequence.
However, TimeBucketAssigner.getTimeBucketAndAdjustBoundaries seems to return a
long that is sequential starting from 0. We want to base the actual time bucket
key on the actual event window timestamp. [~csingh] Will this break anything?
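A rough sketch of how 1 and 2 could fit together, assuming the time bucket key
is the event-time timestamp at the start of the bucket rather than a 0-based
counter. A sorted TreeMap per time bucket stands in for a sorted bucket file,
and tailMap() plus iteration plays the role of FileAccess.FileReader.seek() and
next(); the real seek()/next() signatures are not reproduced. EventTimeBucketing,
CompositeKey, bucketSpanMillis and timeBucketOf are hypothetical names.

```java
import java.util.*;

/** Hypothetical sketch; not Malhar code. */
class EventTimeBucketing {
  /** Composite key sorted by window start, then user key (assumed on-disk order). */
  static final class CompositeKey implements Comparable<CompositeKey> {
    final long windowStart;
    final String key;

    CompositeKey(long windowStart, String key) {
      this.windowStart = windowStart;
      this.key = key;
    }

    @Override
    public int compareTo(CompositeKey o) {
      int c = Long.compare(windowStart, o.windowStart);
      return c != 0 ? c : key.compareTo(o.key);
    }
  }

  private final long bucketSpanMillis;
  // time bucket key -> sorted entries, standing in for one sorted file per time bucket
  private final Map<Long, TreeMap<CompositeKey, String>> buckets = new TreeMap<>();

  EventTimeBucketing(long bucketSpanMillis) {
    this.bucketSpanMillis = bucketSpanMillis;
  }

  /** User-provided key -> time bucket mapping, keyed by the event timestamp itself. */
  long timeBucketOf(long windowStart) {
    return Math.floorDiv(windowStart, bucketSpanMillis) * bucketSpanMillis;
  }

  void put(long windowStart, String key, String value) {
    buckets.computeIfAbsent(timeBucketOf(windowStart), b -> new TreeMap<>())
        .put(new CompositeKey(windowStart, key), value);
  }

  /** Operation 1: "seek" to the first possible key of the window, then "next" until it ends. */
  List<String> keysOf(long windowStart) {
    List<String> result = new ArrayList<>();
    TreeMap<CompositeKey, String> bucket = buckets.get(timeBucketOf(windowStart));
    if (bucket == null) {
      return result;
    }
    // "" sorts before every other String, so this is the smallest key of the window
    for (CompositeKey k : bucket.tailMap(new CompositeKey(windowStart, ""), true).keySet()) {
      if (k.windowStart != windowStart) {
        break; // moved past this window: stop calling next()
      }
      result.add(k.key);
    }
    return result;
  }

  /** Operation 2: list all time buckets, then collect the windows found in each. */
  SortedSet<Long> windows() {
    SortedSet<Long> result = new TreeSet<>();
    for (TreeMap<CompositeKey, String> bucket : buckets.values()) {
      for (CompositeKey k : bucket.keySet()) {
        result.add(k.windowStart);
      }
    }
    return result;
  }
}
```

Using Math.floorDiv keeps negative timestamps in the bucket that starts at or
before them, which matters once bucket keys are no longer assumed to start at 0.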

To support 3 and 4:

- We are thinking of a special valueSlice that denotes a deleted key. When a
key is deleted, we simply set its value to that special valueSlice, and the get
methods will handle it accordingly.
- Expiring and purging would then be done very differently and should be based
on 3. Managed State should determine whether to purge a time bucket based on
whether an Apex window is committed and whether all event windows that belong
to that time bucket are marked "deleted" for that Apex window.
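A minimal sketch of the deletion marker and of the purge rule above, assuming
values are plain byte arrays and ignoring how writes are laid out in time-bucket
files. TombstoneBucket, DELETED and canPurge are hypothetical names; DELETED only
stands in for the special valueSlice.

```java
import java.util.*;

/** Hypothetical sketch; not Managed State code. */
class TombstoneBucket {
  // Sentinel for the special "deleted" valueSlice. It is compared by identity,
  // so a user value that happens to be an empty array is not mistaken for it.
  static final byte[] DELETED = new byte[0];

  // Apex window id -> writes (key -> value) made during that Apex window
  private final TreeMap<Long, Map<String, byte[]>> writesByApexWindow = new TreeMap<>();

  void put(long apexWindowId, String key, byte[] value) {
    writesByApexWindow.computeIfAbsent(apexWindowId, w -> new HashMap<>()).put(key, value);
  }

  // Deleting writes the sentinel instead of physically removing older entries.
  void delete(long apexWindowId, String key) {
    put(apexWindowId, key, DELETED);
  }

  // The newest write wins; hitting the sentinel means the key is treated as absent.
  byte[] get(String key) {
    for (Map<String, byte[]> writes : writesByApexWindow.descendingMap().values()) {
      byte[] v = writes.get(key);
      if (v != null) {
        return v == DELETED ? null : v;
      }
    }
    return null;
  }

  // A time bucket can be purged only if every event window in it was marked
  // deleted in some Apex window that is already committed.
  static boolean canPurge(Set<Long> eventWindowsInBucket,
                          Map<Long, Long> deletedInApexWindow,
                          long committedApexWindowId) {
    for (long eventWindow : eventWindowsInBucket) {
      Long deletedAt = deletedInApexWindow.get(eventWindow);
      if (deletedAt == null || deletedAt > committedApexWindowId) {
        return false;
      }
    }
    return true;
  }
}
```

Keeping the marker as an ordinary write means recovery to an earlier Apex
window still sees the value that existed before the delete.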

As you can see, going ahead with this will require some surgery on the existing
ManagedState and Spillable data structures.
This is based on my limited knowledge of Managed State, so please pardon me and
correct me if any of my statements don't make sense.

[~csingh] [~timothyfarkas] Please comment.


> implement scalable windowed storage
> -----------------------------------
>
>                 Key: APEXMALHAR-2130
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2130
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: bright chen
>            Assignee: David Yan
>
> This feature is used for supporting windowing.
> The storage needs to have the following features:
> 1. Spillable key value storage (integrate with APEXMALHAR-2026)
> 2. Upon checkpoint, it saves a snapshot of the entire data set with the
> checkpointing window id. This should be done incrementally (ManagedState) to
> avoid wasting space with unchanged data.
> 3. When recovering, it takes the recovery window id and restores to that 
> snapshot
> 4. When a window is committed, all windows with a lower ID should be purged 
> from the store.
> 5. It should implement the WindowedStorage and WindowedKeyedStorage 
> interfaces, and because of 2 and 3, we may want to add methods to the 
> WindowedStorage interface so that the implementation of WindowedOperator can 
> notify the storage of checkpointing, recovering and committing of a window.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
