This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9ec8a5e [schema] enable Schema.AUTO if functions or connectors are using GenericRecord (#2537)
9ec8a5e is described below
commit 9ec8a5ee4d4a80c50986cde95093b1eb4af31ef0
Author: Sijie Guo <[email protected]>
AuthorDate: Sat Sep 8 12:51:12 2018 -0700
[schema] enable Schema.AUTO if functions or connectors are using GenericRecord (#2537)
* [schema] enable Schema.AUTO if functions or connectors are using GenericRecord
* add schema-api to functions-api
---
pulsar-functions/api-java/pom.xml | 6 ++
.../pulsar/functions/source/PulsarSource.java | 2 +-
.../pulsar/functions/source/TopicSchema.java | 44 +++++--------
pulsar-functions/java-examples/pom.xml | 1 +
.../functions/api/examples/AutoSchemaFunction.java | 33 ++++++++++
tests/integration/pom.xml | 12 ++++
.../integration/functions/PulsarFunctionsTest.java | 77 +++++++++++++++++++++-
7 files changed, 143 insertions(+), 32 deletions(-)
diff --git a/pulsar-functions/api-java/pom.xml b/pulsar-functions/api-java/pom.xml
index 9a84206..0f21bae 100644
--- a/pulsar-functions/api-java/pom.xml
+++ b/pulsar-functions/api-java/pom.xml
@@ -38,6 +38,12 @@
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-schema</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>net.jodah</groupId>
<artifactId>typetools</artifactId>
<scope>test</scope>
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index e1059f3..6eed8e0 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -78,7 +78,7 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
inputConsumers = configs.entrySet().stream().map(e -> {
String topic = e.getKey();
ConsumerConfig<T> conf = e.getValue();
- log.info("Creating consumers for topic : {}", topic);
+ log.info("Creating consumers for topic : {}, schema : {}", topic,
conf.getSchema());
ConsumerBuilder<T> cb = pulsarClient.newConsumer(conf.getSchema())
// consume message even if can't decrypt and deliver it
along with encryption-ctx
.cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 2ac5b65..76375dc 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -26,6 +26,7 @@ import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
@@ -68,24 +69,31 @@ public class TopicSchema {
}
public Schema<?> getSchema(String topic, Class<?> clazz, SchemaType
schemaType) {
- return cachedSchemas.computeIfAbsent(topic, t -> extractSchema(clazz,
schemaType));
+ return cachedSchemas.computeIfAbsent(topic, t ->
newSchemaInstance(clazz, schemaType));
}
/**
* If the topic is already created, we should be able to fetch the schema
type (avro, json, ...)
*/
private SchemaType getSchemaTypeOrDefault(String topic, Class<?> clazz) {
- Optional<SchemaInfo> schema = ((PulsarClientImpl)
client).getSchema(topic).join();
- if (schema.isPresent()) {
- return schema.get().getType();
+ if (GenericRecord.class.isAssignableFrom(clazz)) {
+ return SchemaType.AUTO;
} else {
- return getDefaultSchemaType(clazz);
+ Optional<SchemaInfo> schema = ((PulsarClientImpl)
client).getSchema(topic).join();
+ if (schema.isPresent()) {
+ return schema.get().getType();
+ } else {
+ return getDefaultSchemaType(clazz);
+ }
}
}
private static SchemaType getDefaultSchemaType(Class<?> clazz) {
if (byte[].class.equals(clazz)) {
return SchemaType.NONE;
+ } else if (GenericRecord.class.isAssignableFrom(clazz)) {
+ // the function is taking generic record, so we do auto schema
detection
+ return SchemaType.AUTO;
} else if (String.class.equals(clazz)) {
// If type is String, then we use schema type string, otherwise we
fallback on default schema
return SchemaType.STRING;
@@ -102,6 +110,9 @@ public class TopicSchema {
case NONE:
return (Schema<T>) Schema.BYTES;
+ case AUTO:
+ return (Schema<T>) Schema.AUTO();
+
case STRING:
return (Schema<T>) Schema.STRING;
@@ -165,27 +176,4 @@ public class TopicSchema {
return new SerDeSchema<>(serDe);
}
}
-
- @SuppressWarnings("unchecked")
- private static <T> Schema<T> extractSchema(Class<T> clazz, SchemaType
type) {
- switch (type) {
- case NONE:
- return (Schema<T>) Schema.BYTES;
-
- case STRING:
- return (Schema<T>) Schema.STRING;
-
- case AVRO:
- return AvroSchema.of(clazz);
-
- case JSON:
- return JSONSchema.of(clazz);
-
- case PROTOBUF:
- return ProtobufSchema.ofGenericClass(clazz,
Collections.emptyMap());
-
- default:
- throw new RuntimeException("Unsupported schema type" + type);
- }
- }
}
diff --git a/pulsar-functions/java-examples/pom.xml
b/pulsar-functions/java-examples/pom.xml
index 747acff..b077dea 100644
--- a/pulsar-functions/java-examples/pom.xml
+++ b/pulsar-functions/java-examples/pom.xml
@@ -41,6 +41,7 @@
<artifactId>pulsar-functions-api</artifactId>
<version>${project.version}</version>
</dependency>
+
</dependencies>
</project>
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AutoSchemaFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AutoSchemaFunction.java
new file mode 100644
index 0000000..03abc93
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AutoSchemaFunction.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+/**
+ * Function that deals with Schema.Auto.
+ */
+public class AutoSchemaFunction implements Function<GenericRecord, String> {
+ @Override
+ public String process(GenericRecord input, Context context) {
+ return "value-" + input.getField("value");
+ }
+}
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 6d3fdc4..7a89159 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -46,6 +46,18 @@
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-api-examples</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-schema</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 17634a1..0bf9ded 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -38,6 +38,8 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.functions.api.examples.AutoSchemaFunction;
+import org.apache.pulsar.functions.api.examples.serde.CustomObject;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
@@ -45,7 +47,6 @@ import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runt
import org.apache.pulsar.tests.integration.io.*;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
-import org.testng.Assert;
import org.testng.annotations.Test;
/**
@@ -594,8 +595,23 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
String inputTopicName,
String outputTopicName,
String functionName) throws
Exception {
+ submitFunction(
+ runtime,
+ inputTopicName,
+ outputTopicName,
+ functionName,
+ getExclamationClass(runtime),
+ Schema.STRING);
+ }
+
+ private static <T> void submitFunction(Runtime runtime,
+ String inputTopicName,
+ String outputTopicName,
+ String functionName,
+ String functionClass,
+ Schema<T> inputTopicSchema) throws
Exception {
CommandGenerator generator;
- generator = CommandGenerator.createDefaultGenerator(inputTopicName,
getExclamationClass(runtime));
+ generator = CommandGenerator.createDefaultGenerator(inputTopicName,
functionClass);
generator.setSinkTopic(outputTopicName);
generator.setFunctionName(functionName);
String command;
@@ -619,7 +635,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build()) {
- try (Consumer<String> ignored = client.newConsumer(Schema.STRING)
+ try (Consumer<T> ignored = client.newConsumer(inputTopicSchema)
.topic(inputTopicName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(String.format("public/default/%s",
functionName))
@@ -707,4 +723,59 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
assertTrue(result.getStderr().isEmpty());
}
+ @Test
+ public void testAutoSchemaFunction() throws Exception {
+ String inputTopicName = "test-autoschema-input-" + randomName(8);
+ String outputTopicName = "test-autoshcema-output-" + randomName(8);
+ String functionName = "test-autoschema-fn-" + randomName(8);
+ final int numMessages = 10;
+
+ // submit the exclamation function
+ submitFunction(
+ Runtime.JAVA, inputTopicName, outputTopicName, functionName,
+ AutoSchemaFunction.class.getName(),
+ Schema.AVRO(CustomObject.class));
+
+ // get function info
+ getFunctionInfoSuccess(functionName);
+
+ // publish and consume result
+ publishAndConsumeAvroMessages(inputTopicName, outputTopicName,
numMessages);
+
+ // get function status
+ getFunctionStatus(functionName, numMessages);
+
+ // delete function
+ deleteFunction(functionName);
+
+ // get function info
+ getFunctionInfoNotFound(functionName);
+ }
+
+ private static void publishAndConsumeAvroMessages(String inputTopic,
+ String outputTopic,
+ int numMessages) throws
Exception {
+ @Cleanup PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+ @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
+ .topic(outputTopic)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName("test-sub")
+ .subscribe();
+ @Cleanup Producer<CustomObject> producer =
client.newProducer(Schema.AVRO(CustomObject.class))
+ .topic(inputTopic)
+ .create();
+
+ for (int i = 0; i < numMessages; i++) {
+ CustomObject co = new CustomObject(i);
+ producer.send(co);
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ Message<String> msg = consumer.receive();
+ assertEquals("value-" + i, msg.getValue());
+ }
+ }
+
}