This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit a5e643cf6bf8b9b3da229d3986fdf83ba81c5122 Author: Matteo Merli <[email protected]> AuthorDate: Thu Jun 24 04:53:55 2021 +0200 [Python] Fixed import when AvroSchema is not being used (#11034) ### Motivation Fixes #10929 Since by default we're not marking `fastavro` as a dependency, we shouldn't failing when the dependency is not there, unless a user is really trying to use `AvroSchema`, in which case we should give a useful error message. (cherry picked from commit b4fa411e775b1cb7c21395fab83a16e60938d147) --- pulsar-client-cpp/python/pulsar/schema/__init__.py | 3 +- pulsar-client-cpp/python/pulsar/schema/schema.py | 27 --------- .../python/pulsar/schema/schema_avro.py | 67 ++++++++++++++++++++++ 3 files changed, 69 insertions(+), 28 deletions(-) diff --git a/pulsar-client-cpp/python/pulsar/schema/__init__.py b/pulsar-client-cpp/python/pulsar/schema/__init__.py index a38513f..150629d 100644 --- a/pulsar-client-cpp/python/pulsar/schema/__init__.py +++ b/pulsar-client-cpp/python/pulsar/schema/__init__.py @@ -20,4 +20,5 @@ from .definition import Record, Field, Null, Boolean, Integer, Long, \ Float, Double, Bytes, String, Array, Map -from .schema import Schema, BytesSchema, StringSchema, JsonSchema, AvroSchema +from .schema import Schema, BytesSchema, StringSchema, JsonSchema +from .schema_avro import AvroSchema diff --git a/pulsar-client-cpp/python/pulsar/schema/schema.py b/pulsar-client-cpp/python/pulsar/schema/schema.py index d0da91a..083efc3 100644 --- a/pulsar-client-cpp/python/pulsar/schema/schema.py +++ b/pulsar-client-cpp/python/pulsar/schema/schema.py @@ -20,9 +20,7 @@ from abc import abstractmethod import json -import fastavro import _pulsar -import io import enum @@ -95,28 +93,3 @@ class JsonSchema(Schema): def decode(self, data): return self._record_cls(**json.loads(data)) - - -class AvroSchema(Schema): - def __init__(self, record_cls): - super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, - record_cls.schema(), 'AVRO') - self._schema = record_cls.schema() - - def _get_serialized_value(self, x): - if isinstance(x, enum.Enum): - return x.name - else: - return x - - def encode(self, obj): - self._validate_object_type(obj) - buffer = io.BytesIO() - m = {k: self._get_serialized_value(v) for k, v in obj.__dict__.items()} - fastavro.schemaless_writer(buffer, self._schema, m) - return buffer.getvalue() - - def decode(self, data): - buffer = io.BytesIO(data) - d = fastavro.schemaless_reader(buffer, self._schema) - return self._record_cls(**d) diff --git a/pulsar-client-cpp/python/pulsar/schema/schema_avro.py b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py new file mode 100644 index 0000000..2afa9db --- /dev/null +++ b/pulsar-client-cpp/python/pulsar/schema/schema_avro.py @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import _pulsar +import io +import enum + +from .schema import Schema + +try: + import fastavro + HAS_AVRO = True +except ModuleNotFoundError: + HAS_AVRO = False + +if HAS_AVRO: + class AvroSchema(Schema): + def __init__(self, record_cls): + super(AvroSchema, self).__init__(record_cls, _pulsar.SchemaType.AVRO, + record_cls.schema(), 'AVRO') + self._schema = record_cls.schema() + + def _get_serialized_value(self, x): + if isinstance(x, enum.Enum): + return x.name + else: + return x + + def encode(self, obj): + self._validate_object_type(obj) + buffer = io.BytesIO() + m = {k: self._get_serialized_value(v) for k, v in obj.__dict__.items()} + fastavro.schemaless_writer(buffer, self._schema, m) + return buffer.getvalue() + + def decode(self, data): + buffer = io.BytesIO(data) + d = fastavro.schemaless_reader(buffer, self._schema) + return self._record_cls(**d) + +else: + class AvroSchema(Schema): + def __init__(self, _record_cls): + raise Exception("Avro library support was not found. Make sure to install Pulsar client " + + "with Avro support: pip3 install 'pulsar-client[avro]'") + + def encode(self, obj): + pass + + def decode(self, data): + pass
