Denovo1998 commented on code in PR #24328:
URL: https://github.com/apache/pulsar/pull/24328#discussion_r2102416628
##########
pip/pip-420.md:
##########
@@ -0,0 +1,253 @@
+# PIP-420: Provide ability for Pulsar clients to integrate with third-party schema registry service
+
+# Background knowledge
+
+Schema is an important feature for messaging systems. Pulsar integrates schema management into the Pulsar broker.
+The current implementation in Pulsar clients couples schema management with some protocol commands (creating a producer, adding a consumer subscription).
+This increases the complexity of the Pulsar protocol, and users can't leverage third-party schema registry services in Pulsar clients.
+
+# Motivation
+
+It would be better if the Pulsar client had the ability to access a third-party schema registry service to manage schemas (register schema,
+get schema, validate schema, etc.). The schema registry service can be an independent service; when a third-party schema registry service is used,
+the Pulsar broker doesn't need to care about the schema of the messages.
Review Comment:
The specific meaning of "not caring" can be explained in more detail. For
example: the Broker side only treats the message as raw byte data and no longer
performs additional processing such as schema compatibility checks. This design
makes the Broker more lightweight, thereby significantly improving the overall
performance of the system.
Regarding the advantages of third-party schema registry services, it is
recommended to elaborate further. For example:
1. Taking Confluent Schema Registry as an example, it can achieve unified
Schema management between Kafka and Pulsar.
2. This service can also achieve collaborative management between Pulsar
topics and data lake table metadata.
##########
pip/pip-420.md:
##########
@@ -0,0 +1,253 @@
+# Goals
+
+## In Scope
+
+- Provide the ability for Pulsar clients to leverage third-party schema registry services.
+
+## Out of Scope
+
+This PIP does not include an implementation for accessing any specific third-party schema registry system.
+
+# High Level Design
+
+This PIP only provides the following capabilities:
+
+- Decouple schema management from the create-producer and add-consumer-subscription commands.
+- Provide a way to build clients for external schema management systems and integrate them with Pulsar clients.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+This PIP's goal is to decouple schema management from the Pulsar messaging protocol,
+and to enable Pulsar clients to leverage an external schema registry service to manage schemas.
+The external schema registry is responsible for managing schemas; the broker doesn't care about the message schema.
+The Pulsar client should ignore the schema information when creating a producer or adding a consumer subscription.
+
+Users can implement the `SchemaInfoProvider` and `Schema` interfaces to access an external schema registry service.
+The `Schema` interface has two main methods, `encode` and `decode`; customized schemas can register or fetch schemas within these methods.
+The encoded message format depends on the external schema system. The Pulsar broker just treats the message as raw bytes, and it won't change the schema version in the message metadata.
+Unlike Pulsar, which uses a schema version to identify a schema, some external schema systems use a schema ID.
+When an external schema system is used, the Pulsar message metadata does not carry the schema ID, so the customized `decode` method can try to retrieve the schema ID from the encoded data.
+
+Example usage:
+```java
+public void workWithExternalSchemaRegistry() throws Exception {
+    Map<String, String> srConfig = new HashMap<>();
+    srConfig.put("schema.registry.url", "http://localhost:8001");
+
+    // KafkaSchemaInfoProviderFactory and KafkaSchemas are illustrative; this PIP does not ship them
+    PulsarClient client = PulsarClient.builder()
+            .serviceUrl("pulsar://localhost:6650")
+            .schemaInfoProviderFactory(new KafkaSchemaInfoProviderFactory(srConfig))
+            .build();
+
+    String topic = "t1";
+    Schema<User> schema = KafkaSchemas.JSON(User.class);
+
+    Producer<User> producer = client.newProducer(schema)
+            .topic(topic)
+            .create();
+
+    Consumer<User> consumer = client.newConsumer(schema)
+            .topic(topic)
+            .subscriptionName("sub")
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscribe();
+
+    for (int i = 0; i < 10; i++) {
+        producer.send(new User("name-" + i, 10 + i));
+    }
+
+    for (int i = 0; i < 10; i++) {
+        Message<User> message = consumer.receive();
+        User user = message.getValue(); // decoded by the external-registry schema
+        consumer.acknowledge(message);
+    }
+
+    // closing the client also closes the producer and consumer
+    client.close();
+}
+```
+
+Messaging protocol changes:
+
+Ignore the schema info when creating a producer, and skip schema registration before sending messages; schema registration is left to the customized schema (for example, inside its `encode` method).
+```java
+public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, ConnectionHandler.Connection {
+
+    @Override
+    public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
+        // ...
+
+        if (schema != null && schema.getSchemaInfoProvider() != null
+                && schema.getSchemaInfoProvider().isExternal()) {
+            // The external registry owns the schema: send no schema info in the
+            // create-producer command, so the broker treats the payload as bytes.
+            schemaInfo = null;
+        }
+
+        // send create producer request
+    }
+
+    private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback callback, long expectedCnxEpoch) {
+        // ...
+
+        if (schema.getSchemaInfoProvider() != null && schema.getSchemaInfoProvider().isExternal()) {
+            // Don't register the schema with the broker when an external schema registry service is used;
+            // registration can be done inside the schema's encode method.
+            return;
+        }
+        // getOrCreateSchemaAsync
+    }
+
+}
+```
+
+Ignore the schema info when adding a consumer subscription.
+```java
+public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandler.Connection {
+
+    @Override
+    public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
+        // ...
+
+        // An external schema info provider manages the schema itself, so the
+        // subscribe command carries no schema info.
+        if (schema != null && schema.getSchemaInfoProvider() != null
+                && schema.getSchemaInfoProvider().isExternal()) {
+            si = null;
+        }
+
+        // send subscribe request
+    }
+
+}
+```
+
+## Public-facing Changes
+
+Add new methods to the `SchemaInfoProvider` interface.
+The `SchemaInfoProvider` provides the parameters needed to connect to the external schema registry service through the `getConfigs` method.
+If the schema info provider is external, the create-producer and consumer-subscribe commands treat the schema as a bytes schema, and the broker skips schema validation.
+```java
+public interface SchemaInfoProvider {
+
+    // ... existing methods ...
+
+    /**
+     * Returns the configs of the schema registry service, such as the URL and authentication params.
+     */
+    default Map<String, String> getConfigs() {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Returns whether this SchemaInfoProvider is backed by an external schema registry service.
+     */
+    default boolean isExternal() {
+        return false;
+    }
+
+}
+```
+
+Add a new interface, `SchemaInfoProviderFactory`, which creates `SchemaInfoProvider` instances; each topic has its own `SchemaInfoProvider`.
+```java
+public interface SchemaInfoProviderFactory {
+
+    /**
+     * Returns the SchemaInfoProvider for the given topic.
+     */
+    SchemaInfoProvider of(String topic);
+
+}
+```
+
+The client builder supports setting the `SchemaInfoProviderFactory`.
+```java
+public interface ClientBuilder extends Serializable, Cloneable {
+
+    ClientBuilder schemaInfoProviderFactory(SchemaInfoProviderFactory schemaInfoProviderFactory);
+
+}
+```
+
+The `ClientConfigurationData` carries the `SchemaInfoProviderFactory`.
+```java
+public class ClientConfigurationData implements Serializable, Cloneable {
+
+    @JsonIgnore
+    private transient SchemaInfoProviderFactory schemaInfoProviderFactory;
+
+}
+```
+
+A customized schema can get the `SchemaInfoProvider` and retrieve the configs from it.
+```java
+public interface Schema<T> {
+
+    // ... existing methods ...
+
+    /**
+     * Sets the schema info provider, from which the schema can retrieve the registry configs.
+     */
+    default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+    }
+
+    /**
+     * Returns the schema info provider.
+     *
+     * @return a {@code SchemaInfoProvider} representing the schema info provider
+     */
+    default SchemaInfoProvider getSchemaInfoProvider() {
+        return null;
+    }
+
+}
+```
+
+# Pulsar Function
+
+To support third-party schema registry services in Pulsar Functions,
+the function configuration should support setting the `SchemaInfoProviderFactory` and the schema registry authentication configurations used when initializing the Pulsar client.
+
+# Security Considerations
+Users can provide security configurations when creating the `SchemaInfoProviderFactory` instance,
+and the factory can pass them on to each `SchemaInfoProvider` instance.
+
+# Pulsar Geo-Replication
+If users use a third-party schema registry service, it provides a new way to manage schemas for geo-replicated topics.
Review Comment:
This could be expanded a bit. For example, if a user uses an external,
globally available Schema Registry (such as a cross-region replicated Confluent
Schema Registry), then Schema synchronization in the geo-replication
scenario can be handled by that external system, simplifying Pulsar's own
Schema synchronization needs.
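A minimal sketch of the per-topic factory described under Public-facing Changes, assuming the `SchemaInfoProvider` and `SchemaInfoProviderFactory` shapes proposed in the PIP. They are declared locally so the sketch compiles on its own and are not the released Pulsar API; `ExternalSchemaInfoProviderFactory` is a hypothetical name. It shows how the registry URL and security configs handed to the factory (as the Security Considerations section suggests) could flow to every per-topic provider.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-ins mirroring the interfaces proposed in this PIP (not the released Pulsar API).
interface SchemaInfoProvider {
    default Map<String, String> getConfigs() {
        return Collections.emptyMap();
    }

    default boolean isExternal() {
        return false;
    }
}

interface SchemaInfoProviderFactory {
    SchemaInfoProvider of(String topic);
}

// Hypothetical factory: hands out one provider per topic; every provider shares the
// registry URL and security configs supplied when the factory is created.
final class ExternalSchemaInfoProviderFactory implements SchemaInfoProviderFactory {
    private final Map<String, String> registryConfigs;
    private final Map<String, SchemaInfoProvider> providersByTopic = new ConcurrentHashMap<>();

    ExternalSchemaInfoProviderFactory(Map<String, String> registryConfigs) {
        // read-only copy: later changes to the caller's map are not picked up
        this.registryConfigs = Collections.unmodifiableMap(new HashMap<>(registryConfigs));
    }

    @Override
    public SchemaInfoProvider of(String topic) {
        // create the provider on first use, then reuse it for the same topic
        return providersByTopic.computeIfAbsent(topic, t -> new ExternalProvider(t, registryConfigs));
    }

    private static final class ExternalProvider implements SchemaInfoProvider {
        private final String topic;
        private final Map<String, String> configs;

        ExternalProvider(String topic, Map<String, String> configs) {
            this.topic = topic;
            this.configs = configs;
        }

        @Override
        public Map<String, String> getConfigs() {
            return configs;
        }

        @Override
        public boolean isExternal() {
            // per this PIP, tells the client to omit schema info from producer/subscribe commands
            return true;
        }

        @Override
        public String toString() {
            return "ExternalProvider[" + topic + "]";
        }
    }
}
```

Caching with `computeIfAbsent` keeps exactly one provider per topic, matching "each topic has its own `SchemaInfoProvider`", while the shared read-only config map avoids copying credentials into every provider.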
##########
pip/pip-420.md:
##########
@@ -0,0 +1,253 @@
+This increases the Pulsar protocol complexity and users can’t leverage third-party schema registry services in Pulsar client.
Review Comment:
It might be worth mentioning `SchemaStorage` here:
https://github.com/apache/pulsar/blob/bdf6277456db9df1618adcd07b588f3a94714796/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaStorage.java#L29
Support for third-party schema registration services can also be implemented
through `SchemaStorage`. The following Motivation section could explain
what advantages this PIP's approach has over implementing a custom
`SchemaStorage`. This would make the PIP more persuasive.
##########
pip/pip-420.md:
##########
@@ -0,0 +1,253 @@
+The `Schema` interface has mainly two methods `encode` and `decode`, the customized schemas can register schema or get schema with these methods.
Review Comment:
What should the behavior be when the encode method of a custom Schema fails
to register the Schema with an external Schema Registry (e.g., due to network
issues or an authentication failure), or when the decode method cannot
find the corresponding Schema in the external Schema Registry based on the ID
in the message?
The PIP could suggest that implementers consider these scenarios, for
example, whether to throw a specific exception, return null, or retry.
Although the specific implementation is up to the user, it would be helpful
to provide some guidance.
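One possible shape for that guidance, as a sketch only. It assumes a Confluent-style wire format (a zero magic byte, then a 4-byte big-endian schema ID, then the payload), which is one way a customized `decode` could retrieve the schema ID from the encoded data. `SchemaIdFraming`, `SchemaResolutionException`, and the in-memory `registry` map standing in for a real registry lookup are hypothetical. Malformed data and unknown IDs fail fast with a descriptive exception instead of returning null.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;

// Hypothetical helper for a custom Schema's encode/decode: frames the payload with
// a Confluent-style header (magic byte 0, then a 4-byte big-endian schema ID).
final class SchemaIdFraming {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 1 + Integer.BYTES;

    // Raised for malformed input or an unknown schema ID. A real Pulsar Schema
    // could wrap this in org.apache.pulsar.client.api.SchemaSerializationException.
    static final class SchemaResolutionException extends RuntimeException {
        SchemaResolutionException(String message) {
            super(message);
        }
    }

    private SchemaIdFraming() {
    }

    // encode side: prepend the header to the already-serialized payload
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_SIZE + payload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(payload)
                .array();
    }

    // decode side: validate the header and extract the schema ID
    static int readSchemaId(byte[] data) {
        if (data == null || data.length < HEADER_SIZE) {
            throw new SchemaResolutionException("message too short to contain a schema ID");
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new SchemaResolutionException("unknown magic byte: " + magic);
        }
        return buffer.getInt();
    }

    static byte[] readPayload(byte[] data) {
        readSchemaId(data); // throws if the header is invalid
        return Arrays.copyOfRange(data, HEADER_SIZE, data.length);
    }

    // `registry` stands in for a (cached) lookup against the external schema registry
    static String resolveSchema(byte[] data, Map<Integer, String> registry) {
        int schemaId = readSchemaId(data);
        String schema = registry.get(schemaId);
        if (schema == null) {
            throw new SchemaResolutionException("schema ID " + schemaId + " not found in the registry");
        }
        return schema;
    }
}
```

Retrying transient registry failures (network, authentication) would sit inside the real lookup, outside this sketch; a malformed header or an unknown ID is arguably not transient, so failing fast surfaces it to the consuming application.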
##########
pip/pip-420.md:
##########
@@ -0,0 +1,253 @@
Review Comment:
Although this PIP mainly focuses on client changes, how will operators and users
working with the Pulsar Admin API or tools (such as `pulsar-admin`) recognize that
a Topic's Schema is externally managed? For example,
what should the `pulsar-admin schemas get <topic-name>` command return for such
Topics? Will there be a new status or flag to indicate that the Schema is
externally managed? This may be beyond the direct scope of this PIP, but it is
worth raising and considering as part of the overall design impact.