>
> However, the record metadata is only defined when the parent forwards
> while processing a

real record, not when it calls forward from the punctuator


Can we take a step back for a second...why wouldn't you be required to set
the RecordContext
yourself when calling forward from a Punctuator? I think I agree with Paul
here, it seems kind of
absurd not to enforce that the RecordContext be present inside the
process() method.

The original problem with Punctuators, as I understood it, was that all of
the RecordContext
fields were exposed automatically to both the Processor and any Punctuator,
due to being
direct methods on the ProcessorContext. We can't control which
ProcessorContext methods
someone will call from with a Punctuator vs from a Processor. The best we
could do was
set these "nonsense" fields to null when inside a Punctuator, or set them
to some dummy
values as you pointed out.

But then you proposed the solution of a separate RecordContext which is not
attached to the
ProcessorContext at all. This seemed to solve the above problem very
neatly: we only pass
in the RecordContext to the process() method, so we don't have to worry
about people trying
to access these fields from within a Punctuator. The fields aren't
accessible unless they're
defined.

So what happens when someone wants to forward something from within a
Punctuator? I
don't think it's reasonable to let the timestamp field be undefined, ever.
What if the Punctuator
forwards directly to a sink, or directly to some windowing logic. Are we
supposed to add
handling for the RecordContext == null case to every processor? Or are we
just going to
assume the implicit restriction that users will only forward records from a
Punctuator to
downstream processors that know how to handle and/or set the RecordContext
if it's
undefined. That seems to throw away a lot of the awesome safety added in
this KIP

Apologies for the rant. But I feel pretty strongly that allowing to forward
records from a
Punctuator without a defined RecordContext would be asking for trouble.
Imo, if you
want to forward from a Punctuator, you need to store the info you need in
order to
set the timestamp, or make one up yourself

(the one alternative I can think of here is that maybe we could pass in the
current
partition time, so users can at least put in a reasonable estimate for the
timestamp
that won't cause it to get dropped and won't potentially lurch the
streamtime far into
the future. This would be similar to what we do in the TimestampExtractor)

On Tue, Sep 29, 2020 at 6:06 PM John Roesler <vvcep...@apache.org> wrote:

