This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 9ec8a5e [schema] enable Schema.AUTO if functions or connectors are using GenericRecord (#2537) 9ec8a5e is described below commit 9ec8a5ee4d4a80c50986cde95093b1eb4af31ef0 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Sat Sep 8 12:51:12 2018 -0700 [schema] enable Schema.AUTO if functions or connectors are using GenericRecord (#2537) * [schema] enable Schema.AUTO if functions or connectors are using GenericRecord * add schema-api to functions-api --- pulsar-functions/api-java/pom.xml | 6 ++ .../pulsar/functions/source/PulsarSource.java | 2 +- .../pulsar/functions/source/TopicSchema.java | 44 +++++-------- pulsar-functions/java-examples/pom.xml | 1 + .../functions/api/examples/AutoSchemaFunction.java | 33 ++++++++++ tests/integration/pom.xml | 12 ++++ .../integration/functions/PulsarFunctionsTest.java | 77 +++++++++++++++++++++- 7 files changed, 143 insertions(+), 32 deletions(-) diff --git a/pulsar-functions/api-java/pom.xml b/pulsar-functions/api-java/pom.xml index 9a84206..0f21bae 100644 --- a/pulsar-functions/api-java/pom.xml +++ b/pulsar-functions/api-java/pom.xml @@ -38,6 +38,12 @@ </dependency> <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-client-schema</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>net.jodah</groupId> <artifactId>typetools</artifactId> <scope>test</scope> diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index e1059f3..6eed8e0 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -78,7 +78,7 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T> inputConsumers = 
configs.entrySet().stream().map(e -> { String topic = e.getKey(); ConsumerConfig<T> conf = e.getValue(); - log.info("Creating consumers for topic : {}", topic); + log.info("Creating consumers for topic : {}, schema : {}", topic, conf.getSchema()); ConsumerBuilder<T> cb = pulsarClient.newConsumer(conf.getSchema()) // consume message even if can't decrypt and deliver it along with encryption-ctx .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java index 2ac5b65..76375dc 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java @@ -26,6 +26,7 @@ import java.util.Optional; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; @@ -68,24 +69,31 @@ public class TopicSchema { } public Schema<?> getSchema(String topic, Class<?> clazz, SchemaType schemaType) { - return cachedSchemas.computeIfAbsent(topic, t -> extractSchema(clazz, schemaType)); + return cachedSchemas.computeIfAbsent(topic, t -> newSchemaInstance(clazz, schemaType)); } /** * If the topic is already created, we should be able to fetch the schema type (avro, json, ...) 
*/ private SchemaType getSchemaTypeOrDefault(String topic, Class<?> clazz) { - Optional<SchemaInfo> schema = ((PulsarClientImpl) client).getSchema(topic).join(); - if (schema.isPresent()) { - return schema.get().getType(); + if (GenericRecord.class.isAssignableFrom(clazz)) { + return SchemaType.AUTO; } else { - return getDefaultSchemaType(clazz); + Optional<SchemaInfo> schema = ((PulsarClientImpl) client).getSchema(topic).join(); + if (schema.isPresent()) { + return schema.get().getType(); + } else { + return getDefaultSchemaType(clazz); + } } } private static SchemaType getDefaultSchemaType(Class<?> clazz) { if (byte[].class.equals(clazz)) { return SchemaType.NONE; + } else if (GenericRecord.class.isAssignableFrom(clazz)) { + // the function is taking generic record, so we do auto schema detection + return SchemaType.AUTO; } else if (String.class.equals(clazz)) { // If type is String, then we use schema type string, otherwise we fallback on default schema return SchemaType.STRING; @@ -102,6 +110,9 @@ public class TopicSchema { case NONE: return (Schema<T>) Schema.BYTES; + case AUTO: + return (Schema<T>) Schema.AUTO(); + case STRING: return (Schema<T>) Schema.STRING; @@ -165,27 +176,4 @@ public class TopicSchema { return new SerDeSchema<>(serDe); } } - - @SuppressWarnings("unchecked") - private static <T> Schema<T> extractSchema(Class<T> clazz, SchemaType type) { - switch (type) { - case NONE: - return (Schema<T>) Schema.BYTES; - - case STRING: - return (Schema<T>) Schema.STRING; - - case AVRO: - return AvroSchema.of(clazz); - - case JSON: - return JSONSchema.of(clazz); - - case PROTOBUF: - return ProtobufSchema.ofGenericClass(clazz, Collections.emptyMap()); - - default: - throw new RuntimeException("Unsupported schema type" + type); - } - } } diff --git a/pulsar-functions/java-examples/pom.xml b/pulsar-functions/java-examples/pom.xml index 747acff..b077dea 100644 --- a/pulsar-functions/java-examples/pom.xml +++ b/pulsar-functions/java-examples/pom.xml @@ -41,6 
+41,7 @@ <artifactId>pulsar-functions-api</artifactId> <version>${project.version}</version> </dependency> + </dependencies> </project> diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AutoSchemaFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AutoSchemaFunction.java new file mode 100644 index 0000000..03abc93 --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AutoSchemaFunction.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; + +/** + * Function that deals with Schema.Auto. 
+ */ +public class AutoSchemaFunction implements Function<GenericRecord, String> { + @Override + public String process(GenericRecord input, Context context) { + return "value-" + input.getField("value"); + } +} diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index 6d3fdc4..7a89159 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -46,6 +46,18 @@ </dependency> <dependency> <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-functions-api-examples</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-client-schema</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>${project.version}</version> <scope>test</scope> diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 17634a1..0bf9ded 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -38,6 +38,8 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.functions.api.examples.AutoSchemaFunction; +import org.apache.pulsar.functions.api.examples.serde.CustomObject; import org.apache.pulsar.tests.integration.docker.ContainerExecException; import org.apache.pulsar.tests.integration.docker.ContainerExecResult; import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator; @@ -45,7 +47,6 @@ import 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runt import org.apache.pulsar.tests.integration.io.*; import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; -import org.testng.Assert; import org.testng.annotations.Test; /** @@ -594,8 +595,23 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { String inputTopicName, String outputTopicName, String functionName) throws Exception { + submitFunction( + runtime, + inputTopicName, + outputTopicName, + functionName, + getExclamationClass(runtime), + Schema.STRING); + } + + private static <T> void submitFunction(Runtime runtime, + String inputTopicName, + String outputTopicName, + String functionName, + String functionClass, + Schema<T> inputTopicSchema) throws Exception { CommandGenerator generator; - generator = CommandGenerator.createDefaultGenerator(inputTopicName, getExclamationClass(runtime)); + generator = CommandGenerator.createDefaultGenerator(inputTopicName, functionClass); generator.setSinkTopic(outputTopicName); generator.setFunctionName(functionName); String command; @@ -619,7 +635,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { try (PulsarClient client = PulsarClient.builder() .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) .build()) { - try (Consumer<String> ignored = client.newConsumer(Schema.STRING) + try (Consumer<T> ignored = client.newConsumer(inputTopicSchema) .topic(inputTopicName) .subscriptionType(SubscriptionType.Shared) .subscriptionName(String.format("public/default/%s", functionName)) @@ -707,4 +723,59 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { assertTrue(result.getStderr().isEmpty()); } + @Test + public void testAutoSchemaFunction() throws Exception { + String inputTopicName = "test-autoschema-input-" + randomName(8); + String outputTopicName = "test-autoshcema-output-" + randomName(8); + 
String functionName = "test-autoschema-fn-" + randomName(8); + final int numMessages = 10; + + // submit the auto-schema function + submitFunction( + Runtime.JAVA, inputTopicName, outputTopicName, functionName, + AutoSchemaFunction.class.getName(), + Schema.AVRO(CustomObject.class)); + + // get function info + getFunctionInfoSuccess(functionName); + + // publish and consume result + publishAndConsumeAvroMessages(inputTopicName, outputTopicName, numMessages); + + // get function status + getFunctionStatus(functionName, numMessages); + + // delete function + deleteFunction(functionName); + + // get function info + getFunctionInfoNotFound(functionName); + } + + private static void publishAndConsumeAvroMessages(String inputTopic, + String outputTopic, + int numMessages) throws Exception { + @Cleanup PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build(); + @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING) + .topic(outputTopic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName("test-sub") + .subscribe(); + @Cleanup Producer<CustomObject> producer = client.newProducer(Schema.AVRO(CustomObject.class)) + .topic(inputTopic) + .create(); + + for (int i = 0; i < numMessages; i++) { + CustomObject co = new CustomObject(i); + producer.send(co); + } + + for (int i = 0; i < numMessages; i++) { + Message<String> msg = consumer.receive(); + assertEquals("value-" + i, msg.getValue()); + } + } + }