Hello again, all,

Thanks for the latest round of discussion. I've taken the
recent feedback and come up with an updated KIP that seems
actually quite a bit nicer than the prior proposal.

The specific diff on the KIP is here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14

These changes are implemented in this POC PR:
https://github.com/apache/kafka/pull/9346

The basic idea is that, building on the recent conversaion,
we would transition away from the current API where we get
only key/value in the process() method and other "data"
comes in the ProcessorContext along with the "metadata".

Instead, we formalize what is "data" and what is "metadata",
and pass it all in to the process method:
Processor#process(Record, Optional<RecordMetadata>)

Also, you forward the whole data class instead of mutating
the ProcessorContext fields and also calling forward:
ProcessorContext#forward(Record)

The Record class itself ships with methods like
record#withValue(NewV newValue)
that make a shallow copy of the input Record, enabling
Processors to safely handle the record without polluting the
context of their parents and siblings.

This proposal has a number of key benefits:
* As we've discovered in KAFKA-9584, it's unsafe to mutate
the Headers via the ProcessorContext. This proposal offers a
way to safely forward changes only to downstream processors.
* The new API has symmetry (each processor's input is the
output of its parent processor)
* The API makes clear that the record metadata isn't always
defined (for example, in a punctuation, there is no current
topic/partition/offset)
* The API enables punctuators to forward well defined
headers downstream, which is currently not possible.

Unless their are objections, I'll go ahead and re-finalize
this KIP and update that PR to a mergeable state.

Thanks, all,
-John


