This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f5e10a9 [Schema] Support consume multiple schema types messages by
AutoConsumeSchema (#10604)
f5e10a9 is described below
commit f5e10a99c3f667f05c3303098ee22e9cd9332bf4
Author: ran <[email protected]>
AuthorDate: Sun May 23 21:47:55 2021 +0800
[Schema] Support consume multiple schema types messages by
AutoConsumeSchema (#10604)
Based on the PR https://github.com/apache/pulsar/pull/10573
### Motivation
Support consuming messages of multiple schema types with AutoConsumeSchema.
---
.../java/org/apache/pulsar/schema/SchemaTest.java | 182 +++++++++++++++++++++
.../org/apache/pulsar/client/impl/MessageImpl.java | 16 +-
.../pulsar/client/impl/PulsarClientImpl.java | 2 +-
.../client/impl/TypedMessageBuilderImpl.java | 5 +-
.../client/impl/schema/AutoConsumeSchema.java | 130 +++++++++------
.../client/impl/schema/AutoProduceBytesSchema.java | 15 +-
.../generic/MultiVersionSchemaInfoProvider.java | 2 +-
.../schema/generic/AbstractGenericSchemaTest.java | 3 +-
.../impl/schema/generic/GenericSchemaImplTest.java | 26 +--
.../impl/schema/generic/GenericSchemaTest.java | 26 +--
.../pulsar/proxy/server/LookupProxyHandler.java | 14 +-
11 files changed, 331 insertions(+), 90 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index a2584a4..a50ab61 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
@@ -37,6 +38,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Cleanup;
@@ -53,9 +56,11 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -781,4 +786,181 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
assertEquals("foo", message.getValue());
}
+ public void testConsumeMultipleSchemaMessages() throws Exception {
+ final String namespace = "test-namespace-" + randomName(16);
+ String ns = PUBLIC_TENANT + "/" + namespace;
+ admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
+ admin.namespaces().setSchemaCompatibilityStrategy(ns,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
+
+ final String autoProducerTopic = getTopicName(ns,
"auto_produce_topic");
+ Producer<byte[]> autoProducer =
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
+ .topic(autoProducerTopic)
+ .create();
+
+ AtomicInteger totalMsgCnt = new AtomicInteger(0);
+ generateDataByDifferentSchema(ns, "bytes_schema", Schema.BYTES, "bytes
value".getBytes(),
+ autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "string_schema", Schema.STRING,
"string value",
+ autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "bool_schema", Schema.BOOL, true,
+ autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "json_one_schema",
Schema.JSON(Schemas.PersonOne.class),
+ new Schemas.PersonOne(1), autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "json_three_schema",
Schema.JSON(Schemas.PersonThree.class),
+ new Schemas.PersonThree(3, "ran"), autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "json_four_schema",
Schema.JSON(Schemas.PersonFour.class),
+ new Schemas.PersonFour(4, "tang", 18), autoProducer,
totalMsgCnt);
+ generateDataByDifferentSchema(ns, "avro_one_schema",
Schema.AVRO(Schemas.PersonOne.class),
+ new Schemas.PersonOne(10), autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "k_one_v_three_schema_separate",
+ Schema.KeyValue(Schema.JSON(Schemas.PersonOne.class),
+ Schema.JSON(Schemas.PersonThree.class),
KeyValueEncodingType.SEPARATED),
+ new KeyValue<>(new Schemas.PersonOne(1), new
Schemas.PersonThree(3, "kv-separate")),
+ autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "k_one_v_four_schema_inline",
+ Schema.KeyValue(Schema.JSON(Schemas.PersonOne.class),
+ Schema.JSON(Schemas.PersonFour.class),
KeyValueEncodingType.INLINE),
+ new KeyValue<>(new Schemas.PersonOne(10), new
Schemas.PersonFour(30, "kv-inline", 20)),
+ autoProducer, totalMsgCnt);
+ generateDataByDifferentSchema(ns, "k_int_v_three_schema_separate",
+ Schema.KeyValue(Schema.INT32,
Schema.JSON(Schemas.PersonThree.class), KeyValueEncodingType.SEPARATED),
+ new KeyValue<>(100, new Schemas.PersonThree(40,
"kv-separate")),
+ autoProducer, totalMsgCnt);
+
+ Consumer<GenericRecord> autoConsumer =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(autoProducerTopic)
+ .subscriptionName("test")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Message<GenericRecord> message;
+ for (int i = 0; i < totalMsgCnt.get(); i++) {
+ message = autoConsumer.receive(5, TimeUnit.SECONDS);
+ if (message == null) {
+ Assert.fail("Failed to receive multiple schema message.");
+ }
+ log.info("auto consumer get native object class: {}, value: {}",
+ message.getValue().getNativeObject().getClass(),
message.getValue().getNativeObject());
+ checkSchemaForAutoSchema(message);
+ }
+ }
+
+ private String getTopicName(String ns, String baseTopic) {
+ return ns + "/" + baseTopic;
+ }
+
+ private void generateDataByDifferentSchema(String ns,
+ String baseTopic,
+ Schema schema,
+ Object data,
+ Producer<?> autoProducer,
+ AtomicInteger totalMsgCnt)
throws PulsarClientException {
+ String topic = getTopicName(ns, baseTopic);
+ Producer<Object> producer = pulsarClient.newProducer(schema)
+ .topic(topic)
+ .create();
+ producer.newMessage().value(data).property("baseTopic",
baseTopic).send();
+ totalMsgCnt.incrementAndGet();
+
+ Consumer<GenericRecord> consumer =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionName("test")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ Message<GenericRecord> message = consumer.receive(5, TimeUnit.SECONDS);
+ if (message == null) {
+ Assert.fail("Failed to receive message for topic " + topic);
+ }
+ if (!message.getReaderSchema().isPresent()) {
+ Assert.fail("Failed to get reader schema for topic " + topic);
+ }
+ message.getValue();
+
+ Schema<?> readerSchema = message.getReaderSchema().get();
+ if (readerSchema instanceof KeyValueSchema
+ && ((KeyValueSchema<?, ?>) readerSchema)
+
.getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
+ autoProducer.newMessage(
+
Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())).keyBytes(message.getKeyBytes())
+ .value(message.getData())
+ .properties(message.getProperties())
+ .send();
+ } else {
+
autoProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
+ .properties(message.getProperties())
+ .value(message.getData())
+ .send();
+ }
+ producer.close();
+ consumer.close();
+ }
+
+ private void checkSchemaForAutoSchema(Message<GenericRecord> message) {
+ if (!message.getReaderSchema().isPresent()) {
+ Assert.fail("Failed to get reader schema for auto consume multiple
schema topic.");
+ }
+ Object nativeObject = message.getValue().getNativeObject();
+ String baseTopic = message.getProperty("baseTopic");
+ JsonNode jsonNode;
+ KeyValue<?, ?> kv;
+ switch (baseTopic) {
+ case "bytes_schema":
+ Assert.assertEquals(new String((byte[]) nativeObject), "bytes
value");
+ break;
+ case "string_schema":
+ Assert.assertEquals((String) nativeObject, "string value");
+ break;
+ case "bool_schema":
+ Assert.assertEquals(nativeObject, Boolean.TRUE);
+ break;
+ case "json_one_schema":
+ jsonNode = (JsonNode) nativeObject;
+ Assert.assertEquals(jsonNode.get("id").intValue(), 1);
+ break;
+ case "json_three_schema":
+ jsonNode = (JsonNode) nativeObject;
+ Assert.assertEquals(jsonNode.get("id").intValue(), 3);
+ Assert.assertEquals(jsonNode.get("name").textValue(), "ran");
+ break;
+ case "json_four_schema":
+ jsonNode = (JsonNode) nativeObject;
+ Assert.assertEquals(jsonNode.get("id").intValue(), 4);
+ Assert.assertEquals(jsonNode.get("name").textValue(), "tang");
+ Assert.assertEquals(jsonNode.get("age").intValue(), 18);
+ break;
+ case "avro_one_schema":
+ org.apache.avro.generic.GenericRecord genericRecord =
+ (org.apache.avro.generic.GenericRecord) nativeObject;
+ Assert.assertEquals(genericRecord.get("id"), 10);
+ break;
+ case "k_one_v_three_schema_separate":
+ kv = (KeyValue<GenericRecord, GenericRecord>) nativeObject;
+ jsonNode = ((GenericJsonRecord) kv.getKey()).getJsonNode();
+ Assert.assertEquals(jsonNode.get("id").intValue(), 1);
+ jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
+ Assert.assertEquals(jsonNode.get("id").intValue(), 3);
+ Assert.assertEquals(jsonNode.get("name").textValue(),
"kv-separate");
+ break;
+ case "k_one_v_four_schema_inline":
+ kv = (KeyValue<GenericRecord, GenericRecord>) nativeObject;
+ jsonNode = ((GenericJsonRecord) kv.getKey()).getJsonNode();
+ Assert.assertEquals(jsonNode.get("id").intValue(), 10);
+ jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
+ Assert.assertEquals(jsonNode.get("id").intValue(), 30);
+ Assert.assertEquals(jsonNode.get("name").textValue(),
"kv-inline");
+ Assert.assertEquals(jsonNode.get("age").intValue(), 20);
+ break;
+ case "k_int_v_three_schema_separate":
+ kv = (KeyValue<Integer, GenericRecord>) nativeObject;
+ Assert.assertEquals(kv.getKey(), 100);
+ jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
+ Assert.assertEquals(jsonNode.get("id").intValue(), 40);
+ Assert.assertEquals(jsonNode.get("name").textValue(),
"kv-separate");
+ break;
+ default:
+ // nothing to do
+ }
+ }
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 15c1edd..a59bd91 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.annotations.VisibleForTesting;
@@ -52,6 +51,7 @@ import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -401,7 +401,7 @@ public class MessageImpl<T> implements Message<T> {
.atSchemaVersion(schemaVersion));
} else if (schema instanceof AbstractSchema) {
byte[] schemaVersion = getSchemaVersion();
- return Optional.of(((AbstractSchema) schema)
+ return Optional.of(((AbstractSchema<?>) schema)
.atSchemaVersion(schemaVersion));
} else {
return Optional.of(schema);
@@ -419,11 +419,15 @@ public class MessageImpl<T> implements Message<T> {
private void ensureSchemaIsLoaded() {
if (schema instanceof AutoConsumeSchema) {
- ((AutoConsumeSchema) schema).fetchSchemaIfNeeded();
+ ((AutoConsumeSchema)
schema).fetchSchemaIfNeeded(BytesSchemaVersion.of(getSchemaVersion()));
}
}
+
private SchemaInfo getSchemaInfo() {
ensureSchemaIsLoaded();
+ if (schema instanceof AutoConsumeSchema) {
+ return ((AutoConsumeSchema)
schema).getSchemaInfo(getSchemaVersion());
+ }
return schema.getSchemaInfo();
}
@@ -449,7 +453,7 @@ public class MessageImpl<T> implements Message<T> {
private KeyValueSchema getKeyValueSchema() {
if (schema instanceof AutoConsumeSchema) {
- return (KeyValueSchema) ((AutoConsumeSchema)
schema).getInternalSchema();
+ return (KeyValueSchema) ((AutoConsumeSchema)
schema).getInternalSchema(getSchemaVersion());
} else {
return (KeyValueSchema) schema;
}
@@ -476,7 +480,7 @@ public class MessageImpl<T> implements Message<T> {
(org.apache.pulsar.common.schema.KeyValue)
kvSchema.decode(getKeyBytes(), getData(), schemaVersion);
if (schema instanceof AutoConsumeSchema) {
return (T) AutoConsumeSchema.wrapPrimitiveObject(keyValue,
- schema.getSchemaInfo().getType(), schemaVersion);
+ ((AutoConsumeSchema)
schema).getSchemaInfo(schemaVersion).getType(), schemaVersion);
} else {
return (T) keyValue;
}
@@ -492,7 +496,7 @@ public class MessageImpl<T> implements Message<T> {
(org.apache.pulsar.common.schema.KeyValue)
kvSchema.decode(getKeyBytes(), getData(), null);
if (schema instanceof AutoConsumeSchema) {
return (T) AutoConsumeSchema.wrapPrimitiveObject(keyValue,
- schema.getSchemaInfo().getType(), null);
+ ((AutoConsumeSchema)
schema).getSchemaInfo(getSchemaVersion()).getType(), null);
} else {
return (T) keyValue;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 294233b..f951a88 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -987,7 +987,7 @@ public class PulsarClientImpl implements PulsarClient {
return new MultiVersionSchemaInfoProvider(TopicName.get(topicName),
this);
}
- private LoadingCache<String, SchemaInfoProvider>
getSchemaProviderLoadingCache() {
+ protected LoadingCache<String, SchemaInfoProvider>
getSchemaProviderLoadingCache() {
return schemaProviderLoadingCache;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 59ccde3..565502c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -123,7 +123,7 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
@Override
public TypedMessageBuilder<T> keyBytes(byte[] key) {
- if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
+ if (schema instanceof KeyValueSchema &&
schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
checkArgument(!(kvSchema.getKeyValueEncodingType() ==
KeyValueEncodingType.SEPARATED),
"This method is not allowed to set keys when in encoding
type is SEPARATED");
@@ -149,7 +149,8 @@ public class TypedMessageBuilderImpl<T> implements
TypedMessageBuilder<T> {
msgMetadata.setNullValue(true);
return this;
}
- if (schema.getSchemaInfo() != null && schema.getSchemaInfo().getType()
== SchemaType.KEY_VALUE) {
+ if (value instanceof org.apache.pulsar.common.schema.KeyValue
+ && schema.getSchemaInfo() != null &&
schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
org.apache.pulsar.common.schema.KeyValue kv =
(org.apache.pulsar.common.schema.KeyValue) value;
if (kvSchema.getKeyValueEncodingType() ==
KeyValueEncodingType.SEPARATED) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index f075ecf..8693e3a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.schema;
+import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
@@ -25,15 +26,16 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import
org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
+import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import static com.google.common.base.Preconditions.checkState;
@@ -43,7 +45,7 @@ import static com.google.common.base.Preconditions.checkState;
@Slf4j
public class AutoConsumeSchema implements Schema<GenericRecord> {
- private Schema<?> schema;
+ private final ConcurrentMap<SchemaVersion, Schema<?>> schemaMap =
initSchemaMap();
private String topicName;
@@ -51,65 +53,89 @@ public class AutoConsumeSchema implements
Schema<GenericRecord> {
private SchemaInfoProvider schemaInfoProvider;
+ private ConcurrentMap<SchemaVersion, Schema<?>> initSchemaMap() {
+ ConcurrentMap<SchemaVersion, Schema<?>> schemaMap =
Maps.newConcurrentMap();
+ // The Schema.BYTES will not be uploaded to the broker and store in
the schema storage,
+ // if the schema version in the message metadata is empty byte[], it
means its schema is Schema.BYTES.
+ schemaMap.put(BytesSchemaVersion.of(new byte[0]), Schema.BYTES);
+ return schemaMap;
+ }
+
+ public void setSchema(SchemaVersion schemaVersion, Schema<?> schema) {
+ schemaMap.put(schemaVersion, schema);
+ }
+
public void setSchema(Schema<?> schema) {
- this.schema = schema;
+ schemaMap.put(SchemaVersion.Latest, schema);
}
- private void ensureSchemaInitialized() {
- checkState(null != schema, "Schema is not initialized before used");
+ private void ensureSchemaInitialized(SchemaVersion schemaVersion) {
+ checkState(schemaMap.containsKey(schemaVersion),
+ "Schema version " + schemaVersion + " is not initialized
before used");
}
@Override
public void validate(byte[] message) {
- ensureSchemaInitialized();
+ ensureSchemaInitialized(SchemaVersion.Latest);
- schema.validate(message);
+ schemaMap.get(SchemaVersion.Latest).validate(message);
}
@Override
public byte[] encode(GenericRecord message) {
- ensureSchemaInitialized();
-
throw new UnsupportedOperationException("AutoConsumeSchema is not
intended to be used for encoding");
}
@Override
public boolean supportSchemaVersioning() {
- return schema == null || schema.supportSchemaVersioning();
+ return true;
}
public Schema<?> atSchemaVersion(byte[] schemaVersion) {
- fetchSchemaIfNeeded();
- ensureSchemaInitialized();
- if (schema.supportSchemaVersioning() && schema instanceof
AbstractSchema) {
- return ((AbstractSchema) schema).atSchemaVersion(schemaVersion);
+ SchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
+ fetchSchemaIfNeeded(sv);
+ ensureSchemaInitialized(sv);
+ Schema<?> topicVersionedSchema = schemaMap.get(sv);
+ if (topicVersionedSchema.supportSchemaVersioning() &&
topicVersionedSchema instanceof AbstractSchema) {
+ return ((AbstractSchema<?>)
topicVersionedSchema).atSchemaVersion(schemaVersion);
} else {
- return schema;
+ return topicVersionedSchema;
}
}
@Override
public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
- fetchSchemaIfNeeded();
- ensureSchemaInitialized();
- return adapt(schema.decode(bytes, schemaVersion), schemaVersion);
+ SchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
+ fetchSchemaIfNeeded(sv);
+ ensureSchemaInitialized(sv);
+ return adapt(schemaMap.get(sv).decode(bytes), schemaVersion);
}
@Override
public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
- if (schema == null) {
- this.schemaInfoProvider = schemaInfoProvider;
- } else {
- schema.setSchemaInfoProvider(schemaInfoProvider);
+ this.schemaInfoProvider = schemaInfoProvider;
+ if (schemaMap.containsKey(SchemaVersion.Latest)) {
+
schemaMap.get(SchemaVersion.Latest).setSchemaInfoProvider(schemaInfoProvider);
}
}
@Override
public SchemaInfo getSchemaInfo() {
- if (schema == null) {
+ if (!schemaMap.containsKey(SchemaVersion.Latest)) {
return null;
}
- return schema.getSchemaInfo();
+ return schemaMap.get(SchemaVersion.Latest).getSchemaInfo();
+ }
+
+ public SchemaInfo getSchemaInfo(byte[] schemaVersion) {
+ if (schemaVersion == null) {
+ return Schema.BYTES.getSchemaInfo();
+ }
+ SchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
+ if (schemaMap.containsKey(sv)) {
+ return schemaMap.get(sv).getSchemaInfo();
+ }
+ return null;
}
@Override
@@ -120,7 +146,7 @@ public class AutoConsumeSchema implements
Schema<GenericRecord> {
this.componentName = componentName;
if (schemaInfo != null) {
Schema<?> genericSchema = generateSchema(schemaInfo);
- setSchema(genericSchema);
+ setSchema(SchemaVersion.Latest, genericSchema);
log.info("Configure {} schema for topic {} : {}",
componentName, topicName,
schemaInfo.getSchemaDefinition());
}
@@ -128,11 +154,11 @@ public class AutoConsumeSchema implements
Schema<GenericRecord> {
@Override
public Optional<Object> getNativeSchema() {
- ensureSchemaInitialized();
- if (schema == null) {
+ ensureSchemaInitialized(SchemaVersion.Latest);
+ if (schemaMap.get(SchemaVersion.Latest) == null) {
return Optional.empty();
} else {
- return schema.getNativeSchema();
+ return schemaMap.get(SchemaVersion.Latest).getNativeSchema();
}
}
@@ -205,15 +231,14 @@ public class AutoConsumeSchema implements
Schema<GenericRecord> {
}
public Schema<GenericRecord> clone() {
- Schema<GenericRecord> schema = new AutoConsumeSchema();
- if (this.schema != null) {
- schema.configureSchemaInfo(topicName, componentName,
this.schema.getSchemaInfo());
- } else {
- schema.configureSchemaInfo(topicName, componentName, null);
- }
+ AutoConsumeSchema schema = new AutoConsumeSchema();
+ schema.configureSchemaInfo(topicName, componentName, null);
if (schemaInfoProvider != null) {
schema.setSchemaInfoProvider(schemaInfoProvider);
}
+ for (Map.Entry<SchemaVersion, Schema<?>> entry : schemaMap.entrySet())
{
+ schema.setSchema(entry.getKey(), entry.getValue());
+ }
return schema;
}
@@ -226,19 +251,23 @@ public class AutoConsumeSchema implements
Schema<GenericRecord> {
if (value instanceof GenericRecord) {
return (GenericRecord) value;
}
- if (this.schema == null) {
+ BytesSchemaVersion sv = BytesSchemaVersion.of(schemaVersion);
+ if (!schemaMap.containsKey(sv)) {
throw new IllegalStateException("Cannot decode a message without
schema");
}
- return wrapPrimitiveObject(value, schema.getSchemaInfo().getType(),
schemaVersion);
+ return wrapPrimitiveObject(value,
schemaMap.get(sv).getSchemaInfo().getType(), schemaVersion);
}
public static GenericRecord wrapPrimitiveObject(Object value, SchemaType
type, byte[] schemaVersion) {
return GenericObjectWrapper.of(value, type, schemaVersion);
}
-
public Schema<?> getInternalSchema() {
- return schema;
+ return schemaMap.get(SchemaVersion.Latest);
+ }
+
+ public Schema<?> getInternalSchema(byte[] schemaVersion) {
+ return schemaMap.get(BytesSchemaVersion.of(schemaVersion));
}
/**
@@ -246,15 +275,18 @@ public class AutoConsumeSchema implements
Schema<GenericRecord> {
* We cannot call this method in getSchemaInfo, because getSchemaInfo is
called in many
* places and we will introduce lots of deadlocks.
*/
- public void fetchSchemaIfNeeded() throws SchemaSerializationException {
- if (schema == null) {
+ public void fetchSchemaIfNeeded(SchemaVersion schemaVersion) throws
SchemaSerializationException {
+ if (schemaVersion == null) {
+ schemaVersion = BytesSchemaVersion.of(new byte[0]);
+ }
+ if (!schemaMap.containsKey(schemaVersion)) {
if (schemaInfoProvider == null) {
throw new SchemaSerializationException("Can't get accurate
schema information for topic " + topicName +
"using AutoConsumeSchema
because SchemaInfoProvider is not set yet");
} else {
SchemaInfo schemaInfo = null;
try {
- schemaInfo = schemaInfoProvider.getLatestSchema().get();
+ schemaInfo =
schemaInfoProvider.getSchemaByVersion(schemaVersion.bytes()).get();
if (schemaInfo == null) {
// schemaless topic
schemaInfo = BytesSchema.of().getSchemaInfo();
@@ -267,18 +299,20 @@ public class AutoConsumeSchema implements
Schema<GenericRecord> {
throw new SchemaSerializationException(e.getCause());
}
// schemaInfo null means that there is no schema attached to
the topic.
- schema = generateSchema(schemaInfo);
+ Schema<?> schema = generateSchema(schemaInfo);
schema.setSchemaInfoProvider(schemaInfoProvider);
- log.info("Configure {} schema for topic {} : {}",
- componentName, topicName,
schemaInfo.getSchemaDefinition());
+ setSchema(schemaVersion, schema);
+ log.info("Configure {} schema {} for topic {} : {}",
+ componentName, schemaVersion, topicName,
schemaInfo.getSchemaDefinition());
}
}
}
@Override
public String toString() {
- if (schema != null && schema.getSchemaInfo() != null) {
- return "AUTO_CONSUME(schematype=" +
schema.getSchemaInfo().getType() + ")";
+ if (schemaMap.containsKey(SchemaVersion.Latest)
+ && schemaMap.get(SchemaVersion.Latest).getSchemaInfo() !=
null) {
+ return "AUTO_CONSUME(schematype=" +
schemaMap.get(SchemaVersion.Latest).getSchemaInfo().getType() + ")";
} else {
return "AUTO_CONSUME(uninitialized)";
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index 7578ffa..a85aceb 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -21,9 +21,12 @@ package org.apache.pulsar.client.impl.schema;
import static com.google.common.base.Preconditions.checkState;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
+import java.util.Optional;
+
/**
* Auto detect schema.
*/
@@ -71,7 +74,12 @@ public class AutoProduceBytesSchema<T> implements
Schema<byte[]> {
if (requireSchemaValidation) {
// verify if the message can be decoded by the underlying schema
- schema.validate(message);
+ if (schema instanceof KeyValueSchema
+ && ((KeyValueSchema)
schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
+ ((KeyValueSchema) schema).getValueSchema().validate(message);
+ } else {
+ schema.validate(message);
+ }
}
return message;
@@ -97,6 +105,11 @@ public class AutoProduceBytesSchema<T> implements
Schema<byte[]> {
}
@Override
+ public Optional<Object> getNativeSchema() {
+ return Optional.ofNullable(schema);
+ }
+
+ @Override
public Schema<byte[]> clone() {
return new AutoProduceBytesSchema<>(schema.clone());
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
index 4c430be..666177d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java
@@ -93,7 +93,7 @@ public class MultiVersionSchemaInfoProvider implements
SchemaInfoProvider {
}
private CompletableFuture<SchemaInfo> loadSchema(byte[] schemaVersion) {
- return pulsarClient.getLookup()
+ return pulsarClient.getLookup()
.getSchema(topicName, schemaVersion)
.thenApply(o -> o.orElse(null));
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/AbstractGenericSchemaTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/AbstractGenericSchemaTest.java
index 0865579..ac32318 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/AbstractGenericSchemaTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/AbstractGenericSchemaTest.java
@@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.testng.annotations.Test;
import java.util.concurrent.CompletableFuture;
@@ -74,7 +75,7 @@ public class AbstractGenericSchemaTest {
GenericRecord record;
if (decodeSchema instanceof AutoConsumeSchema) {
- record = decodeSchema.decode(data, new byte[0]);
+ record = decodeSchema.decode(data, new
LongSchemaVersion(0L).bytes());
} else {
record = decodeSchema.decode(data);
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
index 477e3d8..e2fc5d3 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
@@ -39,6 +39,7 @@ import
org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.testng.annotations.Test;
/**
@@ -85,7 +86,7 @@ public class GenericSchemaImplTest {
public void testAutoJsonSchema() {
// configure the schema info provider
MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider =
mock(MultiVersionSchemaInfoProvider.class);
- GenericSchema genericAvroSchema =
GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo());
+ GenericSchema genericAvroSchema =
GenericSchemaImpl.of(Schema.JSON(Foo.class).getSchemaInfo());
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(CompletableFuture.completedFuture(genericAvroSchema.getSchemaInfo()));
@@ -111,7 +112,7 @@ public class GenericSchemaImplTest {
GenericRecord record;
if (decodeSchema instanceof AutoConsumeSchema) {
- record = decodeSchema.decode(data, new byte[0]);
+ record = decodeSchema.decode(data, new
LongSchemaVersion(0L).bytes());
} else {
record = decodeSchema.decode(data);
}
@@ -123,15 +124,6 @@ public class GenericSchemaImplTest {
public void testKeyValueSchema() {
// configure the schema info provider
MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
- GenericSchema genericAvroSchema = GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo());
- when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
- .thenReturn(CompletableFuture.completedFuture(
- KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
- genericAvroSchema,
- genericAvroSchema,
- KeyValueEncodingType.INLINE
- )
- ));
List<Schema<Foo>> encodeSchemas = Lists.newArrayList(
Schema.JSON(Foo.class),
@@ -152,6 +144,16 @@ public class GenericSchemaImplTest {
decodeSchema.configureSchemaInfo(
"test-topic", "topic",kvSchema.getSchemaInfo()
);
+
+ GenericSchema genericAvroSchema = GenericSchemaImpl.of(Schema.AVRO(Foo.class).getSchemaInfo());
+ when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+ .thenReturn(CompletableFuture.completedFuture(
+ KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
+ keySchema,
+ valueSchema,
+ KeyValueEncodingType.INLINE
+ )
+ ));
decodeSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
testEncodeAndDecodeKeyValues(kvSchema, decodeSchema);
@@ -167,7 +169,7 @@ public class GenericSchemaImplTest {
Foo foo = newFoo(i);
byte[] data = encodeSchema.encode(new KeyValue<>(foo, foo));
- KeyValue<GenericRecord, GenericRecord> kv = decodeSchema.decode(data, new byte[0]);
+ KeyValue<GenericRecord, GenericRecord> kv = decodeSchema.decode(data, new LongSchemaVersion(0L).bytes());
verifyFooRecord(kv.getKey(), i);
verifyFooRecord(kv.getValue(), i);
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
index 0910e86..6e3c942 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.testng.annotations.Test;
import java.util.List;
@@ -85,7 +86,7 @@ public class GenericSchemaTest {
public void testAutoJsonSchema() {
// configure the schema info provider
MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
- GenericSchema genericAvroSchema = GenericAvroSchema.of(Schema.AVRO(Foo.class).getSchemaInfo());
+ GenericSchema genericAvroSchema = GenericAvroSchema.of(Schema.JSON(Foo.class).getSchemaInfo());
when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
.thenReturn(CompletableFuture.completedFuture(genericAvroSchema.getSchemaInfo()));
@@ -111,7 +112,7 @@ public class GenericSchemaTest {
GenericRecord record;
if (decodeSchema instanceof AutoConsumeSchema) {
- record = decodeSchema.decode(data, new byte[0]);
+ record = decodeSchema.decode(data, new LongSchemaVersion(0L).bytes());
} else {
record = decodeSchema.decode(data);
}
@@ -123,15 +124,6 @@ public class GenericSchemaTest {
public void testKeyValueSchema() {
// configure the schema info provider
MultiVersionSchemaInfoProvider multiVersionSchemaInfoProvider = mock(MultiVersionSchemaInfoProvider.class);
- GenericSchema genericAvroSchema = GenericAvroSchema.of(Schema.AVRO(Foo.class).getSchemaInfo());
- when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
- .thenReturn(CompletableFuture.completedFuture(
- KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
- genericAvroSchema,
- genericAvroSchema,
- KeyValueEncodingType.INLINE
- )
- ));
List<Schema<Foo>> encodeSchemas = Lists.newArrayList(
Schema.JSON(Foo.class),
@@ -152,6 +144,16 @@ public class GenericSchemaTest {
decodeSchema.configureSchemaInfo(
"test-topic", "topic",kvSchema.getSchemaInfo()
);
+
+ when(multiVersionSchemaInfoProvider.getSchemaByVersion(any(byte[].class)))
+ .thenReturn(CompletableFuture.completedFuture(
+ KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
+ keySchema,
+ valueSchema,
+ KeyValueEncodingType.INLINE
+ )
+ ));
+
decodeSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
testEncodeAndDecodeKeyValues(kvSchema, decodeSchema);
@@ -167,7 +169,7 @@ public class GenericSchemaTest {
Foo foo = newFoo(i);
byte[] data = encodeSchema.encode(new KeyValue<>(foo, foo));
- KeyValue<GenericRecord, GenericRecord> kv = decodeSchema.decode(data, new byte[0]);
+ KeyValue<GenericRecord, GenericRecord> kv = decodeSchema.decode(data, new LongSchemaVersion(1L).bytes());
verifyFooRecord(kv.getKey(), i);
verifyFooRecord(kv.getValue(), i);
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
index aa3a6b3..b14bea5 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -357,6 +358,12 @@ public class LookupProxyHandler {
final long clientRequestId = commandGetSchema.getRequestId();
String serviceUrl = getBrokerServiceUrl(clientRequestId);
String topic = commandGetSchema.getTopic();
+ Optional<SchemaVersion> schemaVersion;
+ if (commandGetSchema.hasSchemaVersion()) {
+ schemaVersion = Optional.of(commandGetSchema.getSchemaVersion()).map(BytesSchemaVersion::of);
+ } else {
+ schemaVersion = Optional.empty();
+ }
if(!StringUtils.isNotBlank(serviceUrl)) {
return;
@@ -375,12 +382,7 @@ public class LookupProxyHandler {
// Connected to backend broker
long requestId = proxyConnection.newRequestId();
ByteBuf command;
- byte[] schemaVersion = null;
- if (commandGetSchema.hasSchemaVersion()) {
- schemaVersion = commandGetSchema.getSchemaVersion();
- }
- command = Commands.newGetSchema(requestId, topic,
- Optional.ofNullable(schemaVersion).map(BytesSchemaVersion::of));
+ command = Commands.newGetSchema(requestId, topic, schemaVersion);
clientCnx.sendGetRawSchema(command, requestId).whenComplete((r, t) -> {
if (t != null) {
log.warn("[{}] Failed to get schema {}: {}", clientAddress, topic, t);