Hey Jun,

Thanks for the review. Responses below:
50. Yes, that is right. I clarified this in the KIP.

51. Yes, updated the KIP to mention it.

52. Yeah, this was a reference to a previous iteration. I've fixed it.

53. I changed the API to use an `Optional<Integer>` for the leader epoch and added a note about the default value. Does that seem reasonable?

54. We discussed this above, but could not find a great option. The options are to add a new API (e.g. positionAndEpoch) or to rely on the user to get the epoch from the fetched records. We were leaning toward the latter, but I admit it was not fully satisfying. In this case, Connect would need to track the last consumed offsets manually instead of relying on the consumer. We also considered adding a convenience method to ConsumerRecords to get the offset to commit for all fetched partitions. This makes the additional bookkeeping pretty minimal. What do you think?

55. I clarified this in the KIP. I was mainly thinking of situations where a previously valid offset becomes out of range.

56. Yeah, that's a bit annoying. I decided to keep LeaderEpoch as it is and use CurrentLeaderEpoch for both the OffsetForLeaderEpoch and Fetch APIs. I think Dong suggested this previously as well.

57. We could, but I'm not sure there's a strong reason to do so. I was thinking we would leave it around for convenience, but let me know if you think we should do otherwise.

Thanks,
Jason

On Fri, Aug 3, 2018 at 4:49 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Jason,

Thanks for the updated KIP. Well thought-through. Just a few minor comments below.

50. For seek(TopicPartition partition, OffsetAndMetadata offset), I guess under the covers it will make an OffsetsForLeaderEpoch request to determine if the seeked offset is still valid before fetching? If so, it would be useful to document this in the wiki.

51. Similarly, if the consumer fetch request gets FENCED_LEADER_EPOCH, I guess the consumer will also make an OffsetsForLeaderEpoch request to determine if the last consumed offset is still valid before fetching? If so, it would be useful to document this in the wiki.

52. "If the consumer seeks to the middle of the log, for example, then we will use the sentinel value -1 and the leader will skip the epoch validation." Is this true? If the consumer seeks using seek(TopicPartition partition, OffsetAndMetadata offset) and the seeked offset is valid, the consumer can/should use the leaderEpoch in the cached metadata for fetching?

53. OffsetAndMetadata. For backward compatibility, we need to support constructing OffsetAndMetadata without providing leaderEpoch. Could we define the default value of leaderEpoch if not provided and the semantics of that (e.g., skipping the epoch validation)?

54. I saw the following code in WorkerSinkTask in Connect. It saves the offset obtained through position(), which can be committed later. Since position() doesn't return the leaderEpoch, this can lead to a committed offset without a leaderEpoch. Not sure how common this usage is, but what's the recommendation for such users?

    private class HandleRebalance implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
            lastCommittedOffsets = new HashMap<>();
            currentOffsets = new HashMap<>();
            for (TopicPartition tp : partitions) {
                long pos = consumer.position(tp);
                lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));

55. "With this KIP, the only case in which this is possible is if the consumer fetches from an offset earlier than the log start offset." Is that true?
I guess a user could seek to a large offset without providing leaderEpoch, which can cause the offset to be larger than the log end offset during fetch?

56. In the schema for OffsetForLeaderEpochRequest, LeaderEpoch seems to be an existing field. Is LeaderEpochQuery the new field? The name is not very intuitive. It would be useful to document its meaning.

57. Should we deprecate the following API?

    void seek(TopicPartition partition, long offset);

Thanks,
Jun

On Fri, Aug 3, 2018 at 9:32 AM, Jason Gustafson <ja...@confluent.io> wrote:

Hey All,

I think I've addressed all pending review comments. If there is no additional feedback, I'll plan to start a vote thread next week.

Thanks,
Jason

On Tue, Jul 31, 2018 at 9:46 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

Thanks for your reply. I will comment below.

Regarding 1, we probably cannot simply rename both to `LeaderEpoch` because we already have a LeaderEpoch field in OffsetsForLeaderEpoch.

Regarding 5, I don't feel strongly about this. I agree with the two benefits of having two error codes: 1) not having to refresh metadata when the consumer sees UNKNOWN_LEADER_EPOCH and 2) providing more information in the log for debugging. Whether or not these two benefits are useful enough to justify one more error code may be subjective. I will let you and others determine this.

Regarding 6, yeah, overloading seek() looks good to me.

Thanks,
Dong

On Mon, Jul 30, 2018 at 9:33 AM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Dong,

Thanks for the detailed review. Responses below:

1/2: Thanks for noticing the inconsistency. Would it be reasonable to simply call it LeaderEpoch for both APIs?

3: I agree it should be a map. I will update.

4: Fair point. I think we should always be able to identify an offset. Let's remove the Optional for now and reconsider if we find an unhandled case during implementation.

5: Yeah, I was thinking about this. The two error codes could be handled similarly, so we might merge them. Mainly I was thinking that it will be useful for consumers/replicas to know whether they are ahead of or behind the leader. For example, if a consumer sees UNKNOWN_LEADER_EPOCH, it need not refresh metadata. Or if a replica sees a FENCED_LEADER_EPOCH error, it could just stop fetching and await the LeaderAndIsr request that it is missing. It probably also makes debugging a little bit easier. I guess I'm a bit inclined to keep both error codes, but I'm open to reconsideration if you feel strongly. Another point to consider is whether we should continue using NOT_LEADER_FOR_PARTITION if a follower receives an unexpected fetch. The leader epoch would be different in this case, so we could use one of the invalid epoch error codes instead since they contain more information.

6: I agree the name is not ideal in that scenario. What if we overloaded `seek`?

7: Sure, I will mention this.

Thanks,
Jason

On Fri, Jul 27, 2018 at 6:17 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

Thanks for the update! I agree with the current proposal overall. I have some minor comments related to naming etc.

1) I don't feel strongly and will just leave it here for discussion. Would it be better to rename "CurrentLeaderEpoch" to "ExpectedLeaderEpoch" for the new field in the OffsetsForLeaderEpochRequest?
The reason is that "CurrentLeaderEpoch" may not necessarily be the true current leader epoch if the consumer has stale metadata. "ExpectedLeaderEpoch" shows that this epoch is what the consumer expects on the broker, which may or may not be the true value.

2) Currently we add the field "LeaderEpoch" to FetchRequest and the field "CurrentLeaderEpoch" to OffsetsForLeaderEpochRequest. Given that both fields are compared with the leaderEpoch in the broker, would it be better to give them the same name?

3) Currently LogTruncationException.truncationOffset() returns Optional<OffsetAndMetadata> to the user. Should it return Optional<Map<TopicPartition, OffsetAndMetadata>> to handle the scenario where the leaderEpoch of multiple partitions differs from the leaderEpoch in the broker?

4) Currently LogTruncationException.truncationOffset() returns an Optional value. Could you explain a bit more when it will return Optional.empty()? I am trying to understand whether it is simpler and reasonable to replace Optional.empty() with OffsetMetadata(offset=last_fetched_offset, leaderEpoch=-1).

5) Do we also need to add a new retriable exception for error code FENCED_LEADER_EPOCH? And do we need to define both FENCED_LEADER_EPOCH and UNKNOWN_LEADER_EPOCH? It seems that the current KIP uses these two error codes in the same way, and the exception for these two error codes is not exposed to the user. Maybe we should combine them into one error, e.g. INVALID_LEADER_EPOCH?
6) For users who have turned off auto offset reset, when consumer.poll() throws LogTruncationException, it seems that the user will most likely call seekToCommitted(offset, leaderEpoch), where offset and leaderEpoch are obtained from LogTruncationException.truncationOffset(). In this case, the offset used here is not committed, which is inconsistent with the method name seekToCommitted(...). Would it be better to rename the method to e.g. seekToLastConsumedMessage()?

7) Per point 3 in Jun's comment, would it be useful to explicitly specify in the KIP that we will log the truncation event if the user has turned on the auto offset reset policy?

Thanks,
Dong

On Fri, Jul 27, 2018 at 12:39 PM, Jason Gustafson <ja...@confluent.io> wrote:

Thanks Anna, you are right on both points. I updated the KIP.

-Jason

On Thu, Jul 26, 2018 at 2:08 PM, Anna Povzner <a...@confluent.io> wrote:

Hi Jason,

Thanks for the update. I agree with the current proposal.

Two minor comments:

1) In the “API Changes” section, the first paragraph says that “users can catch the more specific exception type and use the new `seekToNearest()` API defined below.” Since LogTruncationException “will include the partitions that were truncated and the offset of divergence”, shouldn't the client use seek(offset) to seek to the offset of divergence in response to the exception?
2) In the “Protocol Changes” section, the OffsetsForLeaderEpoch subsection says “Note that consumers will send a sentinel value (-1) for the current epoch and the broker will simply disregard that validation.” Is that still true with MetadataResponse containing the leader epoch?

Thanks,
Anna

On Wed, Jul 25, 2018 at 1:44 PM Jason Gustafson <ja...@confluent.io> wrote:

Hi All,

I have made some updates to the KIP. As many of you know, a side project of mine has been specifying the Kafka replication protocol in TLA. You can check out the code here if you are interested: https://github.com/hachikuji/kafka-specification. In addition to uncovering a couple of unknown bugs in the replication protocol (e.g. https://issues.apache.org/jira/browse/KAFKA-7128), this has helped me validate the behavior in this KIP. In fact, the original version I proposed had a weakness. I initially suggested letting the leader validate the expected epoch at the fetch offset. This made sense for the consumer in the handling of unclean leader election, but it was not strong enough to protect the follower in all cases. In order to make advancement of the high watermark safe, for example, the leader actually needs to be sure that every follower in the ISR matches its own epoch.
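The stricter rule described here, where the leader accepts a fetch only if the fetcher's epoch matches its own, can be pictured with a small check. This is a hypothetical sketch, not Kafka's broker code: the class, enum, and method names are illustrative, and it assumes the conventions discussed elsewhere in this thread, namely that -1 means "no epoch, skip validation", that FENCED_LEADER_EPOCH means the fetcher's epoch is older than the leader's, and that UNKNOWN_LEADER_EPOCH means it is newer.

```java
// Hypothetical model of how a partition leader could validate the current
// leader epoch carried in a fetch request. Not Kafka's broker code.
final class LeaderEpochCheck {

    // Outcome names mirror the error codes discussed in this thread.
    enum Result { OK, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

    // Assumed sentinel: the requester has no epoch, so validation is skipped.
    static final int NO_EPOCH = -1;

    static Result validate(int requestEpoch, int leaderEpoch) {
        if (requestEpoch == NO_EPOCH) {
            return Result.OK;
        }
        if (requestEpoch < leaderEpoch) {
            // Requester is behind the leader (stale metadata).
            return Result.FENCED_LEADER_EPOCH;
        }
        if (requestEpoch > leaderEpoch) {
            // Requester is ahead: this broker has not yet learned of the newer epoch.
            return Result.UNKNOWN_LEADER_EPOCH;
        }
        return Result.OK; // epochs match, so the fetch may proceed
    }

    public static void main(String[] args) {
        System.out.println(validate(5, 5));  // OK
        System.out.println(validate(4, 5));  // FENCED_LEADER_EPOCH
        System.out.println(validate(6, 5));  // UNKNOWN_LEADER_EPOCH
        System.out.println(validate(-1, 5)); // OK (validation skipped)
    }
}
```

With two distinct outcomes, a follower that gets FENCED_LEADER_EPOCH can stop fetching and wait for the LeaderAndIsr request it is missing, while a consumer that gets UNKNOWN_LEADER_EPOCH can retry without refreshing its metadata.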
I attempted to fix this problem by treating the epoch in the fetch request slightly differently for consumers and followers. For consumers, it would be the expected epoch of the record at the fetch offset, and the leader would raise a LOG_TRUNCATION error if the expectation failed. For followers, it would be the current epoch, and the leader would require that it match its own epoch. This was unsatisfying both because of the inconsistency in behavior and because the consumer was left with the weaker fencing that we already knew was insufficient for the replicas. Ultimately I decided that we should make the behavior consistent, and that meant that the consumer needed to act more like a following replica. Instead of checking for truncation while fetching, the consumer should check for truncation after leader changes. After checking for truncation, the consumer can then use the current epoch when fetching and get the stronger protection that it provides. What this means is that the Metadata API must include the current leader epoch. Given the problems we have had around stale metadata and how challenging they have been to debug, I'm convinced that this is a good idea in any case, and it resolves the inconsistent behavior in the Fetch API.
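The consumer behavior described above, checking for truncation after a leader change and only then fetching with the current epoch, can be sketched as a per-partition state machine. Everything here is hypothetical: the class and method names are illustrative, the OffsetsForLeaderEpoch round trip is reduced to a single end offset passed in by the caller, and the choice to rewind to the divergence point when an auto reset policy is configured (and to throw otherwise) is an assumption based on the LogTruncationException discussion in this thread.

```java
import java.util.OptionalInt;

// Hypothetical per-partition fetch state: after a leader change the consumer
// must validate its position before it may fetch again with the new epoch.
final class PartitionFetchState {

    enum State { FETCHING, AWAITING_VALIDATION }

    // Stand-in for the proposed LogTruncationException (hypothetical).
    static final class TruncationDetected extends RuntimeException {
        final long divergenceOffset;

        TruncationDetected(long divergenceOffset) {
            super("log truncated; diverges at offset " + divergenceOffset);
            this.divergenceOffset = divergenceOffset;
        }
    }

    private final boolean autoResetEnabled;
    private State state = State.FETCHING;
    private long position;
    private int currentLeaderEpoch;

    PartitionFetchState(long position, int leaderEpoch, boolean autoResetEnabled) {
        this.position = position;
        this.currentLeaderEpoch = leaderEpoch;
        this.autoResetEnabled = autoResetEnabled;
    }

    // Metadata reported a leader epoch; a newer one blocks fetching until validated.
    void onLeaderChange(int newLeaderEpoch) {
        if (newLeaderEpoch > currentLeaderEpoch) {
            currentLeaderEpoch = newLeaderEpoch;
            state = State.AWAITING_VALIDATION;
        }
    }

    // epochEndOffset: what the new leader reports as the end of the epoch of the
    // last consumed record (the OffsetsForLeaderEpoch round trip is not modeled).
    void onValidation(long epochEndOffset) {
        if (epochEndOffset < position) {
            if (!autoResetEnabled) {
                throw new TruncationDetected(epochEndOffset); // leave the decision to the caller
            }
            position = epochEndOffset; // assumed behavior: rewind to the divergence point
        }
        state = State.FETCHING;
    }

    // The epoch to send with the next fetch, or empty while validation is pending.
    OptionalInt fetchEpoch() {
        return state == State.FETCHING ? OptionalInt.of(currentLeaderEpoch) : OptionalInt.empty();
    }

    long position() {
        return position;
    }
}
```

Blocking fetches until validation completes is what allows the consumer to send its current epoch on every fetch and so receive the same fencing as a follower.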
The downside is that there will be some additional overhead upon leader changes, but I don't think it is a major concern since leader changes are rare and the OffsetForLeaderEpoch request is cheap.

This approach leaves the door open for some interesting follow-up improvements. For example, now that we have the leader epoch in the Metadata request, we can implement similar fencing for the Produce API. And now that the consumer can reason about truncation, we could consider having a configuration to expose records beyond the high watermark. This would let users trade lower end-to-end latency for weaker durability semantics. It is sort of like having an acks=0 option for the consumer. Neither of these options is included in this KIP; I am just mentioning them as potential work for the future.

Finally, based on the discussion in this thread, I have added the seekToCommitted API for the consumer. Please take a look and let me know what you think.

Thanks,
Jason

On Fri, Jul 20, 2018 at 2:34 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Jason,

The proposed API seems reasonable to me too.
Could you please also update the wiki page (https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation) with a section, say "Workflow", on how the proposed API will be used together with the others for:

1. consumer callers handling a LogTruncationException;
2. consumer internals handling a retriable UnknownLeaderEpochException.

Guozhang

On Tue, Jul 17, 2018 at 10:23 AM, Anna Povzner <a...@confluent.io> wrote:

Hi Jason,

I also like your proposal and agree that KafkaConsumer#seekToCommitted() is more intuitive as a way to initialize both the consumer's position and its fetch state.

My understanding is that KafkaConsumer#seekToCommitted() is purely for clients who store their offsets externally, right? And we are still going to add KafkaConsumer#findOffsets() in this KIP as we discussed, so that the client can handle LogTruncationException?
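For the first workflow in Guozhang's list, application code handling a truncation error, a caller-side sketch might look like the following. It is hypothetical throughout: `SimpleConsumer` and `TruncationException` are stand-ins for a real consumer and the proposed LogTruncationException, and the sketch assumes, following suggestions made in this thread, that the exception exposes one divergence offset per partition and that the caller simply seeks to those offsets before polling again.

```java
import java.util.List;
import java.util.Map;

// Hypothetical caller-side handling of a log-truncation error.
final class TruncationHandling {

    record Partition(String topic, int partition) {}

    // Stand-in for the proposed LogTruncationException: one divergence offset per partition.
    static final class TruncationException extends RuntimeException {
        final Map<Partition, Long> divergenceOffsets;

        TruncationException(Map<Partition, Long> divergenceOffsets) {
            super("log truncation detected for " + divergenceOffsets.keySet());
            this.divergenceOffsets = Map.copyOf(divergenceOffsets);
        }
    }

    // Minimal stand-in for the consumer methods this workflow needs.
    interface SimpleConsumer {
        List<String> poll();

        void seek(Partition partition, long offset);
    }

    // Poll once; on truncation, rewind each affected partition to its divergence
    // point so that later polls read the leader's version of the log from there.
    static List<String> pollHandlingTruncation(SimpleConsumer consumer) {
        try {
            return consumer.poll();
        } catch (TruncationException e) {
            e.divergenceOffsets.forEach(consumer::seek);
            return List.of();
        }
    }
}
```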
Thanks,
Anna

On Thu, Jul 12, 2018 at 3:57 PM Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

It is a great summary. The solution sounds good. I might have minor comments regarding the method name, but we can discuss those minor points later, after we reach consensus on the high-level API.

Thanks,
Dong

On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Anna and Dong,

Thanks a lot for the great discussion. I've been hanging back a bit because honestly the best option hasn't seemed clear. I agree with Anna's general observation that there is a distinction between the position of the consumer and its fetch state up to that position. If you think about it, a committed offset actually represents both of these. The metadata is used to initialize the state of the consumer application, and the offset initializes the position.
Additionally, we are extending the offset commit in this KIP to also include the last epoch fetched by the consumer, which is used to initialize the internal fetch state. Of course, if you do an arbitrary `seek` and immediately commit offsets, then there won't be a last epoch to commit. This seems intuitive since there is no fetch state in this case. We only commit fetch state when we have it.

So if we think about a committed offset as initializing both the consumer's position and its fetch state, then the gap in the API is evidently that we don't have a way to initialize the consumer to a committed offset. We do it implicitly, of course, for offsets stored in Kafka, but since external storage is a use case we support, we should have an explicit API as well.
Perhaps something like this:

    seekToCommitted(TopicPartition, OffsetAndMetadata)

In this KIP, we are proposing to allow the `OffsetAndMetadata` object to include the leader epoch, so I think this would have the same effect as Anna's suggested `seekToRecord`. But perhaps it is a more natural fit given the current API? Furthermore, if we find a need for additional metadata in the offset commit API in the future, then we will just need to modify the `OffsetAndMetadata` object and we will not need a new `seek` API.

With this approach, I think we can leave the `position` API as it is. The position of the consumer is still just the next expected fetch offset. If a user needs to record additional state based on previous fetch progress, then they would use the result of the previous fetch to obtain it. This makes the dependence on fetch progress explicit. I think we could make this a little more convenient with a helper in the `ConsumerRecords` object, but I think that's more of a nice-to-have.
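The `ConsumerRecords` helper mentioned above as a nice-to-have could work roughly as follows. This is a hypothetical sketch rather than an existing client method: it uses simplified stand-in types instead of the Kafka classes, and it assumes that the position to commit for a partition is the offset after the last consumed record, paired with that record's leader epoch when one is known (`Optional.empty()` otherwise).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: derive the offsets (and leader epochs) to commit from
// the records a consumer has already processed.
final class CommitPositions {

    // Simplified stand-ins for TopicPartition, ConsumerRecord and OffsetAndMetadata.
    record PartitionKey(String topic, int partition) {}
    record FetchedRecord(PartitionKey partition, long offset, Optional<Integer> leaderEpoch) {}
    record OffsetAndEpoch(long offset, Optional<Integer> leaderEpoch) {}

    // For each partition, commit the offset after the last consumed record and
    // carry that record's epoch so a later reader can validate its position.
    static Map<PartitionKey, OffsetAndEpoch> nextToCommit(List<FetchedRecord> consumed) {
        Map<PartitionKey, OffsetAndEpoch> result = new HashMap<>();
        for (FetchedRecord r : consumed) {
            OffsetAndEpoch candidate = new OffsetAndEpoch(r.offset() + 1, r.leaderEpoch());
            // Keep the highest position per partition, regardless of input order.
            result.merge(r.partition(), candidate,
                    (a, b) -> a.offset() >= b.offset() ? a : b);
        }
        return result;
    }
}
```

An offset set by an arbitrary `seek` has no consumed record behind it, so under this scheme it would carry no epoch, which is consistent with committing fetch state only when it exists.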
Thoughts?

By the way, I have been iterating a little bit on the replica side of this KIP. My initial proposal in fact did not have strong enough fencing to protect all of the edge cases. I believe the current proposal fixes the problems, but I am still verifying the model.

Thanks,
Jason

On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Anna,

Thanks much for the explanation. Approach 1 also sounds good to me. I think findOffsets() is useful for users who don't use the automatic offset reset policy.

Just one more question. Since users who store offsets externally need to provide leaderEpoch to findOffsets(...), do we need an extra API for users to get both the offset and the leaderEpoch, e.g. recordPosition()?
Thanks,
Dong

On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner <a...@confluent.io> wrote:

Hi Dong,

What I called “not covering all use cases” is what you call best-effort (not guaranteeing some corner cases). I think we are on the same page here.

I wanted to be clear in the API whether the consumer seeks to a position (offset) or to a record (offset, leader epoch). The only use case for seeking to a record is seeking to a committed offset for a user who stores committed offsets externally (unless users find some other reason to seek to a record). I thought it was possible to provide this functionality with findOffset(offset, leader epoch) followed by a seek(offset).
However, you are right that this will not handle the race condition where the non-divergent offset found by findOffset() could change again before the consumer does the first fetch.

Regarding position(): if we add a position that returns (offset, leader epoch), this is specifically a position after a record that was actually consumed, or the position of a committed record. In that case, I still think it's cleaner to get the record position of a consumed message from a new helper method in ConsumerRecords or from committed offsets.
I think all the use cases could then be covered with:

(Approach 1)

    seekToRecord(offset, leaderEpoch): just initializes/sets the consumer state;
    findOffsets(offset, leaderEpoch): returns {offset, leaderEpoch}

If we agree that the race condition is also a corner case, then I think we can cover the use cases with:

(Approach 2)

    findOffsets(offset, leaderEpoch): returns offset; we still want the leader epoch as a parameter for users who store their committed offsets externally.

I am actually now leaning more toward approach 1, since it is more explicit, and maybe there are more use cases for it.
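The `findOffsets(offset, leaderEpoch)` lookup in Approach 1 rests on the KIP-101 style epoch query referred to in this thread. Below is a simplified, hypothetical model of that computation, not the proposed API: it assumes the leader keeps a map from each leader epoch to the first offset written in that epoch, finds the largest epoch not greater than the consumer's, takes that epoch's end offset (the start of the next epoch, or the log end offset for the latest one), and treats the smaller of that end offset and the consumer's offset as the point of non-divergence. It is best effort: a single (offset, epoch) data point cannot catch every divergence.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;

// Hypothetical model of finding the point of non-divergence between a
// consumer's (offset, leaderEpoch) and the leader's epoch history.
final class DivergenceFinder {

    record OffsetAndEpoch(long offset, int leaderEpoch) {}

    // epochStartOffsets: leader epoch -> first offset written in that epoch.
    // logEndOffset: the leader's log end offset.
    static Optional<OffsetAndEpoch> findOffsets(NavigableMap<Integer, Long> epochStartOffsets,
                                                long logEndOffset,
                                                long consumerOffset,
                                                int consumerEpoch) {
        // Largest epoch on the leader that is <= the consumer's epoch.
        Map.Entry<Integer, Long> floor = epochStartOffsets.floorEntry(consumerEpoch);
        if (floor == null) {
            return Optional.empty(); // the leader has no history that old; cannot decide here
        }
        // That epoch ends where the next one starts, or at the log end if it is the latest.
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(floor.getKey());
        long epochEndOffset = (next == null) ? logEndOffset : next.getValue();
        // Offsets below the result are assumed consistent with the leader's log.
        return Optional.of(new OffsetAndEpoch(Math.min(consumerOffset, epochEndOffset), floor.getKey()));
    }
}
```

For example, if the leader's epochs 0, 1 and 3 start at offsets 0, 100 and 150, a consumer at offset 180 in epoch 2 (an epoch that no longer exists on the leader) would be sent back to offset 150, the end of epoch 1.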
Thanks,
Anna

On Tue, Jul 10, 2018 at 3:47 PM Dong Lin <lindon...@gmail.com> wrote:

Hey Anna,

Thanks for the comment. To answer your question, it seems that we can cover all cases in this KIP. As stated in the "Consumer Handling" section, a KIP-101 based approach will be used to derive the truncation offset from the 2-tuple (offset, leaderEpoch). This approach is best effort, and it is inaccurate only in very rare scenarios (as described in KIP-279).

By using seek(offset, leaderEpoch), the consumer will still be able to follow this best-effort approach to detect log truncation and determine the truncation offset.
On the other hand, if we use seek(offset), the consumer will not detect log truncation in some cases, which weakens the guarantee of this KIP. Does this make sense?

Thanks,
Dong

On Tue, Jul 10, 2018 at 3:14 PM, Anna Povzner <a...@confluent.io> wrote:

Sorry, I hit "send" before finishing. Continuing...

2) Hiding most of the consumer's log truncation handling logic, with minimal exposure in the KafkaConsumer API. I was proposing this path.

Before answering your specific questions… I want to respond to your comment “In general, maybe we should discuss the final solution that covers all cases?”
With the current KIP, we don't cover all cases of the consumer detecting log truncation, because the KIP proposes a leader epoch cache in the consumer that does not persist across restarts. Plus, we only store the last committed offset (either internally, or users can store it externally). This has the limitation that the consumer will not always be able to find the point of truncation, simply because we have a limited history (just one data point).

So, maybe we should first agree on whether we accept that storing only the last committed offset/leader epoch has the limitation that the consumer will not be able to detect log truncation in all cases?
Thanks,
Anna

On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner <a...@confluent.io> wrote:

Hi Dong,

Thanks for the follow-up! I finally have a much clearer understanding of where you are coming from.

You are right. The success of findOffsets() (finding a point of non-divergence) depends on whether we have enough entries in the consumer's leader epoch cache. However, I think this is a fundamental limitation of having a leader epoch cache that does not persist across consumer restarts.
If we consider the general case where the consumer may or may not have this cache, then I see two paths:

1) Letting the user track the leader epoch history externally, with more exposure of the leader epoch and of finding the point of non-divergence in the KafkaConsumer API. I understand this is the case you were talking about.

On Tue, Jul 10, 2018 at 12:16 PM Dong Lin <lindon...@gmail.com> wrote:

Hey Anna,

Thanks much for your detailed explanation and example! It does help me understand the difference between our understandings.
So it seems that the solution based on findOffsets() currently focuses mainly on the scenario where the consumer has a cached leaderEpoch -> offset mapping, whereas I was thinking about the general case where the consumer may or may not have this cache. I guess that is why we have different understandings here. I have some comments below.

3) The proposed solution using findOffsets(offset, leaderEpoch) followed by seek(offset) works if the consumer has the cached leaderEpoch -> offset mapping. But if we assume the consumer has this cache, do we need to have leaderEpoch in findOffsets(...)? Intuitively, findOffsets(offset) can also derive the leaderEpoch from the offset, just like the proposed solution does with seek(offset).
4) If the consumer does not have the cached leaderEpoch -> offset mapping, which is the case if the consumer is restarted on a new machine, then it is not clear what leaderEpoch would be included in the FetchRequest if the consumer does seek(offset). This is the case that motivates the first question of the previous email. In general, maybe we should discuss the final solution that covers all cases?

5) The second question in my previous email is related to the following paragraph:

"... In some cases, offsets returned from position() could be actual consumed messages by this consumer identified by {offset, leader epoch}.
In other cases, position() returns offset that was not actually consumed. Suppose, the user calls position() for the last offset...".

I guess my point is that, if the user calls position() for the last offset and uses that offset in seek(...), then the user can probably just call Consumer#seekToEnd() without calling position() and seek(...). Similarly, the user can call Consumer#seekToBeginning() to seek to the earliest position without calling position() and seek(...). Thus position() only needs to return the actual consumed messages identified by {offset, leader epoch}. Does this make sense?
Thanks,
Dong

On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner <a...@confluent.io> wrote:

Hi Dong,

Thanks for considering my suggestions.

Based on your comments, I realized that my suggestion was not complete with regard to KafkaConsumer API vs. consumer-broker protocol.
While I propose to keep KafkaConsumer#seek() unchanged, taking an offset only, the underlying consumer will send the next FetchRequest() to the broker with the offset and the leaderEpoch if it is known (based on the leader epoch cache in the consumer). Note that this is different from the current KIP, which suggests always sending an unknown leader epoch after seek(). This way, if the consumer and a broker agreed on the point of non-divergence, which is some {offset, leaderEpoch} pair, the new leader which causes another truncation (even further back) will be able to detect the new divergence and restart the process of finding the new point of non-divergence.
So, to > > > answer > > > > > > your > > > > > > > > > > > question, > > > > > > > > > > > > If > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > >> > truncation happens just after the user > > calls > > > > > > > > > > > > > > > > >> > KafkaConsumer#findOffsets(offset, > > > > leaderEpoch) > > > > > > > > followed > > > > > > > > > > by > > > > > > > > > > > > > > > > seek(offset), > > > > > > > > > > > > > > > > >> > the user will not seek to the wrong > > position > > > > > > without > > > > > > > > > > knowing > > > > > > > > > > > > > that > > > > > > > > > > > > > > > > >> > truncation has happened, because the > > > consumer > > > > > will > > > > > > > get > > > > > > > > > > > another > > > > > > > > > > > > > > > > >> truncation > > > > > > > > > > > > > > > > >> > error, and seek again. > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > >> > I am afraid, I did not understand your > > > second > > > > > > > > question. > > > > > > > > > > Let > > > > > > > > > > > me > > > > > > > > > > > > > > > > >> summarize my > > > > > > > > > > > > > > > > >> > suggestions again, and then give an > > example > > > to > > > > > > > > hopefully > > > > > > > > > > > make > > > > > > > > > > > > my > > > > > > > > > > > > > > > > >> > suggestions more clear. Also, the last > > part > > > of > > > > > my > > > > > > > > > example > > > > > > > > > > > > shows > > > > > > > > > > > > > > how > > > > > > > > > > > > > > > > the > > > > > > > > > > > > > > > > >> > use-case in your first question will > work. > > > If > > > > it > > > > > > > does > > > > > > > > > not > > > > > > > > > > > > answer > > > > > > > > > > > > > > > your > > > > > > > > > > > > > > > > >> > second question, would you mind > > clarifying? 
I am also focusing on the case of a consumer having enough entries in the cache. The case of restarting from a committed offset, stored either externally or internally, will probably need to be discussed more.

Let me summarize my suggestion again:

1) KafkaConsumer#seek() and KafkaConsumer#position() remain unchanged.

2) A new KafkaConsumer#findOffsets() takes an {offset, leaderEpoch} pair per topic partition and returns an offset per topic partition.

3) The FetchRequest() to the broker after KafkaConsumer#seek() will contain the offset set by seek and the leaderEpoch that corresponds to that offset, based on the leader epoch cache in the consumer.
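To make point 3 concrete, here is a minimal sketch, assuming the consumer's leader epoch cache is a sorted map from the first consumed offset of each epoch to that epoch, plus the end offset of what has been consumed. The class and method names (ConsumerEpochCache, onConsumed, epochForFetch) are hypothetical and not part of the KafkaConsumer API; returning an empty value for an offset beyond the cached end offset is also an assumption, standing in for "send an unknown leader epoch".

```java
import java.util.Map;
import java.util.OptionalInt;
import java.util.TreeMap;

// Hypothetical model of the consumer-side leader epoch cache (not a Kafka class).
class ConsumerEpochCache {
    // First consumed offset of each epoch -> that leader epoch.
    private final TreeMap<Long, Integer> epochByStartOffset = new TreeMap<>();
    // One past the last consumed offset.
    private long endOffset = 0L;

    // Record a consumed message; a new entry is added only when the epoch changes.
    void onConsumed(long offset, int leaderEpoch) {
        Map.Entry<Long, Integer> last = epochByStartOffset.lastEntry();
        if (last == null || last.getValue() != leaderEpoch) {
            epochByStartOffset.put(offset, leaderEpoch);
        }
        endOffset = offset + 1;
    }

    // Leader epoch to send in the FetchRequest after seek(offset): the epoch of the
    // latest entry starting at or before the offset. Empty (treated as an unknown
    // epoch) if the offset precedes the cache or lies beyond the consumed end offset.
    OptionalInt epochForFetch(long offset) {
        Map.Entry<Long, Integer> entry = epochByStartOffset.floorEntry(offset);
        if (entry == null || offset > endOffset) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(entry.getValue());
    }
}
```

With offsets 0, 1 and 2 consumed in epochs 0, 1 and 1, epochForFetch(3) returns epoch 1, so a seek(3) would be followed by a fetch carrying leaderEpoch=1.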
The rest of this e-mail is a long and contrived example with several log truncations and unclean leader elections to illustrate the API and your first use-case. Suppose we have three brokers. Initially, brokers A, B, and C each have one message at offset 0 with leader epoch 0. Then, broker A goes down for some time. Broker B becomes the leader with epoch 1 and writes messages to offsets 1 and 2. Broker C fetches offset 1 but, before fetching offset 2, becomes the leader with leader epoch 2 and writes a message at offset 2.
Here is the state of the brokers at this point:

Broker A:
  offset 0, epoch 0  <- leader
  goes down...

Broker B:
  offset 0, epoch 0
  offset 1, epoch 1  <- leader
  offset 2, epoch 1

Broker C:
  offset 0, epoch 0
  offset 1, epoch 1
  offset 2, epoch 2  <- leader
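The OffsetsForLeaderEpoch answers used in this example can be reproduced with a simplified model of a broker's log, sketched below under the assumption that the broker returns the largest epoch in its log that is at most the requested epoch, together with that epoch's end offset (the start offset of the next higher epoch, or the log end offset if there is none). The class and method names are hypothetical, and the model ignores everything else a real broker tracks.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified broker log: only the leader epoch of each offset is kept.
class BrokerEpochLog {
    // Result of an OffsetsForLeaderEpoch lookup.
    static final class EpochEndOffset {
        final int epoch;
        final long endOffset;

        EpochEndOffset(int epoch, long endOffset) {
            this.epoch = epoch;
            this.endOffset = endOffset;
        }
    }

    // Leader epoch -> first offset written in that epoch.
    private final TreeMap<Integer, Long> startOffsetByEpoch = new TreeMap<>();
    private final long logEndOffset;

    // epochPerOffset.get(i) is the leader epoch of the message at offset i
    // (epochs are assumed to be non-decreasing along the log).
    BrokerEpochLog(List<Integer> epochPerOffset) {
        for (int offset = 0; offset < epochPerOffset.size(); offset++) {
            startOffsetByEpoch.putIfAbsent(epochPerOffset.get(offset), (long) offset);
        }
        logEndOffset = epochPerOffset.size();
    }

    // Largest epoch <= requestedEpoch and its end offset; null if no such epoch exists.
    EpochEndOffset offsetForLeaderEpoch(int requestedEpoch) {
        Map.Entry<Integer, Long> found = startOffsetByEpoch.floorEntry(requestedEpoch);
        if (found == null) {
            return null;
        }
        Map.Entry<Integer, Long> next = startOffsetByEpoch.higherEntry(found.getKey());
        long endOffset = (next == null) ? logEndOffset : next.getValue();
        return new EpochEndOffset(found.getKey(), endOffset);
    }
}
```

For broker C's log (epochs 0, 1, 2) a request for epoch 1 yields {epoch=1, endOffset=2}; for broker A after it writes offset 1 in epoch 3 (epochs 0, 3) the same request yields {epoch=0, endOffset=1}.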
Before broker C becomes the leader with leader epoch 2, the consumer consumed the following messages from broker A and broker B:

{offset=0, leaderEpoch=0}, {offset=1, leaderEpoch=1}, {offset=2, leaderEpoch=1}.

The consumer's leader epoch cache at this point contains the following entries:

(leaderEpoch=0, startOffset=0)
(leaderEpoch=1, startOffset=1)
endOffset = 3

Then, broker B becomes a follower of broker C, truncates, and starts fetching from offset 2.

The consumer sends FetchRequest(offset=3, leaderEpoch=1) and gets a LOG_TRUNCATION error from broker C.

In response, the client calls KafkaConsumer#findOffsets(offset=3, leaderEpoch=1). The underlying consumer sends OffsetsForLeaderEpoch(leaderEpoch=1), and broker C responds with {leaderEpoch=1, endOffset=2}. So, KafkaConsumer#findOffsets(offset=3, leaderEpoch=1) returns offset=2.
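The consumer-side step of findOffsets() in this example can be sketched as follows, assuming the consumer returns the smallest of the offset passed to findOffsets(), the broker's end offset, and its own cached end offset for the epoch the broker returned. The requested leader epoch only matters for the OffsetsForLeaderEpoch request, so it is not a parameter here. The class and method names are hypothetical, and returning empty when the returned epoch is missing from the cache is a simplification; handling that case is out of scope for this sketch.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical sketch of the consumer side of findOffsets() (not the KafkaConsumer API).
class FindOffsetsSketch {
    // Consumer's leader epoch cache: leader epoch -> first consumed offset in that epoch.
    private final TreeMap<Integer, Long> startOffsetByEpoch = new TreeMap<>();
    // One past the last consumed offset.
    private final long cacheEndOffset;

    FindOffsetsSketch(Map<Integer, Long> cacheEntries, long cacheEndOffset) {
        startOffsetByEpoch.putAll(cacheEntries);
        this.cacheEndOffset = cacheEndOffset;
    }

    // End offset of an epoch according to the consumer's own cache, if the epoch is cached.
    private OptionalLong localEndOffset(int epoch) {
        if (!startOffsetByEpoch.containsKey(epoch)) {
            return OptionalLong.empty();
        }
        Integer nextEpoch = startOffsetByEpoch.higherKey(epoch);
        return OptionalLong.of(nextEpoch == null ? cacheEndOffset : startOffsetByEpoch.get(nextEpoch));
    }

    // requestedOffset is the offset passed to findOffsets(); {brokerEpoch, brokerEndOffset}
    // is the OffsetsForLeaderEpoch response. Returns the offset to seek to, or empty if
    // brokerEpoch is not in the consumer's cache.
    OptionalLong findOffset(long requestedOffset, int brokerEpoch, long brokerEndOffset) {
        OptionalLong local = localEndOffset(brokerEpoch);
        if (!local.isPresent()) {
            return OptionalLong.empty();
        }
        long divergence = Math.min(brokerEndOffset, local.getAsLong());
        return OptionalLong.of(Math.min(requestedOffset, divergence));
    }
}
```

With the cache above ((0, 0), (1, 1), endOffset 3), the response {leaderEpoch=1, endOffset=2} for findOffsets(offset=3, leaderEpoch=1) gives 2, and a response {leaderEpoch=0, endOffset=1} for findOffsets(offset=2, leaderEpoch=1) gives 1.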
In response, the consumer calls KafkaConsumer#seek(offset=2) followed by poll(), which results in FetchRequest(offset=2, leaderEpoch=1) to broker C.

I will continue with this example with the goal of answering your first question about truncation just after findOffsets() followed by seek():

Suppose brokers B and C go down, and broker A comes up, becomes the leader with leader epoch 3, and writes a message to offset 1. Suppose this happens before the consumer gets a response from broker C to the previous fetch request: FetchRequest(offset=2, leaderEpoch=1).
The consumer re-sends FetchRequest(offset=2, leaderEpoch=1) to broker A, which returns a LOG_TRUNCATION error because broker A has leader epoch 3, which is greater than the leader epoch 1 in the FetchRequest, and epoch 3 starts at offset 1, which is less than the fetch offset 2.

In response, the user calls KafkaConsumer#findOffsets(offset=2, leaderEpoch=1). The underlying consumer sends OffsetsForLeaderEpoch(leaderEpoch=1), and broker A responds with {leaderEpoch=0, endOffset=1}; the underlying consumer finds leaderEpoch=0 in its cache with end offset 1, which results in KafkaConsumer#findOffsets(offset=2, leaderEpoch=1) returning offset=1.
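The LOG_TRUNCATION condition described in this example can be expressed as a small check, sketched below under the assumption that the broker only compares the fetch against its own log: the fetch is rejected if some epoch newer than the fetch's leader epoch starts before the fetch offset. This models the behavior proposed in this e-mail rather than an existing broker code path, and the names (FetchEpochCheck, validate, Result) are hypothetical.

```java
import java.util.List;

// Hypothetical sketch of the fetch-side epoch check discussed in this thread.
class FetchEpochCheck {
    enum Result { OK, LOG_TRUNCATION }

    // epochPerOffset.get(i) is the leader epoch of the broker's message at offset i.
    // The fetch is rejected when the first offset written in an epoch newer than
    // fetchEpoch lies before fetchOffset: the consumer's position is past the point
    // where the broker's log diverged from the history the consumer saw.
    static Result validate(List<Integer> epochPerOffset, long fetchOffset, int fetchEpoch) {
        for (int offset = 0; offset < epochPerOffset.size(); offset++) {
            if (epochPerOffset.get(offset) > fetchEpoch) {
                return offset < fetchOffset ? Result.LOG_TRUNCATION : Result.OK;
            }
        }
        // No newer epoch in the log, so nothing has diverged for this fetch.
        return Result.OK;
    }
}
```

Against broker C's log (epochs 0, 1, 2), a fetch at offset 3 with epoch 1 is rejected while a fetch at offset 2 with epoch 1 is accepted; against broker A's log (epochs 0, 3), a fetch at offset 2 with epoch 1 is rejected and a fetch at offset 1 with epoch 0 is accepted.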
In response, the user calls KafkaConsumer#seek(offset=1) followed by poll(), which results in FetchRequest(offset=1, leaderEpoch=0) to broker A, which responds with the message at offset 1, leader epoch 3.

I will think some more about consumers restarting from committed offsets and send a follow-up.

Thanks,
Anna

On Sat, Jul 7, 2018 at 1:36 AM Dong Lin <lindon...@gmail.com> wrote:

Hey Anna,

Thanks much for the thoughtful reply. It makes sense to differentiate between "seeking to a message" and "seeking to a position".
I have two questions here:

- For the "seeking to a message" use-case, with the proposed approach the user needs to call findOffset(offset, leaderEpoch) followed by seek(offset). If message truncation and message append happen immediately after findOffset(offset, leaderEpoch) but before seek(offset), it seems that the user will seek to the wrong message without knowing the truncation has happened. Would this be a problem?

- For the "seeking to a position" use-case, it seems that there can be two positions, i.e. earliest and latest.
So these two cases can be fulfilled by Consumer.seekToBeginning() and Consumer.seekToEnd(). Then it seems that the user will only need to call position() and seek() for the "seeking to a message" use-case?

Thanks,
Dong

On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner <a...@confluent.io> wrote:

Hi Jason and Dong,

I've been thinking about your suggestions and discussion regarding position(), seek(), and the newly proposed API.

Here is my thought process on why we should keep the position() and seek() APIs unchanged.
I think we should separate an {offset, leader epoch} pair that uniquely identifies a message from an offset that is a position. In some cases, offsets returned from position() could be actual consumed messages by this consumer identified by {offset, leader epoch}. In other cases, position() returns an offset that was not actually consumed. Suppose the user calls position() for the last offset, and suppose we return the {offset, leader epoch} of the message currently in the log. Then, the message gets truncated before the consumer's first poll().
It does not make sense for poll() to fail in this case, because the log truncation did not actually happen from the consumer's perspective. On the other hand, as the KIP proposes, it makes sense for the committed() method to return {offset, leader epoch} because those offsets represent actual consumed messages.

The same argument applies to the seek() method: we are not seeking to a message, we are seeking to a position.

I like the proposal to add a KafkaConsumer#findOffsets() API.
I am assuming something like:

Map<TopicPartition, Long> findOffsets(Map<TopicPartition, OffsetAndEpoch> offsetsToSearch)

Similar to seek() and position(), I think findOffsets() should return an offset without a leader epoch, because what we want is the offset that we think is closest to the non-divergent message from the given consumed message. Until the consumer actually fetches the message, we should not let the consumer store the leader epoch for a message it did not consume.
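A minimal in-memory sketch of the semantics described for findOffsets(), assuming the leader keeps a per-partition leader-epoch cache (each epoch mapped to the first offset written in it) and that the "closest non-divergent offset" is the smaller of the consumed offset and the end offset of that epoch on the leader, roughly how an OffsetsForLeaderEpoch lookup is used for truncation detection. OffsetAndEpoch, EpochCache, and findOffsets() here are hypothetical stand-ins rather than Kafka classes, and partitions are keyed by plain strings instead of TopicPartition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class FindOffsetsSketch {
    // Hypothetical {offset, leader epoch} pair for a consumed message.
    static final class OffsetAndEpoch {
        final long offset;
        final int leaderEpoch;

        OffsetAndEpoch(long offset, int leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    // Hypothetical model of the leader's epoch cache for one partition:
    // each leader epoch maps to the first offset written in that epoch.
    static final class EpochCache {
        private final NavigableMap<Integer, Long> epochStartOffsets = new TreeMap<>();
        private final long logEndOffset;

        EpochCache(Map<Integer, Long> epochStartOffsets, long logEndOffset) {
            this.epochStartOffsets.putAll(epochStartOffsets);
            this.logEndOffset = logEndOffset;
        }

        // An epoch ends where the next higher epoch starts; the latest epoch
        // (no higher entry) ends at the log end offset.
        long endOffsetFor(int leaderEpoch) {
            Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(leaderEpoch);
            return next == null ? logEndOffset : next.getValue();
        }
    }

    // Hypothetical findOffsets(): for each partition, the closest offset at or
    // before the consumed one that has not diverged from the leader's log.
    static Map<String, Long> findOffsets(Map<String, OffsetAndEpoch> offsetsToSearch,
                                         Map<String, EpochCache> leaderCaches) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, OffsetAndEpoch> entry : offsetsToSearch.entrySet()) {
            OffsetAndEpoch consumed = entry.getValue();
            long epochEnd = leaderCaches.get(entry.getKey()).endOffsetFor(consumed.leaderEpoch);
            result.put(entry.getKey(), Math.min(consumed.offset, epochEnd));
        }
        return result;
    }
}
```

For example, if the consumer read up to offset 8 in epoch 1, but the new leader's cache only has epoch 0 starting at 0 and epoch 2 starting at 5, the sketch returns 5, the point where the two logs diverged; the caller would then seek() there.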
So, the workflow will be:

1) The user gets a LogTruncationException with {offset, leader epoch of the previous message} (whatever we send with the new FetchRecords request).
2) offset = findOffsets(tp -> {offset, leader epoch})
3) seek(offset)

For the use case where users store committed offsets externally:

1) Such users would have to track the leader epoch together with an offset. Otherwise, there is no way to detect later what leader epoch was associated with the message.
I think it's reasonable to ask that of users if they want to detect log truncation. Otherwise, they will get the current behavior.

If users currently get an offset to be stored using position(), I see two possibilities. First, they save the offset returned from a position() call made before poll(). In that case, it would not be correct to store {offset, leader epoch} if we had changed position() to return {offset, leader epoch}, since the actual fetched message could be different (from the example I described earlier).
So it would be more correct to call position() after poll(). However, the user already has the ConsumerRecords at this point, from which they can extract the {offset, leader epoch} of the last message.

So, I like the idea of adding a helper method to ConsumerRecords, as Jason proposed, something like:

public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where OffsetAndEpoch is a data struct holding {offset, leader epoch}.

In this case, we would advise the user to follow the workflow: poll(), get {offset, leader epoch} from ConsumerRecords#lastOffsetWithLeaderEpoch(), save the offset and leader epoch, process the records.
2) When the user needs to seek to the last committed offset, they call the new findOffsets(saved offset, leader epoch), and then seek(offset).

What do you think?

Thanks,
Anna

On Tue, Jul 3, 2018 at 4:06 PM Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

Thanks much for your thoughtful explanation.

Yes, the solution using findOffsets(offset, leaderEpoch) also works. The advantage of this solution is that it adds only one API instead of two.
The concern is that its usage seems a bit more clumsy for advanced users. More specifically, advanced users who store offsets externally will always need to call findOffsets() before calling seek(offset) during consumer initialization. And those advanced users will need to manually keep track of the leaderEpoch of the last ConsumerRecord.

The other solution, which may be more user-friendly for advanced users, is to add two APIs: `void seek(offset, leaderEpoch)` and `(offset, epoch) = offsetEpochs(topicPartition)`.
I kind of prefer the second solution because it is easier to use for advanced users. If we need to expose leaderEpoch anyway to safely identify a message, it may be conceptually simpler to expose it directly in seek(...) rather than requiring one more translation using findOffsets(...).
But I am also OK with the first solution if other developers also favor that one :)

Thanks,
Dong

On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson <ja...@confluent.io> wrote:

Hi Dong,

Thanks, I've been thinking about your suggestions a bit. It is challenging to make this work given the current APIs. One of the difficulties is that we don't have an API to find the leader epoch for a given offset at the moment.
So if the user does a seek to offset 5, then we'll need a new API to find the corresponding epoch in order to fulfill the new position() API. Potentially we could modify ListOffsets to enable finding the leader epoch, but I am not sure it is worthwhile. Perhaps it is reasonable for advanced usage to expect that the epoch information, if needed, will be extracted from the records directly? It might make sense to expose a helper in `ConsumerRecords` to make this a little easier though.
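The helper suggested here, which Anna's message above calls lastOffsetWithLeaderEpoch(), could look roughly like the sketch below. It assumes each fetched record exposes its partition, its offset, and the leader epoch it was written in, and that records of one partition arrive in offset order. Record, OffsetAndEpoch, and lastOffsetWithLeaderEpoch() are hypothetical, not the real ConsumerRecords API, and this version returns one entry per partition rather than a single value.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RecordsHelperSketch {
    // Hypothetical minimal fetched record.
    static final class Record {
        final String partition;
        final long offset;
        final int leaderEpoch;

        Record(String partition, long offset, int leaderEpoch) {
            this.partition = partition;
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    // Hypothetical {offset, leader epoch} holder.
    static final class OffsetAndEpoch {
        final long offset;
        final int leaderEpoch;

        OffsetAndEpoch(long offset, int leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    // For each partition in a fetched batch, keep {offset, leader epoch} of the
    // last record; records are assumed to be in offset order per partition,
    // so later records simply overwrite earlier ones.
    static Map<String, OffsetAndEpoch> lastOffsetWithLeaderEpoch(List<Record> fetched) {
        Map<String, OffsetAndEpoch> last = new LinkedHashMap<>();
        for (Record r : fetched) {
            last.put(r.partition, new OffsetAndEpoch(r.offset, r.leaderEpoch));
        }
        return last;
    }
}
```

An application storing offsets externally would save these pairs right after poll(). Note that Kafka offset commits conventionally record the next offset to consume, i.e. the last offset plus one.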
Alternatively, if we think it is important to have this information exposed directly, we could create batch APIs to solve the naming problem. For example:

Map<TopicPartition, OffsetAndEpoch> positions();
void seek(Map<TopicPartition, OffsetAndEpoch> positions);

However, I'm actually leaning toward leaving the seek() and position() APIs unchanged. Instead, we can add a new API to search for an offset by timestamp or by offset/leader epoch. Let's say we call it `findOffsets`.
If the user hits a log truncation error, they can use this API to find the closest offset and then do a seek(). At the same time, we deprecate the `offsetsForTimes` APIs. We now have two use cases which require finding offsets, so I think we should make this API general and leave the door open for future extensions.

By the way, I'm unclear about the desire to move part of this functionality to AdminClient.
Guozhang suggested this previously, but I think it only makes sense for cross-cutting capabilities such as topic creation. If we have an API which is primarily useful to consumers, then I think that's where it should be exposed. The AdminClient also has its own API integrity and should not become a dumping ground for advanced use cases. I'll update the KIP with the `findOffsets` API suggested above and we can see if it does a good enough job of keeping the API simple for common cases.
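One way to read the idea of a single, general findOffsets() is a search API that accepts a query object, so new kinds of lookups can be added later without new method names. The sketch below resolves two hypothetical query kinds against one partition's in-memory log: by timestamp (the earliest offset whose timestamp is at or after the target, which is how offsetsForTimes() behaves) and by {offset, leader epoch} (the consumed offset, capped at the first offset written in a later epoch). OffsetQuery, ByTimestamp, ByOffsetAndEpoch, Entry, and findOffset() are illustrative names, not Kafka APIs, and -1 stands in for "no match".

```java
import java.util.List;

final class GeneralFindOffsetsSketch {
    // Hypothetical query type; further lookup kinds could implement it later.
    interface OffsetQuery {}

    static final class ByTimestamp implements OffsetQuery {
        final long timestampMs;

        ByTimestamp(long timestampMs) {
            this.timestampMs = timestampMs;
        }
    }

    static final class ByOffsetAndEpoch implements OffsetQuery {
        final long offset;
        final int leaderEpoch;

        ByOffsetAndEpoch(long offset, int leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    // Hypothetical log entry for a single partition.
    static final class Entry {
        final long offset;
        final int leaderEpoch;
        final long timestampMs;

        Entry(long offset, int leaderEpoch, long timestampMs) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
            this.timestampMs = timestampMs;
        }
    }

    // Resolves a query against a log given in offset order; -1 means no match.
    static long findOffset(List<Entry> log, OffsetQuery query) {
        if (log.isEmpty()) {
            return -1L;
        }
        if (query instanceof ByTimestamp) {
            long target = ((ByTimestamp) query).timestampMs;
            // Earliest offset whose timestamp is at or after the target.
            for (Entry e : log) {
                if (e.timestampMs >= target) {
                    return e.offset;
                }
            }
            return -1L;
        }
        if (query instanceof ByOffsetAndEpoch) {
            ByOffsetAndEpoch q = (ByOffsetAndEpoch) query;
            // The consumer's epoch ends where a later epoch begins, or at the log end.
            long epochEnd = log.get(log.size() - 1).offset + 1;
            for (Entry e : log) {
                if (e.leaderEpoch > q.leaderEpoch) {
                    epochEnd = e.offset;
                    break;
                }
            }
            return Math.min(q.offset, epochEnd);
        }
        throw new IllegalArgumentException("Unsupported query type: " + query.getClass());
    }
}
```

The design choice being illustrated is that timestamp search and truncation recovery share one entry point, which is what would allow deprecating offsetsForTimes() in favor of it.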
Thanks,
Jason

On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jason,

Regarding seek(...), it seems that we want an API for users to initialize the consumer with (offset, leaderEpoch), and that API should allow throwing PartitionTruncationException. Suppose we agree on this; then seekToNearest() is not sufficient because it will always swallow PartitionTruncationException. Here we have two options.
The first option is to add an API offsetsForLeaderEpochs() to translate (leaderEpoch, offset) to offset. The second option is to add seek(offset, leaderEpoch). It seems that the second option may be simpler because it makes it clear that (offset, leaderEpoch) will be used to identify the consumer's position in a partition. And the user only needs to handle PartitionTruncationException from poll().
In comparison, the first option seems a bit harder to use because users also have to handle the PartitionTruncationException if offsetsForLeaderEpochs() returns a different offset from the user-provided offset. What do you think?

If we decide to add the API seek(offset, leaderEpoch), then we can decide whether and how to add an API to translate (offset, leaderEpoch) to offset. It seems that this API will be needed by advanced users who don't want auto offset reset (so that they can be notified) but still want to reset the offset to the closest one.
For those users it probably makes sense to only have the API in AdminClient. offsetsForTimes() seems like a common API that will be needed by users of the consumer in general, so it may be more reasonable for it to stay in the consumer API. I don't have a strong opinion on whether offsetsForTimes() should be replaced by an API in AdminClient.
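To make the comparison between the two options concrete, here is roughly what the caller would have to write under the first option. It assumes a hypothetical offsetsForLeaderEpochs() has already translated the stored (offset, leaderEpoch) into an offset, and it uses a hypothetical PartitionTruncationException named after the one in this thread. A mismatch between the translated and requested offsets signals truncation; an application that has disabled auto offset reset either surfaces it or deliberately resets to the closest valid offset.

```java
final class TranslateThenSeekSketch {
    // Hypothetical exception, named after the one proposed in this thread.
    static final class PartitionTruncationException extends RuntimeException {
        final long closestOffset;

        PartitionTruncationException(long requestedOffset, long closestOffset) {
            super("offset " + requestedOffset + " was truncated; closest valid offset is " + closestOffset);
            this.closestOffset = closestOffset;
        }
    }

    // Caller-side logic for the "translate first" option. translatedOffset is what
    // a hypothetical offsetsForLeaderEpochs() returned for (requestedOffset, epoch).
    static long chooseSeekOffset(long requestedOffset, long translatedOffset, boolean resetToClosest) {
        if (translatedOffset == requestedOffset) {
            return requestedOffset; // no truncation detected, seek as requested
        }
        if (resetToClosest) {
            return translatedOffset; // caller opted to fall back to the closest valid offset
        }
        // Otherwise notify the application instead of silently resetting.
        throw new PartitionTruncationException(requestedOffset, translatedOffset);
    }
}
```

Under the second option, this comparison would move inside the consumer, and the application would only see the exception from poll().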
Though (offset, leaderEpoch) is needed to uniquely identify a message in general, it is only needed for advanced users who have turned on unclean leader election, need to use seek(..), and don't want auto offset reset. Most other users probably just want to enable auto offset reset and store offsets in Kafka. Thus we might want to keep the existing offset-only APIs (e.g. seek() and position()) for most users while adding new APIs for advanced users.
And yes, it seems that we need a new name for position().

Though I think we need new APIs to carry the new information (e.g. leaderEpoch), I am not very sure what they should look like. One possible option is the APIs in KIP-232. Another option is something like this:

```
class OffsetEpochs {
    long offset;
    int leaderEpoch;
    int partitionEpoch; // This may be needed later as discussed in KIP-232
    // ... Hopefully these are all we need to identify a message in Kafka.
    // But if we need more, then we can add new fields in this class.
}

// Proposed consumer methods:
OffsetEpochs offsetEpochs(TopicPartition partition);

void seek(TopicPartition partition, OffsetEpochs offsetEpochs);
```

Thanks,
Dong

On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Dong,

Thanks for the feedback. The first three points are easy:

1. Yes, we should be consistent.
2. Yes, I will add this.
3. Yes, I think we should document the changes to the committed offset