This is an automated email from the ASF dual-hosted git repository. hequn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1fc400d [FLINK-18879][python] Support Row Serialization and Deserialization schemas for Python DataStream API. (#13150) 1fc400d is described below commit 1fc400d1f94a10d2f5b227cc8eaa1ac9754ec237 Author: Shuiqiang Chen <acqua....@alibaba-inc.com> AuthorDate: Sat Aug 15 16:51:07 2020 +0800 [FLINK-18879][python] Support Row Serialization and Deserialization schemas for Python DataStream API. (#13150) --- .../pyflink/common/serialization_schemas.py | 367 +++++++++++++++++++++ .../common/tests/test_serialization_schemas.py | 103 ++++++ 2 files changed, 470 insertions(+) diff --git a/flink-python/pyflink/common/serialization_schemas.py b/flink-python/pyflink/common/serialization_schemas.py new file mode 100644 index 0000000..3748e5a --- /dev/null +++ b/flink-python/pyflink/common/serialization_schemas.py @@ -0,0 +1,367 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
################################################################################
from py4j.java_gateway import java_import

from pyflink.common.typeinfo import TypeInformation, WrapperTypeInfo
from pyflink.java_gateway import get_gateway
from pyflink.util.utils import load_java_class


class SerializationSchema(object):
    """
    Base class for SerializationSchema. The serialization schema describes how to turn a data
    object into a different serialized representation. Most data sinks (for example Apache Kafka)
    require the data to be handed to them in a specific format (for example as byte strings).
    """

    def __init__(self, j_serialization_schema=None):
        # py4j proxy of the Java SerializationSchema this wrapper delegates to.
        self._j_serialization_schema = j_serialization_schema


class DeserializationSchema(object):
    """
    Base class for DeserializationSchema. The deserialization schema describes how to turn the
    byte messages delivered by certain data sources (for example Apache Kafka) into data types
    (Java/Scala objects) that are processed by Flink.

    In addition, the DeserializationSchema describes the produced type which lets Flink create
    internal serializers and structures to handle the type.
    """

    def __init__(self, j_deserialization_schema=None):
        # py4j proxy of the Java DeserializationSchema this wrapper delegates to.
        self._j_deserialization_schema = j_deserialization_schema


class SimpleStringSchema(SerializationSchema, DeserializationSchema):
    """
    Very simple serialization/deserialization schema for strings. By default, the serializer uses
    'UTF-8' for string/byte conversion.
    """

    def __init__(self, charset: str = 'UTF-8'):
        """
        :param charset: Name of the charset used for string/byte conversion, 'UTF-8' by default.
        """
        gate_way = get_gateway()
        j_char_set = gate_way.jvm.java.nio.charset.Charset.forName(charset)
        # One Java object implements both interfaces, so it backs both base classes.
        j_simple_string_serialization_schema = gate_way \
            .jvm.org.apache.flink.api.common.serialization.SimpleStringSchema(j_char_set)
        SerializationSchema.__init__(
            self, j_serialization_schema=j_simple_string_serialization_schema)
        DeserializationSchema.__init__(
            self, j_deserialization_schema=j_simple_string_serialization_schema)


class JsonRowDeserializationSchema(DeserializationSchema):
    """
    Deserialization schema from JSON to Flink types.

    Deserializes a byte[] message as a JSON object and reads the specified fields.

    Failures during deserialization are forwarded as wrapped IOExceptions.
    """

    def __init__(self, j_deserialization_schema):
        super(JsonRowDeserializationSchema, self).__init__(j_deserialization_schema)

    @staticmethod
    def builder():
        """
        A static method to get a Builder for JsonRowDeserializationSchema.
        """
        return JsonRowDeserializationSchema.Builder()

    class Builder(object):
        """
        Builder for JsonRowDeserializationSchema.
        """

        def __init__(self):
            self._type_info = None
            self._fail_on_missing_field = False
            self._ignore_parse_errors = False

        def type_info(self, type_info: TypeInformation):
            """
            Creates a JSON deserialization schema for the given type information.

            :param type_info: Type information describing the result type. The field names of Row
                              are used to parse the JSON properties.
            """
            self._type_info = type_info
            return self

        def json_schema(self, json_schema: str):
            """
            Creates a JSON deserialization schema for the given JSON schema.

            :param json_schema: JSON schema describing the result type.
            :raises TypeError: if json_schema is None.
            """
            if json_schema is None:
                raise TypeError("The json_schema must not be None.")
            # Convert the JSON schema string to Flink type information on the Java side.
            j_type_info = get_gateway().jvm \
                .org.apache.flink.formats.json.JsonRowSchemaConverter.convert(json_schema)
            self._type_info = WrapperTypeInfo(j_type_info)
            return self

        def fail_on_missing_field(self):
            """
            Configures schema to fail if a JSON field is missing. By default, a missing field is
            ignored and the field is set to null.
            """
            self._fail_on_missing_field = True
            return self

        def ignore_parse_errors(self):
            """
            Configures schema to skip fields and rows with parse errors instead of failing. By
            default, an exception is thrown when parsing json fails.
            """
            self._ignore_parse_errors = True
            return self

        def build(self):
            """
            Builds the configured JsonRowDeserializationSchema.

            :raises TypeError: if neither type_info nor json_schema has been set.
            """
            # Fail fast with a clear message instead of an AttributeError on None below;
            # consistent with JsonRowSerializationSchema.Builder.build().
            if self._type_info is None:
                raise TypeError("Either type_info or json_schema should be set before build().")

            JBuilder = get_gateway().jvm.org.apache.flink.formats.json \
                .JsonRowDeserializationSchema.Builder
            j_builder = JBuilder(self._type_info.get_java_type_info())

            if self._fail_on_missing_field:
                # Java method name is failOnMissingField (was misspelled "fialOnMissingField",
                # which would raise a py4j error at build time).
                j_builder = j_builder.failOnMissingField()

            if self._ignore_parse_errors:
                j_builder = j_builder.ignoreParseErrors()

            j_deserialization_schema = j_builder.build()
            return JsonRowDeserializationSchema(j_deserialization_schema=j_deserialization_schema)


class JsonRowSerializationSchema(SerializationSchema):
    """
    Serialization schema that serializes an object of Flink types into a JSON bytes. Serializes
    the input Flink object into a JSON string and converts it into byte[].

    Result byte[] message can be deserialized using JsonRowDeserializationSchema.
    """

    def __init__(self, j_serialization_schema):
        super(JsonRowSerializationSchema, self).__init__(j_serialization_schema)

    @staticmethod
    def builder():
        """
        A static method to get a Builder for JsonRowSerializationSchema.
        """
        return JsonRowSerializationSchema.Builder()

    class Builder(object):
        """
        Builder for JsonRowSerializationSchema.
        """

        def __init__(self):
            self._type_info = None

        def with_type_info(self, type_info: TypeInformation):
            """
            Creates a JSON serialization schema for the given type information.

            :param type_info: Type information describing the result type. The field names of Row
                              are used to parse the JSON properties.
            """
            self._type_info = type_info
            return self

        def build(self):
            """
            Builds the configured JsonRowSerializationSchema.

            :raises TypeError: if with_type_info has not been called.
            """
            if self._type_info is None:
                raise TypeError("Typeinfo should be set.")

            j_builder = get_gateway().jvm \
                .org.apache.flink.formats.json.JsonRowSerializationSchema.builder()

            j_schema = j_builder.withTypeInfo(self._type_info.get_java_type_info()).build()
            return JsonRowSerializationSchema(j_serialization_schema=j_schema)


class CsvRowDeserializationSchema(DeserializationSchema):
    """
    Deserialization schema from CSV to Flink types. Deserializes a byte[] message as a JsonNode
    and converts it to Row.

    Failure during deserialization are forwarded as wrapped IOException.
    """

    def __init__(self, j_deserialization_schema):
        super(CsvRowDeserializationSchema, self).__init__(
            j_deserialization_schema=j_deserialization_schema)

    class Builder(object):
        """
        A builder for creating a CsvRowDeserializationSchema. Each setter delegates directly to
        the corresponding Java builder method and returns self for chaining.
        """

        def __init__(self, type_info: TypeInformation):
            """
            :param type_info: Type information describing the result type.
            :raises TypeError: if type_info is None.
            """
            if type_info is None:
                raise TypeError("Type information must not be None")
            self._j_builder = get_gateway().jvm \
                .org.apache.flink.formats.csv.CsvRowDeserializationSchema.Builder(
                    type_info.get_java_type_info())

        def set_field_delimiter(self, delimiter: str):
            """Sets the delimiter between CSV fields."""
            self._j_builder = self._j_builder.setFieldDelimiter(delimiter)
            return self

        def set_allow_comments(self, allow_comments: bool):
            """Allows comment lines in the CSV input."""
            self._j_builder = self._j_builder.setAllowComments(allow_comments)
            return self

        def set_array_element_delimiter(self, delimiter: str):
            """Sets the delimiter between elements of array/row fields."""
            self._j_builder = self._j_builder.setArrayElementDelimiter(delimiter)
            return self

        def set_quote_character(self, c: str):
            """Sets the quote character."""
            self._j_builder = self._j_builder.setQuoteCharacter(c)
            return self

        def set_escape_character(self, c: str):
            """Sets the escape character."""
            self._j_builder = self._j_builder.setEscapeCharacter(c)
            return self

        def set_null_literal(self, null_literal: str):
            """Sets the literal that represents null values."""
            self._j_builder = self._j_builder.setNullLiteral(null_literal)
            return self

        def set_ignore_parse_errors(self, ignore_parse_errors: bool):
            """Configures whether records with parse errors are skipped."""
            self._j_builder = self._j_builder.setIgnoreParseErrors(ignore_parse_errors)
            return self

        def build(self):
            """Builds the configured CsvRowDeserializationSchema."""
            j_csv_row_deserialization_schema = self._j_builder.build()
            return CsvRowDeserializationSchema(
                j_deserialization_schema=j_csv_row_deserialization_schema)


class CsvRowSerializationSchema(SerializationSchema):
    """
    Serialization schema that serializes an object of Flink types into a CSV bytes. Serializes
    the input row into an ObjectNode and converts it into byte[].

    Result byte[] messages can be deserialized using CsvRowDeserializationSchema.
    """

    def __init__(self, j_csv_row_serialization_schema):
        super(CsvRowSerializationSchema, self).__init__(j_csv_row_serialization_schema)

    class Builder(object):
        """
        A builder for creating a CsvRowSerializationSchema. Each setter delegates directly to
        the corresponding Java builder method and returns self for chaining.
        """

        def __init__(self, type_info: TypeInformation):
            """
            :param type_info: Type information describing the input type.
            :raises TypeError: if type_info is None.
            """
            if type_info is None:
                raise TypeError("Type information must not be None")
            self._j_builder = get_gateway().jvm \
                .org.apache.flink.formats.csv.CsvRowSerializationSchema.Builder(
                    type_info.get_java_type_info())

        def set_field_delimiter(self, c: str):
            """Sets the delimiter between CSV fields."""
            self._j_builder = self._j_builder.setFieldDelimiter(c)
            return self

        def set_line_delimiter(self, delimiter: str):
            """Sets the delimiter between CSV records."""
            self._j_builder = self._j_builder.setLineDelimiter(delimiter)
            return self

        def set_array_element_delimiter(self, delimiter: str):
            """Sets the delimiter between elements of array/row fields."""
            self._j_builder = self._j_builder.setArrayElementDelimiter(delimiter)
            return self

        def disable_quote_character(self):
            """Disables quoting entirely."""
            self._j_builder = self._j_builder.disableQuoteCharacter()
            return self

        def set_quote_character(self, c: str):
            """Sets the quote character."""
            self._j_builder = self._j_builder.setQuoteCharacter(c)
            return self

        def set_escape_character(self, c: str):
            """Sets the escape character."""
            self._j_builder = self._j_builder.setEscapeCharacter(c)
            return self

        def set_null_literal(self, s: str):
            """Sets the literal that represents null values."""
            self._j_builder = self._j_builder.setNullLiteral(s)
            return self

        def build(self):
            """Builds the configured CsvRowSerializationSchema."""
            j_serialization_schema = self._j_builder.build()
            return CsvRowSerializationSchema(j_csv_row_serialization_schema=j_serialization_schema)


class AvroRowDeserializationSchema(DeserializationSchema):
    """
    Deserialization schema from Avro bytes to Row. Deserializes the byte[] messages into (nested)
    Flink rows. It converts Avro types into types that are compatible with Flink's Table & SQL
    API.

    Projects with Avro records containing logical date/time types need to add a JodaTime
    dependency.
    """

    def __init__(self, record_class: str = None, avro_schema_string: str = None):
        """
        Creates an Avro deserialization schema for the given specific record class or Avro schema
        string. Having the concrete Avro record class might improve performance. If both are
        given, record_class takes precedence.

        :param record_class: Avro record class used to deserialize Avro's record to Flink's row.
        :param avro_schema_string: Avro schema string to deserialize Avro's record to Flink's row.
        :raises TypeError: if neither record_class nor avro_schema_string is specified.
        """
        if avro_schema_string is None and record_class is None:
            raise TypeError("record_class or avro_schema_string should be specified.")

        gateway = get_gateway()
        JAvroRowDeserializationSchema = gateway.jvm \
            .org.apache.flink.formats.avro.AvroRowDeserializationSchema
        if record_class is not None:
            # Make the user record class visible in the JVM view so it can be loaded by name.
            java_import(gateway.jvm, record_class)
            j_record_class = load_java_class(record_class)
            j_deserialization_schema = JAvroRowDeserializationSchema(j_record_class)
        else:
            j_deserialization_schema = JAvroRowDeserializationSchema(avro_schema_string)

        super(AvroRowDeserializationSchema, self).__init__(j_deserialization_schema)


class AvroRowSerializationSchema(SerializationSchema):
    """
    Serialization schema that serializes to Avro binary format.
    """

    def __init__(self, record_class: str = None, avro_schema_string: str = None):
        """
        Creates AvroSerializationSchema that serializes SpecificRecord using provided schema or
        record class. If both are given, record_class takes precedence.

        :param record_class: Avro record class used to serialize Flink's row to Avro's record.
        :param avro_schema_string: Avro schema string to serialize Flink's row to Avro's record.
        :raises TypeError: if neither record_class nor avro_schema_string is specified.
        """
        if avro_schema_string is None and record_class is None:
            raise TypeError("record_class or avro_schema_string should be specified.")

        gateway = get_gateway()
        JAvroRowSerializationSchema = gateway.jvm \
            .org.apache.flink.formats.avro.AvroRowSerializationSchema
        if record_class is not None:
            # Make the user record class visible in the JVM view so it can be loaded by name.
            java_import(gateway.jvm, record_class)
            j_record_class = load_java_class(record_class)
            j_serialization_schema = JAvroRowSerializationSchema(j_record_class)
        else:
            j_serialization_schema = JAvroRowSerializationSchema(avro_schema_string)

        super(AvroRowSerializationSchema, self).__init__(j_serialization_schema)
################################################################################
from pyflink.common.serialization_schemas import JsonRowSerializationSchema, \
    JsonRowDeserializationSchema, CsvRowSerializationSchema, CsvRowDeserializationSchema, \
    SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import PyFlinkTestCase


class TestRowSerializationSchemas(PyFlinkTestCase):
    """
    Tests for the Python serialization/deserialization schema wrappers. Each test drives the
    wrapped Java schema objects (via the _j_serialization_schema / _j_deserialization_schema
    py4j proxies) and checks a serialize/deserialize round trip.
    """

    def test_simple_string_schema(self):
        # SimpleStringSchema: str -> UTF-8 bytes via serialize, and back via deserialize.
        expected_string = 'test string'
        simple_string_schema = SimpleStringSchema()
        self.assertEqual(expected_string.encode(encoding='utf-8'),
                         simple_string_schema._j_serialization_schema.serialize(expected_string))

        self.assertEqual(expected_string, simple_string_schema._j_deserialization_schema
                         .deserialize(expected_string.encode(encoding='utf-8')))

    def test_json_row_serialization_deserialization_schema(self):
        # Round trip: JSON text -> Java Row (deserializer) -> JSON text (serializer).
        # Fields absent from the input come back as explicit nulls in the output.
        jsons = ["{\"svt\":\"2020-02-24T12:58:09.209+0800\"}",
                 "{\"svt\":\"2020-02-24T12:58:09.209+0800\", "
                 "\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"},\"ids\":[1, 2, 3]}",
                 "{\"svt\":\"2020-02-24T12:58:09.209+0800\"}"]
        expected_jsons = ["{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null}",
                          "{\"svt\":\"2020-02-24T12:58:09.209+0800\","
                          "\"ops\":{\"id\":\"281708d0-4092-4c21-9233-931950b6eccf\"},"
                          "\"ids\":[1,2,3]}",
                          "{\"svt\":\"2020-02-24T12:58:09.209+0800\",\"ops\":null,\"ids\":null}"]

        row_schema = Types.ROW_NAMED(["svt", "ops", "ids"],
                                     [Types.STRING(),
                                      Types.ROW_NAMED(['id'], [Types.STRING()]),
                                      Types.PRIMITIVE_ARRAY(Types.INT())])

        json_row_serialization_schema = JsonRowSerializationSchema.builder() \
            .with_type_info(row_schema).build()
        json_row_deserialization_schema = JsonRowDeserializationSchema.builder() \
            .type_info(row_schema).build()

        for i in range(len(jsons)):
            j_row = json_row_deserialization_schema._j_deserialization_schema\
                .deserialize(bytes(jsons[i], encoding='utf-8'))
            result = str(json_row_serialization_schema._j_serialization_schema.serialize(j_row),
                         encoding='utf-8')
            self.assertEqual(expected_jsons[i], result)

    def test_csv_row_serialization_schema(self):
        JRow = get_gateway().jvm.org.apache.flink.types.Row

        # A 3-field Java row; field 1 is replaced per assertion below.
        j_row = JRow(3)
        j_row.setField(0, "BEGIN")
        j_row.setField(2, "END")

        def field_assertion(field_info, csv_value, value, field_delimiter):
            # Serializes the row and checks the exact CSV line produced, then deserializes
            # that line back and checks equality with the original Java row.
            row_info = Types.ROW([Types.STRING(), field_info, Types.STRING()])
            expected_csv = "BEGIN" + field_delimiter + csv_value + field_delimiter + "END\n"
            j_row.setField(1, value)

            # Schemas configured with escape '*', quote ' and array delimiter ':'.
            csv_row_serialization_schema = CsvRowSerializationSchema.Builder(row_info)\
                .set_escape_character('*').set_quote_character('\'')\
                .set_array_element_delimiter(':').set_field_delimiter(';').build()
            csv_row_deserialization_schema = CsvRowDeserializationSchema.Builder(row_info)\
                .set_escape_character('*').set_quote_character('\'')\
                .set_array_element_delimiter(':').set_field_delimiter(';').build()

            serialized_bytes = csv_row_serialization_schema._j_serialization_schema.serialize(j_row)
            self.assertEqual(expected_csv, str(serialized_bytes, encoding='utf-8'))

            j_deserialized_row = csv_row_deserialization_schema._j_deserialization_schema\
                .deserialize(expected_csv.encode("utf-8"))
            self.assertTrue(j_row.equals(j_deserialized_row))

        # Quote and escape characters are doubled/escaped inside quoted fields.
        field_assertion(Types.STRING(), "'123''4**'", "123'4*", ";")
        field_assertion(Types.STRING(), "'a;b''c'", "a;b'c", ";")
        field_assertion(Types.INT(), "12", 12, ";")

        test_j_row = JRow(2)
        test_j_row.setField(0, "1")
        test_j_row.setField(1, "hello")

        # Nested rows are joined with the array element delimiter ':' and quoted.
        field_assertion(Types.ROW([Types.STRING(), Types.STRING()]), "'1:hello'", test_j_row, ";")
        test_j_row.setField(1, "hello world")
        field_assertion(Types.ROW([Types.STRING(), Types.STRING()]), "'1:hello world'", test_j_row,
                        ";")
        field_assertion(Types.STRING(), "null", "null", ";")