Tom Bentley created KAFKA-9673:
----------------------------------
Summary: Conditionally apply SMTs
Key: KAFKA-9673
URL: https://issues.apache.org/jira/browse/KAFKA-9673
Project: Kafka
Issue Type: New Feature
Components: KafkaConnect
Reporter: Tom Bentley
Assignee: Tom Bentley
KAFKA-7052 ended up using an IAE with a message, rather than an NPE, in the case
of an SMT being applied to a record lacking a given field. It's still not
possible to apply an SMT conditionally, which is what things like Debezium
really need in order to apply transformations only to non-schema change events.
[~rhauch] suggested a mechanism to conditionally apply any SMT but was
concerned about the possibility of a naming collision (assuming it was
configured by a simple config).
I'd like to propose something which would solve this problem without the
possibility of such collisions. The idea is to have a higher-level condition,
which applies an arbitrary transformation (or transformation chain) according
to some predicate on the record.
More concretely, it might be configured like this:
{noformat}
transforms.conditionalExtract.type: Conditional
transforms.conditionalExtract.transforms: extractInt
transforms.conditionalExtract.transforms.extractInt.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.conditionalExtract.transforms.extractInt.field: c1
transforms.conditionalExtract.condition: topic-matches:<someRegexHere>
{noformat}
* The {{Conditional}} SMT is configured with its own list of transforms
({{transforms.conditionalExtract.transforms}}) to apply. This would work just
like the top level {{transforms}} config, so subkeys can be used to configure
these transforms in the usual way.
* The {{condition}} config defines the predicate that determines when the
transforms are applied to a record. It uses a
{{<condition-type>:<parameters>}} syntax.
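To make the wrapping behaviour concrete, here is a minimal sketch of how such a {{Conditional}} transform could delegate to its nested chain. It deliberately avoids the real Connect API: {{Rec}} is a hypothetical stand-in for a record, and plain {{UnaryOperator}} and {{Predicate}} stand in for {{Transformation}} and the condition. None of these names come from Kafka.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

final class ConditionalSketch {
    /** Hypothetical stand-in for a Connect record (topic plus a string value). */
    static final class Rec {
        final String topic;
        final String value;

        Rec(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /** Applies the nested chain only to records that satisfy the condition. */
    static final class Conditional implements UnaryOperator<Rec> {
        private final Predicate<Rec> condition;
        private final List<UnaryOperator<Rec>> chain;

        Conditional(Predicate<Rec> condition, List<UnaryOperator<Rec>> chain) {
            this.condition = condition;
            this.chain = List.copyOf(chain);
        }

        @Override
        public Rec apply(Rec record) {
            if (!condition.test(record)) {
                return record; // condition not met: pass the record through untouched
            }
            Rec current = record;
            for (UnaryOperator<Rec> transform : chain) {
                current = transform.apply(current);
                if (current == null) {
                    return null; // assumption: a null result drops the record, so stop here
                }
            }
            return current;
        }
    }
}
```

Records that fail the condition are returned as the same instance, so downstream transforms would see them unchanged.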
We could initially support three condition types:
*{{topic-matches:<pattern>}}* The transformation would be applied if the
record's topic name matched the given regular expression pattern. For example,
the following would apply the transformation on records being sent to any topic
with a name beginning with "my-prefix-":
{noformat}
transforms.conditionalExtract.condition: topic-matches:my-prefix-.*
{noformat}
*{{has-header:<header-name>}}* The transformation would be applied if the
record had at least one header with the given name. For example, the following
will apply the transformation on records with at least one header with the name
"my-header":
{noformat}
transforms.conditionalExtract.condition: has-header:my-header
{noformat}
*{{not:<condition-name>}}* This would negate the result of another named
condition, which is itself defined under the {{condition}} config prefix. For
example, the following will apply the transformation on records which lack any
header with the name "my-header":
{noformat}
transforms.conditionalExtract.condition: not:hasMyHeader
transforms.conditionalExtract.condition.hasMyHeader: has-header:my-header
{noformat}
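As a rough illustration of the three condition types, the following sketch parses a {{<condition-type>:<parameters>}} string into a predicate. It is only one possible reading of the proposal: {{Rec}} and {{parse}} are hypothetical names, the record is reduced to a topic and a list of header names, and named conditions are passed in as a simple map from name to condition string.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Pattern;

final class ConditionParserSketch {
    /** Hypothetical minimal view of a record: its topic and its header names. */
    static final class Rec {
        final String topic;
        final List<String> headerNames;

        Rec(String topic, List<String> headerNames) {
            this.topic = topic;
            this.headerNames = headerNames;
        }
    }

    /**
     * Parses "<condition-type>:<parameters>" into a predicate.
     * "named" maps condition names to their own condition strings (used by "not:").
     * Cycles between named conditions are not detected in this sketch.
     */
    static Predicate<Rec> parse(String spec, Map<String, String> named) {
        int colon = spec.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Expected <condition-type>:<parameters>, got: " + spec);
        }
        String type = spec.substring(0, colon);
        String params = spec.substring(colon + 1);
        switch (type) {
            case "topic-matches": {
                Pattern pattern = Pattern.compile(params);
                // The whole topic name must match the pattern.
                return r -> pattern.matcher(r.topic).matches();
            }
            case "has-header":
                return r -> r.headerNames.contains(params);
            case "not": {
                String target = named.get(params);
                if (target == null) {
                    throw new IllegalArgumentException("No condition named: " + params);
                }
                return parse(target, named).negate();
            }
            default:
                throw new IllegalArgumentException("Unknown condition type: " + type);
        }
    }
}
```

With {{named}} containing {{hasMyHeader -> has-header:my-header}}, parsing {{not:hasMyHeader}} yields a predicate that is true only for records without a "my-header" header.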
I foresee one implementation concern with this approach, which is that
currently {{Transformation}} has to return a fixed {{ConfigDef}}, and this
proposal would require something more flexible in order to allow the config
parameters to depend on the listed transform aliases (and similarly for the
named conditions used by the {{not:}} condition). I think this could be done by
adding a {{default}} method to {{Transformation}} for getting the {{ConfigDef}}
given the config, for example.
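The shape of that {{default}} method might look something like the sketch below. To keep it self-contained it does not use the real classes: {{TransformationLike}} is a hypothetical stand-in for {{Transformation}}, and a set of config key names stands in for {{ConfigDef}}. The overload name and signature are assumptions, not an agreed API.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class DynamicConfigSketch {
    /** Hypothetical stand-in for Transformation; a set of key names stands in for ConfigDef. */
    interface TransformationLike {
        /** The existing, fixed config definition. */
        Set<String> config();

        /** Proposed addition: a definition that may depend on the supplied config. */
        default Set<String> config(Map<String, String> props) {
            return config(); // existing transforms keep their fixed definition
        }
    }

    /** A Conditional-like transform whose keys depend on the listed aliases. */
    static final class ConditionalLike implements TransformationLike {
        @Override
        public Set<String> config() {
            Set<String> keys = new LinkedHashSet<>();
            keys.add("transforms");
            keys.add("condition");
            return keys;
        }

        @Override
        public Set<String> config(Map<String, String> props) {
            Set<String> keys = new LinkedHashSet<>(config());
            // Add a "type" key for every alias listed under "transforms".
            for (String alias : props.getOrDefault("transforms", "").split(",")) {
                String trimmed = alias.trim();
                if (!trimmed.isEmpty()) {
                    keys.add("transforms." + trimmed + ".type");
                }
            }
            return keys;
        }
    }
}
```

Because the new method has a default body that falls back to the fixed definition, existing implementations would not need to change.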
Obviously this would require a KIP, but before I spend any more time on this
I'd be interested in your thoughts [~rhauch], [~rmoff], [~gunnar.morling].