maosuhan opened a new pull request #14376:
URL: https://github.com/apache/flink/pull/14376
## What is the purpose of the change
Protobuf is a structured data format introduced by Google. Compared to JSON,
protobuf is more efficient in both space and computation. Many companies now use
protobuf instead of JSON as the data format in Kafka and stream processing.
So this PR introduces a new format that can deserialize and serialize protobuf
data quickly.
Users can use this format in SQL or the Table API.
## Verifying this change
```sql
create table source(
....... column list
)
with(
'connector' = 'kafka',
'format' = 'pb',
'pb.message-class-name' = '<message class name>'
)
```
```sql
create table sink(
....... column list
)
with(
'connector' = 'kafka',
'format' = 'pb',
'pb.message-class-name' = '<message class name>'
)
```
## Tests
TODO
## Benchmark
Performance test for a pb object containing 200+ fields. The table below shows
the time taken to process 10M rows.
Implementation | Deserialization time | Serialization time
-- | -- | --
json | 110s | 120s
DynamicMessage and Descriptor API | 152s | 206s
Codegen | 42s | 33s
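To make the table easier to compare, the times can be converted to throughput and relative speedup. This is only a sketch of the arithmetic: it assumes "10M rows" means exactly 10,000,000 rows, and the class and method names are made up for illustration.

```java
class BenchmarkThroughput {
    // Assumption: "10M rows" in the benchmark means exactly 10,000,000 rows.
    static final long ROWS = 10_000_000L;

    /** Rows processed per second (rounded down) for a run of the given duration. */
    static long rowsPerSecond(long seconds) {
        return ROWS / seconds;
    }

    /** How many times faster the run taking fastSeconds is than the one taking slowSeconds. */
    static double speedup(long slowSeconds, long fastSeconds) {
        return (double) slowSeconds / fastSeconds;
    }

    public static void main(String[] args) {
        // Times in seconds, copied from the benchmark table.
        String[] names = {"json", "DynamicMessage and Descriptor API", "Codegen"};
        long[] deserialize = {110, 152, 42};
        long[] serialize = {120, 206, 33};
        for (int i = 0; i < names.length; i++) {
            System.out.printf("%s: %d rows/s deserialize, %d rows/s serialize%n",
                    names[i], rowsPerSecond(deserialize[i]), rowsPerSecond(serialize[i]));
        }
        System.out.printf("Codegen vs json: %.2fx deserialize, %.2fx serialize%n",
                speedup(110, 42), speedup(120, 33));
    }
}
```

By this arithmetic, codegen is roughly 2.6x faster than json for deserialization and roughly 3.6x faster for serialization.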
## Does this pull request potentially affect one of the following parts:
- New dependencies: Add protobuf dependency com.google.protobuf:protobuf-java:3.12.2
- Public API: Add new format in Flink SQL
- The serializers: Add new PbRowDeserializationSchema and PbRowSerializationSchema
- The runtime per-record code paths (performance sensitive): yes
## Documentation
Connector params:
1. pb.message-class-name: Required. The full name of the protobuf message
class. The protobuf class must be on the classpath on both the client and the
task side.
1. pb.read-default-values: Optional flag; defaults to false. When true, a field
that is not present during deserialization is read as its default value instead
of null. If the proto syntax is proto3, this option is forced to true, because
proto3 always uses default values.
1. pb.ignore-parse-errors: Optional flag; defaults to false. When true, the
deserialization task keeps running if a pb parse error occurs.
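The intent of pb.ignore-parse-errors can be sketched as below. This is not the code in this PR: `Parser` and `deserialize` are hypothetical stand-ins, and returning null for a skipped record is an assumption borrowed from how Flink's DeserializationSchema treats a null result.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class IgnoreParseErrorsSketch {
    /** Hypothetical stand-in for the real protobuf parsing step. */
    interface Parser<T> {
        T parse(byte[] bytes) throws Exception;
    }

    /**
     * Parses one record. With ignoreParseErrors=true a bad record yields null
     * (assumed here to mean "skip this record"); otherwise the failure is rethrown.
     */
    static <T> T deserialize(byte[] message, Parser<T> parser, boolean ignoreParseErrors)
            throws IOException {
        try {
            return parser.parse(message);
        } catch (Exception e) {
            if (ignoreParseErrors) {
                return null; // keep the task running, drop the bad record
            }
            throw new IOException("Failed to deserialize PB object.", e);
        }
    }

    public static void main(String[] args) throws IOException {
        // A toy parser that reads an integer from text, standing in for protobuf parsing.
        Parser<Integer> parser = b -> Integer.parseInt(new String(b, StandardCharsets.UTF_8));
        byte[] bad = "not-a-number".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserialize(bad, parser, true)); // bad record is skipped
    }
}
```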
## Limitation
1. In the proto definition, package must be equal to java_package.
2. java_multiple_files must be true.
## Notice
### Default values
As you know, if the syntax is proto2, the generated pb class has bit flags
that indicate whether each field is set. We can call pbObject.hasXXX() to find
out whether a field is set, so we can handle null information in Flink
properly. So with proto2, the decoded Flink row may contain null values. We
could also expose an option that lets the user control how null values are
handled.
But if the syntax is proto3, the generated pb class has neither
pbObject.hasXXX() methods nor bit flags, so there is no way to tell whether a
field is set when its value equals the default value. For example, if
pbObject.getDim1() returns 0, there is no way to tell whether dim1 was set to 0
or was never set. So with proto3, the decoded Flink row will not contain any
null values.
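The difference can be illustrated with hand-written stand-ins for the generated classes. This is a sketch only: `Proto2Msg`, `Proto3Msg` and the decode helpers are hypothetical, not generated protobuf code or the code in this PR, and the `readDefaultValues` parameter models the pb.read-default-values option.

```java
class FieldPresenceSketch {
    /** Stand-in for a proto2 generated message with one int32 field and a presence bit. */
    static final class Proto2Msg {
        private int bitField0; // bit 0 records whether dim1 was explicitly set
        private int dim1;

        Proto2Msg setDim1(int value) {
            bitField0 |= 1;
            dim1 = value;
            return this;
        }

        boolean hasDim1() {
            return (bitField0 & 1) != 0;
        }

        int getDim1() {
            return dim1;
        }
    }

    /** Stand-in for a proto3 message: only the value is kept, no presence bit. */
    static final class Proto3Msg {
        private int dim1;

        Proto3Msg setDim1(int value) {
            dim1 = value;
            return this;
        }

        int getDim1() {
            return dim1;
        }
    }

    /** proto2: an unset field decodes to null unless default values are requested. */
    static Integer decodeProto2(Proto2Msg msg, boolean readDefaultValues) {
        if (msg.hasDim1() || readDefaultValues) {
            return msg.getDim1();
        }
        return null;
    }

    /** proto3: "unset" and "set to 0" are indistinguishable, so the result is never null. */
    static Integer decodeProto3(Proto3Msg msg) {
        return msg.getDim1();
    }

    public static void main(String[] args) {
        System.out.println(decodeProto2(new Proto2Msg(), false));           // unset field
        System.out.println(decodeProto2(new Proto2Msg().setDim1(0), false)); // explicitly set to 0
        System.out.println(decodeProto3(new Proto3Msg()));                  // unset, reads as default
    }
}
```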
Also, pb does not permit null as a map key, a map value, or an array element,
so we need to generate default values for them.
row value | pb value
-- | --
map<string,string>(<"a", null>) | map<string,string>(("a", ""))
map<string,string>(<null, "a">) | map<string,string>(("", "a"))
map<int, int>(null, 1) | map<int, int>(0, 1)
map<int, int>(1, null) | map<int, int>(1, 0)
map<long, long>(null, 1) | map<long, long>(0, 1)
map<long, long>(1, null) | map<long, long>(1, 0)
map<bool, bool>(null, true) | map<bool, bool>(false, true)
map<bool, bool>(true, null) | map<bool, bool>(true, false)
map<string, float>("key", null) | map<string, float>("key", 0)
map<string, double>("key", null) | map<string, double>("key", 0)
map<string, enum>("key", null) | map<string, enum>("key", first_enum_element)
map<string, binary>("key", null) | map<string, binary>("key", ByteString.EMPTY)
map<string, MESSAGE>("key", null) | map<string, MESSAGE>("key", MESSAGE.getDefaultInstance())
array<string>(null) | array("")
array<int>(null) | array(0)
array<long>(null) | array(0)
array<bool>(null) | array(false)
array<float>(null) | array(0)
array<double>(null) | array(0)
array<enum>(null) | array(first_enum_element)
array<binary>(null) | array(ByteString.EMPTY)
array<message>(null) | array(MESSAGE.getDefaultInstance())
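The substitution in the table above can be sketched with plain Java collections. The helper names are hypothetical and the caller supplies the per-type default (for example "" for string or 0 for int); this is not the code in this PR, which is generated per message class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NullDefaultsSketch {
    /** Returns a copy of the list with every null element replaced by the given default. */
    static <T> List<T> fillList(List<T> in, T elementDefault) {
        List<T> out = new ArrayList<>(in.size());
        for (T element : in) {
            out.add(element == null ? elementDefault : element);
        }
        return out;
    }

    /** Returns a copy of the map with a null key or null values replaced by the given defaults. */
    static <K, V> Map<K, V> fillMap(Map<K, V> in, K keyDefault, V valueDefault) {
        Map<K, V> out = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : in.entrySet()) {
            K key = entry.getKey() == null ? keyDefault : entry.getKey();
            V value = entry.getValue() == null ? valueDefault : entry.getValue();
            out.put(key, value);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> row = new HashMap<>();
        row.put("a", null);
        System.out.println(fillMap(row, "", ""));                // null value becomes ""
        System.out.println(fillList(Arrays.asList(1, null), 0)); // null element becomes 0
    }
}
```

One consequence worth noting: if a map holds both a null key and a key that already equals the default, the two entries collapse into one after substitution.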
### OneOf field
During serialization, there is no guarantee that the Flink row fields of a
one-of group contain at most one non-null value.
So we set each field in Flink schema order, which means a field at a later
position overrides a field at an earlier position in the same
one-of group.
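The "last set wins" behavior can be sketched with a hand-written stand-in for a generated message. `Msg`, its one-of group `value { int32 a = 1; string b = 2; }` and `serialize` are all hypothetical names used for illustration, not code from this PR.

```java
class OneOfSketch {
    /** Stand-in for a generated message with: oneof value { int32 a = 1; string b = 2; } */
    static final class Msg {
        private int valueCase; // 0 = nothing set, otherwise the field number that is set
        private Object value;

        void setA(int a) {
            valueCase = 1;
            value = a;
        }

        void setB(String b) {
            valueCase = 2;
            value = b;
        }

        int getValueCase() {
            return valueCase;
        }

        Object getValue() {
            return value;
        }
    }

    /**
     * Sets the non-null row fields in schema order (a, then b). Setting one member
     * of a one-of group clears the others, so the later field wins.
     */
    static Msg serialize(Integer a, String b) {
        Msg msg = new Msg();
        if (a != null) {
            msg.setA(a);
        }
        if (b != null) {
            msg.setB(b);
        }
        return msg;
    }

    public static void main(String[] args) {
        Msg msg = serialize(1, "x"); // both row fields are non-null
        System.out.println(msg.getValueCase() + " -> " + msg.getValue());
    }
}
```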
### Enum type
A pb enum value is converted to a String, and vice versa, using the name of the
enum value as defined in the pb definition.
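A minimal sketch of the name-based mapping, using a plain Java enum as a stand-in for a generated pb enum (generated pb enums are also Java enums, so name() and valueOf(String) are available on them). `Status` and the helper methods are hypothetical.

```java
class EnumNameSketch {
    /** Hypothetical stand-in for an enum generated from a proto definition. */
    enum Status { UNKNOWN, STARTED, FINISHED }

    /** Deserialization direction: pb enum value to the String stored in the Flink row. */
    static String toRowValue(Status status) {
        return status.name();
    }

    /** Serialization direction: String from the Flink row back to the pb enum value. */
    static Status toPbValue(String name) {
        // Throws IllegalArgumentException if the name is not defined in the enum.
        return Status.valueOf(name);
    }

    public static void main(String[] args) {
        System.out.println(toRowValue(Status.STARTED));
        System.out.println(toPbValue("FINISHED"));
    }
}
```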