Thanks, all,

Based on my reading of the conversation, it sounds like I
have some legwork to do in KIP-645, but our collective
instinct is that Leah's proposal doesn't need to change to
account for whatever we might decide to do in KIP-645.

I have no further concerns about KIP-645, and I think it's a
good proposal.

Thanks,
-John

P.s., there's still a typo on the wiki that says
"ConsumerConfig" on the code block, even though the text now
says "StreamsConfig".


On Fri, 2020-08-21 at 10:56 -0700, Sophie Blee-Goldman
wrote:
> Just want to make a quick comment on the question that John raised about
> whether we
> should introduce a separate config for "key" and "value" window sizes:
> 
> My short answer is No, I don't think that's necessary. First of all, as you
> said, there is no
> first-class concept of a "Windowed value" in the DSL. Second, to engage in
> your rhetorical
> question, if there's no default window size for a Streams program then how
> can there be a
> sensible default for the key AND a separate sensible default for a value?
> 
> I don't think we need to follow the existing pattern if it doesn't make
> sense, and to be honest
> I'm a bit skeptical that anyone was even using these default windowed inner
> classes since
> the config wasn't even defined/documented until quite recently. I'd
> actually be in favor
> of deprecating StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> but I don't want to drag that into this discussion as well.
> 
> My understanding is that these were meant to mirror the default key/value
> serde configs, but
> the real use of the DEFAULT_WINDOWED_SERDE_INNER_CLASS config is actually
> that you
> can at least use it to configure the inner class for a Consumer, thus
> making the TimeWindowed
> serdes functional at a basic level. With the window size configs, the point
> is not really to set a
> default but to make it actually work with a Consumer which instantiates the
> deserializer by
> reflection. So I don't think we should position this new config as a
> "default" (although it may
> technically behave as one) -- within Streams users can and should always
> supply the window
> size through the constructor. I don't think that's such an inconvenience,
> vs the amount of
> confusion that will (and has) been caused by default serde-related configs
> in streams.
> 
> Regarding the fixed vs variable sized config, one idea I had was to just
> keep the fixed-size config
> and constructor and let users of enumerable windows override the
> TimeWindowedSerde class(es)
> to do whatever it is they need. IIUC you already have to override some
> other windows-related
> classes to get variable-sized windows so doing the same for the serdes
> sounds reasonable to me.
> Just my take on the "simple things should be easy, difficult things should
> be possible" mantra
> 
> One last quick side note: the reason we don't really need to discuss
> SessionWindows here
> is that they already encode both the start and end time for the window.
> This is probably the best
> way to go for TimeWindows as well, but making this change in a backwards
> compatible way is a
> much larger scope of work. And even then, we might want to consider making
> it possible to still
> just encode the start time to save space, thus requiring this config either
> way
> 
> On Fri, Aug 21, 2020 at 9:26 AM Leah Thomas <ltho...@confluent.io> wrote:
> 
> > Thanks John and Walker for your thoughts.
> > 
> > I agree with your two scenarios John, that you configure fully in the
> > constructor, or you don't need to call `init()`. IIUC, if we pass the
> > deserializer to the consumer, we want to make sure it has the window size
> > is set using the newly required constructor. If we don't pass in the
> > deserializer, the window size will be set through the configs. To answer
> > Walker's question directly, because the configs aren't passed to the
> > constructor, we can't set the window size unless we pass it to the
> > constructor or configure the constructor after initializing it.
> > 
> > For users who would rather not set a strict window size (outside of the
> > variable size scenario), they can pass in Long.MAX_VALUE. The way I see
> > this is instead of having the default be for scenarios that don't require a
> > window size, we have the default be the scenarios that *do*, flipping the
> > current implementation to fit with typical use cases.
> > 
> > On your points John:
> > 1. I agree that it makes sense to store it in StreamsConfig, this shouldn't
> > cause any issues. I've updated the KIP accordingly.
> > 
> > 2. The non-fixed time windows issue is a good point. It seems like calendar
> > windows in particular are quite useful, so I think we want to make sure
> > that this wouldn't inhibit flexible sized windows. I think having two
> > different configs and functions makes sense, although it is slightly
> > messier. While requiring all time windows to use the WindowFunction
> > constructor would work, I think that allowing users to access the
> > WindowSize constructor is preferable because it seems easier to use for
> > people who are not at all interested in delving into variably sized
> > windows. This assumption could be wrong though, and perhaps users would
> > adapt quickly to the new WindowFunction style, but my immediate reaction is
> > to support both configs and constructors.
> > 
> > One note on this is that Session Windows are handled separately from time
> > windows and also have variable window sizes. I assume that the TimeWindowed
> > option is preferable for variably sized windows because you still want to
> > access the window end times? But I think one alternative could be
> > separating the variably sized windows from the current implementation of
> > time windows, although I think KIP-645
> > <
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface
> > would make this not strictly necessary.
> > 
> > Cheers,
> > Leah
> > 
> > On Fri, Aug 21, 2020 at 10:04 AM John Roesler <vvcep...@apache.org> wrote:
> > 
> > > Hi Leah,
> > > 
> > > Thanks for the KIP! This has been a real pain for some use
> > > cases, so it's really good to see a proposal to fix it.
> > > 
> > > We do need a default constructor so that it can be
> > > dynamically instantiated by the consumer (or any other
> > > component). But I'm +1 on deprecating the constructor you're
> > > proposing to deprecate, which only partially configures the
> > > class. It seems like there are exactly two patterns: either
> > > you fully configure the class in the constructor and don't
> > > call `init()`, or you call the default constructor and then
> > > configure the class by calling `init()`.
> > > 
> > > I can appreciate Walker's point, but stepping back, it
> > > doesn't actually seem that useful to partially configure the
> > > class in the constructor and then finish up the
> > > configuration by calling `init()`. I could see the argument
> > > if there were a sensible default, but for this particular
> > > class, there isn't one. Rhetorical question: what is the
> > > default window size for Streams programs?
> > > 
> > > I have a couple of concerns to discuss:
> > > 
> > > 1. Config Location
> > > 
> > > I don't think I would add the new configs to ConsumerConfig,
> > > but would add it to StreamsConfig instead. The deserailzier
> > > itself is in Streams (it is
> > > o.a.k.streams.kstream.TimeWindowedDeserializer), so it seems
> > > odd to have one of its configurations in a completely
> > > different module.
> > > 
> > > Also, this class already has two configs, which are in
> > > StreamsConfig:
> > > StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
> > > StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
> > > 
> > > It seems like the new config belongs right next to the
> > > existing ones.
> > > 
> > > For me, it raises a secondary question:
> > > 1b: Should there be a KEY_WINDOW_SIZE and a
> > > VALUE_WINDOW_SIZE? I'm honestly not sure what a "windowed
> > > value" even is, but the fact that we can configure serdes
> > > for it implies that perhaps we should symmetrically
> > > configure its size as well.
> > > 
> > > 2. Fixed Size Assumption
> > > 
> > > In KIP-645, I'm proposing to lift the assumption that
> > > TimeWindows have a fixed size at all, but KIP-659 is
> > > currently built on that assumption.
> > > 
> > > For details on why this is not a good assumtion, see:
> > > https://issues.apache.org/jira/browse/KAFKA-10408
> > > 
> > > In fact, in my POC PR for KIP-659, I'm dropping the
> > > constructor that takes a "window size" parameter in favor of
> > > one that takes a window function, mapping a window start
> > > time to a full Window(start, end).
> > > 
> > > In that context, it seems incongruous to introduce a
> > > configuration that specifies a window size. Of course, my
> > > KIP is also under discussion, so my proposal may not
> > > eventually be accepted. But it is necessary to consider both
> > > of these concerns together.
> > > 
> > > One option seems to be to accept both. Namely, we keep the
> > > "fixed size" constructor AND add my new constructor (for
> > > variably sized windows). Likewise, we accept your proposal,
> > > and KIP-659 would propose to add a new config specifying a
> > > windowing function, such as:
> > > 
> > > > StreamsConfig.WINDOW_FUNCTION_CONFIG
> > > 
> > > which would be an instance of:
> > > 
> > > > public interface WindowFunction implements Function<Long,
> > > Window>;
> > > 
> > > I'm not bringing these up for discussion in your KIP right
> > > now, just demonstrating the feasibility of merging both
> > > proposals.
> > > 
> > > My question for you: do you think the general strategy of
> > > having two constructors and two configs, one for fixed and
> > > one for variable windows, makes sense? Is it too
> > > complicated? Do you have a better idea?
> > > 
> > > Thanks!
> > > -John
> > > 
> > > On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
> > > > Hi Leah,
> > > > 
> > > > Could you explain a bit more why we do not wish to
> > > > let TimeWindowedDeserializer and WindowedSerdes be created without a
> > > > specified time as a parameter?
> > > > 
> > > > I understand the long.MAX_VALUE could cause problems but would it not
> > be
> > > a
> > > > good idea to have a usable default or fetch from the config if
> > available?
> > > > After all you are proposing to add "window.size.ms"
> > > > 
> > > > We definitely need a fix to this problem and adding "window.size.ms"
> > > makes
> > > > sense to me.
> > > > 
> > > > Thanks for the KIP,
> > > > Walker
> > > > 
> > > > On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <ltho...@confluent.io>
> > > wrote:
> > > > > Hi all,
> > > > > 
> > > > > I'd like to start a discussion for KIP-659:
> > > > > 
> > > > > 
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > > > > 
> > > > > The goal of the KIP is to ensure that window size is passed to the
> > > consumer
> > > > > when needed, which will generally be for testing purposes, and to
> > avoid
> > > > > runtime errors when the *TimeWindowedSerde* is created without a
> > window
> > > > > size.
> > > > > 
> > > > > Looking forward to hearing your feedback.
> > > > > 
> > > > > Cheers,
> > > > > Leah
> > > > > 

Reply via email to