> When the input messages are raw bytes, we cannot guarantee the
> validation always succeeds because the schema might change. The
> exception is actually thrown in `TypedMessageBuilder#value`.
>
> But since these APIs are stable, we could only fix it by adding
> documentation that describes the cases in which `TypedMessageBuilder#value`
> and `Message#getValue` can throw exceptions.

Yes, this is a better way to handle it. I agree with optimizing the
AUTO_PRODUCE schema in this way. It makes a lot of sense to use the
API correctly.
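
To make the documentation point concrete, here is a minimal sketch of the
calling pattern users would need. It does not use the Pulsar client:
`ToyMessageBuilder` and `ToySchemaException` are hypothetical stand-ins for
illustration, and the empty-payload check is only a placeholder for real
schema validation. The point is just that the unchecked exception surfaces in
`value(...)`, before `send()`, so the caller has to catch it there.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class EagerValidationSketch {

    /** Hypothetical stand-in for the unchecked exception a schema may throw. */
    static class ToySchemaException extends RuntimeException {
        ToySchemaException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for a message builder that validates in value(). */
    static class ToyMessageBuilder {
        private final List<byte[]> sink;
        private byte[] payload;

        ToyMessageBuilder(List<byte[]> sink) {
            this.sink = sink;
        }

        // Validation happens here, not in send(); no checked exception is declared.
        ToyMessageBuilder value(byte[] bytes) {
            // Placeholder check (assumption): a real schema would decode the bytes.
            if (bytes == null || bytes.length == 0) {
                throw new ToySchemaException("payload does not match the schema");
            }
            this.payload = bytes;
            return this;
        }

        void send() {
            sink.add(payload);
        }
    }

    /** Returns true if the message was sent, false if value() rejected the payload. */
    static boolean trySend(List<byte[]> sink, byte[] bytes) {
        try {
            new ToyMessageBuilder(sink).value(bytes).send();
            return true;
        } catch (ToySchemaException e) {
            // The caller decides what to do: log, skip, or route elsewhere.
            return false;
        }
    }

    public static void main(String[] args) {
        List<byte[]> sink = new ArrayList<>();
        System.out.println(trySend(sink, "abc".getBytes(StandardCharsets.UTF_8)));
        System.out.println(trySend(sink, new byte[0]));
        System.out.println(sink.size());
    }
}
```

With the real client, the same try/catch would wrap the
`TypedMessageBuilder#value` call; which exception type to catch is exactly
what the docs should spell out.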

Thanks,
Bo

On Fri, Dec 16, 2022 at 12:16, Yunze Xu <y...@streamnative.io.invalid> wrote:
>
> > It is irresponsible behavior of the producer to leave everything to the 
> > consumer.
>
> I agree now.
>
> > I think what we need to do is describe it clearly in the documentation
>
> IMO, it's a code problem because there is no exception signature for
> `TypedMessageBuilder#value` and `Message#getValue`. The application
> users should catch the exception. It could be better if the exception
> is thrown during `send` or `receive` and wrapped into
> `PulsarClientException`.
>
> When the input messages are raw bytes, we cannot guarantee the
> validation always succeeds because the schema might change. The
> exception is actually thrown in `TypedMessageBuilder#value`.
>
> But since these APIs are stable, we could only fix it by adding
> documentation that describes the cases in which `TypedMessageBuilder#value`
> and `Message#getValue` can throw exceptions.
>
> Thanks,
> Yunze
>
> On Thu, Dec 15, 2022 at 12:48 PM 丛搏 <congbobo...@gmail.com> wrote:
> >
> > We can also use a BYTES producer, but with the BYTES schema, if we do
> > not use .newMessage(schema0), the message will not carry the schema
> > version, and the consumer will not decode it correctly.
> >
> > Also, the BYTES schema can't validate the data against a schema. If the
> > data is an empty byte array, it does not make sense to send it to the
> > broker.
> >
> > It is irresponsible behavior of the producer to leave everything to
> > the consumer. I think AUTO_PRODUCE simplifies the data validation
> > process for users.
> >
> > I think what we need to do is describe it clearly in the documentation
> > and distinguish it from BYTES rather than delete or deprecate it.
> >
> > Thanks,
> > Bo
> >
> >
> > On Wed, Dec 14, 2022 at 23:36, Yunze Xu <y...@streamnative.io.invalid> wrote:
> >
> > >
> > > Why not use the following code with a BYTES producer in your case?
> > >
> > > ```java
> > > var schema0 = Schema.AVRO(SchemaDefinition.builder()
> > >     .withJsonDef("student with version0 json def").build());
> > > p.newMessage(schema0).value(schema0.decode(student1)).send();
> > > ...
> > > ```
> > >
> > > Thanks,
> > > Yunze
> > >
> > > On Wed, Dec 14, 2022 at 10:37 PM 丛搏 <congbobo...@gmail.com> wrote:
> > > >
> > > > On Wed, Dec 14, 2022 at 20:37, Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > > >
> > > > > > how can you create two Student.class in one Java process and use
> > > > > > the same namespace?
> > > > >
> > > > > Could you give an example to show how `AUTO_PRODUCE` schema makes a 
> > > > > difference?
> > > >
> > > > // this Student uses version0; the data may come from Kafka
> > > > byte[] student1 = autoConsumer.receive().getData();
> > > > // this Student uses version1; the data may come from Kafka
> > > > byte[] student2 = autoConsumer.receive().getData();
> > > > // send the student with version0 schema data
> > > > p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(SchemaDefinition.builder()
> > > >         .withJsonDef("student with version0 json def").build())))
> > > >         .value(student1).send();
> > > >
> > > > // send the student with version1 schema data
> > > > p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(SchemaDefinition.builder()
> > > >         .withJsonDef("student with version1 json def").build())))
> > > >         .value(student2).send();
> > > >
> > > > >
> > > > > But with AUTO_PRODUCE schema, the precondition is that we have a topic
> > > > > that has messages of these two schemas.
> > > > >
> > > > > For example, there is a `bytes-topic` without schema that has two messages:
> > > > > - msg0: Serialized from `new Student("abc")` (schema v0)
> > > > > - msg1: Serialized from `new Student("abc", 1)` (schema v1)
> > > > >
> > > > > Then you can consume these bytes, and send the messages to **a topic
> > > > > that has registered a schema**.
> > > > > - If the schema is v0, it's okay to send msg0 and msg1 to the topic.
> > > > > But the msg1 will lose some bytes because the schema v0 doesn't have
> > > > > the `age` field.
> > > > > - If the schema is v1, msg0 cannot be sent because msg0 doesn't have
> > > > > the `age` field.
> > > > >
> > > > > So which schema did you expect for this topic?
> > > > If you use AUTO_PRODUCE_BYTES, the message will have the correct schema
> > > > version.
> > > > Code link:
> > > > https://github.com/apache/pulsar/blob/4129583c418dd68f8303dee601132e2910cdf8e6/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L718-L746
> > > >
> > > > msg0 will be sent with schema v0.
> > > > msg1 will be sent with schema v1.
> > > > >
> > > > > This example also shows AUTO_PRODUCE schema performs validation at
> > > > > producer side.
> > > > >
> > > > > However, if we just send msg0 and msg1 to a topic without a schema,
> > > > > then it will be the consumer's responsibility to determine whether the
> > > > > received message is valid.
> > > > >
> > > > > ```java
> > > > > var bytes = consumer.receive().getData(); // raw bytes
> > > > > var student = Schema.AVRO(Student.class).decode(bytes);
> > > > > ```
> > > > >
> > > > > - If the `Student` is v0, msg0 and msg1 can be decoded successfully.
> > > > > - If the `Student` is v1, decoding msg0 will throw an exception.
> > > > >
> > > > > Since all messages are stored in the topic, the downstream side
> > > > > (consumer) can catch the exception to discard the bytes without the
> > > > > expected schema.
> > > > >
> > > > > But if the validation fails at the producer side, there is a chance
> > > > > that msg0 is lost. In addition, let's see the producer and consumer
> > > > > code in this case.
> > > > >
> > > > > ```
> > > > > producer.send(msg0); // validation happens at the producer side
> > > > > ```
> > > > >
> > > > > ```
> > > > > var msg = consumer.receive();
> > > > > var student = msg.getValue(); // validation happens again, though it
> > > > >                               // has already been validated before
> > > > > ```
> > > > >
> > > > > Thanks,
> > > > > Yunze
> > > > >
> > > > > On Wed, Dec 14, 2022 at 3:11 PM 丛搏 <congbobo...@gmail.com> wrote:
> > > > > >
> > > > > > >
> > > > > > > > the user only creates one producer to send all Kafka topic data, if
> > > > > > > > using Pulsar schema, the user needs to create all schema producers in
> > > > > > > > a map
> > > > > > >
> > > > > > > It doesn't make sense to me. If the source topic has messages of
> > > > > > > multiple schemas, why did you try to sink them into the same topic
> > > > > > > with a schema? The key point of AUTO_PRODUCE schema is to download the
> > > > > > > schema to validate the source messages. But if the schema of the topic
> > > > > > > evolved, the remaining messages from the source topic could not be sent
> > > > > > > to the topic.
> > > > > > >
> > > > > > Let me give you an example. An AvroSchema can have multiple versions.
> > > > > > Version 0:
> > > > > > Student {
> > > > > >     String name;
> > > > > > }
> > > > > > Version 1:
> > > > > > Student {
> > > > > >     String name;
> > > > > >     int age;
> > > > > > }
> > > > > > How can you create two Student.class in one Java process and use
> > > > > > the same namespace?
> > > > > > It's not only the schema type that changes; there will also be
> > > > > > multi-version schemas.
> > > > > > In this case, how do you create two producers with version(0) and
> > > > > > version(1)?
> > > > > >
> > > > > > > The most confusing part is that AUTO_PRODUCE schema will perform
> > > > > > > message format validation before send. It's transparent to users and
> > > > > > > intuitive. IMO, it's better to call validate explicitly like
> > > > > > >
> > > > > > > ```java
> > > > > > > producer.newMessage().value(bytes).validate().sendAsync();
> > > > > > > ```
> > > > > > >
> > > > > > > There are two benefits:
> > > > > > > 1. It's clear that the message validation happens before sending.
> > > > > > > 2. If users don't want to validate before sending, they can choose to
> > > > > > > send the bytes directly and validate the message during consumption.
> > > > > > Using `schema.validate()` alone is enough. Data validation does not
> > > > > > belong to the Pulsar message, and we can add a usage description in
> > > > > > the schema doc.
> > > > > > >
> > > > > > > The performance problem of the AUTO_PRODUCE schema is that the
> > > > > > > validation happens twice and it cannot be controlled.
> > > > > >
> > > > > > Our data verification is the behavior of the client, not the behavior
> > > > > > of the broker. Therefore, we cannot effectively verify that bytes are
> > > > > > generated by a specific schema. I think this is something that users
> > > > > > should consider rather than something that Pulsar should guarantee,
> > > > > > because you can't control whether the data sent by users is generated
> > > > > > by this schema; the schema is used only for client-side verification.
> > > > > > So we don't need to verify twice, unless we verify in the broker. But
> > > > > > that is an overhead. We can add a config to control it, but is it
> > > > > > really necessary?
> > > > > >
> > > > > > Thanks,
> > > > > > Bo
> > > > > >
> > > > > > On Wed, Dec 14, 2022 at 12:40, Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > > > > >
> > > > > > > > the user only creates one producer to send all Kafka topic data, if
> > > > > > > > using Pulsar schema, the user needs to create all schema producers in
> > > > > > > > a map
> > > > > > >
> > > > > > > It doesn't make sense to me. If the source topic has messages of
> > > > > > > multiple schemas, why did you try to sink them into the same topic
> > > > > > > with a schema? The key point of AUTO_PRODUCE schema is to download the
> > > > > > > schema to validate the source messages. But if the schema of the topic
> > > > > > > evolved, the remaining messages from the source topic could not be sent
> > > > > > > to the topic.
> > > > > > >
> > > > > > > The most confusing part is that AUTO_PRODUCE schema will perform
> > > > > > > message format validation before send. It's transparent to users and
> > > > > > > intuitive. IMO, it's better to call validate explicitly like
> > > > > > >
> > > > > > > ```java
> > > > > > > producer.newMessage().value(bytes).validate().sendAsync();
> > > > > > > ```
> > > > > > >
> > > > > > > There are two benefits:
> > > > > > > 1. It's clear that the message validation happens before sending.
> > > > > > > 2. If users don't want to validate before sending, they can choose to
> > > > > > > send the bytes directly and validate the message during consumption.
> > > > > > >
> > > > > > > The performance problem of the AUTO_PRODUCE schema is that the
> > > > > > > validation happens twice and it cannot be controlled.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Yunze
> > > > > > >
> > > > > > > On Wed, Dec 14, 2022 at 12:01 PM 丛搏 <bog...@apache.org> wrote:
> > > > > > > >
> > > > > > > > Hi, Yunze:
> > > > > > > >
> > > > > > > > On Wed, Dec 14, 2022 at 02:26, Yunze Xu <y...@streamnative.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > First, how do you guarantee the schema can be used to encode the raw
> > > > > > > > > bytes whose format is unknown?
> > > > > > > > I think this is what the user needs to ensure: the user knows all the
> > > > > > > > schemas from the Kafka topic, and the data (byte[]) that the user can
> > > > > > > > send with a Pulsar schema.
> > > > > > > > >
> > > > > > > > > Second, messages that cannot be encoded by the schema can only be
> > > > > > > > > discarded, i.e. message lost.
> > > > > > > > If the encoding fails, it proves that the user does not know how to
> > > > > > > > convert the Kafka data's schema to a Pulsar schema, which is the user's
> > > > > > > > own problem.
> > > > > > > > >
> > > > > > > > > Third, schema in Pulsar is convenient because it can support sending
> > > > > > > > > any object of type `T` and the Pulsar client is responsible to
> > > > > > > > > serialize `T` to the bytes. However, when using AUTO_PRODUCE schema,
> > > > > > > > > the producer still sends raw bytes.
> > > > > > > > the user only creates one producer to send all Kafka topic data, if
> > > > > > > > using Pulsar schema, the user needs to create all schema producers in
> > > > > > > > a map, and get the schema producer to send a message.
> > > > > > > >
> > > > > > > >
> > > > > > > > In my understanding, AUTO_PRODUCE mainly reduces the number of
> > > > > > > > producers created by the client, which brings convenience to users
> > > > > > > > migrating data, rather than dealing with data of unknown schema. If you
> > > > > > > > want to use it correctly, you must know the schema of all data, which
> > > > > > > > can be converted into a Pulsar schema. Otherwise, it would be best if
> > > > > > > > you handled it yourself using the bytes schema.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Bo
