This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ef9df9  augmenting protoschema with info for parsing (#2181)
6ef9df9 is described below

commit 6ef9df9cb18b6e28c385702becf1d0de7ba294e2
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Wed Jul 18 00:58:10 2018 -0700

    augmenting protoschema with info for parsing (#2181)
    
    ### Motivation
    
    Add per-field parsing information (field number, name, type, and label) 
to the Protobuf schema properties so that protobuf messages can be parsed more easily.
---
 .../pulsar/client/impl/schema/ProtobufSchema.java  | 75 ++++++++++++++++++----
 .../pulsar/client/schema/ProtobufSchemaTest.java   | 47 +++++++++++++-
 pulsar-client-schema/src/test/proto/Test.proto     |  3 +-
 3 files changed, 112 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index 4f2fba3..34ec46b 100644
--- 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -18,7 +18,12 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.Descriptors;
 import com.google.protobuf.Parser;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
 import org.apache.avro.protobuf.ProtobufDatumReader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
@@ -27,23 +32,71 @@ import org.apache.pulsar.common.schema.SchemaType;
 
 import java.lang.reflect.InvocationTargetException;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 
 public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> 
implements Schema<T> {
 
     private SchemaInfo schemaInfo;
     private Parser<T> tParser;
+    public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";
 
-    private ProtobufSchema(SchemaInfo schemaInfo, Class<T> pojo) {
-        this.schemaInfo = schemaInfo;
+    @Getter
+    @AllArgsConstructor
+    public static class ProtoBufParsingInfo {
+        private final int number;
+        private final String name;
+        private final String type;
+        private final String label;
+        // For future nested fields
+        private final Map <String, Object> definition;
+    }
+
+    private ProtobufSchema(Map<String, String> properties, Class<T> pojo) {
         try {
             T protoMessageInstance = (T) 
pojo.getMethod("getDefaultInstance").invoke(null);
             tParser = (Parser<T>) protoMessageInstance.getParserForType();
+
+            this.schemaInfo = new SchemaInfo();
+            this.schemaInfo.setName("");
+
+            Map<String, String> allProperties = new HashMap<>();
+            allProperties.putAll(properties);
+            // set protobuf parsing info
+            allProperties.put(PARSING_INFO_PROPERTY, 
getParsingInfo(protoMessageInstance));
+
+            this.schemaInfo.setProperties(allProperties);
+            this.schemaInfo.setType(SchemaType.PROTOBUF);
+            ProtobufDatumReader datumReader = new ProtobufDatumReader(pojo);
+            org.apache.avro.Schema schema = datumReader.getSchema();
+            this.schemaInfo.setSchema(schema.toString().getBytes());
+
         } catch (IllegalAccessException | InvocationTargetException | 
NoSuchMethodException e) {
             throw new IllegalArgumentException(e);
         }
     }
 
+    private String getParsingInfo(T protoMessageInstance) {
+        List<ProtoBufParsingInfo> protoBufParsingInfos = new LinkedList<>();
+        protoMessageInstance.getDescriptorForType().getFields().forEach(new 
Consumer<Descriptors.FieldDescriptor>() {
+            @Override
+            public void accept(Descriptors.FieldDescriptor fieldDescriptor) {
+                protoBufParsingInfos.add(new 
ProtoBufParsingInfo(fieldDescriptor.getNumber(),
+                        fieldDescriptor.getName(), 
fieldDescriptor.getType().name(),
+                        fieldDescriptor.toProto().getLabel().name(), null));
+            }
+        });
+
+        try {
+            return new ObjectMapper().writeValueAsString(protoBufParsingInfos);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public byte[] encode(T message) {
         return message.toByteArray();
@@ -67,16 +120,16 @@ public class ProtobufSchema<T extends 
com.google.protobuf.GeneratedMessageV3> im
         return of(pojo, Collections.emptyMap());
     }
 
+    public static ProtobufSchema ofGenericClass(Class pojo, Map<String, 
String> properties) {
+        if 
(!com.google.protobuf.GeneratedMessageV3.class.isAssignableFrom(pojo)) {
+            throw new 
IllegalArgumentException(com.google.protobuf.GeneratedMessageV3.class.getName()
+                    + " is not assignable from " + pojo.getName());
+        }
+        return new ProtobufSchema<>(properties, pojo);
+    }
+
     public static <T extends com.google.protobuf.GeneratedMessageV3> 
ProtobufSchema<T> of(
             Class<T> pojo, Map<String, String> properties){
-
-        SchemaInfo info = new SchemaInfo();
-        info.setName("");
-        info.setProperties(properties);
-        info.setType(SchemaType.PROTOBUF);
-        ProtobufDatumReader<T> datumReader = new ProtobufDatumReader<>(pojo);
-        org.apache.avro.Schema schema = datumReader.getSchema();
-        info.setSchema(schema.toString().getBytes());
-        return new ProtobufSchema<>(info, pojo);
+        return ofGenericClass(pojo, properties);
     }
 }
diff --git 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java
 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java
index 7ed575a..69c66dc 100644
--- 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java
+++ 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/ProtobufSchemaTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.schema;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
@@ -26,6 +28,8 @@ import org.apache.pulsar.functions.proto.Function;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import java.util.Collections;
+
 @Slf4j
 public class ProtobufSchemaTest {
 
@@ -39,7 +43,19 @@ public class ProtobufSchemaTest {
             
"\"symbols\":[\"SHARED\",\"FAILOVER\"]},\"default\":\"SHARED\"},{\"name\":\"nestedField\","
 +
             
"\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubMessage\",\"fields\":[{\"name\":\"foo\","
 +
             
"\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"bar\","
 +
-            "\"type\":\"double\",\"default\":0}]}],\"default\":null}]}";
+            
"\"type\":\"double\",\"default\":0}]}],\"default\":null},{\"name\":\"repeatedField\","
 +
+            
"\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}}]}";
+
+    private static final String EXPECTED_PARSING_INFO = 
"{\"__PARSING_INFO__\":\"[{\\\"number\\\":1," +
+            
"\\\"name\\\":\\\"stringField\\\",\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\","
 +
+            
"\\\"definition\\\":null},{\\\"number\\\":2,\\\"name\\\":\\\"doubleField\\\",\\\"type\\\":\\\"DOUBLE\\\","
 +
+            
"\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":6,"
 +
+            
"\\\"name\\\":\\\"intField\\\",\\\"type\\\":\\\"INT32\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\","
 +
+            
"\\\"definition\\\":null},{\\\"number\\\":4,\\\"name\\\":\\\"testEnum\\\",\\\"type\\\":\\\"ENUM\\\","
 +
+            
"\\\"label\\\":\\\"LABEL_OPTIONAL\\\",\\\"definition\\\":null},{\\\"number\\\":5,"
 +
+            
"\\\"name\\\":\\\"nestedField\\\",\\\"type\\\":\\\"MESSAGE\\\",\\\"label\\\":\\\"LABEL_OPTIONAL\\\","
 +
+            
"\\\"definition\\\":null},{\\\"number\\\":10,\\\"name\\\":\\\"repeatedField\\\","
 +
+            
"\\\"type\\\":\\\"STRING\\\",\\\"label\\\":\\\"LABEL_REPEATED\\\",\\\"definition\\\":null}]\"}";
 
     @Test
     public void testEncodeAndDecode() {
@@ -67,4 +83,33 @@ public class ProtobufSchemaTest {
 
         Assert.assertEquals(schema.toString(), EXPECTED_SCHEMA_JSON);
     }
+
+    @Test
+    public void testGenericOf() {
+        try {
+            
ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> 
protobufSchema
+                    = 
ProtobufSchema.ofGenericClass(org.apache.pulsar.client.schema.proto.Test.TestMessage.class,
+                    Collections.emptyMap());
+        } catch (Exception e) {
+            Assert.fail("Should not construct a ProtobufShema over a 
non-protobuf-generated class");
+        }
+
+        try {
+            
ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> 
protobufSchema
+                    = ProtobufSchema.ofGenericClass(String.class,
+                    Collections.emptyMap());
+            Assert.fail("Should not construct a ProtobufShema over a 
non-protobuf-generated class");
+        } catch (Exception e) {
+
+        }
+    }
+
+    @Test
+    public void testParsingInfoProperty() throws JsonProcessingException {
+        ProtobufSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> 
protobufSchema
+                = 
ProtobufSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class);
+
+        Assert.assertEquals(new 
ObjectMapper().writeValueAsString(protobufSchema.getSchemaInfo().getProperties()),
 EXPECTED_PARSING_INFO);
+
+    }
 }
diff --git a/pulsar-client-schema/src/test/proto/Test.proto 
b/pulsar-client-schema/src/test/proto/Test.proto
index d640d2c..7d7b1b6 100644
--- a/pulsar-client-schema/src/test/proto/Test.proto
+++ b/pulsar-client-schema/src/test/proto/Test.proto
@@ -35,7 +35,8 @@ message SubMessage {
 message TestMessage {
     string stringField = 1;
     double doubleField = 2;
-    int32 intField = 3;
+    int32 intField = 6;
     TestEnum testEnum = 4;
     SubMessage nestedField = 5;
+    repeated string repeatedField = 10;
 }
\ No newline at end of file

Reply via email to