This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 23706431 [Python Client] Fix handle complex schema (#11400)
23706431 is described below
commit 23706431c00371cb34d19d666ec02cf9c152ef40
Author: ran <[email protected]>
AuthorDate: Thu Jul 22 22:06:52 2021 +0800
[Python Client] Fix handle complex schema (#11400)
Fixes #7785, #11221
### Motivation
Currently, the Pulsar Python client cannot handle complex schemas; users who
use a complex schema (for example, records nested in records) encounter errors.
### Modifications
Fix AvroSchema encoding and decoding of complex schemas.
Fix JsonSchema decoding of complex schemas.
### Verifying this change
This change added tests and can be verified as follows:
- *Encode and decode complex schema data*
- *Produce and consume complex schema data*
---
.../python/pulsar/schema/definition.py | 15 ++-
.../python/pulsar/schema/schema_avro.py | 11 +-
pulsar-client-cpp/python/schema_test.py | 115 ++++++++++++++++++++-
3 files changed, 134 insertions(+), 7 deletions(-)
diff --git a/pulsar-client-cpp/python/pulsar/schema/definition.py
b/pulsar-client-cpp/python/pulsar/schema/definition.py
index d46cf3c..3b946b8 100644
--- a/pulsar-client-cpp/python/pulsar/schema/definition.py
+++ b/pulsar-client-cpp/python/pulsar/schema/definition.py
@@ -17,9 +17,10 @@
# under the License.
#
-from abc import abstractmethod, ABCMeta
-from enum import Enum, EnumMeta
+import copy
+from abc import abstractmethod
from collections import OrderedDict
+from enum import Enum, EnumMeta
from six import with_metaclass
@@ -63,8 +64,14 @@ class Record(with_metaclass(RecordMeta, object)):
for k, value in self._fields.items():
if k in kwargs:
- # Value was overridden at constructor
- self.__setattr__(k, kwargs[k])
+ if isinstance(value, Record) and isinstance(kwargs[k], dict):
+ # Use dict init Record object
+ copied = copy.copy(value)
+ copied.__init__(decode=True, **kwargs[k])
+ self.__setattr__(k, copied)
+ else:
+ # Value was overridden at constructor
+ self.__setattr__(k, kwargs[k])
elif isinstance(value, Record):
# Value is a subrecord
self.__setattr__(k, value)
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
index 2afa9db..fc9e6a6 100644
--- a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
+++ b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
@@ -21,6 +21,7 @@ import _pulsar
import io
import enum
+from . import Record
from .schema import Schema
try:
@@ -39,16 +40,24 @@ if HAS_AVRO:
def _get_serialized_value(self, x):
if isinstance(x, enum.Enum):
return x.name
+ elif isinstance(x, Record):
+ return self.encode_dict(x.__dict__)
else:
return x
def encode(self, obj):
self._validate_object_type(obj)
buffer = io.BytesIO()
- m = {k: self._get_serialized_value(v) for k, v in
obj.__dict__.items()}
+ m = self.encode_dict(obj.__dict__)
fastavro.schemaless_writer(buffer, self._schema, m)
return buffer.getvalue()
+ def encode_dict(self, d: dict):
+ obj = {}
+ for k, v in d.items():
+ obj[k] = self._get_serialized_value(v)
+ return obj
+
def decode(self, data):
buffer = io.BytesIO(data)
d = fastavro.schemaless_reader(buffer, self._schema)
diff --git a/pulsar-client-cpp/python/schema_test.py
b/pulsar-client-cpp/python/schema_test.py
index a0d60c0..8cf6ff4 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -363,7 +363,7 @@ class SchemaTest(TestCase):
consumer = client.subscribe('my-avro-python-schema-version-topic',
'sub-1',
schema=AvroSchema(Example))
-
+
r = Example(a=1, b=2)
producer.send(r)
@@ -372,7 +372,7 @@ class SchemaTest(TestCase):
self.assertIsNotNone(msg.schema_version())
self.assertEquals(b'\x00\x00\x00\x00\x00\x00\x00\x00',
msg.schema_version().encode())
-
+
self.assertEqual(r, msg.value())
client.close()
@@ -853,5 +853,116 @@ class SchemaTest(TestCase):
consumer.close()
client.close()
+ def test_serialize_schema_complex(self):
+ class NestedObj1(Record):
+ na1 = String()
+ nb1 = Double()
+
+ class NestedObj2(Record):
+ na2 = Integer()
+ nb2 = Boolean()
+ nc2 = NestedObj1()
+
+ class ComplexRecord(Record):
+ a = Integer()
+ b = Integer()
+ nested = NestedObj2()
+
+ self.assertEqual(ComplexRecord.schema(), {
+ "name": "ComplexRecord",
+ "type": "record",
+ "fields": [
+ {"name": "a", "type": ["null", "int"]},
+ {"name": "b", "type": ["null", "int"]},
+ {"name": "nested", "type": ['null', {'name': 'NestedObj2',
'type': 'record', 'fields': [
+ {'name': 'na2', 'type': ['null', 'int']},
+ {'name': 'nb2', 'type': ['null', 'boolean']},
+ {'name': 'nc2', 'type': ['null', {'name': 'NestedObj1',
'type': 'record', 'fields': [
+ {'name': 'na1', 'type': ['null', 'string']},
+ {'name': 'nb1', 'type': ['null', 'double']}
+ ]}]}
+ ]}]}
+ ]
+ })
+
+ def encode_and_decode(schema_type):
+ data_schema = AvroSchema(ComplexRecord)
+ if schema_type == 'json':
+ data_schema = JsonSchema(ComplexRecord)
+
+ nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5)
+ nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1)
+ r = ComplexRecord(a=1, b=2, nested=nested_obj2)
+ data_encode = data_schema.encode(r)
+
+ data_decode = data_schema.decode(data_encode)
+ self.assertEqual(data_decode.__class__.__name__, 'ComplexRecord')
+ self.assertEqual(data_decode, r)
+ self.assertEqual(data_decode.a, 1)
+ self.assertEqual(data_decode.b, 2)
+ self.assertEqual(data_decode.nested.na2, 22)
+ self.assertEqual(data_decode.nested.nb2, True)
+ self.assertEqual(data_decode.nested.nc2.na1, 'na1 value')
+ self.assertEqual(data_decode.nested.nc2.nb1, 20.5)
+ print('Encode and decode complex schema finish. schema_type: ',
schema_type)
+
+ encode_and_decode('avro')
+ encode_and_decode('json')
+
+ def test_produce_and_consume_complex_schema_data(self):
+ class NestedObj1(Record):
+ na1 = String()
+ nb1 = Double()
+
+ class NestedObj2(Record):
+ na2 = Integer()
+ nb2 = Boolean()
+ nc2 = NestedObj1()
+
+ class ComplexRecord(Record):
+ a = Integer()
+ b = Integer()
+ nested = NestedObj2()
+
+ client = pulsar.Client(self.serviceUrl)
+
+ def produce_consume_test(schema_type):
+ topic = "my-complex-schema-topic-" + schema_type
+
+ data_schema = AvroSchema(ComplexRecord)
+ if schema_type == 'json':
+ data_schema= JsonSchema(ComplexRecord)
+
+ producer = client.create_producer(
+ topic=topic,
+ schema=data_schema)
+
+ consumer = client.subscribe(topic, 'test', schema=data_schema)
+
+ nested_obj1 = NestedObj1(na1='na1 value', nb1=20.5)
+ nested_obj2 = NestedObj2(na2=22, nb2=True, nc2=nested_obj1)
+ r = ComplexRecord(a=1, b=2, nested=nested_obj2)
+ producer.send(r)
+
+ msg = consumer.receive()
+ value = msg.value()
+ self.assertEqual(value.__class__.__name__, 'ComplexRecord')
+ self.assertEqual(value, r)
+ self.assertEqual(value.a, 1)
+ self.assertEqual(value.b, 2)
+ self.assertEqual(value.nested.na2, 22)
+ self.assertEqual(value.nested.nb2, True)
+ self.assertEqual(value.nested.nc2.na1, 'na1 value')
+ self.assertEqual(value.nested.nc2.nb1, 20.5)
+
+ producer.close()
+ consumer.close()
+ print('Produce and consume complex schema data finish.
schema_type', schema_type)
+
+ produce_consume_test('avro')
+ produce_consume_test('json')
+
+ client.close()
+
if __name__ == '__main__':
main()