I understand your argument, but do not agree with it.

Your first version (even if the "flow" is not as nice) is more explicit
than the second version. Adding a stateStoreName parameter is quite
implicit but has a huge impact -- thus, I prefer the rather more verbose
but explicit version.
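
To make the difference concrete, here is a minimal sketch of the two styles. It is a toy model only: SketchTable and every method on it are hypothetical stand-ins that mirror the names used in this thread, not the actual KTable API. It assumes that materialize() is idempotent, that a null store name is rejected, and that stateStoreName() fails if the table is not materialized.

```java
// Toy model only: SketchTable is hypothetical and is NOT the Kafka Streams KTable API.
final class SketchTable {
    private static int generated = 0;   // counter for internally generated store names

    private final String plan;          // textual description of the operator chain
    private String storeName;           // null means "not materialized"

    private SketchTable(String plan, String storeName) {
        this.plan = plan;
        this.storeName = storeName;
    }

    static SketchTable source(String topic) {
        return new SketchTable("table(" + topic + ")", null);
    }

    // Variant A: no store-name parameter; the result is not materialized.
    SketchTable mapValues() {
        return new SketchTable(plan + ".mapValues()", null);
    }

    // Variant B: overload whose extra argument also switches on materialization.
    SketchTable mapValues(String storeName) {
        return new SketchTable(plan + ".mapValues()", requireName(storeName));
    }

    // Explicit step: idempotent, generates an internal name only if none is set yet.
    SketchTable materialize() {
        if (storeName == null) {
            storeName = "internal-store-" + (generated++);
        }
        return this;
    }

    // Explicit step with a user-chosen name; replaces any earlier name.
    SketchTable materialize(String storeName) {
        this.storeName = requireName(storeName);
        return this;
    }

    boolean isMaterialized() {
        return storeName != null;
    }

    // Returns the store name, or fails if the table is not materialized.
    String stateStoreName() {
        if (storeName == null) {
            throw new IllegalStateException(plan + " is not materialized");
        }
        return storeName;
    }

    private static String requireName(String name) {
        if (name == null) {
            throw new IllegalArgumentException("store name must not be null");
        }
        return name;
    }

    public static void main(String[] args) {
        // Explicit: materialization is a visible step in the chain.
        SketchTable explicit = SketchTable.source("topic1").mapValues().materialize("state1");
        // Implicit: one extra argument changes whether a store exists at all.
        SketchTable implicit = SketchTable.source("topic1").mapValues("state1");
        System.out.println(explicit.stateStoreName().equals(implicit.stateStoreName()));
        System.out.println(SketchTable.source("topic1").mapValues().isMaterialized());
    }
}
```

Both chains end up with the same store in this model; the only difference is whether the decision to materialize appears as its own call or hides in an argument list.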


-Matthias

On 1/23/17 1:39 AM, Damian Guy wrote:
> I'm not a fan of materialize. I think it interrupts the flow, i.e,
> 
> table.mapValues(..).materialize().join(..).materialize()
> compared to:
> table.mapValues(..).join(..)
> 
> I know which one I prefer.
> My preference is still to provide overloaded methods where people can
> specify the store names if they want, otherwise we just generate them.
> 
> On Mon, 23 Jan 2017 at 05:30 Matthias J. Sax <matth...@confluent.io> wrote:
> 
>> Hi,
>>
>> thanks for the KIP Eno! Here are my 2 cents:
>>
>> 1) I like Guozhang's proposal about removing store name from all KTable
>> methods and generate internal names (however, I would do this as
>> overloads). Furthermore, I would not force users to call .materialize()
>> if they want to query a store, but add one more method .stateStoreName()
>> that returns the store name if the KTable is materialized. Thus, also
>> .materialize() must not necessarily have a parameter storeName (ie, we
>> should have some overloads here).
>>
>> I would also not allow to provide a null store name (to indicate no
>> materialization if not necessary) but throw an exception.
>>
>> This yields some simplification (see below).
>>
>>
>> 2) I also like Guozhang's proposal about KStream#toTable()
>>
>>
>> 3)
>>>
>>>>   3. What will happen when you call materialize on KTable that is
>> already
>>>>   materialized? Will it create another StateStore (providing the name is
>>>>   different), throw an Exception?
>>>
>>> Currently an exception is thrown, but see below.
>>>
>>>
>>
>> If we follow approach (1) from Guozhang, there is no need to worry about
>> a second materialization and no exception needs to be thrown. A call to
>> .materialize() basically sets a "materialized flag" (ie, idempotent
>> operation) and sets a new name.
>>
>>
>> 4)
>>>> Rename toStream() to toKStream() for consistency.
>>>
>>> Not sure whether that is really required. We also use
>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example, and
>>> don't care about the "K" prefix.
>>
>> Eno's reply:
>>> I think changing it to `toKStream` would make it absolutely clear what
>> we are converting it to.
>>>
>>> I'd say we should probably change the KStreamBuilder methods (but not in
>> this KIP).
>>
>> I would keep #toStream(). (see below)
>>
>>
>> 5) We should not remove any methods but only deprecate them.
>>
>>
>>
>> A general note:
>>
>> I do not understand your comments "Rejected Alternatives". You say "Have
>> the KTable be the materialized view" was rejected. But your KIP actually
>> does exactly this -- the changelog abstraction of KTable is secondary
>> after those changes and the "view" abstraction is what a KTable is. And
>> just to be clear, I like this a lot:
>>
>>  - it aligns with the name KTable
>>  - it aligns with stream-table-duality
>>  - it aligns with IQ
>>
>> I would say that a KTable is a "view abstraction" (as materialization is
>> optional).
>>
>>
>>
>> -Matthias
>>
>>
>>
>>
>> On 1/22/17 5:05 PM, Guozhang Wang wrote:
>>> Thanks for the KIP Eno, I have a few meta comments and a few detailed
>>> comments:
>>>
>>> 1. I like the materialize() function in general, but I would like to see
>>> how other KTable functions should be updated accordingly. For example, 1)
>>> KStreamBuilder.table(..) has a state store name parameter, and we will
>>> always materialize the KTable unless its state store name is set to null;
>>> 2) KTable.agg requires the result KTable to be materialized, and hence it
>>> also has a state store name; 3) KTable.join requires the joining table
>> to
>>> be materialized. And today we do not actually have a mechanism to enforce
>>> that, but will only throw an exception at runtime if it is not (e.g. if
>> you
>>> have "builder.table("topic", null).join()" a RTE will be thrown).
>>>
>>> I'd make an extended proposal just to kick off the discussion here: let's
>>> remove all the state store params in other KTable functions, and if in
>> some
>>> cases KTable have to be materialized (e.g. KTable resulted from KXX.agg)
>>> and users do not call materialize(), then we treat it as "users are not
>>> interested in querying it at all" and hence use an internal name
>> generated
>>> for the materialized KTable; i.e. although it is materialized the state
>>> store is not exposed to users. And if users call materialize() afterwards
>>> but we have already decided to materialize it, we can replace the
>> internal
>>> name with the user's provided names. Then from a user's point-view, if
>> they
>>> ever want to query a KTable, they have to call materialize() with a given
>>> state store name. This approach has one awkwardness though, that serdes
>> and
>>> state store names param are not separated and could be overlapped (see
>>> detailed comment #2 below).
>>>
>>>
>>> 2. This step does not need to be included in this KIP, but just as a
>>> reference / future work: as we have discussed before, we may enforce
>>> materialize KTable.join resulted KTables as well in the future. If we do
>>> that, then:
>>>
>>> a) KXX.agg resulted KTables are always materialized;
>>> b) KTable.agg requires the aggregating KTable to always be materialized
>>> (otherwise we would not know the old value);
>>> c) KTable.join resulted KTables are always materialized, and so are the
>>> joining KTables to always be materialized.
>>> d) KTable.filter/mapValues resulted KTables materialization depend on its
>>> parent's materialization;
>>>
>>> By recursive induction all KTables are actually always materialized, and
>>> then the effect of the "materialize()" is just for specifying the state
>>> store names. In this scenario, we do not need to send Change<V> in
>>> repartition topics within joins any more, but only for repartitions
>> topics
>>> within aggregations. Instead, we can just send a "tombstone" without the
>>> old value and we do not need to calculate joins twice (one more time when
>>> old value is received).
>>>
>>> 3. I'm wondering if it is worth-while to add a "KStream#toTable()"
>> function
>>> which is interpreted as a dummy-aggregation where the new value always
>>> replaces the old value. I have seen a couple of use cases of this, for
>>> example, users want to read a changelog topic, apply some filters, and
>> then
>>> materialize it into a KTable with state stores without creating
>> duplicated
>>> changelog topics. With materialize() and toTable I'd imagine users can
>>> specify sth. like:
>>>
>>> "
>>> KStream stream = builder.stream("topic1").filter(..);
>>> KTable table = stream.toTable(..);
>>> table.materialize("state1");
>>> "
>>>
>>> And the library in this case could set store "state1" 's changelog topic
>> to
>>> be "topic1", and applying the filter on the fly while (re-)storing its
>>> state by reading from this topic, instead of creating a second changelog
>>> topic like "appID-state1-changelog" which is a semi-duplicate of
>> "topic1".
>>>
>>>
>>> Detailed:
>>>
>>> 1. I'm +1 with Michael regarding "#toStream"; actually I was thinking
>> about
>>> renaming to "#toChangeLog" but after thinking a bit more I think
>> #toStream
>>> is still better, and we can just mention in the javaDoc that it is
>>> transforming its underlying changelog stream to a normal stream.
>>> 2. As Damian mentioned, there are a few scenarios where the serdes are
>>> already specified in a previous operation whereas it is not known before
>>> calling materialize, for example:
>>> stream.groupByKey.agg(serde).materialize(serde) v.s. table.mapValues(/*no
>>> serde specified*/).materialize(serde). We need to specify what are the
>>> handling logic here.
>>> 3. We can remove "KTable#to" call as well, and enforce users to call "
>>> KTable.toStream.to" to be more clear.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Wed, Jan 18, 2017 at 3:22 AM, Eno Thereska <eno.there...@gmail.com>
>>> wrote:
>>>
>>>> I think changing it to `toKStream` would make it absolutely clear what
>> we
>>>> are converting it to.
>>>>
>>>> I'd say we should probably change the KStreamBuilder methods (but not in
>>>> this KIP).
>>>>
>>>> Thanks
>>>> Eno
>>>>
>>>>> On 17 Jan 2017, at 13:59, Michael Noll <mich...@confluent.io> wrote:
>>>>>
>>>>>> Rename toStream() to toKStream() for consistency.
>>>>>
>>>>> Not sure whether that is really required. We also use
>>>>> `KStreamBuilder#stream()` and `KStreamBuilder#table()`, for example,
>> and
>>>>> don't care about the "K" prefix.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jan 17, 2017 at 10:55 AM, Eno Thereska <eno.there...@gmail.com
>>>
>>>>> wrote:
>>>>>
>>>>>> Thanks Damian, answers inline:
>>>>>>
>>>>>>> On 16 Jan 2017, at 17:17, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Eno,
>>>>>>>
>>>>>>> Thanks for the KIP. Some comments:
>>>>>>>
>>>>>>>  1. I'd probably rename materialized to materialize.
>>>>>>
>>>>>> Ok.
>>>>>>
>>>>>>>  2. I don't think the addition of the new Log compaction mechanism is
>>>>>>>  necessary for this KIP, i.e, the KIP is useful without it. Maybe
>> that
>>>>>>>  should be a different KIP?
>>>>>>
>>>>>> Agreed, already removed. Will do a separate KIP for that.
>>>>>>
>>>>>>
>>>>>>>  3. What will happen when you call materialize on KTable that is
>>>> already
>>>>>>>  materialized? Will it create another StateStore (providing the name
>> is
>>>>>>>  different), throw an Exception?
>>>>>>
>>>>>> Currently an exception is thrown, but see below.
>>>>>>
>>>>>>
>>>>>>>  4. Have you considered overloading the existing KTable operations to
>>>>>> add
>>>>>>>  a state store name? So if a state store name is provided, then
>>>>>> materialize
>>>>>>>  a state store? This would be my preferred approach as I don't think
>>>>>>>  materialize is always a valid operation.
>>>>>>
>>>>>> Ok I can see your point. This will increase the KIP size since I'll
>> need
>>>>>> to enumerate all overloaded methods, but it's not a problem.
>>>>>>
>>>>>>>  5. The materialize method will need a value Serde as some
>> operations,
>>>>>>>  i.e., mapValues, join etc can change the value types
>>>>>>>  6. https://issues.apache.org/jira/browse/KAFKA-4609 - might mean
>> that
>>>>>> we
>>>>>>>  always need to materialize the StateStore for KTable-KTable joins.
>> If
>>>>>> that
>>>>>>>  is the case, then the KTable Join operators will also need Serde
>>>>>>>  information.
>>>>>>
>>>>>> I'll update the KIP with the serdes.
>>>>>>
>>>>>> Thanks
>>>>>> Eno
>>>>>>
>>>>>>
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Damian
>>>>>>>
>>>>>>>
>>>>>>> On Mon, 16 Jan 2017 at 16:44 Eno Thereska <eno.there...@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> We created "KIP-114: KTable materialization and improved semantics"
>> to
>>>>>>>> solidify the KTable semantics in Kafka Streams:
>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+materialization+and+improved+semantics
>>>>>>>>
>>>>>>>> Your feedback is appreciated.
>>>>>>>> Thanks
>>>>>>>> Eno
>>>>>>
>>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
> 
