Thanks, Matthias!

I can certainly document it. I didn't bother because the old
Processor, Supplier, and Context will themselves be
deprecated, so any method that handles them won't be able to
avoid the deprecation warning. Nevertheless, it doesn't hurt
just to explicitly deprecate those methods.
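
For concreteness, here is a rough sketch of what I mean. The types
below (OldProcessorSupplier, NewProcessorSupplier, Builder,
addProcessor) are made-up stand-ins, not the real Kafka Streams
classes. The only point is that the overload taking the old supplier
carries its own @Deprecated, even though its parameter type is already
deprecated.

```java
import java.lang.reflect.Method;

public class DeprecationSketch {
    /** Hypothetical stand-in for the old supplier type, which is itself deprecated. */
    @Deprecated
    public interface OldProcessorSupplier { }

    /** Hypothetical stand-in for the new supplier type. */
    public interface NewProcessorSupplier { }

    /** Hypothetical stand-in for a builder-style class; not a real Kafka Streams class. */
    public static class Builder {
        /** Existing overload: explicitly deprecated in addition to its deprecated parameter type. */
        @Deprecated
        public String addProcessor(String name, OldProcessorSupplier supplier) {
            return name + ":old";
        }

        /** New overload: takes the new supplier type and is not deprecated. */
        public String addProcessor(String name, NewProcessorSupplier supplier) {
            return name + ":new";
        }
    }

    /** Reports whether the addProcessor overload taking the given supplier type is annotated @Deprecated. */
    public static boolean isDeprecated(Class<?> supplierType) {
        try {
            Method m = Builder.class.getDeclaredMethod("addProcessor", String.class, supplierType);
            return m.isAnnotationPresent(Deprecated.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("old overload deprecated: " + isDeprecated(OldProcessorSupplier.class));
        System.out.println("new overload deprecated: " + isDeprecated(NewProcessorSupplier.class));
    }
}
```

With the explicit annotation, the deprecation shows up on the method
itself in the javadoc, rather than only being implied by the parameter
type.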

Thanks,
-John

On Wed, 2020-09-30 at 08:10 -0700, Matthias J. Sax wrote:
> Thanks John. I like the proposal.
> 
> Btw: I was just going over the KIP and realized that we add new methods
> to `StreamsBuilder`, `Topology`, and `KStream` that take the new
> `ProcessorSupplier` class -- should we also deprecate the corresponding
> existing ones that take the old `ProcessorSupplier`?
> 
> 
> -Matthias
> 
> 
> On 9/30/20 7:46 AM, John Roesler wrote:
> > Thanks Paul and Sophie,
> > 
> > Your feedback certainly underscores the need to be explicit
> > in the javadoc about why that parameter is Optional. Getting
> > this kind of feedback before the release is exactly the kind
> > of outcome we hope to get from the KIP process!
> > 
> > Thanks,
> > -John
> > 
> > On Tue, 2020-09-29 at 22:32 -0500, Paul Whalen wrote:
> > > John, I totally agree that adding a method to Processor is cumbersome and
> > > not a good path.  I was imagining maybe a separate interface that could be
> > > used in the appropriate context, but I don't think that makes too much
> > > sense - it's just too far away from what Kafka Streams is.  I was
> > > > originally more interested in the "why" Optional than the "how" (I think my
> > > > original reply overplayed the "optional as an argument" concern).  But
> > > you've convinced me that there is a perfectly legitimate "why".  We should
> > > make sure that it's clear why it's Optional, but I suppose that goes
> > > without saying.  It's a nice opportunity to make the API reflect more what
> > > is actually going on under the hood.
> > > 
> > > Thanks!
> > > Paul
> > > 
> > > On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman <sop...@confluent.io>
> > > wrote:
> > > 
> > > > > FWIW, while I'm really not a fan of Optional in general, I agree that
> > > > > its usage here seems appropriate. Even for those rare software
> > > > > developers who carefully read all the docs several times over, I think
> > > > > it wouldn't be too hard to miss a note about the RecordMetadata possibly
> > > > > being null.
> > > > 
> > > > > Especially because it's not that obvious why at first glance, and takes a
> > > > > bit of thinking to realize that records originating from a Punctuator
> > > > > wouldn't have a "current record". This is something that has definitely
> > > > > confused users today.
> > > > > 
> > > > > It's on us to improve the education here -- and an Optional<RecordMetadata>
> > > > > would naturally raise awareness of this subtlety.
> > > > 
> > > > > On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman <sop...@confluent.io>
> > > > > wrote:
> > > > 
> > > > > Does my reply address your concerns?
> > > > > 
> > > > > 
> > > > > > Yes; also, I definitely misread part of the proposal earlier and thought
> > > > > > you had put the timestamp field in RecordMetadata. Sorry for not giving
> > > > > > things a closer look before responding! I'm not sure my original message
> > > > > > made much sense given the misunderstanding, but thanks for responding
> > > > > > anyway :P
> > > > > > 
> > > > > > Having given the proposal a second pass, I agree, it's very elegant. +1
> > > > > 
> > > > > > On Tue, Sep 29, 2020 at 6:50 PM John Roesler <vvcep...@apache.org>
> > > > > > wrote:
> > > > > > Thanks for the reply, Sophie,
> > > > > > 
> > > > > > I think I may have summarized too much in my prior reply.
> > > > > > 
> > > > > > In the currently proposed KIP, any caller of forward() must
> > > > > > supply a Record, which consists of:
> > > > > > * key
> > > > > > * value
> > > > > > * timestamp
> > > > > > * headers (with a convenience constructor that sets empty
> > > > > > headers)
> > > > > > 
> > > > > > These aren't what I was referring to as potentially being
> > > > > > undefined downstream, since thanks to the introduction of
> > > > > > Record, they are, as you're advocating, required to be
> > > > > > defined everywhere, even when forwarding from a punctuator.
> > > > > > 
> > > > > > So to be clear, the intent of this change is actually to
> > > > > > _enforce_ that timestamp would never be undefined (which it
> > > > > > currently can be). Also, since punctuators _are_ going to
> > > > > > have to "make up" a timestamp going forward, we should note
> > > > > > that the "punctuate" method currently passes in a good
> > > > > > timestamp that they can use: for system-time punctuations,
> > > > > > they receive the current system time, and for stream-time
> > > > > > punctuations, they get the current stream time.
> > > > > > 
> > > > > > The potentially undefined RecordMetadata only contains these
> > > > > > fields:
> > > > > > * topic
> > > > > > * partition
> > > > > > * offset
> > > > > > 
> > > > > > These fields aren't required (or even used) in a Sink, and
> > > > > > it doesn't seem like they would be important to many
> > > > > > applications. Furthermore, it doesn't _seem_ like you'd even
> > > > > > want to set these fields. They seem purely informational and
> > > > > > only useful in the context when you are actually processing
> > > > > > a real input record. It doesn't sound like you were asking
> > > > > > for it, but just to put it on the record, I think if we were
> > > > > > to require values for the metadata from punctuators, people
> > > > > > would mostly just make up their own dummy values, to no
> > > > > > one's benefit.
> > > > > > 
> > > > > > I should also note that with the current
> > > > > > Record/RecordMetadata split, we will have the freedom to
> > > > > > move fields into the Record class (or even add new fields)
> > > > > > if we want them to become "data" as opposed to "metadata" in
> > > > > > the future.
> > > > > > 
> > > > > > Thanks for your reply; I was similarly floored when I
> > > > > > realized the true nature of the current situation. Does my
> > > > > > reply address your concerns?
> > > > > > 
> > > > > > Thanks,
> > > > > > -John
> > > > > > 
> > > > > > On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman
> > > > > > wrote:
> > > > > > > > > However, the record metadata is only defined when the parent forwards
> > > > > > > > > while processing a real record, not when it calls forward from the
> > > > > > > > > punctuator
> > > > > > > 
> > > > > > > 
> > > > > > > > Can we take a step back for a second...why wouldn't you be required to
> > > > > > > > set the RecordContext yourself when calling forward from a Punctuator? I
> > > > > > > > think I agree with Paul here, it seems kind of absurd not to enforce that
> > > > > > > > the RecordContext be present inside the process() method.
> > > > > > > > 
> > > > > > > > The original problem with Punctuators, as I understood it, was that all
> > > > > > > > of the RecordContext fields were exposed automatically to both the
> > > > > > > > Processor and any Punctuator, due to being direct methods on the
> > > > > > > > ProcessorContext. We can't control which ProcessorContext methods
> > > > > > > > someone will call from within a Punctuator vs from a Processor. The best
> > > > > > > > we could do was set these "nonsense" fields to null when inside a
> > > > > > > > Punctuator, or set them to some dummy values as you pointed out.
> > > > > > > > 
> > > > > > > > But then you proposed the solution of a separate RecordContext which is
> > > > > > > > not attached to the ProcessorContext at all. This seemed to solve the
> > > > > > > > above problem very neatly: we only pass in the RecordContext to the
> > > > > > > > process() method, so we don't have to worry about people trying to
> > > > > > > > access these fields from within a Punctuator. The fields aren't
> > > > > > > > accessible unless they're defined.
> > > > > > > > 
> > > > > > > > So what happens when someone wants to forward something from within a
> > > > > > > > Punctuator? I don't think it's reasonable to let the timestamp field be
> > > > > > > > undefined, ever. What if the Punctuator forwards directly to a sink, or
> > > > > > > > directly to some windowing logic? Are we supposed to add handling for
> > > > > > > > the RecordContext == null case to every processor? Or are we just going
> > > > > > > > to assume the implicit restriction that users will only forward records
> > > > > > > > from a Punctuator to downstream processors that know how to handle
> > > > > > > > and/or set the RecordContext if it's undefined? That seems to throw away
> > > > > > > > a lot of the awesome safety added in this KIP.
> > > > > > > > 
> > > > > > > > Apologies for the rant. But I feel pretty strongly that allowing users
> > > > > > > > to forward records from a Punctuator without a defined RecordContext
> > > > > > > > would be asking for trouble. Imo, if you want to forward from a
> > > > > > > > Punctuator, you need to store the info you need in order to set the
> > > > > > > > timestamp, or make one up yourself.
> > > > > > > > 
> > > > > > > > (the one alternative I can think of here is that maybe we could pass in
> > > > > > > > the current partition time, so users can at least put in a reasonable
> > > > > > > > estimate for the timestamp that won't cause it to get dropped and won't
> > > > > > > > potentially lurch the streamtime far into the future. This would be
> > > > > > > > similar to what we do in the TimestampExtractor)
> > > > > > > > 
> > > > > > > > On Tue, Sep 29, 2020 at 6:06 PM John Roesler <vvcep...@apache.org>
> > > > > > > > wrote:
> > > > > > > > > Oh, I guess one other thing I should have mentioned is that I’ve
> > > > > > > > > recently discovered that in cases where the context is undefined, we
> > > > > > > > > currently just fill in dummy values for the context. So there’s a good
> > > > > > > > > chance that real applications in use are depending on undefined context
> > > > > > > > > without even realizing it. What I’m hoping to do is just make the
> > > > > > > > > situation explicit and get rid of the dummy values.
> > > > > > > > 
> > > > > > > > Thanks,
> > > > > > > > John
> > > > > > > > 
> > > > > > > > On Tue, Sep 29, 2020, at 20:01, John Roesler wrote:
> > > > > > > > > Thanks for the review, Paul!
> > > > > > > > > 
> > > > > > > > > > I had read some of that debate before. There seems to be some subtext
> > > > > > > > > > there, because they advise against using Optional in cases like this,
> > > > > > > > > > but there doesn’t seem to be a specific reason why it’s inappropriate.
> > > > > > > > > > I got the impression they were just afraid that people would go
> > > > > > > > > > overboard and make everything Optional.
> > > > > > > > > > 
> > > > > > > > > > I could also make two methods, but it seemed like it might be an
> > > > > > > > > > unfortunate way to handle the issue, since Processor is just about a
> > > > > > > > > > Function as-is, but the two-method approach would require people to
> > > > > > > > > > implement both methods.
> > > > > > > > > > 
> > > > > > > > > > To your question, this is something that’s only recently become clear
> > > > > > > > > > to me. Imagine you have a parent processor that calls forward both from
> > > > > > > > > > process and a punctuator. The child will have process() invoked in both
> > > > > > > > > > cases, and won’t be able to distinguish them. However, the record
> > > > > > > > > > metadata is only defined when the parent forwards while processing a
> > > > > > > > > > real record, not when it calls forward from the punctuator.
> > > > > > > > > > 
> > > > > > > > > > This is why I wanted to make the metadata Optional, to advertise that
> > > > > > > > > > the metadata might be undefined if any ancestor processor ever calls
> > > > > > > > > > forward from a punctuator. We could remove the Optional and instead
> > > > > > > > > > just document that the argument might be null.
> > > > > > > > > 
> > > > > > > > > With that context in place, what’s your take?
> > > > > > > > > 
> > > > > > > > > Thanks,
> > > > > > > > > John
> > > > > > > > > 
> > > > > > > > > On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote:
> > > > > > > > > > > Looks pretty good to me, though the Processor#process(Record,
> > > > > > > > > > > Optional<RecordMetadata>) signature caught my eye.  There's some debate
> > > > > > > > > > > (https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments)
> > > > > > > > > > > about whether to use Optionals in arguments, and while that's a bit of a
> > > > > > > > > > > religious debate in the abstract, it did make me wonder whether it makes
> > > > > > > > > > > sense in this specific case.  When is it actually not present?  I was
> > > > > > > > > > > under the impression that we should always have access to it in
> > > > > > > > > > > process(), and that the concern about metadata being undefined was about
> > > > > > > > > > > having access to record metadata in the ProcessorContext held for use
> > > > > > > > > > > inside a Punctuator.
> > > > > > > > > > > 
> > > > > > > > > > > If that's not the case and it is truly optional in process(), is there
> > > > > > > > > > > an opportunity for an alternate interface for the cases when we don't
> > > > > > > > > > > get it, rather than force the branching on implementers of the
> > > > > > > > > > > interface?  Apologies if I've missed something, I took a look at the PR
> > > > > > > > > > > and I didn't see any spots where I thought it would be empty.  Perhaps
> > > > > > > > > > > an example of a Punctuator using (and not using) the new API would clear
> > > > > > > > > > > things up.
> > > > > > > > > > > 
> > > > > > > > > > > Best,
> > > > > > > > > > > Paul
> > > > > > > > > > > 
> > > > > > > > > > > On Tue, Sep 29, 2020 at 4:10 PM John Roesler <vvcep...@apache.org>
> > > > > > > > > > > wrote:
> > > > > > > > > > > Hello again, all,
> > > > > > > > > > > 
> > > > > > > > > > > Thanks for the latest round of discussion. I've taken the
> > > > > > > > > > > recent feedback and come up with an updated KIP that seems
> > > > > > > > > > > actually quite a bit nicer than the prior proposal.
> > > > > > > > > > > 
> > > > > > > > > > > > The specific diff on the KIP is here:
> > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14
> > > > > > > > > > > > 
> > > > > > > > > > > > These changes are implemented in this POC PR:
> > > > > > > > > > > > https://github.com/apache/kafka/pull/9346
> > > > > > > > > > > > 
> > > > > > > > > > > > The basic idea is that, building on the recent conversation,
> > > > > > > > > > > > we would transition away from the current API where we get
> > > > > > > > > > > > only key/value in the process() method and other "data"
> > > > > > > > > > > > comes in the ProcessorContext along with the "metadata".
> > > > > > > > > > > > 
> > > > > > > > > > > > Instead, we formalize what is "data" and what is "metadata",
> > > > > > > > > > > > and pass it all in to the process method:
> > > > > > > > > > > Processor#process(Record, Optional<RecordMetadata>)
> > > > > > > > > > > 
> > > > > > > > > > > Also, you forward the whole data class instead of mutating
> > > > > > > > > > > the ProcessorContext fields and also calling forward:
> > > > > > > > > > > ProcessorContext#forward(Record)
> > > > > > > > > > > 
> > > > > > > > > > > The Record class itself ships with methods like
> > > > > > > > > > > record#withValue(NewV newValue)
> > > > > > > > > > > that make a shallow copy of the input Record, enabling
> > > > > > > > > > > > Processors to safely handle the record without polluting the
> > > > > > > > > > > > context of their parents and siblings.
> > > > > > > > > > > 
> > > > > > > > > > > This proposal has a number of key benefits:
> > > > > > > > > > > > * As we've discovered in KAFKA-9584, it's unsafe to mutate
> > > > > > > > > > > > the Headers via the ProcessorContext. This proposal offers a
> > > > > > > > > > > > way to safely forward changes only to downstream processors.
> > > > > > > > > > > > * The new API has symmetry (each processor's input is the
> > > > > > > > > > > > output of its parent processor)
> > > > > > > > > > > > * The API makes clear that the record metadata isn't always
> > > > > > > > > > > > defined (for example, in a punctuation, there is no current
> > > > > > > > > > > > topic/partition/offset)
> > > > > > > > > > > > * The API enables punctuators to forward well defined
> > > > > > > > > > > > headers downstream, which is currently not possible.
> > > > > > > > > > > > 
> > > > > > > > > > > > Unless there are objections, I'll go ahead and re-finalize
> > > > > > > > > > > this KIP and update that PR to a mergeable state.
> > > > > > > > > > > 
> > > > > > > > > > > Thanks, all,
> > > > > > > > > > > -John
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
> > > > > > > > > > > > > Interesting proposal. However, I am not totally convinced, because I see
> > > > > > > > > > > > > a fundamental difference between "data" and "metadata".
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Topic/partition/offset are "metadata" in the strong sense and they are
> > > > > > > > > > > > > immutable.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > On the other hand there is "primary" data like key and value, as well as
> > > > > > > > > > > > > "secondary" data like timestamp and headers. The issue seems that we
> > > > > > > > > > > > > treat "secondary data" more like metadata atm?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Thus, promoting timestamp and headers into a first class citizen role
> > > > > > > > > > > > > makes sense to me (my original proposal about `RecordContext` would still
> > > > > > > > > > > > > fall short with this regard). However, putting both (data and metadata)
> > > > > > > > > > > > > into a `Record` abstraction might go too far?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > I am also a little bit concerned about `Record.copy()` because it might
> > > > > > > > > > > > > be a trap: Users might assume it does a full deep copy of the record,
> > > > > > > > > > > > > however, it would not. It would only create a new `Record` object as
> > > > > > > > > > > > > wrapper that points to the same key/value/header objects as the input
> > > > > > > > > > > > > record.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > With the current `context.forward(key, value)` we don't have this "deep
> > > > > > > > > > > > > copy" issue -- it's pretty clear what is happening.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Instead of `To.all().withTimestamp()` we could also add
> > > > > > > > > > > > > `context.forward(key, value, timestamp)` etc (just wondering about the
> > > > > > > > > > > > > explosion in overloads)?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Also, `Record.withValue` etc sounds odd? Should a record not be
> > > > > > > > > > > > > immutable? So, we could have something like
> > > > > > > > > > > > > 
> > > > > > > > > > > > > `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > But it looks rather verbose?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > The other question is of course, to what extent do we want to keep the
> > > > > > > > > > > > > distinction between "primary" and "secondary" data? To me, it's a
> > > > > > > > > > > > > question of ease of use?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Just putting all this out to move the discussion forward. Don't have a
> > > > > > > > > > > > > concrete proposal atm.
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > -Matthias
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > On 9/14/20 9:24 AM, John Roesler wrote:
> > > > > > > > > > > > > Thanks for this thought, Matthias!
> > > > > > > > > > > > > 
> > > > > > > > > > > > > > To be honest, it's bugged me quite a bit that _all_ the
> > > > > > > > > > > > > > record information hasn't been an argument to `process`. I
> > > > > > > > > > > > > > suppose I was trying to be conservative in this proposal,
> > > > > > > > > > > > > > but then again, if we're adding new Processor and
> > > > > > > > > > > > > > ProcessorContext interfaces, then this is the time to make
> > > > > > > > > > > > > > such a change.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > To be unambiguous, I think this is what we're talking about:
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > ProcessorContext:
> > > > > > > > > > > > > * applicationId
> > > > > > > > > > > > > * taskId
> > > > > > > > > > > > > * appConfigs
> > > > > > > > > > > > > * appConfigsWithPrefix
> > > > > > > > > > > > > * keySerde
> > > > > > > > > > > > > * valueSerde
> > > > > > > > > > > > > * stateDir
> > > > > > > > > > > > > * metrics
> > > > > > > > > > > > > * schedule
> > > > > > > > > > > > > * commit
> > > > > > > > > > > > > * forward
> > > > > > > > > > > > > 
> > > > > > > > > > > > > StateStoreContext:
> > > > > > > > > > > > > * applicationId
> > > > > > > > > > > > > * taskId
> > > > > > > > > > > > > * appConfigs
> > > > > > > > > > > > > * appConfigsWithPrefix
> > > > > > > > > > > > > * keySerde
> > > > > > > > > > > > > * valueSerde
> > > > > > > > > > > > > * stateDir
> > > > > > > > > > > > > * metrics
> > > > > > > > > > > > > * register
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > RecordContext
> > > > > > > > > > > > > * topic
> > > > > > > > > > > > > * partition
> > > > > > > > > > > > > * offset
> > > > > > > > > > > > > * timestamp
> > > > > > > > > > > > > * headers
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > > Your proposal sounds good to me as-is. Just to cover the
> > > > > > > > > > > > > > bases, though, I'm wondering if we should push the idea just
> > > > > > > > > > > > > > a little farther. Instead of decomposing key,value,context,
> > > > > > > > > > > > > > we could just keep them all in one object, like this:
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Record:
> > > > > > > > > > > > > * key
> > > > > > > > > > > > > * value
> > > > > > > > > > > > > * topic
> > > > > > > > > > > > > * partition
> > > > > > > > > > > > > * offset
> > > > > > > > > > > > > * timestamp
> > > > > > > > > > > > > * headers
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Then, we could have:
> > > > > > > > > > > > > Processor#process(Record)
> > > > > > > > > > > > > ProcessorContext#forward(Record, To)
> > > > > > > > > > > > > 
> > > > > > > > > > > > > > Viewed from this perspective, a record has three properties
> > > > > > > > > > > > > > that people may specify in their processors: key, value, and
> > > > > > > > > > > > > > timestamp.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > We could deprecate `To#withTimestamp` and enable people to
> > > > > > > > > > > > > > specify the timestamp along with the key and value when they
> > > > > > > > > > > > > > forward a record.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > E.g.,
> > > > > > > > > > > > > RecordBuilder toForward = RecordBuilder.copy(record)
> > > > > > > > > > > > > toForward.withKey(newKey)
> > > > > > > > > > > > > toForward.withValue(newValue)
> > > > > > > > > > > > > toForward.withTimestamp(newTimestamp)
> > > > > > > > > > > > > Record newRecord = toForward.build()
> > > > > > > > > > > > > context.forward(newRecord, To.child("child1"))
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Or, the more compact common case:
> > > > > > > > > > > > > current:
> > > > > > > > > > > > >  context.forward(key, "newValue")
> > > > > > > > > > > > > > proposed:
> > > > > > > > > > > > > >  context.forward(copy(record).withValue("newValue").build())
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > It's slightly more verbose, but also more extensible. This
> > > > > > > > > > > > > > would give us a clean path to add header support in PAPI as
> > > > > > > > > > > > > > well, simply by adding `withHeaders` in RecordBuilder.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > It's also more symmetrical, since the recipient of `forward`
> > > > > > > > > > > > > > would just get the sent `Record`. Whereas today, the sender
> > > > > > > > > > > > > > puts the timestamp in `To`, but the recipient gets it in its
> > > > > > > > > > > > > > own `ProcessorContext`.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > WDYT?
> > > > > > > > > > > > > -John
> > > > > > > > > > > > > 
> > > > > > > > > > > > > > On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote:
> > > > > > > > > > > > > > > I think separating the different contexts makes sense.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > In fact, we could even go one step further and remove the record
> > > > > > > > > > > > > > > context from the processor context completely and we add a third
> > > > > > > > > > > > > > > parameter to `process(key, value, recordContext)`. This would make it
> > > > > > > > > > > > > > > clear that the context is for the input record only and it's not
> > > > > > > > > > > > > > > possible to pass it to a `punctuate` callback.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > For the stores and changelogging: I think there are two cases. (1) You
> > > > > > > > > > > > > > > use a plain key-value store. For this case, it seems you do not care
> > > > > > > > > > > > > > > about the timestamp and thus do not care what timestamp is set in the
> > > > > > > > > > > > > > > changelog records. (We can set anything we want, as it's not relevant at
> > > > > > > > > > > > > > > all -- the timestamp is ignored on read anyway.) (2) The other case is,
> > > > > > > > > > > > > > > that one does care about timestamps, and for this case one should use
> > > > > > > > > > > > > > > TimestampedKeyValueStore. The passed timestamp will be set on the
> > > > > > > > > > > > > > > changelog records for this case.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Thus, for both cases, accessing the record context does not seem to be
> > > > > > > > > > > > > > > a requirement. And providing access to the processor context to, eg.,
> > > > > > > > > > > > > > > `forward()` or similar seems safe.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > On 9/10/20 7:25 PM, John Roesler wrote:
> > > > > > > > > > > > > > > Thanks for the reply, Paul!
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > I certainly intend to make sure that the 
> > > > > > > > > > > > > > > changelogging
> > > > > > layer
> > > > > > > > > > > > > > > continues to work the way it does now, by hook or 
> > > > > > > > > > > > > > > by
> > > > > > crook.
> > > > > > > > > > > > > > > I think the easiest path for me is to just 
> > > > > > > > > > > > > > > "cheat" and
> > > > > > get
> > > > > > > > > > > > > > > the real ProcessorContext into the 
> > > > > > > > > > > > > > > ChangeLoggingStore
> > > > > > > > > > > > > > > implementation somehow. I'll tag you on the PR 
> > > > > > > > > > > > > > > when I
> > > > > > create
> > > > > > > > > > > > > > > it, so you have an opportunity to express a 
> > > > > > > > > > > > > > > preference
> > > > > > about
> > > > > > > > > > > > > > > the implementation choice, and maybe even 
> > > > > > > > > > > > > > > compile/test
> > > > > > > > > > > > > > > against it to make sure your stuff still works.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Regarding this:
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > we have an interest in making a state store 
> > > > > > > > > > > > > > > > with a
> > > > > > richer
> > > > > > > > > > > > > > > > way of querying its data (like perhaps getting 
> > > > > > > > > > > > > > > > all
> > > > > > values
> > > > > > > > > > > > > > > > associated with a secondary key), while still
> > > > > > ultimately
> > > > > > > > > > > > > > > > writing to the changelog topic for later
> > > > restoration.
> > > > > > > > > > > > > > > > This is very intriguing to me. On the side, I've been
> > > > > > > > > > > > > > > > preparing a couple of ideas related to this topic. I don't
> > > > > > > > > > > > > > > > think I have a coherent enough thought to even express it in
> > > > > > > > > > > > > > > > a Jira right now, but when I do, I'll tag you on it also to
> > > > > > > > > > > > > > > > see what you think.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Whenever you're ready to share the usability improvement
> > > > > > > > > > > > > > > > ideas, I'm very interested to see what you've come up with.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen 
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > when you use a HashMap or RocksDB or other 
> > > > > > > > > > > > > > > > > "state
> > > > > > > > stores", you
> > > > > > > > > > > don't
> > > > > > > > > > > > > > > > > expect them to automatically know extra stuff
> > > > about
> > > > > > the
> > > > > > > > record
> > > > > > > > > > > you're
> > > > > > > > > > > > > > > > > storing.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > So, I don't think there is any reason we *can't*
> > > > > > retain the
> > > > > > > > > > > record context
> > > > > > > > > > > > > > > > > in the StateStoreContext, and if any users 
> > > > > > > > > > > > > > > > > came
> > > > > > along
> > > > > > > > with a
> > > > > > > > > > > clear use case
> > > > > > > > > > > > > > > > > I'd find that convincing.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > I agree with the principle of being conservative
> > > > with
> > > > > > the
> > > > > > > > > > > StateStoreContext
> > > > > > > > > > > > > > > > API.  Regarding user expectations or a clear use
> > > > > > case, the
> > > > > > > > only
> > > > > > > > > > > > > > > > counterpoint I would offer is that we sort of 
> > > > > > > > > > > > > > > > have
> > > > > > that
> > > > > > > > use case
> > > > > > > > > > > already,
> > > > > > > > > > > > > > > > which is the example I gave of the change 
> > > > > > > > > > > > > > > > logging
> > > > > > store
> > > > > > > > using the
> > > > > > > > > > > > > > > > timestamp.  I am curious if this functionality 
> > > > > > > > > > > > > > > > will
> > > > be
> > > > > > > > retained
> > > > > > > > > > > when using
> > > > > > > > > > > > > > > > built in state stores, or will a low-level 
> > > > > > > > > > > > > > > > processor
> > > > > > get a
> > > > > > > > > > > KeyValueStore
> > > > > > > > > > > > > > > > that no longer writes to the changelog topic 
> > > > > > > > > > > > > > > > with
> > > > the
> > > > > > > > record's
> > > > > > > > > > > timestamp.
> > > > > > > > > > > > > > > > While I personally don't care much about that
> > > > > > functionality
> > > > > > > > > > > specifically, I
> > > > > > > > > > > > > > > > have a general desire for custom state stores to
> > > > > > easily do
> > > > > > > > the
> > > > > > > > > > > things that
> > > > > > > > > > > > > > > > built in state stores do.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > It genuinely did not occur to me that users 
> > > > > > > > > > > > > > > > might be
> > > > > > > > looking up
> > > > > > > > > > > and/or
> > > > > > > > > > > > > > > > > updating records of other keys from within a
> > > > > > Processor.
> > > > > > > > > > > > > > > > I'm glad you said this Sophie, because it gives 
> > > > > > > > > > > > > > > > me
> > > > an
> > > > > > > > > > > opportunity to say
> > > > > > > > > > > > > > > > that this is actually a *huge* use case for my 
> > > > > > > > > > > > > > > > team.
> > > > > > The
> > > > > > > > state
> > > > > > > > > > > store
> > > > > > > > > > > > > > > > usability improvements I was referring to in my
> > > > > > previous
> > > > > > > > message
> > > > > > > > > > > were about
> > > > > > > > > > > > > > > > enabling the user to write custom stores while 
> > > > > > > > > > > > > > > > still
> > > > > > easily
> > > > > > > > > > > hooking into
> > > > > > > > > > > > > > > > the ability to write to a changelog topic.  I 
> > > > > > > > > > > > > > > > think
> > > > > > that is
> > > > > > > > > > > technically
> > > > > > > > > > > > > > > > possible now, but I don't think it's trivial.
> > > > > > > > Specifically, we
> > > > > > > > > > > have an
> > > > > > > > > > > > > > > > interest in making a state store with a richer 
> > > > > > > > > > > > > > > > way
> > > > of
> > > > > > > > querying
> > > > > > > > > > > its data
> > > > > > > > > > > > > > > > (like perhaps getting all values associated 
> > > > > > > > > > > > > > > > with a
> > > > > > > > secondary
> > > > > > > > > > > key), while
> > > > > > > > > > > > > > > > still ultimately writing to the changelog topic 
> > > > > > > > > > > > > > > > for
> > > > > > later
> > > > > > > > > > > restoration.
> > > > > > > > > > > > > > > > We recognize that this use case throws away 
> > > > > > > > > > > > > > > > some of
> > > > > > what
> > > > > > > > kafka
> > > > > > > > > > > streams
> > > > > > > > > > > > > > > > (especially the DSL) is good at - easy
> > > > > > parallelizability by
> > > > > > > > > > > partitioning
> > > > > > > > > > > > > > > > all processing by key - and that our business 
> > > > > > > > > > > > > > > > logic
> > > > > > would
> > > > > > > > > > > completely fall
> > > > > > > > > > > > > > > > apart if we were consuming from multi-partition
> > > > > > topics with
> > > > > > > > > > > multiple
> > > > > > > > > > > > > > > > consumers.  But we have found that using the low
> > > > level
> > > > > > > > processor
> > > > > > > > > > > API is
> > > > > > > > > > > > > > > > good for the very simple stream processing
> > > > primitives
> > > > > > it
> > > > > > > > > > > provides: handling
> > > > > > > > > > > > > > > > the plumbing of consuming from multiple kafka 
> > > > > > > > > > > > > > > > topics
> > > > > > and
> > > > > > > > > > > potentially
> > > > > > > > > > > > > > > > updating persistent local state in a reliable 
> > > > > > > > > > > > > > > > way.
> > > > > > That in
> > > > > > > > > > > itself has
> > > > > > > > > > > > > > > > proven to be a worthwhile programming model.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Since I got off track a bit, let me summarize: I
> > > > don't
> > > > > > > > > > > particularly care
> > > > > > > > > > > > > > > > about the record context being available to 
> > > > > > > > > > > > > > > > state
> > > > > > store
> > > > > > > > > > > implementations,
> > > > > > > > > > > > > > > > and I think this KIP is headed in the right
> > > > direction
> > > > > > in
> > > > > > > > that
> > > > > > > > > > > regard.  But
> > > > > > > > > > > > > > > > more generally, I wanted to express the 
> > > > > > > > > > > > > > > > importance
> > > > of
> > > > > > > > > > > maintaining a
> > > > > > > > > > > > > > > > powerful and flexible StateStore interface.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Thanks!
> > > > > > > > > > > > > > > > Paul
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 6:11 PM Sophie 
> > > > > > > > > > > > > > > > Blee-Goldman
> > > > <
> > > > > > > > > > > sop...@confluent.io>
> > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > Aha, I did misinterpret the example in your previous response
> > > > > > > > > > > > > > > > > > regarding the range query after all. I thought you just meant
> > > > > > > > > > > > > > > > > > a time-range query inside a punctuator. It genuinely did not
> > > > > > > > > > > > > > > > > > occur to me that users might be looking up and/or updating
> > > > > > > > > > > > > > > > > > records of other keys from within a Processor. Sorry for
> > > > > > > > > > > > > > > > > > being closed-minded.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > I won't drag out this discussion any further 
> > > > > > > > > > > > > > > > > by
> > > > > > asking
> > > > > > > > whether
> > > > > > > > > > > that might
> > > > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > a valid use case or just a lurking bug in 
> > > > > > > > > > > > > > > > > itself
> > > > :)
> > > > > > > > > > > > > > > > > Thanks for humoring me. The current proposal 
> > > > > > > > > > > > > > > > > for
> > > > > > KIP-478
> > > > > > > > > > > sounds good to me
> > > > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 3:43 PM John Roesler <
> > > > > > > > > > > vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > Ah, thanks Sophie,
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > I'm sorry for misinterpreting your response. Yes, we
> > > > > > > > > > > > > > > > > > > absolutely can and should clear the context before
> > > > > > > > > > > > > > > > > > > punctuating.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > My secondary concern is maybe more far-fetched. I was
> > > > > > > > > > > > > > > > > > > thinking that inside process(key,value), a Processor might
> > > > > > > > > > > > > > > > > > > do a get/put of a _different_ key. Consider, for example,
> > > > > > > > > > > > > > > > > > > the way that Suppress processors work. When they get a
> > > > > > > > > > > > > > > > > > > record, they add it to the store and then do a range scan
> > > > > > > > > > > > > > > > > > > and possibly forward a _different_ record. Of course, this
> > > > > > > > > > > > > > > > > > > is an operation that is deeply coupled to the internals, and
> > > > > > > > > > > > > > > > > > > the Suppress processor accordingly actually does get access
> > > > > > > > > > > > > > > > > > > to the internal context so that it can set the context
> > > > > > > > > > > > > > > > > > > before forwarding.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > Still, it seems like I've had a handful of conversations
> > > > > > > > > > > > > > > > > > > with people over the years in which they tell me they are
> > > > > > > > > > > > > > > > > > > using state stores in a way that transcends the "get and put
> > > > > > > > > > > > > > > > > > > the currently processing record" access pattern. I doubt
> > > > > > > > > > > > > > > > > > > that those folks would even have considered the possibility
> > > > > > > > > > > > > > > > > > > that the currently processing record's _context_ could
> > > > > > > > > > > > > > > > > > > pollute their state store operations, as I myself never gave
> > > > > > > > > > > > > > > > > > > it a second thought until the current conversation began. In
> > > > > > > > > > > > > > > > > > > cases like that, we have actually set a trap for these
> > > > > > > > > > > > > > > > > > > people, and it seems better to dismantle the trap.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > As you noted, really the only people who would be negatively
> > > > > > > > > > > > > > > > > > > impacted are people who implement their own state stores.
> > > > > > > > > > > > > > > > > > > These folks will get the deprecation warning and try to
> > > > > > > > > > > > > > > > > > > adapt their stores to the new interface. If they needed
> > > > > > > > > > > > > > > > > > > access to the record context, they would find it's now
> > > > > > > > > > > > > > > > > > > missing. They'd ask us about it, and we'd have the ability
> > > > > > > > > > > > > > > > > > > to explain the lurking bug that they have had in their
> > > > > > > > > > > > > > > > > > > stores all along, as well as the new recommended pattern
> > > > > > > > > > > > > > > > > > > (just pass everything you need in the value). If that's
> > > > > > > > > > > > > > > > > > > unsatisfying, _then_ we should consider amending the API.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > On Thu, 2020-09-10 at 15:21 -0700, Sophie
> > > > > > Blee-Goldman
> > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > Regarding your first sentence, "...the
> > > > > > processor
> > > > > > > > would
> > > > > > > > > > > null
> > > > > > > > > > > > > > > > > > > > out the record context...", this is not
> > > > > > possible,
> > > > > > > > since
> > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > processor doesn't have write access to 
> > > > > > > > > > > > > > > > > > > > the
> > > > > > > > context. We
> > > > > > > > > > > could
> > > > > > > > > > > > > > > > > > > > add it,
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > Sorry, this was poorly phrased, I 
> > > > > > > > > > > > > > > > > > > definitely
> > > > > > did not
> > > > > > > > mean
> > > > > > > > > > > to imply that
> > > > > > > > > > > > > > > > > > we
> > > > > > > > > > > > > > > > > > > should make the context modifiable by the
> > > > > > Processors
> > > > > > > > > > > themselves. I
> > > > > > > > > > > > > > > > > meant
> > > > > > > > > > > > > > > > > > > this should be handled by the internal
> > > > > > processing
> > > > > > > > > > > framework that deals
> > > > > > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > > > > passing records from one Processor to the
> > > > next,
> > > > > > > > setting
> > > > > > > > > > > the record
> > > > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > > > when a new record is picked up, invoking 
> > > > > > > > > > > > > > > > > > > the
> > > > > > > > punctuators,
> > > > > > > > > > > etc. I
> > > > > > > > > > > > > > > > > believe
> > > > > > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > > > all currently happens in the StreamTask? 
> > > > > > > > > > > > > > > > > > > It
> > > > > > already
> > > > > > > > can
> > > > > > > > > > > and does
> > > > > > > > > > > > > > > > > > overwrite
> > > > > > > > > > > > > > > > > > > the record context as new records are
> > > > > > processed, and
> > > > > > > > is
> > > > > > > > > > > also
> > > > > > > > > > > > > > > > > responsible
> > > > > > > > > > > > > > > > > > > for calling the punctuators, so it doesn't
> > > > seem
> > > > > > like
> > > > > > > > a
> > > > > > > > > > > huge leap to
> > > > > > > > > > > > > > > > > just
> > > > > > > > > > > > > > > > > > say
> > > > > > > > > > > > > > > > > > > "null out the current record before
> > > > punctuating"
> > > > > > > > > > > > > > > > > > > To clarify, I was never advocating or even
> > > > > > > > considering to
> > > > > > > > > > > give the
> > > > > > > > > > > > > > > > > > > Processors
> > > > > > > > > > > > > > > > > > > write access to the record context. Sorry 
> > > > > > > > > > > > > > > > > > > if
> > > > my
> > > > > > last
> > > > > > > > > > > message (or all of
> > > > > > > > > > > > > > > > > > > them)
> > > > > > > > > > > > > > > > > > > was misleading. I just wanted to point out
> > > > that
> > > > > > the
> > > > > > > > > > > punctuator concern
> > > > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > > > orthogonal to the question of whether we
> > > > should
> > > > > > > > include
> > > > > > > > > > > the record
> > > > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > > > in the StateStoreContext. It's definitely 
> > > > > > > > > > > > > > > > > > > a
> > > > real
> > > > > > > > problem,
> > > > > > > > > > > but it's a
> > > > > > > > > > > > > > > > > > > problem
> > > > > > > > > > > > > > > > > > > that exists at the Processor level and not
> > > > just
> > > > > > the
> > > > > > > > > > > StateStore.
> > > > > > > > > > > > > > > > > > > So, I don't think there is any reason we
> > > > *can't*
> > > > > > > > retain
> > > > > > > > > > > the record
> > > > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > > > in the
> > > > > > > > > > > > > > > > > > > StateStoreContext, and if any users came 
> > > > > > > > > > > > > > > > > > > along
> > > > > > with a
> > > > > > > > > > > clear use case
> > > > > > > > > > > > > > > > > I'd
> > > > > > > > > > > > > > > > > > > find
> > > > > > > > > > > > > > > > > > > that convincing. In the absence of any
> > > > > > examples, the
> > > > > > > > > > > conservative
> > > > > > > > > > > > > > > > > > approach
> > > > > > > > > > > > > > > > > > > sounds good to me.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > If it turns out that someone did need the
> > > > record
> > > > > > > > context
> > > > > > > > > > > in their
> > > > > > > > > > > > > > > > > custom
> > > > > > > > > > > > > > > > > > > state
> > > > > > > > > > > > > > > > > > > store, I'm sure they'll submit a politely
> > > > > > worded bug
> > > > > > > > > > > report alerting us
> > > > > > > > > > > > > > > > > > > that we
> > > > > > > > > > > > > > > > > > > broke their application.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 3:05 PM John 
> > > > > > > > > > > > > > > > > > > Roesler <
> > > > > > > > > > > vvcep...@apache.org>
> > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > Thanks, Sophie,
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > Yes, now that you point it out, I can 
> > > > > > > > > > > > > > > > > > > > see
> > > > > > that the
> > > > > > > > record
> > > > > > > > > > > > > > > > > > > > context itself should be nulled out by
> > > > Streams
> > > > > > > > before
> > > > > > > > > > > > > > > > > > > > invoking punctuators. From that 
> > > > > > > > > > > > > > > > > > > > perspective,
> > > > > > we
> > > > > > > > don't
> > > > > > > > > > > need
> > > > > > > > > > > > > > > > > > > > to think about the second-order problem 
> > > > > > > > > > > > > > > > > > > > of
> > > > > > what's
> > > > > > > > in the
> > > > > > > > > > > > > > > > > > > > context for the state store when called
> > > > from a
> > > > > > > > > > > punctuator.
> > > > > > > > > > > > > > > > > > > > Regarding your first sentence, "...the
> > > > > > processor
> > > > > > > > would
> > > > > > > > > > > null
> > > > > > > > > > > > > > > > > > > > out the record context...", this is not
> > > > > > possible,
> > > > > > > > since
> > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > processor doesn't have write access to 
> > > > > > > > > > > > > > > > > > > > the
> > > > > > > > context. We
> > > > > > > > > > > could
> > > > > > > > > > > > > > > > > > > > add it, but then all kinds of strange
> > > > effects
> > > > > > > > would ensue
> > > > > > > > > > > > > > > > > > > > when downstream processors execute but 
> > > > > > > > > > > > > > > > > > > > the
> > > > > > context
> > > > > > > > is
> > > > > > > > > > > empty,
> > > > > > > > > > > > > > > > > > > > etc. Better to just let the framework 
> > > > > > > > > > > > > > > > > > > > manage
> > > > > > the
> > > > > > > > record
> > > > > > > > > > > > > > > > > > > > context and keep it read-only for
> > > > Processors.
> > > > > > > > > > > > > > > > > > > > Reading between the lines of your last
> > > > reply,
> > > > > > it
> > > > > > > > sounds
> > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > > > the disconnect may just have been a 
> > > > > > > > > > > > > > > > > > > > mutual
> > > > > > > > > > > misunderstanding
> > > > > > > > > > > > > > > > > > > > about whether or not Processors 
> > > > > > > > > > > > > > > > > > > > currently
> > > > have
> > > > > > > > access to
> > > > > > > > > > > set
> > > > > > > > > > > > > > > > > > > > the record context. Since they do not, 
> > > > > > > > > > > > > > > > > > > > if we
> > > > > > > > wanted to
> > > > > > > > > > > add
> > > > > > > > > > > > > > > > > > > > the record context to StateStoreContext 
> > > > > > > > > > > > > > > > > > > > in a
> > > > > > > > well-defined
> > > > > > > > > > > > > > > > > > > > way, we'd also have to add the ability 
> > > > > > > > > > > > > > > > > > > > for
> > > > > > > > Processors to
> > > > > > > > > > > > > > > > > > > > manipulate it. But then, we're just
> > > > creating a
> > > > > > > > > > > side-channel
> > > > > > > > > > > > > > > > > > > > for Processors to pass some information 
> > > > > > > > > > > > > > > > > > > > in
> > > > > > > > arguments to
> > > > > > > > > > > > > > > > > > > > "put()" and other information implicitly
> > > > > > through
> > > > > > > > the
> > > > > > > > > > > > > > > > > > > > context. It seems better just to go for 
> > > > > > > > > > > > > > > > > > > > a
> > > > > > single
> > > > > > > > channel
> > > > > > > > > > > for
> > > > > > > > > > > > > > > > > > > > now.
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > It sounds like you're basically in 
> > > > > > > > > > > > > > > > > > > > favor of
> > > > > > the
> > > > > > > > > > > conservative
> > > > > > > > > > > > > > > > > > > > approach, and you just wanted to 
> > > > > > > > > > > > > > > > > > > > understand
> > > > > > the
> > > > > > > > blockers
> > > > > > > > > > > > > > > > > > > > that I implied. Does my clarification 
> > > > > > > > > > > > > > > > > > > > make
> > > > > > sense?
> > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > On Thu, 2020-09-10 at 10:54 -0700, 
> > > > > > > > > > > > > > > > > > > > Sophie
> > > > > > > > Blee-Goldman
> > > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > I was just thinking that the processor
> > > > would
> > > > > > > > null out
> > > > > > > > > > > the record
> > > > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > > > > > after it
> > > > > > > > > > > > > > > > > > > > > finished processing the record, so 
> > > > > > > > > > > > > > > > > > > > > I'm not
> > > > > > sure I
> > > > > > > > > > > follow why this
> > > > > > > > > > > > > > > > > > would
> > > > > > > > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > > > > > possible? AFAIK we never call a 
> > > > > > > > > > > > > > > > > > > > > punctuator
> > > > > > in the
> > > > > > > > > > > middle of
> > > > > > > > > > > > > > > > > > processing a
> > > > > > > > > > > > > > > > > > > > > record through the topology, and even 
> > > > > > > > > > > > > > > > > > > > > if
> > > > we
> > > > > > did,
> > > > > > > > we
> > > > > > > > > > > still know when
> > > > > > > > > > > > > > > > > > it is
> > > > > > > > > > > > > > > > > > > > > about
> > > > > > > > > > > > > > > > > > > > > to be called and could set it to null
> > > > > > beforehand.
> > > > > > > > > > > > > > > > > > > > > I'm not trying to advocate for it 
> > > > > > > > > > > > > > > > > > > > > here,
> > > > I'm
> > > > > > in
> > > > > > > > > > > agreement that
> > > > > > > > > > > > > > > > > > anything
> > > > > > > > > > > > > > > > > > > > you
> > > > > > > > > > > > > > > > > > > > > want
> > > > > > > > > > > > > > > > > > > > > to access within the store can and 
> > > > > > > > > > > > > > > > > > > > > should
> > > > be
> > > > > > > > accessed
> > > > > > > > > > > within the
> > > > > > > > > > > > > > > > > > calling
> > > > > > > > > > > > > > > > > > > > > Processor/Punctuator before reaching 
> > > > > > > > > > > > > > > > > > > > > the
> > > > > > store.
> > > > > > > > The
> > > > > > > > > > > "we can always
> > > > > > > > > > > > > > > > > > add it
> > > > > > > > > > > > > > > > > > > > > later if necessary" argument is also
> > > > pretty
> > > > > > > > > > > convincing. Just trying
> > > > > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > > > > understand
> > > > > > > > > > > > > > > > > > > > > why this wouldn't be possible.
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > FWIW, the question of "what is the 
> > > > > > > > > > > > > > > > > > > > > current
> > > > > > > > record in
> > > > > > > > > > > the context
> > > > > > > > > > > > > > > > > of a
> > > > > > > > > > > > > > > > > > > > > Punctuator"
> > > > > > > > > > > > > > > > > > > > > exists independently of whether we 
> > > > > > > > > > > > > > > > > > > > > want to
> > > > > > add
> > > > > > > > this to
> > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > StateStoreContext
> > > > > > > > > > > > > > > > > > > > > or not. The full ProcessorContext,
> > > > > > including the
> > > > > > > > > > > current record
> > > > > > > > > > > > > > > > > > context,
> > > > > > > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > > > > > already available within a 
> > > > > > > > > > > > > > > > > > > > > Punctuator, so
> > > > > > > > removing the
> > > > > > > > > > > current
> > > > > > > > > > > > > > > > > record
> > > > > > > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > > > > > from the StateStoreContext does not 
> > > > > > > > > > > > > > > > > > > > > solve
> > > > > > the
> > > > > > > > problem.
> > > > > > > > > > > > > > > > > > > > > > Users can -- and have (see KAFKA-9584
> > > > > > > > > > > > > > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-9584>) -- hit
> > > > > > > > > > > > > > > > > > > > > such subtle bugs without ever 
> > > > > > > > > > > > > > > > > > > > > invoking a
> > > > > > > > StateStore
> > > > > > > > > > > > > > > > > > > > > from their punctuator.
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > Again, I think I do agree that we 
> > > > > > > > > > > > > > > > > > > > > should
> > > > > > leave
> > > > > > > > the
> > > > > > > > > > > current record
> > > > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > > > > > off of
> > > > > > > > > > > > > > > > > > > > > the StateStoreContext, but I don't 
> > > > > > > > > > > > > > > > > > > > > think
> > > > the
> > > > > > > > > > > Punctuator argument
> > > > > > > > > > > > > > > > > > against
> > > > > > > > > > > > > > > > > > > > it
> > > > > > > > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > > > > > very convincing. It sounds to me like 
> > > > > > > > > > > > > > > > > > > > > we
> > > > > > need to
> > > > > > > > > > > disallow access to
> > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > current
> > > > > > > > > > > > > > > > > > > > > record context from within the 
> > > > > > > > > > > > > > > > > > > > > Punctuator,
> > > > > > > > independent
> > > > > > > > > > > of anything
> > > > > > > > > > > > > > > > > > to do
> > > > > > > > > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > > > > > > state stores
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 7:12 AM John
> > > > > > Roesler <
> > > > > > > > > > > vvcep...@apache.org>
> > > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > > Thanks for the thoughts, Sophie.
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > I agree that the extra information 
> > > > > > > > > > > > > > > > > > > > > > could
> > > > > > be
> > > > > > > > useful.
> > > > > > > > > > > My only
> > > > > > > > > > > > > > > > > > concern is
> > > > > > > > > > > > > > > > > > > > > > that it doesn’t seem like we can
> > > > actually
> > > > > > > > supply
> > > > > > > > > > > that extra
> > > > > > > > > > > > > > > > > > information
> > > > > > > > > > > > > > > > > > > > > > correctly. So, then we have a 
> > > > > > > > > > > > > > > > > > > > > > situation
> > > > > > where
> > > > > > > > the
> > > > > > > > > > > system offers
> > > > > > > > > > > > > > > > > > useful
> > > > > > > > > > > > > > > > > > > > > > > > API calls that are only correct in a narrow range of use cases.
> > > > > > > > > > > > > > > > > > > > > > > > Outside of those use cases, you get incorrect behavior.
> > > > > > > > > > > > > > > > > > > > > > > > If it were possible to null out the context before you put a
> > > > > > > > > > > > > > > > > > > > > > > > document to which the context doesn’t apply, then the concern
> > > > > > > > > > > > > > > > > > > > > > > > would be mitigated. But it would still be pretty weird from the
> > > > > > > > > > > > > > > > > > > > > > > > perspective of the store that sometimes the context is populated
> > > > > > > > > > > > > > > > > > > > > > > > and other times, it’s null.
> > > > > > > > > > > > > > > > > > > > > > > > But that seems moot, since it doesn’t seem possible to null out
> > > > > > > > > > > > > > > > > > > > > > > > the context. Only the Processor could know whether it’s about to
> > > > > > > > > > > > > > > > > > > > > > > > put a document different from the context or not. And it would
> > > > > > > > > > > > > > > > > > > > > > > > be inappropriate to offer a public ProcessorContext api to
> > > > > > > > > > > > > > > > > > > > > > > > manage the record context.
> > > > > > > > > > > > > > > > > > > > > > > > Ultimately, it still seems like if you want to store headers,
> > > > > > > > > > > > > > > > > > > > > > > > you can store them explicitly, right? That doesn’t seem onerous
> > > > > > > > > > > > > > > > > > > > > > > > to me, and it kind of seems better than relying on undefined or
> > > > > > > > > > > > > > > > > > > > > > > > asymmetrical behavior in the store itself.
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > Anyway, I’m not saying that we couldn’t solve these problems.
> > > > > > > > > > > > > > > > > > > > > > > > Just that it seems a little like we can be conservative and
> > > > > > > > > > > > > > > > > > > > > > > > avoid them for now. If it turns out we really need to solve
> > > > > > > > > > > > > > > > > > > > > > > > them, we can always do it later.
> > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > John
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > If you were to call "put" from a punctuator, or do a
> > > > > > > > > > > > > > > > > > > > > > > > > > `range()` query and then update one of those records with
> > > > > > > > > > > > > > > > > > > > > > > > > > `put()`, you'd have a very subtle bug on your hands.
> > > > > > > > > > > > > > > > > > > > > > > > > Can you elaborate on this a bit? I agree that the punctuator
> > > > > > > > > > > > > > > > > > > > > > > > > case is an obvious exemption to the assumption that store
> > > > > > > > > > > > > > > > > > > > > > > > > invocations always have a corresponding "current record", but I
> > > > > > > > > > > > > > > > > > > > > > > > > don't understand the second example. Are you envisioning a
> > > > > > > > > > > > > > > > > > > > > > > > > scenario where the #process method performs a range query and
> > > > > > > > > > > > > > > > > > > > > > > > > then updates records? Or were you just giving another example
> > > > > > > > > > > > > > > > > > > > > > > > > of the punctuator case?
> > > > > > > > > > > > > > > > > > > > > > > > > I only bring it up because I agree that the current record
> > > > > > > > > > > > > > > > > > > > > > > > > information could still be useful within the context of the
> > > > > > > > > > > > > > > > > > > > > > > > > store. As a non-user my input on this definitely has limited
> > > > > > > > > > > > > > > > > > > > > > > > > value, but it just isn't striking me as obvious that we should
> > > > > > > > > > > > > > > > > > > > > > > > > remove access to the current record context from the state
> > > > > > > > > > > > > > > > > > > > > > > > > stores. If there is no current record, as in the punctuator
> > > > > > > > > > > > > > > > > > > > > > > > > case, we should just set the record context to null (or
> > > > > > > > > > > > > > > > > > > > > > > > > Optional.empty, etc).
> > > > > > > > > > > > > > > > > > > > > > > > > That said, the put() always has to come from somewhere, and
> > > > > > > > > > > > > > > > > > > > > > > > > that somewhere is always going to be either a Processor or a
> > > > > > > > > > > > > > > > > > > > > > > > > Punctuator, both of which will still have access to the full
> > > > > > > > > > > > > > > > > > > > > > > > > context. So additional info such as the timestamp can and
> > > > > > > > > > > > > > > > > > > > > > > > > should probably be supplied to the store before calling put(),
> > > > > > > > > > > > > > > > > > > > > > > > > rather than looked up by the store. But I can see some other
> > > > > > > > > > > > > > > > > > > > > > > > > things being useful, for example the current record's headers.
> > > > > > > > > > > > > > > > > > > > > > > > > Maybe if/when we add better (or any) support for headers in
> > > > > > > > > > > > > > > > > > > > > > > > > state stores this will be less true.
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > Of course as John has made clear, it's pretty hard to judge
> > > > > > > > > > > > > > > > > > > > > > > > > without examples and more insight as to what actually goes on
> > > > > > > > > > > > > > > > > > > > > > > > > within a custom state store
> > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 8:07 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > Hi Paul,
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > It's good to hear from you!
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > I'm glad you're in favor of the direction. Especially when
> > > > > > > > > > > > > > > > > > > > > > > > > > it comes to public API and usability concerns, I tend to
> > > > > > > > > > > > > > > > > > > > > > > > > > think that "the folks who matter" are actually the folks who
> > > > > > > > > > > > > > > > > > > > > > > > > > have to use the APIs to accomplish real tasks. It can be
> > > > > > > > > > > > > > > > > > > > > > > > > > hard for me to be sure I'm thinking clearly from that
> > > > > > > > > > > > > > > > > > > > > > > > > > perspective.
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > Funny story, I also started down this road a couple of times
> > > > > > > > > > > > > > > > > > > > > > > > > > already and backed them out before the KIP because I was
> > > > > > > > > > > > > > > > > > > > > > > > > > afraid of the scope of the proposal. Unfortunately, needing
> > > > > > > > > > > > > > > > > > > > > > > > > > to make a new ProcessorContext kind of forced my hand.
> > > > > > > > > > > > > > > > > > > > > > > > > > I see you've called me out about the ChangeLogging stores :)
> > > > > > > > > > > > > > > > > > > > > > > > > > In fact, I think these are the main/only reason that stores
> > > > > > > > > > > > > > > > > > > > > > > > > > might really need to invoke "forward()". My secret plan was
> > > > > > > > > > > > > > > > > > > > > > > > > > to cheat and either accomplish change-logging by a different
> > > > > > > > > > > > > > > > > > > > > > > > > > mechanism than implementing the store interface, or by just
> > > > > > > > > > > > > > > > > > > > > > > > > > breaking encapsulation to sneak the "real" ProcessorContext
> > > > > > > > > > > > > > > > > > > > > > > > > > into the ChangeLogging stores. But those are all
> > > > > > > > > > > > > > > > > > > > > > > > > > implementation details. I think the key question is whether
> > > > > > > > > > > > > > > > > > > > > > > > > > anyone else has a store implementation that needs to call
> > > > > > > > > > > > > > > > > > > > > > > > > > "forward()". It's not what you mentioned, but since you
> > > > > > > > > > > > > > > > > > > > > > > > > > spoke up, I'll just ask: if you have a use case for calling
> > > > > > > > > > > > > > > > > > > > > > > > > > "forward()" in a store, please share it.
> > > > > > > > > > > > > > > > > > > > > > > > > > Regarding the other record-specific context methods, I think
> > > > > > > > > > > > > > > > > > > > > > > > > > you have a good point, but I also can't quite wrap my head
> > > > > > > > > > > > > > > > > > > > > > > > > > around how we can actually guarantee it to work in general.
> > > > > > > > > > > > > > > > > > > > > > > > > > For example, the case you cited, where the implementation of
> > > > > > > > > > > > > > > > > > > > > > > > > > `KeyValueStore#put(key, value)` uses the context to augment
> > > > > > > > > > > > > > > > > > > > > > > > > > the record with timestamp information. This relies on the
> > > > > > > > > > > > > > > > > > > > > > > > > > assumption that you would only call "put()" from inside a
> > > > > > > > > > > > > > > > > > > > > > > > > > `Processor#process(key, value)` call in which the record
> > > > > > > > > > > > > > > > > > > > > > > > > > being processed is the same record that you're trying to put
> > > > > > > > > > > > > > > > > > > > > > > > > > into the store.
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > If you were to call "put" from a punctuator, or do a
> > > > > > > > > > > > > > > > > > > > > > > > > > `range()` query and then update one of those records with
> > > > > > > > > > > > > > > > > > > > > > > > > > `put()`, you'd have a very subtle bug on your hands. Right
> > > > > > > > > > > > > > > > > > > > > > > > > > now, the Streams component that actually calls the Processor
> > > > > > > > > > > > > > > > > > > > > > > > > > takes care to set the right record context before invoking
> > > > > > > > > > > > > > > > > > > > > > > > > > the method, and in the case of caching, etc., it also takes
> > > > > > > > > > > > > > > > > > > > > > > > > > care to swap out the old context and keep it somewhere safe.
> > > > > > > > > > > > > > > > > > > > > > > > > > But when it comes to public API Processors calling methods
> > > > > > > > > > > > > > > > > > > > > > > > > > on StateStores, there's no opportunity for any component to
> > > > > > > > > > > > > > > > > > > > > > > > > > make sure the context is always correct.
> > > > > > > > > > > > > > > > > > > > > > > > > > In the face of that situation, it seemed better to just move
> > > > > > > > > > > > > > > > > > > > > > > > > > in the direction of a "normal" data store. I.e., when you
> > > > > > > > > > > > > > > > > > > > > > > > > > use a HashMap or RocksDB or other "state stores", you don't
> > > > > > > > > > > > > > > > > > > > > > > > > > expect them to automatically know extra stuff about the
> > > > > > > > > > > > > > > > > > > > > > > > > > record you're storing. If you need them to know something,
> > > > > > > > > > > > > > > > > > > > > > > > > > you just put it in the value.
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > All of that said, I'm just reasoning from first principles
> > > > > > > > > > > > > > > > > > > > > > > > > > here. To really know if this is a mistake or not, I need to
> > > > > > > > > > > > > > > > > > > > > > > > > > be in your place. So please push back if you think what I
> > > > > > > > > > > > > > > > > > > > > > > > > > said is nonsense. My personal plan was to keep an eye out
> > > > > > > > > > > > > > > > > > > > > > > > > > during the period where the old API was still present, but
> > > > > > > > > > > > > > > > > > > > > > > > > > deprecated, to see if people were struggling to use the new
> > > > > > > > > > > > > > > > > > > > > > > > > > API. If so, then we'd have a chance to address it before
> > > > > > > > > > > > > > > > > > > > > > > > > > dropping the old API. But it's even better if you can help
> > > > > > > > > > > > > > > > > > > > > > > > > > think it through now.
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > It did also cross my mind to _not_ add the
> > > > > > > > > > > > > > > > > > > > > > > > > > StateStoreContext, but just to continue to punt on the
> > > > > > > > > > > > > > > > > > > > > > > > > > question by just dropping in the new ProcessorContext to the
> > > > > > > > > > > > > > > > > > > > > > > > > > new init method. If StateStoreContext seems too bold, we can
> > > > > > > > > > > > > > > > > > > > > > > > > > go that direction. But if we actually add some methods to
> > > > > > > > > > > > > > > > > > > > > > > > > > StateStoreContext, I'd like to be able to ensure they would
> > > > > > > > > > > > > > > > > > > > > > > > > > be well defined. I think the current situation was more of
> > > > > > > > > > > > > > > > > > > > > > > > > > an oversight than a choice.
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > Thanks again for your reply,
> > > > > > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > John,
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > It's exciting to see this KIP head in this direction!  In the
> > > > > > > > > > > > > > > > > > > > > > > > > > > last year or so I've tried to sketch out some usability
> > > > > > > > > > > > > > > > > > > > > > > > > > > improvements for custom state stores, and I also ended up
> > > > > > > > > > > > > > > > > > > > > > > > > > > splitting out the StateStoreContext from the ProcessorContext
> > > > > > > > > > > > > > > > > > > > > > > > > > > in an attempt to facilitate what I was doing.  I sort of
> > > > > > > > > > > > > > > > > > > > > > > > > > > abandoned it when I realized how large the ideal change might
> > > > > > > > > > > > > > > > > > > > > > > > > > > have to be, but it's great to see that there is other interest
> > > > > > > > > > > > > > > > > > > > > > > > > > > in moving in this direction (from the folks that matter :) ).
> > > > > > > > > > > > > > > > > > > > > > > > > > > Having taken a stab at it myself, I have a comment/question on
> > > > > > > > > > > > > > > > > > > > > > > > > > > this bullet about StateStoreContext:
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > It does *not* include anything processor- or record-specific,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > like `forward()` or any information about the "current"
> > > > > > > > > > > > > > > > > > > > > > > > > > > > record, which is only well-defined in the context of the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Processor. Processors process one record at a time, but state
> > > > > > > > > > > > > > > > > > > > > > > > > > > > stores may be used to store and fetch many records, so there
> > > > > > > > > > > > > > > > > > > > > > > > > > > > is no "current record".
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > I totally agree that record-specific or processor-specific
> > > > > > > > > > > > > > > > > > > > > > > > > > > context in a state store is often not well-defined and it
> > > > > > > > > > > > > > > > > > > > > > > > > > > would be good to separate that out, but sometimes it (at least
> > > > > > > > > > > > > > > > > > > > > > > > > > > record-specific context) is actually useful, for example,
> > > > > > > > > > > > > > > > > > > > > > > > > > > passing the record's timestamp through to the underlying
> > > > > > > > > > > > > > > > > > > > > > > > > > > storage (or changelog topic):
> > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
> > > > > > > > > > > > > > > > > > > > > > > > > > > You could have the writer client of the state store pass this
> > > > > > > > > > > > > > > > > > > > > > > > > > > through, but it would be nice to be able to write state stores
> > > > > > > > > > > > > > > > > > > > > > > > > > > where the client did not have this responsibility.  I'm not
> > > > > > > > > > > > > > > > > > > > > > > > > > > sure if the solution is to add some things back to
> > > > > > > > > > > > > > > > > > > > > > > > > > > StateStoreContext, or make yet another context that represents
> > > > > > > > > > > > > > > > > > > > > > > > > > > record-specific context while inside a state store.
> > > > > > > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > > > Paul
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 5:43 PM John Roesler <j...@vvcephei.org> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > Hello all,
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > I've been slowly pushing KIP-478 forward over the last year,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > and I'm happy to say that we're making good progress now.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > However, several issues with the original design have come
> > > > > > > > > > > > > > > > > > > > > > > > > > > > to light.
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > The major changes:
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > We discovered that the original plan of just adding generic
> > > > > > > > > > > > > > > > > > > > > > > > > > > > parameters to ProcessorContext was too disruptive, so we are
> > > > > > > > > > > > > > > > > > > > > > > > > > > > now adding a new api.ProcessorContext.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > That choice forces us to add a new StateStore.init method
> > > > > > > > > > > > > > > > > > > > > > > > > > > > for the new context, but ProcessorContext really isn't ideal
> > > > > > > > > > > > > > > > > > > > > > > > > > > > for state stores to begin with, so I'm proposing a new
> > > > > > > > > > > > > > > > > > > > > > > > > > > > StateStoreContext for this purpose. In a nutshell, there are
> > > > > > > > > > > > > > > > > > > > > > > > > > > > quite a few methods in ProcessorContext that actually should
> > > > > > > > > > > > > > > > > > > > > > > > > > > > never be called from inside a StateStore.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Also, since there is a new ProcessorContext interface, we
> > > > > > > > > > > > > > > > > > > > > > > > > > > > need a new MockProcessorContext implementation in the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > test-utils module.
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > The changeset for the KIP document is here:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
> > > > > > > > > > > > > > > > > > > > > > > > > > > > And the KIP itself is here:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
> > > > > > > > > > > > > > > > > > > > > > > > > > > > If you have any concerns, please let me know!
> > > > > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > 
