[ https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lee Dongjin resolved KAFKA-7502.
--------------------------------
Resolution: Fixed
> Cleanup KTable materialization logic in a single place
> ------------------------------------------------------
>
> Key: KAFKA-7502
> URL: https://issues.apache.org/jira/browse/KAFKA-7502
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Lee Dongjin
> Priority: Major
>
> Today, since we pre-create all the `KTableXXX` operators along with their
> logical nodes, we effectively duplicate the logic that determines whether the
> resulting KTable should be materialized. More specifically, the
> materialization principle today is:
> 1) If users specify a Materialized in the DSL and it contains a queryable
> name, we always materialize.
> 2) If users specify a Materialized in the DSL that does not contain a
> queryable name, or if users do not specify a Materialized object at all,
> Streams may or may not choose to materialize. In either case, even if the
> KTable is materialized, it will not be queryable since there is no queryable
> name (i.e. only storeName is non-null, while queryableName is null):
> 2.a) If the resulting KTable comes from an aggregation, we always materialize,
> since the store is needed to hold the aggregation result (i.e. we use the
> MaterializedInternal constructor with nameProvider != null).
> 2.b) If the resulting KTable comes from a source topic, we delay
> materialization until a downstream operator requires this KTable to be
> materialized or to send old values (see `KTableSourceNode` and `KTableSource`).
> 2.c) If the resulting KTable comes from a join, we always materialize if users
> create a Materialized object, even without a queryable name. This could be
> optimized similarly to 2.b), but that is orthogonal to this ticket (see
> `KTableImpl#buildJoin`, where we always use the constructor with
> nameProvider != null).
> 2.d) If the resulting KTable comes from a stateless operation like filter /
> mapValues, we never materialize.
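The rules above can be condensed into a single decision function. The following is a minimal Java sketch, not Kafka Streams code: `MaterializationRules`, `Origin`, `Decision`, and the `downstreamNeedsState` flag are hypothetical names chosen for illustration. Treating a join with no Materialized object at all as non-materialized is an assumption, since case 2.c) only describes what happens when one is given.

```java
import java.util.Objects;

// Hypothetical sketch: the KTable materialization rules (cases 1 and 2.a-2.d) in one place.
// None of these types exist in Kafka Streams.
final class MaterializationRules {

    /** Which kind of operator produced the resulting KTable. */
    enum Origin { AGGREGATION, SOURCE, JOIN, STATELESS }

    /** Whether a state store is created, and whether it is queryable. */
    static final class Decision {
        final boolean materialize;
        final boolean queryable;

        Decision(boolean materialize, boolean queryable) {
            this.materialize = materialize;
            this.queryable = queryable;
        }

        @Override
        public String toString() {
            return "Decision{materialize=" + materialize + ", queryable=" + queryable + "}";
        }
    }

    private MaterializationRules() {}

    /**
     * @param origin               the operator that produced the KTable
     * @param materializedGiven    whether the user passed a Materialized object
     * @param queryableName        the queryable name from that Materialized, or null
     * @param downstreamNeedsState whether a downstream operator needs this KTable to be
     *                             materialized or to send old values (used for SOURCE only)
     */
    static Decision decide(Origin origin, boolean materializedGiven,
                           String queryableName, boolean downstreamNeedsState) {
        Objects.requireNonNull(origin, "origin");
        // Case 1: a queryable name always forces a queryable store.
        if (queryableName != null) {
            return new Decision(true, true);
        }
        // Case 2: no queryable name, so any store that is created is not queryable.
        switch (origin) {
            case AGGREGATION:
                // 2.a: the store is needed to hold the aggregation result.
                return new Decision(true, false);
            case SOURCE:
                // 2.b: materialization is delayed until a downstream operator requires it.
                return new Decision(downstreamNeedsState, false);
            case JOIN:
                // 2.c: materialize whenever a Materialized object was given. Treating
                // "no Materialized at all" as not materialized is an assumption.
                return new Decision(materializedGiven, false);
            case STATELESS:
            default:
                // 2.d: filter / mapValues are never materialized.
                return new Decision(false, false);
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(Origin.JOIN, true, null, false));    // Decision{materialize=true, queryable=false}
        System.out.println(decide(Origin.SOURCE, false, null, false)); // Decision{materialize=false, queryable=false}
    }
}
```

Keeping the rules in one function means the logical-node and physical-node creation paths would both consult the same code, so they could not drift apart.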
> ------------
> Now, in all of these cases we have a logical node like `KTableKTableJoinNode`
> as well as a physical node like `ProcessorNode`. Ideally we should always
> create the logical plan (i.e. the StreamsGraph) first, then optimize it if
> necessary, and then generate the physical plan (i.e. the Topology). However,
> today we create some physical nodes beforehand, so the above logic is
> duplicated in the creation of both the physical and the logical nodes. For
> example, `KTableKTableJoinNode` checks whether Materialized is null to decide
> whether to add a state store, and `KTableImpl#doJoin` checks whether
> materialized is specified (case 2.c) above).
> Another example is `TableProcessorNode`, which is used for case 2.d) above:
> it contains this logic, while its caller (`KTableImpl#doFilter`, for example)
> also contains it when deciding whether to pass the `queryableName` parameter
> to `KTableProcessorSupplier`.
> This is bug-prone, since we may update the logic in one class but forget to
> update the other.
> --------------
> What we want instead is a cleaner code path, similar to what we have for 2.b):
> when creating the logical nodes, we keep track of whether 1) a Materialized is
> specified, and 2) a queryable name is provided. During the optimization phase,
> we may then change the inner physical ProcessorBuilder's parameters (the
> queryable name, etc.), and when it is time to generate the physical node, we
> can simply take those parameters as-is and build it.
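A minimal Java sketch of the proposed flow, assuming a toy plan representation: `TablePlanSketch`, `LogicalTableNode`, `ProcessorParams`, the `-STORE` naming scheme, and the single optimization rule (which mirrors case 2.b)) are all hypothetical and do not correspond to actual Kafka Streams classes. Logical nodes record the user's choices once, the optimizer is the only code that changes the processor parameters, and physical generation just reads them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the proposed flow: logical plan -> optimize -> physical plan.
// None of these types exist in Kafka Streams.
final class TablePlanSketch {

    /** Parameters the physical processor is built from; only the optimizer changes them. */
    static final class ProcessorParams {
        String storeName;     // null: build no state store
        String queryableName; // null: the store (if any) is not queryable

        ProcessorParams(String storeName, String queryableName) {
            this.storeName = storeName;
            this.queryableName = queryableName;
        }
    }

    /** Logical node: records what the user specified, once, when it is created. */
    static final class LogicalTableNode {
        final String nodeName;
        final boolean materializedSpecified; // kept for optimizer rules such as case 2.c)
        final boolean queryableNameProvided;
        final ProcessorParams params;

        LogicalTableNode(String nodeName, boolean materializedSpecified, String queryableName) {
            this.nodeName = nodeName;
            this.materializedSpecified = materializedSpecified;
            this.queryableNameProvided = queryableName != null;
            // Case 1: a queryable name yields a store with that name; otherwise no store yet.
            this.params = new ProcessorParams(queryableName, queryableName);
        }
    }

    /**
     * Optimization phase: the only place that changes ProcessorParams. This sketch applies one
     * rule mirroring case 2.b): a node without a queryable name gets an internal,
     * non-queryable store only if a downstream operator requires it.
     */
    static void optimize(List<LogicalTableNode> plan, Set<String> requiredDownstream) {
        for (LogicalTableNode node : plan) {
            if (!node.queryableNameProvided && requiredDownstream.contains(node.nodeName)) {
                node.params.storeName = node.nodeName + "-STORE"; // hypothetical naming scheme
            }
        }
    }

    /** Physical generation: reads the parameters as-is, with no materialization logic. */
    static List<String> generatePhysical(List<LogicalTableNode> plan) {
        List<String> physical = new ArrayList<>();
        for (LogicalTableNode node : plan) {
            ProcessorParams p = node.params;
            physical.add(node.nodeName + " store=" + p.storeName
                    + " queryable=" + (p.queryableName != null));
        }
        return physical;
    }

    public static void main(String[] args) {
        List<LogicalTableNode> plan = List.of(
                new LogicalTableNode("source", false, null),
                new LogicalTableNode("filtered", true, "filtered-view"),
                new LogicalTableNode("mapped", false, null));
        optimize(plan, Set.of("source"));
        generatePhysical(plan).forEach(System.out::println);
    }
}
```

Running `main` prints one line per physical node; only `source` gains an internal, non-queryable store, because it is the only node listed as required downstream. The design choice is that `generatePhysical` contains no materialization checks at all, so the rule lives in exactly one place.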
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)