[
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15795304#comment-15795304
]
Mitch Seymour commented on KAFKA-4113:
--------------------------------------
I believe I am experiencing the same issue that [~gfodor] describes above
(note: I'm using 0.10.0 client libs since we haven't upgraded our brokers yet).
If I stream a compacted source topic as a changelog stream using
KStreamBuilder#table, with an application id that doesn't have any consumer
metadata yet, then the state store does not get initialized as I would expect.
This results in failed joins in our application (as [~mjsax] also noted
above). I have been able to get around this issue by creating a dummy consumer
before starting the stream and checking for null metadata using a
ConsumerRebalanceListener. If the metadata is null, I do a single poll on the
source topic and then exit. I added an example of this method to a public gist,
see here:
https://gist.github.com/mitch-seymour/3427ecd0a577cf26b67021e964d6be6c
I am not entirely sure if that approach is safe yet (please feel free to share
any feedback or concerns you may have). It seems to be working, but
occasionally I get the following error on startup:
Exception in thread "main"
org.apache.kafka.common.errors.InconsistentGroupProtocolException: The group
member's supported protocols are incompatible with those of existing members.
The error goes away after a restart, so I'm not sure what causes it. Anyway, I
am happy to help with this issue if needed.
> Allow KTable bootstrap
> ----------------------
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Guozhang Wang
>
> On the mailing list, there are multiple requests about the possibility to
> "fully populate" a KTable before actual stream processing starts.
> Even though it is somewhat difficult to define when the initial populating
> phase should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the
> data. Only after this topic has been read completely and the KTable is ready
> should the application start processing. This would imply that, on startup,
> the current partition sizes (end offsets) must be fetched and stored, and
> after the KTable has been populated up to those offsets, stream processing
> can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if a KTable receives no update for a certain
> time, we consider it populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (3)) was:
> {noformat}
> // Populate the table without reading any other topics until a record
> // with timestamp 1000 is seen.
> KTable table = builder.table("topic", 1000);
> {noformat}
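
The "main idea" in the description above (store the partition end offsets at
startup, then hold back stream processing until the KTable has been populated
up to them) could be sketched roughly as follows. This is only an illustration
in plain Java with no Kafka dependency: BootstrapGate, recordApplied and
isBootstrapped are hypothetical names, not part of any Kafka API, and the end
offsets are assumed to have been fetched elsewhere.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not a Kafka class): tracks whether a table has caught up to a startup snapshot. */
final class BootstrapGate {
    // End offset per partition captured once at startup, i.e. the offset
    // the next record written to that partition would receive.
    private final Map<Integer, Long> targetOffsets;
    // Next offset to read per partition, advanced as records are applied.
    private final Map<Integer, Long> positions = new HashMap<>();

    BootstrapGate(Map<Integer, Long> endOffsetsAtStartup) {
        this.targetOffsets = new HashMap<>(endOffsetsAtStartup);
    }

    /** Note that the record at the given offset has been applied to the table. */
    void recordApplied(int partition, long offset) {
        positions.merge(partition, offset + 1, Long::max);
    }

    /** True once every partition has been read up to its snapshotted end offset. */
    boolean isBootstrapped() {
        for (Map.Entry<Integer, Long> e : targetOffsets.entrySet()) {
            long position = positions.getOrDefault(e.getKey(), 0L);
            if (position < e.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, Long> snapshot = new HashMap<>();
        snapshot.put(0, 2L); // partition 0 held offsets 0..1 at startup
        snapshot.put(1, 0L); // partition 1 was empty at startup
        BootstrapGate gate = new BootstrapGate(snapshot);

        System.out.println(gate.isBootstrapped()); // false
        gate.recordApplied(0, 0);
        gate.recordApplied(0, 1);
        System.out.println(gate.isBootstrapped()); // true
    }
}
```

Records that arrive after the snapshot do not move the target, so the gate
opens even if the topic keeps receiving occasional updates; an empty partition
counts as populated immediately.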