This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 85575f4 [Python Client] Python client support using custom Avro
schema definition (#12516)
85575f4 is described below
commit 85575f4f5d8eb54516c1c02e1cfcca0d936f2e49
Author: ran <[email protected]>
AuthorDate: Sat Oct 30 08:17:45 2021 +0800
[Python Client] Python client support using custom Avro schema definition
(#12516)
### Motivation
Currently, the Python client doesn't support generating an `AvroSchema` from a
schema definition, so users can't use an Avro schema definition file with the
Python client.
### Modifications
Add a new constructor parameter, `schema_definition`, to `AvroSchema` so that an
`AvroSchema` can be initialized directly from an Avro schema definition (pass
`None` as `record_cls` in that case).
```
class AvroSchema(Schema):
def __init__(self, record_cls, schema_definition=None):
if record_cls is None and schema_definition is None:
raise AssertionError("The param record_cls and
schema_definition shouldn't be both None.")
if record_cls is not None:
self._schema = record_cls.schema()
else:
self._schema = schema_definition
super(AvroSchema, self).__init__(record_cls,
_pulsar.SchemaType.AVRO, self._schema, 'AVRO')
```
### How to use
Assume there is an Avro schema definition file, `company.avsc`, describing a
company, like this:
```
{
"doc": "this is doc",
"namespace": "example.avro",
"type": "record",
"name": "Company",
"fields": [
{"name": "name", "type": ["null", "string"]},
{"name": "address", "type": ["null", "string"]},
{"name": "employees", "type": ["null", {"type": "array", "items": {
"type": "record",
"name": "Employee",
"fields": [
{"name": "name", "type": ["null", "string"]},
{"name": "age", "type": ["null", "int"]}
]
}}]},
{"name": "labels", "type": ["null", {"type": "map", "values":
"string"}]}
]
}
```
Users can load the schema from a file with `avro.schema` or `fastavro.schema`.
> Refer to [load_schema](https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.load_schema) or [Avro Schema](http://avro.apache.org/docs/current/gettingstartedpython.html).
```
schema_definition = load_schema("examples/company.avsc")
# schema_definition = avro.schema.parse(open("examples/company.avsc",
"rb").read()).to_json()
avro_schema = AvroSchema(None, schema_definition=schema_definition)
producer = client.create_producer(
topic=topic,
schema=avro_schema)
consumer = client.subscribe(topic, 'test', schema=avro_schema)
company = {
"name": "company-name" + str(i),
"address": 'xxx road xxx street ' + str(i),
"employees": [
{"name": "user" + str(i), "age": 20 + i},
{"name": "user" + str(i), "age": 30 + i},
{"name": "user" + str(i), "age": 35 + i},
],
"labels": {
"industry": "software" + str(i),
"scale": ">100",
"funds": "1000000.0"
}
}
producer.send(company)
msg = consumer.receive()
# Users could get a dict object by `value()` method.
msg.value()
```
---
pulsar-client-cpp/python/examples/company.avsc | 19 +++
.../python/pulsar/schema/schema_avro.py | 29 +++--
pulsar-client-cpp/python/schema_test.py | 128 ++++++++++++++++++++-
3 files changed, 162 insertions(+), 14 deletions(-)
diff --git a/pulsar-client-cpp/python/examples/company.avsc
b/pulsar-client-cpp/python/examples/company.avsc
new file mode 100644
index 0000000..cdb595f
--- /dev/null
+++ b/pulsar-client-cpp/python/examples/company.avsc
@@ -0,0 +1,19 @@
+{
+ "doc": "this is doc",
+ "namespace": "example.avro",
+ "type": "record",
+ "name": "Company",
+ "fields": [
+ {"name": "name", "type": ["null", "string"]},
+ {"name": "address", "type": ["null", "string"]},
+ {"name": "employees", "type": ["null", {"type": "array", "items": {
+ "type": "record",
+ "name": "Employee",
+ "fields": [
+ {"name": "name", "type": ["null", "string"]},
+ {"name": "age", "type": ["null", "int"]}
+ ]
+ }}]},
+ {"name": "labels", "type": ["null", {"type": "map", "values":
"string"}]}
+ ]
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
index e76fc51..5861505 100644
--- a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
+++ b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py
@@ -32,10 +32,15 @@ except ModuleNotFoundError:
if HAS_AVRO:
class AvroSchema(Schema):
- def __init__(self, record_cls):
- super(AvroSchema, self).__init__(record_cls,
_pulsar.SchemaType.AVRO,
- record_cls.schema(), 'AVRO')
- self._schema = record_cls.schema()
+ def __init__(self, record_cls, schema_definition=None):
+ if record_cls is None and schema_definition is None:
+ raise AssertionError("The param record_cls and
schema_definition shouldn't be both None.")
+
+ if record_cls is not None:
+ self._schema = record_cls.schema()
+ else:
+ self._schema = schema_definition
+ super(AvroSchema, self).__init__(record_cls,
_pulsar.SchemaType.AVRO, self._schema, 'AVRO')
def _get_serialized_value(self, x):
if isinstance(x, enum.Enum):
@@ -53,9 +58,14 @@ if HAS_AVRO:
return x
def encode(self, obj):
- self._validate_object_type(obj)
buffer = io.BytesIO()
- m = self.encode_dict(obj.__dict__)
+ m = obj
+ if self._record_cls is not None:
+ self._validate_object_type(obj)
+ m = self.encode_dict(obj.__dict__)
+ elif not isinstance(obj, dict):
+ raise ValueError('If using the custom schema, the record data
should be dict type.')
+
fastavro.schemaless_writer(buffer, self._schema, m)
return buffer.getvalue()
@@ -68,11 +78,14 @@ if HAS_AVRO:
def decode(self, data):
buffer = io.BytesIO(data)
d = fastavro.schemaless_reader(buffer, self._schema)
- return self._record_cls(**d)
+ if self._record_cls is not None:
+ return self._record_cls(**d)
+ else:
+ return d
else:
class AvroSchema(Schema):
- def __init__(self, _record_cls):
+ def __init__(self, _record_cls, _schema_definition):
raise Exception("Avro library support was not found. Make sure to
install Pulsar client " +
"with Avro support: pip3 install
'pulsar-client[avro]'")
diff --git a/pulsar-client-cpp/python/schema_test.py
b/pulsar-client-cpp/python/schema_test.py
index 7adbcbe..d2554da 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -25,6 +25,7 @@ import pulsar
from pulsar.schema import *
from enum import Enum
import json
+from fastavro.schema import load_schema
class SchemaTest(TestCase):
@@ -1145,12 +1146,127 @@ class SchemaTest(TestCase):
client.close()
- def test(self):
- class NamespaceDemo(Record):
- _namespace = 'xxx.xxx.xxx'
- x = String()
- y = Integer()
- print('schema: ', NamespaceDemo.schema())
+ def custom_schema_test(self):
+
+ def encode_and_decode(schema_definition):
+ avro_schema = AvroSchema(None, schema_definition=schema_definition)
+
+ company = {
+ "name": "company-name",
+ "address": 'xxx road xxx street',
+ "employees": [
+ {"name": "user1", "age": 25},
+ {"name": "user2", "age": 30},
+ {"name": "user3", "age": 35},
+ ],
+ "labels": {
+ "industry": "software",
+ "scale": ">100",
+ "funds": "1000000.0"
+ }
+ }
+ data = avro_schema.encode(company)
+ company_decode = avro_schema.decode(data)
+ self.assertEqual(company, company_decode)
+
+ schema_definition = {
+ 'doc': 'this is doc',
+ 'namespace': 'example.avro',
+ 'type': 'record',
+ 'name': 'Company',
+ 'fields': [
+ {'name': 'name', 'type': ['null', 'string']},
+ {'name': 'address', 'type': ['null', 'string']},
+ {'name': 'employees', 'type': ['null', {'type': 'array',
'items': {
+ 'type': 'record',
+ 'name': 'Employee',
+ 'fields': [
+ {'name': 'name', 'type': ['null', 'string']},
+ {'name': 'age', 'type': ['null', 'int']}
+ ]
+ }}]},
+ {'name': 'labels', 'type': ['null', {'type': 'map', 'values':
'string'}]}
+ ]
+ }
+ encode_and_decode(schema_definition)
+ # Users could load schema from file by `fastavro.schema`
+ # Or use `avro.schema` like this
`avro.schema.parse(open("examples/company.avsc", "rb").read()).to_json()`
+ encode_and_decode(load_schema("examples/company.avsc"))
+
+ def custom_schema_produce_and_consume_test(self):
+ client = pulsar.Client(self.serviceUrl)
+
+ def produce_and_consume(topic, schema_definition):
+ print('custom schema produce and consume test topic - ', topic)
+ example_avro_schema = AvroSchema(None,
schema_definition=schema_definition)
+
+ producer = client.create_producer(
+ topic=topic,
+ schema=example_avro_schema)
+ consumer = client.subscribe(topic, 'test',
schema=example_avro_schema)
+
+ for i in range(0, 10):
+ company = {
+ "name": "company-name" + str(i),
+ "address": 'xxx road xxx street ' + str(i),
+ "employees": [
+ {"name": "user" + str(i), "age": 20 + i},
+ {"name": "user" + str(i), "age": 30 + i},
+ {"name": "user" + str(i), "age": 35 + i},
+ ],
+ "labels": {
+ "industry": "software" + str(i),
+ "scale": ">100",
+ "funds": "1000000.0"
+ }
+ }
+ producer.send(company)
+
+ for i in range(0, 10):
+ msg = consumer.receive()
+ company = {
+ "name": "company-name" + str(i),
+ "address": 'xxx road xxx street ' + str(i),
+ "employees": [
+ {"name": "user" + str(i), "age": 20 + i},
+ {"name": "user" + str(i), "age": 30 + i},
+ {"name": "user" + str(i), "age": 35 + i},
+ ],
+ "labels": {
+ "industry": "software" + str(i),
+ "scale": ">100",
+ "funds": "1000000.0"
+ }
+ }
+ self.assertEqual(msg.value(), company)
+ consumer.acknowledge(msg)
+
+ consumer.close()
+ producer.close()
+
+ schema_definition = {
+ 'doc': 'this is doc',
+ 'namespace': 'example.avro',
+ 'type': 'record',
+ 'name': 'Company',
+ 'fields': [
+ {'name': 'name', 'type': ['null', 'string']},
+ {'name': 'address', 'type': ['null', 'string']},
+ {'name': 'employees', 'type': ['null', {'type': 'array',
'items': {
+ 'type': 'record',
+ 'name': 'Employee',
+ 'fields': [
+ {'name': 'name', 'type': ['null', 'string']},
+ {'name': 'age', 'type': ['null', 'int']}
+ ]
+ }}]},
+ {'name': 'labels', 'type': ['null', {'type': 'map', 'values':
'string'}]}
+ ]
+ }
+ produce_and_consume('custom-schema-test-1',
schema_definition=schema_definition)
+ produce_and_consume('custom-schema-test-2',
schema_definition=load_schema("examples/company.avsc"))
+
+ client.close()
if __name__ == '__main__':
main()