sijie closed pull request #2181: augmenting protoschema with info for parsing
URL: https://github.com/apache/incubator-pulsar/pull/2181
This is a pull request (PR) merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index 4f2fba3748..34ec46bc2b 100644
---
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -18,7 +18,12 @@
*/
package org.apache.pulsar.client.impl.schema;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.Descriptors;
import com.google.protobuf.Parser;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
import org.apache.avro.protobuf.ProtobufDatumReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
@@ -27,23 +32,71 @@
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3>
implements Schema<T> {
private SchemaInfo schemaInfo;
private Parser<T> tParser;
+ public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";
- private ProtobufSchema(SchemaInfo schemaInfo, Class<T> pojo) {
- this.schemaInfo = schemaInfo;
+ @Getter
+ @AllArgsConstructor
+ public static class ProtoBufParsingInfo {
+ private final int number;
+ private final String name;
+ private final String type;
+ private final String label;
+ // For future nested fields
+ private final Map <String, Object> definition;
+ }
+
+ private ProtobufSchema(Map<String, String> properties, Class<T> pojo) {
try {
T protoMessageInstance = (T)
pojo.getMethod("getDefaultInstance").invoke(null);
tParser = (Parser<T>) protoMessageInstance.getParserForType();
+
+ this.schemaInfo = new SchemaInfo();
+ this.schemaInfo.setName("");
+
+ Map<String, String> allProperties = new HashMap<>();
+ allProperties.putAll(properties);
+ // set protobuf parsing info
+ allProperties.put(PARSING_INFO_PROPERTY,
getParsingInfo(protoMessageInstance));
+
+ this.schemaInfo.setProperties(allProperties);
+ this.schemaInfo.setType(SchemaType.PROTOBUF);
+ ProtobufDatumReader datumReader = new ProtobufDatumReader(pojo);
+ org.apache.avro.Schema schema = datumReader.getSchema();
+ this.schemaInfo.setSchema(schema.toString().getBytes());
+
} catch (IllegalAccessException | InvocationTargetException |
NoSuchMethodException e) {
throw new IllegalArgumentException(e);
}
}
+ private String getParsingInfo(T protoMessageInstance) {
+ List<ProtoBufParsingInfo> protoBufParsingInfos = new LinkedList<>();
+ protoMessageInstance.getDescriptorForType().getFields().forEach(new
Consumer<Descriptors.FieldDescriptor>() {
+ @Override
+ public void accept(Descriptors.FieldDescriptor fieldDescriptor) {
+ protoBufParsingInfos.add(new
ProtoBufParsingInfo(fieldDescriptor.getNumber(),
+ fieldDescriptor.getName(),
fieldDescriptor.getType().name(),
+ fieldDescriptor.toProto().getLabel().name(), null));
+ }
+ });
+
+ try {
+ return new ObjectMapper().writeValueAsString(protoBufParsingInfos);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Override
public byte[] encode(T message) {
return message.toByteArray();
@@ -67,16 +120,16 @@ public SchemaInfo getSchemaInfo() {
return of(pojo, Collections.emptyMap());
}
+ public static ProtobufSchema ofGenericClass(Class pojo, Map<String,
String> properties) {
+ if
(!com.google.protobuf.GeneratedMessageV3.class.isAssignableFrom(pojo)) {
+ throw new
IllegalArgumentException(com.google.protobuf.GeneratedMessageV3.class.getName()
+ + " is not assignable from " + pojo.getName());
+ }
+ return new ProtobufSchema<>(properties, pojo);
+ }
+
public static <T extends com.google.protobuf.GeneratedMessageV3>
ProtobufSchema<T> of(
Class<T> pojo, Map<String, String> properties){
-
- SchemaInfo info = new SchemaInfo();
- info.setName("");
- info.setProperties(properties);
- info.setType(SchemaType.PROTOBUF);
- ProtobufDatumReader<T> datumReader = new ProtobufDatumReader<>(pojo);
- org.apache.avro.Schema schema = datumReader.getSchema();
- info.setSchema(schema.toString().getBytes());
- return new ProtobufSchema<>(info, pojo);
+ return ofGenericClass(pojo, properties);
}
}
diff --git
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java
index 7ed575a5d1..69c66dcfa6 100644
---
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java
+++
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.schema;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
@@ -26,6 +28,8 @@
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.util.Collections;
+
@Slf4j
public class ProtobufSchemaTest {
@@ -39,7 +43,19 @@
"\"symbols\":[\"SHARED\",\"FAILOVER\"]},\"default\":\"SHARED\"},{\"name\":\"nestedField\","
+
"\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubMessage\",\"fields\":[{\"name\":\"foo\","
+
"\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"bar\","
+
- "\"type\":\"double\",\"default\":0}]}],\"default\":null}]}";
+
"\"type\":\"double\",\"default\":0}]}],\"default\":null},{\"name\":\"repeatedField\","
+
+
"\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}}]}";
+
+ private static final String EXPECTED_PARSING_INFO =
"{\"__PARSING_INFO__\":\"[{\\\"number\\\":1," +
+
"\\\"name\\\":\\\"stringField\\\",\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\","
+
+
"\\\"definition\\\":null},{\\\"number\\\":2,\\\"name\\\":\\\"doubleField\\\",\\\"type\\\":\\\"DOUBLE\\\","
+
+
"\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":6,"
+
+
"\\\"name\\\":\\\"intField\\\",\\\"type\\\":\\\"INT32\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\","
+
+
"\\\"definition\\\":null},{\\\"number\\\":4,\\\"name\\\":\\\"testEnum\\\",\\\"type\\\":\\\"ENUM\\\","
+
+
"\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":5,"
+
+
"\\\"name\\\":\\\"nestedField\\\",\\\"type\\\":\\\"MESSAGE\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\","
+
+
"\\\"definition\\\":null},{\\\"number\\\":10,\\\"name\\\":\\\"repeatedField\\\","
+
+
"\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"LABEL_REPEATED\\\",\\\"definition\\\":null}]\"}";
@Test
public void testEncodeAndDecode() {
@@ -67,4 +83,33 @@ public void testSchema() {
Assert.assertEquals(schema.toString(), EXPECTED_SCHEMA_JSON);
}
+
+ @Test
+ public void testGenericOf() {
+ try {
+
ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage>
protobufSchema
+ =
ProtobufSchema.ofGenericClass(org.apache.pulsar.client.schema.proto.Test.TestMessage.class,
+ Collections.emptyMap());
+ } catch (Exception e) {
+ Assert.fail("Should not construct a ProtobufShema over a
non-protobuf-generated class");
+ }
+
+ try {
+
ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage>
protobufSchema
+ = ProtobufSchema.ofGenericClass(String.class,
+ Collections.emptyMap());
+ Assert.fail("Should not construct a ProtobufShema over a
non-protobuf-generated class");
+ } catch (Exception e) {
+
+ }
+ }
+
+ @Test
+ public void testParsingInfoProperty() throws JsonProcessingException {
+ ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage>
protobufSchema
+ =
ProtobufSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class);
+
+ Assert.assertEquals(new
ObjectMapper().writeValueAsString(protobufSchema.getSchemaInfo().getProperties()),
EXPECTED_PARSING_INFO);
+
+ }
}
diff --git a/pulsar-client-schema/src/test/proto/Test.proto
b/pulsar-client-schema/src/test/proto/Test.proto
index d640d2c499..7d7b1b64ab 100644
--- a/pulsar-client-schema/src/test/proto/Test.proto
+++ b/pulsar-client-schema/src/test/proto/Test.proto
@@ -35,7 +35,8 @@ message SubMessage {
message TestMessage {
string stringField = 1;
double doubleField = 2;
- int32 intField = 3;
+ int32 intField = 6;
TestEnum testEnum = 4;
SubMessage nestedField = 5;
+ repeated string repeatedField = 10;
}
\ No newline at end of file
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services