Hey Jun,

Thanks for the review. Responses below:

50. Yes, that is right. I clarified this in the KIP.

51. Yes, I updated the KIP to mention this.
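For anyone following along, here is a rough sketch of the check behind 50 and 51. It assumes the KIP-101/KIP-279 style lookup, in which the leader answers with the largest epoch not greater than the requested one and the offset at which that epoch ends. The class and method names are hypothetical, and the leader's epoch cache is modeled as a plain sorted map from epoch to start offset. This is not the actual client or broker code.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch of truncation detection; not the real consumer/broker internals.
final class TruncationCheckSketch {

    /** The leader's answer: the epoch it matched and the offset where that epoch ends. */
    static final class EpochEndOffset {
        final int epoch;
        final long endOffset;

        EpochEndOffset(int epoch, long endOffset) {
            this.epoch = epoch;
            this.endOffset = endOffset;
        }
    }

    /**
     * Leader side. epochStartOffsets maps each leader epoch to the first offset
     * written in it. Returns the largest epoch <= requestedEpoch and its end offset
     * (the start of the next epoch, or the log end offset for the latest epoch).
     */
    static Optional<EpochEndOffset> endOffsetFor(NavigableMap<Integer, Long> epochStartOffsets,
                                                 long logEndOffset,
                                                 int requestedEpoch) {
        Map.Entry<Integer, Long> floor = epochStartOffsets.floorEntry(requestedEpoch);
        if (floor == null) {
            return Optional.empty(); // the leader has no epoch that old
        }
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(floor.getKey());
        long end = (next == null) ? logEndOffset : next.getValue();
        return Optional.of(new EpochEndOffset(floor.getKey(), end));
    }

    /**
     * Consumer side. If the epoch of the last consumed record ends before the
     * consumer's position, the log diverged at that end offset.
     */
    static Optional<Long> divergentOffset(long position, EpochEndOffset leaderAnswer) {
        return leaderAnswer.endOffset < position
                ? Optional.of(leaderAnswer.endOffset)
                : Optional.empty();
    }

    public static void main(String[] args) {
        // Leader history: epoch 0 starts at offset 0, epoch 2 at offset 100; log end offset 150.
        NavigableMap<Integer, Long> cache = new TreeMap<>();
        cache.put(0, 0L);
        cache.put(2, 100L);

        // The consumer last fetched a record from epoch 1 and is positioned at 120.
        EpochEndOffset answer = endOffsetFor(cache, 150L, 1).get();
        System.out.println(answer.epoch + " " + answer.endOffset); // prints "0 100"
        System.out.println(divergentOffset(120L, answer));        // prints "Optional[100]"
    }
}
```

In the example, the consumer saw records from epoch 1 that the new leader never had, so everything from offset 100 onward has diverged. Whether the consumer then resets automatically or raises an exception depends on the reset policy.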

52. Yeah, this was a reference to a previous iteration. I've fixed it.

53. I changed the API to use an `Optional<Integer>` for the leader epoch
and added a note about the default value. Does that seem reasonable?
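A minimal sketch of the default described in 53, assuming the epoch is an `Optional<Integer>` that defaults to empty and that an empty epoch means the consumer skips the truncation check. `CommittedPosition` is a hypothetical stand-in for `OffsetAndMetadata`, and `StartAction` is an invented name for the consumer's internal decision.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch; CommittedPosition stands in for OffsetAndMetadata.
final class OffsetWithEpochSketch {

    enum StartAction { FETCH, VALIDATE_THEN_FETCH }

    static final class CommittedPosition {
        final long offset;
        final Optional<Integer> leaderEpoch;

        /** Backward-compatible form: the epoch defaults to empty. */
        CommittedPosition(long offset) {
            this(offset, Optional.empty());
        }

        CommittedPosition(long offset, Optional<Integer> leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = Objects.requireNonNull(leaderEpoch, "use Optional.empty() instead of null");
        }
    }

    /** With no epoch there is nothing to validate against, so fetch directly. */
    static StartAction actionFor(CommittedPosition position) {
        return position.leaderEpoch.isPresent()
                ? StartAction.VALIDATE_THEN_FETCH
                : StartAction.FETCH;
    }
}
```

Keeping the empty case explicit means callers that never supply an epoch keep their current behavior.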

54. We discussed this above, but could not find a great option. The options
are to add a new API (e.g. positionAndEpoch) or to rely on the user to get
the epoch from the fetched records. We were leaning toward the latter, but
I admit it was not fully satisfying. In this case, Connect would need to
track the last consumed offsets manually instead of relying on the
consumer. We also considered adding a convenience method to ConsumerRecords
to get the offset to commit for all fetched partitions, which would keep the
additional bookkeeping pretty minimal. What do you think?
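To give a sense of the bookkeeping in 54, here is a rough sketch of what such a convenience method might compute, assuming records are returned in offset order within each partition. `FetchedRecord` and `OffsetToCommit` are hypothetical stand-ins for `ConsumerRecord` and `OffsetAndMetadata`, and partitions are keyed by plain strings to keep it self-contained.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of "offset to commit" bookkeeping for fetched records.
final class CommitBookkeepingSketch {

    /** Stand-in for a fetched record: partition, offset and the leader epoch it was written in. */
    static final class FetchedRecord {
        final String partition;
        final long offset;
        final Optional<Integer> leaderEpoch;

        FetchedRecord(String partition, long offset, Optional<Integer> leaderEpoch) {
            this.partition = partition;
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    /** What would be committed: the next offset plus the epoch of the last consumed record. */
    static final class OffsetToCommit {
        final long offset;
        final Optional<Integer> leaderEpoch;

        OffsetToCommit(long offset, Optional<Integer> leaderEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
        }
    }

    /** Later records for a partition overwrite earlier ones, so the last one wins. */
    static Map<String, OffsetToCommit> offsetsToCommit(List<FetchedRecord> batch) {
        Map<String, OffsetToCommit> result = new LinkedHashMap<>();
        for (FetchedRecord r : batch) {
            // Commit one past the consumed record, keeping that record's epoch.
            result.put(r.partition, new OffsetToCommit(r.offset + 1, r.leaderEpoch));
        }
        return result;
    }
}
```

The offset to commit is one past the last consumed record and carries that record's epoch, which is the piece that `position()` alone cannot supply.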

55. I clarified in the KIP. I was mainly thinking of situations where a
previously valid offset becomes out of range.

56. Yeah, that's a bit annoying. I decided to keep LeaderEpoch as it is and
use CurrentLeaderEpoch for both the OffsetForLeaderEpoch and Fetch APIs. I
think Dong suggested this previously as well.
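For 56, a rough sketch of validating CurrentLeaderEpoch the same way for both OffsetsForLeaderEpoch and Fetch, assuming the -1 sentinel mentioned earlier in the thread still means "skip the check". The class and enum names are hypothetical; the error names are the ones used in this discussion.

```java
// Hypothetical sketch of the broker-side CurrentLeaderEpoch check.
final class CurrentEpochCheckSketch {

    /** Sentinel meaning "no current epoch supplied"; validation is skipped. */
    static final int NO_EPOCH = -1;

    enum Result { NONE, FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH }

    /** Applied identically to Fetch and OffsetsForLeaderEpoch requests. */
    static Result validate(int currentLeaderEpoch, int brokerLeaderEpoch) {
        if (currentLeaderEpoch == NO_EPOCH) {
            return Result.NONE;                  // e.g. older clients: nothing to check
        }
        if (currentLeaderEpoch < brokerLeaderEpoch) {
            return Result.FENCED_LEADER_EPOCH;   // requester has stale metadata
        }
        if (currentLeaderEpoch > brokerLeaderEpoch) {
            return Result.UNKNOWN_LEADER_EPOCH;  // broker has not learned the new epoch yet
        }
        return Result.NONE;                      // epochs match
    }
}
```

FENCED_LEADER_EPOCH tells the requester that its own metadata is stale, while UNKNOWN_LEADER_EPOCH means the broker is the one lagging, so the requester can retry without refreshing metadata.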

57. We could, but I'm not sure there's a strong reason to do so. I was
thinking we would leave it around for convenience, but let me know if you
think we should do otherwise.


Thanks,
Jason


On Fri, Aug 3, 2018 at 4:49 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Jason,
>
> Thanks for the updated KIP. Well thought-through. Just a few minor comments
> below.
>
> 50. For seek(TopicPartition partition, OffsetAndMetadata offset), I guess
> that under the covers it will make an OffsetsForLeaderEpoch request to
> determine whether the seek offset is still valid before fetching? If so, it
> will be useful to document this in the wiki.
>
> 51. Similarly, if the consumer fetch request gets FENCED_LEADER_EPOCH, I
> guess the consumer will also make an OffsetsForLeaderEpoch request to
> determine whether the last consumed offset is still valid before fetching?
> If so, it will be useful to document this in the wiki.
>
> 52. "If the consumer seeks to the middle of the log, for example, then we
> will use the sentinel value -1 and the leader will skip the epoch
> validation." Is this true? If the consumer seeks using
> seek(TopicPartition partition, OffsetAndMetadata offset) and the seek
> offset is valid, the consumer can/should use the leaderEpoch in the cached
> metadata for fetching?
>
> 53. OffsetAndMetadata: for backward compatibility, we need to support
> constructing OffsetAndMetadata without providing leaderEpoch. Could we
> define the default value of leaderEpoch when it is not provided, and the
> semantics of that (e.g., skipping the epoch validation)?
>
> 54. I saw the following code in WorkerSinkTask in Connect. It saves the
> offset obtained through position(), which can be committed later. Since
> position() doesn't return the leaderEpoch, this can lead to a committed
> offset without a leaderEpoch. I'm not sure how common this usage is, but
> what's the recommendation for such users?
>
> private class HandleRebalance implements ConsumerRebalanceListener {
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>         log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
>         lastCommittedOffsets = new HashMap<>();
>         currentOffsets = new HashMap<>();
>         for (TopicPartition tp : partitions) {
>             long pos = consumer.position(tp);
>             lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
>             // ...
>         }
>         // ...
>     }
> }
>
> 55. "With this KIP, the only case in which this is possible is if the
> consumer fetches from an offset earlier than the log start offset." Is that
> true? I guess a user could seek to a large offset without providing a
> leaderEpoch, which could cause the offset to be larger than the log end
> offset during fetch?
>
> 56. In the schema for OffsetForLeaderEpochRequest, LeaderEpoch seems to be
> an existing field. Is LeaderEpochQuery the new field? The name is not very
> intuitive. It will be useful to document its meaning.
>
> 57. Should we deprecate the following api?
> void seek(TopicPartition partition, long offset);
>
> Thanks,
>
> Jun
>
>
> On Fri, Aug 3, 2018 at 9:32 AM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hey All,
> >
> > I think I've addressed all pending review comments. If there is no
> > additional feedback, I'll plan to start a vote thread next week.
> >
> > Thanks,
> > Jason
> >
> > On Tue, Jul 31, 2018 at 9:46 AM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Jason,
> > >
> > > Thanks for your reply. I will comment below.
> > >
> > > Regarding 1, we probably cannot simply rename both to `LeaderEpoch`
> > > because we already have a LeaderEpoch field in OffsetsForLeaderEpoch.
> > >
> > > Regarding 5, I don't feel strongly about this. I agree with the two
> > > benefits of having two error codes: 1) not having to refresh metadata
> > > when the consumer sees UNKNOWN_LEADER_EPOCH and 2) providing more
> > > information in the log for debugging. Whether or not these two benefits
> > > are useful enough for one more error code may be subjective. I will let
> > > you and others determine this.
> > >
> > > Regarding 6, yeah overloading seek() looks good to me.
> > >
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Mon, Jul 30, 2018 at 9:33 AM, Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > Hey Dong,
> > > >
> > > > Thanks for the detailed review. Responses below:
> > > >
> > > > 1/2: Thanks for noticing the inconsistency. Would it be reasonable to
> > > > simply call it LeaderEpoch for both APIs?
> > > >
> > > > 3: I agree it should be a map. I will update.
> > > >
> > > > 4: Fair point. I think we should always be able to identify an offset.
> > > > Let's remove the Optional for now and reconsider if we find an
> > > > unhandled case during implementation.
> > > >
> > > > 5: Yeah, I was thinking about this. The two error codes could be
> > > > handled similarly, so we might merge them. Mainly I was thinking that
> > > > it will be useful for consumers/replicas to know whether they are
> > > > ahead of or behind the leader. For example, if a consumer sees
> > > > UNKNOWN_LEADER_EPOCH, it need not refresh metadata. Or if a replica
> > > > sees a FENCED_LEADER_EPOCH error, it could just stop fetching and
> > > > await the LeaderAndIsr request that it is missing. It probably also
> > > > makes debugging a little bit easier. I guess I'm a bit inclined to
> > > > keep both error codes, but I'm open to reconsideration if you feel
> > > > strongly. Another point to consider is whether we should continue
> > > > using NOT_LEADER_FOR_PARTITION if a follower receives an unexpected
> > > > fetch. The leader epoch would be different in this case, so we could
> > > > use one of the invalid epoch error codes instead since they contain
> > > > more information.
> > > >
> > > > 6: I agree the name is not ideal in that scenario. What if we
> > > > overloaded `seek`?
> > > >
> > > > 7: Sure, I will mention this.
> > > >
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Fri, Jul 27, 2018 at 6:17 PM, Dong Lin <lindon...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hey Jason,
> > > > >
> > > > > Thanks for the update! I agree with the current proposal overall. I
> > > > > have some minor comments related to naming etc.
> > > > >
> > > > > 1) I don't feel strongly about this and will just leave it here for
> > > > > discussion. Would it be better to rename "CurrentLeaderEpoch" to
> > > > > "ExpectedLeaderEpoch" for the new field in the
> > > > > OffsetsForLeaderEpochRequest? The reason is that "CurrentLeaderEpoch"
> > > > > may not necessarily be the true current leader epoch if the consumer
> > > > > has stale metadata. "ExpectedLeaderEpoch" shows that this epoch is
> > > > > what the consumer expects on the broker, which may or may not be the
> > > > > true value.
> > > > >
> > > > > 2) Currently we add the field "LeaderEpoch" to FetchRequest and the
> > > > > field "CurrentLeaderEpoch" to OffsetsForLeaderEpochRequest. Given
> > > > > that both fields are compared with the leaderEpoch in the broker,
> > > > > would it be better to give them the same name?
> > > > >
> > > > > 3) Currently LogTruncationException.truncationOffset() returns
> > > > > Optional<OffsetAndMetadata> to the user. Should it return
> > > > > Optional<Map<TopicPartition, OffsetAndMetadata>> to handle the
> > > > > scenario where the leaderEpoch of multiple partitions differs from
> > > > > the leaderEpoch in the broker?
> > > > >
> > > > > 4) Currently LogTruncationException.truncationOffset() returns an
> > > > > Optional value. Could you explain a bit more when it will return
> > > > > Optional.empty()? I am trying to understand whether it is simpler and
> > > > > reasonable to replace Optional.empty() with
> > > > > OffsetMetadata(offset=last_fetched_offset, leaderEpoch=-1).
> > > > >
> > > > > 5) Do we also need to add a new retriable exception for error code
> > > > > FENCED_LEADER_EPOCH? And do we need to define both
> > > > > FENCED_LEADER_EPOCH and UNKNOWN_LEADER_EPOCH? It seems that the
> > > > > current KIP uses these two error codes in the same way and the
> > > > > exception for these two error codes is not exposed to the user.
> > > > > Maybe we should combine them into one error, e.g.
> > > > > INVALID_LEADER_EPOCH?
> > > > >
> > > > > 6) For users who have turned off auto offset reset, when
> > > > > consumer.poll() throws LogTruncationException, it seems that the
> > > > > user will most likely call seekToCommitted(offset, leaderEpoch),
> > > > > where offset and leaderEpoch are obtained from
> > > > > LogTruncationException.truncationOffset(). In this case, the offset
> > > > > used here is not committed, which is inconsistent with the method
> > > > > name seekToCommitted(...). Would it be better to rename the method
> > > > > to e.g. seekToLastConsumedMessage()?
> > > > >
> > > > > 7) Per point 3 in Jun's comment, would it be useful to explicitly
> > > > > specify in the KIP that we will log the truncation event if the user
> > > > > has turned on the auto offset reset policy?
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > >
> > > > > On Fri, Jul 27, 2018 at 12:39 PM, Jason Gustafson <ja...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Thanks Anna, you are right on both points. I updated the KIP.
> > > > > >
> > > > > > -Jason
> > > > > >
> > > > > > On Thu, Jul 26, 2018 at 2:08 PM, Anna Povzner <a...@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Jason,
> > > > > > >
> > > > > > > Thanks for the update. I agree with the current proposal.
> > > > > > >
> > > > > > > Two minor comments:
> > > > > > > 1) In the “API Changes” section, the first paragraph says that
> > > > > > > “users can catch the more specific exception type and use the new
> > > > > > > `seekToNearest()` API defined below.” Since LogTruncationException
> > > > > > > “will include the partitions that were truncated and the offset of
> > > > > > > divergence”, shouldn’t the client use seek(offset) to seek to the
> > > > > > > offset of divergence in response to the exception?
> > > > > > > 2) In the “Protocol Changes” section, the OffsetsForLeaderEpoch
> > > > > > > subsection says “Note that consumers will send a sentinel value
> > > > > > > (-1) for the current epoch and the broker will simply disregard
> > > > > > > that validation.” Is that still true with MetadataResponse
> > > > > > > containing the leader epoch?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Anna
> > > > > > >
> > > > > > > On Wed, Jul 25, 2018 at 1:44 PM Jason Gustafson <ja...@confluent.io>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi All,
> > > > > > > >
> > > > > > > > I have made some updates to the KIP. As many of you know, a side
> > > > > > > > project of mine has been specifying the Kafka replication
> > > > > > > > protocol in TLA+. You can check out the code here if you are
> > > > > > > > interested: https://github.com/hachikuji/kafka-specification.
> > > > > > > > In addition to uncovering a couple of previously unknown bugs
> > > > > > > > in the replication protocol (e.g.
> > > > > > > > https://issues.apache.org/jira/browse/KAFKA-7128), this has
> > > > > > > > helped me validate the behavior in this KIP. In fact, the
> > > > > > > > original version I proposed had a weakness. I initially
> > > > > > > > suggested letting the leader validate the expected epoch at the
> > > > > > > > fetch offset. This made sense for the consumer in the handling
> > > > > > > > of unclean leader election, but it was not strong enough to
> > > > > > > > protect the follower in all cases. In order to make advancement
> > > > > > > > of the high watermark safe, for example, the leader actually
> > > > > > > > needs to be sure that every follower in the ISR matches its own
> > > > > > > > epoch.
> > > > > > > >
> > > > > > > > I attempted to fix this problem by treating the epoch in the
> > > > > > > > fetch request slightly differently for consumers and followers.
> > > > > > > > For consumers, it would be the expected epoch of the record at
> > > > > > > > the fetch offset, and the leader would raise a LOG_TRUNCATION
> > > > > > > > error if the expectation failed. For followers, it would be the
> > > > > > > > current epoch and the leader would require that it match its
> > > > > > > > own epoch. This was unsatisfying both because of the
> > > > > > > > inconsistency in behavior and because the consumer was left with
> > > > > > > > the weaker fencing that we already knew was insufficient for the
> > > > > > > > replicas. Ultimately I decided that we should make the behavior
> > > > > > > > consistent, and that meant that the consumer needed to act more
> > > > > > > > like a following replica. Instead of checking for truncation
> > > > > > > > while fetching, the consumer should check for truncation after
> > > > > > > > leader changes. After checking for truncation, the consumer can
> > > > > > > > then use the current epoch when fetching and get the stronger
> > > > > > > > protection that it provides. What this means is that the
> > > > > > > > Metadata API must include the current leader epoch. Given the
> > > > > > > > problems we have had around stale metadata and how challenging
> > > > > > > > they have been to debug, I'm convinced that this is a good idea
> > > > > > > > in any case, and it resolves the inconsistent behavior in the
> > > > > > > > Fetch API. The downside is that there will be some additional
> > > > > > > > overhead upon leader changes, but I don't think it is a major
> > > > > > > > concern since leader changes are rare and the
> > > > > > > > OffsetForLeaderEpoch request is cheap.
> > > > > > > >
> > > > > > > > This approach leaves the door open for some interesting
> > > > > > > > follow-up improvements. For example, now that we have the leader
> > > > > > > > epoch in the Metadata API, we can implement similar fencing for
> > > > > > > > the Produce API. And now that the consumer can reason about
> > > > > > > > truncation, we could consider having a configuration to expose
> > > > > > > > records beyond the high watermark. This would let users trade
> > > > > > > > lower end-to-end latency for weaker durability semantics. It is
> > > > > > > > sort of like having an acks=0 option for the consumer. Neither
> > > > > > > > of these options is included in this KIP; I am just mentioning
> > > > > > > > them as potential work for the future.
> > > > > > > >
> > > > > > > > Finally, based on the discussion in this thread, I have added
> > > > > > > > the seekToCommitted API for the consumer. Please take a look and
> > > > > > > > let me know what you think.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Jason
> > > > > > > >
> > > > > > > > On Fri, Jul 20, 2018 at 2:34 PM, Guozhang Wang <wangg...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Jason,
> > > > > > > > >
> > > > > > > > > The proposed API seems reasonable to me too. Could you please
> > > > > > > > > also update the wiki page
> > > > > > > > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation)
> > > > > > > > > with a section, say "Workflow", describing how the proposed API
> > > > > > > > > will be used together with the others for:
> > > > > > > > >
> > > > > > > > > 1. consumer callers handling a LogTruncationException;
> > > > > > > > > 2. consumer internals handling a retriable
> > > > > > > > > UnknownLeaderEpochException.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Jul 17, 2018 at 10:23 AM, Anna Povzner <a...@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Jason,
> > > > > > > > > >
> > > > > > > > > > I also like your proposal and agree that
> > > > > > > > > > KafkaConsumer#seekToCommitted() is more intuitive as a way to
> > > > > > > > > > initialize both the consumer's position and its fetch state.
> > > > > > > > > >
> > > > > > > > > > My understanding is that KafkaConsumer#seekToCommitted() is
> > > > > > > > > > purely for clients who store their offsets externally,
> > > > > > > > > > right? And we are still going to add
> > > > > > > > > > KafkaConsumer#findOffsets() in this KIP as we discussed, so
> > > > > > > > > > that the client can handle LogTruncationException?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Anna
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, Jul 12, 2018 at 3:57 PM Dong Lin <lindon...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hey Jason,
> > > > > > > > > > >
> > > > > > > > > > > It is a great summary. The solution sounds good. I might
> > > > > > > > > > > have minor comments regarding the method name, but we can
> > > > > > > > > > > discuss those minor points later, after we reach consensus
> > > > > > > > > > > on the high-level API.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Dong
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Jul 12, 2018 at 11:41 AM, Jason Gustafson <ja...@confluent.io>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hey Anna and Dong,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks a lot for the great discussion. I've been hanging
> > > > > > > > > > > > back a bit because honestly the best option hasn't seemed
> > > > > > > > > > > > clear. I agree with Anna's general observation that there
> > > > > > > > > > > > is a distinction between the position of the consumer and
> > > > > > > > > > > > its fetch state up to that position. If you think about
> > > > > > > > > > > > it, a committed offset actually represents both of these.
> > > > > > > > > > > > The metadata is used to initialize the state of the
> > > > > > > > > > > > consumer application and the offset initializes the
> > > > > > > > > > > > position. Additionally, we are extending the offset
> > > > > > > > > > > > commit in this KIP to also include the last epoch fetched
> > > > > > > > > > > > by the consumer, which is used to initialize the internal
> > > > > > > > > > > > fetch state. Of course, if you do an arbitrary `seek` and
> > > > > > > > > > > > immediately commit offsets, then there won't be a last
> > > > > > > > > > > > epoch to commit. This seems intuitive since there is no
> > > > > > > > > > > > fetch state in this case. We only commit fetch state when
> > > > > > > > > > > > we have it.
> > > > > > > > > > > >
> > > > > > > > > > > > So if we think about a committed offset as initializing
> > > > > > > > > > > > both the consumer's position and its fetch state, then
> > > > > > > > > > > > the gap in the API is evidently that we don't have a way
> > > > > > > > > > > > to initialize the consumer to a committed offset. We do
> > > > > > > > > > > > it implicitly, of course, for offsets stored in Kafka,
> > > > > > > > > > > > but since external storage is a use case we support, we
> > > > > > > > > > > > should have an explicit API as well. Perhaps something
> > > > > > > > > > > > like this:
> > > > > > > > > > > >
> > > > > > > > > > > > seekToCommitted(TopicPartition, OffsetAndMetadata)
> > > > > > > > > > > >
> > > > > > > > > > > > In this KIP, we are proposing to allow the
> > > > > > > > > > > > `OffsetAndMetadata` object to include the leader epoch,
> > > > > > > > > > > > so I think this would have the same effect as Anna's
> > > > > > > > > > > > suggested `seekToRecord`. But perhaps it is a more
> > > > > > > > > > > > natural fit given the current API? Furthermore, if we
> > > > > > > > > > > > find a need for additional metadata in the offset commit
> > > > > > > > > > > > API in the future, then we will just need to modify the
> > > > > > > > > > > > `OffsetAndMetadata` object and we will not need a new
> > > > > > > > > > > > `seek` API.
> > > > > > > > > > > >
> > > > > > > > > > > > With this approach, I think we can leave the `position`
> > > > > > > > > > > > API as it is. The position of the consumer is still just
> > > > > > > > > > > > the next expected fetch offset. If a user needs to record
> > > > > > > > > > > > additional state based on previous fetch progress, then
> > > > > > > > > > > > they would use the result of the previous fetch to obtain
> > > > > > > > > > > > it. This makes the dependence on fetch progress explicit.
> > > > > > > > > > > > I think we could make this a little more convenient with
> > > > > > > > > > > > a helper in the `ConsumerRecords` object, but I think
> > > > > > > > > > > > that's more of a nice-to-have.
> > > > > > > > > > > >
> > > > > > > > > > > > Thoughts?
> > > > > > > > > > > >
> > > > > > > > > > > > By the way, I have been iterating a little bit on the
> > > > > > > > > > > > replica side of this KIP. My initial proposal in fact did
> > > > > > > > > > > > not have strong enough fencing to protect all of the edge
> > > > > > > > > > > > cases. I believe the current proposal fixes the problems,
> > > > > > > > > > > > but I am still verifying the model.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Jason
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Jul 11, 2018 at 10:45 AM, Dong Lin <lindon...@gmail.com>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hey Anna,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks much for the explanation. Approach 1 also sounds
> > > > > > > > > > > > > good to me. I think findOffsets() is useful for users
> > > > > > > > > > > > > who don't use the automatic offset reset policy.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Just one more question. Since users who store offsets
> > > > > > > > > > > > > externally need to provide leaderEpoch to
> > > > > > > > > > > > > findOffsets(...), do we need an extra API for users to
> > > > > > > > > > > > > get both the offset and leaderEpoch, e.g.
> > > > > > > > > > > > > recordPosition()?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Dong
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Jul 11, 2018 at 10:12 AM, Anna Povzner <a...@confluent.io>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Dong,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > What I called “not covering all use cases” is what
> > > > > > > > > > > > > > you call best-effort (not guaranteeing some corner
> > > > > > > > > > > > > > cases). I think we are on the same page here.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I wanted to be clear in the API whether the consumer
> > > > > > > > > > > > > > seeks to a position (offset) or to a record (offset,
> > > > > > > > > > > > > > leader epoch). The only use case of seeking to a
> > > > > > > > > > > > > > record is seeking to a committed offset for a user
> > > > > > > > > > > > > > who stores committed offsets externally (unless users
> > > > > > > > > > > > > > find some other reason to seek to a record). I
> > > > > > > > > > > > > > thought it was possible to provide this functionality
> > > > > > > > > > > > > > with findOffset(offset, leader epoch) followed by a
> > > > > > > > > > > > > > seek(offset). However, you are right that this will
> > > > > > > > > > > > > > not handle the race condition where the non-divergent
> > > > > > > > > > > > > > offset found by findOffset() could change again
> > > > > > > > > > > > > > before the consumer does the first fetch.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Regarding position(): if we add a position that
> > > > > > > > > > > > > > returns (offset, leader epoch), this is specifically
> > > > > > > > > > > > > > a position after a record that was actually consumed
> > > > > > > > > > > > > > or the position of a committed record. In that case,
> > > > > > > > > > > > > > I still think it’s cleaner to get the record position
> > > > > > > > > > > > > > of a consumed message from a new helper method in
> > > > > > > > > > > > > > ConsumerRecords or from committed offsets.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I think all the use cases could then be covered with:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > (Approach 1)
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > seekToRecord(offset, leaderEpoch): this will just
> > > > > > > > > > > > > > initialize/set the consumer state;
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > findOffsets(offset, leaderEpoch) returns {offset, leaderEpoch}
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > If we agree that the race condition is also a corner
> > > > > > > > > > > > > > case, then I think we can cover the use cases with:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > (Approach 2)
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > findOffsets(offset, leaderEpoch) returns offset: we
> > > > > > > > > > > > > > still want the leader epoch as a parameter for users
> > > > > > > > > > > > > > who store their committed offsets externally.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I am actually now leaning more toward approach 1,
> > > > > > > > > > > > > > since it is more explicit, and maybe there are more
> > > > > > > > > > > > > > use cases for it.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Anna
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Jul 10, 2018 at 3:47 PM Dong Lin <
> > > > > > > lindon...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hey Anna,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks for the comment. To answer your
> question,
> > it
> > > > > seems
> > > > > > > > that
> > > > > > > > > we
> > > > > > > > > > > can
> > > > > > > > > > > > > > cover
> > > > > > > > > > > > > > > all case in this KIP. As stated in "Consumer
> > > > Handling"
> > > > > > > > section,
> > > > > > > > > > > > KIP-101
> > > > > > > > > > > > > > > based approach will be used to derive the
> > > truncation
> > > > > > offset
> > > > > > > > > from
> > > > > > > > > > > the
> > > > > > > > > > > > > > > 2-tuple (offset, leaderEpoch). This approach is
> > > best
> > > > > > effort
> > > > > > > > and
> > > > > > > > > > it
> > > > > > > > > > > is
> > > > > > > > > > > > > > > inaccurate only in very rare scenarios (as
> > > described
> > > > in
> > > > > > > > > KIP-279).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > By using seek(offset, leaderEpoch), the consumer will still be able to
> > > > > > > > > > > > > > > > follow this best-effort approach to detect log truncation and determine
> > > > > > > > > > > > > > > > the truncation offset. On the other hand, if we use seek(offset), the
> > > > > > > > > > > > > > > > consumer will not detect log truncation in some cases, which weakens the
> > > > > > > > > > > > > > > > guarantee of this KIP. Does this make sense?
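A minimal Java sketch of the single-data-point check described above. It assumes the leader answers OffsetsForLeaderEpoch(leaderEpoch) with the largest epoch it knows at or below the requested one, together with that epoch's end offset. `SinglePointTruncationCheck`, `EpochEndOffset` and `truncationOffset` are hypothetical names, not KafkaConsumer API. With only one {offset, leaderEpoch} point, the consumer can do no more than compare the leader's end offset with its own position.

```java
import java.util.OptionalLong;

/**
 * Hypothetical sketch of a best-effort truncation check that uses a single
 * {offset, leaderEpoch} data point (the consumer's position), KIP-101 style.
 * Not KafkaConsumer API; names are illustrative.
 */
final class SinglePointTruncationCheck {

    /** Assumed shape of the OffsetsForLeaderEpoch answer. */
    record EpochEndOffset(int leaderEpoch, long endOffset) {}

    private SinglePointTruncationCheck() {}

    /**
     * Returns the offset to resume from if truncation is detected, or empty if
     * the leader's log for the requested epoch still reaches {@code offset}.
     */
    static OptionalLong truncationOffset(long offset, EpochEndOffset leaderAnswer) {
        if (leaderAnswer.endOffset() >= offset) {
            return OptionalLong.empty(); // no divergence visible from this one data point
        }
        // The leader's history ends before our position. With no local history
        // to compare against, trust the leader's end offset; this equals
        // min(offset, endOffset) because endOffset < offset here.
        return OptionalLong.of(leaderAnswer.endOffset());
    }
}
```

For example, a consumer positioned at offset 3 in epoch 1 that receives {leaderEpoch=1, endOffset=2} would resume from offset 2. The consumer has no history of its own to inspect, so this sketch can miss divergence when the leader answers with an older epoch whose end offset is still at or beyond the consumer's position. That gap is why such a check is only best effort.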
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Dong
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Tue, Jul 10, 2018 at 3:14 PM, Anna Povzner <a...@confluent.io> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Sorry, I hit "send" before finishing. Continuing...
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > 2) Hiding most of the consumer's log truncation handling logic, with
> > > > > > > > > > > > > > > > > minimal exposure in the KafkaConsumer API. I was proposing this path.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Before answering your specific questions, I want to respond to your
> > > > > > > > > > > > > > > > > comment “In general, maybe we should discuss the final solution that
> > > > > > > > > > > > > > > > > covers all cases?”. With the current KIP, we don’t cover all cases of
> > > > > > > > > > > > > > > > > the consumer detecting log truncation, because the KIP proposes a leader
> > > > > > > > > > > > > > > > > epoch cache in the consumer that does not persist across restarts. Plus,
> > > > > > > > > > > > > > > > > we only store the last committed offset (either internally, or
> > > > > > > > > > > > > > > > > externally by the user). This has the limitation that the consumer will
> > > > > > > > > > > > > > > > > not always be able to find the point of truncation, simply because we
> > > > > > > > > > > > > > > > > have a limited history (just one data point).
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > So maybe we should first agree on whether we accept that storing the
> > > > > > > > > > > > > > > > > last committed offset/leader epoch has the limitation that the consumer
> > > > > > > > > > > > > > > > > will not be able to detect log truncation in all cases?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Anna
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Tue, Jul 10, 2018 at 2:20 PM Anna Povzner <a...@confluent.io> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi Dong,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks for the follow-up! I finally have a much clearer understanding
> > > > > > > > > > > > > > > > > > of where you are coming from.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > You are right. The success of findOffsets()/finding a point of
> > > > > > > > > > > > > > > > > > non-divergence depends on whether we have enough entries in the
> > > > > > > > > > > > > > > > > > consumer's leader epoch cache. However, I think this is a fundamental
> > > > > > > > > > > > > > > > > > limitation of having a leader epoch cache that does not persist across
> > > > > > > > > > > > > > > > > > consumer restarts.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > If we consider the general case where the consumer may or may not have
> > > > > > > > > > > > > > > > > > this cache, then I see two paths:
> > > > > > > > > > > > > > > > > > 1) Letting the user track the leader epoch history externally, with
> > > > > > > > > > > > > > > > > > more exposure to the leader epoch and to finding the point of
> > > > > > > > > > > > > > > > > > non-divergence in the KafkaConsumer API. I understand this is the case
> > > > > > > > > > > > > > > > > > you were talking about.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Tue, Jul 10, 2018 at 12:16 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >> Hey Anna,
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > >> Thanks so much for your detailed explanation and example! It does help
> > > > > > > > > > > > > > > > > >> me understand the difference between our understandings.
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > >> So it seems that the solution based on findOffsets() currently focuses
> > > > > > > > > > > > > > > > > >> mainly on the scenario where the consumer has a cached leaderEpoch ->
> > > > > > > > > > > > > > > > > >> offset mapping, whereas I was thinking about the general case where the
> > > > > > > > > > > > > > > > > >> consumer may or may not have this cache. I guess that is why we have
> > > > > > > > > > > > > > > > > >> different understandings here. I have some comments below.
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > >> 3) The proposed solution using findOffsets(offset, leaderEpoch)
> > > > > > > > > > > > > > > > > >> followed by seek(offset) works if the consumer has the cached
> > > > > > > > > > > > > > > > > >> leaderEpoch -> offset mapping. But if we assume the consumer has this
> > > > > > > > > > > > > > > > > >> cache, do we need leaderEpoch in findOffsets(...) at all? Intuitively,
> > > > > > > > > > > > > > > > > >> findOffsets(offset) could also derive the leaderEpoch from the offset,
> > > > > > > > > > > > > > > > > >> just like the proposed solution does with seek(offset).
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > >> 4) If the consumer does not have a cached leaderEpoch -> offset
> > > > > > > > > > > > > > > > > >> mapping, which is the case if the consumer is restarted on a new
> > > > > > > > > > > > > > > > > >> machine, then it is not clear which leaderEpoch would be included in
> > > > > > > > > > > > > > > > > >> the FetchRequest when the consumer calls seek(offset). This is the case
> > > > > > > > > > > > > > > > > >> that motivates the first question of my previous email. In general,
> > > > > > > > > > > > > > > > > >> maybe we should discuss the final solution that covers all cases?
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > >> 5) The second question in my previous email is related to the
> > > > > > > > > > > > > > > > > >> following paragraph:
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > >> "... In some cases, offsets returned from position() could be messages
> > > > > > > > > > > > > > > > > >> actually consumed by this consumer, identified by {offset, leader
> > > > > > > > > > > > > > > > > >> epoch}. In other cases, position() returns an offset that was not
> > > > > > > > > > > > > > > > > >> actually consumed. Suppose the user calls position() for the last
> > > > > > > > > > > > > > > > > >> offset...".
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > >> I guess my point is that, if the user calls position() for the last
> > > > > > > > > > > > > > > > > >> offset and uses that offset in seek(...), then the user can probably
> > > > > > > > > > > > > > > > > >> just call Consumer#seekToEnd() without calling position() and
> > > > > > > > > > > > > > > > > >> seek(...). Similarly, the user can call Consumer#seekToBeginning() to
> > > > > > > > > > > > > > > > > >> seek to the earliest position without calling position() and
> > > > > > > > > > > > > > > > > >> seek(...). Thus position() only needs to return actually consumed
> > > > > > > > > > > > > > > > > >> messages identified by {offset, leader epoch}. Does this make sense?
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > > > > > >> Dong
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > > >> On Mon, Jul 9, 2018 at 6:47 PM, Anna Povzner <a...@confluent.io> wrote:
> > > > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > > > >> > Hi Dong,
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> > Thanks for considering my suggestions.
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > Based on your comments, I realized that my suggestion was not complete
> > > > > > > > > > > > > > > > > >> > with regard to the KafkaConsumer API vs. the consumer-broker protocol.
> > > > > > > > > > > > > > > > > >> > While I propose to keep KafkaConsumer#seek() unchanged, taking an
> > > > > > > > > > > > > > > > > >> > offset only, the underlying consumer will send the next FetchRequest()
> > > > > > > > > > > > > > > > > >> > to the broker with the offset and the leaderEpoch if it is known (based
> > > > > > > > > > > > > > > > > >> > on the leader epoch cache in the consumer). Note that this is different
> > > > > > > > > > > > > > > > > >> > from the current KIP, which suggests always sending an unknown leader
> > > > > > > > > > > > > > > > > >> > epoch after seek(). This way, if the consumer and a broker agreed on the
> > > > > > > > > > > > > > > > > >> > point of non-divergence, which is some {offset, leaderEpoch} pair, a new
> > > > > > > > > > > > > > > > > >> > leader that causes another truncation (even further back) will be able
> > > > > > > > > > > > > > > > > >> > to detect the new divergence and restart the process of finding the new
> > > > > > > > > > > > > > > > > >> > point of non-divergence. So, to answer your question: if the truncation
> > > > > > > > > > > > > > > > > >> > happens just after the user calls KafkaConsumer#findOffsets(offset,
> > > > > > > > > > > > > > > > > >> > leaderEpoch) followed by seek(offset), the user will not seek to the
> > > > > > > > > > > > > > > > > >> > wrong position without knowing that truncation has happened, because
> > > > > > > > > > > > > > > > > >> > the consumer will get another truncation error and seek again.
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > I am afraid I did not understand your second question. Let me
> > > > > > > > > > > > > > > > > >> > summarize my suggestions again, and then give an example to hopefully
> > > > > > > > > > > > > > > > > >> > make my suggestions clearer. Also, the last part of my example shows
> > > > > > > > > > > > > > > > > >> > how the use case in your first question will work. If it does not
> > > > > > > > > > > > > > > > > >> > answer your second question, would you mind clarifying? I am also
> > > > > > > > > > > > > > > > > >> > focusing on the case of a consumer having enough entries in the cache.
> > > > > > > > > > > > > > > > > >> > The case of restarting from a committed offset, stored either
> > > > > > > > > > > > > > > > > >> > externally or internally, will probably need to be discussed more.
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> > Let me summarize my suggestion again:
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > 1) KafkaConsumer#seek() and KafkaConsumer#position() remain unchanged.
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > 2) A new KafkaConsumer#findOffsets() takes an {offset, leaderEpoch}
> > > > > > > > > > > > > > > > > >> > pair per topic partition and returns an offset per topic partition.
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > 3) The FetchRequest() sent to the broker after KafkaConsumer#seek()
> > > > > > > > > > > > > > > > > >> > will contain the offset set by seek() and the leaderEpoch that
> > > > > > > > > > > > > > > > > >> > corresponds to that offset, based on the leader epoch cache in the
> > > > > > > > > > > > > > > > > >> > consumer.
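Point 3 requires the consumer to map a seek offset to a leader epoch. Below is a minimal sketch of such a consumer-side cache. It assumes that a fetch at a given position carries the epoch of the last message before that position, which matches the FetchRequests in the worked example that follows. `ConsumerEpochCache`, `epochForPosition` and `truncateTo` are hypothetical names, not KafkaConsumer internals.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/**
 * Hypothetical consumer-side leader epoch cache: remembers the start offset of
 * each leader epoch seen in consumed records, so a FetchRequest sent after
 * seek(position) can carry a leader epoch. Assumes offsets arrive in order.
 */
final class ConsumerEpochCache {
    // start offset -> leader epoch, ordered by offset
    private final TreeMap<Long, Integer> epochByStartOffset = new TreeMap<>();
    // one past the last consumed offset
    private long endOffset = 0L;

    /** Records a consumed message identified by {offset, leaderEpoch}. */
    void onConsumed(long offset, int leaderEpoch) {
        Map.Entry<Long, Integer> last = epochByStartOffset.lastEntry();
        if (last == null || last.getValue() != leaderEpoch) {
            epochByStartOffset.put(offset, leaderEpoch); // a new epoch starts here
        }
        endOffset = offset + 1;
    }

    /** Epoch to send with a fetch at {@code position}: the epoch of message position - 1. */
    Optional<Integer> epochForPosition(long position) {
        if (position <= 0 || position > endOffset) {
            return Optional.empty(); // nothing consumed before it, or beyond our history
        }
        Map.Entry<Long, Integer> entry = epochByStartOffset.floorEntry(position - 1);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    /** Forgets history at or after {@code position}, e.g. after seeking back on truncation. */
    void truncateTo(long position) {
        epochByStartOffset.tailMap(position, true).clear();
        endOffset = Math.min(endOffset, position);
    }

    long endOffset() {
        return endOffset;
    }
}
```

Feeding it the three messages the consumer reads in the example gives epoch 1 for position 3. After `truncateTo(1)`, it gives epoch 0 for position 1, which matches FetchRequest(offset=1, leaderEpoch=0). A position with nothing consumed before it, or one beyond the consumed history, yields an empty result. That is the unknown-epoch situation raised for restarted consumers.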
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > The rest of this e-mail is a long and contrived example with several
> > > > > > > > > > > > > > > > > >> > log truncations and unclean leader elections to illustrate the API and
> > > > > > > > > > > > > > > > > >> > your first use case. Suppose we have three brokers. Initially, brokers
> > > > > > > > > > > > > > > > > >> > A, B, and C each have one message at offset 0 with leader epoch 0. Then
> > > > > > > > > > > > > > > > > >> > broker A goes down for some time. Broker B becomes the leader with epoch
> > > > > > > > > > > > > > > > > >> > 1 and writes messages to offsets 1 and 2. Broker C fetches offset 1 but,
> > > > > > > > > > > > > > > > > >> > before fetching offset 2, becomes the leader with leader epoch 2 and
> > > > > > > > > > > > > > > > > >> > writes a message at offset 2. Here is the state of the brokers at this
> > > > > > > > > > > > > > > > > >> > point:
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > Broker A:
> > > > > > > > > > > > > > > > > >> >   offset 0, epoch 0  <- leader, then goes down
> > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > Broker B:
> > > > > > > > > > > > > > > > > >> >   offset 0, epoch 0
> > > > > > > > > > > > > > > > > >> >   offset 1, epoch 1  <- leader
> > > > > > > > > > > > > > > > > >> >   offset 2, epoch 1
> > > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > Broker C:
> > > > > > > > > > > > > > > > > >> >   offset 0, epoch 0
> > > > > > > > > > > > > > > > > >> >   offset 1, epoch 1
> > > > > > > > > > > > > > > > > >> >   offset 2, epoch 2  <- leader
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > Before broker C becomes the leader with leader epoch 2, the consumer
> > > > > > > > > > > > > > > > > >> > consumed the following messages from broker A and broker B:
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > {offset=0, leaderEpoch=0}, {offset=1, leaderEpoch=1},
> > > > > > > > > > > > > > > > > >> > {offset=2, leaderEpoch=1}.
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > The consumer’s leader epoch cache at this point contains the following
> > > > > > > > > > > > > > > > > >> > entries:
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> > (leaderEpoch=0, startOffset=0)
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> > (leaderEpoch=1, startOffset=1)
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> > endOffset = 3
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > Then broker B becomes a follower of broker C, truncates its log, and
> > > > > > > > > > > > > > > > > >> > starts fetching from offset 2.
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > The consumer sends FetchRequest(offset=3, leaderEpoch=1) and gets a
> > > > > > > > > > > > > > > > > >> > LOG_TRUNCATION error from broker C.
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > In response, the client calls KafkaConsumer#findOffsets(offset=3,
> > > > > > > > > > > > > > > > > >> > leaderEpoch=1). The underlying consumer sends
> > > > > > > > > > > > > > > > > >> > OffsetsForLeaderEpoch(leaderEpoch=1), and broker C responds with
> > > > > > > > > > > > > > > > > >> > {leaderEpoch=1, endOffset=2}. So KafkaConsumer#findOffsets(offset=3,
> > > > > > > > > > > > > > > > > >> > leaderEpoch=1) returns offset=2.
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > In response, the consumer calls KafkaConsumer#seek(offset=2) followed
> > > > > > > > > > > > > > > > > >> > by poll(), which results in FetchRequest(offset=2, leaderEpoch=1) to
> > > > > > > > > > > > > > > > > >> > broker C.
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > I will continue with this example with the goal of answering your
> > > > > > > > > > > > > > > > > >> > first question about truncation happening just after findOffsets()
> > > > > > > > > > > > > > > > > >> > followed by seek():
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > Suppose brokers B and C go down, and broker A comes up, becomes the
> > > > > > > > > > > > > > > > > >> > leader with leader epoch 3, and writes a message to offset 1. Suppose
> > > > > > > > > > > > > > > > > >> > this happens before the consumer gets a response from broker C to the
> > > > > > > > > > > > > > > > > >> > previous fetch request, FetchRequest(offset=2, leaderEpoch=1).
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > The consumer re-sends FetchRequest(offset=2, leaderEpoch=1) to broker
> > > > > > > > > > > > > > > > > >> > A, which returns a LOG_TRUNCATION error, because broker A has leader
> > > > > > > > > > > > > > > > > >> > epoch 3 > the leader epoch in the FetchRequest, and epoch 3 has starting
> > > > > > > > > > > > > > > > > >> > offset = 1 < offset 2 in the FetchRequest().
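The broker-side steps in this example can be modelled with a small per-leader epoch history. This is a sketch under stated assumptions, not broker code. It assumes three things: OffsetsForLeaderEpoch returns the largest epoch at or below the requested one; an epoch ends where the next epoch starts, or at the log end offset for the latest epoch; and a fetch is rejected as truncated when that end offset is below the fetch offset. `LeaderEpochHistory`, `withEpoch`, `endOffsetFor` and `isTruncated` are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical model of a partition leader's epoch history (epoch -> start
 * offset), used only to replay the example; not actual broker code.
 */
final class LeaderEpochHistory {

    /** Assumed shape of an OffsetsForLeaderEpoch answer. */
    record EpochEndOffset(int leaderEpoch, long endOffset) {}

    private final TreeMap<Integer, Long> startOffsetByEpoch = new TreeMap<>();
    private final long logEndOffset;

    LeaderEpochHistory(long logEndOffset) {
        this.logEndOffset = logEndOffset;
    }

    LeaderEpochHistory withEpoch(int epoch, long startOffset) {
        startOffsetByEpoch.put(epoch, startOffset);
        return this;
    }

    /** Largest known epoch <= requestedEpoch, with the offset where it ends. */
    EpochEndOffset endOffsetFor(int requestedEpoch) {
        Map.Entry<Integer, Long> floor = startOffsetByEpoch.floorEntry(requestedEpoch);
        if (floor == null) {
            return new EpochEndOffset(-1, -1L); // no epoch at or below the requested one
        }
        // The epoch ends where the next epoch starts, or at the log end for the latest epoch.
        Map.Entry<Integer, Long> next = startOffsetByEpoch.higherEntry(floor.getKey());
        long endOffset = (next == null) ? logEndOffset : next.getValue();
        return new EpochEndOffset(floor.getKey(), endOffset);
    }

    /** True if, on this leader, the fetch epoch ends before the fetch offset. */
    boolean isTruncated(long fetchOffset, int fetchEpoch) {
        return endOffsetFor(fetchEpoch).endOffset() < fetchOffset;
    }
}
```

Broker C's log has epochs 0, 1 and 2 starting at offsets 0, 1 and 2. For that log, `endOffsetFor(1)` gives {leaderEpoch=1, endOffset=2}, and a fetch at offset 3 in epoch 1 is rejected. Broker A's log after it returns has epoch 0 at offset 0 and epoch 3 at offset 1. There, `endOffsetFor(1)` gives {leaderEpoch=0, endOffset=1}, so FetchRequest(offset=2, leaderEpoch=1) is rejected and FetchRequest(offset=1, leaderEpoch=0) is accepted.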
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > In response, the user calls KafkaConsumer#findOffsets(offset=2,
> > > > > > > > > > > > > > > > > >> > leaderEpoch=1). The underlying consumer sends
> > > > > > > > > > > > > > > > > >> > OffsetsForLeaderEpoch(leaderEpoch=1), and broker A responds with
> > > > > > > > > > > > > > > > > >> > {leaderEpoch=0, endOffset=1}. The underlying consumer finds
> > > > > > > > > > > > > > > > > >> > leaderEpoch=0 in its cache with end offset 1, which results in
> > > > > > > > > > > > > > > > > >> > KafkaConsumer#findOffsets(offset=2, leaderEpoch=1) returning offset=1.
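In this sketch, the consumer side of both findOffsets() calls follows one rule. Take the leader's end offset for the epoch it returned, and cap it by where that same epoch ended in the consumer's own history, if the consumer has seen that epoch. The class below is a hypothetical single-partition illustration of that rule (`FindOffsetsSketch`, `localEndOffset` and `findOffset` are made-up names). It is not the proposed per-partition KafkaConsumer#findOffsets() signature.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

/**
 * Hypothetical single-partition sketch of the consumer side of findOffsets():
 * combines the leader's OffsetsForLeaderEpoch answer with the consumer's own
 * epoch history to pick the point of non-divergence.
 */
final class FindOffsetsSketch {

    /** Assumed shape of the leader's answer. */
    record EpochEndOffset(int leaderEpoch, long endOffset) {}

    // consumer's own history: epoch -> start offset
    private final TreeMap<Integer, Long> startOffsetByEpoch;
    // one past the last offset the consumer still considers consumed
    private final long consumedEndOffset;

    FindOffsetsSketch(Map<Integer, Long> startOffsetByEpoch, long consumedEndOffset) {
        this.startOffsetByEpoch = new TreeMap<>(startOffsetByEpoch);
        this.consumedEndOffset = consumedEndOffset;
    }

    /** Where {@code epoch} ended in the consumer's own history, if it saw that epoch. */
    OptionalLong localEndOffset(int epoch) {
        if (!startOffsetByEpoch.containsKey(epoch)) {
            return OptionalLong.empty();
        }
        Map.Entry<Integer, Long> next = startOffsetByEpoch.higherEntry(epoch);
        return OptionalLong.of(next == null ? consumedEndOffset : next.getValue());
    }

    /** Offset to seek to, given the requested offset and the leader's answer. */
    long findOffset(long offset, EpochEndOffset leaderAnswer) {
        long result = Math.min(offset, leaderAnswer.endOffset());
        OptionalLong local = localEndOffset(leaderAnswer.leaderEpoch());
        if (local.isPresent()) {
            // Both sides agree on this epoch only up to the smaller end offset.
            result = Math.min(result, local.getAsLong());
        }
        // Otherwise fall back to the leader's end offset alone (best effort).
        return result;
    }
}
```

Replaying the example: with history {epoch 0 at offset 0, epoch 1 at offset 1} and three consumed offsets, the answer {leaderEpoch=1, endOffset=2} yields 2. After seeking to 2, the answer {leaderEpoch=0, endOffset=1} yields 1, because epoch 0 also ended at offset 1 locally.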
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > In response, the user calls KafkaConsumer#seek(offset=1) followed by
> > > > > > > > > > > > > > > > > >> > poll(), which results in FetchRequest(offset=1, leaderEpoch=0) to
> > > > > > > > > > > > > > > > > >> > broker A, which responds with the message at offset 1, leader epoch 3.
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > I will think some more about consumers restarting from committed
> > > > > > > > > > > > > > > > > >> > offsets, and send a follow-up.
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> > Thanks,
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> > Anna
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > > >> > On Sat, Jul 7, 2018 at 1:36 AM Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > > > >> > > Hey Anna,
> > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > > >> > > Thanks so much for the thoughtful reply. It makes sense to
> > > > > > > > > > > > > > > > > >> > > differentiate between "seeking to a message" and "seeking to a
> > > > > > > > > > > > > > > > > >> > > position". I have two questions here:
> > > > > > > > > > > > > > > > > >> > > - For the "seeking to a message" use case, with the proposed approach
> > > > > > > > > > > > > > > > > >> > > the user needs to call findOffset(offset, leaderEpoch) followed by
> > > > > > > > > > > > > > > > > >> > > seek(offset). If message truncation and message append happen
> > > > > > > > > > > > > > > > > >> > > immediately after findOffset(offset, leaderEpoch) but before
> > > > > > > > > > > > > > > > > >> > > seek(offset), it seems that the user will seek to the wrong message
> > > > > > > > > > > > > > > > > >> > > without knowing that truncation has happened. Would this be a problem?
> > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > > >> > > - For the "seeking to a position" use case, it seems that there can be
> > > > > > > > > > > > > > > > > >> > > two positions, i.e. earliest and latest. So these two cases can be
> > > > > > > > > > > > > > > > > >> > > fulfilled by Consumer.seekToBeginning() and Consumer.seekToEnd(). Then
> > > > > > > > > > > > > > > > > >> > > it seems that the user will only need to call position() and seek() for
> > > > > > > > > > > > > > > > > >> > > the "seeking to a message" use case?
> > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > >> > > Thanks,
> > > > > > > > > > > > > > > > >> > > Dong
> > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > > >> > > On Wed, Jul 4, 2018 at 12:33 PM, Anna Povzner <a...@confluent.io> wrote:
> > > > > > > > > > > > > > > > >> > >
> > > > > > > > > > > > > > > > >> > > > Hi Jason and Dong,
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > > >> > > > I’ve been thinking about your suggestions and discussion regarding
> > > > > > > > > > > > > > > > > >> > > > position(), seek(), and the newly proposed API.
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > > >> > > > Here is my thought process on why we should keep the position() and
> > > > > > > > > > > > > > > > > >> > > > seek() API unchanged.
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > I think we should separate {offset,
> > > leader
> > > > > > > epoch}
> > > > > > > > > that
> > > > > > > > > > > > > > uniquely
> > > > > > > > > > > > > > > > >> > > identifies
> > > > > > > > > > > > > > > > >> > > > a message from an offset that is a
> > > > position.
> > > > > > In
> > > > > > > > some
> > > > > > > > > > > > cases,
> > > > > > > > > > > > > > > > offsets
> > > > > > > > > > > > > > > > >> > > > returned from position() could be
> > actual
> > > > > > > consumed
> > > > > > > > > > > messages
> > > > > > > > > > > > > by
> > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > >> > > consumer
> > > > > > > > > > > > > > > > >> > > > identified by {offset, leader
> epoch}.
> > In
> > > > > other
> > > > > > > > > cases,
> > > > > > > > > > > > > > position()
> > > > > > > > > > > > > > > > >> > returns
> > > > > > > > > > > > > > > > >> > > > offset that was not actually
> consumed.
> > > > > > Suppose,
> > > > > > > > the
> > > > > > > > > > user
> > > > > > > > > > > > > calls
> > > > > > > > > > > > > > > > >> > position()
> > > > > > > > > > > > > > > > >> > > > for the last offset. Suppose we
> return
> > > > > > {offset,
> > > > > > > > > leader
> > > > > > > > > > > > > epoch}
> > > > > > > > > > > > > > of
> > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > >> > > > message currently in the log. Then,
> > the
> > > > > > message
> > > > > > > > gets
> > > > > > > > > > > > > truncated
> > > > > > > > > > > > > > > > >> before
> > > > > > > > > > > > > > > > >> > > > consumer’s first poll(). It does not
> > > make
> > > > > > sense
> > > > > > > > for
> > > > > > > > > > > poll()
> > > > > > > > > > > > > to
> > > > > > > > > > > > > > > fail
> > > > > > > > > > > > > > > > >> in
> > > > > > > > > > > > > > > > >> > > this
> > > > > > > > > > > > > > > > >> > > > case, because the log truncation did
> > not
> > > > > > > actually
> > > > > > > > > > happen
> > > > > > > > > > > > > from
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > >> > > consumer
> > > > > > > > > > > > > > > > >> > > > perspective. On the other hand, as
> the
> > > KIP
> > > > > > > > proposes,
> > > > > > > > > > it
> > > > > > > > > > > > > makes
> > > > > > > > > > > > > > > > sense
> > > > > > > > > > > > > > > > >> for
> > > > > > > > > > > > > > > > >> > > the
> > > > > > > > > > > > > > > > >> > > > committed() method to return
> {offset,
> > > > leader
> > > > > > > > epoch}
> > > > > > > > > > > > because
> > > > > > > > > > > > > > > those
> > > > > > > > > > > > > > > > >> > offsets
> > > > > > > > > > > > > > > > >> > > > represent actual consumed messages.
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > The same argument applies to the
> > seek()
> > > > > > method —
> > > > > > > > we
> > > > > > > > > > are
> > > > > > > > > > > > not
> > > > > > > > > > > > > > > > seeking
> > > > > > > > > > > > > > > > >> to
> > > > > > > > > > > > > > > > >> > a
> > > > > > > > > > > > > > > > >> > > > message, we are seeking to a
> position.
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > I like the proposal to add
> > > > > > > > > KafkaConsumer#findOffsets()
> > > > > > > > > > > > API.
> > > > > > > > > > > > > I
> > > > > > > > > > > > > > am
> > > > > > > > > > > > > > > > >> > assuming
> > > > > > > > > > > > > > > > >> > > > something like:
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > Map<TopicPartition, Long>
> > > > > > > > > > > findOffsets(Map<TopicPartition,
> > > > > > > > > > > > > > > > >> > OffsetAndEpoch>
> > > > > > > > > > > > > > > > >> > > > offsetsToSearch)
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > Similar to seek() and position(), I
> > > think
> > > > > > > > > > findOffsets()
> > > > > > > > > > > > > should
> > > > > > > > > > > > > > > > >> return
> > > > > > > > > > > > > > > > >> > > > offset without leader epoch, because
> > > what
> > > > we
> > > > > > > want
> > > > > > > > is
> > > > > > > > > > the
> > > > > > > > > > > > > > offset
> > > > > > > > > > > > > > > > >> that we
> > > > > > > > > > > > > > > > >> > > > think is closest to the not
> divergent
> > > > > message
> > > > > > > from
> > > > > > > > > the
> > > > > > > > > > > > given
> > > > > > > > > > > > > > > > >> consumed
> > > > > > > > > > > > > > > > >> > > > message. Until the consumer actually
> > > > fetches
> > > > > > the
> > > > > > > > > > > message,
> > > > > > > > > > > > we
> > > > > > > > > > > > > > > > should
> > > > > > > > > > > > > > > > >> not
> > > > > > > > > > > > > > > > >> > > let
> > > > > > > > > > > > > > > > >> > > > the consumer store the leader epoch
> > for
> > > a
> > > > > > > message
> > > > > > > > it
> > > > > > > > > > did
> > > > > > > > > > > > not
> > > > > > > > > > > > > > > > >> consume.
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > So, the workflow will be:
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > 1) The user gets
> > LogTruncationException
> > > > with
> > > > > > > > > {offset,
> > > > > > > > > > > > leader
> > > > > > > > > > > > > > > epoch
> > > > > > > > > > > > > > > > >> of
> > > > > > > > > > > > > > > > >> > the
> > > > > > > > > > > > > > > > >> > > > previous message} (whatever we send
> > with
> > > > new
> > > > > > > > > > > FetchRecords
> > > > > > > > > > > > > > > > request).
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > 2) offset = findOffsets(tp ->
> {offset,
> > > > > leader
> > > > > > > > > epoch})
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > 3) seek(offset)
> > > > > > > > > > > > > > > > >> > > >
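A minimal sketch of the three-step recovery above, under stated assumptions: this is a self-contained model rather than Kafka client code; `EpochCache`, `OffsetAndEpoch`, `findOffset`, and `FindOffsetsSketch` are hypothetical names; partitions are plain strings instead of `TopicPartition`; `offset` is read as the consumer's fetch position and the epoch as that of the last consumed record (one reading of step 1); and the lookup assumes OffsetsForLeaderEpoch-style semantics, where an epoch ends where the next higher epoch starts on the leader, or at the log end offset.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical {offset, leader epoch} pair, named after the struct in the thread.
final class OffsetAndEpoch {
    final long offset;
    final int leaderEpoch;

    OffsetAndEpoch(long offset, int leaderEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }
}

// Hypothetical model of a leader's epoch cache for one partition.
final class EpochCache {
    private final NavigableMap<Integer, Long> epochStartOffsets = new TreeMap<>();
    private final long logEndOffset;

    EpochCache(long logEndOffset) {
        this.logEndOffset = logEndOffset;
    }

    EpochCache addEpoch(int epoch, long startOffset) {
        epochStartOffsets.put(epoch, startOffset);
        return this;
    }

    // Where the requested epoch (or the largest cached epoch below it) ends: the start
    // offset of the next higher epoch, or the log end offset if no epoch is higher.
    long endOffsetFor(int epoch) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(epoch);
        return next == null ? logEndOffset : next.getValue();
    }

    // Clamp the consumer's position to where its history stops matching the leader's log.
    long findOffset(OffsetAndEpoch position) {
        return Math.min(position.offset, endOffsetFor(position.leaderEpoch));
    }
}

final class FindOffsetsSketch {
    // Batch form, loosely following findOffsets(Map<TopicPartition, OffsetAndEpoch>).
    static Map<String, Long> findOffsets(Map<String, EpochCache> leaders,
                                         Map<String, OffsetAndEpoch> offsetsToSearch) {
        Map<String, Long> result = new HashMap<>();
        offsetsToSearch.forEach((tp, oe) -> result.put(tp, leaders.get(tp).findOffset(oe)));
        return result;
    }

    public static void main(String[] args) {
        // New leader: epoch 0 starts at offset 0, epoch 2 at offset 5, log end offset 8.
        EpochCache leader = new EpochCache(8L).addEpoch(0, 0L).addEpoch(2, 5L);

        // Step 1: truncation reported for position 7; the last consumed record had epoch 1.
        Map<String, OffsetAndEpoch> toSearch = Map.of("topic-0", new OffsetAndEpoch(7L, 1));
        // Step 2: translate {offset, epoch} into a plain offset.
        long offset = findOffsets(Map.of("topic-0", leader), toSearch).get("topic-0");
        // Step 3: a real consumer would now call seek(tp, offset).
        System.out.println("seek to " + offset); // prints "seek to 5"
    }
}
```

The result is a plain offset, in line with the suggestion that findOffsets() should not hand back an epoch for a message the consumer has not fetched.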
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > For the use case where users store committed offsets externally:
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > 1) Such users would have to track the leader epoch together with an
> > > > > > > > > > > > > > > > >> > > > offset. Otherwise, there is no way to detect later what leader epoch was
> > > > > > > > > > > > > > > > >> > > > associated with the message. I think it’s reasonable to ask that of
> > > > > > > > > > > > > > > > >> > > > users if they want to detect log truncation. Otherwise, they will get
> > > > > > > > > > > > > > > > >> > > > the current behavior.
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > If users currently get an offset to be stored using position(), I see
> > > > > > > > > > > > > > > > >> > > > two possibilities. First, they save the offset returned from position()
> > > > > > > > > > > > > > > > >> > > > that they call before poll(). In that case, it would not be correct to
> > > > > > > > > > > > > > > > >> > > > store {offset, leader epoch} if we had changed position() to return
> > > > > > > > > > > > > > > > >> > > > {offset, leader epoch}, since the actual fetched message could be
> > > > > > > > > > > > > > > > >> > > > different (as in the example I described earlier). So, it would be more
> > > > > > > > > > > > > > > > >> > > > correct to call position() after poll(). However, the user already gets
> > > > > > > > > > > > > > > > >> > > > ConsumerRecords at this point, from which the user can extract the
> > > > > > > > > > > > > > > > >> > > > {offset, leader epoch} of the last message.
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > So, I like the idea of adding a helper method to ConsumerRecords, as
> > > > > > > > > > > > > > > > >> > > > Jason proposed, something like:
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > public OffsetAndEpoch lastOffsetWithLeaderEpoch(), where OffsetAndEpoch
> > > > > > > > > > > > > > > > >> > > > is a data struct holding {offset, leader epoch}.
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > In this case, we would advise the user to follow the workflow: poll(),
> > > > > > > > > > > > > > > > >> > > > get {offset, leader epoch} from ConsumerRecords#lastOffsetWithLeaderEpoch(),
> > > > > > > > > > > > > > > > >> > > > save the offset and leader epoch, then process the records.
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > 2) When the user needs to seek to the last committed offset, they call
> > > > > > > > > > > > > > > > >> > > > the new findOffsets(saved offset, leader epoch), and then seek(offset).
> > > > > > > > > > > > > > > > >> > > >
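The external-storage workflow above might look roughly like the following. All names are hypothetical: `FetchedRecord` stands in for a consumed record carrying its leader epoch, `lastOffsetWithLeaderEpoch` models the proposed `ConsumerRecords` helper (it is not an existing method), and the external store is a plain map. Storing `offset + 1` (the next position) together with the last record's epoch is one possible convention, assumed here; the message does not pin it down. The restart lookup assumes that an epoch ends where the next higher epoch starts on the current leader.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical consumed record: partition, offset, and the leader epoch it was written in.
final class FetchedRecord {
    final String partition;
    final long offset;
    final int leaderEpoch;

    FetchedRecord(String partition, long offset, int leaderEpoch) {
        this.partition = partition;
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }
}

// Hypothetical {offset, leader epoch} struct kept in the external store.
final class SavedPosition {
    final long offset;
    final int leaderEpoch;

    SavedPosition(long offset, int leaderEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }
}

final class ExternalOffsetsSketch {
    // Models the proposed helper: {offset, epoch} of the last record fetched for a partition.
    // Assumes a partition's records appear in offset order, as within a fetch response.
    static Optional<SavedPosition> lastOffsetWithLeaderEpoch(List<FetchedRecord> batch, String partition) {
        SavedPosition last = null;
        for (FetchedRecord r : batch) {
            if (r.partition.equals(partition)) {
                last = new SavedPosition(r.offset, r.leaderEpoch);
            }
        }
        return Optional.ofNullable(last);
    }

    // On restart: clamp the saved position to the end of its epoch on the current leader.
    static long findOffset(SavedPosition saved, NavigableMap<Integer, Long> epochStartOffsets, long logEndOffset) {
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(saved.leaderEpoch);
        long epochEnd = next == null ? logEndOffset : next.getValue();
        return Math.min(saved.offset, epochEnd);
    }

    public static void main(String[] args) {
        Map<String, SavedPosition> externalStore = new HashMap<>();

        // poll() -> take {offset, epoch} of the last record -> save -> process the records.
        List<FetchedRecord> polled = List.of(
                new FetchedRecord("t-0", 10L, 3),
                new FetchedRecord("t-1", 4L, 1),
                new FetchedRecord("t-0", 11L, 3));
        lastOffsetWithLeaderEpoch(polled, "t-0").ifPresent(last ->
                externalStore.put("t-0", new SavedPosition(last.offset + 1, last.leaderEpoch)));

        // Later: the current leader started epoch 4 at offset 11, so offset 11 of epoch 3 is gone.
        NavigableMap<Integer, Long> epochs = new TreeMap<>(Map.of(3, 6L, 4, 11L));
        long seekTo = findOffset(externalStore.get("t-0"), epochs, 15L);
        System.out.println("seek to " + seekTo); // prints "seek to 11"
    }
}
```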
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > What do you think?
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > Thanks,
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > Anna
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > On Tue, Jul 3, 2018 at 4:06 PM Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > > > > >> > > >
> > > > > > > > > > > > > > > > >> > > > > Hey Jason,
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > Thanks much for your thoughtful explanation.
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > Yes, the solution using findOffsets(offset, leaderEpoch) also works. The
> > > > > > > > > > > > > > > > >> > > > > advantage of this solution is that it adds only one API instead of two.
> > > > > > > > > > > > > > > > >> > > > > The concern is that its usage seems a bit clumsier for advanced users.
> > > > > > > > > > > > > > > > >> > > > > More specifically, advanced users who store offsets externally will
> > > > > > > > > > > > > > > > >> > > > > always need to call findOffsets() before calling seek(offset) during
> > > > > > > > > > > > > > > > >> > > > > consumer initialization. And those advanced users will need to manually
> > > > > > > > > > > > > > > > >> > > > > keep track of the leaderEpoch of the last ConsumerRecord.
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > The other solution, which may be more user-friendly for advanced users,
> > > > > > > > > > > > > > > > >> > > > > is to add two APIs: `void seek(offset, leaderEpoch)` and
> > > > > > > > > > > > > > > > >> > > > > `(offset, epoch) = offsetEpochs(topicPartition)`.
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > I kind of prefer the second solution because it is easier to use for
> > > > > > > > > > > > > > > > >> > > > > advanced users. If we need to expose leaderEpoch anyway to safely
> > > > > > > > > > > > > > > > >> > > > > identify a message, it may be conceptually simpler to expose it directly
> > > > > > > > > > > > > > > > >> > > > > in seek(...) rather than requiring one more translation using
> > > > > > > > > > > > > > > > >> > > > > findOffsets(...). But I am also OK with the first solution if other
> > > > > > > > > > > > > > > > >> > > > > developers also favor that one :)
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > Thanks,
> > > > > > > > > > > > > > > > >> > > > > Dong
> > > > > > > > > > > > > > > > >> > > > >
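For comparison, the two calls in the second solution could be modeled as below. This is only a toy under stated assumptions: `EpochAwarePositions`, `OffsetEpoch`, and `onConsumed` are hypothetical, `String` stands in for `TopicPartition`, and the epoch is an `Optional<Integer>` because an offset-only seek leaves it unknown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical {offset, leader epoch} pair returned by the proposed offsetEpochs(tp).
final class OffsetEpoch {
    final long offset;
    final Optional<Integer> leaderEpoch; // empty when no epoch is known yet

    OffsetEpoch(long offset, Optional<Integer> leaderEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }

    @Override
    public String toString() {
        return "(" + offset + ", " + leaderEpoch.map(Object::toString).orElse("unknown") + ")";
    }
}

// Toy position store showing the two proposed calls.
final class EpochAwarePositions {
    private final Map<String, OffsetEpoch> positions = new HashMap<>();

    // Proposed shape: void seek(offset, leaderEpoch)
    void seek(String tp, long offset, int leaderEpoch) {
        positions.put(tp, new OffsetEpoch(offset, Optional.of(leaderEpoch)));
    }

    // Existing offset-only seek: the epoch stays unknown until a record is consumed.
    void seek(String tp, long offset) {
        positions.put(tp, new OffsetEpoch(offset, Optional.empty()));
    }

    // Bookkeeping the consumer would do itself: move past a consumed record, adopting its epoch.
    void onConsumed(String tp, long recordOffset, int recordEpoch) {
        positions.put(tp, new OffsetEpoch(recordOffset + 1, Optional.of(recordEpoch)));
    }

    // Proposed shape: (offset, epoch) = offsetEpochs(topicPartition)
    OffsetEpoch offsetEpochs(String tp) {
        OffsetEpoch p = positions.get(tp);
        if (p == null) {
            throw new IllegalStateException("No position for " + tp);
        }
        return p;
    }

    public static void main(String[] args) {
        EpochAwarePositions consumer = new EpochAwarePositions();
        consumer.seek("t-0", 42L, 5);       // restore an externally stored pair
        consumer.onConsumed("t-0", 42L, 5);
        consumer.onConsumed("t-0", 43L, 6); // leader changed between records
        System.out.println(consumer.offsetEpochs("t-0")); // prints (44, 6)
    }
}
```

The user never translates anything here: whatever `offsetEpochs` returns can be stored and passed straight back to `seek`.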
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > On Mon, Jul 2, 2018 at 11:10 AM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > > > > > > > > > > > > >> > > > >
> > > > > > > > > > > > > > > > >> > > > > > Hi Dong,
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > Thanks, I've been thinking about your suggestions a bit. It is
> > > > > > > > > > > > > > > > >> > > > > > challenging to make this work given the current APIs. One of the
> > > > > > > > > > > > > > > > >> > > > > > difficulties is that we don't have an API to find the leader epoch for
> > > > > > > > > > > > > > > > >> > > > > > a given offset at the moment. So if the user does a seek to offset 5,
> > > > > > > > > > > > > > > > >> > > > > > then we'll need a new API to find the corresponding epoch in order to
> > > > > > > > > > > > > > > > >> > > > > > fulfill the new position() API. Potentially we could modify ListOffsets
> > > > > > > > > > > > > > > > >> > > > > > to enable finding the leader epoch, but I am not sure it is worthwhile.
> > > > > > > > > > > > > > > > >> > > > > > Perhaps it is reasonable for advanced usage to expect that the epoch
> > > > > > > > > > > > > > > > >> > > > > > information, if needed, will be extracted from the records directly? It
> > > > > > > > > > > > > > > > >> > > > > > might make sense to expose a helper in `ConsumerRecords` to make this a
> > > > > > > > > > > > > > > > >> > > > > > little easier though.
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > Alternatively, if we think it is important to have this information
> > > > > > > > > > > > > > > > >> > > > > > exposed directly, we could create batch APIs to solve the naming
> > > > > > > > > > > > > > > > >> > > > > > problem. For example:
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > Map<TopicPartition, OffsetAndEpoch> positions();
> > > > > > > > > > > > > > > > >> > > > > > void seek(Map<TopicPartition, OffsetAndEpoch> positions);
> > > > > > > > > > > > > > > > >> > > > > >
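The batch shapes above can be sketched as an interface plus a toy in-memory implementation. `BatchPositionApi` and `InMemoryBatchPositions` are hypothetical, `String` stands in for `TopicPartition`, and nothing here talks to a broker. The plural names matter because Java cannot overload `position(TopicPartition)` on return type alone.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical {offset, leader epoch} value type.
final class OffsetAndEpoch {
    final long offset;
    final int leaderEpoch;

    OffsetAndEpoch(long offset, int leaderEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }
}

// The two batch signatures from the message.
interface BatchPositionApi {
    Map<String, OffsetAndEpoch> positions();

    void seek(Map<String, OffsetAndEpoch> positions);
}

// Toy in-memory implementation, only to show how the two calls round-trip.
final class InMemoryBatchPositions implements BatchPositionApi {
    private final Map<String, OffsetAndEpoch> current = new HashMap<>();

    @Override
    public Map<String, OffsetAndEpoch> positions() {
        // Read-only snapshot so callers cannot mutate consumer state.
        return Collections.unmodifiableMap(new HashMap<>(current));
    }

    @Override
    public void seek(Map<String, OffsetAndEpoch> positions) {
        // Partitions not mentioned keep their current position.
        current.putAll(positions);
    }

    public static void main(String[] args) {
        BatchPositionApi consumer = new InMemoryBatchPositions();
        consumer.seek(Map.of("t-0", new OffsetAndEpoch(10L, 2), "t-1", new OffsetAndEpoch(3L, 1)));
        Map<String, OffsetAndEpoch> snapshot = consumer.positions(); // e.g. persist externally
        consumer.seek(snapshot);                                     // later: restore all at once
        System.out.println(consumer.positions().get("t-0").offset);  // prints 10
    }
}
```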
> > > > > > > > > > > > > > > > >> > > > > > However, I'm actually leaning toward leaving the seek() and position()
> > > > > > > > > > > > > > > > >> > > > > > APIs unchanged. Instead, we can add a new API to search for an offset by
> > > > > > > > > > > > > > > > >> > > > > > timestamp or by offset/leader epoch. Let's say we call it `findOffsets`.
> > > > > > > > > > > > > > > > >> > > > > > If the user hits a log truncation error, they can use this API to find
> > > > > > > > > > > > > > > > >> > > > > > the closest offset and then do a seek(). At the same time, we deprecate
> > > > > > > > > > > > > > > > >> > > > > > the `offsetsForTimes` APIs. We now have two use cases that require
> > > > > > > > > > > > > > > > >> > > > > > finding offsets, so I think we should make this API general and leave
> > > > > > > > > > > > > > > > >> > > > > > the door open for future extensions.
> > > > > > > > > > > > > > > > >> > > > > >
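One way to keep a single `findOffsets` entry point open to future query kinds is to let each query resolve itself against the log. This is a hypothetical sketch, not the KIP's API: `OffsetQuery`, `ByTimestamp`, `ByOffsetAndEpoch`, `PartitionLog`, `LogEntry`, and `GeneralFindOffsets` are illustrative names; the log is an in-memory list in ascending offset order, with the log end offset taken as the last offset plus one; the timestamp rule mirrors the documented `offsetsForTimes()` behavior (earliest offset whose timestamp is at or after the target); and the epoch rule clamps the offset to where the given epoch ends.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical log entry: offset, the leader epoch it was written in, and its timestamp.
final class LogEntry {
    final long offset;
    final int leaderEpoch;
    final long timestamp;

    LogEntry(long offset, int leaderEpoch, long timestamp) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
        this.timestamp = timestamp;
    }
}

// Hypothetical in-memory partition log; entries are in ascending offset order.
final class PartitionLog {
    final List<LogEntry> entries;

    PartitionLog(List<LogEntry> entries) {
        this.entries = entries;
    }

    long logEndOffset() {
        return entries.isEmpty() ? 0L : entries.get(entries.size() - 1).offset + 1;
    }
}

// One kind of lookup; new kinds can be added without touching findOffsets().
interface OffsetQuery {
    Optional<Long> resolve(PartitionLog log);
}

// Timestamp lookup: earliest offset whose timestamp is >= the target, if any.
final class ByTimestamp implements OffsetQuery {
    private final long timestamp;

    ByTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public Optional<Long> resolve(PartitionLog log) {
        for (LogEntry e : log.entries) {
            if (e.timestamp >= timestamp) {
                return Optional.of(e.offset);
            }
        }
        return Optional.empty();
    }
}

// Truncation lookup: clamp the offset to where the given epoch ends in this log.
final class ByOffsetAndEpoch implements OffsetQuery {
    private final long offset;
    private final int leaderEpoch;

    ByOffsetAndEpoch(long offset, int leaderEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }

    @Override
    public Optional<Long> resolve(PartitionLog log) {
        long epochEnd = log.logEndOffset();
        for (LogEntry e : log.entries) {
            if (e.leaderEpoch > leaderEpoch) {
                epochEnd = e.offset; // first record written in a later epoch
                break;
            }
        }
        return Optional.of(Math.min(offset, epochEnd));
    }
}

final class GeneralFindOffsets {
    // String stands in for TopicPartition.
    static Map<String, Optional<Long>> findOffsets(Map<String, PartitionLog> logs,
                                                   Map<String, OffsetQuery> queries) {
        Map<String, Optional<Long>> result = new HashMap<>();
        queries.forEach((tp, query) -> result.put(tp, query.resolve(logs.get(tp))));
        return result;
    }

    public static void main(String[] args) {
        PartitionLog log = new PartitionLog(List.of(
                new LogEntry(0L, 0, 100L), new LogEntry(1L, 0, 200L),
                new LogEntry(2L, 2, 300L), new LogEntry(3L, 2, 400L)));
        Map<String, Optional<Long>> found = findOffsets(
                Map.of("a-0", log, "b-0", log),
                Map.of("a-0", new ByTimestamp(250L), "b-0", new ByOffsetAndEpoch(3L, 1)));
        System.out.println(found.get("a-0").get() + " " + found.get("b-0").get()); // prints "2 2"
    }
}
```

Adding a new lookup kind means adding another `OffsetQuery` implementation; `findOffsets` itself does not change.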
> > > > > > > > > > > > > > > > >> > > > > > By the way, I'm unclear about the desire to move part of this
> > > > > > > > > > > > > > > > >> > > > > > functionality to AdminClient. Guozhang suggested this previously, but I
> > > > > > > > > > > > > > > > >> > > > > > think it only makes sense for cross-cutting capabilities such as topic
> > > > > > > > > > > > > > > > >> > > > > > creation. If we have an API which is primarily useful to consumers, then
> > > > > > > > > > > > > > > > >> > > > > > I think that's where it should be exposed. The AdminClient also has its
> > > > > > > > > > > > > > > > >> > > > > > own API integrity and should not become a dumping ground for advanced
> > > > > > > > > > > > > > > > >> > > > > > use cases. I'll update the KIP with the `findOffsets` API suggested above
> > > > > > > > > > > > > > > > >> > > > > > and we can see if it does a good enough job of keeping the API simple
> > > > > > > > > > > > > > > > >> > > > > > for common cases.
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > Thanks,
> > > > > > > > > > > > > > > > >> > > > > > Jason
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > On Sat, Jun 30, 2018 at 4:39 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > Hey Jason,
> > > > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > Regarding seek(...), it seems that we want an API for the user to
> > > > > > > > > > > > > > > > >> > > > > > > initialize the consumer with (offset, leaderEpoch), and that API should
> > > > > > > > > > > > > > > > >> > > > > > > allow throwing PartitionTruncationException. Suppose we agree on this;
> > > > > > > > > > > > > > > > >> > > > > > > then seekToNearest() is not sufficient because it will always swallow
> > > > > > > > > > > > > > > > >> > > > > > > PartitionTruncationException. Here we have two options. The first option
> > > > > > > > > > > > > > > > >> > > > > > > is to add an API offsetsForLeaderEpochs() to translate (leaderEpoch,
> > > > > > > > > > > > > > > > >> > > > > > > offset) to offset. The second option is to add seek(offset,
> > > > > > > > > > > > > > > > >> > > > > > > leaderEpoch). It seems that the second option may be simpler because it
> > > > > > > > > > > > > > > > >> > > > > > > makes it clear that (offset, leaderEpoch) will be used to identify the
> > > > > > > > > > > > > > > > >> > > > > > > consumer's position in a partition, and the user only needs to handle
> > > > > > > > > > > > > > > > >> > > > > > > PartitionTruncationException from poll(). In comparison, the first
> > > > > > > > > > > > > > > > >> > > > > > > option seems a bit harder to use because the user also has to handle
> > > > > > > > > > > > > > > > >> > > > > > > the PartitionTruncationException if offsetsForLeaderEpochs() returns a
> > > > > > > > > > > > > > > > >> > > > > > > different offset from the user-provided offset. What do you think?
> > > > > > > > > > > > > > > > >> > > > > > >
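The second option, where seek() accepts an epoch and truncation surfaces later from poll(), could behave roughly like this single-partition toy. `ValidatingConsumer`, `SeekWithEpochDemo`, and the `poll()` return value (just the position it would fetch from) are hypothetical simplifications; `PartitionTruncationException` is defined locally after the name proposed in the message; and the check assumes an epoch ends where the leader's next higher epoch starts.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Defined locally, named after the exception proposed in the thread.
final class PartitionTruncationException extends RuntimeException {
    final long divergentOffset;

    PartitionTruncationException(long requestedOffset, long divergentOffset) {
        super("Offset " + requestedOffset + " is past the end of its epoch at " + divergentOffset);
        this.divergentOffset = divergentOffset;
    }
}

// Toy single-partition consumer: seek(offset, leaderEpoch) never throws; poll() validates.
final class ValidatingConsumer {
    private final NavigableMap<Integer, Long> leaderEpochStarts; // epoch -> start offset on the leader
    private final long leaderLogEnd;
    private long position;
    private Optional<Integer> positionEpoch = Optional.empty();

    ValidatingConsumer(NavigableMap<Integer, Long> leaderEpochStarts, long leaderLogEnd) {
        this.leaderEpochStarts = leaderEpochStarts;
        this.leaderLogEnd = leaderLogEnd;
    }

    void seek(long offset, int leaderEpoch) {
        position = offset;
        positionEpoch = Optional.of(leaderEpoch);
    }

    // Offset-only seek: nothing to validate, as with today's seek().
    void seek(long offset) {
        position = offset;
        positionEpoch = Optional.empty();
    }

    // Returns the position it would fetch from; a real poll() would return records.
    long poll() {
        if (positionEpoch.isPresent()) {
            Map.Entry<Integer, Long> next = leaderEpochStarts.higherEntry(positionEpoch.get());
            long epochEnd = next == null ? leaderLogEnd : next.getValue();
            if (epochEnd < position) {
                throw new PartitionTruncationException(position, epochEnd);
            }
        }
        return position;
    }
}

final class SeekWithEpochDemo {
    public static void main(String[] args) {
        NavigableMap<Integer, Long> epochs = new TreeMap<>(Map.of(0, 0L, 2, 5L));
        ValidatingConsumer consumer = new ValidatingConsumer(epochs, 8L);
        consumer.seek(7L, 1); // epoch 1 does not exist on the current leader
        try {
            consumer.poll();
        } catch (PartitionTruncationException e) {
            consumer.seek(e.divergentOffset); // the user decides how to recover
        }
        System.out.println(consumer.poll()); // prints 5
    }
}
```

Here the handler resumes at the divergence point, but it could equally seek elsewhere; the choice stays with the user.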
> > > > > > > > > > > > > > > > >> > > > > > > If we decide to add API
> > > seek(offset,
> > > > > > > > > > leaderEpoch),
> > > > > > > > > > > > > then
> > > > > > > > > > > > > > we
> > > > > > > > > > > > > > > > can
> > > > > > > > > > > > > > > > >> > > decide
> > > > > > > > > > > > > > > > >> > > > > > > whether and how to add API to
> > > > > translate
> > > > > > > > > (offset,
> > > > > > > > > > > > > > > > leaderEpoch)
> > > > > > > > > > > > > > > > >> to
> > > > > > > > > > > > > > > > >> > > > > offset.
> > > > > > > > > > > > > > > > >> > > > > > It
> > > > > > > > > > > > > > > > >> > > > > > > seems that this API will be
> > needed
> > > > by
> > > > > > > > advanced
> > > > > > > > > > > user
> > > > > > > > > > > > to
> > > > > > > > > > > > > > > don't
> > > > > > > > > > > > > > > > >> want
> > > > > > > > > > > > > > > > >> > > > auto
> > > > > > > > > > > > > > > > >> > > > > > > offset reset (so that it can
> be
> > > > > > notified)
> > > > > > > > but
> > > > > > > > > > > still
> > > > > > > > > > > > > > wants
> > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > >> > reset
> > > > > > > > > > > > > > > > >> > > > > offset
> > > > > > > > > > > > > > > > >> > > > > > > to closest. For those users if
> > > > > probably
> > > > > > > > makes
> > > > > > > > > > > sense
> > > > > > > > > > > > to
> > > > > > > > > > > > > > > only
> > > > > > > > > > > > > > > > >> have
> > > > > > > > > > > > > > > > >> > > the
> > > > > > > > > > > > > > > > >> > > > > API
> > > > > > > > > > > > > > > > >> > > > > > in
> > > > > > > > > > > > > > > > >> > > > > > > AdminClient. offsetsForTimes()
> > > seems
> > > > > > like
> > > > > > > a
> > > > > > > > > > common
> > > > > > > > > > > > API
> > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > >> will
> > > > > > > > > > > > > > > > >> > be
> > > > > > > > > > > > > > > > >> > > > > > needed
> > > > > > > > > > > > > > > > >> > > > > > > by user's of consumer in
> > general,
> > > so
> > > > > it
> > > > > > > may
> > > > > > > > be
> > > > > > > > > > > more
> > > > > > > > > > > > > > > > >> reasonable to
> > > > > > > > > > > > > > > > >> > > > stay
> > > > > > > > > > > > > > > > >> > > > > in
> > > > > > > > > > > > > > > > >> > > > > > > the consumer API. I don't
> have a
> > > > > strong
> > > > > > > > > opinion
> > > > > > > > > > on
> > > > > > > > > > > > > > whether
> > > > > > > > > > > > > > > > >> > > > > > > offsetsForTimes() should be
> > > replaced
> > > > > by
> > > > > > > API
> > > > > > > > in
> > > > > > > > > > > > > > > AdminClient.
> > > > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > Though (offset, leaderEpoch) is needed to uniquely identify a
> > > > > > > > > > > > > > > > >> > > > > > > message in general, it is only needed for advanced users who have
> > > > > > > > > > > > > > > > >> > > > > > > turned on unclean leader election, need to use seek(..), and don't
> > > > > > > > > > > > > > > > >> > > > > > > want auto offset reset. Most other users probably just want to
> > > > > > > > > > > > > > > > >> > > > > > > enable auto offset reset and store offsets in Kafka. Thus we might
> > > > > > > > > > > > > > > > >> > > > > > > want to keep the existing offset-only APIs (e.g. seek() and
> > > > > > > > > > > > > > > > >> > > > > > > position()) for most users while adding new APIs for advanced
> > > > > > > > > > > > > > > > >> > > > > > > users. And yes, it seems that we need a new name for position().
> > > > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > Though I think we need new APIs to carry the new information
> > > > > > > > > > > > > > > > >> > > > > > > (e.g. leaderEpoch), I am not very sure what that should look like.
> > > > > > > > > > > > > > > > >> > > > > > > One possible option is the APIs in KIP-232. Another option is
> > > > > > > > > > > > > > > > >> > > > > > > something like this:
> > > > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > ```
> > > > > > > > > > > > > > > > >> > > > > > > class OffsetEpochs {
> > > > > > > > > > > > > > > > >> > > > > > >   long offset;
> > > > > > > > > > > > > > > > >> > > > > > >   int leaderEpoch;
> > > > > > > > > > > > > > > > >> > > > > > >   int partitionEpoch;   // This may be needed later as discussed in KIP-232
> > > > > > > > > > > > > > > > >> > > > > > >   // ... Hopefully these are all we need to identify a message in Kafka.
> > > > > > > > > > > > > > > > >> > > > > > >   // But if we need more, then we can add new fields in this class.
> > > > > > > > > > > > > > > > >> > > > > > > }
> > > > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > OffsetEpochs offsetEpochs(TopicPartition partition);
> > > > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > void seek(TopicPartition partition, OffsetEpochs offsetEpochs);
> > > > > > > > > > > > > > > > >> > > > > > > ```
> > > > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > Thanks,
> > > > > > > > > > > > > > > > >> > > > > > > Dong
> > > > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > On Fri, Jun 29, 2018 at 11:13 AM, Jason Gustafson <ja...@confluent.io>
> > > > > > > > > > > > > > > > >> > > > > > > wrote:
> > > > > > > > > > > > > > > > >> > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > > Hey Dong,
> > > > > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > > Thanks for the feedback. The first three points are easy:
> > > > > > > > > > > > > > > > >> > > > > > > >
> > > > > > > > > > > > > > > > >> > > > > > > > 1. Yes, we should be consistent.
> > > > > > > > > > > > > > > > >> > > > > > > > 2. Yes, I will add this.
> > > > > > > > > > > > > > > > >> > > > > > > > 3. Yes, I think we should document the changes to the committed offset
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

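Dong's point in the quoted thread, that (offset, leaderEpoch) rather than the offset alone is needed to uniquely identify a message once unclean leader election is enabled, can be illustrated with a minimal Java sketch. It assumes a hypothetical `OffsetEpochs` record that keeps only the offset and leader epoch from his proposed class (partitionEpoch is left out). It is not part of the Kafka consumer API, and the class name `OffsetEpochsExample` is likewise illustrative. Two positions at offset 100 under leader epochs 5 and 6 share an offset but do not compare equal, which is the distinction an epoch-aware seek() would need to preserve.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only: OffsetEpochs is modeled on the sketch in
// the thread and is NOT a real Kafka consumer type.
public class OffsetEpochsExample {

    // A record (Java 16+) derives equals()/hashCode() from both components,
    // so two positions are equal only if offset AND leader epoch match.
    record OffsetEpochs(long offset, int leaderEpoch) {}

    public static void main(String[] args) {
        // Same offset, different leader epochs: after an unclean leader
        // election the log can diverge, so offset 100 under epoch 6 may hold
        // a different message than offset 100 under epoch 5.
        OffsetEpochs beforeElection = new OffsetEpochs(100L, 5);
        OffsetEpochs afterElection = new OffsetEpochs(100L, 6);

        // The offsets alone look identical...
        System.out.println(beforeElection.offset() == afterElection.offset()); // true
        // ...but the (offset, leaderEpoch) pairs are distinct.
        System.out.println(beforeElection.equals(afterElection)); // false

        // Value-based hashCode() also keeps them distinct as set keys.
        Set<OffsetEpochs> seen = new HashSet<>();
        seen.add(beforeElection);
        seen.add(afterElection);
        System.out.println(seen.size()); // 2
    }
}
```

Using a record keeps equality value-based over both fields, so such positions can also serve directly as map or set keys when tracking consumed offsets.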