Denovo1998 commented on code in PR #24328:
URL: https://github.com/apache/pulsar/pull/24328#discussion_r2102416628


##########
pip/pip-420.md:
##########
@@ -0,0 +1,253 @@
+# PIP-420: Provide ability for Pulsar clients to integrate with third-party schema registry service
+
+# Background knowledge
+
+Schema is an important feature for messaging systems. Pulsar integrates a schema manager into the Pulsar broker.
+The current implementation in Pulsar clients couples schema management with some protocol commands (creating a producer, adding a consumer subscription).
+This increases the complexity of the Pulsar protocol, and users can’t leverage third-party schema registry services in the Pulsar client.
+
+# Motivation
+
+It would be better for the Pulsar client to be able to access a third-party schema registry service to manage schemas (register schema,
+get schema, validate schema, etc.). The schema registry service can be an independent service; when a third-party schema registry service is used,
+the Pulsar broker doesn't need to care about the schema of the messages.

Review Comment:
   The specific meaning of "not caring" could be explained in more detail. For example: the broker only treats the message as raw bytes and no longer performs additional processing such as schema compatibility checks. This design makes the broker more lightweight, which may improve the overall performance of the system.
   
   The advantages of third-party schema registry services could also be elaborated further. For example:
   
   1. Taking Confluent Schema Registry as an example, it can provide unified schema management across Kafka and Pulsar.
   2. Such a service can also coordinate the management of Pulsar topic schemas and data lake table metadata.
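   It may also help to show concretely how a client-side schema can carry the schema ID once the broker only sees raw bytes. Below is a minimal sketch, assuming the Confluent Schema Registry wire format (a `0x0` magic byte, then a 4-byte big-endian schema ID, then the serialized record). `SchemaIdFraming` is a hypothetical helper, not part of this PIP or of the Pulsar API:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper (not Pulsar API): frames a serialized record with a schema ID
// so the consumer can resolve the schema without broker-stored schema metadata.
// Assumed layout (Confluent wire format): 1 magic byte (0x0), a 4-byte big-endian
// schema ID, then the serialized record bytes.
final class SchemaIdFraming {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 1 + Integer.BYTES;

    private SchemaIdFraming() {
    }

    // Prepends the magic byte and the schema ID to the serialized payload.
    static byte[] frame(int schemaId, byte[] payload) {
        // ByteBuffer is big-endian by default, matching the assumed layout.
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + payload.length);
        buf.put(MAGIC_BYTE).putInt(schemaId).put(payload);
        return buf.array();
    }

    // Reads the schema ID from a framed message, rejecting unexpected layouts.
    static int schemaId(byte[] framed) {
        if (framed.length < HEADER_SIZE || framed[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Payload is not in the expected schema-ID framing");
        }
        return ByteBuffer.wrap(framed, 1, Integer.BYTES).getInt();
    }

    // Returns the serialized record bytes that follow the header.
    static byte[] payload(byte[] framed) {
        schemaId(framed); // validates the header before slicing
        return Arrays.copyOfRange(framed, HEADER_SIZE, framed.length);
    }
}
```

   A custom `Schema` could call `frame` in `encode` and `schemaId`/`payload` in `decode`. Because this layout matches what Kafka clients using Confluent serializers produce, the same registry and payloads could in principle be shared between Kafka and Pulsar.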



##########
pip/pip-420.md:
##########
@@ -0,0 +1,253 @@
+# Goals
+
+## In Scope
+
+- Provide the ability for the Pulsar client to leverage a third-party schema registry service.
+
+## Out of Scope
+
+This PIP does not include an implementation for accessing any specific third-party schema system.
+
+# High Level Design
+
+This PIP only provides the following abilities:
+
+- Decouple schema management from the create-producer and add-consumer-subscription commands.
+- Provide a way to build clients for external schema management systems and integrate them with Pulsar clients.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+This PIP aims to decouple schema management from the Pulsar messaging protocols
+and to enable the Pulsar client to leverage an external schema registry service to manage schemas.
+The external schema registry is responsible for managing schemas, and the broker doesn't care about the message schema.
+The Pulsar client should ignore the schema information when creating a producer or adding a consumer subscription.
+
+Users can implement the `SchemaInfoProvider` and `Schema` interfaces to access an external schema registry service.
+The `Schema` interface has two main methods, `encode` and `decode`; customized schemas can register or fetch schemas within these methods.
+How a message is encoded depends on the external schema system: the Pulsar broker just treats the message as bytes, and it won't change the schema version in the message metadata.
+Unlike Pulsar, which uses a schema version to identify a schema, some external schema systems use a schema ID.
+When an external schema system is used, the Pulsar message metadata does not carry the schema ID, so the customized decoding method can try to retrieve the schema ID from the encoded data.
+
+Example usage
+```java
+public void workWithExternalSchemaRegistry() throws Exception {
+    Map<String, String> srConfig = new HashMap<>();
+    srConfig.put("schema.registry.url", "http://localhost:8001");
+
+    // KafkaSchemaInfoProviderFactory and KafkaSchemas are illustrative; implementing
+    // them is out of the scope of this PIP.
+    PulsarClient client = PulsarClient.builder()
+            .serviceUrl("pulsar://localhost:6650")
+            .schemaInfoProviderFactory(new KafkaSchemaInfoProviderFactory(srConfig))
+            .build();
+
+    String topic = "t1";
+    Schema<User> schema = KafkaSchemas.JSON(User.class);
+
+    Producer<User> producer = client.newProducer(schema)
+            .topic(topic)
+            .create();
+
+    Consumer<User> consumer = client.newConsumer(schema)
+            .topic(topic)
+            .subscriptionName("sub")
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscribe();
+
+    for (int i = 0; i < 10; i++) {
+        producer.send(new User("name-" + i, 10 + i));
+    }
+
+    for (int i = 0; i < 10; i++) {
+        Message<User> message = consumer.receive();
+        consumer.acknowledge(message);
+    }
+
+    client.close();
+}
+```
+
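+Messaging protocol changes
+
+Ignore the schema info when creating a producer and skip schema registration before sending messages; schema management is delegated to the custom `Schema` implementation (for example, registration can be done in its `encode` method).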
+```java
+public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, ConnectionHandler.Connection {
+
+    @Override
+    public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
+        // ...
+
+        if (schema != null && schema.getSchemaInfoProvider() != null
+                && schema.getSchemaInfoProvider().isExternal()) {
+            // The schema is managed by the external registry, so don't send
+            // schema info with the create-producer command.
+            schemaInfo = null;
+        }
+
+        // send create producer request
+    }
+
+    private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback callback, long expectedCnxEpoch) {
+        // ...
+
+        if (schema.getSchemaInfoProvider() != null && schema.getSchemaInfoProvider().isExternal()) {
+            // Don't register the schema with the broker when an external schema registry
+            // service is used; registration can be done in the schema's encode method.
+            return;
+        }
+        // getOrCreateSchemaAsync
+    }
+
+}
+```
+
+Ignore the schema info when adding a consumer subscription.
+```java
+public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandler.Connection {
+
+    @Override
+    public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
+        // ...
+
+        // Schemas backed by an external schema info provider are managed by the external
+        // registry, so don't send schema info with the subscribe command.
+        if (schema != null && schema.getSchemaInfoProvider() != null
+                && schema.getSchemaInfoProvider().isExternal()) {
+            si = null;
+        }
+
+        // send subscribe request
+    }
+
+}
+```
+
+## Public-facing Changes
+
+Add new methods to the `SchemaInfoProvider` interface.
+The `SchemaInfoProvider` provides the parameters needed to connect to the external schema registry service through the `getConfigs` method.
+If the schema info provider is external, the create-producer and consumer-subscribe commands treat the schema as a bytes schema, and the broker skips schema validation.
+```java
+public interface SchemaInfoProvider {
+
+    // ... existing methods omitted
+
+    /**
+     * Returns the configs of the schema registry service, such as the URL and authentication params.
+     */
+    default Map<String, String> getConfigs() {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Returns whether this SchemaInfoProvider is backed by an external schema registry service.
+     */
+    default boolean isExternal() {
+        return false;
+    }
+
+}
+```
+
+Add a new interface `SchemaInfoProviderFactory`, which creates the `SchemaInfoProvider` for each topic (each topic has its own `SchemaInfoProvider`).
+```java
+public interface SchemaInfoProviderFactory {
+
+    SchemaInfoProvider of(String topic);
+
+}
+```
+
+`ClientBuilder` supports setting the `SchemaInfoProviderFactory`.
+```java
+public interface ClientBuilder extends Serializable, Cloneable {
+
+    ClientBuilder schemaInfoProviderFactory(SchemaInfoProviderFactory schemaInfoProviderFactory);
+
+}
+```
+
+`ClientConfigurationData` holds the configured `SchemaInfoProviderFactory`.
+```java
+public class ClientConfigurationData implements Serializable, Cloneable {
+
+    @JsonIgnore
+    private transient SchemaInfoProviderFactory schemaInfoProviderFactory;
+
+}
+```
+
+A customized schema can get the `SchemaInfoProvider` and retrieve the configs from it.
+```java
+public interface Schema<T> {
+
+    /**
+     * Sets the schema info provider, from which the schema can retrieve the registry configs.
+     */
+    default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+    }
+
+    /**
+     * Returns the schema info provider.
+     *
+     * @return a {@code SchemaInfoProvider} representing the schema info provider
+     */
+    default SchemaInfoProvider getSchemaInfoProvider() {
+        return null;
+    }
+
+}
+```
+
+# Pulsar Function
+
+To support third-party schema registry services in Pulsar Functions,
+the function configuration should support setting the `SchemaInfoProviderFactory` and the schema registry authentication configs used when initializing the Pulsar client.
+
+# Security Considerations
+Users can provide security configuration when creating the `SchemaInfoProviderFactory` instance,
+and the factory can pass that configuration on to the `SchemaInfoProvider` instances it creates.
+
+# Pulsar-GEO replication
+Using a third-party schema registry service provides a new way to manage schemas for geo-replicated topics.

Review Comment:
   This could be expanded slightly. For example, if a user relies on an external, globally available schema registry (such as a Confluent Schema Registry replicated across regions), schema synchronization in the geo-replication scenario can be guaranteed by that external system, which simplifies Pulsar's own schema synchronization needs.
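   The `SchemaInfoProviderFactory` proposed in this PIP seems like the natural hook for that setup. Below is a minimal sketch using local stand-ins for the two proposed interfaces (only the methods this PIP adds, not the released Pulsar API) and a hypothetical `GlobalRegistryProviderFactory`; the `schema.registry.url` key is borrowed from the PIP's example:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Local stand-ins for the interfaces proposed in this PIP (not released Pulsar API).
interface SchemaInfoProvider {
    default Map<String, String> getConfigs() {
        return Collections.emptyMap();
    }

    default boolean isExternal() {
        return false;
    }
}

interface SchemaInfoProviderFactory {
    SchemaInfoProvider of(String topic);
}

// Hypothetical factory: every topic, in every cluster, points at the same globally
// replicated registry, so cross-region schema consistency is the registry's job.
final class GlobalRegistryProviderFactory implements SchemaInfoProviderFactory {
    private final Map<String, String> registryConfigs;

    GlobalRegistryProviderFactory(String registryUrl) {
        Map<String, String> configs = new HashMap<>();
        configs.put("schema.registry.url", registryUrl); // key taken from the PIP example
        this.registryConfigs = Collections.unmodifiableMap(configs);
    }

    @Override
    public SchemaInfoProvider of(String topic) {
        // One provider per topic, as the PIP specifies; all share the global configs.
        return new SchemaInfoProvider() {
            @Override
            public Map<String, String> getConfigs() {
                return registryConfigs;
            }

            @Override
            public boolean isExternal() {
                return true;
            }
        };
    }
}
```

   If clients in every cluster were configured with the same factory, producers and consumers in different regions would resolve schemas from one registry, and the geo-replication path would not need to carry schema state for these topics.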



##########
pip/pip-420.md:
##########
@@ -0,0 +1,253 @@
+# Background knowledge
+
+Schema is an important feature for messaging systems. Pulsar integrates a schema manager into the Pulsar broker.
+The current implementation in Pulsar clients couples schema management with some protocol commands (creating a producer, adding a consumer subscription).
+This increases the complexity of the Pulsar protocol, and users can’t leverage third-party schema registry services in the Pulsar client.

Review Comment:
   It might be worth mentioning `SchemaStorage` here:
   
https://github.com/apache/pulsar/blob/bdf6277456db9df1618adcd07b588f3a94714796/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java#L29
   
   Support for third-party schema registry services could also be implemented through `SchemaStorage`. The Motivation section could then explain what advantages this PIP's approach has over implementing `SchemaStorage`, which would make the PIP more persuasive.



##########
pip/pip-420.md:
##########
@@ -0,0 +1,253 @@
+Users can implement the `SchemaInfoProvider` and `Schema` interfaces to access an external schema registry service.
+The `Schema` interface has two main methods, `encode` and `decode`; customized schemas can register or fetch schemas within these methods.

Review Comment:
   What should the behavior be when a custom Schema's `encode` method fails to register the schema with the external schema registry (e.g., due to network issues or an authentication failure), or when the `decode` method cannot find the corresponding schema in the external registry for the ID in the message?
   
   The PIP could suggest that implementers consider these scenarios, for example, whether to throw a specific exception, return null, or use a retry mechanism.
   
   Although the specific implementation is up to the user, it would be better to provide some guidance.
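   As one possible shape for that guidance, the sketch below separates "the registry does not know this ID" (fail immediately) from "the registry could not be reached" (bounded retries), caches resolved schemas, and surfaces failures as an exception instead of returning null. It assumes schema IDs are immutable, as they are in Confluent Schema Registry. `RegistryClient`, `SchemaResolutionException`, and `CachingSchemaResolver` are hypothetical names; a real implementation would probably throw Pulsar's `SchemaSerializationException` and classify errors more finely (for example, not retrying authentication failures):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry client; a real one would call the registry over the network.
// Returns null when the registry has no schema for the given ID.
interface RegistryClient {
    String fetchSchema(int schemaId) throws Exception;
}

// Hypothetical error type used to surface resolution failures to the caller.
final class SchemaResolutionException extends RuntimeException {
    SchemaResolutionException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Resolves schemas by ID with a local cache and a bounded number of retries.
final class CachingSchemaResolver {
    private final RegistryClient client;
    private final int maxAttempts;
    private final long backoffMillis;
    private final Map<Integer, String> cache = new ConcurrentHashMap<>();

    CachingSchemaResolver(RegistryClient client, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.client = client;
        this.maxAttempts = maxAttempts;
        this.backoffMillis = backoffMillis;
    }

    String resolve(int schemaId) {
        String cached = cache.get(schemaId);
        if (cached != null) {
            return cached; // IDs are assumed immutable, so cached entries never go stale
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                String schema = client.fetchSchema(schemaId);
                if (schema == null) {
                    // The registry answered but has no such ID: retrying will not help.
                    throw new SchemaResolutionException("Schema ID " + schemaId + " not found in registry", null);
                }
                cache.put(schemaId, schema);
                return schema;
            } catch (SchemaResolutionException e) {
                throw e;
            } catch (Exception e) {
                // Any other failure is treated as retriable in this sketch.
                last = e;
                if (attempt < maxAttempts) {
                    sleep(backoffMillis);
                }
            }
        }
        throw new SchemaResolutionException(
                "Failed to fetch schema ID " + schemaId + " after " + maxAttempts + " attempts", last);
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SchemaResolutionException("Interrupted while waiting to retry", e);
        }
    }
}
```

   A custom `decode` would read the schema ID from the payload and call `resolve`; the PIP could then state explicitly how such an exception is expected to surface to the application.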



##########
pip/pip-420.md:
##########
@@ -0,0 +1,253 @@
+# Pulsar Function

Review Comment:
   Although this PIP mainly focuses on client changes, how will operators and users understand the situation through the Pulsar Admin API or tools (such as `pulsar-admin`) when a topic's schema is managed externally? For example, what should the `pulsar-admin schemas get <topic-name>` command return for such topics? Will there be a new status or flag indicating that the schema is externally managed? This may be beyond the direct scope of this PIP, but it is worth raising and considering as part of the overall design impact.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
