Thanks John and Walker for your thoughts.

I agree with your two scenarios John, that you configure fully in the
constructor, or you don't need to call `init()`. IIUC, if we pass the
deserializer to the consumer, we want to make sure it has the window size
is set using the newly required constructor. If we don't pass in the
deserializer, the window size will be set through the configs. To answer
Walker's question directly, because the configs aren't passed to the
constructor, we can't set the window size unless we pass it to the
constructor or configure the constructor after initializing it.

For users who would rather not set a strict window size (outside of the
variable size scenario), they can pass in Long.MAX_VALUE. The way I see
this is instead of having the default be for scenarios that don't require a
window size, we have the default be the scenarios that *do*, flipping the
current implementation to fit with typical use cases.

On your points John:
1. I agree that it makes sense to store it in StreamsConfig, this shouldn't
cause any issues. I've updated the KIP accordingly.

2. The non-fixed time windows issue is a good point. It seems like calendar
windows in particular are quite useful, so I think we want to make sure
that this wouldn't inhibit flexible sized windows. I think having two
different configs and functions makes sense, although it is slightly
messier. While requiring all time windows to use the WindowFunction
constructor would work, I think that allowing users to access the
WindowSize constructor is preferable because it seems easier to use for
people who are not at all interested in delving into variably sized
windows. This assumption could be wrong though, and perhaps users would
adapt quickly to the new WindowFunction style, but my immediate reaction is
to support both configs and constructors.

One note on this is that Session Windows are handled separately from time
windows and also have variable window sizes. I assume that the TimeWindowed
option is preferable for variably sized windows because you still want to
access the window end times? But I think one alternative could be
separating the variably sized windows from the current implementation of
time windows, although I think KIP-645
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface>
would make this not strictly necessary.

Cheers,
Leah

On Fri, Aug 21, 2020 at 10:04 AM John Roesler <vvcep...@apache.org> wrote:

> Hi Leah,
>
> Thanks for the KIP! This has been a real pain for some use
> cases, so it's really good to see a proposal to fix it.
>
> We do need a default constructor so that it can be
> dynamically instantiated by the consumer (or any other
> component). But I'm +1 on deprecating the constructor you're
> proposing to deprecate, which only partially configures the
> class. It seems like there are exactly two patterns: either
> you fully configure the class in the constructor and don't
> call `init()`, or you call the default constructor and then
> configure the class by calling `init()`.
>
> I can appreciate Walker's point, but stepping back, it
> doesn't actually seem that useful to partially configure the
> class in the constructor and then finish up the
> configuration by calling `init()`. I could see the argument
> if there were a sensible default, but for this particular
> class, there isn't one. Rhetorical question: what is the
> default window size for Streams programs?
>
> I have a couple of concerns to discuss:
>
> 1. Config Location
>
> I don't think I would add the new configs to ConsumerConfig,
> but would add it to StreamsConfig instead. The deserailzier
> itself is in Streams (it is
> o.a.k.streams.kstream.TimeWindowedDeserializer), so it seems
> odd to have one of its configurations in a completely
> different module.
>
> Also, this class already has two configs, which are in
> StreamsConfig:
> StreamsConfig.DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS
> StreamsConfig.DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS
>
> It seems like the new config belongs right next to the
> existing ones.
>
> For me, it raises a secondary question:
> 1b: Should there be a KEY_WINDOW_SIZE and a
> VALUE_WINDOW_SIZE? I'm honestly not sure what a "windowed
> value" even is, but the fact that we can configure serdes
> for it implies that perhaps we should symmetrically
> configure its size as well.
>
> 2. Fixed Size Assumption
>
> In KIP-645, I'm proposing to lift the assumption that
> TimeWindows have a fixed size at all, but KIP-659 is
> currently built on that assumption.
>
> For details on why this is not a good assumtion, see:
> https://issues.apache.org/jira/browse/KAFKA-10408
>
> In fact, in my POC PR for KIP-659, I'm dropping the
> constructor that takes a "window size" parameter in favor of
> one that takes a window function, mapping a window start
> time to a full Window(start, end).
>
> In that context, it seems incongruous to introduce a
> configuration that specifies a window size. Of course, my
> KIP is also under discussion, so my proposal may not
> eventually be accepted. But it is necessary to consider both
> of these concerns together.
>
> One option seems to be to accept both. Namely, we keep the
> "fixed size" constructor AND add my new constructor (for
> variably sized windows). Likewise, we accept your proposal,
> and KIP-659 would propose to add a new config specifying a
> windowing function, such as:
>
> > StreamsConfig.WINDOW_FUNCTION_CONFIG
>
> which would be an instance of:
>
> > public interface WindowFunction implements Function<Long,
> Window>;
>
> I'm not bringing these up for discussion in your KIP right
> now, just demonstrating the feasibility of merging both
> proposals.
>
> My question for you: do you think the general strategy of
> having two constructors and two configs, one for fixed and
> one for variable windows, makes sense? Is it too
> complicated? Do you have a better idea?
>
> Thanks!
> -John
>
> On Thu, 2020-08-20 at 14:49 -0700, Walker Carlson wrote:
> > Hi Leah,
> >
> > Could you explain a bit more why we do not wish to
> > let TimeWindowedDeserializer and WindowedSerdes be created without a
> > specified time as a parameter?
> >
> > I understand the long.MAX_VALUE could cause problems but would it not be
> a
> > good idea to have a usable default or fetch from the config if available?
> > After all you are proposing to add "window.size.ms"
> >
> > We definitely need a fix to this problem and adding "window.size.ms"
> makes
> > sense to me.
> >
> > Thanks for the KIP,
> > Walker
> >
> > On Thu, Aug 20, 2020 at 2:22 PM Leah Thomas <ltho...@confluent.io>
> wrote:
> >
> > > Hi all,
> > >
> > > I'd like to start a discussion for KIP-659:
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-659%3A+Improve+TimeWindowedDeserializer+and+TimeWindowedSerde+to+handle+window+size
> > >
> > >
> > > The goal of the KIP is to ensure that window size is passed to the
> consumer
> > > when needed, which will generally be for testing purposes, and to avoid
> > > runtime errors when the *TimeWindowedSerde* is created without a window
> > > size.
> > >
> > > Looking forward to hearing your feedback.
> > >
> > > Cheers,
> > > Leah
> > >
>
>

Reply via email to