This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 45d71f8fcdd [improve][client] PIP-420: Supports users implement
external schemas (#24488)
45d71f8fcdd is described below
commit 45d71f8fcdd58d8715ada7e418f3acd8b43c141a
Author: ran <[email protected]>
AuthorDate: Sun Aug 31 23:55:51 2025 +0800
[improve][client] PIP-420: Supports users implement external schemas
(#24488)
---
conf/broker.conf | 2 +-
conf/standalone.conf | 2 +-
pip/pip-420.md | 6 +-
.../apache/pulsar/broker/ServiceConfiguration.java | 3 +-
.../schema/ExternalSchemaCompatibilityCheck.java | 50 ++++
.../schema/validator/SchemaDataValidator.java | 3 +-
.../src/main/proto/SchemaRegistryFormat.proto | 1 +
.../ExternalSchemaCompatibilityCheckTest.java | 90 ++++++++
.../apache/pulsar/schema/ExternalSchemaTest.java | 255 +++++++++++++++++++++
.../pulsar/schema/MockExternalJsonSchema.java | 113 +++++++++
.../SchemaCompatibilityCheckTest.java | 92 ++++++++
.../org/apache/pulsar/client/api/EncodeData.java | 41 ++++
.../java/org/apache/pulsar/client/api/Message.java | 9 +
.../java/org/apache/pulsar/client/api/Schema.java | 17 ++
.../client/api/SchemaSerializationException.java | 16 ++
.../org/apache/pulsar/common/schema/KeyValue.java | 79 ++++++-
.../apache/pulsar/common/schema/SchemaType.java | 11 +
.../client/impl/BatchMessageContainerImpl.java | 4 +
.../apache/pulsar/client/impl/ConsumerImpl.java | 7 +
.../org/apache/pulsar/client/impl/MessageImpl.java | 39 ++++
.../apache/pulsar/client/impl/ProducerImpl.java | 15 +-
.../pulsar/client/impl/TopicMessageImpl.java | 5 +
.../client/impl/TypedMessageBuilderImpl.java | 25 +-
.../client/impl/schema/KeyValueSchemaImpl.java | 81 ++++++-
.../client/impl/schema/KeyValueSchemaTest.java | 2 +-
.../SupportVersioningKeyValueSchemaTest.java | 4 +-
.../apache/pulsar/common/schema/KeyValueTest.java | 29 +++
.../client/impl/schema/KeyValueSchemaInfo.java | 9 +-
.../apache/pulsar/common/protocol/Commands.java | 9 +
pulsar-common/src/main/proto/PulsarApi.proto | 2 +
.../org/apache/pulsar/io/http/HttpSinkTest.java | 5 +
31 files changed, 994 insertions(+), 32 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index f56077b5006..11270ae081e 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -711,7 +711,7 @@ zookeeperSessionExpiredPolicy=reconnect
systemTopicEnabled=true
# Deploy the schema compatibility checker for a specific schema type to enforce schema compatibility check
-schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck
+schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ExternalSchemaCompatibilityCheck
# The schema compatibility strategy is used for system topics.
# Available values: ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
diff --git a/conf/standalone.conf b/conf/standalone.conf
index c6be0376ed9..2b205a2b2f6 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -546,7 +546,7 @@ brokerClientTlsProtocols=
systemTopicEnabled=true
# Deploy the schema compatibility checker for a specific schema type to enforce schema compatibility check
-schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck
+schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ExternalSchemaCompatibilityCheck
# The schema compatibility strategy is used for system topics.
# Available values: ALWAYS_INCOMPATIBLE, ALWAYS_COMPATIBLE, BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE, FULL_TRANSITIVE
diff --git a/pip/pip-420.md b/pip/pip-420.md
index 2a033fa6f4e..1aa152ba8f1 100644
--- a/pip/pip-420.md
+++ b/pip/pip-420.md
@@ -173,7 +173,7 @@ Add a new field `schema_id` to the `MessageMetadata` to
store the schema ID for
```protobuf
// File `PulsarApi.proto`
message MessageMetadata {
- optional bytes schema_id = 31;
+ optional bytes schema_id = 32;
}
```
@@ -214,9 +214,9 @@ public interface Message<T> {
* PIP-420 provides a way to produce messages with external schema,
* and the schema ID will be set to the message metadata.
*
- * @return schema ID of the message if the message is produced with external schema.
+ * @return the schema ID if the message is produced with external schema and schema ID is set, otherwise empty.
*/
- byte[] getSchemaId();
+ Optional<byte[]> getSchemaId();
}
```
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index fd8dce934b9..279ed9ed73e 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3268,7 +3268,8 @@ public class ServiceConfiguration implements
PulsarConfiguration {
private Set<String> schemaRegistryCompatibilityCheckers = Sets.newHashSet(
"org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck",
"org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck",
-
"org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck"
+
"org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck",
+
"org.apache.pulsar.broker.service.schema.ExternalSchemaCompatibilityCheck"
);
@FieldContext(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ExternalSchemaCompatibilityCheck.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ExternalSchemaCompatibilityCheck.java
new file mode 100644
index 00000000000..b5b5b9dece3
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/ExternalSchemaCompatibilityCheck.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * {@link SchemaCompatibilityCheck} registered for {@link SchemaType#EXTERNAL} schemas.
+ *
+ * <p>The only rule enforced is that an {@code EXTERNAL} schema cannot be mixed with a schema of any
+ * other type, in either direction. The supplied {@link SchemaCompatibilityStrategy} is not
+ * consulted, and two {@code EXTERNAL} schemas are always accepted.
+ */
+public class ExternalSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
+
+    @Override
+    public SchemaType getSchemaType() {
+        return SchemaType.EXTERNAL;
+    }
+
+    /**
+     * Checks a single schema pair.
+     *
+     * @param from the schema being checked against
+     * @param to the schema being checked
+     * @param strategy ignored by this checker
+     * @throws IncompatibleSchemaException if one side is {@code EXTERNAL} and the other side has a
+     *         different type
+     */
+    @Override
+    public void checkCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy)
+            throws IncompatibleSchemaException {
+        SchemaType fromType = from.getType();
+        SchemaType toType = to.getType();
+        boolean externalInvolved = SchemaType.EXTERNAL.equals(fromType) || SchemaType.EXTERNAL.equals(toType);
+        if (!externalInvolved) {
+            // Neither side is EXTERNAL: nothing for this checker to enforce.
+            return;
+        }
+        if (!fromType.equals(toType)) {
+            throw new IncompatibleSchemaException(
+                    "External schema is not compatible with the other schema types.");
+        }
+    }
+
+    /**
+     * Applies the pairwise rule of
+     * {@link #checkCompatible(SchemaData, SchemaData, SchemaCompatibilityStrategy)} against every
+     * schema in {@code from}, failing on the first incompatible one.
+     */
+    @Override
+    public void checkCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy)
+            throws IncompatibleSchemaException {
+        for (SchemaData candidate : from) {
+            checkCompatible(candidate, to, strategy);
+        }
+    }
+
+}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
index 85b73f53f81..a26cc4434b2 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/validator/SchemaDataValidator.java
@@ -69,7 +69,8 @@ public interface SchemaDataValidator {
break;
case NONE:
case BYTES:
- // `NONE` and `BYTES` schema is not stored
+ case EXTERNAL:
+ // `NONE`, `BYTES` and `EXTERNAL` schema is not stored
break;
case AUTO:
case AUTO_CONSUME:
diff --git a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
index da024ddcad8..431deeaedf1 100644
--- a/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
+++ b/pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
@@ -45,6 +45,7 @@ message SchemaInfo {
LOCALTIME = 19;
LOCALDATETIME = 20;
PROTOBUFNATIVE = 21;
+ EXTERNAL = 22;
}
message KeyValuePair {
required string key = 1;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ExternalSchemaCompatibilityCheckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ExternalSchemaCompatibilityCheckTest.java
new file mode 100644
index 00000000000..9c0377d1bcb
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ExternalSchemaCompatibilityCheckTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import static org.testng.Assert.fail;
+import
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
+import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class ExternalSchemaCompatibilityCheckTest {
+
+    private final ExternalSchemaCompatibilityCheck compatibilityCheck = new ExternalSchemaCompatibilityCheck();
+
+    private final SchemaData externalSchemaData = newExternalSchemaData();
+
+    /** Builds an {@code EXTERNAL} schema payload with empty schema data. */
+    private static SchemaData newExternalSchemaData() {
+        return SchemaData.builder()
+                .type(SchemaType.EXTERNAL)
+                .data(new byte[0])
+                .build();
+    }
+
+    /** Supplies one {@link SchemaData} per non-external struct schema type. */
+    @DataProvider(name = "otherSchemasProvider")
+    public Object[] otherSchemasProvider() {
+        SchemaType[] otherTypes = {
+                SchemaType.JSON,
+                SchemaType.AVRO,
+                SchemaType.PROTOBUF,
+                SchemaType.PROTOBUF_NATIVE
+        };
+        Object[] schemas = new Object[otherTypes.length];
+        for (int idx = 0; idx < otherTypes.length; idx++) {
+            schemas[idx] = SchemaData.builder()
+                    .type(otherTypes[idx])
+                    .build();
+        }
+        return schemas;
+    }
+
+    @Test(dataProvider = "otherSchemasProvider")
+    public void testExternalSchemaCompatibilityCheck(SchemaData schemaData) {
+        // The rejection must hold in both directions: other -> EXTERNAL and EXTERNAL -> other.
+        expectIncompatible(schemaData, externalSchemaData);
+        expectIncompatible(externalSchemaData, schemaData);
+    }
+
+    @Test
+    public void testExternalSchemaData() {
+        SchemaData anotherExternalSchema = newExternalSchemaData();
+        try {
+            compatibilityCheck.checkCompatible(
+                    anotherExternalSchema, externalSchemaData, SchemaCompatibilityStrategy.FULL);
+        } catch (IncompatibleSchemaException unexpected) {
+            fail("Did not expect IncompatibleSchemaException to be thrown");
+        }
+    }
+
+    /** Fails the test unless checking {@code from} against {@code to} is rejected. */
+    private void expectIncompatible(SchemaData from, SchemaData to) {
+        try {
+            compatibilityCheck.checkCompatible(from, to, SchemaCompatibilityStrategy.FULL);
+        } catch (IncompatibleSchemaException expected) {
+            // Expected exception, as external schema is not compatible with the other schemas
+            return;
+        }
+        fail("Expected IncompatibleSchemaException not thrown");
+    }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/ExternalSchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/ExternalSchemaTest.java
new file mode 100644
index 00000000000..652f9add920
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/ExternalSchemaTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.schema;
+
+import static
org.apache.pulsar.schema.MockExternalJsonSchema.MOCK_KEY_SCHEMA_DATA;
+import static org.apache.pulsar.schema.MockExternalJsonSchema.MOCK_SCHEMA_DATA;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.fail;
+import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end tests for external schemas (PIP-420). {@link MockExternalJsonSchema} is used as the
+ * external schema, both on its own and as the key and/or value schema of a KeyValue schema, with
+ * SEPARATED and INLINE key/value encodings.
+ */
+@Test(groups = "schema")
+public class ExternalSchemaTest extends MockedPulsarServiceBaseTest {
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        isTcpLookup = true;
+        super.internalSetup();
+
+        admin.clusters().createCluster("test",
+                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant("public",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("public/default");
+        admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test"));
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 1000 * 5)
+    public void testMockExternalSchema() throws Exception {
+        final String topic = "external-schema-topic";
+        MockExternalJsonSchema<Schemas.PersonFour> externalJsonSchema =
+                new MockExternalJsonSchema<>(Schemas.PersonFour.class);
+        @Cleanup
+        Consumer<Schemas.PersonFour> consumer = pulsarClient.newConsumer(externalJsonSchema)
+                .topic(topic)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("test")
+                .subscribe();
+
+        @Cleanup
+        Producer<Schemas.PersonFour> producer = pulsarClient.newProducer(externalJsonSchema)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        int messageCount = 10;
+        for (int i = 0; i < messageCount; i++) {
+            producer.newMessage().value(newPerson(i)).send();
+        }
+
+        for (int i = 0; i < messageCount; i++) {
+            Message<Schemas.PersonFour> message = consumer.receive();
+            // Assert non-null before any dereference so the check is meaningful.
+            Assert.assertNotNull(message);
+            assertTrue(message.getSchemaId().isPresent());
+            assertEquals(message.getSchemaId().get(), MockExternalJsonSchema.MOCK_SCHEMA_ID);
+            assertEquals(message.getData(), MOCK_SCHEMA_DATA);
+            // MockExternalJsonSchema.decode() always returns null.
+            assertNull(message.getValue());
+        }
+    }
+
+    @Test(timeOut = 1000 * 5)
+    public void testConflictKvSchema() throws Exception {
+        var externalJsonSchema = new MockExternalJsonSchema<>(Schemas.PersonFour.class);
+        try {
+            Schema.KeyValue(externalJsonSchema, Schema.JSON(Schemas.PersonFour.class), KeyValueEncodingType.SEPARATED);
+            fail("should fail");
+        } catch (IllegalArgumentException e) {
+            assertTrue(e.getMessage().contains("External schema cannot be used with other Pulsar struct schema types"));
+        }
+
+        try {
+            Schema.KeyValue(Schema.JSON(Schemas.PersonFour.class), externalJsonSchema);
+            fail("should fail");
+        } catch (IllegalArgumentException e) {
+            assertTrue(e.getMessage().contains("External schema cannot be used with other Pulsar struct schema types"));
+        }
+
+        // Mixing a struct schema with a primitive schema remains allowed.
+        Schema.KeyValue(Schema.JSON(Schemas.PersonFour.class), Schema.STRING);
+        Schema.KeyValue(Schema.STRING, Schema.JSON(Schemas.PersonFour.class));
+    }
+
+    @DataProvider(name = "provideKeyValueEncodingType")
+    public Object[][] provideKeyValueEncodingType() {
+        return new Object[][]{
+                {KeyValueEncodingType.SEPARATED},
+                {KeyValueEncodingType.INLINE}
+        };
+    }
+
+    @Test(dataProvider = "provideKeyValueEncodingType", timeOut = 1000 * 5)
+    public void testExternalKeyValueSchema(KeyValueEncodingType encodingType) throws Exception {
+        var keySchema = new MockExternalJsonSchema<>(Schemas.PersonFour.class, true);
+        var valueSchema = new MockExternalJsonSchema<>(Schemas.PersonFour.class);
+        var keyValueSchema = Schema.KeyValue(keySchema, valueSchema, encodingType);
+
+        final String topic = "testExternalKeyValueSchema-" + encodingType.name();
+        @Cleanup
+        Consumer<KeyValue<Schemas.PersonFour, Schemas.PersonFour>> consumer = pulsarClient.newConsumer(keyValueSchema)
+                .topic(topic)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("test")
+                .subscribe();
+
+        @Cleanup
+        Producer<KeyValue<Schemas.PersonFour, Schemas.PersonFour>> producer = pulsarClient.newProducer(keyValueSchema)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        int messageCount = 10;
+        for (int i = 0; i < messageCount; i++) {
+            var person = newPerson(i);
+            producer.newMessage().value(new KeyValue<>(person, person)).send();
+        }
+
+        for (int i = 0; i < messageCount; i++) {
+            var message = consumer.receive();
+            Assert.assertNotNull(message);
+            assertTrue(message.getSchemaId().isPresent());
+            assertEquals(message.getSchemaId().get(), KeyValue.generateKVSchemaId(
+                    MockExternalJsonSchema.MOCK_KEY_SCHEMA_ID, MockExternalJsonSchema.MOCK_SCHEMA_ID));
+
+            if (KeyValueEncodingType.INLINE.equals(encodingType)) {
+                assertInlineKeyValue(message.getData(), MOCK_KEY_SCHEMA_DATA, MOCK_SCHEMA_DATA);
+            } else {
+                assertEquals(message.getData(), MOCK_SCHEMA_DATA);
+                assertEquals(message.getKeyBytes(), MOCK_KEY_SCHEMA_DATA);
+            }
+
+            // Both sides are decoded by MockExternalJsonSchema, which always returns null.
+            assertNotNull(message.getValue());
+            assertNull(message.getValue().getKey());
+            assertNull(message.getValue().getValue());
+        }
+    }
+
+    @Test(dataProvider = "provideKeyValueEncodingType", timeOut = 1000 * 5)
+    public void testExternalSchemaWithPrimitiveSchema(KeyValueEncodingType encodingType) throws Exception {
+        var keySchema = Schema.STRING;
+        var valueSchema = new MockExternalJsonSchema<>(Schemas.PersonFour.class);
+        var keyValueSchema = Schema.KeyValue(keySchema, valueSchema, encodingType);
+
+        final String topic = "testExternalSchemaWithPrimitiveSchema-" + encodingType.name();
+        @Cleanup
+        Consumer<KeyValue<String, Schemas.PersonFour>> consumer = pulsarClient.newConsumer(keyValueSchema)
+                .topic(topic)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName("test")
+                .subscribe();
+
+        @Cleanup
+        Producer<KeyValue<String, Schemas.PersonFour>> producer = pulsarClient.newProducer(keyValueSchema)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        int messageCount = 10;
+        for (int i = 0; i < messageCount; i++) {
+            producer.newMessage().value(new KeyValue<>("index-" + i, newPerson(i))).send();
+        }
+
+        for (int i = 0; i < messageCount; i++) {
+            var message = consumer.receive();
+            Assert.assertNotNull(message);
+            assertTrue(message.getSchemaId().isPresent());
+            // The primitive key schema contributes an empty schema ID.
+            assertEquals(message.getSchemaId().get(), KeyValue.generateKVSchemaId(
+                    new byte[0], MockExternalJsonSchema.MOCK_SCHEMA_ID));
+            // Explicit charset: the default platform charset is not guaranteed to be UTF-8.
+            var keyBytes = ("index-" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8);
+
+            if (KeyValueEncodingType.INLINE.equals(encodingType)) {
+                assertInlineKeyValue(message.getData(), keyBytes, MOCK_SCHEMA_DATA);
+            } else {
+                assertEquals(message.getData(), MOCK_SCHEMA_DATA);
+                assertEquals(message.getKeyBytes(), keyBytes);
+            }
+
+            assertNotNull(message.getValue());
+            assertEquals(message.getValue().getKey(), "index-" + i);
+            assertNull(message.getValue().getValue());
+        }
+    }
+
+    /** Creates the test payload used by every producer in this class. */
+    private static Schemas.PersonFour newPerson(int id) {
+        Schemas.PersonFour person = new Schemas.PersonFour();
+        person.setId(id);
+        person.setName("user-" + id);
+        person.setAge(18);
+        return person;
+    }
+
+    /**
+     * Asserts that an INLINE-encoded key/value payload is laid out as
+     * {@code [int keyLength][key][int valueLength][value]} with no trailing bytes.
+     */
+    private static void assertInlineKeyValue(byte[] payload, byte[] expectedKey, byte[] expectedValue) {
+        ByteBuf buf = Unpooled.wrappedBuffer(payload);
+        assertLengthPrefixed(buf, expectedKey);
+        assertLengthPrefixed(buf, expectedValue);
+        assertEquals(buf.readableBytes(), 0);
+    }
+
+    /** Reads an int length prefix followed by that many bytes and compares them to {@code expected}. */
+    private static void assertLengthPrefixed(ByteBuf buf, byte[] expected) {
+        assertEquals(buf.readInt(), expected.length);
+        byte[] actual = new byte[expected.length];
+        buf.readBytes(actual);
+        assertEquals(actual, expected);
+    }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/MockExternalJsonSchema.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/MockExternalJsonSchema.java
new file mode 100644
index 00000000000..69c74d27ced
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/MockExternalJsonSchema.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.schema;
+
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
+import org.apache.pulsar.client.api.EncodeData;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * Test-only {@link Schema} of type {@link SchemaType#EXTERNAL} that stands in for a schema backed by
+ * an external registry.
+ *
+ * <p>Encoding ignores the message and returns fixed payload bytes and a fixed schema ID, with
+ * distinct constants for key and value schemas. Decoding verifies that exactly those bytes and that
+ * ID were received and then returns {@code null}. Tests can therefore assert on the raw data and
+ * schema ID carried through Pulsar without a real registry.
+ *
+ * @param <T> the message type; the class token is only kept so {@link #clone()} can copy it
+ */
+public class MockExternalJsonSchema<T> implements Schema<T> {
+
+    public static final byte[] MOCK_SCHEMA_DATA = new byte[] {1, 2, 3, 4, 5};
+    public static final byte[] MOCK_KEY_SCHEMA_DATA = new byte[] {0, 4, 3, 4, 8};
+    public static final byte[] MOCK_SCHEMA_ID = new byte[] {2, 3, 7, 8};
+    public static final byte[] MOCK_KEY_SCHEMA_ID = new byte[] {5, 6, 7, 8, 0, 0, 2};
+
+    private final Class<T> clazz;
+    // When true, the key-side constants are used instead of the value-side ones.
+    private final boolean isKey;
+    // Written by closeAsync() and read by tests via isClosed(). Nothing visible here guarantees both
+    // run on the same thread, so keep the flag volatile for visibility.
+    @Getter
+    private volatile boolean isClosed;
+
+    /**
+     * @param clazz the message class
+     * @param isKey whether this instance acts as the key schema of a KeyValue schema
+     */
+    public MockExternalJsonSchema(Class<T> clazz, boolean isKey) {
+        this.clazz = clazz;
+        this.isKey = isKey;
+    }
+
+    /** Creates a value-side mock schema. */
+    public MockExternalJsonSchema(Class<T> clazz) {
+        this(clazz, false);
+    }
+
+    /** Payload bytes this instance produces and expects. */
+    private byte[] mockData() {
+        return isKey ? MOCK_KEY_SCHEMA_DATA : MOCK_SCHEMA_DATA;
+    }
+
+    /** Schema ID this instance produces and expects. */
+    private byte[] mockSchemaId() {
+        return isKey ? MOCK_KEY_SCHEMA_ID : MOCK_SCHEMA_ID;
+    }
+
+    @Override
+    public EncodeData encode(String topic, T message) {
+        // the external schema should register schema when encoding the message, this is just a mock implementation
+        return new EncodeData(mockData(), mockSchemaId());
+    }
+
+    @Override
+    public T decode(String topic, ByteBuffer data, byte[] schemaId) {
+        return decode(topic, ByteBufUtil.getBytes(Unpooled.wrappedBuffer(data)), schemaId);
+    }
+
+    /**
+     * Validates the received bytes and schema ID against the mock constants.
+     *
+     * @return always {@code null}
+     * @throws IllegalStateException if the schema ID or the payload differs from the mock constants
+     */
+    @Override
+    public T decode(String topic, byte[] data, byte[] schemaId) {
+        if (!Arrays.equals(schemaId, mockSchemaId())) {
+            throw new IllegalStateException("Unexpected schema id");
+        }
+        if (!Arrays.equals(data, mockData())) {
+            throw new IllegalStateException("Unexpected schema data");
+        }
+        // the external schema should retrieve the schema and decoding the payload, this is just a mock implementation
+        return null;
+    }
+
+    @Override
+    public byte[] encode(T message) {
+        // the external schema doesn't support this method
+        throw new UnsupportedOperationException("Not supported");
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SchemaInfoImpl.builder()
+                .name("")
+                .type(SchemaType.EXTERNAL)
+                .schema(new byte[0])
+                .build();
+    }
+
+    @Override
+    public boolean supportSchemaVersioning() {
+        return true;
+    }
+
+    /** Records that the schema was closed; tests observe this through {@link #isClosed()}. */
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        this.isClosed = true;
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public Schema<T> clone() {
+        return new MockExternalJsonSchema<>(clazz, isKey);
+    }
+
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index b18e2c179b3..b59ae89c0bf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -22,10 +22,13 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -37,6 +40,8 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
@@ -50,6 +55,7 @@ import
org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.schema.MockExternalJsonSchema;
import org.apache.pulsar.schema.Schemas;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -630,6 +636,92 @@ public class SchemaCompatibilityCheckTest extends
MockedPulsarServiceBaseTest {
}
}
+
+    /**
+     * Verifies that a topic cannot switch between an EXTERNAL schema and a JSON schema in either
+     * direction, for both producers and consumers. Also verifies that closing a producer closes the
+     * external schema it was created with.
+     */
+    @Test
+    public void testExternalSchemaTypeCompatibility() throws Exception {
+        String namespace = "test-namespace-" + randomName(16);
+        admin.namespaces().createNamespace(
+                PUBLIC_TENANT + "/" + namespace,
+                Sets.newHashSet(CLUSTER_NAME)
+        );
+
+        NamespaceName namespaceName = NamespaceName.get(PUBLIC_TENANT, namespace);
+        admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(),
+                SchemaCompatibilityStrategy.FULL);
+
+        final String topic = "persistent://" + PUBLIC_TENANT + "/" + namespace
+                + "/testExternalSchemaTypeCompatibility";
+
+        MockExternalJsonSchema<Schemas.PersonThree> externalJsonSchema =
+                new MockExternalJsonSchema<>(Schemas.PersonThree.class);
+
+        // NOTE(review): this map is never passed to the client or the schema; confirm whether it can be
+        // dropped together with the HashMap/Map imports added for it.
+        Map<String, String> schemaConfigs = new HashMap<>();
+        schemaConfigs.put("schema.registry.url", "http://localhost:8080");
+
+        // This test builds its own client, so close it when done rather than letting it outlive the test.
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(lookupUrl.toString())
+                .build()) {
+            // Existing topic schema is JSON, new schema can't be EXTERNAL
+            Producer<Schemas.PersonThree> jsonProducer = client
+                    .newProducer(Schema.JSON(Schemas.PersonThree.class))
+                    .topic(topic)
+                    .create();
+            jsonProducer.close();
+
+            try (Producer<Schemas.PersonThree> ignored = client
+                    .newProducer(externalJsonSchema)
+                    .topic(topic)
+                    .create()) {
+                fail("Should not be able to create producer with incompatible schema.");
+            } catch (Exception e) {
+                assertIncompatibleSchemaError(e,
+                        "Incompatible schema: exists schema type JSON, new schema type EXTERNAL");
+            }
+            try (Consumer<Schemas.PersonThree> ignored = client
+                    .newConsumer(externalJsonSchema)
+                    .topic(topic)
+                    .subscriptionName("sub")
+                    .subscribe()) {
+                fail("Should not be able to create consumer with incompatible schema.");
+            } catch (Exception e) {
+                assertIncompatibleSchemaError(e,
+                        "Incompatible schema: exists schema type JSON, new schema type EXTERNAL");
+            }
+            admin.topics().delete(topic);
+
+            // Existing topic schema is EXTERNAL, new schema can't be JSON
+            Producer<Schemas.PersonThree> externalProducer = client
+                    .newProducer(externalJsonSchema)
+                    .topic(topic)
+                    .create();
+            assertFalse(externalJsonSchema.isClosed());
+            externalProducer.close();
+            assertTrue(externalJsonSchema.isClosed());
+
+            try (Producer<Schemas.PersonThree> ignored = client
+                    .newProducer(Schema.JSON(Schemas.PersonThree.class))
+                    .topic(topic)
+                    .create()) {
+                fail("Should not be able to create producer with incompatible schema.");
+            } catch (Exception e) {
+                assertIncompatibleSchemaError(e,
+                        "Incompatible schema: exists schema type EXTERNAL, new schema type JSON");
+            }
+            try (Consumer<Schemas.PersonThree> ignored = client
+                    .newConsumer(Schema.JSON(Schemas.PersonThree.class))
+                    .topic(topic)
+                    .subscriptionName("sub")
+                    .subscribe()) {
+                fail("Should not be able to create consumer with incompatible schema.");
+            } catch (Exception e) {
+                assertIncompatibleSchemaError(e,
+                        "Incompatible schema: exists schema type EXTERNAL, new schema type JSON");
+            }
+            admin.topics().delete(topic);
+        }
+    }
+
+    /**
+     * Asserts that {@code e} is a {@link PulsarClientException.IncompatibleSchemaException} whose
+     * message contains {@code expectedMessage}, reporting the actual exception on failure.
+     */
+    private static void assertIncompatibleSchemaError(Exception e, String expectedMessage) {
+        assertTrue(e instanceof PulsarClientException.IncompatibleSchemaException,
+                "Unexpected exception type: " + e);
+        assertTrue(e.getMessage().contains(expectedMessage),
+                "Unexpected exception message: " + e.getMessage());
+    }
+
public static String randomName(int numChars) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < numChars; i++) {
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EncodeData.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EncodeData.java
new file mode 100644
index 00000000000..92c35fe7b04
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EncodeData.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.Arrays;
+
+/**
+ * Data to be encoded by an external schema.
+ *
+ * <p>Because the components are arrays, {@link #equals(Object)}, {@link #hashCode()} and
+ * {@link #toString()} are overridden to work on the array contents instead of the array
+ * references that a record would otherwise use.
+ *
+ * @param data the encoded message payload
+ * @param schemaId the schema id returned by the schema registry; may be null if not applicable
+ */
+public record EncodeData(byte[] data, byte[] schemaId) {
+
+    /**
+     * Create an {@code EncodeData} that carries no schema id.
+     *
+     * @param data the encoded message payload
+     */
+    public EncodeData(byte[] data) {
+        this(data, null);
+    }
+
+    /**
+     * Whether this instance carries a usable schema id.
+     *
+     * @return true if {@link #schemaId()} is non-null and non-empty
+     */
+    public boolean hasSchemaId() {
+        return isValidSchemaId(schemaId);
+    }
+
+    /**
+     * Check whether a schema id is usable.
+     *
+     * @param schemaId the schema id to check, may be null
+     * @return true if {@code schemaId} is non-null and non-empty
+     */
+    public static boolean isValidSchemaId(byte[] schemaId) {
+        return schemaId != null && schemaId.length > 0;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof EncodeData other)) {
+            return false;
+        }
+        return Arrays.equals(data, other.data) && Arrays.equals(schemaId, other.schemaId);
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * Arrays.hashCode(data) + Arrays.hashCode(schemaId);
+    }
+
+    @Override
+    public String toString() {
+        // Only the payload length is printed; payloads can be large.
+        return "EncodeData[data=" + (data == null ? "null" : data.length + " bytes")
+                + ", schemaId=" + Arrays.toString(schemaId) + "]";
+    }
+
+}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
index 3be6a06153b..d76618b1e1d 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
@@ -208,6 +208,15 @@ public interface Message<T> {
*/
byte[] getSchemaVersion();
+    /**
+     * Get schema ID of the message.
+     * PIP-420 provides a way to produce messages with external schema,
+     * and the schema ID will be set to the message metadata.
+     *
+     * <p>The default implementation returns {@link Optional#empty()}, so existing implementations
+     * of this public interface keep compiling; implementations backed by message metadata override it.
+     *
+     * @return the schema ID if the message is produced with external schema and schema ID is set,
+     *         otherwise empty.
+     */
+    default Optional<byte[]> getSchemaId() {
+        return Optional.empty();
+    }
+
/**
* Get the schema associated to the message.
* Please note that this schema is usually equal to the Schema you passed
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index ef1e0cc1fea..e4ed2a1303d 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -28,6 +28,7 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
@@ -74,6 +75,10 @@ public interface Schema<T> extends Cloneable {
*/
byte[] encode(T message);
+ /**
+ * Encode an object for the given topic, optionally together with a schema id.
+ *
+ * <p>The default implementation ignores {@code topic}, delegates to {@link #encode(Object)} and
+ * returns an {@link EncodeData} without a schema id. Some callers pass a {@code null} topic.
+ *
+ * @param topic the topic the message is encoded for, may be null
+ * @param message the message object to encode
+ * @return the encoded bytes and, if this schema provides one, the schema id
+ */
+ default EncodeData encode(String topic, T message) {
+ return new EncodeData(encode(message));
+ }
+
/**
* Returns whether this schema supports versioning.
*
@@ -134,6 +139,14 @@ public interface Schema<T> extends Cloneable {
return decode(getBytes(data));
}
+ /**
+ * Decode a {@link ByteBuffer} into an object, given the topic and the schema id from the message metadata.
+ *
+ * <p>The default implementation ignores {@code topic} and delegates to {@link #decode(ByteBuffer, byte[])},
+ * passing {@code schemaId} in the schema-version position.
+ * NOTE(review): schema ids and schema versions are different concepts; external schemas are expected
+ * to override this method.
+ *
+ * @param topic the topic the message was read from, may be null
+ * @param data the encoded payload
+ * @param schemaId the schema id carried by the message
+ * @return the decoded object
+ */
+ default T decode(String topic, ByteBuffer data, byte[] schemaId) {
+ return decode(data, schemaId);
+ }
+
+ /**
+ * Decode a byte array into an object, given the topic and the schema id from the message metadata.
+ *
+ * <p>The default implementation ignores {@code topic} and delegates to {@link #decode(byte[], byte[])},
+ * passing {@code schemaId} in the schema-version position.
+ *
+ * @param topic the topic the message was read from, may be null
+ * @param data the encoded payload
+ * @param schemaId the schema id carried by the message
+ * @return the decoded object
+ */
+ default T decode(String topic, byte[] data, byte[] schemaId) {
+ return decode(data, schemaId);
+ }
+
/**
* Decode a ByteBuffer into an object using a given version. <br/>
*
@@ -184,6 +197,10 @@ public interface Schema<T> extends Cloneable {
*/
Schema<T> clone();
+ /**
+ * Asynchronously release any resources held by this schema.
+ *
+ * <p>Producers and consumers call this when they are closed. The default implementation
+ * has nothing to release and returns an already completed future.
+ *
+ * @return a future that completes once the schema has been closed
+ */
+ default CompletableFuture<Void> closeAsync() {
+ return CompletableFuture.completedFuture(null);
+ }
+
/**
* Schema that doesn't perform any encoding on the message payloads.
Accepts a byte array and it passes it through.
*/
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
index 0c9b1cd21dd..dfb79866212 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
@@ -50,4 +50,20 @@ public class SchemaSerializationException extends
RuntimeException {
public SchemaSerializationException(Throwable cause) {
super(cause);
}
+
+ /**
+ * Constructs a {@code SchemaSerializationException} with the specified detail message and cause.
+ *
+ * @param message
+ * The detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method)
+ * @param cause
+ * The cause (which is saved for later retrieval by the
+ * {@link #getCause()} method). (A null value is permitted,
+ * and indicates that the cause is nonexistent or unknown.)
+ */
+ public SchemaSerializationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
index 489efc5825a..98725410af9 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
@@ -18,8 +18,10 @@
*/
package org.apache.pulsar.common.schema;
+import static org.apache.pulsar.client.api.EncodeData.isValidSchemaId;
import java.nio.ByteBuffer;
import java.util.Objects;
+import org.apache.pulsar.client.api.EncodeData;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
@@ -100,26 +102,79 @@ public class KeyValue<K, V> {
*/
public static <K, V> byte[] encode(K key, Schema<K> keyWriter,
V value, Schema<V> valueWriter) {
- byte [] keyBytes;
+ // Legacy entry point: no topic is available here, and any combined schema id is dropped.
+ return encode(null, key, keyWriter, value, valueWriter).data();
+ }
+
+ /**
+ * Encode a key and a value into a single INLINE key/value payload for the given topic.
+ *
+ * <p>The payload layout is {@code [key length][key bytes][value length][value bytes]}, where a
+ * {@code null} key or value is written with length -1 and no bytes. Schema ids returned by the
+ * key and value schemas are combined with {@link #generateKVSchemaId(byte[], byte[])}.
+ *
+ * @param topic the topic the data is encoded for, may be null
+ * @param key the key object, may be null
+ * @param keyWriter the schema used to encode the key
+ * @param value the value object, may be null
+ * @param valueWriter the schema used to encode the value
+ * @return the encoded payload and the combined schema id (null when neither part has an id)
+ */
+ public static <K, V> EncodeData encode(String topic, K key, Schema<K>
keyWriter,
+ V value, Schema<V> valueWriter) {
+ EncodeData keyEncodeData;
if (key == null) {
- keyBytes = new byte[0];
+ keyEncodeData = new EncodeData(new byte[0]);
} else {
- keyBytes = keyWriter.encode(key);
+ keyEncodeData = keyWriter.encode(topic, key);
}
- byte [] valueBytes;
+ EncodeData valueEncodeData;
if (value == null) {
- valueBytes = new byte[0];
+ valueEncodeData = new EncodeData(new byte[0]);
} else {
- valueBytes = valueWriter.encode(value);
+ valueEncodeData = valueWriter.encode(topic, value);
}
- ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 +
valueBytes.length);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(
+ 4 + keyEncodeData.data().length + 4 +
valueEncodeData.data().length);
byteBuffer
- .putInt(key == null ? -1 : keyBytes.length)
- .put(keyBytes)
- .putInt(value == null ? -1 : valueBytes.length)
- .put(valueBytes);
- return byteBuffer.array();
+ // -1 marks a null key/value so it can be told apart from an empty one.
+ .putInt(key == null ? -1 : keyEncodeData.data().length)
+ .put(keyEncodeData.data())
+ .putInt(value == null ? -1 : valueEncodeData.data().length)
+ .put(valueEncodeData.data());
+ return new EncodeData(byteBuffer.array(),
+ generateKVSchemaId(keyEncodeData.schemaId(),
valueEncodeData.schemaId()));
+ }
+
+    /**
+     * Combine a key schema id and a value schema id into one schema id for a key/value message.
+     *
+     * <p>Layout, with each length written as a 4-byte int by {@link ByteBuffer#putInt(int)}:
+     * <pre>
+     * [key id length][key id bytes][value id length][value id bytes]
+     * </pre>
+     * A {@code null} key or value id is written with length 0, so the result is
+     * {@code 4 + keyId.length + 4 + valueId.length} bytes long.
+     *
+     * @param keySchemaId the schema id of the key schema, may be null
+     * @param valueSchemaId the schema id of the value schema, may be null
+     * @return the combined schema id, or {@code null} when neither id is valid (null or empty)
+     */
+    public static byte[] generateKVSchemaId(byte[] keySchemaId, byte[] valueSchemaId) {
+        boolean hasKeyId = isValidSchemaId(keySchemaId);
+        boolean hasValueId = isValidSchemaId(valueSchemaId);
+        if (!(hasKeyId || hasValueId)) {
+            return null;
+        }
+        byte[] keyPart = keySchemaId != null ? keySchemaId : new byte[0];
+        byte[] valuePart = valueSchemaId != null ? valueSchemaId : new byte[0];
+        return ByteBuffer.allocate(Integer.BYTES + keyPart.length + Integer.BYTES + valuePart.length)
+                .putInt(keyPart.length)
+                .put(keyPart)
+                .putInt(valuePart.length)
+                .put(valuePart)
+                .array();
+    }
+
+    /**
+     * Split a combined key/value schema id, as produced by {@link #generateKVSchemaId(byte[], byte[])},
+     * back into its key and value parts.
+     *
+     * <p>A part whose stored length is zero or negative is returned as an empty array.
+     *
+     * @param schemaId the combined schema id; it comes from message metadata written by a producer
+     * @return a pair of (key schema id, value schema id), never null
+     * @throws IllegalArgumentException if {@code schemaId} is truncated or a length exceeds the data
+     */
+    public static KeyValue<byte[], byte[]> getSchemaId(byte[] schemaId) {
+        ByteBuffer buffer = ByteBuffer.wrap(schemaId);
+        byte[] keySchemaId = readSchemaIdPart(buffer, "key");
+        byte[] valueSchemaId = readSchemaIdPart(buffer, "value");
+        return new KeyValue<>(keySchemaId, valueSchemaId);
+    }
+
+    /**
+     * Read one length-prefixed schema id part from {@code buffer}, validating the length first
+     * so malformed metadata fails with a descriptive error instead of a bare buffer underflow.
+     */
+    private static byte[] readSchemaIdPart(ByteBuffer buffer, String partName) {
+        if (buffer.remaining() < Integer.BYTES) {
+            throw new IllegalArgumentException("Invalid key/value schema id: missing length of the "
+                    + partName + " schema id");
+        }
+        int length = buffer.getInt();
+        if (length <= 0) {
+            return new byte[0];
+        }
+        if (length > buffer.remaining()) {
+            throw new IllegalArgumentException("Invalid key/value schema id: " + partName
+                    + " schema id length " + length + " exceeds the remaining " + buffer.remaining() + " bytes");
+        }
+        byte[] idBytes = new byte[length];
+        buffer.get(idBytes);
+        return idBytes;
}
/**
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index ac87485abad..af674a85aa5 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -144,6 +144,16 @@ public enum SchemaType {
*/
PROTOBUF_NATIVE(20),
+ /**
+ * External Schema Type.
+ * <p>
+ * This is used to indicate that the schema is managed externally, such as
in a schema registry.
+ *
+ * External schema type is not compatible with any other schema type.
+ * </p>
+ */
+ EXTERNAL(21),
+
//
// Schemas that don't have schema info. the value should be negative.
//
@@ -202,6 +212,7 @@ public enum SchemaType {
case 18: return LOCAL_TIME;
case 19: return LOCAL_DATE_TIME;
case 20: return PROTOBUF_NATIVE;
+ case 21: return EXTERNAL;
case -1: return BYTES;
case -2: return AUTO;
case -3: return AUTO_CONSUME;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 489a3752332..57946bd7a69 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -366,6 +366,10 @@ class BatchMessageContainerImpl extends
AbstractBatchMessageContainer {
if (numMessagesInBatch == 0) {
return true;
}
+ if (messageMetadata.hasSchemaId() && msg.getSchemaId().isPresent()) {
+ return Arrays.equals(msg.getSchemaId().get(),
messageMetadata.getSchemaId())
+ && Arrays.equals(msg.getSchemaVersion(),
messageMetadata.getSchemaVersion());
+ }
if (!messageMetadata.hasSchemaVersion()) {
return msg.getSchemaVersion() == null;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e44664491d0..6d5305136b1 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1170,6 +1170,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
}));
}
+ if (schema != null) {
+ closeFutures.add(schema.closeAsync().whenComplete((ignore, ex) -> {
+ if (ex != null) {
+ log.warn("Exception ignored in closing schema of
consumer", ex);
+ }
+ }));
+ }
CompletableFuture<Void> compositeCloseFuture =
FutureUtil.waitForAll(closeFutures);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index aa98df6cda9..2d27a2ed2c8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -424,6 +424,15 @@ public class MessageImpl<T> implements Message<T> {
}
}
+    /**
+     * Return the schema id stored in the message metadata; an empty stored id counts as absent.
+     */
+    @Override
+    public Optional<byte[]> getSchemaId() {
+        if (!msgMetadata.hasSchemaId()) {
+            return Optional.empty();
+        }
+        byte[] storedId = msgMetadata.getSchemaId();
+        return storedId.length > 0 ? Optional.of(storedId) : Optional.empty();
+    }
+
private void ensureSchemaIsLoaded() {
if (schema instanceof AutoConsumeSchema) {
((AutoConsumeSchema)
schema).fetchSchemaIfNeeded(BytesSchemaVersion.of(getSchemaVersion()));
@@ -464,7 +473,11 @@ public class MessageImpl<T> implements Message<T> {
@Override
public T getValue() {
SchemaInfo schemaInfo = getSchemaInfo();
+ var schemaIdOp = getSchemaId();
if (schemaInfo != null && SchemaType.KEY_VALUE ==
schemaInfo.getType()) {
+ if (schemaIdOp.isPresent()) {
+ return getKeyValueBySchemaId(schemaIdOp.get());
+ }
if (schema.supportSchemaVersioning()) {
return getKeyValueBySchemaVersion();
} else {
@@ -474,6 +487,9 @@ public class MessageImpl<T> implements Message<T> {
if (msgMetadata.isNullValue()) {
return null;
}
+ if (schemaIdOp.isPresent()) {
+ return decodeBySchemaId(schemaIdOp.get());
+ }
// check if the schema passed in from client supports schema
versioning or not
// this is an optimization to only get schema version when
necessary
return decode(schema.supportSchemaVersioning() ?
getSchemaVersion() : null);
@@ -514,6 +530,15 @@ public class MessageImpl<T> implements Message<T> {
}
}
+    /**
+     * Decode the payload with the consumer schema, passing the topic and the message's schema id.
+     * Any failure is rethrown as a {@link SchemaSerializationException} that keeps the original cause.
+     */
+    private T decodeBySchemaId(byte[] schemaId) {
+        try {
+            return schema.decode(topic, getByteBuffer(), schemaId);
+        } catch (Exception e) {
+            // The id is Base64-encoded so it can be read in the error message.
+            String encodedSchemaId = Base64.getEncoder().encodeToString(schemaId);
+            throw new SchemaSerializationException(
+                    "Failed to decode message from topic " + topic + " with schemaId " + encodedSchemaId, e);
+        }
+    }
+
private ByteBuffer getByteBuffer() {
if (msgMetadata.isNullValue()) {
return null;
@@ -538,6 +563,20 @@ public class MessageImpl<T> implements Message<T> {
}
}
+    /**
+     * Decode a KEY_VALUE message whose metadata carries a schema id.
+     *
+     * <p>SEPARATED messages are decoded from the message key and the payload; INLINE messages
+     * are decoded from the payload alone.
+     *
+     * @throws UnsupportedOperationException if the consumer schema is an AutoConsumeSchema
+     * @throws IllegalStateException if the consumer schema is not a KeyValueSchemaImpl
+     */
+    private T getKeyValueBySchemaId(byte[] schemaId) {
+        if (schema instanceof AutoConsumeSchema) {
+            throw new UnsupportedOperationException("AutoConsumeSchema is not supported with schemaId");
+        }
+        if (schema instanceof KeyValueSchemaImpl<?, ?> kvSchema) {
+            boolean separated = kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED;
+            return separated
+                    ? (T) kvSchema.decode(topic, getKeyBytes(), getData(), schemaId)
+                    : decodeBySchemaId(schemaId);
+        }
+        throw new IllegalStateException("The schema is not a KeyValueSchema");
+    }
+
private T getKeyValue() {
KeyValueSchemaImpl kvSchema = getKeyValueSchema();
if (kvSchema.getKeyValueEncodingType() ==
KeyValueEncodingType.SEPARATED) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index a6987ca11f8..bf749ef2b34 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1203,6 +1203,18 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
return State.Closing;
});
+ CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+ List<CompletableFuture<Void>> closeTasks = new ArrayList<>();
+ closeTasks.add(closeFuture);
+ if (schema != null) {
+ closeTasks.add(schema.closeAsync().whenComplete((__, t) -> {
+ if (t != null) {
+ log.warn("Exception ignored in closing schema of
producer", t);
+ }
+ }));
+ }
+ CompletableFuture<Void> compositeCloseFuture =
FutureUtil.waitForAll(closeTasks);
+
if (currentState == State.Closed || currentState == State.Closing) {
return CompletableFuture.completedFuture(null);
}
@@ -1220,7 +1232,6 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
long requestId = client.newRequestId();
ByteBuf cmd = Commands.newCloseProducer(producerId, requestId);
- CompletableFuture<Void> closeFuture = new CompletableFuture<>();
cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
cnx.removeProducer(producerId);
if (exception == null || !cnx.ctx().channel().isActive()) {
@@ -1236,7 +1247,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
return null;
});
- return closeFuture;
+ return compositeCloseFuture;
}
@VisibleForTesting
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 1fec08a43f1..7b9916b58fc 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -169,6 +169,11 @@ public class TopicMessageImpl<T> implements Message<T> {
return msg.getSchemaVersion();
}
+ @Override
+ public Optional<byte[]> getSchemaId() {
+ // Delegate to the wrapped message.
+ return msg.getSchemaId();
+ }
+
@Override
public boolean isReplicated() {
return msg.isReplicated();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 8ef9079091a..e2bb4b0cf97 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.client.api.EncodeData.isValidSchemaId;
import static org.apache.pulsar.client.util.TypeCheckUtil.checkType;
import java.nio.ByteBuffer;
import java.util.Base64;
@@ -28,6 +29,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.EncodeData;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -36,6 +38,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
@@ -77,7 +80,11 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
return null;
}
}).orElseGet(() -> {
- content = ByteBuffer.wrap(schema.encode(value));
+ EncodeData encodeData = schema.encode(producer.topic, value);
+ content = ByteBuffer.wrap(encodeData.data());
+ if (encodeData.hasSchemaId()) {
+ msgMetadata.setSchemaId(encodeData.schemaId());
+ }
return this;
});
}
@@ -304,20 +311,30 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
org.apache.pulsar.common.schema.KeyValue<K, V> keyValue =
(org.apache.pulsar.common.schema.KeyValue<K, V>) value;
+ EncodeData keyEncoded = null;
// set key as the message key
if (keyValue.getKey() != null) {
- msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(
- keyValueSchema.getKeySchema().encode(keyValue.getKey())));
+ keyEncoded = keyValueSchema.getKeySchema().encode(producer.topic,
keyValue.getKey());
+
msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(keyEncoded.data()));
msgMetadata.setPartitionKeyB64Encoded(true);
} else {
msgMetadata.setNullPartitionKey(true);
}
+ EncodeData valueEncoded = null;
// set value as the payload
if (keyValue.getValue() != null) {
- content =
ByteBuffer.wrap(keyValueSchema.getValueSchema().encode(keyValue.getValue()));
+ valueEncoded =
keyValueSchema.getValueSchema().encode(producer.topic, keyValue.getValue());
+ content = ByteBuffer.wrap(valueEncoded.data());
} else {
msgMetadata.setNullValue(true);
}
+
+ byte[] schemaId = KeyValue.generateKVSchemaId(
+ keyEncoded != null && keyEncoded.hasSchemaId() ?
keyEncoded.schemaId() : null,
+ valueEncoded != null && valueEncoded.hasSchemaId() ?
valueEncoded.schemaId() : null);
+ if (isValidSchemaId(schemaId)) {
+ msgMetadata.setSchemaId(schemaId);
+ }
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java
index b00370b414b..887ccc48756 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java
@@ -19,13 +19,17 @@
package org.apache.pulsar.client.impl.schema;
import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.pulsar.client.api.EncodeData.isValidSchemaId;
+import static
org.apache.pulsar.client.internal.PulsarClientImplementationBinding.getBytes;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
+import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.EncodeData;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
@@ -102,6 +106,22 @@ public class KeyValueSchemaImpl<K, V> extends
AbstractSchema<KeyValue<K, V>> imp
private KeyValueSchemaImpl(Schema<K> keySchema,
Schema<V> valueSchema,
KeyValueEncodingType keyValueEncodingType) {
+ SchemaType keySchemaType = null;
+ if (keySchema != null && keySchema.getSchemaInfo() != null) {
+ keySchemaType = keySchema.getSchemaInfo().getType();
+ }
+ SchemaType valueSchemaType = null;
+ if (valueSchema != null && valueSchema.getSchemaInfo() != null) {
+ valueSchemaType = valueSchema.getSchemaInfo().getType();
+ }
+ if ((SchemaType.EXTERNAL.equals(keySchemaType)
+ && valueSchemaType != null &&
SchemaType.isStructType(valueSchemaType))
+ || (SchemaType.EXTERNAL.equals(valueSchemaType)
+ && keySchemaType != null &&
SchemaType.isStructType(keySchemaType))) {
+ throw new IllegalArgumentException("External schema cannot be used
with other Pulsar struct schema types,"
+ + "keySchemaType: " + keySchemaType + ", valueSchemaType:
" + valueSchemaType);
+ }
+
this.keySchema = keySchema;
this.valueSchema = valueSchema;
this.keyValueEncodingType = keyValueEncodingType;
@@ -133,18 +153,23 @@ public class KeyValueSchemaImpl<K, V> extends
AbstractSchema<KeyValue<K, V>> imp
// encode as bytes: [key.length][key.bytes][value.length][value.bytes] or
[value.bytes]
public byte[] encode(KeyValue<K, V> message) {
+ // encode(topic, message) returns null for a SEPARATED message with a null value; keep
+ // returning a null payload in that case instead of throwing a NullPointerException.
+ EncodeData encodeData = encode(null, message);
+ return encodeData == null ? null : encodeData.data();
+ }
+
+ /**
+ * Encode a key/value message for the given topic. INLINE encoding packs key and value into
+ * one payload; otherwise only the value is encoded, and {@code null} is returned for a null value.
+ */
+ @Override
+ public EncodeData encode(String topic, KeyValue<K, V> message) {
if (keyValueEncodingType != null && keyValueEncodingType ==
KeyValueEncodingType.INLINE) {
return KeyValue.encode(
+ topic,
message.getKey(),
keySchema,
message.getValue(),
- valueSchema
- );
+ valueSchema);
} else {
if (message.getValue() == null) {
return null;
}
- return valueSchema.encode(message.getValue());
+ return valueSchema.encode(topic, message.getValue());
}
}
@@ -166,6 +191,20 @@ public class KeyValueSchemaImpl<K, V> extends
AbstractSchema<KeyValue<K, V>> imp
return decode(ByteBufUtil.getBytes(byteBuf));
}
+    /**
+     * Decode an INLINE key/value payload using the message's schema id.
+     *
+     * @throws SchemaSerializationException if this schema uses SEPARATED encoding
+     */
+    @Override
+    public KeyValue<K, V> decode(String topic, byte[] data, byte[] schemaId) {
+        if (keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
+            throw new SchemaSerializationException("This method cannot be used under this SEPARATED encoding type");
+        }
+        // Split the INLINE payload into key and value bytes, then decode each part.
+        return KeyValue.decode(data,
+                (keyPart, valuePart) -> decode(topic, keyPart, valuePart, schemaId));
+    }
+
+    /**
+     * {@link ByteBuffer} variant: copies the buffer's bytes and delegates to the byte-array overload.
+     */
+    @Override
+    public KeyValue<K, V> decode(String topic, ByteBuffer data, byte[] schemaId) {
+        byte[] bytes = getBytes(data);
+        return decode(topic, bytes, schemaId);
+    }
+
@Override
public KeyValue<K, V> decode(ByteBuf byteBuf, byte[] schemaVersion) {
return decode(ByteBufUtil.getBytes(byteBuf), schemaVersion);
@@ -196,6 +235,42 @@ public class KeyValueSchemaImpl<K, V> extends
AbstractSchema<KeyValue<K, V>> imp
return new KeyValue<>(k, v);
}
+    /**
+     * Decode a key/value pair from separately encoded key and value bytes.
+     *
+     * <p>A valid {@code schemaId} is split into a key id and a value id (see
+     * {@link #getKeyValueSchemaId(byte[])}). Each part is decoded with its id when that part's
+     * schema supports schema versioning and the id is non-empty; otherwise it is decoded without one.
+     *
+     * @param topic the topic the message belongs to, may be null
+     * @param keyBytes the encoded key, or null to produce a null key
+     * @param valueBytes the encoded value, or null to produce a null value
+     * @param schemaId the message's schema id (or schema version), may be null or empty
+     * @return the decoded key/value pair
+     */
+    public KeyValue<K, V> decode(String topic, byte[] keyBytes, byte[] valueBytes, byte[] schemaId) {
+        byte[] keySchemaId = null;
+        byte[] valueSchemaId = null;
+        if (isValidSchemaId(schemaId)) {
+            KeyValue<byte[], byte[]> kvSchemaId = getKeyValueSchemaId(schemaId);
+            keySchemaId = kvSchemaId.getKey();
+            valueSchemaId = kvSchemaId.getValue();
+        }
+
+        // NOTE(review): unlike MessageImpl#decodeBySchemaId, the per-part id is only used when the
+        // part's schema reports supportSchemaVersioning(); confirm external schemas are expected to.
+        K k = null;
+        if (keyBytes != null) {
+            if (keySchema.supportSchemaVersioning() && isValidSchemaId(keySchemaId)) {
+                k = keySchema.decode(topic, keyBytes, keySchemaId);
+            } else {
+                k = keySchema.decode(keyBytes);
+            }
+        }
+
+        V v = null;
+        if (valueBytes != null) {
+            if (valueSchema.supportSchemaVersioning() && isValidSchemaId(valueSchemaId)) {
+                v = valueSchema.decode(topic, valueBytes, valueSchemaId);
+            } else {
+                v = valueSchema.decode(valueBytes);
+            }
+        }
+        return new KeyValue<>(k, v);
+    }
+
+    /**
+     * Split a message schema id into the ids used for the key and the value.
+     *
+     * <p>When the key or the value schema is EXTERNAL, the id is treated as the combined form built by
+     * {@link KeyValue#generateKVSchemaId(byte[], byte[])} and split accordingly. Otherwise the same id
+     * is used for both parts.
+     */
+    private KeyValue<byte[], byte[]> getKeyValueSchemaId(byte[] schemaId) {
+        // The producer combines the ids whenever either side returns one, and an EXTERNAL key may be
+        // paired with a non-struct value, so both sides have to be checked, not only the value.
+        if (isExternalSchema(keySchema) || isExternalSchema(valueSchema)) {
+            return KeyValue.getSchemaId(schemaId);
+        }
+        return new KeyValue<>(schemaId, schemaId);
+    }
+
+    // Null-safe: a sub-schema may have no SchemaInfo (the constructor guards against this too).
+    private static boolean isExternalSchema(Schema<?> schema) {
+        return schema != null
+                && schema.getSchemaInfo() != null
+                && SchemaType.EXTERNAL.equals(schema.getSchemaInfo().getType());
+    }
+
public SchemaInfo getSchemaInfo() {
return this.schemaInfo;
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index 179c0081e07..6de7f805f93 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -375,7 +375,7 @@ public class KeyValueSchemaTest {
} catch (SchemaSerializationException e) {
Assert.assertTrue(e.getMessage().contains("This method cannot be
used under this SEPARATED encoding type"));
}
- KeyValue<Foo, Bar> keyValue = ((KeyValueSchemaImpl)
keyValueSchema).decode(fooSchema.encode(foo),
+ KeyValue<Foo, Bar> keyValue = ((KeyValueSchemaImpl)
keyValueSchema).decode(null, fooSchema.encode(foo),
encodeBytes, null);
Foo fooBack = keyValue.getKey();
Bar barBack = keyValue.getValue();
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
index f64e7a91624..e92d1c09124 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
@@ -96,7 +96,7 @@ public class SupportVersioningKeyValueSchemaTest {
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue =
((KeyValueSchemaImpl) keyValueSchema).decode(
- fooSchema.encode(foo), encodeBytes, new byte[10]);
+ null, fooSchema.encode(foo), encodeBytes, new byte[10]);
Assert.assertTrue(keyValue.getValue().isField1());
Assert.assertEquals(
KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
@@ -157,7 +157,7 @@ public class SupportVersioningKeyValueSchemaTest {
byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue =
((KeyValueSchemaImpl) keyValueSchema).decode(
- fooSchema.encode(foo), encodeBytes, new byte[10]);
+ null, fooSchema.encode(foo), encodeBytes, new byte[10]);
Assert.assertTrue(keyValue.getValue().isField1());
Assert.assertEquals(
KeyValueEncodingType.valueOf(keyValueSchema.getSchemaInfo().getProperties().get("kv.encoding.type")),
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
index 69a6cc36ae4..919efb89f0f 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
@@ -19,7 +19,11 @@
package org.apache.pulsar.common.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.client.api.EncodeData.isValidSchemaId;
+import static org.apache.pulsar.common.schema.KeyValue.generateKVSchemaId;
+import static org.apache.pulsar.common.schema.KeyValue.getSchemaId;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;
import java.sql.Time;
@@ -133,4 +137,29 @@ public class KeyValueTest {
}
}
+ // (key schema id, value schema id) pairs covering null, empty and non-empty ids on each side.
+ @DataProvider(name = "keyValueSchemaBytes")
+ public Object[][] keyValueSchemaBytes() {
+ return new Object[][] {
+ { null, null },
+ { new byte[0], new byte[0] },
+ { null, new byte[] {4, 5, 6, 7, 8} },
+ { new byte[0], new byte[] {4, 5, 6, 7, 8} },
+ { new byte[] {1, 2, 3}, null },
+ { new byte[] {1, 2, 3}, new byte[0] },
+ { new byte[] {1, 2, 3}, new byte[] {4, 5, 6, 7, 8} },
+ };
+ }
+
+ // Round-trips each pair through generateKVSchemaId/getSchemaId. When neither id is valid,
+ // no usable combined id may be produced; otherwise a null id must come back as an empty array.
+ @Test(dataProvider = "keyValueSchemaBytes")
+ public void testEncodeDecodeSchemaId(byte[] keySchemaId, byte[]
valueSchemaId) {
+ byte[] encoded = generateKVSchemaId(keySchemaId, valueSchemaId);
+ if (!isValidSchemaId(keySchemaId) && !isValidSchemaId(valueSchemaId)) {
+ assertFalse(isValidSchemaId(encoded));
+ return;
+ }
+ var decoded = getSchemaId(encoded);
+ assertEquals(keySchemaId == null ? new byte[0] : keySchemaId,
decoded.getKey());
+ assertEquals(valueSchemaId == null ? new byte[0] : valueSchemaId,
decoded.getValue());
+ }
+
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
index c99c407b9d7..4016dfb2a98 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.EncodeData;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -38,9 +39,15 @@ import org.apache.pulsar.common.schema.SchemaType;
public final class KeyValueSchemaInfo {
private static final Schema<SchemaInfo> SCHEMA_INFO_WRITER = new
Schema<SchemaInfo>() {
+
+    @Override
+    public EncodeData encode(String topic, SchemaInfo si) {
+        // The topic is not consulted; only the raw schema bytes are wrapped.
+        byte[] schemaBytes = si.getSchema();
+        return new EncodeData(schemaBytes);
+    }
+
@Override
public byte[] encode(SchemaInfo si) {
- return si.getSchema();
+ // Delegate to the topic-aware overload (with a null topic) and unwrap its payload bytes.
+ return encode(null, si).data();
}
@Override
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 58d58a3acef..7aaa010c11e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -840,6 +840,9 @@ public class Commands {
return Schema.Type.AutoConsume;
} else if (type.getValue() < 0) {
return Schema.Type.None;
+ } else if (type == SchemaType.EXTERNAL) {
+ // This is a special case, SchemaType.EXTERNAL number is not match
the Schema.Type.EXTERNAL.
+ return Schema.Type.External;
} else {
return Schema.Type.valueOf(type.getValue());
}
@@ -851,6 +854,9 @@ public class Commands {
} else if (type.getValue() < 0) {
// this is unexpected
return SchemaType.NONE;
+ } else if (type == Schema.Type.External) {
+ // This is a special case, SchemaType.EXTERNAL number is not match
the Schema.Type.EXTERNAL.
+ return SchemaType.EXTERNAL;
} else {
return SchemaType.valueOf(type.getValue());
}
@@ -1858,6 +1864,9 @@ public class Commands {
if (builder.hasSchemaVersion()) {
messageMetadata.setSchemaVersion(builder.getSchemaVersion());
}
+ if (builder.hasSchemaId()) {
+ messageMetadata.setSchemaId(builder.getSchemaId());
+ }
return builder.getSequenceId();
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index a03daf05f99..1bfcf16beb5 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -46,6 +46,7 @@ message Schema {
LocalDateTime = 19;
ProtobufNative = 20;
AutoConsume = 21;
+ External = 22;
}
required string name = 1;
@@ -173,6 +174,7 @@ message MessageMetadata {
// the `compacted_out` field in `SingleMessageMetadata` must be checked to
identify and filter out compacted
// messages (e.g., `k1 => v1` and `k1 => null` in the example above).
repeated int32 compacted_batch_indexes = 31;
+ optional bytes schema_id = 32;
}
message SingleMessageMetadata {
diff --git a/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java b/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
index c7c8aa92a2f..e1e8a218345 100644
--- a/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
+++ b/pulsar-io/http/src/test/java/org/apache/pulsar/io/http/HttpSinkTest.java
@@ -384,6 +384,11 @@ public class HttpSinkTest {
return new byte[0];
}
+    @Override
+    public Optional<byte[]> getSchemaId() {
+        // Test stub: always reports a present but empty schema id.
+        byte[] emptyId = new byte[0];
+        return Optional.of(emptyId);
+    }
+
@Override
public boolean isReplicated() {
return false;