On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
> Interesting proposal. However, I am not totally convinced, because I see
> a fundamental difference between "data" and "metadata".
> 
> Topic/partition/offset are "metadata" in the strong sense and they are
> immutable.
> 
> On the other hand there is "primary" data like key and value, as well as
> "secondary" data like timestamp and headers. The issue seems that we
> treat "secondary data" more like metadata atm?
> 
> Thus, promoting timestamp and headers into a first class citizen roll
> make sense to me (my original proposal about `RecordContext` would still
> fall short with this regard). However, putting both (data and metadata)
> into a `Record` abstraction might go too far?
> 
> I am also a little bit concerned about `Record.copy()` because it might
> be a trap: Users might assume it does a full deep copy of the record,
> however, it would not. It would only create a new `Record` object as
> wrapper that points to the same key/value/header objects as the input
> record.
> 
> With the current `context.forward(key, value)` we don't have this "deep
> copy" issue -- it's pretty clear what is happening.
> 
> Instead of `To.all().withTimestamp()` we could also add
> `context.forward(key, value, timestamp)` etc (just wondering about the
> exposition in overload)?
> 
> Also, `Record.withValue` etc sounds odd? Should a record not be
> immutable? So, we could have something like
> 
> `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
> But it looks rather verbose?
> 
> The other question is of course, to what extend to we want to keep the
> distinction between "primary" and "secondary" data? To me, it's a
> question of easy of use?
> 
> Just putting all this out to move the discussion forward. Don't have a
> concrete proposal atm.
> 
> 
> -Matthias
> 
> 
> On 9/14/20 9:24 AM, John Roesler wrote:
> > Thanks for this thought, Matthias!
> > 
> > To be honest, it's bugged me quite a bit that _all_ the
> > record information hasn't been an argument to `process`. I
> > suppose I was trying to be conservative in this proposal,
> > but then again, if we're adding new Processor and
> > ProcessorContext interfaces, then this is the time to make
> > such a change.
> > 
> > To be unambiguous, I think this is what we're talking about:
> > ProcessorContext:
> > * applicationId
> > * taskId
> > * appConfigs
> > * appConfigsWithPrefix
> > * keySerde
> > * valueSerde
> > * stateDir
> > * metrics
> > * schedule
> > * commit
> > * forward
> > 
> > StateStoreContext:
> > * applicationId
> > * taskId
> > * appConfigs
> > * appConfigsWithPrefix
> > * keySerde
> > * valueSerde
> > * stateDir
> > * metrics
> > * register
> > 
> > 
> > RecordContext
> > * topic
> > * partition
> > * offset
> > * timestamp
> > * headers
> > 
> > 
> > Your proposal sounds good to me as-is. Just to cover the
> > bases, though, I'm wondering if we should push the idea just
> > a little farther. Instead of decomposing key,value,context,
> > we could just keep them all in one object, like this:
> > 
> > Record:
> > * key
> > * value
> > * topic
> > * partition
> > * offset
> > * timestamp
> > * headers
> > 
> > Then, we could have:
> > Processor#process(Record)
> > ProcessorContext#forward(Record, To)
> > 
> > Viewed from this perspective, a record has three properties
> > that people may specify in their processors: key, value, and
> > timestamp.
> > 
> > We could deprecate `To#withTimestamp` and enable people to
> > specify the timestamp along with the key and value when they
> > forward a record.
> > 
> > E.g.,
> > RecordBuilder toForward = RecordBuilder.copy(record)
> > toForward.withKey(newKey)
> > toForward.withValue(newValue)
> > toForward.withTimestamp(newTimestamp)
> > Record newRecord = toForward.build()
> > context.forward(newRecord, To.child("child1"))
> > 
> > Or, the more compact common case:
> > current:
> >  context.forward(key, "newValue")
> > proposed:
> >  context.forward(copy(record).withValue("newValue").build())
> > 
> > 
> > It's slightly more verbose, but also more extensible. This
> > would give us a clean path to add header support in PAPI as
> > well, simply by adding `withHeaders` in RecordBuilder.
> > 
> > It's also more symmetrical, since the recipient of `forward`
> > would just get the sent `Record`. Whereas today, the sender
> > puts the timestamp in `To`, but the recipient gets in in its
> > own `ProcessorContext`.
> > 
> > WDYT?
> > -John
> > 
> > On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote:
> > > I think separating the different contexts make sense.
> > > 
> > > In fact, we could even go one step further and remove the record context
> > > from the processor context completely and we add a third parameter to
> > > `process(key, value, recordContext)`. This would make it clear that the
> > > context is for the input record only and it's not possible to pass it to
> > > a `punctuate` callback.
> > > 
> > > For the stores and changelogging: I think there are two cases. (1) You
> > > use a plain key-value store. For this case, it seems you do not care
> > > about the timestamp and thus does not care what timestamp is set in the
> > > changelog records. (We can set anything we want, as it's not relevant at
> > > all -- the timestamp is ignored on read anyway.) (2) The other case is,
> > > that one does care about timestamps, and for this case should use
> > > TimestampedKeyValueStore. The passed timestamp will be set on the
> > > changelog records for this case.
> > > 
> > > Thus, for both cases, accessing the record context does not seems to be
> > > a requirement. And providing access to the processor context to, eg.,
> > > `forward()` or similar seems safe.
> > > 
> > > 
> > > -Matthias
> > > 
> > > On 9/10/20 7:25 PM, John Roesler wrote:
> > > > Thanks for the reply, Paul!
> > > > 
> > > > I certainly intend to make sure that the changelogging layer
> > > > continues to work the way it does now, by hook or by crook.
> > > > I think the easiest path for me is to just "cheat" and get
> > > > the real ProcessorContext into the ChangeLoggingStore
> > > > implementation somehow. I'll tag you on the PR when I create
> > > > it, so you have an opportunity to express a preference about
> > > > the implementation choice, and maybe even compile/test
> > > > against it to make sure your stuff still works.
> > > > 
> > > > Regarding this:
> > > > 
> > > > > we have an interest in making a state store with a richer
> > > > > way of querying its data (like perhaps getting all values
> > > > > associated with a secondary key), while still ultimately
> > > > > writing to the changelog topic for later restoration.
> > > > 
> > > > This is very intriguing to me. On the side, I've been
> > > > preparing a couple of ideas related to this topic. I don't
> > > > think I have a coherent enough thought to even express it in
> > > > a Jira right now, but when I do, I'll tag you on it also to
> > > > see what you think.
> > > > 
> > > > Whenever you're ready to share the usability improvement
> > > > ideas, I'm very interested to see what you've come up with.
> > > > 
> > > > Thanks,
> > > > -John
> > > > 
> > > > On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
> > > > > > when you use a HashMap or RocksDB or other "state stores", you don't
> > > > > > expect them to automatically know extra stuff about the record 
> > > > > > you're
> > > > > > storing.
> > > > > 
> > > > > So, I don't think there is any reason we *can't* retain the record 
> > > > > context
> > > > > > in the StateStoreContext, and if any users came along with a clear 
> > > > > > use case
> > > > > > I'd find that convincing.
> > > > > > 
> > > > > 
> > > > > I agree with the principle of being conservative with the 
> > > > > StateStoreContext
> > > > > API.  Regarding user expectations or a clear use case, the only
> > > > > counterpoint I would offer is that we sort of have that use case 
> > > > > already,
> > > > > which is the example I gave of the change logging store using the
> > > > > timestamp.  I am curious if this functionality will be retained when 
> > > > > using
> > > > > built in state stores, or will a low-level processor get a 
> > > > > KeyValueStore
> > > > > that no longer writes to the changelog topic with the record's 
> > > > > timestamp.
> > > > > While I personally don't care much about that functionality 
> > > > > specifically, I
> > > > > have a general desire for custom state stores to easily do the things 
> > > > > that
> > > > > built in state stores do.
> > > > > 
> > > > > It genuinely did not occur to me that users might be looking up and/or
> > > > > > updating records of other keys from within a Processor.
> > > > > > 
> > > > > 
> > > > > I'm glad you said this Sophie, because it gives me an opportunity to 
> > > > > say
> > > > > that this is actually a *huge* use case for my team.  The state store
> > > > > usability improvements I was referring to in my previous message were 
> > > > > about
> > > > > enabling the user to write custom stores while still easily hooking 
> > > > > into
> > > > > the ability to write to a changelog topic.  I think that is 
> > > > > technically
> > > > > possible now, but I don't think it's trivial.  Specifically, we have 
> > > > > an
> > > > > interest in making a state store with a richer way of querying its 
> > > > > data
> > > > > (like perhaps getting all values associated with a secondary key), 
> > > > > while
> > > > > still ultimately writing to the changelog topic for later restoration.
> > > > > 
> > > > > We recognize that this use case throws away some of what kafka streams
> > > > > (especially the DSL) is good at - easy parallelizability by 
> > > > > partitioning
> > > > > all processing by key - and that our business logic would completely 
> > > > > fall
> > > > > apart if we were consuming from multi-partition topics with multiple
> > > > > consumers.  But we have found that using the low level processor API 
> > > > > is
> > > > > good for the very simple stream processing primitives it provides: 
> > > > > handling
> > > > > the plumbing of consuming from multiple kafka topics and potentially
> > > > > updating persistent local state in a reliable way.  That in itself has
> > > > > proven to be a worthwhile programming model.
> > > > > 
> > > > > Since I got off track a bit, let me summarize: I don't particularly 
> > > > > care
> > > > > about the record context being available to state store 
> > > > > implementations,
> > > > > and I think this KIP is headed in the right direction in that regard. 
> > > > >  But
> > > > > more generally, I wanted to express the importance of maintaining a
> > > > > powerful and flexible StateStore interface.
> > > > > 
> > > > > Thanks!
> > > > > Paul
> > > > > 
> > > > > On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman 
> > > > > <sop...@confluent.io>
> > > > > wrote:
> > > > > 
> > > > > > Aha, I did misinterpret the example in your previous response 
> > > > > > regarding the
> > > > > > range query after all. I thought you just meant a time-range query 
> > > > > > inside a
> > > > > > punctuator. It genuinely did not occur to me that users might be 
> > > > > > looking up
> > > > > > and/or updating records of other keys from within a Processor. 
> > > > > > Sorry for
> > > > > > being closed minded
> > > > > > 
> > > > > > I won't drag out this discussion any further by asking whether that 
> > > > > > might
> > > > > > be
> > > > > > a valid use case or just a lurking bug in itself :)
> > > > > > 
> > > > > > Thanks for humoring me. The current proposal for KIP-478 sounds 
> > > > > > good to me
> > > > > > 
> > > > > > On Thu, Sep 10, 2020 at 3:43 PM John Roesler <vvcep...@apache.org> 
> > > > > > wrote:
> > > > > > 
> > > > > > > Ah, thanks Sophie,
> > > > > > > 
> > > > > > > I'm sorry for misinterpreting your resonse. Yes, we
> > > > > > > absolutely can and should clear the context before
> > > > > > > punctuating.
> > > > > > > 
> > > > > > > My secondary concern is maybe more far-fetched. I was
> > > > > > > thinking that inside process(key,value), a Processor might
> > > > > > > do a get/put of a _different_ key. Consider, for example,
> > > > > > > the way that Suppress processors work. When they get a
> > > > > > > record, they add it to the store and then do a range scan
> > > > > > > and possibly forward a _different_ record. Of course, this
> > > > > > > is an operation that is deeply coupled to the internals, and
> > > > > > > the Suppress processor accordingly actually does get access
> > > > > > > to the internal context so that it can set the context
> > > > > > > before forwarding.
> > > > > > > 
> > > > > > > Still, it seems like I've had a handful of conversations
> > > > > > > with people over the years in which they tell me they are
> > > > > > > using state stores in a way that transcends the "get and put
> > > > > > > the currently processing record" access pattern. I doubt
> > > > > > > that those folks would even have considered the possiblity
> > > > > > > that the currently processing record's _context_ could
> > > > > > > pollute their state store operations, as I myself never gave
> > > > > > > it a second thought until the current conversation began. In
> > > > > > > cases like that, we have actually set a trap for these
> > > > > > > people, and it seems better to dismantle the trap.
> > > > > > > 
> > > > > > > As you noted, really the only people who would be negatively
> > > > > > > impacted are people who implement their own state stores.
> > > > > > > These folks will get the deprecation warning and try to
> > > > > > > adapt their stores to the new interface. If they needed
> > > > > > > access to the record context, they would find it's now
> > > > > > > missing. They'd ask us about it, and we'd have the ability
> > > > > > > to explain the lurking bug that they have had in their
> > > > > > > stores all along, as well as the new recommended pattern
> > > > > > > (just pass everything you need in the value). If that's
> > > > > > > unsatisfying, _then_ we should consider amending the API.
> > > > > > > 
> > > > > > > Thanks,
> > > > > > > -John
> > > > > > > 
> > > > > > > On Thu, 2020-09-10 at 15:21 -0700, Sophie Blee-Goldman
> > > > > > > wrote:
> > > > > > > > > Regarding your first sentence, "...the processor would null
> > > > > > > > > out the record context...", this is not possible, since the
> > > > > > > > > processor doesn't have write access to the context. We could
> > > > > > > > > add it,
> > > > > > > > > 
> > > > > > > > 
> > > > > > > > Sorry, this was poorly phrased, I definitely did not mean to 
> > > > > > > > imply that
> > > > > > > we
> > > > > > > > should make the context modifiable by the Processors 
> > > > > > > > themselves. I
> > > > > > meant
> > > > > > > > this should be handled by the internal processing framework 
> > > > > > > > that deals
> > > > > > > with
> > > > > > > > passing records from one Processor to the next, setting the 
> > > > > > > > record
> > > > > > > context
> > > > > > > > when a new record is picked up, invoking the punctuators, etc. I
> > > > > > believe
> > > > > > > > this
> > > > > > > > all currently happens in the StreamTask? It already can and does
> > > > > > > overwrite
> > > > > > > > the record context as new records are processed, and is also
> > > > > > responsible
> > > > > > > > for calling the punctuators, so it doesn't seem like a huge 
> > > > > > > > leap to
> > > > > > just
> > > > > > > say
> > > > > > > > "null out the current record before punctuating"
> > > > > > > > 
> > > > > > > > To clarify, I was never advocating or even considering to give 
> > > > > > > > the
> > > > > > > > Processors
> > > > > > > > write access to the record context. Sorry if my last message 
> > > > > > > > (or all of
> > > > > > > > them)
> > > > > > > > was misleading. I just wanted to point out that the punctuator 
> > > > > > > > concern
> > > > > > is
> > > > > > > > orthogonal to the question of whether we should include the 
> > > > > > > > record
> > > > > > > context
> > > > > > > > in the StateStoreContext. It's definitely a real problem, but 
> > > > > > > > it's a
> > > > > > > > problem
> > > > > > > > that exists at the Processor level and not just the StateStore.
> > > > > > > > 
> > > > > > > > So, I don't think there is any reason we *can't* retain the 
> > > > > > > > record
> > > > > > > context
> > > > > > > > in the
> > > > > > > > StateStoreContext, and if any users came along with a clear use 
> > > > > > > > case
> > > > > > I'd
> > > > > > > > find
> > > > > > > > that convincing. In the absence of any examples, the 
> > > > > > > > conservative
> > > > > > > approach
> > > > > > > > sounds good to me.
> > > > > > > > 
> > > > > > > > If it turns out that someone did need the record context in 
> > > > > > > > their
> > > > > > custom
> > > > > > > > state
> > > > > > > > store, I'm sure they'll submit a politely worded bug report 
> > > > > > > > alerting us
> > > > > > > > that we
> > > > > > > > broke their application.
> > > > > > > > 
> > > > > > > > On Thu, Sep 10, 2020 at 3:05 PM John Roesler 
> > > > > > > > <vvcep...@apache.org>
> > > > > > > wrote:
> > > > > > > > > Thanks, Sophie,
> > > > > > > > > 
> > > > > > > > > Yes, now that you point it out, I can see that the record
> > > > > > > > > context itself should be nulled out by Streams before
> > > > > > > > > invoking punctuators. From that perspective, we don't need
> > > > > > > > > to think about the second-order problem of what's in the
> > > > > > > > > context for the state store when called from a punctuator.
> > > > > > > > > 
> > > > > > > > > Regarding your first sentence, "...the processor would null
> > > > > > > > > out the record context...", this is not possible, since the
> > > > > > > > > processor doesn't have write access to the context. We could
> > > > > > > > > add it, but then all kinds of strange effects would ensue
> > > > > > > > > when downstream processors execute but the context is empty,
> > > > > > > > > etc. Better to just let the framework manage the record
> > > > > > > > > context and keep it read-only for Processors.
> > > > > > > > > 
> > > > > > > > > Reading between the lines of your last reply, it sounds that
> > > > > > > > > the disconnect may just have been a mutual misunderstanding
> > > > > > > > > about whether or not Processors currently have access to set
> > > > > > > > > the record context. Since they do not, if we wanted to add
> > > > > > > > > the record context to StateStoreContext in a well-defined
> > > > > > > > > way, we'd also have to add the ability for Processors to
> > > > > > > > > manipulate it. But then, we're just creating a side-channel
> > > > > > > > > for Processors to pass some information in arguments to
> > > > > > > > > "put()" and other information implicitly through the
> > > > > > > > > context. It seems better just to go for a single channel for
> > > > > > > > > now.
> > > > > > > > > 
> > > > > > > > > It sounds like you're basically in favor of the conservative
> > > > > > > > > approach, and you just wanted to understand the blockers
> > > > > > > > > that I implied. Does my clarification make sense?
> > > > > > > > > 
> > > > > > > > > Thanks,
> > > > > > > > > -John
> > > > > > > > > 
> > > > > > > > > On Thu, 2020-09-10 at 10:54 -0700, Sophie Blee-Goldman
> > > > > > > > > wrote:
> > > > > > > > > > I was just thinking that the processor would null out the 
> > > > > > > > > > record
> > > > > > > context
> > > > > > > > > > after it
> > > > > > > > > > finished processing the record, so I'm not sure I follow 
> > > > > > > > > > why this
> > > > > > > would
> > > > > > > > > not
> > > > > > > > > > be
> > > > > > > > > > possible? AFAIK we never call a punctuator in the middle of
> > > > > > > processing a
> > > > > > > > > > record through the topology, and even if we did, we still 
> > > > > > > > > > know when
> > > > > > > it is
> > > > > > > > > > about
> > > > > > > > > > to be called and could set it to null beforehand.
> > > > > > > > > > 
> > > > > > > > > > I'm not trying to advocate for it here, I'm in agreement 
> > > > > > > > > > that
> > > > > > > anything
> > > > > > > > > you
> > > > > > > > > > want
> > > > > > > > > > to access within the store can and should be accessed 
> > > > > > > > > > within the
> > > > > > > calling
> > > > > > > > > > Processor/Punctuator before reaching the store. The "we can 
> > > > > > > > > > always
> > > > > > > add it
> > > > > > > > > > later if necessary" argument is also pretty convincing. 
> > > > > > > > > > Just trying
> > > > > > > to
> > > > > > > > > > understand
> > > > > > > > > > why this wouldn't be possible.
> > > > > > > > > > 
> > > > > > > > > > FWIW, the question of "what is the current record in the 
> > > > > > > > > > context
> > > > > > of a
> > > > > > > > > > Punctuator"
> > > > > > > > > > exists independently of whether we want to add this to the
> > > > > > > > > StateStoreContext
> > > > > > > > > > or not. The full ProcessorContext, including the current 
> > > > > > > > > > record
> > > > > > > context,
> > > > > > > > > is
> > > > > > > > > > already available within a Punctuator, so removing the 
> > > > > > > > > > current
> > > > > > record
> > > > > > > > > > context
> > > > > > > > > > from the StateStoreContext does not solve the problem. 
> > > > > > > > > > Users can --
> > > > > > > and
> > > > > > > > > have
> > > > > > > > > > (see KAFKA-9584 
> > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-9584
> > > > > > > ;;)
> > > > > > > --
> > > > > > > > > hit
> > > > > > > > > > such subtle bugs without ever invoking a StateStore
> > > > > > > > > > from their punctuator.
> > > > > > > > > > 
> > > > > > > > > > Again, I think I do agree that we should leave the current 
> > > > > > > > > > record
> > > > > > > context
> > > > > > > > > > off of
> > > > > > > > > > the StateStoreContext, but I don't think the Punctuator 
> > > > > > > > > > argument
> > > > > > > against
> > > > > > > > > it
> > > > > > > > > > is
> > > > > > > > > > very convincing. It sounds to me like we need to disallow 
> > > > > > > > > > access to
> > > > > > > the
> > > > > > > > > > current
> > > > > > > > > > record context from within the Punctuator, independent of 
> > > > > > > > > > anything
> > > > > > > to do
> > > > > > > > > > with
> > > > > > > > > > state stores
> > > > > > > > > > 
> > > > > > > > > > On Thu, Sep 10, 2020 at 7:12 AM John Roesler 
> > > > > > > > > > <vvcep...@apache.org>
> > > > > > > > > wrote:
> > > > > > > > > > > Thanks for the thoughts, Sophie.
> > > > > > > > > > > 
> > > > > > > > > > > I agree that the extra information could be useful. My 
> > > > > > > > > > > only
> > > > > > > concern is
> > > > > > > > > > > that it doesn’t seem like we can actually supply that 
> > > > > > > > > > > extra
> > > > > > > information
> > > > > > > > > > > correctly. So, then we have a situation where the system 
> > > > > > > > > > > offers
> > > > > > > useful
> > > > > > > > > API
> > > > > > > > > > > calls that are only correct in a narrow range of use 
> > > > > > > > > > > cases.
> > > > > > > Outside of
> > > > > > > > > > > those use cases, you get incorrect behavior.
> > > > > > > > > > > 
> > > > > > > > > > > If it were possible to null out the context before you 
> > > > > > > > > > > put a
> > > > > > > document
> > > > > > > > > to
> > > > > > > > > > > which the context doesn’t apply, then the concern would be
> > > > > > > mitigated.
> > > > > > > > > But
> > > > > > > > > > > it would still be pretty weird from the perspective of 
> > > > > > > > > > > the store
> > > > > > > that
> > > > > > > > > > > sometimes the context is populated and other times, it’s 
> > > > > > > > > > > null.
> > > > > > > > > > > 
> > > > > > > > > > > But that seems moot, since it doesn’t seem possible to 
> > > > > > > > > > > null out
> > > > > > the
> > > > > > > > > > > context. Only the Processor could know whether it’s about 
> > > > > > > > > > > to put
> > > > > > a
> > > > > > > > > document
> > > > > > > > > > > different from the context or not. And it would be 
> > > > > > > > > > > inappropriate
> > > > > > to
> > > > > > > > > offer a
> > > > > > > > > > > public ProcessorContext api to manage the record context.
> > > > > > > > > > > 
> > > > > > > > > > > Ultimately, it still seems like if you want to store 
> > > > > > > > > > > headers, you
> > > > > > > can
> > > > > > > > > > > store them explicitly, right? That doesn’t seem onerous 
> > > > > > > > > > > to me,
> > > > > > and
> > > > > > > it
> > > > > > > > > kind
> > > > > > > > > > > of seems better than relying on undefined or asymmetrical
> > > > > > behavior
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > > > store itself.
> > > > > > > > > > > 
> > > > > > > > > > > Anyway, I’m not saying that we couldn’t solve these 
> > > > > > > > > > > problems.
> > > > > > Just
> > > > > > > > > that it
> > > > > > > > > > > seems a little that we can be conservative and avoid them 
> > > > > > > > > > > for
> > > > > > now.
> > > > > > > If
> > > > > > > > > it
> > > > > > > > > > > turns out we really need to solve them, we can always do 
> > > > > > > > > > > it
> > > > > > later.
> > > > > > > > > > > Thanks,
> > > > > > > > > > > John
> > > > > > > > > > > 
> > > > > > > > > > > On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:
> > > > > > > > > > > > > If you were to call "put" from a punctuator, or do a
> > > > > > > > > > > > > `range()` query and then update one of those records 
> > > > > > > > > > > > > with
> > > > > > > > > > > > > `put()`, you'd have a very subtle bug on your hands.
> > > > > > > > > > > > 
> > > > > > > > > > > > Can you elaborate on this a bit? I agree that the 
> > > > > > > > > > > > punctuator
> > > > > > > case is
> > > > > > > > > an
> > > > > > > > > > > > obvious exemption to the assumption that store 
> > > > > > > > > > > > invocations
> > > > > > always
> > > > > > > > > > > > have a corresponding "current record", but I don't 
> > > > > > > > > > > > understand
> > > > > > the
> > > > > > > > > > > > second example. Are you envisioning a scenario where the
> > > > > > #process
> > > > > > > > > > > > method performs a range query and then updates records? 
> > > > > > > > > > > > Or were
> > > > > > > > > > > > you just giving another example of the punctuator case?
> > > > > > > > > > > > 
> > > > > > > > > > > > I only bring it up because I agree that the current 
> > > > > > > > > > > > record
> > > > > > > > > information
> > > > > > > > > > > could
> > > > > > > > > > > > still be useful within the context of the store. As a 
> > > > > > > > > > > > non-user
> > > > > > my
> > > > > > > > > input
> > > > > > > > > > > on
> > > > > > > > > > > > this
> > > > > > > > > > > > definitely has limited value, but it just isn't 
> > > > > > > > > > > > striking me as
> > > > > > > > > obvious
> > > > > > > > > > > that
> > > > > > > > > > > > we
> > > > > > > > > > > > should remove access to the current record context from 
> > > > > > > > > > > > the
> > > > > > state
> > > > > > > > > stores.
> > > > > > > > > > > > If there is no current record, as in the  punctuator 
> > > > > > > > > > > > case, we
> > > > > > > should
> > > > > > > > > just
> > > > > > > > > > > > set
> > > > > > > > > > > > the record context to null (or Optional.empty, etc).
> > > > > > > > > > > > 
> > > > > > > > > > > > That said, the put() always has to come from somewhere, 
> > > > > > > > > > > > and
> > > > > > that
> > > > > > > > > > > > somewhere is always going to be either a Processor or a
> > > > > > > Punctuator,
> > > > > > > > > both
> > > > > > > > > > > > of which will still have access to the full context. So
> > > > > > > additional
> > > > > > > > > info
> > > > > > > > > > > > such as
> > > > > > > > > > > > the timestamp can and should probably be supplied to 
> > > > > > > > > > > > the store
> > > > > > > before
> > > > > > > > > > > > calling put(), rather than looked up by the store. But 
> > > > > > > > > > > > I can
> > > > > > see
> > > > > > > some
> > > > > > > > > > > other
> > > > > > > > > > > > things being useful, for example the current record's 
> > > > > > > > > > > > headers.
> > > > > > > Maybe
> > > > > > > > > > > if/when
> > > > > > > > > > > > we add better (or any) support for headers in state 
> > > > > > > > > > > > stores this
> > > > > > > will
> > > > > > > > > be
> > > > > > > > > > > > less true.
> > > > > > > > > > > > 
> > > > > > > > > > > > Of course as John has made clear, it's pretty hard to 
> > > > > > > > > > > > judge
> > > > > > > without
> > > > > > > > > > > > examples
> > > > > > > > > > > > and more insight as to what actually goes on within a 
> > > > > > > > > > > > custom
> > > > > > > state
> > > > > > > > > store
> > > > > > > > > > > > On Wed, Sep 9, 2020 at 8:07 PM John Roesler <
> > > > > > vvcep...@apache.org
> > > > > > > > > wrote:
> > > > > > > > > > > > > Hi Paul,
> > > > > > > > > > > > > 
> > > > > > > > > > > > > It's good to hear from you!
> > > > > > > > > > > > > 
> > > > > > > > > > > > > I'm glad you're in favor of the direction. Especially 
> > > > > > > > > > > > > when
> > > > > > > > > > > > > it comes to public API and usability concens, I tend 
> > > > > > > > > > > > > to
> > > > > > > > > > > > > think that "the folks who matter" are actually the 
> > > > > > > > > > > > > folks who
> > > > > > > > > > > > > have to use the APIs to accomplish real tasks. It can 
> > > > > > > > > > > > > be
> > > > > > > > > > > > > hard for me to be sure I'm thinking clearly from that
> > > > > > > > > > > > > perspective.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Funny story, I also started down this road a couple 
> > > > > > > > > > > > > of times
> > > > > > > > > > > > > already and backed them out before the KIP because I 
> > > > > > > > > > > > > was
> > > > > > > > > > > > > afraid of the scope of the proposal. Unfortunately, 
> > > > > > > > > > > > > needing
> > > > > > > > > > > > > to make a new ProcessorContext kind of forced my hand.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > I see you've called me out about the ChangeLogging 
> > > > > > > > > > > > > stores :)
> > > > > > > > > > > > > In fact, I think these are the main/only reason that 
> > > > > > > > > > > > > stores
> > > > > > > > > > > > > might really need to invoke "forward()". My secret 
> > > > > > > > > > > > > plan was
> > > > > > > > > > > > > to cheat and either accomplish change-logging by a 
> > > > > > > > > > > > > different
> > > > > > > > > > > > > mechanism than implementing the store interface, or 
> > > > > > > > > > > > > by just
> > > > > > > > > > > > > breaking encapsulation to sneak the "real" 
> > > > > > > > > > > > > ProcessorContext
> > > > > > > > > > > > > into the ChangeLogging stores. But those are all
> > > > > > > > > > > > > implementation details. I think the key question is 
> > > > > > > > > > > > > whether
> > > > > > > > > > > > > anyone else has a store implementation that needs to 
> > > > > > > > > > > > > call
> > > > > > > > > > > > > "forward()". It's not what you mentioned, but since 
> > > > > > > > > > > > > you
> > > > > > > > > > > > > spoke up, I'll just ask: if you have a use case for 
> > > > > > > > > > > > > calling
> > > > > > > > > > > > > "forward()" in a store, please share it.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Regarding the other record-specific context methods, 
> > > > > > > > > > > > > I think
> > > > > > > > > > > > > you have a good point, but I also can't quite wrap my 
> > > > > > > > > > > > > head
> > > > > > > > > > > > > around how we can actually guarantee it to work in 
> > > > > > > > > > > > > general.
> > > > > > > > > > > > > For example, the case you cited, where the 
> > > > > > > > > > > > > implementation of
> > > > > > > > > > > > > `KeyValueStore#put(key, value)` uses the context to 
> > > > > > > > > > > > > augment
> > > > > > > > > > > > > the record with timestamp information. This relies on 
> > > > > > > > > > > > > the
> > > > > > > > > > > > > assumption that you would only call "put()" from 
> > > > > > > > > > > > > inside a
> > > > > > > > > > > > > `Processor#process(key, value)` call in which the 
> > > > > > > > > > > > > record
> > > > > > > > > > > > > being processed is the same record that you're trying 
> > > > > > > > > > > > > to put
> > > > > > > > > > > > > into the store.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > If you were to call "put" from a punctuator, or do a
> > > > > > > > > > > > > `range()` query and then update one of those records 
> > > > > > > > > > > > > with
> > > > > > > > > > > > > `put()`, you'd have a very subtle bug on your hands. 
> > > > > > > > > > > > > Right
> > > > > > > > > > > > > now, the Streams component that actually calls the 
> > > > > > > > > > > > > Processor
> > > > > > > > > > > > > takes care to set the right record context before 
> > > > > > > > > > > > > invoking
> > > > > > > > > > > > > the method, and in the case of caching, etc., it also 
> > > > > > > > > > > > > takes
> > > > > > > > > > > > > care to swap out the old context and keep it 
> > > > > > > > > > > > > somewhere safe.
> > > > > > > > > > > > > But when it comes to public API Processors calling 
> > > > > > > > > > > > > methods
> > > > > > > > > > > > > on StateStores, there's no opportunity for any 
> > > > > > > > > > > > > component to
> > > > > > > > > > > > > make sure the context is always correct.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > In the face of that situation, it seemed better to 
> > > > > > > > > > > > > just move
> > > > > > > > > > > > > in the direction of a "normal" data store. I.e., when 
> > > > > > > > > > > > > you
> > > > > > > > > > > > > use a HashMap or RocksDB or other "state stores", you 
> > > > > > > > > > > > > don't
> > > > > > > > > > > > > expect them to automatically know extra stuff about 
> > > > > > > > > > > > > the
> > > > > > > > > > > > > record you're storing. If you need them to know 
> > > > > > > > > > > > > something,
> > > > > > > > > > > > > you just put it in the value.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > All of that said, I'm just reasoning from first 
> > > > > > > > > > > > > principles
> > > > > > > > > > > > > here. To really know if this is a mistake or not, I 
> > > > > > > > > > > > > need to
> > > > > > > > > > > > > be in your place. So please push back if you think 
> > > > > > > > > > > > > what I
> > > > > > > > > > > > > said is nonsense. My personal plan was to keep an eye 
> > > > > > > > > > > > > out
> > > > > > > > > > > > > during the period where the old API was still 
> > > > > > > > > > > > > present, but
> > > > > > > > > > > > > deprecated, to see if people were struggling to use 
> > > > > > > > > > > > > the new
> > > > > > > > > > > > > API. If so, then we'd have a chance to address it 
> > > > > > > > > > > > > before
> > > > > > > > > > > > > dropping the old API. But it's even better if you can 
> > > > > > > > > > > > > help
> > > > > > > > > > > > > think it through now.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > It did also cross my mind to _not_ add the
> > > > > > > > > > > > > StateStoreContext, but just to continue to punt on the
> > > > > > > > > > > > > question by just dropping in the new ProcessorContext 
> > > > > > > > > > > > > to the
> > > > > > > > > > > > > new init method. If StateStoreContext seems too bold, 
> > > > > > > > > > > > > we can
> > > > > > > > > > > > > go that direction. But if we actually add some 
> > > > > > > > > > > > > methods to
> > > > > > > > > > > > > StateStoreContext, I'd like to be able to ensure they 
> > > > > > > > > > > > > would
> > > > > > > > > > > > > be well defined. I think the current situation was 
> > > > > > > > > > > > > more of
> > > > > > > > > > > > > an oversight than a choice.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Thanks again for your reply,
> > > > > > > > > > > > > -John
> > > > > > > > > > > > > 
> > > > > > > > > > > > > 
> > > > > > > > > > > > > On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:
> > > > > > > > > > > > > > John,
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > It's exciting to see this KIP head in this 
> > > > > > > > > > > > > > direction!  In
> > > > > > the
> > > > > > > > > last
> > > > > > > > > > > year
> > > > > > > > > > > > > or
> > > > > > > > > > > > > > so I've tried to sketch out some usability 
> > > > > > > > > > > > > > improvements for
> > > > > > > > > custom
> > > > > > > > > > > state
> > > > > > > > > > > > > > stores, and I also ended up splitting out the
> > > > > > > StateStoreContext
> > > > > > > > > from
> > > > > > > > > > > the
> > > > > > > > > > > > > > ProcessorContext in an attempt to facilitate what I 
> > > > > > > > > > > > > > was
> > > > > > > doing.  I
> > > > > > > > > > > sort of
> > > > > > > > > > > > > > abandoned it when I realized how large the ideal 
> > > > > > > > > > > > > > change
> > > > > > might
> > > > > > > > > have
> > > > > > > > > > > to be,
> > > > > > > > > > > > > > but it's great to see that there is other interest 
> > > > > > > > > > > > > > in
> > > > > > moving
> > > > > > > in
> > > > > > > > > this
> > > > > > > > > > > > > > direction (from the folks that matter :) ).
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Having taken a stab at it myself, I have a 
> > > > > > > > > > > > > > comment/question
> > > > > > > on
> > > > > > > > > this
> > > > > > > > > > > > > bullet
> > > > > > > > > > > > > > about StateStoreContext:
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > It does *not*  include anything processor- or 
> > > > > > > > > > > > > > record-
> > > > > > > specific,
> > > > > > > > > like
> > > > > > > > > > > > > > > `forward()` or any information about the "current"
> > > > > > record,
> > > > > > > > > which is
> > > > > > > > > > > > > only a
> > > > > > > > > > > > > > > well-defined in the context of the Processor. 
> > > > > > > > > > > > > > > Processors
> > > > > > > > > process
> > > > > > > > > > > one
> > > > > > > > > > > > > record
> > > > > > > > > > > > > > > at a time, but state stores may be used to store 
> > > > > > > > > > > > > > > and
> > > > > > fetch
> > > > > > > many
> > > > > > > > > > > > > records, so
> > > > > > > > > > > > > > > there is no "current record".
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > I totally agree that record-specific or 
> > > > > > > > > > > > > > processor-specific
> > > > > > > > > context
> > > > > > > > > > > in a
> > > > > > > > > > > > > > state store is often not well-defined and it would 
> > > > > > > > > > > > > > be good
> > > > > > to
> > > > > > > > > > > separate
> > > > > > > > > > > > > that
> > > > > > > > > > > > > > out, but sometimes it (at least record-specific 
> > > > > > > > > > > > > > context) is
> > > > > > > > > actually
> > > > > > > > > > > > > > useful, for example, passing the record's timestamp 
> > > > > > > > > > > > > > through
> > > > > > > to
> > > > > > > > > the
> > > > > > > > > > > > > > underlying storage (or changelog topic):
> > > > > > > > > > > > > > 
> > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
> > > > > > > > > > > > > > You could have the writer client of the state store 
> > > > > > > > > > > > > > pass
> > > > > > this
> > > > > > > > > > > through,
> > > > > > > > > > > > > but
> > > > > > > > > > > > > > it would be nice to be able to write state stores 
> > > > > > > > > > > > > > where the
> > > > > > > > > client
> > > > > > > > > > > did
> > > > > > > > > > > > > not
> > > > > > > > > > > > > > have this responsibility.  I'm not sure if the 
> > > > > > > > > > > > > > solution is
> > > > > > > to add
> > > > > > > > > > > some
> > > > > > > > > > > > > > things back to StateStoreContext, or make yet 
> > > > > > > > > > > > > > another
> > > > > > context
> > > > > > > > > that
> > > > > > > > > > > > > > represents record-specific context while inside a 
> > > > > > > > > > > > > > state
> > > > > > > store.
> > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > Paul
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 5:43 PM John Roesler <
> > > > > > > j...@vvcephei.org>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > Hello all,
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > I've been slowly pushing KIP-478 forward over the 
> > > > > > > > > > > > > > > last
> > > > > > > year,
> > > > > > > > > > > > > > > and I'm happy to say that we're making good 
> > > > > > > > > > > > > > > progress now.
> > > > > > > > > > > > > > > However, several issues with the original design 
> > > > > > > > > > > > > > > have
> > > > > > come
> > > > > > > > > > > > > > > to light.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > The major changes:
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > We discovered that the original plan of just 
> > > > > > > > > > > > > > > adding
> > > > > > generic
> > > > > > > > > > > > > > > parameters to ProcessorContext was too 
> > > > > > > > > > > > > > > disruptive, so we
> > > > > > > are
> > > > > > > > > > > > > > > now adding a new api.ProcessorContext.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > That choice forces us to add a new 
> > > > > > > > > > > > > > > StateStore.init method
> > > > > > > > > > > > > > > for the new context, but ProcessorContext really 
> > > > > > > > > > > > > > > isn't
> > > > > > > ideal
> > > > > > > > > > > > > > > for state stores to begin with, so I'm proposing 
> > > > > > > > > > > > > > > a new
> > > > > > > > > > > > > > > StateStoreContext for this purpose. In a 
> > > > > > > > > > > > > > > nutshell, there
> > > > > > > are
> > > > > > > > > > > > > > > quite a few methods in ProcessorContext that 
> > > > > > > > > > > > > > > actually
> > > > > > > should
> > > > > > > > > > > > > > > never be called from inside a StateStore.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Also, since there is a new ProcessorContext 
> > > > > > > > > > > > > > > interface, we
> > > > > > > > > > > > > > > need a new MockProcessorContext implementation in 
> > > > > > > > > > > > > > > the
> > > > > > test-
> > > > > > > > > > > > > > > utils module.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > The changeset for the KIP document is here:
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
> > > > > > > > > > > > > > > And the KIP itself is here:
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
> > > > > > > > > > > > > > > If you have any concerns, please let me know!
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > 

Reply via email to