Thanks for the conversation, Sophie! Sorry for the ambiguity
I introduced into it, though.

Thanks,
-John

On Thu, 2020-09-10 at 16:10 -0700, Sophie Blee-Goldman
wrote:
> Aha, I did misinterpret the example in your previous response regarding the
> range query after all. I thought you just meant a time-range query inside a
> punctuator. It genuinely did not occur to me that users might be looking up
> and/or updating records of other keys from within a Processor. Sorry for
> being closed-minded.
> 
> I won't drag out this discussion any further by asking whether that might be
> a valid use case or just a lurking bug in itself :)
> 
> Thanks for humoring me. The current proposal for KIP-478 sounds good to me
> 
> On Thu, Sep 10, 2020 at 3:43 PM John Roesler <vvcep...@apache.org> wrote:
> 
> > Ah, thanks Sophie,
> > 
> > I'm sorry for misinterpreting your response. Yes, we
> > absolutely can and should clear the context before
> > punctuating.
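
To make the "clear the context before punctuating" idea concrete, here is a rough sketch. It is a toy model and not the actual StreamTask code: `ToyTask`, `RecordContext`, and both method signatures are made up for illustration, and it assumes the framework is the only writer of the context.

```java
import java.util.Optional;

// Toy model only: none of these types are real Kafka Streams classes.
final class ToyTask {
    /** Hypothetical stand-in for per-record metadata (topic, timestamp, ...). */
    static final class RecordContext {
        final String topic;
        final long timestamp;

        RecordContext(String topic, long timestamp) {
            this.topic = topic;
            this.timestamp = timestamp;
        }
    }

    // Only the framework (this class) writes the context; callbacks can only read it.
    private RecordContext current;

    /** Read-only view handed to user code; empty when there is no current record. */
    Optional<RecordContext> recordContext() {
        return Optional.ofNullable(current);
    }

    /** Sets the context for the incoming record, then runs the processor callback. */
    void process(String topic, long timestamp, Runnable processor) {
        current = new RecordContext(topic, timestamp);
        processor.run();
        // Deliberately left set afterwards, to mimic a stale context lingering.
    }

    /** Clears the context first, so the punctuator cannot see a stale record. */
    void punctuate(Runnable punctuator) {
        current = null;
        punctuator.run();
    }
}
```

With this shape, a punctuator that asks for the record context gets an empty Optional rather than whichever record happened to be processed last.
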
> > 
> > My secondary concern is maybe more far-fetched. I was
> > thinking that inside process(key,value), a Processor might
> > do a get/put of a _different_ key. Consider, for example,
> > the way that Suppress processors work. When they get a
> > record, they add it to the store and then do a range scan
> > and possibly forward a _different_ record. Of course, this
> > is an operation that is deeply coupled to the internals, and
> > the Suppress processor accordingly actually does get access
> > to the internal context so that it can set the context
> > before forwarding.
> > 
> > Still, it seems like I've had a handful of conversations
> > with people over the years in which they tell me they are
> > using state stores in a way that transcends the "get and put
> > the currently processing record" access pattern. I doubt
> > that those folks would even have considered the possibility
> > that the currently processing record's _context_ could
> > pollute their state store operations, as I myself never gave
> > it a second thought until the current conversation began. In
> > cases like that, we have actually set a trap for these
> > people, and it seems better to dismantle the trap.
> > 
> > As you noted, really the only people who would be negatively
> > impacted are people who implement their own state stores.
> > These folks will get the deprecation warning and try to
> > adapt their stores to the new interface. If they needed
> > access to the record context, they would find it's now
> > missing. They'd ask us about it, and we'd have the ability
> > to explain the lurking bug that they have had in their
> > stores all along, as well as the new recommended pattern
> > (just pass everything you need in the value). If that's
> > unsatisfying, _then_ we should consider amending the API.
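
For what it's worth, the "pass everything you need in the value" pattern might look roughly like the sketch below. It is only an illustration: a plain in-memory map stands in for a custom store, and `ExplicitMetadataStore` and `StoredValue` are hypothetical names, not part of the Streams API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: a plain in-memory map stands in for a custom state store.
final class ExplicitMetadataStore {
    /** Hypothetical value wrapper: the caller packs the metadata it cares about. */
    static final class StoredValue {
        final String value;
        final long timestamp;

        StoredValue(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, StoredValue> data = new HashMap<>();

    /** The store never consults any ambient "current record"; it stores what it is given. */
    void put(String key, StoredValue value) {
        data.put(key, value);
    }

    /** Returns the stored value with its metadata, or null if the key is absent. */
    StoredValue get(String key) {
        return data.get(key);
    }
}
```

A processor handling a record with timestamp 100 could then update a _different_ key with that key's own timestamp (say, 50) without the store silently stamping it with 100, because the timestamp travels in the value and nowhere else.
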
> > 
> > Thanks,
> > -John
> > 
> > On Thu, 2020-09-10 at 15:21 -0700, Sophie Blee-Goldman
> > wrote:
> > > > Regarding your first sentence, "...the processor would null
> > > > out the record context...", this is not possible, since the
> > > > processor doesn't have write access to the context. We could
> > > > add it,
> > > > 
> > > 
> > > Sorry, this was poorly phrased, I definitely did not mean to imply that
> > > we should make the context modifiable by the Processors themselves. I
> > > meant this should be handled by the internal processing framework that
> > > deals with passing records from one Processor to the next, setting the
> > > record context when a new record is picked up, invoking the punctuators,
> > > etc. I believe this all currently happens in the StreamTask? It already
> > > can and does overwrite the record context as new records are processed,
> > > and is also responsible for calling the punctuators, so it doesn't seem
> > > like a huge leap to just say "null out the current record before
> > > punctuating"
> > > 
> > > To clarify, I was never advocating or even considering to give the
> > > Processors write access to the record context. Sorry if my last message
> > > (or all of them) was misleading. I just wanted to point out that the
> > > punctuator concern is orthogonal to the question of whether we should
> > > include the record context in the StateStoreContext. It's definitely a
> > > real problem, but it's a problem that exists at the Processor level and
> > > not just the StateStore.
> > > 
> > > So, I don't think there is any reason we *can't* retain the record
> > > context in the StateStoreContext, and if any users came along with a
> > > clear use case I'd find that convincing. In the absence of any examples,
> > > the conservative approach sounds good to me.
> > > 
> > > If it turns out that someone did need the record context in their custom
> > > state store, I'm sure they'll submit a politely worded bug report
> > > alerting us that we broke their application.
> > > 
> > > On Thu, Sep 10, 2020 at 3:05 PM John Roesler <vvcep...@apache.org> wrote:
> > > > Thanks, Sophie,
> > > > 
> > > > Yes, now that you point it out, I can see that the record
> > > > context itself should be nulled out by Streams before
> > > > invoking punctuators. From that perspective, we don't need
> > > > to think about the second-order problem of what's in the
> > > > context for the state store when called from a punctuator.
> > > > 
> > > > Regarding your first sentence, "...the processor would null
> > > > out the record context...", this is not possible, since the
> > > > processor doesn't have write access to the context. We could
> > > > add it, but then all kinds of strange effects would ensue
> > > > when downstream processors execute but the context is empty,
> > > > etc. Better to just let the framework manage the record
> > > > context and keep it read-only for Processors.
> > > > 
> > > > Reading between the lines of your last reply, it sounds like
> > > > the disconnect may just have been a mutual misunderstanding
> > > > about whether or not Processors currently have access to set
> > > > the record context. Since they do not, if we wanted to add
> > > > the record context to StateStoreContext in a well-defined
> > > > way, we'd also have to add the ability for Processors to
> > > > manipulate it. But then, we're just creating a side-channel
> > > > for Processors to pass some information in arguments to
> > > > "put()" and other information implicitly through the
> > > > context. It seems better just to go for a single channel for
> > > > now.
> > > > 
> > > > It sounds like you're basically in favor of the conservative
> > > > approach, and you just wanted to understand the blockers
> > > > that I implied. Does my clarification make sense?
> > > > 
> > > > Thanks,
> > > > -John
> > > > 
> > > > On Thu, 2020-09-10 at 10:54 -0700, Sophie Blee-Goldman
> > > > wrote:
> > > > > I was just thinking that the processor would null out the record
> > > > > context after it finished processing the record, so I'm not sure I
> > > > > follow why this would not be possible? AFAIK we never call a
> > > > > punctuator in the middle of processing a record through the
> > > > > topology, and even if we did, we still know when it is about to be
> > > > > called and could set it to null beforehand.
> > > > > 
> > > > > I'm not trying to advocate for it here, I'm in agreement that
> > > > > anything you want to access within the store can and should be
> > > > > accessed within the calling Processor/Punctuator before reaching the
> > > > > store. The "we can always add it later if necessary" argument is
> > > > > also pretty convincing. Just trying to understand why this wouldn't
> > > > > be possible.
> > > > > 
> > > > > FWIW, the question of "what is the current record in the context of
> > > > > a Punctuator" exists independently of whether we want to add this to
> > > > > the StateStoreContext or not. The full ProcessorContext, including
> > > > > the current record context, is already available within a
> > > > > Punctuator, so removing the current record context from the
> > > > > StateStoreContext does not solve the problem. Users can -- and have
> > > > > (see KAFKA-9584 <https://issues.apache.org/jira/browse/KAFKA-9584>)
> > > > > -- hit such subtle bugs without ever invoking a StateStore from
> > > > > their punctuator.
> > > > > 
> > > > > Again, I think I do agree that we should leave the current record
> > > > > context off of the StateStoreContext, but I don't think the
> > > > > Punctuator argument against it is very convincing. It sounds to me
> > > > > like we need to disallow access to the current record context from
> > > > > within the Punctuator, independent of anything to do with state
> > > > > stores
> > > > > 
> > > > > On Thu, Sep 10, 2020 at 7:12 AM John Roesler <vvcep...@apache.org> wrote:
> > > > > > Thanks for the thoughts, Sophie.
> > > > > > 
> > > > > > I agree that the extra information could be useful. My only
> > > > > > concern is that it doesn’t seem like we can actually supply that
> > > > > > extra information correctly. So, then we have a situation where
> > > > > > the system offers useful API calls that are only correct in a
> > > > > > narrow range of use cases. Outside of those use cases, you get
> > > > > > incorrect behavior.
> > > > > > 
> > > > > > If it were possible to null out the context before you put a
> > > > > > document to which the context doesn’t apply, then the concern
> > > > > > would be mitigated. But it would still be pretty weird from the
> > > > > > perspective of the store that sometimes the context is populated
> > > > > > and other times, it’s null.
> > > > > > 
> > > > > > But that seems moot, since it doesn’t seem possible to null out
> > > > > > the context. Only the Processor could know whether it’s about to
> > > > > > put a document different from the context or not. And it would be
> > > > > > inappropriate to offer a public ProcessorContext api to manage
> > > > > > the record context.
> > > > > > 
> > > > > > Ultimately, it still seems like if you want to store headers, you
> > > > > > can store them explicitly, right? That doesn’t seem onerous to
> > > > > > me, and it kind of seems better than relying on undefined or
> > > > > > asymmetrical behavior in the store itself.
> > > > > > 
> > > > > > Anyway, I’m not saying that we couldn’t solve these problems. Just
> > > > > > that it seems like we can be conservative and avoid them for now.
> > > > > > If it turns out we really need to solve them, we can always do it
> > > > > > later.
> > > > > > 
> > > > > > Thanks,
> > > > > > John
> > > > > > 
> > > > > > On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:
> > > > > > > > If you were to call "put" from a punctuator, or do a
> > > > > > > > `range()` query and then update one of those records with
> > > > > > > > `put()`, you'd have a very subtle bug on your hands.
> > > > > > > 
> > > > > > > Can you elaborate on this a bit? I agree that the punctuator
> > > > > > > case is an obvious exception to the assumption that store
> > > > > > > invocations always have a corresponding "current record", but I
> > > > > > > don't understand the second example. Are you envisioning a
> > > > > > > scenario where the #process method performs a range query and
> > > > > > > then updates records? Or were you just giving another example
> > > > > > > of the punctuator case?
> > > > > > > 
> > > > > > > I only bring it up because I agree that the current record
> > > > > > > information could still be useful within the context of the
> > > > > > > store. As a non-user my input on this definitely has limited
> > > > > > > value, but it just isn't striking me as obvious that we should
> > > > > > > remove access to the current record context from the state
> > > > > > > stores. If there is no current record, as in the punctuator
> > > > > > > case, we should just set the record context to null (or
> > > > > > > Optional.empty, etc).
> > > > > > > 
> > > > > > > That said, the put() always has to come from somewhere, and that
> > > > > > > somewhere is always going to be either a Processor or a
> > > > > > > Punctuator, both of which will still have access to the full
> > > > > > > context. So additional info such as the timestamp can and should
> > > > > > > probably be supplied to the store before calling put(), rather
> > > > > > > than looked up by the store. But I can see some other things
> > > > > > > being useful, for example the current record's headers. Maybe
> > > > > > > if/when we add better (or any) support for headers in state
> > > > > > > stores this will be less true.
> > > > > > > 
> > > > > > > Of course as John has made clear, it's pretty hard to judge
> > > > > > > without examples and more insight as to what actually goes on
> > > > > > > within a custom state store
> > > > > > > 
> > > > > > > On Wed, Sep 9, 2020 at 8:07 PM John Roesler <vvcep...@apache.org> wrote:
> > > > > > > > Hi Paul,
> > > > > > > > 
> > > > > > > > It's good to hear from you!
> > > > > > > > 
> > > > > > > > I'm glad you're in favor of the direction. Especially when
> > > > > > > > it comes to public API and usability concerns, I tend to
> > > > > > > > think that "the folks who matter" are actually the folks who
> > > > > > > > have to use the APIs to accomplish real tasks. It can be
> > > > > > > > hard for me to be sure I'm thinking clearly from that
> > > > > > > > perspective.
> > > > > > > > 
> > > > > > > > Funny story, I also started down this road a couple of times
> > > > > > > > already and backed them out before the KIP because I was
> > > > > > > > afraid of the scope of the proposal. Unfortunately, needing
> > > > > > > > to make a new ProcessorContext kind of forced my hand.
> > > > > > > > 
> > > > > > > > I see you've called me out about the ChangeLogging stores :)
> > > > > > > > In fact, I think these are the main/only reason that stores
> > > > > > > > might really need to invoke "forward()". My secret plan was
> > > > > > > > to cheat and either accomplish change-logging by a different
> > > > > > > > mechanism than implementing the store interface, or by just
> > > > > > > > breaking encapsulation to sneak the "real" ProcessorContext
> > > > > > > > into the ChangeLogging stores. But those are all
> > > > > > > > implementation details. I think the key question is whether
> > > > > > > > anyone else has a store implementation that needs to call
> > > > > > > > "forward()". It's not what you mentioned, but since you
> > > > > > > > spoke up, I'll just ask: if you have a use case for calling
> > > > > > > > "forward()" in a store, please share it.
> > > > > > > > 
> > > > > > > > Regarding the other record-specific context methods, I think
> > > > > > > > you have a good point, but I also can't quite wrap my head
> > > > > > > > around how we can actually guarantee it to work in general.
> > > > > > > > For example, the case you cited, where the implementation of
> > > > > > > > `KeyValueStore#put(key, value)` uses the context to augment
> > > > > > > > the record with timestamp information. This relies on the
> > > > > > > > assumption that you would only call "put()" from inside a
> > > > > > > > `Processor#process(key, value)` call in which the record
> > > > > > > > being processed is the same record that you're trying to put
> > > > > > > > into the store.
> > > > > > > > 
> > > > > > > > If you were to call "put" from a punctuator, or do a
> > > > > > > > `range()` query and then update one of those records with
> > > > > > > > `put()`, you'd have a very subtle bug on your hands. Right
> > > > > > > > now, the Streams component that actually calls the Processor
> > > > > > > > takes care to set the right record context before invoking
> > > > > > > > the method, and in the case of caching, etc., it also takes
> > > > > > > > care to swap out the old context and keep it somewhere safe.
> > > > > > > > But when it comes to public API Processors calling methods
> > > > > > > > on StateStores, there's no opportunity for any component to
> > > > > > > > make sure the context is always correct.
> > > > > > > > 
> > > > > > > > In the face of that situation, it seemed better to just move
> > > > > > > > in the direction of a "normal" data store. I.e., when you
> > > > > > > > use a HashMap or RocksDB or other "state stores", you don't
> > > > > > > > expect them to automatically know extra stuff about the
> > > > > > > > record you're storing. If you need them to know something,
> > > > > > > > you just put it in the value.
> > > > > > > > 
> > > > > > > > All of that said, I'm just reasoning from first principles
> > > > > > > > here. To really know if this is a mistake or not, I need to
> > > > > > > > be in your place. So please push back if you think what I
> > > > > > > > said is nonsense. My personal plan was to keep an eye out
> > > > > > > > during the period where the old API was still present, but
> > > > > > > > deprecated, to see if people were struggling to use the new
> > > > > > > > API. If so, then we'd have a chance to address it before
> > > > > > > > dropping the old API. But it's even better if you can help
> > > > > > > > think it through now.
> > > > > > > > 
> > > > > > > > It did also cross my mind to _not_ add the
> > > > > > > > StateStoreContext, but just to continue to punt on the
> > > > > > > > question by just dropping in the new ProcessorContext to the
> > > > > > > > new init method. If StateStoreContext seems too bold, we can
> > > > > > > > go that direction. But if we actually add some methods to
> > > > > > > > StateStoreContext, I'd like to be able to ensure they would
> > > > > > > > be well defined. I think the current situation was more of
> > > > > > > > an oversight than a choice.
> > > > > > > > 
> > > > > > > > Thanks again for your reply,
> > > > > > > > -John
> > > > > > > > 
> > > > > > > > 
> > > > > > > > On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:
> > > > > > > > > John,
> > > > > > > > > 
> > > > > > > > > It's exciting to see this KIP head in this direction!  In
> > > > > > > > > the last year or so I've tried to sketch out some usability
> > > > > > > > > improvements for custom state stores, and I also ended up
> > > > > > > > > splitting out the StateStoreContext from the
> > > > > > > > > ProcessorContext in an attempt to facilitate what I was
> > > > > > > > > doing.  I sort of abandoned it when I realized how large
> > > > > > > > > the ideal change might have to be, but it's great to see
> > > > > > > > > that there is other interest in moving in this direction
> > > > > > > > > (from the folks that matter :) ).
> > > > > > > > > 
> > > > > > > > > Having taken a stab at it myself, I have a comment/question
> > > > > > > > > on this bullet about StateStoreContext:
> > > > > > > > > 
> > > > > > > > > > It does *not* include anything processor- or
> > > > > > > > > > record-specific, like `forward()` or any information
> > > > > > > > > > about the "current" record, which is only well-defined
> > > > > > > > > > in the context of the Processor. Processors process one
> > > > > > > > > > record at a time, but state stores may be used to store
> > > > > > > > > > and fetch many records, so there is no "current record".
> > > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > I totally agree that record-specific or processor-specific
> > > > > > > > > context in a state store is often not well-defined and it
> > > > > > > > > would be good to separate that out, but sometimes it (at
> > > > > > > > > least record-specific context) is actually useful, for
> > > > > > > > > example, passing the record's timestamp through to the
> > > > > > > > > underlying storage (or changelog topic):
> > > > > > > > > 
> > > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
> > > > > > > > > 
> > > > > > > > > You could have the writer client of the state store pass
> > > > > > > > > this through, but it would be nice to be able to write
> > > > > > > > > state stores where the client did not have this
> > > > > > > > > responsibility.  I'm not sure if the solution is to add
> > > > > > > > > some things back to StateStoreContext, or make yet another
> > > > > > > > > context that represents record-specific context while
> > > > > > > > > inside a state store.
> > > > > > > > > 
> > > > > > > > > Best,
> > > > > > > > > Paul
> > > > > > > > > 
> > > > > > > > > On Wed, Sep 9, 2020 at 5:43 PM John Roesler <j...@vvcephei.org> wrote:
> > > > > > > > > > Hello all,
> > > > > > > > > > 
> > > > > > > > > > I've been slowly pushing KIP-478 forward over the last
> > year,
> > > > > > > > > > and I'm happy to say that we're making good progress now.
> > > > > > > > > > However, several issues with the original design have come
> > > > > > > > > > to light.
> > > > > > > > > > 
> > > > > > > > > > The major changes:
> > > > > > > > > > 
> > > > > > > > > > We discovered that the original plan of just adding
> > > > > > > > > > generic parameters to ProcessorContext was too
> > > > > > > > > > disruptive, so we are now adding a new
> > > > > > > > > > api.ProcessorContext.
> > > > > > > > > > 
> > > > > > > > > > That choice forces us to add a new StateStore.init method
> > > > > > > > > > for the new context, but ProcessorContext really isn't
> > > > > > > > > > ideal for state stores to begin with, so I'm proposing a
> > > > > > > > > > new StateStoreContext for this purpose. In a nutshell,
> > > > > > > > > > there are quite a few methods in ProcessorContext that
> > > > > > > > > > actually should never be called from inside a StateStore.
> > > > > > > > > > 
> > > > > > > > > > Also, since there is a new ProcessorContext interface, we
> > > > > > > > > > need a new MockProcessorContext implementation in the
> > > > > > > > > > test-utils module.
> > > > > > > > > > 
> > > > > > > > > > The changeset for the KIP document is here:
> > > > > > > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
> > > > > > > > > > 
> > > > > > > > > > And the KIP itself is here:
> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
> > > > > > > > > > 
> > > > > > > > > > If you have any concerns, please let me know!
> > > > > > > > > > 
> > > > > > > > > > Thanks,
> > > > > > > > > > -John
> > > > > > > > > > 
> > > > > > > > > > 
