[ 
https://issues.apache.org/jira/browse/KAFKA-4468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076472#comment-16076472
 ] 

Evgeny Veretennikov commented on KAFKA-4468:
--------------------------------------------

Here is an example:

{code:java}
final Serde<Windowed<String>> windowedSerde = new WrapperSerde<>(
    new WindowedSerializer<>(new StringSerializer()),
    new WindowedDeserializer<>(new StringDeserializer())
);
final String topic = "name";
final RocksDBStore<Windowed<String>, String> store =
    new RocksDBStore<>(topic, windowedSerde, Serdes.String());
final MockProcessorContext context = ...;
context.setRecordContext(...);
store.init(context, store);
store.put(new Windowed<>("key1", new TimeWindow(100, 123)), "value1");
store.put(new Windowed<>("key2", new TimeWindow(101, 456)), "value2");
final KeyValueIterator<Windowed<String>, String> all = store.all();
all.next(); // KeyValue([key1@100/9223372036854775807], value1)
all.next(); // KeyValue([key2@101/9223372036854775807], value2)
{code}

The store accepts two time windows with different window sizes. When we read 
them back from the store, both windows have the correct start but a broken end 
({{Long.MAX_VALUE}}, as set in {{WindowedDeserializer}}). So we are unable to 
calculate the window end unless {{WindowedSerializer}} saves it.
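
The same effect can be reproduced without Kafka. The following is only a 
standalone sketch in plain Java, not the actual Kafka Streams code: the class 
name {{StartOnlyWindowKey}}, its methods, and the byte layout (key bytes 
followed by an 8-byte start timestamp) are assumptions made up for 
illustration. Because the end is never written, the reader has nothing to 
restore it from and falls back to {{Long.MAX_VALUE}}.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class StartOnlyWindowKey {

    // Hypothetical layout: [key bytes][8-byte window start].
    // The 'end' argument is accepted but deliberately not written.
    static byte[] serialize(String key, long start, long end) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(start)
                .array();
    }

    // Returns {start, end}. Only the start is present in the bytes,
    // so the end is filled with Long.MAX_VALUE as a placeholder.
    static long[] deserializeBounds(byte[] data) {
        long start = ByteBuffer.wrap(data).getLong(data.length - Long.BYTES);
        return new long[] {start, Long.MAX_VALUE};
    }

    public static void main(String[] args) {
        long[] w1 = deserializeBounds(serialize("key1", 100L, 123L));
        long[] w2 = deserializeBounds(serialize("key2", 101L, 456L));
        System.out.println(w1[0] + "/" + w1[1]); // 100/9223372036854775807
        System.out.println(w2[0] + "/" + w2[1]); // 101/9223372036854775807
    }
}
```

Both windows come back with the same placeholder end even though their 
original sizes (23 and 355) differ, which is why a single configured window 
length could not fix this particular case either.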

It now seems that [~bbejeck] was correct about this:

{noformat}
Unless I'm missing something, this task implies we'll need to include the 
window_size (and forgo the 8 bytes per key storage savings) on serialization 
with WindowedSerializer. As after we've read it via the WindowedDeserializer we 
only have the key and the start timestamp and don't have access to the original 
window_size to do the calculation.
{noformat}
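
As a rough sketch of what that could look like (again plain Java rather than 
the real serializers; {{SizedWindowKey}}, its methods, and the layout of key 
bytes, 8-byte start, 8-byte window size are hypothetical and only one possible 
encoding), writing the size next to the start lets the reader compute the end 
at the cost of 8 more bytes per key:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SizedWindowKey {

    // Hypothetical layout: [key bytes][8-byte window start][8-byte window size].
    static byte[] serialize(String key, long start, long end) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + 2 * Long.BYTES)
                .put(keyBytes)
                .putLong(start)
                .putLong(end - start) // window size
                .array();
    }

    // Everything before the two trailing longs is the UTF-8 key.
    static String deserializeKey(byte[] data) {
        return new String(data, 0, data.length - 2 * Long.BYTES, StandardCharsets.UTF_8);
    }

    // Returns {start, end}, with end computed as start + stored size.
    static long[] deserializeBounds(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        long start = buf.getLong(data.length - 2 * Long.BYTES);
        long size = buf.getLong(data.length - Long.BYTES);
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("key2", 101L, 456L);
        long[] w = deserializeBounds(bytes);
        System.out.println(deserializeKey(bytes) + "@" + w[0] + "/" + w[1]); // key2@101/456
    }
}
```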

> Correctly calculate the window end timestamp after read from state stores
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4468
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4468
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: architecture
>
> When storing the WindowedStore on the persistent KV store, we only use the 
> start timestamp of the window as part of the combo-key as (start-timestamp, 
> key). The reason that we do not add the end-timestamp as well is that we can 
> always calculate it from the start timestamp + window_length, and hence we 
> can save 8 bytes per key on the persistent KV store.
> However, after read it (via {{WindowedDeserializer}}) we do not set its end 
> timestamp correctly but just read it as an {{UnlimitedWindow}}. We should fix 
> this by calculating its end timestamp as mentioned above.


