Hello, Paul. Thanks for the feedback!
> How does the producer get notified of a failure to pass the RecordPolicy
> for one or more records, and how should it recover?

The producer will receive a `PolicyViolationException`. The obvious recovery
options are: the producer switches to the correct schema, or the producer is
stopped abnormally.

> Assuming a RecordPolicy can be loaded by a broker without restarting it, what
> is the mechanism by which this happens?

Thanks for the good question. I think we should choose one of the following
alternatives:

1. We allow users to plug in any `RecordsPolicy` implementation. In this case,
the Kafka administrator is responsible for putting a custom jar with the
`RecordsPolicy` implementation on every Kafka broker's classpath (the libs
directory). AFAIK, this was the scenario chosen for the `Authorizer`
implementation.

2. We allow users to select an implementation from a predefined list that
Kafka developers ship in a release. In this case, every Kafka broker will have
the specific implementation from the Kafka release itself. We could go with
this option because a wrong `RecordsPolicy` implementation can affect broker
stability and performance.

I personally prefer the first choice.

> Must writes to replicas also adhere to the RecordPolicy?

I think we should check only on the leader.

> Must already-written records adhere to RecordPolicy, if it is added
> later?

No.

> managing schema outside of kafka itself using something like the confluent
> schema registry.
> Maybe you can say why RecordPolicy would be better?

1. I can't agree that a commercial product is an alternative to the proposed
open-source API. Moreover, I propose to add an API that has only a little
overlap with a product as big as the Schema Registry as a whole.

2. AFAIU, the Confluent Schema Registry has to use a similar technique to
ensure the schema of records in a topic. My understanding is based on the
Schema Registry docs [1]. Specifically:

- The Confluent Schema Registry has custom topic configuration options to
enable or disable schema checks.
- "With this configuration, if a message is produced to the topic my-topic-sv
that does not have a valid schema for the value of the message, an error is
returned to the producer, and the message is discarded."

[1] https://docs.confluent.io/platform/current/schema-registry/schema-validation.html

> On Dec 1, 2020, at 06:15, Paul Whalen <pgwha...@gmail.com> wrote:
>
> Nikolay,
>
> I'm not a committer, but perhaps I can start the discussion. I've had the
> urge for a similar feature after being bitten by writing a poorly formed
> record to a topic - it's natural to want to push schema validation into the
> broker, since that's the way regular databases work. But I'm a bit
> skeptical of the complexity it introduces. Some questions I think would
> have to be answered that aren't currently in the KIP:
> - How does the producer get notified of a failure to pass the RecordPolicy
> for one or more records, and how should it recover?
> - Assuming a RecordPolicy can be loaded by a broker without restarting it,
> what is the mechanism by which this happens?
> - Must writes to replicas also adhere to the RecordPolicy?
> - Must already-written records adhere to RecordPolicy, if it is added
> later?
>
> Also, the rejected alternatives section is blank - I see the status quo as
> at least one alternative, in particular, managing schema outside of kafka
> itself using something like the confluent schema registry. Maybe you can
> say why RecordPolicy would be better?
>
> Best,
> Paul
>
> On Mon, Nov 30, 2020 at 9:58 AM Nikolay Izhikov <nizhi...@apache.org> wrote:
>
>> Friendly bump.
>>
>> Please, share your feedback.
>> Do we need this feature in Kafka?
>>
>>> On Nov 23, 2020, at 12:09, Nikolay Izhikov <nizhikov....@gmail.com> wrote:
>>>
>>> Hello!
>>>
>>> Any additional feedback on this KIP?
>>> I believe this API can be useful for Kafka users.
>>>
>>>
>>>> On Nov 18, 2020, at 14:47, Nikolay Izhikov <nizhikov....@gmail.com> wrote:
>>>>
>>>> Hello, Ismael.
>>>>
>>>> Thanks for the feedback.
>>>> You are right, I did not read the public interfaces definition carefully. :)
>>>>
>>>> I updated the KIP according to your objection.
>>>> I propose to expose 2 new public interfaces:
>>>>
>>>> ```
>>>> package org.apache.kafka.common;
>>>>
>>>> public interface Record {
>>>>     long timestamp();
>>>>
>>>>     boolean hasKey();
>>>>
>>>>     ByteBuffer key();
>>>>
>>>>     boolean hasValue();
>>>>
>>>>     ByteBuffer value();
>>>>
>>>>     Header[] headers();
>>>> }
>>>>
>>>> package org.apache.kafka.server.policy;
>>>>
>>>> public interface RecordsPolicy extends Configurable, AutoCloseable {
>>>>     void validate(String topic, int partition, Iterable<? extends Record> records) throws PolicyViolationException;
>>>> }
>>>> ```
>>>>
>>>> The data exposed in Record, and in the validate method itself, seems to be
>>>> enough for an implementation of any reasonable policy.
>>>>
>>>>> On Nov 17, 2020, at 19:44, Ismael Juma <ism...@juma.me.uk> wrote:
>>>>>
>>>>> Thanks for the KIP. The policy interface is a small part of this. You also
>>>>> have to describe the new public API that will be exposed as part of this.
>>>>> For example, there is no public `Records` class.
>>>>>
>>>>> Ismael
>>>>>
>>>>> On Tue, Nov 17, 2020 at 8:24 AM Nikolay Izhikov <nizhi...@apache.org> wrote:
>>>>>
>>>>>> Hello.
>>>>>>
>>>>>> I want to start a discussion of KIP-686 [1].
>>>>>> I propose to introduce a new public interface for it, RecordsPolicy:
>>>>>>
>>>>>> ```
>>>>>> public interface RecordsPolicy extends Configurable, AutoCloseable {
>>>>>>     void validate(String topic, Records records) throws
>>>>>> PolicyViolationException;
>>>>>> }
>>>>>> ```
>>>>>>
>>>>>> and two new configuration options:
>>>>>> * `records.policy.class.name: String` - sets the class name of the
>>>>>> implementation of RecordsPolicy for the specific topic.
>>>>>> * `records.policy.enabled: Boolean` - enables or disables the records policy
>>>>>> for the topic.
>>>>>>
>>>>>> If `records.policy.enabled=true`, then an instance of the `RecordsPolicy`
>>>>>> should check each Records batch before applying the data to the log.
>>>>>> If a `PolicyViolationException` is thrown from the `RecordsPolicy#validate`
>>>>>> method, then no data is added to the log and the client receives an error.
>>>>>>
>>>>>> Motivation:
>>>>>>
>>>>>> During the adoption of Kafka in large enterprises, it's important to
>>>>>> guarantee that the data in a topic conforms to a specific format.
>>>>>> When data is written and read by different applications developed by
>>>>>> different teams, it's hard to guarantee the data format using only a custom
>>>>>> SerDe, because malicious applications can use a different SerDe.
>>>>>> The data format can be enforced only on the broker side.
>>>>>>
>>>>>> Please, share your feedback.
>>>>>>
>>>>>> [1]
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
>>>>
>>>
>>
>>
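To make the discussion above concrete, here is a minimal sketch of what a custom `RecordsPolicy` implementation could look like under the proposed API. The `RequireKeyPolicy` class and its rejection message are hypothetical examples; the `Record`, `RecordsPolicy`, and `PolicyViolationException` types are declared locally as stand-ins for the KIP's proposed interfaces so the sketch is self-contained (the real ones would live in `org.apache.kafka.common` and `org.apache.kafka.server.policy`, with `RecordsPolicy` extending `Configurable`).

```java
import java.nio.ByteBuffer;
import java.util.Map;

// Stand-ins for the KIP's proposed types, declared locally so the sketch
// compiles on its own. headers() is omitted for brevity.
class PolicyViolationException extends RuntimeException {
    PolicyViolationException(String message) { super(message); }
}

interface Record {
    long timestamp();
    boolean hasKey();
    ByteBuffer key();
    boolean hasValue();
    ByteBuffer value();
}

interface RecordsPolicy extends AutoCloseable {
    // In the KIP this method comes from Configurable; the policy would
    // receive its configuration here.
    void configure(Map<String, ?> configs);

    void validate(String topic, int partition, Iterable<? extends Record> records)
            throws PolicyViolationException;
}

// Hypothetical policy: reject any batch containing a record without a key.
class RequireKeyPolicy implements RecordsPolicy {
    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this example.
    }

    @Override
    public void validate(String topic, int partition, Iterable<? extends Record> records) {
        for (Record record : records) {
            if (!record.hasKey()) {
                throw new PolicyViolationException(
                        "Keyless record rejected for " + topic + "-" + partition);
            }
        }
    }

    @Override
    public void close() {
        // Nothing to release.
    }
}
```

Under the proposed configuration options, a topic owner would then set `records.policy.enabled=true` and point `records.policy.class.name` at this class; a produce request containing a keyless record would fail with an error while the log stays untouched.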