> Oh, I guess one other thing I should have mentioned is that I’ve recently
> discovered that in cases where the context is undefined, we currently just
> fill in dummy values for the context. So there’s a good chance that real
> applications in use are depending on undefined context without even
> realizing it. What I’m hoping to do is just make the situation explicit and
> get rid of the dummy values.
>
> Thanks,
> John
>
> On Tue, Sep 29, 2020, at 20:01, John Roesler wrote:
> > Thanks for the review, Paul!
> >
> > I had read some of that debate before. There seems to be some subtext
> > there, because they advise against using Optional in cases like this,
> > but there doesn’t seem to be a specific reason why it’s inappropriate.
> > I got the impression they were just afraid that people would go
> > overboard and make everything Optional.
> >
> > I could also make two methods, but it seemed like it might be an
> > unfortunate way to handle the issue, since Processor is just about a
> > Function as-is, but the two-method approach would require people to
> > implement both methods.
> >
> > To your question, this is something that’s only recently became clear
> > to me. Imagine you have a parent processor that calls forward both from
> > process and a punctuator. The child will have process() invoked in both
> > cases, and won’t be able to distinguish them. However, the record
> > metadata is only defined when the parent forwards while processing a
> > real record, not when it calls forward from the punctuator.
> >
> > This is why I wanted to make the metadata Optional, to advertise that
> > the metadata might be undefined if any ancestor processor ever calls
> > forward from a punctuator. We could remove the Optional and instead
> > just document that the argument might be null.
> >
> > With that context in place, what’s your take?
> >
> > Thanks,
> > John
> >
> > On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote:
> > > Looks pretty good to me, though the Processor#process(Record,
> > > Optional<RecordMetadata>) signature caught my eye.  There's some
> debate
> > > (
> > >
> https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments
> )
> > > about whether to use Optionals in arguments, and while that's a bit of
> a
> > > religious debate in the abstract, it did make me wonder whether it
> makes
> > > sense in this specific case.  When is it actually not present?  I was
> > > under
> > > the impression that we should always have access to it in process(),
> and
> > > that the concern about metadata being undefined was about having
> access
> > > to
> > > record metadata in the ProcessorContext held for use inside a
> > > Punctuator.
> > >
> > > If that's not the case and it is truly optional in process(), is there
> an
> > > opportunity for an alternate interface for the cases when we don't get
> it,
> > > rather than force the branching on implementers of the interface?
> > >
> > > Apologies if I've missed something, I took a look at the PR and I
> didn't
> > > see any spots where I thought it would be empty.  Perhaps an example
> of a
> > > Punctuator using (and not using) the new API would clear things up.
> > >
> > > Best,
> > > Paul
> > >
> > > On Tue, Sep 29, 2020 at 4:10 PM John Roesler <vvcep...@apache.org>
> wrote:
> > >
> > > > Hello again, all,
> > > >
> > > > Thanks for the latest round of discussion. I've taken the
> > > > recent feedback and come up with an updated KIP that seems
> > > > actually quite a bit nicer than the prior proposal.
> > > >
> > > > The specific diff on the KIP is here:
> > > >
> > > >
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14
> > > >
> > > > These changes are implemented in this POC PR:
> > > > https://github.com/apache/kafka/pull/9346
> > > >
> > > > The basic idea is that, building on the recent conversaion,
> > > > we would transition away from the current API where we get
> > > > only key/value in the process() method and other "data"
> > > > comes in the ProcessorContext along with the "metadata".
> > > >
> > > > Instead, we formalize what is "data" and what is "metadata",
> > > > and pass it all in to the process method:
> > > > Processor#process(Record, Optional<RecordMetadata>)
> > > >
> > > > Also, you forward the whole data class instead of mutating
> > > > the ProcessorContext fields and also calling forward:
> > > > ProcessorContext#forward(Record)
> > > >
> > > > The Record class itself ships with methods like
> > > > record#withValue(NewV newValue)
> > > > that make a shallow copy of the input Record, enabling
> > > > Processors to safely handle the record without polluting the
> > > > context of their parents and siblings.
> > > >
> > > > This proposal has a number of key benefits:
> > > > * As we've discovered in KAFKA-9584, it's unsafe to mutate
> > > > the Headers via the ProcessorContext. This proposal offers a
> > > > way to safely forward changes only to downstream processors.
> > > > * The new API has symmetry (each processor's input is the
> > > > output of its parent processor)
> > > > * The API makes clear that the record metadata isn't always
> > > > defined (for example, in a punctuation, there is no current
> > > > topic/partition/offset)
> > > > * The API enables punctuators to forward well defined
> > > > headers downstream, which is currently not possible.
> > > >
> > > > Unless their are objections, I'll go ahead and re-finalize
> > > > this KIP and update that PR to a mergeable state.
> > > >
> > > > Thanks, all,
> > > > -John
> > > >
> > > >
> > > > On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
> > > > > Interesting proposal. However, I am not totally convinced, because
> I see
> > > > > a fundamental difference between "data" and "metadata".
> > > > >
> > > > > Topic/partition/offset are "metadata" in the strong sense and they
> are
> > > > > immutable.
> > > > >
> > > > > On the other hand there is "primary" data like key and value, as
> well as
> > > > > "secondary" data like timestamp and headers. The issue seems that
> we
> > > > > treat "secondary data" more like metadata atm?
> > > > >
> > > > > Thus, promoting timestamp and headers into a first class citizen
> roll
> > > > > make sense to me (my original proposal about `RecordContext` would
> still
> > > > > fall short with this regard). However, putting both (data and
> metadata)
> > > > > into a `Record` abstraction might go too far?
> > > > >
> > > > > I am also a little bit concerned about `Record.copy()` because it
> might
> > > > > be a trap: Users might assume it does a full deep copy of the
> record,
> > > > > however, it would not. It would only create a new `Record` object
> as
> > > > > wrapper that points to the same key/value/header objects as the
> input
> > > > > record.
> > > > >
> > > > > With the current `context.forward(key, value)` we don't have this
> "deep
> > > > > copy" issue -- it's pretty clear what is happening.
> > > > >
> > > > > Instead of `To.all().withTimestamp()` we could also add
> > > > > `context.forward(key, value, timestamp)` etc (just wondering about
> the
> > > > > exposition in overload)?
> > > > >
> > > > > Also, `Record.withValue` etc sounds odd? Should a record not be
> > > > > immutable? So, we could have something like
> > > > >
> > > > >
> > > >
> `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
> > > > > But it looks rather verbose?
> > > > >
> > > > > The other question is of course, to what extend to we want to keep
> the
> > > > > distinction between "primary" and "secondary" data? To me, it's a
> > > > > question of easy of use?
> > > > >
> > > > > Just putting all this out to move the discussion forward. Don't
> have a
> > > > > concrete proposal atm.
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > > On 9/14/20 9:24 AM, John Roesler wrote:
> > > > > > Thanks for this thought, Matthias!
> > > > > >
> > > > > > To be honest, it's bugged me quite a bit that _all_ the
> > > > > > record information hasn't been an argument to `process`. I
> > > > > > suppose I was trying to be conservative in this proposal,
> > > > > > but then again, if we're adding new Processor and
> > > > > > ProcessorContext interfaces, then this is the time to make
> > > > > > such a change.
> > > > > >
> > > > > > To be unambiguous, I think this is what we're talking about:
> > > > > > ProcessorContext:
> > > > > > * applicationId
> > > > > > * taskId
> > > > > > * appConfigs
> > > > > > * appConfigsWithPrefix
> > > > > > * keySerde
> > > > > > * valueSerde
> > > > > > * stateDir
> > > > > > * metrics
> > > > > > * schedule
> > > > > > * commit
> > > > > > * forward
> > > > > >
> > > > > > StateStoreContext:
> > > > > > * applicationId
> > > > > > * taskId
> > > > > > * appConfigs
> > > > > > * appConfigsWithPrefix
> > > > > > * keySerde
> > > > > > * valueSerde
> > > > > > * stateDir
> > > > > > * metrics
> > > > > > * register
> > > > > >
> > > > > >
> > > > > > RecordContext
> > > > > > * topic
> > > > > > * partition
> > > > > > * offset
> > > > > > * timestamp
> > > > > > * headers
> > > > > >
> > > > > >
> > > > > > Your proposal sounds good to me as-is. Just to cover the
> > > > > > bases, though, I'm wondering if we should push the idea just
> > > > > > a little farther. Instead of decomposing key,value,context,
> > > > > > we could just keep them all in one object, like this:
> > > > > >
> > > > > > Record:
> > > > > > * key
> > > > > > * value
> > > > > > * topic
> > > > > > * partition
> > > > > > * offset
> > > > > > * timestamp
> > > > > > * headers
> > > > > >
> > > > > > Then, we could have:
> > > > > > Processor#process(Record)
> > > > > > ProcessorContext#forward(Record, To)
> > > > > >
> > > > > > Viewed from this perspective, a record has three properties
> > > > > > that people may specify in their processors: key, value, and
> > > > > > timestamp.
> > > > > >
> > > > > > We could deprecate `To#withTimestamp` and enable people to
> > > > > > specify the timestamp along with the key and value when they
> > > > > > forward a record.
> > > > > >
> > > > > > E.g.,
> > > > > > RecordBuilder toForward = RecordBuilder.copy(record)
> > > > > > toForward.withKey(newKey)
> > > > > > toForward.withValue(newValue)
> > > > > > toForward.withTimestamp(newTimestamp)
> > > > > > Record newRecord = toForward.build()
> > > > > > context.forward(newRecord, To.child("child1"))
> > > > > >
> > > > > > Or, the more compact common case:
> > > > > > current:
> > > > > >  context.forward(key, "newValue")
> > > > > > proposed:
> > > > > >  context.forward(copy(record).withValue("newValue").build())
> > > > > >
> > > > > >
> > > > > > It's slightly more verbose, but also more extensible. This
> > > > > > would give us a clean path to add header support in PAPI as
> > > > > > well, simply by adding `withHeaders` in RecordBuilder.
> > > > > >
> > > > > > It's also more symmetrical, since the recipient of `forward`
> > > > > > would just get the sent `Record`. Whereas today, the sender
> > > > > > puts the timestamp in `To`, but the recipient gets in in its
> > > > > > own `ProcessorContext`.
> > > > > >
> > > > > > WDYT?
> > > > > > -John
> > > > > >
> > > > > > On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote:
> > > > > > > I think separating the different contexts make sense.
> > > > > > >
> > > > > > > In fact, we could even go one step further and remove the
> record
> > > > context
> > > > > > > from the processor context completely and we add a third
> parameter to
> > > > > > > `process(key, value, recordContext)`. This would make it clear
> that
> > > > the
> > > > > > > context is for the input record only and it's not possible to
> pass
> > > > it to
> > > > > > > a `punctuate` callback.
> > > > > > >
> > > > > > > For the stores and changelogging: I think there are two cases.
> (1)
> > > > You
> > > > > > > use a plain key-value store. For this case, it seems you do
> not care
> > > > > > > about the timestamp and thus does not care what timestamp is
> set in
> > > > the
> > > > > > > changelog records. (We can set anything we want, as it's not
> > > > relevant at
> > > > > > > all -- the timestamp is ignored on read anyway.) (2) The other
> case
> > > > is,
> > > > > > > that one does care about timestamps, and for this case should
> use
> > > > > > > TimestampedKeyValueStore. The passed timestamp will be set on
> the
> > > > > > > changelog records for this case.
> > > > > > >
> > > > > > > Thus, for both cases, accessing the record context does not
> seems to
> > > > be
> > > > > > > a requirement. And providing access to the processor context
> to, eg.,
> > > > > > > `forward()` or similar seems safe.
> > > > > > >
> > > > > > >
> > > > > > > -Matthias
> > > > > > >
> > > > > > > On 9/10/20 7:25 PM, John Roesler wrote:
> > > > > > > > Thanks for the reply, Paul!
> > > > > > > >
> > > > > > > > I certainly intend to make sure that the changelogging layer
> > > > > > > > continues to work the way it does now, by hook or by crook.
> > > > > > > > I think the easiest path for me is to just "cheat" and get
> > > > > > > > the real ProcessorContext into the ChangeLoggingStore
> > > > > > > > implementation somehow. I'll tag you on the PR when I create
> > > > > > > > it, so you have an opportunity to express a preference about
> > > > > > > > the implementation choice, and maybe even compile/test
> > > > > > > > against it to make sure your stuff still works.
> > > > > > > >
> > > > > > > > Regarding this:
> > > > > > > >
> > > > > > > > > we have an interest in making a state store with a richer
> > > > > > > > > way of querying its data (like perhaps getting all values
> > > > > > > > > associated with a secondary key), while still ultimately
> > > > > > > > > writing to the changelog topic for later restoration.
> > > > > > > >
> > > > > > > > This is very intriguing to me. On the side, I've been
> > > > > > > > preparing a couple of ideas related to this topic. I don't
> > > > > > > > think I have a coherent enough thought to even express it in
> > > > > > > > a Jira right now, but when I do, I'll tag you on it also to
> > > > > > > > see what you think.
> > > > > > > >
> > > > > > > > Whenever you're ready to share the usability improvement
> > > > > > > > ideas, I'm very interested to see what you've come up with.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > -John
> > > > > > > >
> > > > > > > > On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
> > > > > > > > > > when you use a HashMap or RocksDB or other "state
> stores", you
> > > > don't
> > > > > > > > > > expect them to automatically know extra stuff about the
> record
> > > > you're
> > > > > > > > > > storing.
> > > > > > > > >
> > > > > > > > > So, I don't think there is any reason we *can't* retain the
> > > > record context
> > > > > > > > > > in the StateStoreContext, and if any users came along
> with a
> > > > clear use case
> > > > > > > > > > I'd find that convincing.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > I agree with the principle of being conservative with the
> > > > StateStoreContext
> > > > > > > > > API.  Regarding user expectations or a clear use case, the
> only
> > > > > > > > > counterpoint I would offer is that we sort of have that
> use case
> > > > already,
> > > > > > > > > which is the example I gave of the change logging store
> using the
> > > > > > > > > timestamp.  I am curious if this functionality will be
> retained
> > > > when using
> > > > > > > > > built in state stores, or will a low-level processor get a
> > > > KeyValueStore
> > > > > > > > > that no longer writes to the changelog topic with the
> record's
> > > > timestamp.
> > > > > > > > > While I personally don't care much about that functionality
> > > > specifically, I
> > > > > > > > > have a general desire for custom state stores to easily do
> the
> > > > things that
> > > > > > > > > built in state stores do.
> > > > > > > > >
> > > > > > > > > It genuinely did not occur to me that users might be
> looking up
> > > > and/or
> > > > > > > > > > updating records of other keys from within a Processor.
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > I'm glad you said this Sophie, because it gives me an
> > > > opportunity to say
> > > > > > > > > that this is actually a *huge* use case for my team.  The
> state
> > > > store
> > > > > > > > > usability improvements I was referring to in my previous
> message
> > > > were about
> > > > > > > > > enabling the user to write custom stores while still easily
> > > > hooking into
> > > > > > > > > the ability to write to a changelog topic.  I think that is
> > > > technically
> > > > > > > > > possible now, but I don't think it's trivial.
> Specifically, we
> > > > have an
> > > > > > > > > interest in making a state store with a richer way of
> querying
> > > > its data
> > > > > > > > > (like perhaps getting all values associated with a
> secondary
> > > > key), while
> > > > > > > > > still ultimately writing to the changelog topic for later
> > > > restoration.
> > > > > > > > >
> > > > > > > > > We recognize that this use case throws away some of what
> kafka
> > > > streams
> > > > > > > > > (especially the DSL) is good at - easy parallelizability by
> > > > partitioning
> > > > > > > > > all processing by key - and that our business logic would
> > > > completely fall
> > > > > > > > > apart if we were consuming from multi-partition topics with
> > > > multiple
> > > > > > > > > consumers.  But we have found that using the low level
> processor
> > > > API is
> > > > > > > > > good for the very simple stream processing primitives it
> > > > provides: handling
> > > > > > > > > the plumbing of consuming from multiple kafka topics and
> > > > potentially
> > > > > > > > > updating persistent local state in a reliable way.  That in
> > > > itself has
> > > > > > > > > proven to be a worthwhile programming model.
> > > > > > > > >
> > > > > > > > > Since I got off track a bit, let me summarize: I don't
> > > > particularly care
> > > > > > > > > about the record context being available to state store
> > > > implementations,
> > > > > > > > > and I think this KIP is headed in the right direction in
> that
> > > > regard.  But
> > > > > > > > > more generally, I wanted to express the importance of
> > > > maintaining a
> > > > > > > > > powerful and flexible StateStore interface.
> > > > > > > > >
> > > > > > > > > Thanks!
> > > > > > > > > Paul
> > > > > > > > >
> > > > > > > > > On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman <
> > > > sop...@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Aha, I did misinterpret the example in your previous
> response
> > > > regarding the
> > > > > > > > > > range query after all. I thought you just meant a
> time-range
> > > > query inside a
> > > > > > > > > > punctuator. It genuinely did not occur to me that users
> might
> > > > be looking up
> > > > > > > > > > and/or updating records of other keys from within a
> Processor.
> > > > Sorry for
> > > > > > > > > > being closed minded
> > > > > > > > > >
> > > > > > > > > > I won't drag out this discussion any further by asking
> whether
> > > > that might
> > > > > > > > > > be
> > > > > > > > > > a valid use case or just a lurking bug in itself :)
> > > > > > > > > >
> > > > > > > > > > Thanks for humoring me. The current proposal for KIP-478
> > > > sounds good to me
> > > > > > > > > >
> > > > > > > > > > On Thu, Sep 10, 2020 at 3:43 PM John Roesler <
> > > > vvcep...@apache.org> wrote:
> > > > > > > > > >
> > > > > > > > > > > Ah, thanks Sophie,
> > > > > > > > > > >
> > > > > > > > > > > I'm sorry for misinterpreting your resonse. Yes, we
> > > > > > > > > > > absolutely can and should clear the context before
> > > > > > > > > > > punctuating.
> > > > > > > > > > >
> > > > > > > > > > > My secondary concern is maybe more far-fetched. I was
> > > > > > > > > > > thinking that inside process(key,value), a Processor
> might
> > > > > > > > > > > do a get/put of a _different_ key. Consider, for
> example,
> > > > > > > > > > > the way that Suppress processors work. When they get a
> > > > > > > > > > > record, they add it to the store and then do a range
> scan
> > > > > > > > > > > and possibly forward a _different_ record. Of course,
> this
> > > > > > > > > > > is an operation that is deeply coupled to the
> internals, and
> > > > > > > > > > > the Suppress processor accordingly actually does get
> access
> > > > > > > > > > > to the internal context so that it can set the context
> > > > > > > > > > > before forwarding.
> > > > > > > > > > >
> > > > > > > > > > > Still, it seems like I've had a handful of
> conversations
> > > > > > > > > > > with people over the years in which they tell me they
> are
> > > > > > > > > > > using state stores in a way that transcends the "get
> and put
> > > > > > > > > > > the currently processing record" access pattern. I
> doubt
> > > > > > > > > > > that those folks would even have considered the
> possiblity
> > > > > > > > > > > that the currently processing record's _context_ could
> > > > > > > > > > > pollute their state store operations, as I myself
> never gave
> > > > > > > > > > > it a second thought until the current conversation
> began. In
> > > > > > > > > > > cases like that, we have actually set a trap for these
> > > > > > > > > > > people, and it seems better to dismantle the trap.
> > > > > > > > > > >
> > > > > > > > > > > As you noted, really the only people who would be
> negatively
> > > > > > > > > > > impacted are people who implement their own state
> stores.
> > > > > > > > > > > These folks will get the deprecation warning and try to
> > > > > > > > > > > adapt their stores to the new interface. If they needed
> > > > > > > > > > > access to the record context, they would find it's now
> > > > > > > > > > > missing. They'd ask us about it, and we'd have the
> ability
> > > > > > > > > > > to explain the lurking bug that they have had in their
> > > > > > > > > > > stores all along, as well as the new recommended
> pattern
> > > > > > > > > > > (just pass everything you need in the value). If that's
> > > > > > > > > > > unsatisfying, _then_ we should consider amending the
> API.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > -John
> > > > > > > > > > >
> > > > > > > > > > > On Thu, 2020-09-10 at 15:21 -0700, Sophie Blee-Goldman
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > Regarding your first sentence, "...the processor
> would
> > > > null
> > > > > > > > > > > > > out the record context...", this is not possible,
> since
> > > > the
> > > > > > > > > > > > > processor doesn't have write access to the
> context. We
> > > > could
> > > > > > > > > > > > > add it,
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > Sorry, this was poorly phrased, I definitely did not
> mean
> > > > to imply that
> > > > > > > > > > > we
> > > > > > > > > > > > should make the context modifiable by the Processors
> > > > themselves. I
> > > > > > > > > > meant
> > > > > > > > > > > > this should be handled by the internal processing
> > > > framework that deals
> > > > > > > > > > > with
> > > > > > > > > > > > passing records from one Processor to the next,
> setting
> > > > the record
> > > > > > > > > > > context
> > > > > > > > > > > > when a new record is picked up, invoking the
> punctuators,
> > > > etc. I
> > > > > > > > > > believe
> > > > > > > > > > > > this
> > > > > > > > > > > > all currently happens in the StreamTask? It already
> can
> > > > and does
> > > > > > > > > > > overwrite
> > > > > > > > > > > > the record context as new records are processed, and
> is
> > > > also
> > > > > > > > > > responsible
> > > > > > > > > > > > for calling the punctuators, so it doesn't seem like
> a
> > > > huge leap to
> > > > > > > > > > just
> > > > > > > > > > > say
> > > > > > > > > > > > "null out the current record before punctuating"
> > > > > > > > > > > >
> > > > > > > > > > > > To clarify, I was never advocating or even
> considering to
> > > > give the
> > > > > > > > > > > > Processors
> > > > > > > > > > > > write access to the record context. Sorry if my last
> > > > message (or all of
> > > > > > > > > > > > them)
> > > > > > > > > > > > was misleading. I just wanted to point out that the
> > > > punctuator concern
> > > > > > > > > > is
> > > > > > > > > > > > orthogonal to the question of whether we should
> include
> > > > the record
> > > > > > > > > > > context
> > > > > > > > > > > > in the StateStoreContext. It's definitely a real
> problem,
> > > > but it's a
> > > > > > > > > > > > problem
> > > > > > > > > > > > that exists at the Processor level and not just the
> > > > StateStore.
> > > > > > > > > > > >
> > > > > > > > > > > > So, I don't think there is any reason we *can't*
> retain
> > > > the record
> > > > > > > > > > > context
> > > > > > > > > > > > in the
> > > > > > > > > > > > StateStoreContext, and if any users came along with a
> > > > clear use case
> > > > > > > > > > I'd
> > > > > > > > > > > > find
> > > > > > > > > > > > that convincing. In the absence of any examples, the
> > > > conservative
> > > > > > > > > > > approach
> > > > > > > > > > > > sounds good to me.
> > > > > > > > > > > >
> > > > > > > > > > > > If it turns out that someone did need the record
> context
> > > > in their
> > > > > > > > > > custom
> > > > > > > > > > > > state
> > > > > > > > > > > > store, I'm sure they'll submit a politely worded bug
> > > > report alerting us
> > > > > > > > > > > > that we
> > > > > > > > > > > > broke their application.
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Sep 10, 2020 at 3:05 PM John Roesler <
> > > > vvcep...@apache.org>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > Thanks, Sophie,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Yes, now that you point it out, I can see that the
> record
> > > > > > > > > > > > > context itself should be nulled out by Streams
> before
> > > > > > > > > > > > > invoking punctuators. From that perspective, we
> don't
> > > > need
> > > > > > > > > > > > > to think about the second-order problem of what's
> in the
> > > > > > > > > > > > > context for the state store when called from a
> > > > punctuator.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Regarding your first sentence, "...the processor
> would
> > > > null
> > > > > > > > > > > > > out the record context...", this is not possible,
> since
> > > > the
> > > > > > > > > > > > > processor doesn't have write access to the
> context. We
> > > > could
> > > > > > > > > > > > > add it, but then all kinds of strange effects
> would ensue
> > > > > > > > > > > > > when downstream processors execute but the context
> is
> > > > empty,
> > > > > > > > > > > > > etc. Better to just let the framework manage the
> record
> > > > > > > > > > > > > context and keep it read-only for Processors.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Reading between the lines of your last reply, it
> sounds
> > > > that
> > > > > > > > > > > > > the disconnect may just have been a mutual
> > > > misunderstanding
> > > > > > > > > > > > > about whether or not Processors currently have
> access to
> > > > set
> > > > > > > > > > > > > the record context. Since they do not, if we
> wanted to
> > > > add
> > > > > > > > > > > > > the record context to StateStoreContext in a
> well-defined
> > > > > > > > > > > > > way, we'd also have to add the ability for
> Processors to
> > > > > > > > > > > > > manipulate it. But then, we're just creating a
> > > > side-channel
> > > > > > > > > > > > > for Processors to pass some information in
> arguments to
> > > > > > > > > > > > > "put()" and other information implicitly through
> the
> > > > > > > > > > > > > context. It seems better just to go for a single
> channel
> > > > for
> > > > > > > > > > > > > now.
> > > > > > > > > > > > >
> > > > > > > > > > > > > It sounds like you're basically in favor of the
> > > > conservative
> > > > > > > > > > > > > approach, and you just wanted to understand the
> blockers
> > > > > > > > > > > > > that I implied. Does my clarification make sense?
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > -John
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, 2020-09-10 at 10:54 -0700, Sophie
> Blee-Goldman
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > I was just thinking that the processor would
> null out
> > > > the record
> > > > > > > > > > > context
> > > > > > > > > > > > > > after it
> > > > > > > > > > > > > > finished processing the record, so I'm not sure I
> > > > follow why this
> > > > > > > > > > > would
> > > > > > > > > > > > > not
> > > > > > > > > > > > > > be
> > > > > > > > > > > > > > possible? AFAIK we never call a punctuator in the
> > > > middle of
> > > > > > > > > > > processing a
> > > > > > > > > > > > > > record through the topology, and even if we did,
> we
> > > > still know when
> > > > > > > > > > > it is
> > > > > > > > > > > > > > about
> > > > > > > > > > > > > > to be called and could set it to null beforehand.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I'm not trying to advocate for it here, I'm in
> > > > agreement that
> > > > > > > > > > > anything
> > > > > > > > > > > > > you
> > > > > > > > > > > > > > want
> > > > > > > > > > > > > > to access within the store can and should be
> accessed
> > > > within the
> > > > > > > > > > > calling
> > > > > > > > > > > > > > Processor/Punctuator before reaching the store.
> The
> > > > "we can always
> > > > > > > > > > > add it
> > > > > > > > > > > > > > later if necessary" argument is also pretty
> > > > convincing. Just trying
> > > > > > > > > > > to
> > > > > > > > > > > > > > understand
> > > > > > > > > > > > > > why this wouldn't be possible.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > FWIW, the question of "what is the current
> record in
> > > > the context
> > > > > > > > > > of a
> > > > > > > > > > > > > > Punctuator"
> > > > > > > > > > > > > > exists independently of whether we want to add
> this to
> > > > the
> > > > > > > > > > > > > StateStoreContext
> > > > > > > > > > > > > > or not. The full ProcessorContext, including the
> > > > current record
> > > > > > > > > > > context,
> > > > > > > > > > > > > is
> > > > > > > > > > > > > > already available within a Punctuator, so
> removing the
> > > > current
> > > > > > > > > > record
> > > > > > > > > > > > > > context
> > > > > > > > > > > > > > from the StateStoreContext does not solve the
> problem.
> > > > Users can --
> > > > > > > > > > > and
> > > > > > > > > > > > > have
> > > > > > > > > > > > > > (see KAFKA-9584 <
> > > > https://issues.apache.org/jira/browse/KAFKA-9584
> > > > > > > > > > > ;;)
> > > > > > > > > > > --
> > > > > > > > > > > > > hit
> > > > > > > > > > > > > > such subtle bugs without ever invoking a
> StateStore
> > > > > > > > > > > > > > from their punctuator.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Again, I think I do agree that we should leave
> the
> > > > current record
> > > > > > > > > > > context
> > > > > > > > > > > > > > off of
> > > > > > > > > > > > > > the StateStoreContext, but I don't think the
> > > > Punctuator argument
> > > > > > > > > > > against
> > > > > > > > > > > > > it
> > > > > > > > > > > > > > is
> > > > > > > > > > > > > > very convincing. It sounds to me like we need to
> > > > disallow access to
> > > > > > > > > > > the
> > > > > > > > > > > > > > current
> > > > > > > > > > > > > > record context from within the Punctuator,
> independent
> > > > of anything
> > > > > > > > > > > to do
> > > > > > > > > > > > > > with
> > > > > > > > > > > > > > state stores
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 7:12 AM John Roesler <
> > > > vvcep...@apache.org>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > Thanks for the thoughts, Sophie.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I agree that the extra information could be
> useful.
> > > > My only
> > > > > > > > > > > concern is
> > > > > > > > > > > > > > > that it doesn’t seem like we can actually
> supply
> > > > that extra
> > > > > > > > > > > information
> > > > > > > > > > > > > > > correctly. So, then we have a situation where
> the
> > > > system offers
> > > > > > > > > > > useful
> > > > > > > > > > > > > API
> > > > > > > > > > > > > > > calls that are only correct in a narrow range
> of use
> > > > cases.
> > > > > > > > > > > Outside of
> > > > > > > > > > > > > > > those use cases, you get incorrect behavior.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > If it were possible to null out the context
> before
> > > > you put a
> > > > > > > > > > > document
> > > > > > > > > > > > > to
> > > > > > > > > > > > > > > which the context doesn’t apply, then the
> concern
> > > > would be
> > > > > > > > > > > mitigated.
> > > > > > > > > > > > > But
> > > > > > > > > > > > > > > it would still be pretty weird from the
> perspective
> > > > of the store
> > > > > > > > > > > that
> > > > > > > > > > > > > > > sometimes the context is populated and other
> times,
> > > > it’s null.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > But that seems moot, since it doesn’t seem
> possible
> > > > to null out
> > > > > > > > > > the
> > > > > > > > > > > > > > > context. Only the Processor could know whether
> it’s
> > > > about to put
> > > > > > > > > > a
> > > > > > > > > > > > > document
> > > > > > > > > > > > > > > different from the context or not. And it
> would be
> > > > inappropriate
> > > > > > > > > > to
> > > > > > > > > > > > > offer a
> > > > > > > > > > > > > > > public ProcessorContext api to manage the
> record
> > > > context.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Ultimately, it still seems like if you want to
> store
> > > > headers, you
> > > > > > > > > > > can
> > > > > > > > > > > > > > > store them explicitly, right? That doesn’t seem
> > > > onerous to me,
> > > > > > > > > > and
> > > > > > > > > > > it
> > > > > > > > > > > > > kind
> > > > > > > > > > > > > > > of seems better than relying on undefined or
> > > > asymmetrical
> > > > > > > > > > behavior
> > > > > > > > > > > in
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > store itself.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Anyway, I’m not saying that we couldn’t solve
> these
> > > > problems.
> > > > > > > > > > Just
> > > > > > > > > > > > > that it
> > > > > > > > > > > > > > > seems a little that we can be conservative and
> avoid
> > > > them for
> > > > > > > > > > now.
> > > > > > > > > > > If
> > > > > > > > > > > > > it
> > > > > > > > > > > > > > > turns out we really need to solve them, we can
> > > > always do it
> > > > > > > > > > later.
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > John
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Wed, Sep 9, 2020, at 22:46, Sophie
> Blee-Goldman
> > > > wrote:
> > > > > > > > > > > > > > > > > If you were to call "put" from a
> punctuator, or
> > > > do a
> > > > > > > > > > > > > > > > > `range()` query and then update one of
> those
> > > > records with
> > > > > > > > > > > > > > > > > `put()`, you'd have a very subtle bug on
> your
> > > > hands.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Can you elaborate on this a bit? I agree
> that the
> > > > punctuator
> > > > > > > > > > > case is
> > > > > > > > > > > > > an
> > > > > > > > > > > > > > > > obvious exemption to the assumption that
> store
> > > > invocations
> > > > > > > > > > always
> > > > > > > > > > > > > > > > have a corresponding "current record", but I
> don't
> > > > understand
> > > > > > > > > > the
> > > > > > > > > > > > > > > > second example. Are you envisioning a
> scenario
> > > > where the
> > > > > > > > > > #process
> > > > > > > > > > > > > > > > method performs a range query and then
> updates
> > > > records? Or were
> > > > > > > > > > > > > > > > you just giving another example of the
> punctuator
> > > > case?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I only bring it up because I agree that the
> > > > current record
> > > > > > > > > > > > > information
> > > > > > > > > > > > > > > could
> > > > > > > > > > > > > > > > still be useful within the context of the
> store.
> > > > As a non-user
> > > > > > > > > > my
> > > > > > > > > > > > > input
> > > > > > > > > > > > > > > on
> > > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > definitely has limited value, but it just
> isn't
> > > > striking me as
> > > > > > > > > > > > > obvious
> > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > we
> > > > > > > > > > > > > > > > should remove access to the current record
> context
> > > > from the
> > > > > > > > > > state
> > > > > > > > > > > > > stores.
> > > > > > > > > > > > > > > > If there is no current record, as in the
> > > > punctuator case, we
> > > > > > > > > > > should
> > > > > > > > > > > > > just
> > > > > > > > > > > > > > > > set
> > > > > > > > > > > > > > > > the record context to null (or
> Optional.empty,
> > > > etc).
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > That said, the put() always has to come from
> > > > somewhere, and
> > > > > > > > > > that
> > > > > > > > > > > > > > > > somewhere is always going to be either a
> Processor
> > > > or a
> > > > > > > > > > > Punctuator,
> > > > > > > > > > > > > both
> > > > > > > > > > > > > > > > of which will still have access to the full
> > > > context. So
> > > > > > > > > > > additional
> > > > > > > > > > > > > info
> > > > > > > > > > > > > > > > such as
> > > > > > > > > > > > > > > > the timestamp can and should probably be
> supplied
> > > > to the store
> > > > > > > > > > > before
> > > > > > > > > > > > > > > > calling put(), rather than looked up by the
> store.
> > > > But I can
> > > > > > > > > > see
> > > > > > > > > > > some
> > > > > > > > > > > > > > > other
> > > > > > > > > > > > > > > > things being useful, for example the current
> > > > record's headers.
> > > > > > > > > > > Maybe
> > > > > > > > > > > > > > > if/when
> > > > > > > > > > > > > > > > we add better (or any) support for headers in
> > > > state stores this
> > > > > > > > > > > will
> > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > less true.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Of course as John has made clear, it's
> pretty hard
> > > > to judge
> > > > > > > > > > > without
> > > > > > > > > > > > > > > > examples
> > > > > > > > > > > > > > > > and more insight as to what actually goes on
> > > > within a custom
> > > > > > > > > > > state
> > > > > > > > > > > > > store
> > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 8:07 PM John Roesler <
> > > > > > > > > > vvcep...@apache.org
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > Hi Paul,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > It's good to hear from you!
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I'm glad you're in favor of the direction.
> > > > Especially when
> > > > > > > > > > > > > > > > > it comes to public API and usability
> concens, I
> > > > tend to
> > > > > > > > > > > > > > > > > think that "the folks who matter" are
> actually
> > > > the folks who
> > > > > > > > > > > > > > > > > have to use the APIs to accomplish real
> tasks.
> > > > It can be
> > > > > > > > > > > > > > > > > hard for me to be sure I'm thinking
> clearly from
> > > > that
> > > > > > > > > > > > > > > > > perspective.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Funny story, I also started down this road
> a
> > > > couple of times
> > > > > > > > > > > > > > > > > already and backed them out before the KIP
> > > > because I was
> > > > > > > > > > > > > > > > > afraid of the scope of the proposal.
> > > > Unfortunately, needing
> > > > > > > > > > > > > > > > > to make a new ProcessorContext kind of
> forced my
> > > > hand.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I see you've called me out about the
> > > > ChangeLogging stores :)
> > > > > > > > > > > > > > > > > In fact, I think these are the main/only
> reason
> > > > that stores
> > > > > > > > > > > > > > > > > might really need to invoke "forward()". My
> > > > secret plan was
> > > > > > > > > > > > > > > > > to cheat and either accomplish
> change-logging by
> > > > a different
> > > > > > > > > > > > > > > > > mechanism than implementing the store
> interface,
> > > > or by just
> > > > > > > > > > > > > > > > > breaking encapsulation to sneak the "real"
> > > > ProcessorContext
> > > > > > > > > > > > > > > > > into the ChangeLogging stores. But those
> are all
> > > > > > > > > > > > > > > > > implementation details. I think the key
> question
> > > > is whether
> > > > > > > > > > > > > > > > > anyone else has a store implementation that
> > > > needs to call
> > > > > > > > > > > > > > > > > "forward()". It's not what you mentioned,
> but
> > > > since you
> > > > > > > > > > > > > > > > > spoke up, I'll just ask: if you have a use
> case
> > > > for calling
> > > > > > > > > > > > > > > > > "forward()" in a store, please share it.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Regarding the other record-specific context
> > > > methods, I think
> > > > > > > > > > > > > > > > > you have a good point, but I also can't
> quite
> > > > wrap my head
> > > > > > > > > > > > > > > > > around how we can actually guarantee it to
> work
> > > > in general.
> > > > > > > > > > > > > > > > > For example, the case you cited, where the
> > > > implementation of
> > > > > > > > > > > > > > > > > `KeyValueStore#put(key, value)` uses the
> context
> > > > to augment
> > > > > > > > > > > > > > > > > the record with timestamp information. This
> > > > relies on the
> > > > > > > > > > > > > > > > > assumption that you would only call
> "put()" from
> > > > inside a
> > > > > > > > > > > > > > > > > `Processor#process(key, value)` call in
> which
> > > > the record
> > > > > > > > > > > > > > > > > being processed is the same record that
> you're
> > > > trying to put
> > > > > > > > > > > > > > > > > into the store.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > If you were to call "put" from a
> punctuator, or
> > > > do a
> > > > > > > > > > > > > > > > > `range()` query and then update one of
> those
> > > > records with
> > > > > > > > > > > > > > > > > `put()`, you'd have a very subtle bug on
> your
> > > > hands. Right
> > > > > > > > > > > > > > > > > now, the Streams component that actually
> calls
> > > > the Processor
> > > > > > > > > > > > > > > > > takes care to set the right record context
> > > > before invoking
> > > > > > > > > > > > > > > > > the method, and in the case of caching,
> etc., it
> > > > also takes
> > > > > > > > > > > > > > > > > care to swap out the old context and keep
> it
> > > > somewhere safe.
> > > > > > > > > > > > > > > > > But when it comes to public API Processors
> > > > calling methods
> > > > > > > > > > > > > > > > > on StateStores, there's no opportunity for
> any
> > > > component to
> > > > > > > > > > > > > > > > > make sure the context is always correct.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > In the face of that situation, it seemed
> better
> > > > to just move
> > > > > > > > > > > > > > > > > in the direction of a "normal" data store.
> I.e.,
> > > > when you
> > > > > > > > > > > > > > > > > use a HashMap or RocksDB or other "state
> > > > stores", you don't
> > > > > > > > > > > > > > > > > expect them to automatically know extra
> stuff
> > > > about the
> > > > > > > > > > > > > > > > > record you're storing. If you need them to
> know
> > > > something,
> > > > > > > > > > > > > > > > > you just put it in the value.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > All of that said, I'm just reasoning from
> first
> > > > principles
> > > > > > > > > > > > > > > > > here. To really know if this is a mistake
> or
> > > > not, I need to
> > > > > > > > > > > > > > > > > be in your place. So please push back if
> you
> > > > think what I
> > > > > > > > > > > > > > > > > said is nonsense. My personal plan was to
> keep
> > > > an eye out
> > > > > > > > > > > > > > > > > during the period where the old API was
> still
> > > > present, but
> > > > > > > > > > > > > > > > > deprecated, to see if people were
> struggling to
> > > > use the new
> > > > > > > > > > > > > > > > > API. If so, then we'd have a chance to
> address
> > > > it before
> > > > > > > > > > > > > > > > > dropping the old API. But it's even better
> if
> > > > you can help
> > > > > > > > > > > > > > > > > think it through now.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > It did also cross my mind to _not_ add the
> > > > > > > > > > > > > > > > > StateStoreContext, but just to continue to
> punt
> > > > on the
> > > > > > > > > > > > > > > > > question by just dropping in the new
> > > > ProcessorContext to the
> > > > > > > > > > > > > > > > > new init method. If StateStoreContext
> seems too
> > > > bold, we can
> > > > > > > > > > > > > > > > > go that direction. But if we actually add
> some
> > > > methods to
> > > > > > > > > > > > > > > > > StateStoreContext, I'd like to be able to
> ensure
> > > > they would
> > > > > > > > > > > > > > > > > be well defined. I think the current
> situation
> > > > was more of
> > > > > > > > > > > > > > > > > an oversight than a choice.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks again for your reply,
> > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Wed, 2020-09-09 at 21:23 -0500, Paul
> Whalen
> > > > wrote:
> > > > > > > > > > > > > > > > > > John,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > It's exciting to see this KIP head in
> this
> > > > direction!  In
> > > > > > > > > > the
> > > > > > > > > > > > > last
> > > > > > > > > > > > > > > year
> > > > > > > > > > > > > > > > > or
> > > > > > > > > > > > > > > > > > so I've tried to sketch out some
> usability
> > > > improvements for
> > > > > > > > > > > > > custom
> > > > > > > > > > > > > > > state
> > > > > > > > > > > > > > > > > > stores, and I also ended up splitting
> out the
> > > > > > > > > > > StateStoreContext
> > > > > > > > > > > > > from
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > ProcessorContext in an attempt to
> facilitate
> > > > what I was
> > > > > > > > > > > doing.  I
> > > > > > > > > > > > > > > sort of
> > > > > > > > > > > > > > > > > > abandoned it when I realized how large
> the
> > > > ideal change
> > > > > > > > > > might
> > > > > > > > > > > > > have
> > > > > > > > > > > > > > > to be,
> > > > > > > > > > > > > > > > > > but it's great to see that there is other
> > > > interest in
> > > > > > > > > > moving
> > > > > > > > > > > in
> > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > > direction (from the folks that matter :)
> ).
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Having taken a stab at it myself, I have
> a
> > > > comment/question
> > > > > > > > > > > on
> > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > bullet
> > > > > > > > > > > > > > > > > > about StateStoreContext:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > It does *not*  include anything
> processor- or
> > > > record-
> > > > > > > > > > > specific,
> > > > > > > > > > > > > like
> > > > > > > > > > > > > > > > > > > `forward()` or any information about
> the
> > > > "current"
> > > > > > > > > > record,
> > > > > > > > > > > > > which is
> > > > > > > > > > > > > > > > > only a
> > > > > > > > > > > > > > > > > > > well-defined in the context of the
> > > > Processor. Processors
> > > > > > > > > > > > > process
> > > > > > > > > > > > > > > one
> > > > > > > > > > > > > > > > > record
> > > > > > > > > > > > > > > > > > > at a time, but state stores may be
> used to
> > > > store and
> > > > > > > > > > fetch
> > > > > > > > > > > many
> > > > > > > > > > > > > > > > > records, so
> > > > > > > > > > > > > > > > > > > there is no "current record".
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I totally agree that record-specific or
> > > > processor-specific
> > > > > > > > > > > > > context
> > > > > > > > > > > > > > > in a
> > > > > > > > > > > > > > > > > > state store is often not well-defined
> and it
> > > > would be good
> > > > > > > > > > to
> > > > > > > > > > > > > > > separate
> > > > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > out, but sometimes it (at least
> > > > record-specific context) is
> > > > > > > > > > > > > actually
> > > > > > > > > > > > > > > > > > useful, for example, passing the record's
> > > > timestamp through
> > > > > > > > > > > to
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > underlying storage (or changelog topic):
> > > > > > > > > > > > > > > > > >
> > > > > > > > > >
> > > >
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
> > > > > > > > > > > > > > > > > > You could have the writer client of the
> state
> > > > store pass
> > > > > > > > > > this
> > > > > > > > > > > > > > > through,
> > > > > > > > > > > > > > > > > but
> > > > > > > > > > > > > > > > > > it would be nice to be able to write
> state
> > > > stores where the
> > > > > > > > > > > > > client
> > > > > > > > > > > > > > > did
> > > > > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > > > have this responsibility.  I'm not sure
> if the
> > > > solution is
> > > > > > > > > > > to add
> > > > > > > > > > > > > > > some
> > > > > > > > > > > > > > > > > > things back to StateStoreContext, or
> make yet
> > > > another
> > > > > > > > > > context
> > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > represents record-specific context while
> > > > inside a state
> > > > > > > > > > > store.
> > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > Paul
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 5:43 PM John
> Roesler <
> > > > > > > > > > > j...@vvcephei.org>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > Hello all,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I've been slowly pushing KIP-478
> forward
> > > > over the last
> > > > > > > > > > > year,
> > > > > > > > > > > > > > > > > > > and I'm happy to say that we're making
> good
> > > > progress now.
> > > > > > > > > > > > > > > > > > > However, several issues with the
> original
> > > > design have
> > > > > > > > > > come
> > > > > > > > > > > > > > > > > > > to light.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > The major changes:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > We discovered that the original plan
> of just
> > > > adding
> > > > > > > > > > generic
> > > > > > > > > > > > > > > > > > > parameters to ProcessorContext was too
> > > > disruptive, so we
> > > > > > > > > > > are
> > > > > > > > > > > > > > > > > > > now adding a new api.ProcessorContext.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > That choice forces us to add a new
> > > > StateStore.init method
> > > > > > > > > > > > > > > > > > > for the new context, but
> ProcessorContext
> > > > really isn't
> > > > > > > > > > > ideal
> > > > > > > > > > > > > > > > > > > for state stores to begin with, so I'm
> > > > proposing a new
> > > > > > > > > > > > > > > > > > > StateStoreContext for this purpose. In
> a
> > > > nutshell, there
> > > > > > > > > > > are
> > > > > > > > > > > > > > > > > > > quite a few methods in
> ProcessorContext that
> > > > actually
> > > > > > > > > > > should
> > > > > > > > > > > > > > > > > > > never be called from inside a
> StateStore.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Also, since there is a new
> ProcessorContext
> > > > interface, we
> > > > > > > > > > > > > > > > > > > need a new MockProcessorContext
> > > > implementation in the
> > > > > > > > > > test-
> > > > > > > > > > > > > > > > > > > utils module.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > The changeset for the KIP document is
> here:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > >
> > > >
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
> > > > > > > > > > > > > > > > > > > And the KIP itself is here:
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > >
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
> > > > > > > > > > > > > > > > > > > If you have any concerns, please let
> me know!
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > >
> > > >
> > >
> >
>

Reply via email to