Hello again, all,

I'm sorry to make another tweak to this KIP, but during the
implementation of the design we've just agreed on, I
realized that Processors would almost never need to
reference the RecordMetadata. Therefore, I'm proposing to
streamline the API by moving the Optional<RecordMetadata> to
the new ProcessorContext as a method, rather than making it
a method argument to Processor#process.

The change is visible here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=16&selectedPageVersions=15

All of the same semantics and considerations we discussed
still apply; it's just that Processor implementers won't
have to think about it unless they actually _need_ the
topic/partition/offset information from the RecordMetadata.
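
To illustrate, here is a minimal sketch of how this might look
from a Processor's point of view. The types below are simplified,
hypothetical stand-ins, not the actual classes in the PR; in
particular, the accessor name recordMetadata() and the describe()
helper are assumptions made for the example.

```java
import java.util.Optional;

public class RecordMetadataSketch {

    // Hypothetical, simplified stand-in for the proposed Record (headers omitted).
    static final class Record<K, V> {
        final K key;
        final V value;
        final long timestamp;

        Record(K key, V value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical stand-in: only the topic/partition/offset of the source record.
    static final class RecordMetadata {
        final String topic;
        final int partition;
        final long offset;

        RecordMetadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Assumed shape of the new context: metadata is a method here,
    // not an argument to process().
    interface ProcessorContext {
        Optional<RecordMetadata> recordMetadata();
    }

    // Processing logic that only consults the metadata when it needs it.
    static String describe(ProcessorContext context, Record<String, String> record) {
        return context.recordMetadata()
            .map(m -> record.key + " from " + m.topic + "-" + m.partition + "@" + m.offset)
            .orElse(record.key + " with no source record");
    }

    public static void main(String[] args) {
        Record<String, String> record = new Record<>("k1", "v1", 42L);

        // Forwarded while processing a real input record: metadata is present.
        ProcessorContext fromInput = () -> Optional.of(new RecordMetadata("input", 0, 7L));
        // Forwarded from a punctuator: there is no current topic/partition/offset.
        ProcessorContext fromPunctuator = () -> Optional.empty();

        System.out.println(describe(fromInput, record));
        System.out.println(describe(fromPunctuator, record));
    }
}
```

A Processor that never calls recordMetadata() never has to deal
with the Optional at all, which is the point of the change.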

Also, the PR for this part of the KIP is now available here:
https://github.com/apache/kafka/pull/9361

I know it's a bit on the heavy side; I've annotated the PR
to try and ease the reviewer's job. I'd greatly appreciate
it if anyone can take the time to review.

Thanks,
-John

On Wed, 2020-09-30 at 10:16 -0500, John Roesler wrote:
> Thanks, Matthias!
> 
> I can certainly document it. I didn't bother because the old
> Processor, Supplier, and Context will themselves be
> deprecated, so any method that handles them won't be able to
> avoid the deprecation warning. Nevertheless, it doesn't hurt
> just to explicitly deprecate those methods.
> 
> Thanks,
> -John
> 
> On Wed, 2020-09-30 at 08:10 -0700, Matthias J. Sax wrote:
> > Thanks John. I like the proposal.
> > 
> > Btw: I was just going over the KIP and realized that we add new methods
> > to `StreamsBuilder`, `Topology`, and `KStream` that take the new
> > `ProcessorSupplier` class -- should we also deprecate the corresponding
> > existing ones that take the old `ProcessorSupplier`?
> > 
> > 
> > -Matthias
> > 
> > 
> > On 9/30/20 7:46 AM, John Roesler wrote:
> > > Thanks Paul and Sophie,
> > > 
> > > Your feedback certainly underscores the need to be explicit
> > > in the javadoc about why that parameter is Optional. Getting
> > > this kind of feedback before the release is exactly the kind
> > > of outcome we hope to get from the KIP process!
> > > 
> > > Thanks,
> > > -John
> > > 
> > > On Tue, 2020-09-29 at 22:32 -0500, Paul Whalen wrote:
> > > > John, I totally agree that adding a method to Processor is cumbersome 
> > > > and
> > > > not a good path.  I was imagining maybe a separate interface that could 
> > > > be
> > > > used in the appropriate context, but I don't think that makes too much
> > > > sense - it's just too far away from what Kafka Streams is.  I was
> > > > originally more interested in the "why" Optional than the "how" (I 
> > > > think my
> > > > original reply overplayed the "optional as an argument" concern).  But
> > > > you've convinced me that there is a perfectly legitimate "why".  We 
> > > > should
> > > > make sure that it's clear why it's Optional, but I suppose that goes
> > > > without saying.  It's a nice opportunity to make the API reflect more 
> > > > what
> > > > is actually going on under the hood.
> > > > 
> > > > Thanks!
> > > > Paul
> > > > 
> > > > On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman 
> > > > <sop...@confluent.io>
> > > > wrote:
> > > > 
> > > > > FWIW, while I'm really not a fan of Optional in general, I agree that 
> > > > > its
> > > > > usage
> > > > > here seems appropriate. Even for those rare software developers who
> > > > > carefully
> > > > > read all the docs several times over, I think it wouldn't be too hard 
> > > > > to
> > > > > miss a
> > > > > note about the RecordMetadata possibly being null.
> > > > > 
> > > > > Especially because it's not that obvious why at first glance, and 
> > > > > takes a
> > > > > bit of
> > > > > thinking to realize that records originating from a Punctuator 
> > > > > wouldn't
> > > > > have a
> > > > > "current record". This  is something that has definitely confused 
> > > > > users
> > > > > today.
> > > > > 
> > > > > It's on us to improve the education here -- and an 
> > > > > Optional<RecordMetadata>
> > > > > would naturally raise awareness of this subtlety
> > > > > 
> > > > > On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman 
> > > > > <sop...@confluent.io>
> > > > > wrote:
> > > > > 
> > > > > > Does my reply address your concerns?
> > > > > > 
> > > > > > 
> > > > > > Yes; also, I definitely misread part of the proposal earlier and 
> > > > > > thought
> > > > > > you had put
> > > > > > the timestamp field in RecordMetadata. Sorry for not giving things a
> > > > > > closer look
> > > > > > before responding! I'm not sure my original message made much sense 
> > > > > > given
> > > > > > the misunderstanding, but thanks for responding anyway :P
> > > > > > 
> > > > > > Having given the proposal a second pass, I agree, it's very 
> > > > > > elegant. +1
> > > > > > 
> > > > > > On Tue, Sep 29, 2020 at 6:50 PM John Roesler <vvcep...@apache.org>
> > > > > wrote:
> > > > > > > Thanks for the reply, Sophie,
> > > > > > > 
> > > > > > > I think I may have summarized too much in my prior reply.
> > > > > > > 
> > > > > > > In the currently proposed KIP, any caller of forward() must
> > > > > > > supply a Record, which consists of:
> > > > > > > * key
> > > > > > > * value
> > > > > > > * timestamp
> > > > > > > * headers (with a convenience constructor that sets empty
> > > > > > > headers)
> > > > > > > 
> > > > > > > These aren't what I was referring to as potentially being
> > > > > > > undefined downstream, since thanks to the introduction of
> > > > > > > Record, they are, as you're advocating, required to be
> > > > > > > defined everywhere, even when forwarding from a punctuator.
> > > > > > > 
> > > > > > > So to be clear, the intent of this change is actually to
> > > > > > > _enforce_ that timestamp would never be undefined (which it
> > > > > > > currently can be). Also, since punctuators _are_ going to
> > > > > > > have to "make up" a timestamp going forward, we should note
> > > > > > > that the "punctuate" method currently passes in a good
> > > > > > > timestamp that they can use: for system-time punctuations,
> > > > > > > they receive the current system time, and for stream-time
> > > > > > > punctuations, they get the current stream time.
> > > > > > > 
> > > > > > > The potentially undefined RecordMetadata only contains these
> > > > > > > fields:
> > > > > > > * topic
> > > > > > > * partition
> > > > > > > * offset
> > > > > > > 
> > > > > > > These fields aren't required (or even used) in a Sink, and
> > > > > > > it doesn't seem like they would be important to many
> > > > > > > applications. Furthermore, it doesn't _seem_ like you'd even
> > > > > > > want to set these fields. They seem purely informational and
> > > > > > > only useful in the context when you are actually processing
> > > > > > > a real input record. It doesn't sound like you were asking
> > > > > > > for it, but just to put it on the record, I think if we were
> > > > > > > to require values for the metadata from punctuators, people
> > > > > > > would mostly just make up their own dummy values, to no
> > > > > > > one's benefit.
> > > > > > > 
> > > > > > > I should also note that with the current
> > > > > > > Record/RecordMetadata split, we will have the freedom to
> > > > > > > move fields into the Record class (or even add new fields)
> > > > > > > if we want them to become "data" as opposed to "metadata" in
> > > > > > > the future.
> > > > > > > 
> > > > > > > Thanks for your reply; I was similarly floored when I
> > > > > > > realized the true nature of the current situation. Does my
> > > > > > > reply address your concerns?
> > > > > > > 
> > > > > > > Thanks,
> > > > > > > -John
> > > > > > > 
> > > > > > > On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman
> > > > > > > wrote:
> > > > > > > > > However, the record metadata is only defined when the parent
> > > > > forwards
> > > > > > > > > while processing a
> > > > > > > > 
> > > > > > > > real record, not when it calls forward from the punctuator
> > > > > > > > 
> > > > > > > > 
> > > > > > > > Can we take a step back for a second...why wouldn't you be 
> > > > > > > > required to
> > > > > > > set
> > > > > > > > the RecordContext
> > > > > > > > yourself when calling forward from a Punctuator? I think I 
> > > > > > > > agree with
> > > > > > > Paul
> > > > > > > > here, it seems kind of
> > > > > > > > absurd not to enforce that the RecordContext be present inside 
> > > > > > > > the
> > > > > > > > process() method.
> > > > > > > > 
> > > > > > > > The original problem with Punctuators, as I understood it, was 
> > > > > > > > that
> > > > > all
> > > > > > > of
> > > > > > > > the RecordContext
> > > > > > > > fields were exposed automatically to both the Processor and any
> > > > > > > Punctuator,
> > > > > > > > due to being
> > > > > > > > direct methods on the ProcessorContext. We can't control which
> > > > > > > > ProcessorContext methods
> > > > > > > > > someone will call from within a Punctuator vs from a Processor. 
> > > > > > > > The best
> > > > > > > we
> > > > > > > > could do was
> > > > > > > > set these "nonsense" fields to null when inside a Punctuator, 
> > > > > > > > or set
> > > > > > > them
> > > > > > > > to some dummy
> > > > > > > > values as you pointed out.
> > > > > > > > 
> > > > > > > > But then you proposed the solution of a separate RecordContext 
> > > > > > > > which
> > > > > is
> > > > > > > not
> > > > > > > > attached to the
> > > > > > > > ProcessorContext at all. This seemed to solve the above problem 
> > > > > > > > very
> > > > > > > > neatly: we only pass
> > > > > > > > in the RecordContext to the process() method, so we don't have 
> > > > > > > > to
> > > > > worry
> > > > > > > > about people trying
> > > > > > > > to access these fields from within a Punctuator. The fields 
> > > > > > > > aren't
> > > > > > > > accessible unless they're
> > > > > > > > defined.
> > > > > > > > 
> > > > > > > > So what happens when someone wants to forward something from 
> > > > > > > > within a
> > > > > > > > Punctuator? I
> > > > > > > > don't think it's reasonable to let the timestamp field be 
> > > > > > > > undefined,
> > > > > > > ever.
> > > > > > > > What if the Punctuator
> > > > > > > > forwards directly to a sink, or directly to some windowing 
> > > > > > > > logic. Are
> > > > > we
> > > > > > > > supposed to add
> > > > > > > > handling for the RecordContext == null case to every processor? 
> > > > > > > > Or are
> > > > > > > we
> > > > > > > > just going to
> > > > > > > > assume the implicit restriction that users will only forward 
> > > > > > > > records
> > > > > > > from a
> > > > > > > > Punctuator to
> > > > > > > > downstream processors that know how to handle and/or set the
> > > > > > > RecordContext
> > > > > > > > if it's
> > > > > > > > undefined. That seems to throw away a lot of the awesome safety 
> > > > > > > > added
> > > > > in
> > > > > > > > this KIP
> > > > > > > > 
> > > > > > > > Apologies for the rant. But I feel pretty strongly that 
> > > > > > > > allowing to
> > > > > > > forward
> > > > > > > > records from a
> > > > > > > > Punctuator without a defined RecordContext would be asking for
> > > > > trouble.
> > > > > > > > Imo, if you
> > > > > > > > want to forward from a Punctuator, you need to store the info 
> > > > > > > > you need
> > > > > > > in
> > > > > > > > order to
> > > > > > > > set the timestamp, or make one up yourself
> > > > > > > > 
> > > > > > > > (the one alternative I can think of here is that maybe we could 
> > > > > > > > pass
> > > > > in
> > > > > > > the
> > > > > > > > current
> > > > > > > > partition time, so users can at least put in a reasonable 
> > > > > > > > estimate for
> > > > > > > the
> > > > > > > > timestamp
> > > > > > > > that won't cause it to get dropped and won't potentially lurch 
> > > > > > > > the
> > > > > > > > streamtime far into
> > > > > > > > the future. This would be similar to what we do in the
> > > > > > > TimestampExtractor)
> > > > > > > > On Tue, Sep 29, 2020 at 6:06 PM John Roesler 
> > > > > > > > <vvcep...@apache.org>
> > > > > > > wrote:
> > > > > > > > > Oh, I guess one other thing I should have mentioned is that 
> > > > > > > > > I’ve
> > > > > > > recently
> > > > > > > > > discovered that in cases where the context is undefined, we
> > > > > currently
> > > > > > > just
> > > > > > > > > fill in dummy values for the context. So there’s a good 
> > > > > > > > > chance that
> > > > > > > real
> > > > > > > > > applications in use are depending on undefined context 
> > > > > > > > > without even
> > > > > > > > > realizing it. What I’m hoping to do is just make the situation
> > > > > > > explicit and
> > > > > > > > > get rid of the dummy values.
> > > > > > > > > 
> > > > > > > > > Thanks,
> > > > > > > > > John
> > > > > > > > > 
> > > > > > > > > On Tue, Sep 29, 2020, at 20:01, John Roesler wrote:
> > > > > > > > > > Thanks for the review, Paul!
> > > > > > > > > > 
> > > > > > > > > > I had read some of that debate before. There seems to be 
> > > > > > > > > > some
> > > > > > > subtext
> > > > > > > > > > there, because they advise against using Optional in cases 
> > > > > > > > > > like
> > > > > > > this,
> > > > > > > > > > but there doesn’t seem to be a specific reason why it’s
> > > > > > > inappropriate.
> > > > > > > > > > I got the impression they were just afraid that people 
> > > > > > > > > > would go
> > > > > > > > > > overboard and make everything Optional.
> > > > > > > > > > 
> > > > > > > > > > I could also make two methods, but it seemed like it might 
> > > > > > > > > > be an
> > > > > > > > > > unfortunate way to handle the issue, since Processor is just
> > > > > about a
> > > > > > > > > > Function as-is, but the two-method approach would require 
> > > > > > > > > > people
> > > > > to
> > > > > > > > > > implement both methods.
> > > > > > > > > > 
> > > > > > > > > > To your question, this is something that’s only recently 
> > > > > > > > > > > become
> > > > > > > clear
> > > > > > > > > > to me. Imagine you have a parent processor that calls 
> > > > > > > > > > forward both
> > > > > > > from
> > > > > > > > > > process and a punctuator. The child will have process() 
> > > > > > > > > > invoked in
> > > > > > > both
> > > > > > > > > > cases, and won’t be able to distinguish them. However, the 
> > > > > > > > > > record
> > > > > > > > > > metadata is only defined when the parent forwards while
> > > > > processing a
> > > > > > > > > > real record, not when it calls forward from the punctuator.
> > > > > > > > > > 
> > > > > > > > > > This is why I wanted to make the metadata Optional, to 
> > > > > > > > > > advertise
> > > > > > > that
> > > > > > > > > > the metadata might be undefined if any ancestor processor 
> > > > > > > > > > ever
> > > > > calls
> > > > > > > > > > forward from a punctuator. We could remove the Optional and
> > > > > instead
> > > > > > > > > > just document that the argument might be null.
> > > > > > > > > > 
> > > > > > > > > > With that context in place, what’s your take?
> > > > > > > > > > 
> > > > > > > > > > Thanks,
> > > > > > > > > > John
> > > > > > > > > > 
> > > > > > > > > > On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote:
> > > > > > > > > > > Looks pretty good to me, though the 
> > > > > > > > > > > Processor#process(Record,
> > > > > > > > > > > Optional<RecordMetadata>) signature caught my eye.  
> > > > > > > > > > > There's some
> > > > > > > > > debate
> > > > > > > > > > > (
> > > > > > > > > > > 
> > > > > https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments
> > > > > > > > > )
> > > > > > > > > > > about whether to use Optionals in arguments, and while 
> > > > > > > > > > > that's a
> > > > > > > bit of
> > > > > > > > > a
> > > > > > > > > > > religious debate in the abstract, it did make me wonder 
> > > > > > > > > > > whether
> > > > > it
> > > > > > > > > makes
> > > > > > > > > > > sense in this specific case.  When is it actually not 
> > > > > > > > > > > present?
> > > > > I
> > > > > > > was
> > > > > > > > > > > under
> > > > > > > > > > > the impression that we should always have access to it in
> > > > > > > process(),
> > > > > > > > > and
> > > > > > > > > > > that the concern about metadata being undefined was about 
> > > > > > > > > > > having
> > > > > > > > > access
> > > > > > > > > > > to
> > > > > > > > > > > record metadata in the ProcessorContext held for use 
> > > > > > > > > > > inside a
> > > > > > > > > > > Punctuator.
> > > > > > > > > > > 
> > > > > > > > > > > If that's not the case and it is truly optional in 
> > > > > > > > > > > process(), is
> > > > > > > there
> > > > > > > > > an
> > > > > > > > > > > opportunity for an alternate interface for the cases when 
> > > > > > > > > > > we
> > > > > > > don't get
> > > > > > > > > it,
> > > > > > > > > > > rather than force the branching on implementers of the
> > > > > interface?
> > > > > > > > > > > Apologies if I've missed something, I took a look at the 
> > > > > > > > > > > PR and
> > > > > I
> > > > > > > > > didn't
> > > > > > > > > > > see any spots where I thought it would be empty.  Perhaps 
> > > > > > > > > > > an
> > > > > > > example
> > > > > > > > > of a
> > > > > > > > > > > Punctuator using (and not using) the new API would clear 
> > > > > > > > > > > things
> > > > > > > up.
> > > > > > > > > > > Best,
> > > > > > > > > > > Paul
> > > > > > > > > > > 
> > > > > > > > > > > On Tue, Sep 29, 2020 at 4:10 PM John Roesler <
> > > > > vvcep...@apache.org
> > > > > > > > > wrote:
> > > > > > > > > > > > Hello again, all,
> > > > > > > > > > > > 
> > > > > > > > > > > > Thanks for the latest round of discussion. I've taken 
> > > > > > > > > > > > the
> > > > > > > > > > > > recent feedback and come up with an updated KIP that 
> > > > > > > > > > > > seems
> > > > > > > > > > > > actually quite a bit nicer than the prior proposal.
> > > > > > > > > > > > 
> > > > > > > > > > > > The specific diff on the KIP is here:
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14
> > > > > > > > > > > > These changes are implemented in this POC PR:
> > > > > > > > > > > > https://github.com/apache/kafka/pull/9346
> > > > > > > > > > > > 
> > > > > > > > > > > > The basic idea is that, building on the recent 
> > > > > > > > > > > > > conversation,
> > > > > > > > > > > > we would transition away from the current API where we 
> > > > > > > > > > > > get
> > > > > > > > > > > > only key/value in the process() method and other "data"
> > > > > > > > > > > > comes in the ProcessorContext along with the "metadata".
> > > > > > > > > > > > 
> > > > > > > > > > > > Instead, we formalize what is "data" and what is 
> > > > > > > > > > > > "metadata",
> > > > > > > > > > > > and pass it all in to the process method:
> > > > > > > > > > > > Processor#process(Record, Optional<RecordMetadata>)
> > > > > > > > > > > > 
> > > > > > > > > > > > Also, you forward the whole data class instead of 
> > > > > > > > > > > > mutating
> > > > > > > > > > > > the ProcessorContext fields and also calling forward:
> > > > > > > > > > > > ProcessorContext#forward(Record)
> > > > > > > > > > > > 
> > > > > > > > > > > > The Record class itself ships with methods like
> > > > > > > > > > > > record#withValue(NewV newValue)
> > > > > > > > > > > > that make a shallow copy of the input Record, enabling
> > > > > > > > > > > > Processors to safely handle the record without 
> > > > > > > > > > > > polluting the
> > > > > > > > > > > > context of their parents and siblings.
> > > > > > > > > > > > 
> > > > > > > > > > > > This proposal has a number of key benefits:
> > > > > > > > > > > > * As we've discovered in KAFKA-9584, it's unsafe to 
> > > > > > > > > > > > mutate
> > > > > > > > > > > > the Headers via the ProcessorContext. This proposal 
> > > > > > > > > > > > offers a
> > > > > > > > > > > > way to safely forward changes only to downstream 
> > > > > > > > > > > > processors.
> > > > > > > > > > > > * The new API has symmetry (each processor's input is 
> > > > > > > > > > > > the
> > > > > > > > > > > > output of its parent processor)
> > > > > > > > > > > > * The API makes clear that the record metadata isn't 
> > > > > > > > > > > > always
> > > > > > > > > > > > defined (for example, in a punctuation, there is no 
> > > > > > > > > > > > current
> > > > > > > > > > > > topic/partition/offset)
> > > > > > > > > > > > * The API enables punctuators to forward well defined
> > > > > > > > > > > > headers downstream, which is currently not possible.
> > > > > > > > > > > > 
> > > > > > > > > > > > > Unless there are objections, I'll go ahead and 
> > > > > > > > > > > > re-finalize
> > > > > > > > > > > > this KIP and update that PR to a mergeable state.
> > > > > > > > > > > > 
> > > > > > > > > > > > Thanks, all,
> > > > > > > > > > > > -John
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax 
> > > > > > > > > > > > wrote:
> > > > > > > > > > > > > Interesting proposal. However, I am not totally 
> > > > > > > > > > > > > convinced,
> > > > > > > because
> > > > > > > > > I see
> > > > > > > > > > > > > a fundamental difference between "data" and 
> > > > > > > > > > > > > "metadata".
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Topic/partition/offset are "metadata" in the strong 
> > > > > > > > > > > > > sense
> > > > > and
> > > > > > > they
> > > > > > > > > are
> > > > > > > > > > > > > immutable.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > On the other hand there is "primary" data like key and
> > > > > value,
> > > > > > > as
> > > > > > > > > well as
> > > > > > > > > > > > > "secondary" data like timestamp and headers. The 
> > > > > > > > > > > > > issue seems
> > > > > > > that
> > > > > > > > > we
> > > > > > > > > > > > > treat "secondary data" more like metadata atm?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Thus, promoting timestamp and headers into a first 
> > > > > > > > > > > > > class
> > > > > > > citizen
> > > > > > > > > > role
> > > > > > > > > > > > > > makes sense to me (my original proposal about 
> > > > > > > > > > > > > `RecordContext`
> > > > > > > would
> > > > > > > > > still
> > > > > > > > > > > > > fall short with this regard). However, putting both 
> > > > > > > > > > > > > (data
> > > > > and
> > > > > > > > > metadata)
> > > > > > > > > > > > > into a `Record` abstraction might go too far?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > I am also a little bit concerned about `Record.copy()`
> > > > > > > because it
> > > > > > > > > might
> > > > > > > > > > > > > be a trap: Users might assume it does a full deep 
> > > > > > > > > > > > > copy of
> > > > > the
> > > > > > > > > record,
> > > > > > > > > > > > > however, it would not. It would only create a new 
> > > > > > > > > > > > > `Record`
> > > > > > > object
> > > > > > > > > as
> > > > > > > > > > > > > wrapper that points to the same key/value/header 
> > > > > > > > > > > > > objects as
> > > > > > > the
> > > > > > > > > input
> > > > > > > > > > > > > record.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > With the current `context.forward(key, value)` we 
> > > > > > > > > > > > > don't have
> > > > > > > this
> > > > > > > > > "deep
> > > > > > > > > > > > > copy" issue -- it's pretty clear what is happening.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Instead of `To.all().withTimestamp()` we could also 
> > > > > > > > > > > > > add
> > > > > > > > > > > > > `context.forward(key, value, timestamp)` etc (just 
> > > > > > > > > > > > > wondering
> > > > > > > about
> > > > > > > > > the
> > > > > > > > > > > > > > explosion in overloads)?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Also, `Record.withValue` etc sounds odd? Should a 
> > > > > > > > > > > > > record not
> > > > > > > be
> > > > > > > > > > > > > immutable? So, we could have something like
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
> > > > > > > > > > > > > But it looks rather verbose?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > > The other question is of course, to what extent do we 
> > > > > > > > > > > > > want
> > > > > to
> > > > > > > keep
> > > > > > > > > the
> > > > > > > > > > > > > distinction between "primary" and "secondary" data? 
> > > > > > > > > > > > > To me,
> > > > > > > it's a
> > > > > > > > > > > > > > question of ease of use?
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Just putting all this out to move the discussion 
> > > > > > > > > > > > > forward.
> > > > > > > Don't
> > > > > > > > > have a
> > > > > > > > > > > > > concrete proposal atm.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > On 9/14/20 9:24 AM, John Roesler wrote:
> > > > > > > > > > > > > > Thanks for this thought, Matthias!
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > To be honest, it's bugged me quite a bit that _all_ 
> > > > > > > > > > > > > > the
> > > > > > > > > > > > > > record information hasn't been an argument to 
> > > > > > > > > > > > > > `process`. I
> > > > > > > > > > > > > > suppose I was trying to be conservative in this 
> > > > > > > > > > > > > > proposal,
> > > > > > > > > > > > > > but then again, if we're adding new Processor and
> > > > > > > > > > > > > > ProcessorContext interfaces, then this is the time 
> > > > > > > > > > > > > > to make
> > > > > > > > > > > > > > such a change.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > To be unambiguous, I think this is what we're 
> > > > > > > > > > > > > > talking
> > > > > about:
> > > > > > > > > > > > > > ProcessorContext:
> > > > > > > > > > > > > > * applicationId
> > > > > > > > > > > > > > * taskId
> > > > > > > > > > > > > > * appConfigs
> > > > > > > > > > > > > > * appConfigsWithPrefix
> > > > > > > > > > > > > > * keySerde
> > > > > > > > > > > > > > * valueSerde
> > > > > > > > > > > > > > * stateDir
> > > > > > > > > > > > > > * metrics
> > > > > > > > > > > > > > * schedule
> > > > > > > > > > > > > > * commit
> > > > > > > > > > > > > > * forward
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > StateStoreContext:
> > > > > > > > > > > > > > * applicationId
> > > > > > > > > > > > > > * taskId
> > > > > > > > > > > > > > * appConfigs
> > > > > > > > > > > > > > * appConfigsWithPrefix
> > > > > > > > > > > > > > * keySerde
> > > > > > > > > > > > > > * valueSerde
> > > > > > > > > > > > > > * stateDir
> > > > > > > > > > > > > > * metrics
> > > > > > > > > > > > > > * register
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > RecordContext
> > > > > > > > > > > > > > * topic
> > > > > > > > > > > > > > * partition
> > > > > > > > > > > > > > * offset
> > > > > > > > > > > > > > * timestamp
> > > > > > > > > > > > > > * headers
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Your proposal sounds good to me as-is. Just to 
> > > > > > > > > > > > > > cover the
> > > > > > > > > > > > > > bases, though, I'm wondering if we should push the 
> > > > > > > > > > > > > > idea
> > > > > just
> > > > > > > > > > > > > > a little farther. Instead of decomposing
> > > > > key,value,context,
> > > > > > > > > > > > > > we could just keep them all in one object, like 
> > > > > > > > > > > > > > this:
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Record:
> > > > > > > > > > > > > > * key
> > > > > > > > > > > > > > * value
> > > > > > > > > > > > > > * topic
> > > > > > > > > > > > > > * partition
> > > > > > > > > > > > > > * offset
> > > > > > > > > > > > > > * timestamp
> > > > > > > > > > > > > > * headers
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Then, we could have:
> > > > > > > > > > > > > > Processor#process(Record)
> > > > > > > > > > > > > > ProcessorContext#forward(Record, To)
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Viewed from this perspective, a record has three
> > > > > properties
> > > > > > > > > > > > > > that people may specify in their processors: key, 
> > > > > > > > > > > > > > value,
> > > > > and
> > > > > > > > > > > > > > timestamp.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > We could deprecate `To#withTimestamp` and enable 
> > > > > > > > > > > > > > people to
> > > > > > > > > > > > > > specify the timestamp along with the key and value 
> > > > > > > > > > > > > > when
> > > > > they
> > > > > > > > > > > > > > forward a record.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > E.g.,
> > > > > > > > > > > > > > RecordBuilder toForward = RecordBuilder.copy(record)
> > > > > > > > > > > > > > toForward.withKey(newKey)
> > > > > > > > > > > > > > toForward.withValue(newValue)
> > > > > > > > > > > > > > toForward.withTimestamp(newTimestamp)
> > > > > > > > > > > > > > Record newRecord = toForward.build()
> > > > > > > > > > > > > > context.forward(newRecord, To.child("child1"))
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Or, the more compact common case:
> > > > > > > > > > > > > > current:
> > > > > > > > > > > > > >  context.forward(key, "newValue")
> > > > > > > > > > > > > > proposed:
> > > > > > > > > > > > > > 
> > > > > context.forward(copy(record).withValue("newValue").build())
> > > > > > > > > > > > > > It's slightly more verbose, but also more 
> > > > > > > > > > > > > > extensible. This
> > > > > > > > > > > > > > would give us a clean path to add header support in 
> > > > > > > > > > > > > > PAPI
> > > > > as
> > > > > > > > > > > > > > well, simply by adding `withHeaders` in 
> > > > > > > > > > > > > > RecordBuilder.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > It's also more symmetrical, since the recipient of
> > > > > `forward`
> > > > > > > > > > > > > > would just get the sent `Record`. Whereas today, the
> > > > > sender
> > > > > > > > > > > > > > puts the timestamp in `To`, but the recipient gets 
> > > > > > > > > > > > > > > it in
> > > > > its
> > > > > > > > > > > > > > own `ProcessorContext`.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > WDYT?
> > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax 
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > I think separating the different contexts makes 
> > > > > > > > > > > > > > > sense.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > In fact, we could even go one step further and 
> > > > > > > > > > > > > > > remove
> > > > > the
> > > > > > > > > record
> > > > > > > > > > > > context
> > > > > > > > > > > > > > > from the processor context completely and we add 
> > > > > > > > > > > > > > > a third
> > > > > > > > > parameter to
> > > > > > > > > > > > > > > `process(key, value, recordContext)`. This would 
> > > > > > > > > > > > > > > make it
> > > > > > > clear
> > > > > > > > > that
> > > > > > > > > > > > the
> > > > > > > > > > > > > > > context is for the input record only and it's not
> > > > > > > possible to
> > > > > > > > > pass
> > > > > > > > > > > > it to
> > > > > > > > > > > > > > > a `punctuate` callback.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > For the stores and changelogging: I think there 
> > > > > > > > > > > > > > > are two
> > > > > > > cases.
> > > > > > > > > (1)
> > > > > > > > > > > > You
> > > > > > > > > > > > > > > use a plain key-value store. For this case, it 
> > > > > > > > > > > > > > > seems you
> > > > > > > do
> > > > > > > > > not care
> > > > > > > > > > > > > > > > > about the timestamp and thus do not care what
> > > > > timestamp
> > > > > > > is
> > > > > > > > > set in
> > > > > > > > > > > > the
> > > > > > > > > > > > > > > changelog records. (We can set anything we want, 
> > > > > > > > > > > > > > > as it's
> > > > > > > not
> > > > > > > > > > > > relevant at
> > > > > > > > > > > > > > > all -- the timestamp is ignored on read anyway.) 
> > > > > > > > > > > > > > > (2) The
> > > > > > > other
> > > > > > > > > case
> > > > > > > > > > > > is,
> > > > > > > > > > > > > > > that one does care about timestamps, and for this 
> > > > > > > > > > > > > > > > > case one
> > > > > > > should
> > > > > > > > > use
> > > > > > > > > > > > > > > TimestampedKeyValueStore. The passed timestamp 
> > > > > > > > > > > > > > > will be
> > > > > > > set on
> > > > > > > > > the
> > > > > > > > > > > > > > > changelog records for this case.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Thus, for both cases, accessing the record 
> > > > > > > > > > > > > > > context does
> > > > > > > not
> > > > > > > > > seems to
> > > > > > > > > > > > be
> > > > > > > > > > > > > > > a requirement. And providing access to the 
> > > > > > > > > > > > > > > processor
> > > > > > > context
> > > > > > > > > to, eg.,
> > > > > > > > > > > > > > > `forward()` or similar seems safe.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > -Matthias
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > On 9/10/20 7:25 PM, John Roesler wrote:
> > > > > > > > > > > > > > > > Thanks for the reply, Paul!
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > I certainly intend to make sure that the 
> > > > > > > > > > > > > > > > changelogging
> > > > > > > layer
> > > > > > > > > > > > > > > > continues to work the way it does now, by hook 
> > > > > > > > > > > > > > > > or by
> > > > > > > crook.
> > > > > > > > > > > > > > > > I think the easiest path for me is to just 
> > > > > > > > > > > > > > > > "cheat" and
> > > > > > > get
> > > > > > > > > > > > > > > > the real ProcessorContext into the 
> > > > > > > > > > > > > > > > ChangeLoggingStore
> > > > > > > > > > > > > > > > implementation somehow. I'll tag you on the PR 
> > > > > > > > > > > > > > > > when I
> > > > > > > create
> > > > > > > > > > > > > > > > it, so you have an opportunity to express a 
> > > > > > > > > > > > > > > > preference
> > > > > > > about
> > > > > > > > > > > > > > > > the implementation choice, and maybe even 
> > > > > > > > > > > > > > > > compile/test
> > > > > > > > > > > > > > > > against it to make sure your stuff still works.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Regarding this:
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > we have an interest in making a state store 
> > > > > > > > > > > > > > > > > with a
> > > > > > > richer
> > > > > > > > > > > > > > > > > way of querying its data (like perhaps 
> > > > > > > > > > > > > > > > > getting all
> > > > > > > values
> > > > > > > > > > > > > > > > > associated with a secondary key), while still
> > > > > > > ultimately
> > > > > > > > > > > > > > > > > writing to the changelog topic for later
> > > > > restoration.
> > > > > > > > > > > > > > > > This is very intriguing to me. On the side, 
> > > > > > > > > > > > > > > > I've been
> > > > > > > > > > > > > > > > preparing a couple of ideas related to this 
> > > > > > > > > > > > > > > > topic. I
> > > > > > > don't
> > > > > > > > > > > > > > > > think I have a coherent enough thought to even 
> > > > > > > > > > > > > > > > express
> > > > > > > it in
> > > > > > > > > > > > > > > > a Jira right now, but when I do, I'll tag you 
> > > > > > > > > > > > > > > > on it
> > > > > > > also to
> > > > > > > > > > > > > > > > see what you think.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Whenever you're ready to share the usability
> > > > > improvement
> > > > > > > > > > > > > > > > ideas, I'm very interested to see what you've 
> > > > > > > > > > > > > > > > come up
> > > > > > > with.
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen 
> > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > when you use a HashMap or RocksDB or other 
> > > > > > > > > > > > > > > > > > "state
> > > > > > > > > stores", you
> > > > > > > > > > > > don't
> > > > > > > > > > > > > > > > > > expect them to automatically know extra 
> > > > > > > > > > > > > > > > > > stuff
> > > > > about
> > > > > > > the
> > > > > > > > > record
> > > > > > > > > > > > you're
> > > > > > > > > > > > > > > > > > storing.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > So, I don't think there is any reason we 
> > > > > > > > > > > > > > > > > *can't*
> > > > > > > retain the
> > > > > > > > > > > > record context
> > > > > > > > > > > > > > > > > > in the StateStoreContext, and if any users 
> > > > > > > > > > > > > > > > > > came
> > > > > > > along
> > > > > > > > > with a
> > > > > > > > > > > > clear use case
> > > > > > > > > > > > > > > > > > I'd find that convincing.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > I agree with the principle of being 
> > > > > > > > > > > > > > > > > conservative
> > > > > with
> > > > > > > the
> > > > > > > > > > > > StateStoreContext
> > > > > > > > > > > > > > > > > API.  Regarding user expectations or a clear 
> > > > > > > > > > > > > > > > > use
> > > > > > > case, the
> > > > > > > > > only
> > > > > > > > > > > > > > > > > counterpoint I would offer is that we sort of 
> > > > > > > > > > > > > > > > > have
> > > > > > > that
> > > > > > > > > use case
> > > > > > > > > > > > already,
> > > > > > > > > > > > > > > > > which is the example I gave of the change 
> > > > > > > > > > > > > > > > > logging
> > > > > > > store
> > > > > > > > > using the
> > > > > > > > > > > > > > > > > timestamp.  I am curious if this 
> > > > > > > > > > > > > > > > > functionality will
> > > > > be
> > > > > > > > > retained
> > > > > > > > > > > > when using
> > > > > > > > > > > > > > > > > built in state stores, or will a low-level 
> > > > > > > > > > > > > > > > > processor
> > > > > > > get a
> > > > > > > > > > > > KeyValueStore
> > > > > > > > > > > > > > > > > that no longer writes to the changelog topic 
> > > > > > > > > > > > > > > > > with
> > > > > the
> > > > > > > > > record's
> > > > > > > > > > > > timestamp.
> > > > > > > > > > > > > > > > > While I personally don't care much about that
> > > > > > > functionality
> > > > > > > > > > > > specifically, I
> > > > > > > > > > > > > > > > > have a general desire for custom state stores 
> > > > > > > > > > > > > > > > > to
> > > > > > > easily do
> > > > > > > > > the
> > > > > > > > > > > > things that
> > > > > > > > > > > > > > > > > > > built-in state stores do.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > It genuinely did not occur to me that users 
> > > > > > > > > > > > > > > > > might be
> > > > > > > > > looking up
> > > > > > > > > > > > and/or
> > > > > > > > > > > > > > > > > > updating records of other keys from within a
> > > > > > > Processor.
> > > > > > > > > > > > > > > > > I'm glad you said this Sophie, because it 
> > > > > > > > > > > > > > > > > gives me
> > > > > an
> > > > > > > > > > > > opportunity to say
> > > > > > > > > > > > > > > > > that this is actually a *huge* use case for 
> > > > > > > > > > > > > > > > > my team.
> > > > > > > The
> > > > > > > > > state
> > > > > > > > > > > > store
> > > > > > > > > > > > > > > > > usability improvements I was referring to in 
> > > > > > > > > > > > > > > > > my
> > > > > > > previous
> > > > > > > > > message
> > > > > > > > > > > > were about
> > > > > > > > > > > > > > > > > enabling the user to write custom stores 
> > > > > > > > > > > > > > > > > while still
> > > > > > > easily
> > > > > > > > > > > > hooking into
> > > > > > > > > > > > > > > > > the ability to write to a changelog topic.  I 
> > > > > > > > > > > > > > > > > think
> > > > > > > that is
> > > > > > > > > > > > technically
> > > > > > > > > > > > > > > > > possible now, but I don't think it's trivial.
> > > > > > > > > Specifically, we
> > > > > > > > > > > > have an
> > > > > > > > > > > > > > > > > interest in making a state store with a 
> > > > > > > > > > > > > > > > > richer way
> > > > > of
> > > > > > > > > querying
> > > > > > > > > > > > its data
> > > > > > > > > > > > > > > > > (like perhaps getting all values associated 
> > > > > > > > > > > > > > > > > with a
> > > > > > > > > secondary
> > > > > > > > > > > > key), while
> > > > > > > > > > > > > > > > > still ultimately writing to the changelog 
> > > > > > > > > > > > > > > > > topic for
> > > > > > > later
> > > > > > > > > > > > restoration.
> > > > > > > > > > > > > > > > > We recognize that this use case throws away 
> > > > > > > > > > > > > > > > > some of
> > > > > > > what
> > > > > > > > > kafka
> > > > > > > > > > > > streams
> > > > > > > > > > > > > > > > > (especially the DSL) is good at - easy
> > > > > > > parallelizability by
> > > > > > > > > > > > partitioning
> > > > > > > > > > > > > > > > > all processing by key - and that our business 
> > > > > > > > > > > > > > > > > logic
> > > > > > > would
> > > > > > > > > > > > completely fall
> > > > > > > > > > > > > > > > > apart if we were consuming from 
> > > > > > > > > > > > > > > > > multi-partition
> > > > > > > topics with
> > > > > > > > > > > > multiple
> > > > > > > > > > > > > > > > > consumers.  But we have found that using the 
> > > > > > > > > > > > > > > > > low
> > > > > level
> > > > > > > > > processor
> > > > > > > > > > > > API is
> > > > > > > > > > > > > > > > > good for the very simple stream processing
> > > > > primitives
> > > > > > > it
> > > > > > > > > > > > provides: handling
> > > > > > > > > > > > > > > > > the plumbing of consuming from multiple kafka 
> > > > > > > > > > > > > > > > > topics
> > > > > > > and
> > > > > > > > > > > > potentially
> > > > > > > > > > > > > > > > > updating persistent local state in a reliable 
> > > > > > > > > > > > > > > > > way.
> > > > > > > That in
> > > > > > > > > > > > itself has
> > > > > > > > > > > > > > > > > proven to be a worthwhile programming model.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Since I got off track a bit, let me 
> > > > > > > > > > > > > > > > > summarize: I
> > > > > don't
> > > > > > > > > > > > particularly care
> > > > > > > > > > > > > > > > > about the record context being available to 
> > > > > > > > > > > > > > > > > state
> > > > > > > store
> > > > > > > > > > > > implementations,
> > > > > > > > > > > > > > > > > and I think this KIP is headed in the right
> > > > > direction
> > > > > > > in
> > > > > > > > > that
> > > > > > > > > > > > regard.  But
> > > > > > > > > > > > > > > > > more generally, I wanted to express the 
> > > > > > > > > > > > > > > > > importance
> > > > > of
> > > > > > > > > > > > maintaining a
> > > > > > > > > > > > > > > > > powerful and flexible StateStore interface.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Thanks!
> > > > > > > > > > > > > > > > > Paul
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 6:11 PM Sophie 
> > > > > > > > > > > > > > > > > Blee-Goldman
> > > > > <
> > > > > > > > > > > > sop...@confluent.io>
> > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > Aha, I did misinterpret the example in your
> > > > > previous
> > > > > > > > > response
> > > > > > > > > > > > regarding the
> > > > > > > > > > > > > > > > > > range query after all. I thought you just 
> > > > > > > > > > > > > > > > > > meant a
> > > > > > > > > time-range
> > > > > > > > > > > > query inside a
> > > > > > > > > > > > > > > > > > punctuator. It genuinely did not occur to 
> > > > > > > > > > > > > > > > > > me that
> > > > > > > users
> > > > > > > > > might
> > > > > > > > > > > > be looking up
> > > > > > > > > > > > > > > > > > and/or updating records of other keys from 
> > > > > > > > > > > > > > > > > > within
> > > > > a
> > > > > > > > > Processor.
> > > > > > > > > > > > Sorry for
> > > > > > > > > > > > > > > > > > > > being closed-minded.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > I won't drag out this discussion any 
> > > > > > > > > > > > > > > > > > further by
> > > > > > > asking
> > > > > > > > > whether
> > > > > > > > > > > > that might
> > > > > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > > a valid use case or just a lurking bug in 
> > > > > > > > > > > > > > > > > > itself
> > > > > :)
> > > > > > > > > > > > > > > > > > Thanks for humoring me. The current 
> > > > > > > > > > > > > > > > > > proposal for
> > > > > > > KIP-478
> > > > > > > > > > > > sounds good to me
> > > > > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 3:43 PM John 
> > > > > > > > > > > > > > > > > > Roesler <
> > > > > > > > > > > > vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > Ah, thanks Sophie,
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > I'm sorry for misinterpreting your 
> > > > > > > > > > > > > > > > > > > > > response. Yes,
> > > > > > > we
> > > > > > > > > > > > > > > > > > > absolutely can and should clear the 
> > > > > > > > > > > > > > > > > > > context
> > > > > before
> > > > > > > > > > > > > > > > > > > punctuating.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > My secondary concern is maybe more 
> > > > > > > > > > > > > > > > > > > far-fetched.
> > > > > I
> > > > > > > was
> > > > > > > > > > > > > > > > > > > thinking that inside process(key,value), a
> > > > > > > Processor
> > > > > > > > > might
> > > > > > > > > > > > > > > > > > > do a get/put of a _different_ key. 
> > > > > > > > > > > > > > > > > > > Consider, for
> > > > > > > > > example,
> > > > > > > > > > > > > > > > > > > the way that Suppress processors work. 
> > > > > > > > > > > > > > > > > > > When they
> > > > > > > get a
> > > > > > > > > > > > > > > > > > > record, they add it to the store and then 
> > > > > > > > > > > > > > > > > > > do a
> > > > > > > range
> > > > > > > > > scan
> > > > > > > > > > > > > > > > > > > and possibly forward a _different_ 
> > > > > > > > > > > > > > > > > > > record. Of
> > > > > > > course,
> > > > > > > > > this
> > > > > > > > > > > > > > > > > > > is an operation that is deeply coupled to 
> > > > > > > > > > > > > > > > > > > the
> > > > > > > > > internals, and
> > > > > > > > > > > > > > > > > > > the Suppress processor accordingly 
> > > > > > > > > > > > > > > > > > > actually does
> > > > > > > get
> > > > > > > > > access
> > > > > > > > > > > > > > > > > > > to the internal context so that it can 
> > > > > > > > > > > > > > > > > > > set the
> > > > > > > context
> > > > > > > > > > > > > > > > > > > before forwarding.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > Still, it seems like I've had a handful of
> > > > > > > > > conversations
> > > > > > > > > > > > > > > > > > > with people over the years in which they 
> > > > > > > > > > > > > > > > > > > tell me
> > > > > > > they
> > > > > > > > > are
> > > > > > > > > > > > > > > > > > > using state stores in a way that 
> > > > > > > > > > > > > > > > > > > transcends the
> > > > > > > "get
> > > > > > > > > and put
> > > > > > > > > > > > > > > > > > > the currently processing record" access
> > > > > pattern. I
> > > > > > > > > doubt
> > > > > > > > > > > > > > > > > > > that those folks would even have 
> > > > > > > > > > > > > > > > > > > considered the
> > > > > > > > > possiblity
> > > > > > > > > > > > > > > > > > > that the currently processing record's 
> > > > > > > > > > > > > > > > > > > _context_
> > > > > > > could
> > > > > > > > > > > > > > > > > > > pollute their state store operations, as I
> > > > > myself
> > > > > > > > > never gave
> > > > > > > > > > > > > > > > > > > it a second thought until the current
> > > > > conversation
> > > > > > > > > began. In
> > > > > > > > > > > > > > > > > > > cases like that, we have actually set a 
> > > > > > > > > > > > > > > > > > > trap for
> > > > > > > these
> > > > > > > > > > > > > > > > > > > people, and it seems better to dismantle 
> > > > > > > > > > > > > > > > > > > the
> > > > > trap.
> > > > > > > > > > > > > > > > > > > As you noted, really the only people who 
> > > > > > > > > > > > > > > > > > > would
> > > > > be
> > > > > > > > > negatively
> > > > > > > > > > > > > > > > > > > impacted are people who implement their 
> > > > > > > > > > > > > > > > > > > own
> > > > > state
> > > > > > > > > stores.
> > > > > > > > > > > > > > > > > > > These folks will get the deprecation 
> > > > > > > > > > > > > > > > > > > warning and
> > > > > > > try to
> > > > > > > > > > > > > > > > > > > adapt their stores to the new interface. 
> > > > > > > > > > > > > > > > > > > If they
> > > > > > > needed
> > > > > > > > > > > > > > > > > > > access to the record context, they would 
> > > > > > > > > > > > > > > > > > > find
> > > > > > > it's now
> > > > > > > > > > > > > > > > > > > missing. They'd ask us about it, and we'd 
> > > > > > > > > > > > > > > > > > > have
> > > > > the
> > > > > > > > > ability
> > > > > > > > > > > > > > > > > > > to explain the lurking bug that they have 
> > > > > > > > > > > > > > > > > > > had in
> > > > > > > their
> > > > > > > > > > > > > > > > > > > stores all along, as well as the new 
> > > > > > > > > > > > > > > > > > > recommended
> > > > > > > > > pattern
> > > > > > > > > > > > > > > > > > > (just pass everything you need in the 
> > > > > > > > > > > > > > > > > > > value). If
> > > > > > > that's
> > > > > > > > > > > > > > > > > > > unsatisfying, _then_ we should consider 
> > > > > > > > > > > > > > > > > > > amending
> > > > > > > the
> > > > > > > > > API.
> > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > On Thu, 2020-09-10 at 15:21 -0700, Sophie
> > > > > > > Blee-Goldman
> > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > Regarding your first sentence, "...the
> > > > > > > processor
> > > > > > > > > would
> > > > > > > > > > > > null
> > > > > > > > > > > > > > > > > > > > > out the record context...", this is 
> > > > > > > > > > > > > > > > > > > > > not
> > > > > > > possible,
> > > > > > > > > since
> > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > processor doesn't have write access 
> > > > > > > > > > > > > > > > > > > > > to the
> > > > > > > > > context. We
> > > > > > > > > > > > could
> > > > > > > > > > > > > > > > > > > > > add it,
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > Sorry, this was poorly phrased, I 
> > > > > > > > > > > > > > > > > > > > definitely
> > > > > > > did not
> > > > > > > > > mean
> > > > > > > > > > > > to imply that
> > > > > > > > > > > > > > > > > > > we
> > > > > > > > > > > > > > > > > > > > should make the context modifiable by 
> > > > > > > > > > > > > > > > > > > > the
> > > > > > > Processors
> > > > > > > > > > > > themselves. I
> > > > > > > > > > > > > > > > > > meant
> > > > > > > > > > > > > > > > > > > > this should be handled by the internal
> > > > > > > processing
> > > > > > > > > > > > framework that deals
> > > > > > > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > > > > > passing records from one Processor to 
> > > > > > > > > > > > > > > > > > > > the
> > > > > next,
> > > > > > > > > setting
> > > > > > > > > > > > the record
> > > > > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > > > > when a new record is picked up, 
> > > > > > > > > > > > > > > > > > > > invoking the
> > > > > > > > > punctuators,
> > > > > > > > > > > > etc. I
> > > > > > > > > > > > > > > > > > believe
> > > > > > > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > > > > all currently happens in the 
> > > > > > > > > > > > > > > > > > > > StreamTask? It
> > > > > > > already
> > > > > > > > > can
> > > > > > > > > > > > and does
> > > > > > > > > > > > > > > > > > > overwrite
> > > > > > > > > > > > > > > > > > > > the record context as new records are
> > > > > > > processed, and
> > > > > > > > > is
> > > > > > > > > > > > also
> > > > > > > > > > > > > > > > > > responsible
> > > > > > > > > > > > > > > > > > > > for calling the punctuators, so it 
> > > > > > > > > > > > > > > > > > > > doesn't
> > > > > seem
> > > > > > > like
> > > > > > > > > a
> > > > > > > > > > > > huge leap to
> > > > > > > > > > > > > > > > > > just
> > > > > > > > > > > > > > > > > > > say
> > > > > > > > > > > > > > > > > > > > "null out the current record before
> > > > > punctuating"
> > > > > > > > > > > > > > > > > > > > To clarify, I was never advocating or 
> > > > > > > > > > > > > > > > > > > > even
> > > > > > > > > considering to
> > > > > > > > > > > > give the
> > > > > > > > > > > > > > > > > > > > Processors
> > > > > > > > > > > > > > > > > > > > write access to the record context. 
> > > > > > > > > > > > > > > > > > > > Sorry if
> > > > > my
> > > > > > > last
> > > > > > > > > > > > message (or all of
> > > > > > > > > > > > > > > > > > > > them)
> > > > > > > > > > > > > > > > > > > > was misleading. I just wanted to point 
> > > > > > > > > > > > > > > > > > > > out
> > > > > that
> > > > > > > the
> > > > > > > > > > > > punctuator concern
> > > > > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > > > > orthogonal to the question of whether we
> > > > > should
> > > > > > > > > include
> > > > > > > > > > > > the record
> > > > > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > > > > in the StateStoreContext. It's 
> > > > > > > > > > > > > > > > > > > > definitely a
> > > > > real
> > > > > > > > > problem,
> > > > > > > > > > > > but it's a
> > > > > > > > > > > > > > > > > > > > problem
> > > > > > > > > > > > > > > > > > > > that exists at the Processor level and 
> > > > > > > > > > > > > > > > > > > > not
> > > > > just
> > > > > > > the
> > > > > > > > > > > > StateStore.
> > > > > > > > > > > > > > > > > > > > So, I don't think there is any reason we
> > > > > *can't*
> > > > > > > > > retain
> > > > > > > > > > > > the record
> > > > > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > > > > in the
> > > > > > > > > > > > > > > > > > > > StateStoreContext, and if any users 
> > > > > > > > > > > > > > > > > > > > came along
> > > > > > > with a
> > > > > > > > > > > > clear use case
> > > > > > > > > > > > > > > > > > I'd
> > > > > > > > > > > > > > > > > > > > find
> > > > > > > > > > > > > > > > > > > > that convincing. In the absence of any
> > > > > > > examples, the
> > > > > > > > > > > > conservative
> > > > > > > > > > > > > > > > > > > approach
> > > > > > > > > > > > > > > > > > > > sounds good to me.
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > If it turns out that someone did need 
> > > > > > > > > > > > > > > > > > > > the
> > > > > record
> > > > > > > > > context
> > > > > > > > > > > > in their
> > > > > > > > > > > > > > > > > > custom
> > > > > > > > > > > > > > > > > > > > state
> > > > > > > > > > > > > > > > > > > > store, I'm sure they'll submit a 
> > > > > > > > > > > > > > > > > > > > politely
> > > > > > > worded bug
> > > > > > > > > > > > report alerting us
> > > > > > > > > > > > > > > > > > > > that we
> > > > > > > > > > > > > > > > > > > > broke their application.
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 3:05 PM John 
> > > > > > > > > > > > > > > > > > > > Roesler <
> > > > > > > > > > > > vvcep...@apache.org>
> > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > Thanks, Sophie,
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > Yes, now that you point it out, I can 
> > > > > > > > > > > > > > > > > > > > > see
> > > > > > > that the
> > > > > > > > > record
> > > > > > > > > > > > > > > > > > > > > context itself should be nulled out by
> > > > > Streams
> > > > > > > > > before
> > > > > > > > > > > > > > > > > > > > > invoking punctuators. From that 
> > > > > > > > > > > > > > > > > > > > > perspective,
> > > > > > > we
> > > > > > > > > don't
> > > > > > > > > > > > need
> > > > > > > > > > > > > > > > > > > > > to think about the second-order 
> > > > > > > > > > > > > > > > > > > > > problem of
> > > > > > > what's
> > > > > > > > > in the
> > > > > > > > > > > > > > > > > > > > > context for the state store when 
> > > > > > > > > > > > > > > > > > > > > called
> > > > > from a
> > > > > > > > > > > > punctuator.
> > > > > > > > > > > > > > > > > > > > > Regarding your first sentence, "...the
> > > > > > > processor
> > > > > > > > > would
> > > > > > > > > > > > null
> > > > > > > > > > > > > > > > > > > > > out the record context...", this is 
> > > > > > > > > > > > > > > > > > > > > not
> > > > > > > possible,
> > > > > > > > > since
> > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > processor doesn't have write access 
> > > > > > > > > > > > > > > > > > > > > to the
> > > > > > > > > context. We
> > > > > > > > > > > > could
> > > > > > > > > > > > > > > > > > > > > add it, but then all kinds of strange
> > > > > effects
> > > > > > > > > would ensue
> > > > > > > > > > > > > > > > > > > > > when downstream processors execute 
> > > > > > > > > > > > > > > > > > > > > but the
> > > > > > > context
> > > > > > > > > is
> > > > > > > > > > > > empty,
> > > > > > > > > > > > > > > > > > > > > etc. Better to just let the framework 
> > > > > > > > > > > > > > > > > > > > > manage
> > > > > > > the
> > > > > > > > > record
> > > > > > > > > > > > > > > > > > > > > context and keep it read-only for
> > > > > Processors.
> > > > > > > > > > > > > > > > > > > > > Reading between the lines of your last
> > > > > reply,
> > > > > > > it
> > > > > > > > > sounds
> > > > > > > > > > > > > like
> > > > > > > > > > > > > > > > > > > > > the disconnect may just have been a 
> > > > > > > > > > > > > > > > > > > > > mutual
> > > > > > > > > > > > misunderstanding
> > > > > > > > > > > > > > > > > > > > > about whether or not Processors 
> > > > > > > > > > > > > > > > > > > > > currently
> > > > > have
> > > > > > > > > access to
> > > > > > > > > > > > set
> > > > > > > > > > > > > > > > > > > > > the record context. Since they do 
> > > > > > > > > > > > > > > > > > > > > not, if we
> > > > > > > > > wanted to
> > > > > > > > > > > > add
> > > > > > > > > > > > > > > > > > > > > the record context to 
> > > > > > > > > > > > > > > > > > > > > StateStoreContext in a
> > > > > > > > > well-defined
> > > > > > > > > > > > > > > > > > > > > way, we'd also have to add the 
> > > > > > > > > > > > > > > > > > > > > ability for
> > > > > > > > > Processors to
> > > > > > > > > > > > > > > > > > > > > manipulate it. But then, we're just
> > > > > creating a
> > > > > > > > > > > > side-channel
> > > > > > > > > > > > > > > > > > > > > for Processors to pass some 
> > > > > > > > > > > > > > > > > > > > > information in
> > > > > > > > > arguments to
> > > > > > > > > > > > > > > > > > > > > "put()" and other information 
> > > > > > > > > > > > > > > > > > > > > implicitly
> > > > > > > through
> > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > context. It seems better just to go 
> > > > > > > > > > > > > > > > > > > > > for a
> > > > > > > single
> > > > > > > > > channel
> > > > > > > > > > > > for
> > > > > > > > > > > > > > > > > > > > > now.
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > It sounds like you're basically in 
> > > > > > > > > > > > > > > > > > > > > favor of
> > > > > > > the
> > > > > > > > > > > > conservative
> > > > > > > > > > > > > > > > > > > > > approach, and you just wanted to 
> > > > > > > > > > > > > > > > > > > > > understand
> > > > > > > the
> > > > > > > > > blockers
> > > > > > > > > > > > > > > > > > > > > that I implied. Does my clarification 
> > > > > > > > > > > > > > > > > > > > > make
> > > > > > > sense?
> > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > On Thu, 2020-09-10 at 10:54 -0700, 
> > > > > > > > > > > > > > > > > > > > > Sophie
> > > > > > > > > Blee-Goldman
> > > > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > > I was just thinking that the 
> > > > > > > > > > > > > > > > > > > > > > processor
> > > > > would
> > > > > > > > > null out
> > > > > > > > > > > > the record
> > > > > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > > > > > > after it
> > > > > > > > > > > > > > > > > > > > > > finished processing the record, so 
> > > > > > > > > > > > > > > > > > > > > > I'm not
> > > > > > > sure I
> > > > > > > > > > > > follow why this
> > > > > > > > > > > > > > > > > > > would
> > > > > > > > > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > > > > > > possible? AFAIK we never call a 
> > > > > > > > > > > > > > > > > > > > > > punctuator
> > > > > > > in the
> > > > > > > > > > > > middle of
> > > > > > > > > > > > > > > > > > > processing a
> > > > > > > > > > > > > > > > > > > > > > record through the topology, and 
> > > > > > > > > > > > > > > > > > > > > > even if
> > > > > we
> > > > > > > did,
> > > > > > > > > we
> > > > > > > > > > > > still know when
> > > > > > > > > > > > > > > > > > > it is
> > > > > > > > > > > > > > > > > > > > > > about
> > > > > > > > > > > > > > > > > > > > > > to be called and could set it to 
> > > > > > > > > > > > > > > > > > > > > > null
> > > > > > > beforehand.
> > > > > > > > > > > > > > > > > > > > > > I'm not trying to advocate for it 
> > > > > > > > > > > > > > > > > > > > > > here,
> > > > > I'm
> > > > > > > in
> > > > > > > > > > > > agreement that
> > > > > > > > > > > > > > > > > > > anything
> > > > > > > > > > > > > > > > > > > > > you
> > > > > > > > > > > > > > > > > > > > > > want
> > > > > > > > > > > > > > > > > > > > > > to access within the store can and 
> > > > > > > > > > > > > > > > > > > > > > should
> > > > > be
> > > > > > > > > accessed
> > > > > > > > > > > > within the
> > > > > > > > > > > > > > > > > > > calling
> > > > > > > > > > > > > > > > > > > > > > Processor/Punctuator before 
> > > > > > > > > > > > > > > > > > > > > > reaching the
> > > > > > > store.
> > > > > > > > > The
> > > > > > > > > > > > "we can always
> > > > > > > > > > > > > > > > > > > add it
> > > > > > > > > > > > > > > > > > > > > > later if necessary" argument is also
> > > > > pretty
> > > > > > > > > > > > convincing. Just trying
> > > > > > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > > > > > understand
> > > > > > > > > > > > > > > > > > > > > > why this wouldn't be possible.
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > FWIW, the question of "what is the current record in the
> > > > > > > > > > > > > > > > > > > > > > > context of a Punctuator" exists independently of whether we
> > > > > > > > > > > > > > > > > > > > > > > want to add this to the StateStoreContext or not. The full
> > > > > > > > > > > > > > > > > > > > > > > ProcessorContext, including the current record context, is
> > > > > > > > > > > > > > > > > > > > > > > already available within a Punctuator, so removing the
> > > > > > > > > > > > > > > > > > > > > > > current record context from the StateStoreContext does not
> > > > > > > > > > > > > > > > > > > > > > > solve the problem. Users can -- and have (see KAFKA-9584
> > > > > > > > > > > > > > > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-9584>) -- hit
> > > > > > > > > > > > > > > > > > > > > > > such subtle bugs without ever invoking a StateStore from
> > > > > > > > > > > > > > > > > > > > > > > their punctuator.
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > Again, I think I do agree that we should leave the current
> > > > > > > > > > > > > > > > > > > > > > > record context off of the StateStoreContext, but I don't
> > > > > > > > > > > > > > > > > > > > > > > think the Punctuator argument against it is very convincing.
> > > > > > > > > > > > > > > > > > > > > > > It sounds to me like we need to disallow access to the
> > > > > > > > > > > > > > > > > > > > > > > current record context from within the Punctuator,
> > > > > > > > > > > > > > > > > > > > > > > independent of anything to do with state stores.
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 7:12 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > > > > Thanks for the thoughts, Sophie.
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > I agree that the extra information could be useful. My only
> > > > > > > > > > > > > > > > > > > > > > > > concern is that it doesn’t seem like we can actually supply
> > > > > > > > > > > > > > > > > > > > > > > > that extra information correctly. So, then we have a
> > > > > > > > > > > > > > > > > > > > > > > > situation where the system offers useful API calls that are
> > > > > > > > > > > > > > > > > > > > > > > > only correct in a narrow range of use cases. Outside of
> > > > > > > > > > > > > > > > > > > > > > > > those use cases, you get incorrect behavior.
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > If it were possible to null out the context before you put
> > > > > > > > > > > > > > > > > > > > > > > > a document to which the context doesn’t apply, then the
> > > > > > > > > > > > > > > > > > > > > > > > concern would be mitigated. But it would still be pretty
> > > > > > > > > > > > > > > > > > > > > > > > weird from the perspective of the store that sometimes the
> > > > > > > > > > > > > > > > > > > > > > > > context is populated and other times, it’s null.
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > But that seems moot, since it doesn’t seem possible to null
> > > > > > > > > > > > > > > > > > > > > > > > out the context. Only the Processor could know whether it’s
> > > > > > > > > > > > > > > > > > > > > > > > about to put a document different from the context or not.
> > > > > > > > > > > > > > > > > > > > > > > > And it would be inappropriate to offer a public
> > > > > > > > > > > > > > > > > > > > > > > > ProcessorContext API to manage the record context.
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > Ultimately, it still seems like if you want to store
> > > > > > > > > > > > > > > > > > > > > > > > headers, you can store them explicitly, right? That doesn’t
> > > > > > > > > > > > > > > > > > > > > > > > seem onerous to me, and it kind of seems better than relying
> > > > > > > > > > > > > > > > > > > > > > > > on undefined or asymmetrical behavior in the store itself.
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > Anyway, I’m not saying that we couldn’t solve these
> > > > > > > > > > > > > > > > > > > > > > > > problems. Just that it seems like we can be conservative
> > > > > > > > > > > > > > > > > > > > > > > > and avoid them for now. If it turns out we really need to
> > > > > > > > > > > > > > > > > > > > > > > > solve them, we can always do it later.
> > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > John
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > If you were to call "put" from a punctuator, or do a
> > > > > > > > > > > > > > > > > > > > > > > > > > `range()` query and then update one of those records with
> > > > > > > > > > > > > > > > > > > > > > > > > > `put()`, you'd have a very subtle bug on your hands.
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > Can you elaborate on this a bit? I agree that the
> > > > > > > > > > > > > > > > > > > > > > > > > punctuator case is an obvious exception to the assumption
> > > > > > > > > > > > > > > > > > > > > > > > > that store invocations always have a corresponding "current
> > > > > > > > > > > > > > > > > > > > > > > > > record", but I don't understand the second example. Are you
> > > > > > > > > > > > > > > > > > > > > > > > > envisioning a scenario where the #process method performs a
> > > > > > > > > > > > > > > > > > > > > > > > > range query and then updates records? Or were you just
> > > > > > > > > > > > > > > > > > > > > > > > > giving another example of the punctuator case?
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > I only bring it up because I agree that the current record
> > > > > > > > > > > > > > > > > > > > > > > > > information could still be useful within the context of the
> > > > > > > > > > > > > > > > > > > > > > > > > store. As a non-user my input on this definitely has limited
> > > > > > > > > > > > > > > > > > > > > > > > > value, but it just isn't striking me as obvious that we
> > > > > > > > > > > > > > > > > > > > > > > > > should remove access to the current record context from the
> > > > > > > > > > > > > > > > > > > > > > > > > state stores. If there is no current record, as in the
> > > > > > > > > > > > > > > > > > > > > > > > > punctuator case, we should just set the record context to
> > > > > > > > > > > > > > > > > > > > > > > > > null (or Optional.empty, etc).
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > That said, the put() always has to come from somewhere, and
> > > > > > > > > > > > > > > > > > > > > > > > > that somewhere is always going to be either a Processor or a
> > > > > > > > > > > > > > > > > > > > > > > > > Punctuator, both of which will still have access to the full
> > > > > > > > > > > > > > > > > > > > > > > > > context. So additional info such as the timestamp can and
> > > > > > > > > > > > > > > > > > > > > > > > > should probably be supplied to the store before calling
> > > > > > > > > > > > > > > > > > > > > > > > > put(), rather than looked up by the store. But I can see
> > > > > > > > > > > > > > > > > > > > > > > > > some other things being useful, for example the current
> > > > > > > > > > > > > > > > > > > > > > > > > record's headers. Maybe if/when we add better (or any)
> > > > > > > > > > > > > > > > > > > > > > > > > support for headers in state stores this will be less true.
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > Of course as John has made clear, it's pretty hard to judge
> > > > > > > > > > > > > > > > > > > > > > > > > without examples and more insight as to what actually goes
> > > > > > > > > > > > > > > > > > > > > > > > > on within a custom state store.
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 8:07 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > Hi Paul,
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > It's good to hear from you!
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > I'm glad you're in favor of the direction. Especially when
> > > > > > > > > > > > > > > > > > > > > > > > > > it comes to public API and usability concerns, I tend to
> > > > > > > > > > > > > > > > > > > > > > > > > > think that "the folks who matter" are actually the folks who
> > > > > > > > > > > > > > > > > > > > > > > > > > have to use the APIs to accomplish real tasks. It can be
> > > > > > > > > > > > > > > > > > > > > > > > > > hard for me to be sure I'm thinking clearly from that
> > > > > > > > > > > > > > > > > > > > > > > > > > perspective.
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > Funny story, I also started down this road a couple of times
> > > > > > > > > > > > > > > > > > > > > > > > > > already and backed them out before the KIP because I was
> > > > > > > > > > > > > > > > > > > > > > > > > > afraid of the scope of the proposal. Unfortunately, needing
> > > > > > > > > > > > > > > > > > > > > > > > > > to make a new ProcessorContext kind of forced my hand.
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > I see you've called me out about the ChangeLogging stores :)
> > > > > > > > > > > > > > > > > > > > > > > > > > In fact, I think these are the main/only reason that stores
> > > > > > > > > > > > > > > > > > > > > > > > > > might really need to invoke "forward()". My secret plan was
> > > > > > > > > > > > > > > > > > > > > > > > > > to cheat and either accomplish change-logging by a different
> > > > > > > > > > > > > > > > > > > > > > > > > > mechanism than implementing the store interface, or by just
> > > > > > > > > > > > > > > > > > > > > > > > > > breaking encapsulation to sneak the "real" ProcessorContext
> > > > > > > > > > > > > > > > > > > > > > > > > > into the ChangeLogging stores. But those are all
> > > > > > > > > > > > > > > > > > > > > > > > > > implementation details. I think the key question is whether
> > > > > > > > > > > > > > > > > > > > > > > > > > anyone else has a store implementation that needs to call
> > > > > > > > > > > > > > > > > > > > > > > > > > "forward()". It's not what you mentioned, but since you
> > > > > > > > > > > > > > > > > > > > > > > > > > spoke up, I'll just ask: if you have a use case for calling
> > > > > > > > > > > > > > > > > > > > > > > > > > "forward()" in a store, please share it.
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > Regarding the other record-specific context methods, I think
> > > > > > > > > > > > > > > > > > > > > > > > > > you have a good point, but I also can't quite wrap my head
> > > > > > > > > > > > > > > > > > > > > > > > > > around how we can actually guarantee it to work in general.
> > > > > > > > > > > > > > > > > > > > > > > > > > For example, take the case you cited, where the
> > > > > > > > > > > > > > > > > > > > > > > > > > implementation of `KeyValueStore#put(key, value)` uses the
> > > > > > > > > > > > > > > > > > > > > > > > > > context to augment the record with timestamp information.
> > > > > > > > > > > > > > > > > > > > > > > > > > This relies on the assumption that you would only call
> > > > > > > > > > > > > > > > > > > > > > > > > > "put()" from inside a `Processor#process(key, value)` call
> > > > > > > > > > > > > > > > > > > > > > > > > > in which the record being processed is the same record that
> > > > > > > > > > > > > > > > > > > > > > > > > > you're trying to put into the store.
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > If you were to call "put" from a punctuator, or do a
> > > > > > > > > > > > > > > > > > > > > > > > > > `range()` query and then update one of those records with
> > > > > > > > > > > > > > > > > > > > > > > > > > `put()`, you'd have a very subtle bug on your hands. Right
> > > > > > > > > > > > > > > > > > > > > > > > > > now, the Streams component that actually calls the Processor
> > > > > > > > > > > > > > > > > > > > > > > > > > takes care to set the right record context before invoking
> > > > > > > > > > > > > > > > > > > > > > > > > > the method, and in the case of caching, etc., it also takes
> > > > > > > > > > > > > > > > > > > > > > > > > > care to swap out the old context and keep it somewhere safe.
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > But when it comes to public API Processors calling methods
> > > > > > > > > > > > > > > > > > > > > > > > > > on StateStores, there's no opportunity for any component to
> > > > > > > > > > > > > > > > > > > > > > > > > > make sure the context is always correct.
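
The "subtle bug" described above can be sketched in isolation. This is only an illustration under stated assumptions: `RecordContext`, `ContextStampingStore`, and `demo()` are hypothetical names invented here, not Kafka Streams classes, and the store simply stamps each `put()` with whatever context happens to be current.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class StaleContextSketch {
    /** Hypothetical stand-in for the "current record" context; not a Kafka Streams type. */
    static final class RecordContext {
        final long timestampMs;
        RecordContext(long timestampMs) { this.timestampMs = timestampMs; }
    }

    /** Hypothetical store that stamps every put() with whatever context is current. */
    static final class ContextStampingStore {
        private final Map<String, Long> timestamps = new HashMap<>();
        private Optional<RecordContext> current = Optional.empty();

        void setCurrent(Optional<RecordContext> context) { this.current = context; }

        void put(String key) {
            // The store has no way to tell whether `current` describes `key`.
            timestamps.put(key, current.map(c -> c.timestampMs).orElse(-1L));
        }

        long timestampOf(String key) { return timestamps.get(key); }
    }

    /** Returns the timestamp recorded for "a" after an unrelated record re-puts it. */
    static long demo() {
        ContextStampingStore store = new ContextStampingStore();

        // Processing record "a" (timestamp 100): context and key agree.
        store.setCurrent(Optional.of(new RecordContext(100L)));
        store.put("a");

        // Processing record "b" (timestamp 200): the processor also rewrites "a",
        // for example after a range scan. The context still describes "b".
        store.setCurrent(Optional.of(new RecordContext(200L)));
        store.put("b");
        store.put("a");

        return store.timestampOf("a");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch "a" ends up carrying the timestamp of "b", even though every individual call looks reasonable, which is the kind of mismatch the paragraph above warns about.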
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > In the face of that situation, it seemed better to just move
> > > > > > > > > > > > > > > > > > > > > > > > > > in the direction of a "normal" data store. I.e., when you
> > > > > > > > > > > > > > > > > > > > > > > > > > use a HashMap or RocksDB or other "state stores", you don't
> > > > > > > > > > > > > > > > > > > > > > > > > > expect them to automatically know extra stuff about the
> > > > > > > > > > > > > > > > > > > > > > > > > > record you're storing. If you need them to know something,
> > > > > > > > > > > > > > > > > > > > > > > > > > you just put it in the value.
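
A minimal sketch of the "put it in the value" approach, assuming a plain `HashMap` as the stand-in store. `StampedValue` and `demo()` are hypothetical names used only for illustration; nothing here is taken from the Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;

final class ExplicitValueSketch {
    /** Hypothetical value wrapper; the caller, not the store, supplies the timestamp. */
    static final class StampedValue {
        final String value;
        final long timestampMs;
        StampedValue(String value, long timestampMs) {
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    static StampedValue demo() {
        // A plain map stands in for the state store: it knows only what it is given.
        Map<String, StampedValue> store = new HashMap<>();

        // Inside process(): the caller knows the record timestamp and stores it.
        store.put("a", new StampedValue("v1", 100L));

        // Later, from a punctuator: no current record exists, so the caller decides
        // which timestamp applies. Here it keeps the one already stored with the value.
        StampedValue old = store.get("a");
        store.put("a", new StampedValue("v2", old.timestampMs));

        return store.get("a");
    }

    public static void main(String[] args) {
        StampedValue result = demo();
        System.out.println(result.value + " @ " + result.timestampMs);
    }
}
```

Because the timestamp travels with the value, the store's behavior does not depend on who called it or on what record, if any, was being processed at the time.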
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > All of that said, I'm just reasoning from first principles
> > > > > > > > > > > > > > > > > > > > > > > > > > here. To really know if this is a mistake or not, I need to
> > > > > > > > > > > > > > > > > > > > > > > > > > be in your place. So please push back if you think what I
> > > > > > > > > > > > > > > > > > > > > > > > > > said is nonsense. My personal plan was to keep an eye out
> > > > > > > > > > > > > > > > > > > > > > > > > > during the period where the old API was still present, but
> > > > > > > > > > > > > > > > > > > > > > > > > > deprecated, to see if people were struggling to use the new
> > > > > > > > > > > > > > > > > > > > > > > > > > API. If so, then we'd have a chance to address it before
> > > > > > > > > > > > > > > > > > > > > > > > > > dropping the old API. But it's even better if you can help
> > > > > > > > > > > > > > > > > > > > > > > > > > think it through now.
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > It did also cross my mind to _not_ add the
> > > > > > > > > > > > > > > > > > > > > > > > > > StateStoreContext, but just to continue to punt on the
> > > > > > > > > > > > > > > > > > > > > > > > > > question by just dropping in the new ProcessorContext to the
> > > > > > > > > > > > > > > > > > > > > > > > > > new init method. If StateStoreContext seems too bold, we can
> > > > > > > > > > > > > > > > > > > > > > > > > > go that direction. But if we actually add some methods to
> > > > > > > > > > > > > > > > > > > > > > > > > > StateStoreContext, I'd like to be able to ensure they would
> > > > > > > > > > > > > > > > > > > > > > > > > > be well defined. I think the current situation was more of
> > > > > > > > > > > > > > > > > > > > > > > > > > an oversight than a choice.
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > Thanks again for your reply,
> > > > > > > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > John,
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > It's exciting to see this KIP head in this direction! In
> > > > > > > > > > > > > > > > > > > > > > > > > > > the last year or so I've tried to sketch out some usability
> > > > > > > > > > > > > > > > > > > > > > > > > > > improvements for custom state stores, and I also ended up
> > > > > > > > > > > > > > > > > > > > > > > > > > > splitting out the StateStoreContext from the
> > > > > > > > > > > > > > > > > > > > > > > > > > > ProcessorContext in an attempt to facilitate what I was
> > > > > > > > > > > > > > > > > > > > > > > > > > > doing. I sort of abandoned it when I realized how large the
> > > > > > > > > > > > > > > > > > > > > > > > > > > ideal change might have to be, but it's great to see that
> > > > > > > > > > > > > > > > > > > > > > > > > > > there is other interest in moving in this direction (from
> > > > > > > > > > > > > > > > > > > > > > > > > > > the folks that matter :) ).
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > Having taken a stab at it myself, I have a comment/question
> > > > > > > > > > > > > > > > > > > > > > > > > > > on this bullet about StateStoreContext:
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > It does *not* include anything processor- or
> > > > > > > > > > > > > > > > > > > > > > > > > > > > record-specific, like `forward()` or any information about
> > > > > > > > > > > > > > > > > > > > > > > > > > > > the "current" record, which is only well-defined in the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > context of the Processor. Processors process one record at
> > > > > > > > > > > > > > > > > > > > > > > > > > > > a time, but state stores may be used to store and fetch
> > > > > > > > > > > > > > > > > > > > > > > > > > > > many records, so there is no "current record".
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > I totally agree that record-specific or processor-specific
> > > > > > > > > > > > > > > > > > > > > > > > > > > > context in a state store is often not well-defined and it
> > > > > > > > > > > > > > > > > > > > > > > > > > > > would be good to separate that out, but sometimes it (at least
> > > > > > > > > > > > > > > > > > > > > > > > > > > > record-specific context) is actually useful, for example,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > passing the record's timestamp through to the underlying
> > > > > > > > > > > > > > > > > > > > > > > > > > > > storage (or changelog topic):
> > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
> > > > > > > > > > > > > > > > > > > > > > > > > > > > You could have the writer client of the state store pass this
> > > > > > > > > > > > > > > > > > > > > > > > > > > > through, but it would be nice to be able to write state stores
> > > > > > > > > > > > > > > > > > > > > > > > > > > > where the client did not have this responsibility. I'm not
> > > > > > > > > > > > > > > > > > > > > > > > > > > > sure if the solution is to add some things back to
> > > > > > > > > > > > > > > > > > > > > > > > > > > > StateStoreContext, or make yet another context that represents
> > > > > > > > > > > > > > > > > > > > > > > > > > > > record-specific context while inside a state store.
> > > > > > > > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > > > > Paul
> > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 5:43 PM John Roesler <j...@vvcephei.org> wrote:
> > > > > > > > > > > > > > > > > > > > > > > > > > > Hello all,
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > I've been slowly pushing KIP-478 forward over the last year,
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > and I'm happy to say that we're making good progress now.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > However, several issues with the original design have come
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > to light.
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > The major changes:
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > We discovered that the original plan of just adding generic
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > parameters to ProcessorContext was too disruptive, so we are
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > now adding a new api.ProcessorContext.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > That choice forces us to add a new StateStore.init method
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > for the new context, but ProcessorContext really isn't ideal
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > for state stores to begin with, so I'm proposing a new
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > StateStoreContext for this purpose. In a nutshell, there are
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > quite a few methods in ProcessorContext that actually should
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > never be called from inside a StateStore.
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > Also, since there is a new ProcessorContext interface, we
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > need a new MockProcessorContext implementation in the
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > test-utils module.
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > The changeset for the KIP document is here:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > And the KIP itself is here:
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > If you have any concerns, please let me know!
> > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > > > > 
