Hello, Paul.

Thanks for the feedback!

> How does the producer get notified of a failure to pass the RecordPolicy for 
> one or more records, 

The producer will receive `PolicyViolationException`.

> how should it recover?

Obvious answers are

Producer should switch to the correct schema OR producer should be stopped 
abnormally.

> Assuming a RecordPolicy can be loaded by a broker without restarting it, what 
> is the mechanism by which this happens?

Thanks for the good question:

Think we should choose from one of the following alternatives:

        1. We allow the users to use any `RecordsPolicy` implementation.
                In this case, Kafka administrator is responsible for putting a 
custom jar with the `RecordsPolicy` implementation to every Kafka Broker 
classpath(libs directory).
                AFAIK this selected as a base scenario for an `Authorizer` 
implementation.

        2. We allow the users to select implementation from some predefined 
list that Kafka developers included in some release.
                In this case, every Kafka broker will have a specific 
implementation from the Kafka release itself.
                We can go with this because wrong `RecordsPolicy` 
implementation can affect broker stability and performance.

I, personally, prefer first choice.

> Must writes to replicas also adhere to the RecordPolicy?

I think we should check only on the leader.

> Must already-written written records adhere to RecordPolicy, if it is added 
> later?

No.

> managing schema outside of kafka itself using something like the confluent 
> schema registry.  
> Maybe you can say why RecordPolicy would be better?

1. Can't agree that a commercial product is an alternative to the proposed 
open-source API.
Moreover, I propose to add an API that has a little overlap with such a big 
product as a Schema Registry as a whole.

2. AFAIU Confluent Schema Registry should use a similar technique to ensure 
records schema in the topic.
My understanding based on Schema Registry docs [1]. Specifically:
        - Confluent Schema Registry has custom topic configuration options to 
enable or disable schema checks.
        - "With this configuration, if a message is produced to the topic 
my-topic-sv that does not have a valid schema for the value of the message, an 
error is returned to the producer, and the message is discarded."

[1] 
https://docs.confluent.io/platform/current/schema-registry/schema-validation.html


> 1 дек. 2020 г., в 06:15, Paul Whalen <pgwha...@gmail.com> написал(а):
> 
> Nikolay,
> 
> I'm not a committer, but perhaps I can start the discussion.  I've had the
> urge for a similar feature after being bitten by writing a poorly formed
> record to a topic - it's natural to want to push schema validation into the
> broker, since that's the way regular databases work.  But I'm a bit
> skeptical of the complexity it introduces.  Some questions I think would
> have to be answered that aren't currently in the KIP:
> - How does the producer get notified of a failure to pass the RecordPolicy
> for one or more records, and how should it recover?
> - Assuming a RecordPolicy can be loaded by a broker without restarting it,
> what is the mechanism by which this happens?
> - Must writes to replicas also adhere to the RecordPolicy?
> - Must already-written written records adhere to RecordPolicy, if it is
> added later?
> 
> Also, the rejected alternatives section is blank - I see the status quo as
> at least one alternative, in particular, managing schema outside of kafka
> itself using something like the confluent schema registry.  Maybe you can
> say why RecordPolicy would be better?
> 
> Best,
> Paul
> 
> On Mon, Nov 30, 2020 at 9:58 AM Nikolay Izhikov <nizhi...@apache.org> wrote:
> 
>> Friendly bump.
>> 
>> Please, share your feedback.
>> Do we need those feature in the Kafka?
>> 
>>> 23 нояб. 2020 г., в 12:09, Nikolay Izhikov <nizhikov....@gmail.com>
>> написал(а):
>>> 
>>> Hello!
>>> 
>>> Any additional feedback on this KIP?
>>> I believe this API can be useful for Kafka users.
>>> 
>>> 
>>>> 18 нояб. 2020 г., в 14:47, Nikolay Izhikov <nizhikov....@gmail.com>
>> написал(а):
>>>> 
>>>> Hello, Ismael.
>>>> 
>>>> Thanks for the feedback.
>>>> You are right, I read public interfaces definition not carefully :)
>>>> 
>>>> Updated KIP according to your objection.
>>>> I propose to expose 2 new public interfaces:
>>>> 
>>>> ```
>>>> package org.apache.kafka.common;
>>>> 
>>>> public interface Record {
>>>>  long timestamp();
>>>> 
>>>>  boolean hasKey();
>>>> 
>>>>  ByteBuffer key();
>>>> 
>>>>  boolean hasValue();
>>>> 
>>>>  ByteBuffer value();
>>>> 
>>>>  Header[] headers();
>>>> }
>>>> 
>>>> package org.apache.kafka.server.policy;
>>>> 
>>>> public interface RecordsPolicy extends Configurable, AutoCloseable {
>>>>  void validate(String topic, int partition, Iterable<? extends Record>
>> records) throws PolicyViolationException;
>>>> }
>>>> ```
>>>> 
>>>> Data exposed in Record and in validate method itself seems to enough
>> for implementation of any reasonable Policy.
>>>> 
>>>>> 17 нояб. 2020 г., в 19:44, Ismael Juma <ism...@juma.me.uk> написал(а):
>>>>> 
>>>>> Thanks for the KIP. The policy interface is a small part of this. You
>> also
>>>>> have to describe the new public API that will be exposed as part of
>> this.
>>>>> For example, there is no public `Records` class.
>>>>> 
>>>>> Ismael
>>>>> 
>>>>> On Tue, Nov 17, 2020 at 8:24 AM Nikolay Izhikov <nizhi...@apache.org>
>> wrote:
>>>>> 
>>>>>> Hello.
>>>>>> 
>>>>>> I want to start discussion of the KIP-686 [1].
>>>>>> I propose to introduce the new public interface for it RecordsPolicy:
>>>>>> 
>>>>>> ```
>>>>>> public interface RecordsPolicy extends Configurable, AutoCloseable {
>>>>>> void validate(String topic, Records records) throws
>>>>>> PolicyViolationException;
>>>>>> }
>>>>>> ```
>>>>>> 
>>>>>> and a two new configuration options:
>>>>>> * `records.policy.class.name: String` - sets class name of the
>>>>>> implementation of RecordsPolicy for the specific topic.
>>>>>> * `records.policy.enabled: Boolean` - enable or disable records
>> policy
>>>>>> for the topic.
>>>>>> 
>>>>>> If `records.policy.enabled=true` then an instance of the
>> `RecordsPolicy`
>>>>>> should check each Records batch before applying data to the log.
>>>>>> If `PolicyViolationException`  thrown from the
>> `RecordsPolicy#validate`
>>>>>> method then no data added to the log and the client receives an error.
>>>>>> 
>>>>>> Motivation:
>>>>>> 
>>>>>> During the adoption of Kafka in large enterprises, it's important to
>>>>>> guarantee data in some topic conforms to the specific format.
>>>>>> When data are written and read by the different applications
>> developed by
>>>>>> the different teams it's hard to guarantee data format using only
>> custom
>>>>>> SerDe, because malicious applications can use different SerDe.
>>>>>> The data format can be enforced only on the broker side.
>>>>>> 
>>>>>> Please, share your feedback.
>>>>>> 
>>>>>> [1]
>>>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
>>>> 
>>> 
>> 
>> 

Reply via email to