This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6b87357dbf3 [improve][client] Support protobuf v4 schema compatibility
(#25261)
6b87357dbf3 is described below
commit 6b87357dbf384ef45d0f4620066e31f9a3b60ae2
Author: Penghui Li <[email protected]>
AuthorDate: Thu Feb 26 04:09:31 2026 -0800
[improve][client] Support protobuf v4 schema compatibility (#25261)
Co-authored-by: Lari Hotari <[email protected]>
---
.github/workflows/pulsar-ci.yaml | 2 +
build/run_unit_group.sh | 8 +++
.../java/org/apache/pulsar/client/api/Schema.java | 8 +--
.../PulsarClientImplementationBinding.java | 4 +-
.../PulsarClientImplementationBindingImpl.java | 4 +-
.../client/impl/schema/ProtobufNativeSchema.java | 15 +++---
.../pulsar/client/impl/schema/ProtobufSchema.java | 17 +++----
.../impl/schema/reader/ProtobufNativeReader.java | 4 +-
.../client/impl/schema/reader/ProtobufReader.java | 3 +-
.../impl/schema/writer/ProtobufNativeWriter.java | 4 +-
.../client/impl/schema/writer/ProtobufWriter.java | 3 +-
.../client/api/ProtobufSchemaApiSignatureTest.java | 58 ++++++++++++++++++++++
.../impl/schema/ProtobufNativeSchemaTest.java | 15 ++++++
.../client/impl/schema/ProtobufSchemaTest.java | 13 +++++
.../functions/instance/JavaInstanceRunnable.java | 2 +-
.../pulsar/functions/source/TopicSchema.java | 2 +-
.../instance/JavaInstanceRunnableTest.java | 10 ++++
.../pulsar/functions/source/TopicSchemaTest.java | 16 ++++++
18 files changed, 157 insertions(+), 31 deletions(-)
diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml
index 17947a45b5b..9d7e025d71d 100644
--- a/.github/workflows/pulsar-ci.yaml
+++ b/.github/workflows/pulsar-ci.yaml
@@ -337,6 +337,8 @@ jobs:
group: CLIENT
- name: Pulsar Metadata
group: METADATA
+ - name: Protobuf v4
+ group: PROTOBUFV4
steps:
- name: checkout
diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index 9653a787325..5286d34d6c4 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -115,6 +115,14 @@ function test_group_metadata() {
mvn_test -pl pulsar-metadata -DtestReuseFork=false
}
+function test_group_protobufv4() {
+ mvn_test --clean --install \
+ -Dprotobuf3.version=4.31.1 \
+ -Dprotoc3.version=4.31.1 \
+ -pl pulsar-client,pulsar-functions/instance \
+
-Dtest=org.apache.pulsar.client.api.ProtobufSchemaApiSignatureTest,org.apache.pulsar.client.impl.schema.ProtobufSchemaTest,org.apache.pulsar.client.impl.schema.ProtobufNativeSchemaTest,org.apache.pulsar.functions.source.TopicSchemaTest,org.apache.pulsar.functions.instance.JavaInstanceRunnableTest
+}
+
# prints summaries of failed tests to console
# by using the target/surefire-reports files
# works only when testForkCount > 1 since that is when surefire will create
reports for individual test classes
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index 3089684a1f9..64862edca1e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -300,7 +300,7 @@ public interface Schema<T> extends Cloneable {
* @param clazz the Protobuf generated class to be used to extract the
schema
* @return a Schema instance
*/
- static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T>
PROTOBUF(Class<T> clazz) {
+ static <T extends com.google.protobuf.Message> Schema<T> PROTOBUF(Class<T>
clazz) {
return DefaultImplementation.getDefaultImplementation()
.newProtobufSchema(SchemaDefinition.builder().withPojo(clazz).build());
}
@@ -311,7 +311,7 @@ public interface Schema<T> extends Cloneable {
* @param schemaDefinition schemaDefinition the definition of the schema
* @return a Schema instance
*/
- static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T>
PROTOBUF(SchemaDefinition<T> schemaDefinition) {
+ static <T extends com.google.protobuf.Message> Schema<T>
PROTOBUF(SchemaDefinition<T> schemaDefinition) {
return
DefaultImplementation.getDefaultImplementation().newProtobufSchema(schemaDefinition);
}
@@ -321,7 +321,7 @@ public interface Schema<T> extends Cloneable {
* @param clazz the Protobuf generated class to be used to extract the
schema
* @return a Schema instance
*/
- static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T>
PROTOBUF_NATIVE(Class<T> clazz) {
+ static <T extends com.google.protobuf.Message> Schema<T>
PROTOBUF_NATIVE(Class<T> clazz) {
return DefaultImplementation.getDefaultImplementation()
.newProtobufNativeSchema(SchemaDefinition.builder().withPojo(clazz).build());
}
@@ -332,7 +332,7 @@ public interface Schema<T> extends Cloneable {
* @param schemaDefinition schemaDefinition the definition of the schema
* @return a Schema instance
*/
- static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T>
PROTOBUF_NATIVE(
+ static <T extends com.google.protobuf.Message> Schema<T> PROTOBUF_NATIVE(
SchemaDefinition<T> schemaDefinition) {
return
DefaultImplementation.getDefaultImplementation().newProtobufNativeSchema(schemaDefinition);
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
index b5f2a3a468e..f13cc003f5b 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
@@ -116,9 +116,9 @@ public interface PulsarClientImplementationBinding {
<T> Schema<T> newAvroSchema(SchemaDefinition schemaDefinition);
- <T extends com.google.protobuf.GeneratedMessageV3> Schema<T>
newProtobufSchema(SchemaDefinition schemaDefinition);
+ <T extends com.google.protobuf.Message> Schema<T>
newProtobufSchema(SchemaDefinition schemaDefinition);
- <T extends com.google.protobuf.GeneratedMessageV3> Schema<T>
newProtobufNativeSchema(
+ <T extends com.google.protobuf.Message> Schema<T> newProtobufNativeSchema(
SchemaDefinition schemaDefinition);
<T> Schema<T> newJSONSchema(SchemaDefinition schemaDefinition);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
index 0351477985f..0b61540821a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
@@ -210,12 +210,12 @@ public final class PulsarClientImplementationBindingImpl
implements PulsarClient
return AvroSchema.of(schemaDefinition);
}
- public <T extends com.google.protobuf.GeneratedMessageV3> Schema<T>
newProtobufSchema(
+ public <T extends com.google.protobuf.Message> Schema<T> newProtobufSchema(
SchemaDefinition schemaDefinition) {
return ProtobufSchema.of(schemaDefinition);
}
- public <T extends com.google.protobuf.GeneratedMessageV3> Schema<T>
newProtobufNativeSchema(
+ public <T extends com.google.protobuf.Message> Schema<T>
newProtobufNativeSchema(
SchemaDefinition schemaDefinition) {
return ProtobufNativeSchema.of(schemaDefinition);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
index 46a2f7d806a..9243303e842 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.client.impl.schema;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.protobuf.Descriptors;
-import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.Message;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
@@ -41,7 +41,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
/**
* A schema implementation to deal with protobuf generated messages.
*/
-public class ProtobufNativeSchema<T extends GeneratedMessageV3> extends
AbstractStructSchema<T> {
+public class ProtobufNativeSchema<T extends Message> extends
AbstractStructSchema<T> {
public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";
@@ -104,7 +104,7 @@ public class ProtobufNativeSchema<T extends
GeneratedMessageV3> extends Abstract
return Optional.of(getProtobufNativeSchema());
}
- public static <T extends GeneratedMessageV3> ProtobufNativeSchema<T>
of(Class<T> pojo) {
+ public static <T extends Message> ProtobufNativeSchema<T> of(Class<T>
pojo) {
return of(pojo, new HashMap<>());
}
@@ -117,8 +117,8 @@ public class ProtobufNativeSchema<T extends
GeneratedMessageV3> extends Abstract
public static <T> ProtobufNativeSchema of(SchemaDefinition<T>
schemaDefinition) {
Class<T> pojo = schemaDefinition.getPojo();
- if (!GeneratedMessageV3.class.isAssignableFrom(pojo)) {
- throw new
IllegalArgumentException(GeneratedMessageV3.class.getName()
+ if (!Message.class.isAssignableFrom(pojo)) {
+ throw new IllegalArgumentException(Message.class.getName()
+ " is not assignable from " + pojo.getName());
}
Descriptors.Descriptor descriptor =
createProtobufNativeSchema(schemaDefinition.getPojo());
@@ -131,14 +131,13 @@ public class ProtobufNativeSchema<T extends
GeneratedMessageV3> extends Abstract
.build();
try {
return new ProtobufNativeSchema(schemaInfo,
- (GeneratedMessageV3)
pojo.getMethod("getDefaultInstance").invoke(null));
+ (Message)
pojo.getMethod("getDefaultInstance").invoke(null));
} catch (IllegalAccessException | InvocationTargetException |
NoSuchMethodException e) {
throw new IllegalArgumentException(e);
}
}
- public static <T extends GeneratedMessageV3> ProtobufNativeSchema<T> of(
- Class pojo, Map<String, String> properties) {
+ public static <T extends Message> ProtobufNativeSchema<T> of(Class<T>
pojo, Map<String, String> properties) {
return ofGenericClass(pojo, properties);
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index 2e33bcda299..71e0bed6912 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.client.impl.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.protobuf.Descriptors;
-import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.Message;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.LinkedList;
@@ -41,7 +41,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
/**
* A schema implementation to deal with protobuf generated messages.
*/
-public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3>
extends AvroBaseStructSchema<T> {
+public class ProtobufSchema<T extends Message> extends AvroBaseStructSchema<T>
{
public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";
@@ -53,7 +53,7 @@ public class ProtobufSchema<T extends
com.google.protobuf.GeneratedMessageV3> ex
private final String type;
private final String label;
// For future nested fields
- private final Map <String, Object> definition;
+ private final Map<String, Object> definition;
}
private static <T> org.apache.avro.Schema
createProtobufAvroSchema(Class<T> pojo) {
@@ -89,7 +89,7 @@ public class ProtobufSchema<T extends
com.google.protobuf.GeneratedMessageV3> ex
}
}
- public static <T extends com.google.protobuf.GeneratedMessageV3>
ProtobufSchema<T> of(Class<T> pojo) {
+ public static <T extends Message> ProtobufSchema<T> of(Class<T> pojo) {
return of(pojo, new HashMap<>());
}
@@ -102,8 +102,8 @@ public class ProtobufSchema<T extends
com.google.protobuf.GeneratedMessageV3> ex
public static <T> ProtobufSchema of(SchemaDefinition<T> schemaDefinition) {
Class<T> pojo = schemaDefinition.getPojo();
- if
(!com.google.protobuf.GeneratedMessageV3.class.isAssignableFrom(pojo)) {
- throw new
IllegalArgumentException(com.google.protobuf.GeneratedMessageV3.class.getName()
+ if (!Message.class.isAssignableFrom(pojo)) {
+ throw new IllegalArgumentException(Message.class.getName()
+ " is not assignable from " + pojo.getName());
}
@@ -116,14 +116,13 @@ public class ProtobufSchema<T extends
com.google.protobuf.GeneratedMessageV3> ex
try {
return new ProtobufSchema(schemaInfo,
- (GeneratedMessageV3)
pojo.getMethod("getDefaultInstance").invoke(null));
+ (Message) pojo.getMethod("getDefaultInstance").invoke(null));
} catch (IllegalAccessException | InvocationTargetException |
NoSuchMethodException e) {
throw new IllegalArgumentException(e);
}
}
- public static <T extends com.google.protobuf.GeneratedMessageV3>
ProtobufSchema<T> of(
- Class pojo, Map<String, String> properties){
+ public static <T extends Message> ProtobufSchema<T> of(Class<T> pojo,
Map<String, String> properties) {
return ofGenericClass(pojo, properties);
}
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java
index 1c10608d448..0ecc0be3d8c 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufNativeReader.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.client.impl.schema.reader;
-public class ProtobufNativeReader<T extends
com.google.protobuf.GeneratedMessageV3> extends ProtobufReader<T> {
+import com.google.protobuf.Message;
+
+public class ProtobufNativeReader<T extends Message> extends ProtobufReader<T>
{
public ProtobufNativeReader(T protoMessageInstance) {
super(protoMessageInstance);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
index a2504a660a0..a56fb30b140 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/ProtobufReader.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl.schema.reader;
import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
import com.google.protobuf.Parser;
import java.io.IOException;
import java.io.InputStream;
@@ -27,7 +28,7 @@ import org.apache.pulsar.client.api.schema.SchemaReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ProtobufReader<T extends com.google.protobuf.GeneratedMessageV3>
implements SchemaReader<T> {
+public class ProtobufReader<T extends Message> implements SchemaReader<T> {
private Parser<T> tParser;
public ProtobufReader(T protoMessageInstance) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java
index 32569f7b7c0..0abd231475f 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufNativeWriter.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.client.impl.schema.writer;
-public class ProtobufNativeWriter<T extends
com.google.protobuf.GeneratedMessageV3> extends ProtobufWriter<T> {
+import com.google.protobuf.Message;
+
+public class ProtobufNativeWriter<T extends Message> extends ProtobufWriter<T>
{
public ProtobufNativeWriter() {
super();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java
index 52ccec8dfaa..7bab227e181 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/writer/ProtobufWriter.java
@@ -18,9 +18,10 @@
*/
package org.apache.pulsar.client.impl.schema.writer;
+import com.google.protobuf.Message;
import org.apache.pulsar.client.api.schema.SchemaWriter;
-public class ProtobufWriter<T extends com.google.protobuf.GeneratedMessageV3>
implements SchemaWriter<T> {
+public class ProtobufWriter<T extends Message> implements SchemaWriter<T> {
@Override
public byte[] write(T message) {
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/api/ProtobufSchemaApiSignatureTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/api/ProtobufSchemaApiSignatureTest.java
new file mode 100644
index 00000000000..f9ce4ecbf97
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/api/ProtobufSchemaApiSignatureTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.protobuf.Message;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.internal.PulsarClientImplementationBinding;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class ProtobufSchemaApiSignatureTest {
+
+ @Test
+ public void testSchemaProtobufTypeBounds() throws NoSuchMethodException {
+ assertSingleTypeParamUpperBound(Schema.class.getMethod("PROTOBUF",
Class.class), Message.class);
+ assertSingleTypeParamUpperBound(Schema.class.getMethod("PROTOBUF",
SchemaDefinition.class), Message.class);
+
assertSingleTypeParamUpperBound(Schema.class.getMethod("PROTOBUF_NATIVE",
Class.class), Message.class);
+ assertSingleTypeParamUpperBound(
+ Schema.class.getMethod("PROTOBUF_NATIVE",
SchemaDefinition.class), Message.class);
+ }
+
+ @Test
+ public void testBindingProtobufTypeBounds() throws NoSuchMethodException {
+ assertSingleTypeParamUpperBound(
+
PulsarClientImplementationBinding.class.getMethod("newProtobufSchema",
SchemaDefinition.class),
+ Message.class);
+ assertSingleTypeParamUpperBound(
+
PulsarClientImplementationBinding.class.getMethod("newProtobufNativeSchema",
SchemaDefinition.class),
+ Message.class);
+ }
+
+ private static void assertSingleTypeParamUpperBound(Method method, Type
expectedBound) {
+ TypeVariable<Method>[] typeParameters = method.getTypeParameters();
+ Assert.assertEquals(typeParameters.length, 1, method + " should define
one type parameter");
+ Type[] bounds = typeParameters[0].getBounds();
+ Assert.assertEquals(bounds.length, 1, method + " should define one
type bound");
+ Assert.assertEquals(bounds[0], expectedBound, method + " has
unexpected type bound");
+ }
+}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java
index a0ea3c1cc6e..ed918cd0b8c 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java
@@ -23,9 +23,11 @@ import static org.testng.Assert.assertNotNull;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.protobuf.Any;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;
import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.StringValue;
import com.google.protobuf.util.JsonFormat;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -93,6 +95,19 @@ public class ProtobufNativeSchemaTest {
assertEquals(message.getStringField(), stringFieldValue);
}
+ @Test
+ public void testSchemaApiSupportsMessageBound() {
+ Any any =
Any.pack(StringValue.newBuilder().setValue("native-message").build());
+ org.apache.pulsar.client.api.Schema<Any> protobufSchema =
+ org.apache.pulsar.client.api.Schema.PROTOBUF_NATIVE(Any.class);
+
+ byte[] bytes = protobufSchema.encode(any);
+ Any message = protobufSchema.decode(bytes);
+
+ assertEquals(protobufSchema.getSchemaInfo().getType(),
SchemaType.PROTOBUF_NATIVE);
+ assertEquals(message, any);
+ }
+
@Test
public void testSchema() throws Exception {
ProtobufNativeSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage>
protobufSchema =
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
index 5acdd5b1b1c..9e283b6799d 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.protobuf.Any;
+import com.google.protobuf.StringValue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.util.ArrayList;
@@ -121,6 +123,17 @@ public class ProtobufSchemaTest {
Assert.assertEquals(message.getName(), NAME);
}
+ @Test
+ public void testSchemaApiSupportsMessageBound() {
+ Any any = Any.pack(StringValue.newBuilder().setValue(NAME).build());
+ org.apache.pulsar.client.api.Schema<Any> protobufSchema =
+ org.apache.pulsar.client.api.Schema.PROTOBUF(Any.class);
+
+ byte[] bytes = protobufSchema.encode(any);
+ Any message = protobufSchema.decode(bytes);
+ Assert.assertEquals(message, any);
+ }
+
@Test
public void testSchema() {
ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage>
protobufSchema =
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 50cb1fb54e8..56dc802affb 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -1182,7 +1182,7 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
private static boolean isProtobufClass(Class<?> pojoClazz) {
try {
- Class<?> protobufBaseClass =
Class.forName("com.google.protobuf.GeneratedMessageV3");
+ Class<?> protobufBaseClass =
Class.forName("com.google.protobuf.Message");
return protobufBaseClass.isAssignableFrom(pojoClazz);
} catch (ClassNotFoundException | NoClassDefFoundError e) {
// If sink does not have protobuf in classpath then it cannot be
protobuf
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index d8ae6b19f4a..81f4de49682 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -196,7 +196,7 @@ public class TopicSchema {
private static boolean isProtobufClass(Class<?> pojoClazz) {
try {
- Class<?> protobufBaseClass =
Class.forName("com.google.protobuf.GeneratedMessageV3");
+ Class<?> protobufBaseClass =
Class.forName("com.google.protobuf.Message");
return protobufBaseClass.isAssignableFrom(pojoClazz);
} catch (ClassNotFoundException | NoClassDefFoundError e) {
// If function does not have protobuf in classpath then it cannot
be protobuf
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 7cae03c8f5f..eea1c9cc966 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.protobuf.Any;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
@@ -44,6 +45,7 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
@@ -245,6 +247,14 @@ public class JavaInstanceRunnableTest {
Assert.assertEquals(javaInstanceRunnable.getMetrics(),
InstanceCommunication.MetricsData.newBuilder().build());
}
+ @Test
+ public void testDefaultSchemaTypeInfersProtobufForMessageBaseClass()
throws Exception {
+ Method method =
JavaInstanceRunnable.class.getDeclaredMethod("getDefaultSchemaType",
Class.class);
+ method.setAccessible(true);
+ SchemaType schemaType = (SchemaType) method.invoke(null, Any.class);
+ Assert.assertEquals(schemaType, SchemaType.PROTOBUF);
+ }
+
@Test
public void testSinkConfigParsingPreservesOriginalType() throws Exception {
final Map<String, Object> parsedConfig =
JavaInstanceRunnable.augmentAndFilterConnectorConfig(
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
index fa6d1a533dc..6b065fc237a 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/TopicSchemaTest.java
@@ -18,10 +18,16 @@
*/
package org.apache.pulsar.functions.source;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import com.google.protobuf.Any;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
@@ -53,6 +59,16 @@ public class TopicSchemaTest {
assertEquals(schema.getClass(), ProtobufNativeSchema.class);
}
+ @Test
+ public void testDefaultSchemaTypeInfersProtobufForMessageBaseClass() {
+ PulsarClientImpl client = mock(PulsarClientImpl.class);
+
when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
+ TopicSchema topicSchema = new TopicSchema(client,
Thread.currentThread().getContextClassLoader());
+ Schema<?> schema =
topicSchema.getSchema("public/default/test-protobuf-default", Any.class,
Optional.empty());
+ assertEquals(schema.getClass(), ProtobufSchema.class);
+ }
+
private static class DummyClass {
}
}