gaoran10 commented on code in PR #24328:
URL: https://github.com/apache/pulsar/pull/24328#discussion_r2153514409
##########
pip/pip-420.md:
##########
@@ -0,0 +1,253 @@
+# PIP-420: Provide ability for Pulsar clients to integrate with third-party
schema registry service
+
+# Background knowledge
+
+Schema is an important feature for messaging systems. Pulsar integrates its schema manager into the Pulsar broker.
+The current Pulsar client implementation couples schema management with some protocol commands (creating a producer, adding a consumer subscription).
+This increases the complexity of the Pulsar protocol, and users can't leverage third-party schema registry services from the Pulsar client.
+
+# Motivation
+
+The Pulsar client should be able to access a third-party schema registry service to manage schemas (register schema, get schema, validate schema, etc.).
+The schema registry can be an independent service. When a third-party schema registry service is used, the Pulsar broker doesn't need to care about the schema of the messages.
+
+# Goals
+
+## In Scope
+
+- Provide an ability to leverage third-party schema registry service for
Pulsar client.
+
+## Out of Scope
+
+This PIP does not include an implementation for accessing a third-party schema system.
+
+# High Level Design
+
+This PIP only provides the following abilities:
+
+- Decouple schema management from the create-producer and consumer-subscribe commands.
+- Provide a way to build clients for external schema management systems and integrate them with Pulsar clients.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+The goal of this PIP is to decouple schema management from the Pulsar messaging protocol
+and to let the Pulsar client use an external schema registry service to manage schemas.
+The external schema registry is responsible for managing the schema; the broker doesn't care about the message schema.
+The Pulsar client should ignore the schema information when creating a producer or adding a consumer subscription.
+
+Users can implement the `SchemaInfoProvider` and `Schema` interfaces to access an external schema registry service.
+The `Schema` interface has two main methods, `encode` and `decode`; customized schemas can register or fetch schemas inside these methods.
+The encoded message format depends on the external schema system. The Pulsar broker treats the message as raw bytes and does not change the message version in the message metadata.
+Unlike Pulsar, which uses a schema version to identify a schema, some external schema systems use a schema ID.
+When an external schema system is used, the Pulsar message metadata does not carry the schema ID, so the customized decoding method can try to retrieve the schema ID from the encoded data.
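
To make the last point concrete, here is a minimal sketch of how a customized `encode`/`decode` pair could carry a schema ID inside the payload itself. The framing (one magic byte followed by a 4-byte big-endian schema ID) and the names `SchemaIdFraming`, `frame`, `schemaIdOf` and `payloadOf` are assumptions made for illustration; the PIP does not define a wire format, and a real registry client would dictate its own.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper, not part of the PIP or of the Pulsar client API.
final class SchemaIdFraming {
    private static final byte MAGIC = 0x0;                    // assumed marker byte
    private static final int HEADER_SIZE = 1 + Integer.BYTES; // magic + schema ID

    // Would be called from a custom Schema#encode after the schema has been
    // registered (or looked up) in the external registry.
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_SIZE + payload.length)
                .put(MAGIC)
                .putInt(schemaId)   // ByteBuffer writes big-endian by default
                .put(payload)
                .array();
    }

    // Would be called from a custom Schema#decode to find out which schema
    // to fetch from the external registry.
    static int schemaIdOf(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        if (framed.length < HEADER_SIZE || buf.get() != MAGIC) {
            throw new IllegalArgumentException("message is not framed with a schema ID");
        }
        return buf.getInt();
    }

    // Returns the serialized value without the header.
    static byte[] payloadOf(byte[] framed) {
        schemaIdOf(framed); // validates the header
        return Arrays.copyOfRange(framed, HEADER_SIZE, framed.length);
    }
}
```

With a layout like this the broker still sees opaque bytes, and only the client-side schema needs to understand the header.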
+
+Example usage
+```java
+public void workWithExternalSchemaRegistry() throws Exception {
+ Map<String, String> srConfig = new HashMap<>();
+ srConfig.put("schema.registry.url", "http://localhost:8001");
+
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl("pulsar://localhost:6650")
+ .schemaInfoProviderFactory(new
KafkaSchemaInfoProviderFactory(srConfig))
+ .build();
+
+ String topic = "t1";
+ Schema<User> schema = KafkaSchemas.JSON(User.class);
+
+ Producer<User> producer = client.newProducer(schema)
+ .topic(topic)
+ .create();
+
+ Consumer<User> consumer = client.newConsumer(schema)
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ for (int i = 0; i < 10; i++) {
+ producer.send(new User("name-" + i, 10 + i));
+ }
+
+ for (int i = 0; i < 10; i++) {
+ Message<User> message = consumer.receive();
+ consumer.acknowledge(message);
+ }
+
+ client.close();
+}
+```
+
+Messaging protocol changes
+
+Ignore the schema info when creating a producer and skip schema registration before sending messages; schema management is left to the external schema registry.
+```java
+public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask,
ConnectionHandler.Connection {
+
+ @Override
+ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
+ // ...
+
+ if (schema != null && schema.getSchemaInfoProvider() != null
+ && schema.getSchemaInfoProvider().isExternal()) {
+ // The external schema registry owns the schema, so do not send schema info to the broker.
+ schemaInfo = null;
+ }
+
+ // send create producer request
+ }
+
+ private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg,
SendCallback callback, long expectedCnxEpoch) {
+ // ...
+
+ if (schema != null && schema.getSchemaInfoProvider() != null
+ && schema.getSchemaInfoProvider().isExternal()) {
+ // Don't register the schema with the broker when an external schema registry is used;
+ // registration can be done inside the schema's encode method.
+ return;
+ }
+ // getOrCreateSchemaAsync
+ }
+
+}
+```
+
+Ignore schema info while adding consumer subscription.
+```java
+public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandler.Connection {
+
+ @Override
+ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
+ // ...
+
+ // The external schema registry owns the schema, so do not send schema info to the broker.
+ if (schema.getSchemaInfoProvider() != null
+ && schema.getSchemaInfoProvider().isExternal()) {
+ si = null;
+ }
+
+ }
+
+}
+```
+
+## Public-facing Changes
+
+Add new methods to the `SchemaInfoProvider` interface.
+The `SchemaInfoProvider` supplies the parameters needed to connect to the external schema registry service through the `getConfigs` method.
+If the schema info provider is external, the new-producer and consumer-subscribe commands treat the schema as a bytes schema, and the broker skips schema validation.
Review Comment:
Introduce a new schema type `EXTERNAL` and add a section for schema type
change.
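
For reference while discussing this section, the `isExternal` / `getConfigs` additions quoted above might look roughly like the sketch below. It uses a stand-in interface rather than the real `SchemaInfoProvider`; the default return values, the `Map<String, String>` return type, and the names `SchemaInfoProviderSketch`, `RegistryBackedProvider` and `SchemaGate` are assumptions, since the quoted text does not show the method signatures.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Stand-in for the two proposed methods; not the actual Pulsar interface.
interface SchemaInfoProviderSketch {
    // Assumed default: existing providers stay broker-managed.
    default boolean isExternal() {
        return false;
    }

    // Assumed shape: connection parameters for the external registry.
    default Map<String, String> getConfigs() {
        return Collections.emptyMap();
    }
}

// Hypothetical provider backed by an external schema registry.
final class RegistryBackedProvider implements SchemaInfoProviderSketch {
    private final Map<String, String> configs;

    RegistryBackedProvider(Map<String, String> configs) {
        this.configs = Collections.unmodifiableMap(new HashMap<>(configs));
    }

    @Override
    public boolean isExternal() {
        return true;
    }

    @Override
    public Map<String, String> getConfigs() {
        return configs;
    }
}

// Mirrors the check in the quoted connectionOpened snippets: when the provider
// is external, no schema info is sent to the broker.
final class SchemaGate {
    static <S> S schemaInfoForBroker(SchemaInfoProviderSketch provider, S schemaInfo) {
        return (provider != null && provider.isExternal()) ? null : schemaInfo;
    }
}
```

An explicit `EXTERNAL` schema type, as suggested, would let the broker see this decision in the command itself instead of inferring it from a missing schema.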