maosuhan opened a new pull request #14376:
URL: https://github.com/apache/flink/pull/14376


    ## What is the purpose of the change
   
   Protobuf is a structured data format introduced by Google. Compared to JSON, 
   protobuf is more efficient in both space and computation. Nowadays, many 
   companies use protobuf instead of JSON as the data format in Kafka and 
   stream processing.
   
   So this PR introduces a new format which can deserialize/serialize protobuf 
   data at high speed.
   Users can use this format in SQL or the Table API. 
   
   
   ## Verifying this change
   
   ```sql
   create table source (
   ....... column list
   ) with (
       'connector' = 'kafka',
       'format' = 'pb',
       'pb.message-class-name' = '<message class name>'
   )
   ```
   
   ```sql
   create table sink (
   ....... column list
   ) with (
       'connector' = 'kafka',
       'format' = 'pb',
       'pb.message-class-name' = '<message class name>'
   )
   ```
   
   ## Tests
   TODO
   
   ## Benchmark
   Performance test for a pb object containing 200+ fields. Below is the time 
   consumed to process 10M rows.
   
   Implementation | Deserialize Time | Serialize Time
   -- | -- | --
   json | 110s | 120s
   DynamicMessage and Descriptor API | 152s | 206s
   Codegen | 42s | 33s
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - New dependencies: adds the protobuf dependency 
   com.google.protobuf:protobuf-java:3.12.2
     - Public API: adds a new format to Flink SQL
     - The serializers: adds the new PbRowDeserializationSchema and 
   PbRowSerializationSchema
     - The runtime per-record code paths (performance sensitive): yes
   
   ## Documentation
   Connector params (a declaration sketch follows the list):
   
   1. pb.message-class-name: Required option that specifies the full name of 
   the protobuf message class. The protobuf class must be located on the 
   classpath on both the client side and the task side.
   2. pb.read-default-values: Optional flag to read default values instead of 
   null when a field is not set during deserialization; defaults to false. If 
   the proto syntax is proto3, this option is forced to true, because proto3's 
   standard is to use default values.
   3. pb.ignore-parse-errors: Optional flag to keep the deserialization task 
   running when a pb parse error occurs; defaults to false.
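   
   For illustration, the three options above could be declared with Flink's 
   ConfigOptions builder as sketched below. The option keys match this 
   documentation; the class and constant names are illustrative, not 
   necessarily what this PR uses:
   
   ```java
   import org.apache.flink.configuration.ConfigOption;
   import org.apache.flink.configuration.ConfigOptions;
   
   // Sketch: typed declarations of the documented format options
   // (illustrative names, not necessarily the PR's actual class).
   public class PbFormatOptionsSketch {
       public static final ConfigOption<String> MESSAGE_CLASS_NAME =
               ConfigOptions.key("pb.message-class-name")
                       .stringType()
                       .noDefaultValue()
                       .withDescription("Full name of the protobuf message class.");
   
       public static final ConfigOption<Boolean> READ_DEFAULT_VALUES =
               ConfigOptions.key("pb.read-default-values")
                       .booleanType()
                       .defaultValue(false)
                       .withDescription("Read default values instead of null for unset proto2 fields.");
   
       public static final ConfigOption<Boolean> IGNORE_PARSE_ERRORS =
               ConfigOptions.key("pb.ignore-parse-errors")
                       .booleanType()
                       .defaultValue(false)
                       .withDescription("Keep running instead of failing on protobuf parse errors.");
   }
   ```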
   
   ## Limitation
   1. In the proto definition, package must be equal to java_package.
   2. java_multiple_files must be set to true.
   
   ## Notice
   
   ### Default values
   If the syntax is proto2, the generated pb class has bit flags to indicate 
   whether a field is set or not, and we can use the pbObject.hasXXX() method 
   to check whether a field is set. In this way, we can handle null information 
   properly in Flink. So if syntax=2, the decoded Flink row may contain null 
   values, and we also expose an option to let users control how null values 
   are handled (see pb.read-default-values above).
   But if the syntax is proto3, the generated pb class has neither the 
   pbObject.hasXXX() method nor the bit flags, so there is no way to tell 
   whether a field is set when it equals its default value. For example, if 
   pbObject.getDim1() returns 0, there is no way to tell whether dim1 was set 
   to 0 or was never set at all. So if syntax=3, the decoded Flink row will not 
   contain any null values.
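   
   For illustration, a minimal sketch of this proto2 null-handling idea using 
   protobuf-java's DynamicMessage/Descriptor API. The `descriptor` and `bytes` 
   inputs are assumed, and this is not the PR's actual (code-generated) 
   implementation:
   
   ```java
   import com.google.protobuf.Descriptors.Descriptor;
   import com.google.protobuf.Descriptors.FieldDescriptor;
   import com.google.protobuf.DynamicMessage;
   
   // Sketch: decode one proto2 message and map unset fields to null.
   public class Proto2NullSketch {
       public static Object[] decode(Descriptor descriptor, byte[] bytes) throws Exception {
           DynamicMessage message = DynamicMessage.parseFrom(descriptor, bytes);
           Object[] row = new Object[descriptor.getFields().size()];
           int i = 0;
           for (FieldDescriptor fd : descriptor.getFields()) {
               if (fd.isRepeated()) {
                   row[i++] = message.getField(fd);  // repeated fields have no has-bit
               } else if (message.hasField(fd)) {
                   row[i++] = message.getField(fd);  // proto2 has-bit: field was set
               } else {
                   row[i++] = null;                  // proto2 has-bit: field was not set
               }
           }
           return row;
       }
   }
   ```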
    
   Also, pb does not permit null as a map key/value or as an array element, so 
   we have to generate default values for them, as the following table and the 
   sketch after it show.
   
   row value | pb value
   -- | --
   map<string,string>(("a", null)) | map<string,string>(("a", ""))
   map<string,string>((null, "a")) | map<string,string>(("", "a"))
   map<int, int>(null, 1) | map<int, int>(0, 1)
   map<int, int>(1, null) | map<int, int>(1, 0)
   map<long, long>(null, 1) | map<long, long>(0, 1)
   map<long, long>(1, null) | map<long, long>(1, 0)
   map<bool, bool>(null, true) | map<bool, bool>(false, true)
   map<bool, bool>(true, null) | map<bool, bool>(true, false)
   map<string, float>("key", null) | map<string, float>("key", 0)
   map<string, double>("key", null) | map<string, double>("key", 0)
   map<string, enum>("key", null) | map<string, enum>("key", first_enum_element)
   map<string, binary>("key", null) | map<string, binary>("key", 
ByteString.EMPTY)
   map<string, MESSAGE>("key", null) | map<string, MESSAGE>("key", 
MESSAGE.getDefaultInstance())
   array<string>(null) | array("")
   array<int>(null) | array(0)
   array<long>(null) | array(0)
   array<bool>(null) | array(false)
   array<float>(null) | array(0)
   array<double>(null) | array(0)
   array<enum>(null) | array(first_enum_element)
   array<binary>(null) | array(ByteString.EMPTY)
   array<message>(null) | array(MESSAGE.getDefaultInstance())
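   
   As an illustration of how such defaults can be produced generically, here is 
   a sketch based on FieldDescriptor.getJavaType(); the helper name is 
   hypothetical, not code from this PR:
   
   ```java
   import com.google.protobuf.ByteString;
   import com.google.protobuf.Descriptors.FieldDescriptor;
   import com.google.protobuf.DynamicMessage;
   
   // Sketch: pb default for a null Flink map key/value or array element,
   // matching the table above. `pbDefaultFor` is a hypothetical helper.
   public class NullToDefaultSketch {
       public static Object pbDefaultFor(FieldDescriptor fd) {
           switch (fd.getJavaType()) {
               case INT:         return 0;
               case LONG:        return 0L;
               case FLOAT:       return 0.0f;
               case DOUBLE:      return 0.0d;
               case BOOLEAN:     return false;
               case STRING:      return "";
               case BYTE_STRING: return ByteString.EMPTY;
               case ENUM:        return fd.getEnumType().getValues().get(0);  // first enum element
               case MESSAGE:     return DynamicMessage.getDefaultInstance(fd.getMessageType());
               default:          throw new IllegalStateException("Unexpected type: " + fd.getJavaType());
           }
       }
   }
   ```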
   
   ### OneOf field
   In the serialization process, there is no guarantee that the Flink row 
   fields of a one-of group contain at most one non-null value.
   So in serialization, we set each field in the order of the Flink schema, and 
   a field in a higher position will override a field in a lower position 
   within the same one-of group, as the sketch below shows.
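   
   A minimal sketch of that ordering rule with protobuf-java's DynamicMessage 
   builder. The row layout is assumed to align with the descriptor's field 
   order and to already hold protobuf-compatible Java values; in protobuf, 
   setting one member of a one-of clears the previously set member, so the 
   last field set wins:
   
   ```java
   import com.google.protobuf.Descriptors.Descriptor;
   import com.google.protobuf.Descriptors.FieldDescriptor;
   import com.google.protobuf.DynamicMessage;
   
   // Sketch: set fields in schema order; within a one-of group, each set
   // clears the previously set member, so the highest-position non-null
   // field wins.
   public class OneOfOrderSketch {
       public static DynamicMessage build(Descriptor descriptor, Object[] row) {
           DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
           int i = 0;
           for (FieldDescriptor fd : descriptor.getFields()) {  // schema order
               Object value = row[i++];
               if (value != null) {
                   builder.setField(fd, value);  // overrides an earlier one-of member
               }
           }
           return builder.build();
       }
   }
   ```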
   
   ### Enum type
   A pb enum value is converted to String and vice versa using the name of the 
   enum value as defined in the proto file.
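   
   For illustration, a sketch of both conversion directions with the Descriptor 
   API; `fd` is an assumed enum-typed FieldDescriptor, and the class is 
   hypothetical:
   
   ```java
   import com.google.protobuf.Descriptors.EnumValueDescriptor;
   import com.google.protobuf.Descriptors.FieldDescriptor;
   
   // Sketch: enum <-> String conversion by enum value name.
   public class EnumNameSketch {
       // deserialization direction: pb enum value -> String
       public static String toFlinkString(EnumValueDescriptor enumValue) {
           return enumValue.getName();  // e.g. "RED"
       }
   
       // serialization direction: String -> pb enum value
       public static EnumValueDescriptor toPbEnum(FieldDescriptor fd, String name) {
           EnumValueDescriptor value = fd.getEnumType().findValueByName(name);
           if (value == null) {
               throw new IllegalArgumentException("Unknown enum value name: " + name);
           }
           return value;
       }
   }
   ```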
   

