gaoran10 commented on code in PR #24328:
URL: https://github.com/apache/pulsar/pull/24328#discussion_r2153562223


##########
pip/pip-420.md:
##########
@@ -0,0 +1,284 @@
+# PIP-420: Provide the ability for Pulsar clients to integrate with third-party schema registry services
+
+# Motivation
+
+Apache Pulsar currently provides a built-in schema management system tightly coupled with the broker.
+Pulsar clients interact with this system implicitly when creating producers and consumers.
+
+However, many organizations already run independent schema registry services (such as Confluent Schema Registry)
+and want to reuse their existing schema governance processes across multiple messaging systems, including Pulsar.
+
+By enabling Pulsar clients to integrate with third-party schema registry services:
+- Users can unify schema management across different platforms.
+- Pulsar brokers can be decoupled from schema storage and validation responsibilities.
+- Pulsar users can more easily integrate with ecosystems that rely on external schema registries.
+
+This flexibility is particularly valuable for enterprises with strict schema validation, versioning,
+and governance workflows already centralized in external registries.
+
+# Goals
+
+## In Scope
+
+- Provide the ability for Pulsar clients to leverage third-party schema registry services for schema operations.
+
+## Out of Scope
+
+- Providing built-in implementations for third-party schemas.
+- Migrating existing Pulsar-managed schemas to external schema registries.
+
+# High Level Design
+
+- Provide a mechanism to configure the Pulsar client to use either:
+  - The existing Pulsar schema registry (default)
+  - A third-party schema registry implementation
+
+# Detailed Design
+
+## Design & Implementation Details
+
+This PIP aims to enable the Pulsar client to directly integrate with external schema registry services for schema management.
+In this model, the external schema registry is fully responsible for schema storage, retrieval, and validation.
+The Pulsar broker will no longer manage schema data for topics using external schemas.
+
+### SchemaType: EXTERNAL
+
+Pulsar will introduce a new schema type: **SchemaType.EXTERNAL**.
+
+- All schemas that integrate with external schema registries must declare `SchemaType.EXTERNAL`.
+- When using the `EXTERNAL` schema type, the Pulsar client will provide an empty schema definition to the broker.
+- If the `SchemaInfoProvider` is external but the schema type is not `EXTERNAL`, an `ExternalSchemaException` is thrown.
+- The broker will only record the schema type for topics.
+- Compatibility restrictions:
+  - Introduce a new compatibility check on the broker side.
+  - The schema type `SchemaType.EXTERNAL` is not compatible with any other Pulsar schema type.
+  - This prevents accidental data corruption or schema conflicts between internal and external schema management systems.
+- The Pulsar geo-replicator needs to transfer the schema type `SchemaType.EXTERNAL` to the remote cluster.
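The client-side type rule and the broker-side "EXTERNAL is only compatible with EXTERNAL" restriction listed above could look roughly like the sketch below. It is an illustration under assumptions, not the PIP's implementation: `SchemaType` is a trimmed stand-in enum, `ExternalSchemaRules` and its method names are hypothetical, and `ExternalSchemaException` is defined locally using only the name mentioned in the text.

```java
// Trimmed stand-in for Pulsar's SchemaType; only a few values are listed here.
enum SchemaType { AVRO, JSON, PROTOBUF, BYTES, EXTERNAL }

// Local stand-in using the exception name from the PIP text; not an existing Pulsar class.
class ExternalSchemaException extends RuntimeException {
    ExternalSchemaException(String message) {
        super(message);
    }
}

// Hypothetical helper illustrating the two rules; names are assumptions.
final class ExternalSchemaRules {
    private ExternalSchemaRules() {
    }

    // Client side: an external SchemaInfoProvider requires SchemaType.EXTERNAL.
    static void validateClientSchema(boolean providerIsExternal, SchemaType type) {
        if (providerIsExternal && type != SchemaType.EXTERNAL) {
            throw new ExternalSchemaException(
                    "External SchemaInfoProvider requires SchemaType.EXTERNAL, got " + type);
        }
    }

    // Broker side: rejects any mix of EXTERNAL and non-EXTERNAL, in either direction.
    // Returns true when this check does not reject; pairs of two non-EXTERNAL types
    // are left to Pulsar's existing compatibility checks.
    static boolean passesExternalTypeCheck(SchemaType existing, SchemaType incoming) {
        boolean existingIsExternal = existing == SchemaType.EXTERNAL;
        boolean incomingIsExternal = incoming == SchemaType.EXTERNAL;
        return existingIsExternal == incomingIsExternal;
    }
}
```

Keeping the EXTERNAL check separate from the existing per-type checks means topics that never use external schemas see no behavior change.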
+
+This design isolates external schema management and protects existing topics using Pulsar’s native schema system.
+
+### Extensibility via Client Interfaces
+
+To integrate with external schema registries, users can:
+- Implement the `Schema` interface to define custom schema encoding and decoding logic.
+- Implement the `SchemaInfoProvider` interface to initialize "external" schemas.
+
+#### Key `Schema` Interface Methods:
+- `byte[] encode(T message)`
+  - Serializes the message using the external schema.
+  - Implementations should throw `SchemaSerializationException` if serialization fails.
+
+- `T decode(byte[] bytes)`
+  - Deserializes the message using the external schema.
+  - Users are responsible for handling exceptions thrown when retrieving the message value.
+
+- `close()` **(new addition)**
+  - Called when the producer or consumer is closed.
+  - Allows external schema implementations to release resources, such as schema registry connections or caches.
+
+#### Example Workflow:
+
+- During producer or consumer initialization:
+  The custom schema can register schemas or fetch schema metadata from the external registry.
+
+- During message send or receive:
+  The `encode` and `decode` methods handle schema-aware serialization and deserialization using the external schema registry.
+
+#### Schema ID & Schema Version
+
+Unlike Pulsar, which uses **schema version** to identify schemas, many external schema registry systems use **schema ID** as the primary schema identifier.
+
+When integrating with external schema registries:
+- The `schemaVersion` field in the Pulsar message metadata, which is used in several places, is **set to `-1` to flag that the message uses an external schema system**.
+- The Pulsar client **needs to set the schema version to -1 and store it in the message metadata**.
+- Instead, the external schema implementation can handle schema IDs internally.
+- The schema ID can be embedded directly into the message payload by the external schema’s `encode` and `decode` methods.
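As one possible way to embed the schema ID in the payload, the sketch below frames each message as a magic byte `0`, a 4-byte big-endian schema ID, and then the serialized body, which is the layout Confluent's serializers use. Other registries may use a different layout, and `SchemaIdFraming` is a hypothetical helper name; with this approach the message metadata would still carry the `-1` schema version described above, while the real identifier travels inside the payload.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper: [magic byte 0][4-byte big-endian schema ID][serialized body].
// This mirrors the Confluent wire format; it is one option, not something the PIP mandates.
final class SchemaIdFraming {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 1 + Integer.BYTES;

    private SchemaIdFraming() {
    }

    // Called from a custom encode(): prefix the serialized body with the registry's schema ID.
    static byte[] frame(int schemaId, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE + body.length); // big-endian by default
        buf.put(MAGIC_BYTE).putInt(schemaId).put(body);
        return buf.array();
    }

    // Called from a custom decode(): read the schema ID used to look up the writer schema.
    static int readSchemaId(byte[] framed) {
        if (framed.length < HEADER_SIZE || framed[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Payload is not framed with a schema ID");
        }
        return ByteBuffer.wrap(framed, 1, Integer.BYTES).getInt();
    }

    // Returns the serialized body that follows the header.
    static byte[] readBody(byte[] framed) {
        readSchemaId(framed); // validates the header before slicing
        return Arrays.copyOfRange(framed, HEADER_SIZE, framed.length);
    }
}
```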
+
+This approach allows external schema systems to fully control schema evolution and versioning without being constrained by Pulsar’s native schema versioning mechanism.
+This may impact components that rely on the schema version to deserialize messages, such as Pulsar Functions and Pulsar SQL.
+These components will need to be updated to support the new schema type and handle external schemas appropriately.
+
+#### Example usage
+
+```java
+public void workWithExternalSchemaRegistry() throws Exception {
+    // Connection settings for the external schema registry.
+    Map<String, String> srConfig = new HashMap<>();
+    srConfig.put("schema.registry.url", "http://localhost:8001");
+
+    // KafkaSchemaInfoProviderFactory and KafkaSchemas are example external-registry
+    // integrations; this PIP does not define them.
+    PulsarClient client = PulsarClient.builder()
+            .serviceUrl("pulsar://localhost:6650")
+            .schemaInfoProviderFactory(new KafkaSchemaInfoProviderFactory(srConfig))
+            .build();
+
+    String topic = "t1";
+    // The schema must declare SchemaType.EXTERNAL.
+    Schema<User> schema = KafkaSchemas.JSON(User.class);
+
+    Producer<User> producer = client.newProducer(schema)
+            .topic(topic)
+            .create();
+
+    Consumer<User> consumer = client.newConsumer(schema)
+            .topic(topic)
+            .subscriptionName("sub")
+            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+            .subscribe();
+
+    for (int i = 0; i < 10; i++) {
+        // encode() serializes the value through the external schema.
+        producer.send(new User("name-" + i, 10 + i));
+    }
+
+    for (int i = 0; i < 10; i++) {
+        Message<User> message = consumer.receive();
+        // getValue() invokes decode() on the external schema.
+        System.out.println(message.getValue());
+        consumer.acknowledge(message);
+    }
+
+    producer.close();
+    consumer.close();
+    client.close();
+}
+```
+
+## Public-facing Changes
+
+Introduce a new SchemaType, `EXTERNAL`, to represent schemas that work with an external schema registry.
+```java
+public enum SchemaType {
+
+    // ... existing schema types ...
+
+    /**
+     * External Schema Type.
+     * <p>
+     * This is used to indicate that the schema is managed externally, such as in a schema registry.
+     * External schema type is not compatible with any other schema type.
+     * </p>
+     */
+    EXTERNAL(-5)
+
+}
+```
+
+Add new methods to the `SchemaInfoProvider` interface.
+The `SchemaInfoProvider` provides the parameters needed to connect to the external schema registry service through the `getConfigs` method.
+If the schema info provider is external, the new-producer command and the consumer subscribe command will treat the schema as a bytes schema, and the broker will skip schema validation.
+```java
+public interface SchemaInfoProvider {
+
+    /**
+     * Returns the configs of the schema registry service, such as the URL and authentication params.
+     */
+    default Map<String, String> getConfigs() {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Returns whether the SchemaInfoProvider is external.
+     */
+    default boolean isExternal() {
+        return false;
+    }
+
+}
+```
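To show how the two new default methods might be used, the sketch below implements a provider that reports itself as external and exposes registry settings. The interface is a simplified stand-in containing only the two methods above (the real interface has more), and `ExternalRegistrySchemaInfoProvider` is a hypothetical class; it only exposes settings and does not contact a registry.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in with only the two default methods proposed above.
interface SchemaInfoProvider {
    default Map<String, String> getConfigs() {
        return Collections.emptyMap();
    }

    default boolean isExternal() {
        return false;
    }
}

// Hypothetical provider for one topic backed by an external registry.
final class ExternalRegistrySchemaInfoProvider implements SchemaInfoProvider {
    private final String topic;
    private final Map<String, String> configs;

    ExternalRegistrySchemaInfoProvider(String topic, Map<String, String> configs) {
        this.topic = topic;
        // Defensive read-only copy, so later changes by the caller are not visible here.
        this.configs = Collections.unmodifiableMap(new HashMap<>(configs));
    }

    String topic() {
        return topic;
    }

    @Override
    public Map<String, String> getConfigs() {
        return configs;
    }

    @Override
    public boolean isExternal() {
        return true; // marks the topic's schema as managed by the external registry
    }
}
```

Because both methods are `default`, existing `SchemaInfoProvider` implementations keep compiling and are treated as non-external.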
+
+Add a new interface, `SchemaInfoProviderFactory`, which is used to create `SchemaInfoProvider` instances; each topic has its own `SchemaInfoProvider`.
+```java
+public interface SchemaInfoProviderFactory {
+
+    SchemaInfoProvider of(String topic);
+
+}
+```
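One way to satisfy "each topic has its own `SchemaInfoProvider`" is a factory that creates a provider on first use per topic and reuses it afterwards. This is a sketch under assumptions: the interfaces are simplified stand-ins, `CachingSchemaInfoProviderFactory` is hypothetical, and caching per topic is a design choice the PIP does not require. The factory captures the registry settings once (these could include security settings, as described under Security Considerations) and hands them to every provider.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-ins for the proposed interfaces.
interface SchemaInfoProvider {
    default Map<String, String> getConfigs() {
        return Collections.emptyMap();
    }

    default boolean isExternal() {
        return false;
    }
}

interface SchemaInfoProviderFactory {
    SchemaInfoProvider of(String topic);
}

// Hypothetical factory: one external provider per topic, created lazily and cached.
final class CachingSchemaInfoProviderFactory implements SchemaInfoProviderFactory {
    private final Map<String, String> registryConfigs;
    private final ConcurrentHashMap<String, SchemaInfoProvider> providers = new ConcurrentHashMap<>();

    CachingSchemaInfoProviderFactory(Map<String, String> registryConfigs) {
        // Read-only copy shared by all providers created by this factory.
        this.registryConfigs = Collections.unmodifiableMap(new HashMap<>(registryConfigs));
    }

    @Override
    public SchemaInfoProvider of(String topic) {
        // computeIfAbsent is atomic, so concurrent callers get the same instance per topic.
        return providers.computeIfAbsent(topic, t -> new SchemaInfoProvider() {
            @Override
            public Map<String, String> getConfigs() {
                return registryConfigs;
            }

            @Override
            public boolean isExternal() {
                return true;
            }
        });
    }
}
```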
+
+The client builder supports setting the `SchemaInfoProviderFactory`.
+```java
+public interface ClientBuilder extends Serializable, Cloneable {
+
+    ClientBuilder schemaInfoProviderFactory(SchemaInfoProviderFactory schemaInfoProviderFactory);
+
+}
+```
+
+`ClientConfigurationData` carries the `SchemaInfoProviderFactory` from the builder to the client.
+```java
+public class ClientConfigurationData implements Serializable, Cloneable {
+
+    @JsonIgnore
+    private transient SchemaInfoProviderFactory schemaInfoProviderFactory;
+
+}
+```
+
+A custom schema can obtain the `SchemaInfoProvider` and retrieve the configs from it. The `Schema` interface also extends `AutoCloseable` so that implementations can release resources when closed.
+```java
+public interface Schema<T> extends Cloneable, AutoCloseable {
+
+    @Override
+    default void close() {
+        // no-op
+    }
+
+    /**
+     * Sets the schema info provider, so that the schema can retrieve the configs.
+     */
+    default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
+    }
+
+    /**
+     * Returns the schema info provider.
+     *
+     * @return a {@code SchemaInfoProvider} representing the schema info provider
+     */
+    default SchemaInfoProvider getSchemaInfoProvider() {
+        return null;
+    }
+
+}
+```
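The sketch below shows how a custom schema might combine the new hooks: it reads the registry URL from the provider in `setSchemaInfoProvider`, uses it during `encode`/`decode`, and releases its resources in `close()`. The interfaces are trimmed stand-ins (the real `Schema<T>` has many more methods), `ExternalStringSchema` is hypothetical, and the `clientOpen` flag stands in for a real registry client; UTF-8 string encoding replaces actual registry-backed serialization.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Trimmed stand-ins for the proposed interfaces.
interface SchemaInfoProvider {
    default Map<String, String> getConfigs() {
        return Collections.emptyMap();
    }

    default boolean isExternal() {
        return false;
    }
}

interface Schema<T> extends AutoCloseable {
    byte[] encode(T message);

    T decode(byte[] bytes);

    @Override
    default void close() {
        // no-op
    }

    default void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
    }

    default SchemaInfoProvider getSchemaInfoProvider() {
        return null;
    }
}

// Hypothetical external schema; the "client" is simulated by a boolean flag.
final class ExternalStringSchema implements Schema<String> {
    private SchemaInfoProvider provider;
    private String registryUrl;
    private boolean clientOpen;

    @Override
    public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
        this.provider = schemaInfoProvider;
        this.registryUrl = schemaInfoProvider.getConfigs().get("schema.registry.url");
        this.clientOpen = registryUrl != null; // stand-in for creating a registry client
    }

    @Override
    public SchemaInfoProvider getSchemaInfoProvider() {
        return provider;
    }

    @Override
    public byte[] encode(String message) {
        ensureOpen();
        return message.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String decode(byte[] bytes) {
        ensureOpen();
        return new String(bytes, StandardCharsets.UTF_8);
    }

    boolean isClientOpen() {
        return clientOpen;
    }

    @Override
    public void close() {
        clientOpen = false; // a real implementation would close connections and clear caches
    }

    private void ensureOpen() {
        if (!clientOpen) {
            throw new IllegalStateException("Schema registry client is not available");
        }
    }
}
```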
+
+# Pulsar Function
+
+To support third-party schema registry services in Pulsar Functions:
+- Support setting the `SchemaInfoProviderFactory` when initializing the Pulsar client
+- Support the schema registry authentication configurations when initializing the Pulsar client
+- Support the `SchemaType.EXTERNAL` schema type in Pulsar Function
+
+# Security Considerations
+
+Users can provide security configurations when creating the `SchemaInfoProviderFactory` instance,
+and the factory can pass them on to each `SchemaInfoProvider` instance it creates.
+
+# Pulsar Geo-Replication Impact

Review Comment:
   Hi @poorbarcode, could you help review this section?



##########
pip/pip-420.md:
##########

Review Comment:
   Hi @poorbarcode, could you help review this PIP at your convenience?



-- 
This is an automated message from the Apache Git Service.