This is an automated email from the ASF dual-hosted git repository.

hequn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fc400d  [FLINK-18879][python] Support Row Serialization and 
Deserialization schemas for Python DataStream API. (#13150)
1fc400d is described below

commit 1fc400d1f94a10d2f5b227cc8eaa1ac9754ec237
Author: Shuiqiang Chen <acqua....@alibaba-inc.com>
AuthorDate: Sat Aug 15 16:51:07 2020 +0800

    [FLINK-18879][python] Support Row Serialization and Deserialization schemas 
for Python DataStream API. (#13150)
---
 .../pyflink/common/serialization_schemas.py        | 367 +++++++++++++++++++++
 .../common/tests/test_serialization_schemas.py     | 103 ++++++
 2 files changed, 470 insertions(+)

diff --git a/flink-python/pyflink/common/serialization_schemas.py 
b/flink-python/pyflink/common/serialization_schemas.py
new file mode 100644
index 0000000..3748e5a
--- /dev/null
+++ b/flink-python/pyflink/common/serialization_schemas.py
@@ -0,0 +1,367 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from py4j.java_gateway import java_import
+from pyflink.common.typeinfo import TypeInformation, WrapperTypeInfo
+
+from pyflink.util.utils import load_java_class
+
+from pyflink.java_gateway import get_gateway
+
+
+class SerializationSchema(object):
+    """
+    Common parent of all serialization schemas. A serialization schema defines how a data object
+    is turned into a different serialized representation. Most data sinks (for example Apache
+    Kafka) require the data to be handed to them in a specific format (for example as byte
+    strings).
+    """
+
+    def __init__(self, j_serialization_schema=None):
+        """
+        :param j_serialization_schema: The object kept as the underlying serialization schema.
+                                       The subclasses in this module pass a Java schema object
+                                       obtained through the gateway. Defaults to None.
+        """
+        self._j_serialization_schema = j_serialization_schema
+
+
+class DeserializationSchema(object):
+    """
+    Common parent of all deserialization schemas. A deserialization schema defines how the byte
+    messages delivered by certain data sources (for example Apache Kafka) are turned into data
+    types (Java/Scala objects) that are processed by Flink.
+
+    The DeserializationSchema also describes the produced type, which lets Flink create internal
+    serializers and structures to handle the type.
+    """
+
+    def __init__(self, j_deserialization_schema=None):
+        """
+        :param j_deserialization_schema: The object kept as the underlying deserialization
+                                         schema. The subclasses in this module pass a Java
+                                         schema object obtained through the gateway. Defaults
+                                         to None.
+        """
+        self._j_deserialization_schema = j_deserialization_schema
+
+
+class SimpleStringSchema(SerializationSchema, DeserializationSchema):
+    """
+    A plain string schema usable for both serialization and deserialization. Unless another
+    charset is given, 'UTF-8' is used for the string/byte conversion.
+    """
+
+    def __init__(self, charset: str = 'UTF-8'):
+        """
+        :param charset: Name of the charset, resolved with java.nio.charset.Charset.forName.
+        """
+        jvm = get_gateway().jvm
+        j_charset = jvm.java.nio.charset.Charset.forName(charset)
+        j_schema = jvm.org.apache.flink.api.common.serialization.SimpleStringSchema(j_charset)
+        # One Java object backs both the serialization and the deserialization side.
+        SerializationSchema.__init__(self, j_serialization_schema=j_schema)
+        DeserializationSchema.__init__(self, j_deserialization_schema=j_schema)
+
+
+class JsonRowDeserializationSchema(DeserializationSchema):
+    """
+    Deserialization schema from JSON to Flink types.
+
+    Deserializes a byte[] message as a JSON object and reads the specified fields.
+
+    Failures during deserialization are forwarded as wrapped IOExceptions.
+    """
+    def __init__(self, j_deserialization_schema):
+        """
+        :param j_deserialization_schema: The Java JsonRowDeserializationSchema object to wrap.
+        """
+        super(JsonRowDeserializationSchema, self).__init__(j_deserialization_schema)
+
+    @staticmethod
+    def builder():
+        """
+        A static method to get a Builder for JsonRowDeserializationSchema.
+        """
+        return JsonRowDeserializationSchema.Builder()
+
+    class Builder(object):
+        """
+        Builder for JsonRowDeserializationSchema.
+        """
+
+        def __init__(self):
+            self._type_info = None
+            self._fail_on_missing_field = False
+            self._ignore_parse_errors = False
+
+        def type_info(self, type_info: TypeInformation):
+            """
+            Creates a JSON deserialization schema for the given type information.
+
+            :param type_info: Type information describing the result type. The field names of Row
+                              are used to parse the JSON properties.
+            :return: This builder.
+            """
+            self._type_info = type_info
+            return self
+
+        def json_schema(self, json_schema: str):
+            """
+            Creates a JSON deserialization schema for the given JSON schema.
+
+            :param json_schema: JSON schema describing the result type.
+            :return: This builder.
+            :raises TypeError: If json_schema is None.
+            """
+            if json_schema is None:
+                raise TypeError("The json_schema must not be None.")
+            j_type_info = get_gateway().jvm \
+                .org.apache.flink.formats.json.JsonRowSchemaConverter.convert(json_schema)
+            self._type_info = WrapperTypeInfo(j_type_info)
+            return self
+
+        def fail_on_missing_field(self):
+            """
+            Configures schema to fail if a JSON field is missing. A missing field is ignored and
+            the field is set to null by default.
+
+            :return: This builder.
+            """
+            self._fail_on_missing_field = True
+            return self
+
+        def ignore_parse_errors(self):
+            """
+            Configures schema to ignore JSON parse errors; ignoreParseErrors() is called on the
+            Java builder when the schema is built. By default this option is off.
+
+            :return: This builder.
+            """
+            self._ignore_parse_errors = True
+            return self
+
+        def build(self):
+            """
+            Builds the JsonRowDeserializationSchema from the configured options.
+
+            :return: A JsonRowDeserializationSchema wrapping the Java schema.
+            :raises TypeError: If no type information has been set.
+            """
+            # Same guard and message as JsonRowSerializationSchema.Builder.build, instead of
+            # failing with an AttributeError on None below.
+            if self._type_info is None:
+                raise TypeError("Typeinfo should be set.")
+
+            JBuilder = get_gateway().jvm \
+                .org.apache.flink.formats.json.JsonRowDeserializationSchema.Builder
+            j_builder = JBuilder(self._type_info.get_java_type_info())
+
+            if self._fail_on_missing_field:
+                j_builder = j_builder.failOnMissingField()
+
+            if self._ignore_parse_errors:
+                j_builder = j_builder.ignoreParseErrors()
+
+            j_deserialization_schema = j_builder.build()
+            return JsonRowDeserializationSchema(j_deserialization_schema=j_deserialization_schema)
+
+
+class JsonRowSerializationSchema(SerializationSchema):
+    """
+    Serialization schema that turns an object of Flink types into JSON bytes. The input Flink
+    object is serialized into a JSON string, which is then converted into byte[].
+
+    The resulting byte[] message can be deserialized using JsonRowDeserializationSchema.
+    """
+
+    def __init__(self, j_serialization_schema):
+        """
+        :param j_serialization_schema: The Java JsonRowSerializationSchema object to wrap.
+        """
+        super(JsonRowSerializationSchema, self).__init__(j_serialization_schema)
+
+    @staticmethod
+    def builder():
+        """
+        A static method to get a Builder for JsonRowSerializationSchema.
+        """
+        return JsonRowSerializationSchema.Builder()
+
+    class Builder(object):
+        """
+        Builder for JsonRowSerializationSchema.
+        """
+        def __init__(self):
+            self._type_info = None
+
+        def with_type_info(self, type_info: TypeInformation):
+            """
+            Sets the type information the JSON serialization schema is created for.
+
+            :param type_info: Type information describing the result type. The field names of Row
+                              are used to parse the JSON properties.
+            :return: This builder.
+            """
+            self._type_info = type_info
+            return self
+
+        def build(self):
+            """
+            Builds the JsonRowSerializationSchema for the configured type information.
+
+            :return: A JsonRowSerializationSchema wrapping the Java schema.
+            :raises TypeError: If no type information has been set.
+            """
+            type_info = self._type_info
+            if type_info is None:
+                raise TypeError("Typeinfo should be set.")
+
+            JJsonRowSerializationSchema = get_gateway().jvm \
+                .org.apache.flink.formats.json.JsonRowSerializationSchema
+            j_builder = JJsonRowSerializationSchema.builder()
+            j_builder = j_builder.withTypeInfo(type_info.get_java_type_info())
+            return JsonRowSerializationSchema(j_serialization_schema=j_builder.build())
+
+
+class CsvRowDeserializationSchema(DeserializationSchema):
+    """
+    Deserialization schema from CSV to Flink types. A byte[] message is deserialized as a
+    JsonNode and then converted to Row.
+
+    Failures during deserialization are forwarded as wrapped IOExceptions.
+    """
+
+    def __init__(self, j_deserialization_schema):
+        """
+        :param j_deserialization_schema: The Java CsvRowDeserializationSchema object to wrap.
+        """
+        super(CsvRowDeserializationSchema, self).__init__(
+            j_deserialization_schema=j_deserialization_schema)
+
+    class Builder(object):
+        """
+        A builder for creating a CsvRowDeserializationSchema. Every setter forwards its argument
+        to the Java builder and returns this Python builder, so calls can be chained.
+        """
+        def __init__(self, type_info: TypeInformation):
+            """
+            :param type_info: Type information handed to the Java builder.
+            :raises TypeError: If type_info is None.
+            """
+            if type_info is None:
+                raise TypeError("Type information must not be None")
+            JBuilder = get_gateway().jvm \
+                .org.apache.flink.formats.csv.CsvRowDeserializationSchema.Builder
+            self._j_builder = JBuilder(type_info.get_java_type_info())
+
+        def set_field_delimiter(self, delimiter: str):
+            """Forwards the delimiter to setFieldDelimiter of the Java builder."""
+            self._j_builder = self._j_builder.setFieldDelimiter(delimiter)
+            return self
+
+        def set_allow_comments(self, allow_comments: bool):
+            """Forwards the flag to setAllowComments of the Java builder."""
+            self._j_builder = self._j_builder.setAllowComments(allow_comments)
+            return self
+
+        def set_array_element_delimiter(self, delimiter: str):
+            """Forwards the delimiter to setArrayElementDelimiter of the Java builder."""
+            self._j_builder = self._j_builder.setArrayElementDelimiter(delimiter)
+            return self
+
+        def set_quote_character(self, c: str):
+            """Forwards the character to setQuoteCharacter of the Java builder."""
+            self._j_builder = self._j_builder.setQuoteCharacter(c)
+            return self
+
+        def set_escape_character(self, c: str):
+            """Forwards the character to setEscapeCharacter of the Java builder."""
+            self._j_builder = self._j_builder.setEscapeCharacter(c)
+            return self
+
+        def set_null_literal(self, null_literal: str):
+            """Forwards the literal to setNullLiteral of the Java builder."""
+            self._j_builder = self._j_builder.setNullLiteral(null_literal)
+            return self
+
+        def set_ignore_parse_errors(self, ignore_parse_errors: bool):
+            """Forwards the flag to setIgnoreParseErrors of the Java builder."""
+            self._j_builder = self._j_builder.setIgnoreParseErrors(ignore_parse_errors)
+            return self
+
+        def build(self):
+            """
+            Builds the Java schema and wraps it.
+
+            :return: A CsvRowDeserializationSchema wrapping the Java schema.
+            """
+            return CsvRowDeserializationSchema(
+                j_deserialization_schema=self._j_builder.build())
+
+
+class CsvRowSerializationSchema(SerializationSchema):
+    """
+    Serialization schema that turns an object of Flink types into CSV bytes. The input row is
+    serialized into an ObjectNode, which is then converted into byte[].
+
+    The resulting byte[] messages can be deserialized using CsvRowDeserializationSchema.
+    """
+    def __init__(self, j_csv_row_serialization_schema):
+        """
+        :param j_csv_row_serialization_schema: The Java CsvRowSerializationSchema object to wrap.
+        """
+        super(CsvRowSerializationSchema, self).__init__(j_csv_row_serialization_schema)
+
+    class Builder(object):
+        """
+        A builder for creating a CsvRowSerializationSchema. Every setter forwards its argument
+        to the Java builder and returns this Python builder, so calls can be chained.
+        """
+        def __init__(self, type_info: TypeInformation):
+            """
+            :param type_info: Type information handed to the Java builder.
+            :raises TypeError: If type_info is None.
+            """
+            if type_info is None:
+                raise TypeError("Type information must not be None")
+            JBuilder = get_gateway().jvm \
+                .org.apache.flink.formats.csv.CsvRowSerializationSchema.Builder
+            self._j_builder = JBuilder(type_info.get_java_type_info())
+
+        def set_field_delimiter(self, c: str):
+            """Forwards the character to setFieldDelimiter of the Java builder."""
+            self._j_builder = self._j_builder.setFieldDelimiter(c)
+            return self
+
+        def set_line_delimiter(self, delimiter: str):
+            """Forwards the delimiter to setLineDelimiter of the Java builder."""
+            self._j_builder = self._j_builder.setLineDelimiter(delimiter)
+            return self
+
+        def set_array_element_delimiter(self, delimiter: str):
+            """Forwards the delimiter to setArrayElementDelimiter of the Java builder."""
+            self._j_builder = self._j_builder.setArrayElementDelimiter(delimiter)
+            return self
+
+        def disable_quote_character(self):
+            """Calls disableQuoteCharacter on the Java builder."""
+            self._j_builder = self._j_builder.disableQuoteCharacter()
+            return self
+
+        def set_quote_character(self, c: str):
+            """Forwards the character to setQuoteCharacter of the Java builder."""
+            self._j_builder = self._j_builder.setQuoteCharacter(c)
+            return self
+
+        def set_escape_character(self, c: str):
+            """Forwards the character to setEscapeCharacter of the Java builder."""
+            self._j_builder = self._j_builder.setEscapeCharacter(c)
+            return self
+
+        def set_null_literal(self, s: str):
+            """Forwards the literal to setNullLiteral of the Java builder."""
+            self._j_builder = self._j_builder.setNullLiteral(s)
+            return self
+
+        def build(self):
+            """
+            Builds the Java schema and wraps it.
+
+            :return: A CsvRowSerializationSchema wrapping the Java schema.
+            """
+            return CsvRowSerializationSchema(
+                j_csv_row_serialization_schema=self._j_builder.build())
+
+
+class AvroRowDeserializationSchema(DeserializationSchema):
+    """
+    Deserialization schema from Avro bytes to Row. The byte[] messages are deserialized into
+    (nested) Flink rows, and Avro types are converted into types that are compatible with
+    Flink's Table & SQL API.
+
+    Projects with Avro records containing logical date/time types need to add a JodaTime
+    dependency.
+    """
+    def __init__(self, record_class: str = None, avro_schema_string: str = None):
+        """
+        Creates an Avro deserialization schema for the given specific record class or Avro
+        schema string. Having the concrete Avro record class might improve performance. When
+        both arguments are given, record_class takes precedence.
+
+        :param record_class: Avro record class used to deserialize Avro's record to Flink's row.
+        :param avro_schema_string: Avro schema string to deserialize Avro's record to Flink's
+                                   row.
+        :raises TypeError: If both record_class and avro_schema_string are None.
+        """
+        if record_class is None and avro_schema_string is None:
+            raise TypeError("record_class or avro_schema_string should be specified.")
+
+        jvm = get_gateway().jvm
+        if record_class is not None:
+            # Import and load the record class so it can be passed to the Java constructor.
+            java_import(jvm, record_class)
+            j_constructor_arg = load_java_class(record_class)
+        else:
+            j_constructor_arg = avro_schema_string
+
+        JAvroRowDeserializationSchema = jvm \
+            .org.apache.flink.formats.avro.AvroRowDeserializationSchema
+        super(AvroRowDeserializationSchema, self).__init__(
+            JAvroRowDeserializationSchema(j_constructor_arg))
+
+
+class AvroRowSerializationSchema(SerializationSchema):
+    """
+    Serialization schema that serializes to the Avro binary format.
+    """
+
+    def __init__(self, record_class: str = None, avro_schema_string: str = None):
+        """
+        Creates AvroSerializationSchema that serializes SpecificRecord using provided schema or
+        record class. When both arguments are given, record_class takes precedence.
+
+        :param record_class: Avro record class used to serialize Flink's row to Avro's record.
+        :param avro_schema_string: Avro schema string to serialize Flink's row to Avro's record.
+        :raises TypeError: If both record_class and avro_schema_string are None.
+        """
+        if record_class is None and avro_schema_string is None:
+            raise TypeError("record_class or avro_schema_string should be specified.")
+
+        jvm = get_gateway().jvm
+        if record_class is not None:
+            # Import and load the record class so it can be passed to the Java constructor.
+            java_import(jvm, record_class)
+            j_constructor_arg = load_java_class(record_class)
+        else:
+            j_constructor_arg = avro_schema_string
+
+        JAvroRowSerializationSchema = jvm \
+            .org.apache.flink.formats.avro.AvroRowSerializationSchema
+        super(AvroRowSerializationSchema, self).__init__(
+            JAvroRowSerializationSchema(j_constructor_arg))
diff --git a/flink-python/pyflink/common/tests/test_serialization_schemas.py 
b/flink-python/pyflink/common/tests/test_serialization_schemas.py
new file mode 100644
index 0000000..02e51e5
--- /dev/null
+++ b/flink-python/pyflink/common/tests/test_serialization_schemas.py
@@ -0,0 +1,103 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from pyflink.common.serialization_schemas import JsonRowSerializationSchema, \
+    JsonRowDeserializationSchema, CsvRowSerializationSchema, 
CsvRowDeserializationSchema, \
+    SimpleStringSchema
+from pyflink.common.typeinfo import Types
+from pyflink.java_gateway import get_gateway
+from pyflink.testing.test_case_utils import PyFlinkTestCase
+
+
+class TestRowSerializationSchemas(PyFlinkTestCase):
+    """
+    Round-trip tests for the string, JSON and CSV schemas. The tests call serialize and
+    deserialize directly on the wrapped Java schema objects.
+    """
+
+    def test_simple_string_schema(self):
+        expected_string = 'test string'
+        expected_bytes = expected_string.encode(encoding='utf-8')
+        simple_string_schema = SimpleStringSchema()
+
+        serialized = simple_string_schema._j_serialization_schema.serialize(expected_string)
+        self.assertEqual(expected_bytes, serialized)
+
+        deserialized = simple_string_schema._j_deserialization_schema.deserialize(expected_bytes)
+        self.assertEqual(expected_string, deserialized)
+
+    def test_json_row_serialization_deserialization_schema(self):
+        svt_only = '{"svt":"2020-02-24T12:58:09.209+0800"}'
+        jsons = [svt_only,
+                 '{"svt":"2020-02-24T12:58:09.209+0800", '
+                 '"ops":{"id":"281708d0-4092-4c21-9233-931950b6eccf"},"ids":[1, 2, 3]}',
+                 svt_only]
+        # Fields missing from the input come back as explicit nulls after the round trip.
+        svt_with_nulls = '{"svt":"2020-02-24T12:58:09.209+0800","ops":null,"ids":null}'
+        expected_jsons = [svt_with_nulls,
+                          '{"svt":"2020-02-24T12:58:09.209+0800",'
+                          '"ops":{"id":"281708d0-4092-4c21-9233-931950b6eccf"},'
+                          '"ids":[1,2,3]}',
+                          svt_with_nulls]
+
+        row_schema = Types.ROW_NAMED(["svt", "ops", "ids"],
+                                     [Types.STRING(),
+                                      Types.ROW_NAMED(['id'], [Types.STRING()]),
+                                      Types.PRIMITIVE_ARRAY(Types.INT())])
+
+        j_serializer = JsonRowSerializationSchema.builder() \
+            .with_type_info(row_schema).build()._j_serialization_schema
+        j_deserializer = JsonRowDeserializationSchema.builder() \
+            .type_info(row_schema).build()._j_deserialization_schema
+
+        for json_str, expected_json in zip(jsons, expected_jsons):
+            j_row = j_deserializer.deserialize(bytes(json_str, encoding='utf-8'))
+            result = str(j_serializer.serialize(j_row), encoding='utf-8')
+            self.assertEqual(expected_json, result)
+
+    def test_csv_row_serialization_schema(self):
+        JRow = get_gateway().jvm.org.apache.flink.types.Row
+
+        # Three-field row; the middle field is replaced by every field_assertion call.
+        j_row = JRow(3)
+        j_row.setField(0, "BEGIN")
+        j_row.setField(2, "END")
+
+        def field_assertion(field_info, csv_value, value, field_delimiter):
+            # Serializes j_row with `value` in the middle and checks the CSV text, then
+            # deserializes that text and checks it equals j_row again.
+            row_info = Types.ROW([Types.STRING(), field_info, Types.STRING()])
+            expected_csv = field_delimiter.join(["BEGIN", csv_value, "END\n"])
+            j_row.setField(1, value)
+
+            j_serializer = CsvRowSerializationSchema.Builder(row_info) \
+                .set_escape_character('*') \
+                .set_quote_character("'") \
+                .set_array_element_delimiter(':') \
+                .set_field_delimiter(';') \
+                .build()._j_serialization_schema
+            j_deserializer = CsvRowDeserializationSchema.Builder(row_info) \
+                .set_escape_character('*') \
+                .set_quote_character("'") \
+                .set_array_element_delimiter(':') \
+                .set_field_delimiter(';') \
+                .build()._j_deserialization_schema
+
+            serialized_bytes = j_serializer.serialize(j_row)
+            self.assertEqual(expected_csv, str(serialized_bytes, encoding='utf-8'))
+
+            j_deserialized_row = j_deserializer.deserialize(expected_csv.encode("utf-8"))
+            self.assertTrue(j_row.equals(j_deserialized_row))
+
+        field_assertion(Types.STRING(), "'123''4**'", "123'4*", ";")
+        field_assertion(Types.STRING(), "'a;b''c'", "a;b'c", ";")
+        field_assertion(Types.INT(), "12", 12, ";")
+
+        test_j_row = JRow(2)
+        test_j_row.setField(0, "1")
+        test_j_row.setField(1, "hello")
+        nested_row_info = Types.ROW([Types.STRING(), Types.STRING()])
+
+        field_assertion(nested_row_info, "'1:hello'", test_j_row, ";")
+        test_j_row.setField(1, "hello world")
+        field_assertion(nested_row_info, "'1:hello world'", test_j_row, ";")
+        field_assertion(Types.STRING(), "null", "null", ";")

Reply via email to