[ https://issues.apache.org/jira/browse/APEXMALHAR-2244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15513770#comment-15513770 ]
Siyuan Hua commented on APEXMALHAR-2244:
----------------------------------------

Each spillable DS implementation uses a SpillableStateStore to store its data, so we can make ManagedTimeUnifiedStateImpl implement that store as well. It can take a time-extraction function to get or calculate the time and time bucket from each V/KV entry. The store can then be set up by the WindowedOperator, correct?

> Optimize WindowedStorage and Spillable data structures for time series
> ----------------------------------------------------------------------
>
>                 Key: APEXMALHAR-2244
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2244
>             Project: Apache Apex Malhar
>          Issue Type: Sub-task
>            Reporter: David Yan
>            Assignee: Siyuan Hua
>
> The spillable data structures currently do not make any assumptions about
> the key that is used in Managed State. As a result, they use
> ManagedStateImpl to interface with Managed State, with time buckets
> based on the Apex window id. But for the WindowedStorage used by
> WindowedOperator, the storage key is a window, which is based on event
> time. Using the default ManagedStateImpl would be very inefficient for
> event-time-based keys, since it would write data belonging to the same
> window to different time buckets.
> At a high level, the following roughly summarizes what needs to be done:
> 1. A way to tell the spillable data structures to use
> ManagedTimeUnifiedStateImpl.
> 2. A way to tell the spillable data structures how to extract the timestamp
> from the key. Note that in the case of WindowedOperator, the timestamp should
> be the end timestamp of the window (beginTimeMillis + durationMillis), not
> the begin timestamp.
> 3. A way to tell the spillable data structures how to assign the time bucket
> for a given timestamp.
> 4. Following from point 3, the spillable implementations of WindowedStorage
> will need to take a config parameter specifying the size of each time bucket
> (in millis).
> 5. Only purge a time bucket when all keys that belong to that time bucket have
> been removed and the Apex window id of the first window in which all of those
> keys were removed has been committed.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
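Below is a minimal in-memory sketch of points 2 to 4 of the issue description. It also covers the comment's suggestion that the store take a time-extraction function. It does not use the real Managed State or SpillableStateStore APIs. `TimeWindow`, `TimeBucketedStore`, `endTimeMillis` and `bucketOf` are hypothetical names. The bucket id is assumed to be `floorDiv(timestamp, bucketSpanMillis)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

/** Hypothetical stand-in for a WindowedOperator window key: [begin, begin + duration). */
final class TimeWindow {
  final long beginTimeMillis;
  final long durationMillis;

  TimeWindow(long beginTimeMillis, long durationMillis) {
    this.beginTimeMillis = beginTimeMillis;
    this.durationMillis = durationMillis;
  }

  /** Point 2: the key's timestamp is the window end, not the window begin. */
  long endTimeMillis() {
    return beginTimeMillis + durationMillis;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof TimeWindow)) return false;
    TimeWindow other = (TimeWindow) o;
    return beginTimeMillis == other.beginTimeMillis && durationMillis == other.durationMillis;
  }

  @Override
  public int hashCode() {
    return 31 * Long.hashCode(beginTimeMillis) + Long.hashCode(durationMillis);
  }
}

/** Hypothetical store that groups entries by time bucket instead of by Apex window id. */
final class TimeBucketedStore<K, V> {
  private final ToLongFunction<K> timeExtractor; // key -> timestamp (point 2)
  private final long bucketSpanMillis;           // configured bucket size (point 4)
  private final Map<Long, Map<K, V>> buckets = new HashMap<>();

  TimeBucketedStore(ToLongFunction<K> timeExtractor, long bucketSpanMillis) {
    if (bucketSpanMillis <= 0) {
      throw new IllegalArgumentException("bucketSpanMillis must be positive");
    }
    this.timeExtractor = timeExtractor;
    this.bucketSpanMillis = bucketSpanMillis;
  }

  /** Point 3: timestamp -> bucket id; floorDiv keeps negative timestamps consistent. */
  long bucketOf(K key) {
    return Math.floorDiv(timeExtractor.applyAsLong(key), bucketSpanMillis);
  }

  void put(K key, V value) {
    buckets.computeIfAbsent(bucketOf(key), b -> new HashMap<>()).put(key, value);
  }

  V get(K key) {
    Map<K, V> bucket = buckets.get(bucketOf(key));
    return bucket == null ? null : bucket.get(key);
  }

  int bucketCount() {
    return buckets.size();
  }
}

class TimeBucketedStoreDemo {
  public static void main(String[] args) {
    // The operator would configure the store with the window-end extractor and a 1-minute span.
    TimeBucketedStore<TimeWindow, Long> store =
        new TimeBucketedStore<>(TimeWindow::endTimeMillis, 60_000L);
    store.put(new TimeWindow(0L, 10_000L), 3L);      // ends at 10s -> bucket 0
    store.put(new TimeWindow(10_000L, 10_000L), 5L); // ends at 20s -> bucket 0
    store.put(new TimeWindow(50_000L, 10_000L), 7L); // ends at 60s -> bucket 1
    System.out.println(store.bucketCount());                          // 2
    System.out.println(store.get(new TimeWindow(10_000L, 10_000L))); // 5
  }
}
```

The bucket is derived from the window key itself rather than from the Apex window id in which the data arrived. As a result, every entry for one window lands in a single bucket no matter when it is written. This addresses the inefficiency described in the issue.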
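Point 5 of the issue description can be sketched as per-bucket bookkeeping with three steps:

- Count the live keys in each bucket.
- Remember the Apex window id in which a bucket first became empty.
- Release the bucket for purging once a window id at or beyond that one has been committed.

`BucketPurgeTracker` and its methods are hypothetical. The sketch makes two assumptions. First, the caller reports each distinct key once when added and once when removed. Second, committed window ids only increase, and committing an id covers all earlier windows.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical bookkeeping for point 5: decide when a time bucket may be purged. */
final class BucketPurgeTracker {
  private final Map<Long, Integer> liveKeys = new HashMap<>();     // bucket -> live key count
  private final Map<Long, Long> emptiedInWindow = new HashMap<>(); // bucket -> window id it became empty in
  private long currentWindowId;

  void beginWindow(long windowId) {
    currentWindowId = windowId;
  }

  /** Call once per distinct key added to the bucket. */
  void keyAdded(long bucket) {
    liveKeys.merge(bucket, 1, Integer::sum);
    emptiedInWindow.remove(bucket); // the bucket is live again, so cancel any pending purge
  }

  /** Call once per distinct key removed from the bucket. */
  void keyRemoved(long bucket) {
    Integer count = liveKeys.get(bucket);
    if (count == null) {
      throw new IllegalStateException("no live keys in bucket " + bucket);
    }
    if (count == 1) {
      liveKeys.remove(bucket);
      emptiedInWindow.put(bucket, currentWindowId); // first window in which the bucket is empty
    } else {
      liveKeys.put(bucket, count - 1);
    }
  }

  /** Returns, sorted, the buckets that became empty in a window at or before committedWindowId. */
  List<Long> committed(long committedWindowId) {
    List<Long> purgeable = new ArrayList<>();
    Iterator<Map.Entry<Long, Long>> it = emptiedInWindow.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, Long> e = it.next();
      if (e.getValue() <= committedWindowId) {
        purgeable.add(e.getKey());
        it.remove(); // report each bucket for purging only once
      }
    }
    Collections.sort(purgeable);
    return purgeable;
  }
}

class BucketPurgeDemo {
  public static void main(String[] args) {
    BucketPurgeTracker tracker = new BucketPurgeTracker();
    tracker.beginWindow(1L);
    tracker.keyAdded(0L);
    tracker.keyAdded(0L);
    tracker.beginWindow(2L);
    tracker.keyRemoved(0L);
    tracker.keyRemoved(0L);                    // bucket 0 becomes empty in window 2
    System.out.println(tracker.committed(1L)); // [] because window 2 is not committed yet
    System.out.println(tracker.committed(2L)); // [0]
  }
}
```

Suppose a key is added back to an emptied bucket before the commit arrives. The pending purge is cancelled. If the bucket empties again later, the later window id is recorded, which matches the "first window in which the keys are all removed" condition for the bucket's current empty state.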