Hi Edorado and Adrian,

Thanks for the KIP.

I think it would be good to elaborate on exactly how validate() gets
called, because I think there are a number of potential problems, or at
least things to consider.

>From the broker's point of view, validate() can do arbitrary things. It
might never return due to blocking or an infinite loop. It might cause an
OOME, or throw a StackOverflowException. These are not entirely unlikely
and the risks cannot always be easily avoided by a careful policy
implementation. For example, a plugin which performs schema validation
would typically be fetching schema information from a remote registry (and
caching it for future use), and so could block on the networking (there are
further questions about retry in the case of network error). Then, when
deserializing a format like JSON deserializers might be prone to SOE or
OOME (e.g. consider a naive recursive JSON parser with JSON input starting
"[[[[[[[[[[[[[[[[[[[[[[[[[[[[..."). More generally, incorrect
deserialization of untrusted input is a common kind of CVE. Similarly
validation might involve regular expression matching (for example
JSONSchema supports pattern constraints). The matcher implementation really
matters and common matchers, including Java's Pattern, expose you to the
possibility of nasty exponential time behaviour.

You mentioned LogValidator in the KIP. This executes on an IO thread and
gets called with the log lock held. So the consequences of the validation
blocking could actually be a real problem from a broker availability PoV if
this validation happens in the same place. In the worst case all the IO
threads get stuck because of bad input (perhaps from a single producer), or
network problems between the broker and the registry. I don't think simply
moving the validation to before the acquisition of the lock is an easy
solution either, because of the dependency on the compression validation.

Kind regards,

Tom

On Thu, 22 Jun 2023 at 04:16, Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Hi Eduardo, Adrian.
>
> Thanks for the KIP. I agree that allowing custom validations on the
> broker-side addresses a real gap as you clearly stated on the motivation.
>
> Some initial thoughts from my side:
>
> 1. Similar to Kirk's first point, I'm also concerned on how would the
> plugin developers / operators be able to apply multiple policies and how
> configurations would be passed to each policy.
>
> Some approaches from other plugins we can get some inspiration from:
>
> - AlterConfig, CreateTopic policies are designed to be 1 policy
> implementing the different APIs. Up to the plugin developer to pull
> policies together and configure it on the broker side. I guess for Record
> Validation this may be cumbersome considering some Schema Registry
> providers may want to offer implementations for their own backend.
>
> - Connect Transforms: here there's a named set of plugins to apply per
> connector, and each transform has its own configuration defined by prefix.
> Personally, I'd consider this one an interesting approach if we decide to
> allow multiple record validations to be configured.
>
> - Tiered Storage (probably Connectors as well) have class-loader aware
> implementations with class path specific to the plugin. Not sure if this is
> something to discuss on the KIP or later on the PR, but we could mention
> something on how this plugin would deal with dependency conflicts (e.g.
> different jackson version between broker, plugin(s)).
>
> Also, by potentially supporting multiple plugins for record validation, it
> would be important to consider if it's an all or nothing relation, or
> posible to choose _some_ policies apply per topic.
> I see there's some preference for setting the validation policy name on the
> topic, though this could be cumbersome to operate: topic creation users may
> not be aware of the record validation (similar to CreateTopic/AlterConfig
> policies) and would impose additional coordination.
> Maybe a flag to whether apply policies or not would be a better approach?
>
> 2. Have you consider adding the record metadata to the API? It may be
> useful for logging purposes (e.g. if record validation fails, to log
> topic-partition), or some policies are interested on record metadata (e.g.
> compression, timestamp type, etc.)
>
> 3. A minor comment for consistency regarding the APIs. As far as I have
> seen, we tend to use the name of the object returned directly instead of
> getters notation, see `AlterConfigPolicy.RecordMetadata` [1]. We may rename
> some of the proposed APIs accordingly:
>
> - `RecordProxy#headers()|key()|value()`
> - `TopicMetadata#topicPartition()`
>
> 4. For the `RecordIntrospectionHints`, I'm struggling to see how this may
> be used by the policy developers. Would you mind adding some examples on
> how the policy in general may be used?
> Specifically, `long needKeyBytes|needKeyValue` are difficult to interpret
> to me.
> nit: maybe replace `need*` with `requiresAccess*` or simply `access*`, or
> similar.
>
> Many thanks,
>
> Jorge.
>
> [1]
>
> https://github.com/apache/kafka/blob/3c059133d3008d87f018f2efa4af27027fd5d18e/clients/src/main/java/org/apache/kafka/server/policy/AlterConfigPolicy.java#L41
>
> On Wed, 21 Jun 2023 at 19:08, Kirk True <k...@kirktrue.pro> wrote:
>
> > Hi Edo/Adrian!
> >
> > Thanks for the KIP.
> >
> > I have some questions, and apologies that the may fall under the “stupid”
> > column because I’m not that familiar with this area :)
> >
> > 1. Does record.validation.policy.class.name support multiple classes, or
> > just one? I’m probably not wrapping my head around it, but I’d imagine
> > different policies for different families or groupings of topics, thus
> the
> > need for supporting multiple policies. But if there are multiple then you
> > run the risk of conflicts of ownership of validation, so ¯\_(ツ)_/¯
> >
> > 2. Is there any concern that a validation class may alter the contents of
> > the ByteBuffer of the key and/or value? Perhaps that’s already handled
> > and/or outside the scope of this KIP?
> >
> > 3. What is the benefit to introducing the inner TopicMetadata and
> > RecordProxy interfaces vs. passing the TopicPartition, String (validation
> > policy), and Record into the validate() method directly?
> >
> > Thanks,
> > Kirk
> >
> > > On Jun 20, 2023, at 2:28 AM, Edoardo Comar <edoardli...@gmail.com>
> > wrote:
> > >
> > > Thanks Николай,
> > > We’d like to open a vote next week.
> > > Hopefully getting some more feedback before then.
> > >
> > > Edo
> > >
> > >
> > > On Wed, 7 Jun 2023 at 19:15, Николай Ижиков <nizhi...@apache.org>
> wrote:
> > >
> > >> Hello.
> > >>
> > >> As author of one of related KIPs I’m +1 for this change.
> > >> Long waited feature.
> > >>
> > >>> 7 июня 2023 г., в 19:02, Edoardo Comar <eco...@uk.ibm.com>
> написал(а):
> > >>>
> > >>> Dear all,
> > >>> Adrian and I would like to start a discussion thread on
> > >>>
> > >>> KIP-940: Broker extension point for validating record contents at
> > >> produce time
> > >>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-940%3A+Broker+extension+point+for+validating+record+contents+at+produce+time
> > >>>
> > >>> This KIP proposes a new broker-side extension point (a “record
> > >> validation policy”) that can be used to reject records published by a
> > >> misconfigured client.
> > >>> Though general, it is aimed at the common, best-practice use case of
> > >> defining Kafka message formats with schemas maintained in a schema
> > registry.
> > >>>
> > >>> Please post your feedback, thanks !
> > >>>
> > >>> Edoardo & Adrian
> > >>>
> > >>> Unless otherwise stated above:
> > >>>
> > >>> IBM United Kingdom Limited
> > >>> Registered in England and Wales with number 741598
> > >>> Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
> 3AU
> > >>
> > >>
> >
> >
>

Reply via email to