This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 23706431 [Python Client] Fix handle complex schema (#11400)
23706431 is described below

commit 23706431c00371cb34d19d666ec02cf9c152ef40
Author: ran <[email protected]>
AuthorDate: Thu Jul 22 22:06:52 2021 +0800

    [Python Client] Fix handle complex schema (#11400)
    
    Fixes #7785, #11221
    
    ### Motivation
    
    Currently, the Pulsar Python client cannot handle complex schemas (records
    nested inside other records); users with such schemas encounter errors.
    
    ### Modifications
    
    Fix AvroSchema encoding and decoding of complex schemas.
    Fix JsonSchema decoding of complex schemas.
    
    ### Verifying this change
    
    This change added tests and can be verified as follows:
    
      - *Encode and decode complex schema data*
      - *Produce and consume complex schema data*
---
 .../python/pulsar/schema/definition.py             |  15 ++-
 .../python/pulsar/schema/schema_avro.py            |  11 +-
 pulsar-client-cpp/python/schema_test.py            | 115 ++++++++++++++++++++-
 3 files changed, 134 insertions(+), 7 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py b/pulsar-client-cpp/python/pulsar/schema/definition.py
index d46cf3c..3b946b8 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -17,9 +17,10 @@
 # under the License.
 #
 
-from abc import abstractmethod, ABCMeta
-from enum import Enum, EnumMeta
+import copy
+from abc import abstractmethod
 from collections import OrderedDict
+from enum import Enum, EnumMeta
 from six import with_metaclass
 
 
@@ -63,8 +64,14 @@ class Record(with_metaclass(RecordMeta, object)):
 
         for k, value in self._fields.items():
             if k in kwargs:
-                # Value was overridden at constructor
-                self.__setattr__(k, kwargs[k])
+                if isinstance(value, Record) and isinstance(kwargs[k], dict):
+                    # Use dict init Record object
+                    copied = copy.copy(value)
+                    copied.__init__(decode=True, **kwargs[k])
+                    self.__setattr__(k, copied)
+                else:
+                    # Value was overridden at constructor
+                    self.__setattr__(k, kwargs[k])
             elif isinstance(value, Record):
                 # Value is a subrecord
                 self.__setattr__(k, value)
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
index 2afa9db..fc9e6a6 100644
--- a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
+++ b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
@@ -21,6 +21,7 @@ import _pulsar
 import io
 import enum
 
+from . import Record
 from .schema import Schema
 
 try:
@@ -39,16 +40,24 @@ if HAS_AVRO:
         def _get_serialized_value(self, x):
             if isinstance(x, enum.Enum):
                 return x.name
+            elif isinstance(x, Record):
+                return self.encode_dict(x.__dict__)
             else:
                 return x
 
         def encode(self, obj):
             self._validate_object_type(obj)
             buffer = io.BytesIO()
-            m = {k: self._get_serialized_value(v) for k, v in 
obj.__dict__.items()}
+            m = self.encode_dict(obj.__dict__)
             fastavro.schemaless_writer(buffer, self._schema, m)
             return buffer.getvalue()
 
+        def encode_dict(self, d: dict):
+            obj = {}
+            for k, v in d.items():
+                obj[k] = self._get_serialized_value(v)
+            return obj
+
         def decode(self, data):
             buffer = io.BytesIO(data)
             d = fastavro.schemaless_reader(buffer, self._schema)
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index a0d60c0..8cf6ff4 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -363,7 +363,7 @@ class SchemaTest(TestCase):
 
         consumer = client.subscribe('my-avro-python-schema-version-topic', 
'sub-1',
                                     schema=AvroSchema(Example))
-        
+
         r = Example(a=1, b=2)
         producer.send(r)
 
@@ -372,7 +372,7 @@ class SchemaTest(TestCase):
         self.assertIsNotNone(msg.schema_version())
 
         self.assertEquals(b'\x00\x00\x00\x00\x00\x00\x00\x00', 
msg.schema_version().encode())
-        
+
         self.assertEqual(r, msg.value())
 
         client.close()
@@ -853,5 +853,116 @@ class SchemaTest(TestCase):
         consumer.close()
         client.close()
 
+    def test_serialize_schema_complex(self):
+        class NestedObj1(Record):
+            na1 = String()
+            nb1 = Double()
+
+        class NestedObj2(Record):
+            na2 = Integer()
+            nb2 = Boolean()
+            nc2 = NestedObj1()
+
+        class ComplexRecord(Record):
+            a = Integer()
+            b = Integer()
+            nested = NestedObj2()
+
+        self.assertEqual(ComplexRecord.schema(), {
+            "name": "ComplexRecord",
+            "type": "record",
+            "fields": [
+                {"name": "a", "type": ["null", "int"]},
+                {"name": "b", "type": ["null", "int"]},
+                {"name": "nested", "type": ['null', {'name': 'NestedObj2', 
'type': 'record', 'fields': [
+                    {'name': 'na2', 'type': ['null', 'int']},
+                    {'name': 'nb2', 'type': ['null', 'boolean']},
+                    {'name': 'nc2', 'type': ['null', {'name': 'NestedObj1', 
'type': 'record', 'fields': [
+                        {'name': 'na1', 'type': ['null', 'string']},
+                        {'name': 'nb1', 'type': ['null', 'double']}
+                    ]}]}
+                ]}]}
+            ]
+        })
+
+        def encode_and_decode(schema_type):
+            data_schema = AvroSchema(ComplexRecord)
+            if schema_type == 'json':
+                data_schema = JsonSchema(ComplexRecord)
+
+            nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5)
+            nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1)
+            r = ComplexRecord(a=1, b=2, nested=nested_obj2)
+            data_encode = data_schema.encode(r)
+
+            data_decode = data_schema.decode(data_encode)
+            self.assertEqual(data_decode.__class__.__name__, 'ComplexRecord')
+            self.assertEqual(data_decode, r)
+            self.assertEqual(data_decode.a, 1)
+            self.assertEqual(data_decode.b, 2)
+            self.assertEqual(data_decode.nested.na2, 22)
+            self.assertEqual(data_decode.nested.nb2, True)
+            self.assertEqual(data_decode.nested.nc2.na1, 'na1 value')
+            self.assertEqual(data_decode.nested.nc2.nb1, 20.5)
+            print('Encode and decode complex schema finish. schema_type: ', 
schema_type)
+
+        encode_and_decode('avro')
+        encode_and_decode('json')
+
+    def test_produce_and_consume_complex_schema_data(self):
+        class NestedObj1(Record):
+            na1 = String()
+            nb1 = Double()
+
+        class NestedObj2(Record):
+            na2 = Integer()
+            nb2 = Boolean()
+            nc2 = NestedObj1()
+
+        class ComplexRecord(Record):
+            a = Integer()
+            b = Integer()
+            nested = NestedObj2()
+
+        client = pulsar.Client(self.serviceUrl)
+
+        def produce_consume_test(schema_type):
+            topic = "my-complex-schema-topic-" + schema_type
+
+            data_schema = AvroSchema(ComplexRecord)
+            if schema_type == 'json':
+                data_schema= JsonSchema(ComplexRecord)
+
+            producer = client.create_producer(
+                        topic=topic,
+                        schema=data_schema)
+
+            consumer = client.subscribe(topic, 'test', schema=data_schema)
+
+            nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5)
+            nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1)
+            r = ComplexRecord(a=1, b=2, nested=nested_obj2)
+            producer.send(r)
+
+            msg = consumer.receive()
+            value = msg.value()
+            self.assertEqual(value.__class__.__name__, 'ComplexRecord')
+            self.assertEqual(value, r)
+            self.assertEqual(value.a, 1)
+            self.assertEqual(value.b, 2)
+            self.assertEqual(value.nested.na2, 22)
+            self.assertEqual(value.nested.nb2, True)
+            self.assertEqual(value.nested.nc2.na1, 'na1 value')
+            self.assertEqual(value.nested.nc2.nb1, 20.5)
+
+            producer.close()
+            consumer.close()
+            print('Produce and consume complex schema data finish. 
schema_type', schema_type)
+
+        produce_consume_test('avro')
+        produce_consume_test('json')
+
+        client.close()
+
 if __name__ == '__main__':
     main()

Reply via email to