sijie closed pull request #2494: [Schema] Introduce Schema.AUTO to detect schema automatically for consumers and readers
URL: https://github.com/apache/incubator-pulsar/pull/2494
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java index b880953267..24573d5bc9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.service.schema.SchemaRegistry; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.impl.schema.ProtobufSchema; @@ -440,4 +441,98 @@ public String toString() { } } + @Test + public void testAvroProducerAndAutoSchemaConsumer() throws Exception { + log.info("-- Starting {} test --", methodName); + + AvroSchema<AvroEncodedPojo> avroSchema = + AvroSchema.of(AvroEncodedPojo.class); + + Producer<AvroEncodedPojo> producer = pulsarClient + .newProducer(avroSchema) + .topic("persistent://my-property/use/my-ns/my-topic1") + .create(); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(new AvroEncodedPojo(message)); + } + + Consumer<GenericRecord> consumer = pulsarClient + .newConsumer(Schema.AUTO()) + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("my-subscriber-name") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message<GenericRecord> msg = null; + Set<String> messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; 
i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + GenericRecord receivedMessage = msg.getValue(); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + String actualMessage = (String) receivedMessage.getField("message"); + testMessageOrderAndDuplicates(messageSet, actualMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + consumer.close(); + + SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() + .getSchema("my-property/my-ns/my-topic1") + .get(); + + Assert.assertEquals(storedSchema.schema.getData(), avroSchema.getSchemaInfo().getSchema()); + + log.info("-- Exiting {} test --", methodName); + + } + + @Test + public void testAvroProducerAndAutoSchemaReader() throws Exception { + log.info("-- Starting {} test --", methodName); + + AvroSchema<AvroEncodedPojo> avroSchema = + AvroSchema.of(AvroEncodedPojo.class); + + Producer<AvroEncodedPojo> producer = pulsarClient + .newProducer(avroSchema) + .topic("persistent://my-property/use/my-ns/my-topic1") + .create(); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(new AvroEncodedPojo(message)); + } + + Reader<GenericRecord> reader = pulsarClient + .newReader(Schema.AUTO()) + .topic("persistent://my-property/use/my-ns/my-topic1") + .startMessageId(MessageId.earliest) + .create(); + + Message<GenericRecord> msg = null; + Set<String> messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = reader.readNext(); + GenericRecord receivedMessage = msg.getValue(); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + String actualMessage = (String) receivedMessage.getField("message"); + testMessageOrderAndDuplicates(messageSet, actualMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + reader.close(); + + 
SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() + .getSchema("my-property/my-ns/my-topic1") + .get(); + + Assert.assertEquals(storedSchema.schema.getData(), avroSchema.getSchemaInfo().getSchema()); + + log.info("-- Exiting {} test --", methodName); + + } + } diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java index 3088a4de21..113f26c50e 100644 --- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.api; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AutoSchema; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.BytesSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; @@ -77,4 +79,8 @@ static <T> Schema<T> JSON(Class<T> clazz) { return JSONSchema.of(clazz); } + + static Schema<GenericRecord> AUTO() { + return new AutoSchema(); + } } diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java new file mode 100644 index 0000000000..5bf92b71cb --- /dev/null +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/AutoSchema.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema; + + +import static com.google.common.base.Preconditions.checkState; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.schema.SchemaInfo; + +/** + * Auto detect schema. + */ +public class AutoSchema implements Schema<GenericRecord> { + + private Schema<GenericRecord> schema; + + public void setSchema(Schema<GenericRecord> schema) { + this.schema = schema; + } + + private void ensureSchemaInitialized() { + checkState(null != schema, "Schema is not initialized before used"); + } + + @Override + public byte[] encode(GenericRecord message) { + ensureSchemaInitialized(); + + return schema.encode(message); + } + + @Override + public GenericRecord decode(byte[] bytes) { + ensureSchemaInitialized(); + + return schema.decode(bytes); + } + + @Override + public SchemaInfo getSchemaInfo() { + ensureSchemaInitialized(); + + return schema.getSchemaInfo(); + } +} diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java index cfe9cb7cba..e5fcc3c4f6 100644 --- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java +++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java @@ -27,6 +27,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; import org.apache.pulsar.client.api.schema.GenericRecord; +import 
org.apache.pulsar.client.impl.schema.AutoSchema; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.GenericAvroSchema; import org.apache.pulsar.common.schema.SchemaType; @@ -115,11 +116,29 @@ public void testEncodeAndDecode() { @Test public void testEncodeAndDecodeGenericRecord() { AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null); + GenericAvroSchema genericAvroSchema = new GenericAvroSchema(avroSchema.getSchemaInfo()); + + log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema()); + + testGenericSchema(avroSchema, genericAvroSchema); + } + + @Test + public void testAutoSchema() { + AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null); GenericAvroSchema genericAvroSchema = new GenericAvroSchema(avroSchema.getSchemaInfo()); log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema()); + AutoSchema schema = new AutoSchema(); + schema.setSchema(genericAvroSchema); + + testGenericSchema(avroSchema, schema); + } + + private void testGenericSchema(AvroSchema<Foo> avroSchema, + org.apache.pulsar.client.api.Schema<GenericRecord> genericRecordSchema) { int numRecords = 10; for (int i = 0; i < numRecords; i++) { Foo foo = new Foo(); @@ -132,7 +151,7 @@ public void testEncodeAndDecodeGenericRecord() { byte[] data = avroSchema.encode(foo); - GenericRecord record = genericAvroSchema.decode(data); + GenericRecord record = genericRecordSchema.decode(data); Object field1 = record.getField("field1"); assertEquals("field-1-" + i, field1, "Field 1 is " + field1.getClass()); Object field2 = record.getField("field2"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index e156e10820..e82cf6c2f2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -18,6 +18,7 @@ 
*/ package org.apache.pulsar.client.impl; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.commons.lang3.StringUtils.isBlank; import com.google.common.annotations.VisibleForTesting; @@ -61,12 +62,15 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; +import org.apache.pulsar.client.impl.schema.AutoSchema; +import org.apache.pulsar.client.impl.schema.GenericAvroSchema; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.slf4j.Logger; @@ -243,6 +247,11 @@ public ClientConfigurationData getConfiguration() { new PulsarClientException.InvalidConfigurationException("Producer configuration undefined")); } + if (schema instanceof AutoSchema) { + return FutureUtil.failedFuture( + new PulsarClientException.InvalidConfigurationException("AutoSchema is only used by consumers to detect schemas automatically")); + } + if (state.get() != State.Open) { return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed : state = " + state.get())); } @@ -377,6 +386,29 @@ public ClientConfigurationData getConfiguration() { } private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) { + if (schema instanceof AutoSchema) { + AutoSchema autoSchema = (AutoSchema) schema; + return lookup.getSchema(TopicName.get(conf.getSingleTopic())) + 
.thenCompose(schemaInfoOptional -> { + if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) { + GenericAvroSchema genericAvroSchema = new GenericAvroSchema(schemaInfoOptional.get()); + log.info("Auto detected schema for topic {} : {}", + conf.getSingleTopic(), new String(schemaInfoOptional.get().getSchema(), UTF_8)); + autoSchema.setSchema(genericAvroSchema); + return doSingleTopicSubscribeAsync(conf, schema); + } else { + return FutureUtil.failedFuture( + new PulsarClientException.LookupException("Currently schema detection only works for topics with avro schemas")); + } + }); + } else { + return doSingleTopicSubscribeAsync(conf, schema); + } + } + + + + private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema) { CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>(); String topic = conf.getSingleTopic(); @@ -505,6 +537,26 @@ public ClientConfigurationData getConfiguration() { } public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) { + if (schema instanceof AutoSchema) { + AutoSchema autoSchema = (AutoSchema) schema; + return lookup.getSchema(TopicName.get(conf.getTopicName())) + .thenCompose(schemaInfoOptional -> { + if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) { + GenericAvroSchema genericAvroSchema = new GenericAvroSchema(schemaInfoOptional.get()); + log.info("Auto detected schema for topic {} : {}", + conf.getTopicName(), new String(schemaInfoOptional.get().getSchema(), UTF_8)); + autoSchema.setSchema(genericAvroSchema); + return doCreateReaderAsync(conf, schema); + } else { + return FutureUtil.failedFuture( + new PulsarClientException.LookupException("Currently schema detection only works for topics with avro schemas")); + } + }); + } else { + return doCreateReaderAsync(conf, schema); + } + } + <T> 
CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) { if (state.get() != State.Open) { return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed")); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java index cbf7c91768..88adb532a4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java @@ -45,5 +45,10 @@ /** * Serialize and deserialize via avro */ - AVRO + AVRO, + + /** + * Auto Detect Schema Type. + */ + AUTO } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services