[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15795304#comment-15795304
 ] 

Mitch Seymour commented on KAFKA-4113:
--------------------------------------

I believe I am experiencing the same issue that [~gfodor] describes above 
(note: I'm using 0.10.0 client libs since we haven't upgraded our brokers yet). 
If I stream a compacted source topic as a changelog stream using 
KStreamBuilder#table, with an application id that doesn't have any consumer 
metadata yet, then the state store does not get initialized as I would expect. 
This results in failed joins in our application (as [~mjsax] also noted 
above). I have been able to get around this issue by creating a dummy consumer 
before starting the stream, and checking for null metadata using a 
ConsumerRebalanceListener. If the metadata is null, I do a single poll on the 
source topic and then exit. I added an example of this method to a public gist, 
see here:

https://gist.github.com/mitch-seymour/3427ecd0a577cf26b67021e964d6be6c

I am not entirely sure yet whether that approach is safe (please feel free to 
share any feedback or concerns you may have). It seems to be working, but 
occasionally I get the following error on startup:

Exception in thread "main" 
org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group 
member's supported protocols are incompatible with those of existing members.

The error goes away after a restart, so I'm not sure what causes it. Anyway, I 
am happy to help with this issue if needed.

> Allow KTable bootstrap
> ----------------------
>
>                 Key: KAFKA-4113
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4113
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>
> On the mailing list, there are multiple requests about the possibility to 
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This implies that, on startup, 
> the current partition sizes must be fetched and stored, and after the KTable 
> has been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if no update to a KTable is made for a certain
> time, we consider it populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> // populate the table without reading any other topics until a record
> // with timestamp 1000 is seen
> KTable table = builder.table("topic", 1000);
> {noformat}
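
The offset-based idea in the description (snapshot the partition end offsets 
on startup, then hold back stream processing until the KTable has been 
populated up to those offsets) could be sketched roughly as below. This is 
only an illustration under stated assumptions: BootstrapTracker and its 
methods are hypothetical names, not part of Kafka Streams or of any proposed 
API, and partitions are identified by plain integers rather than by 
TopicPartition so that the example needs nothing beyond the JDK.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not a Kafka API): decides whether a table has been
 * populated up to the end offsets that were captured at startup.
 */
final class BootstrapTracker {
    // End offset per partition at startup, i.e. the offset the next
    // record written to that partition would receive.
    private final Map<Integer, Long> endOffsets;
    // Next offset still to be applied per partition (last applied + 1).
    private final Map<Integer, Long> positions = new HashMap<>();

    BootstrapTracker(Map<Integer, Long> endOffsetsAtStartup) {
        this.endOffsets = new HashMap<>(endOffsetsAtStartup);
        for (Integer partition : endOffsets.keySet()) {
            positions.put(partition, 0L);
        }
    }

    /** Call after a record at the given offset was applied to the table. */
    void record(int partition, long offset) {
        // Offsets in a compacted topic may have gaps, so keep the maximum.
        positions.merge(partition, offset + 1, Long::max);
    }

    /** True once every partition has reached its captured end offset. */
    boolean isBootstrapped() {
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            if (positions.getOrDefault(e.getKey(), 0L) < e.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

A caller would build the tracker from the end offsets it fetched on startup, 
call record(...) for every changelog record applied to the table, and begin 
processing the other input topics only once isBootstrapped() returns true. 
Partitions that were empty at startup (end offset 0) count as populated 
right away. The sketch assumes the record just below each captured end 
offset is still readable; a real implementation would more likely compare 
the consumer's position instead.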



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
