[
https://issues.apache.org/jira/browse/FLINK-11333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354957#comment-17354957
]
Roman Grebennikov commented on FLINK-11333:
-------------------------------------------
For anyone interested in this issue, I've implemented a first-class Protobuf
support for Apache Flink as a separate library:
[https://github.com/findify/flink-protobuf]
It works both with protobuf-java and ScalaPB generated classes.
As a usage example, given the following message format:
{code}
message Foo {
  required int32 value = 1;
}{code}
You can build a {{TypeInformation}} for ScalaPB-generated classes like this:
{code:scala}
import io.findify.flinkpb.FlinkProtobuf

implicit val ti = FlinkProtobuf.generateScala(Foo)
val result = env.fromCollection(List(Foo(1), Foo(2), Foo(3)))
{code}
For Java it looks slightly different:
{code:java}
import io.findify.flinkpb.FlinkProtobuf;
import io.findify.flinkprotobuf.java.Tests;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

TypeInformation<Tests.Foo> ti = FlinkProtobuf.generateJava(Tests.Foo.class,
    Tests.Foo.getDefaultInstance());
env.fromCollection(List.of(Tests.Foo.newBuilder().setValue(1).build()), ti)
    .executeAndCollect(100);
{code}
The main motivation for this library was our own use case, where:
# Most of our Protobuf messages are encoded as a {{oneof}}, so in Scala they
become part of a sealed trait hierarchy. Flink falls back to Kryo in such
cases, resulting in severe performance degradation.
# Kryo-encoded messages can take roughly 10x more bytes for abstract classes,
so it became too easy to saturate a 1 Gbit link between nodes.
# There is no support for schema evolution for Scala case classes.
With this library we can rely on Protobuf to handle all of these problematic cases.
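To illustrate the first point, here is a minimal sketch of the kind of {{oneof}} layout involved (message and field names are invented for illustration). ScalaPB generates a sealed trait for the {{oneof}}, whose subtypes Flink's type extractor cannot analyze, which is what triggers the Kryo fallback:
{code}
// Hypothetical proto2 sketch: ScalaPB turns the oneof into a sealed trait.
message Event {
  oneof payload {
    Click click = 1;
    Purchase purchase = 2;
  }
}

message Click {
  required int32 item_id = 1;
}

message Purchase {
  required int32 item_id = 1;
  required int64 price_cents = 2;
}
{code}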
> First-class support for Protobuf types with evolvable schema
> ------------------------------------------------------------
>
> Key: FLINK-11333
> URL: https://issues.apache.org/jira/browse/FLINK-11333
> Project: Flink
> Issue Type: Sub-task
> Components: API / Type Serialization System
> Reporter: Tzu-Li (Gordon) Tai
> Priority: Major
> Labels: auto-unassigned, pull-request-available, usability
> Time Spent: 10m
> Remaining Estimate: 0h
>
> I think we have more and more users who are thinking about using Protobuf for
> their state types.
> Right now, Protobuf isn't supported directly in Flink. The only way to use
> Protobuf for a type is to register it via Kryo:
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/custom_serializers.html.
> As we do for Avro types, we should be able to natively support Protobuf,
> with a {{ProtobufSerializer}} that handles serialization of Protobuf types.
> The serializer should also write necessary information in its snapshot, to
> enable schema evolution for it in the future. For Protobuf, this should
> almost work out-of-the-box.
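> For reference, the Kryo registration path from the linked docs looks like the
> sketch below ({{MyProtobufType}} is a placeholder for any generated Protobuf
> class; the serializer comes from the {{com.twitter:chill-protobuf}} dependency):
> {code:java}
> import com.twitter.chill.protobuf.ProtobufSerializer;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> // Route all instances of the generated type through chill's Protobuf-backed
> // Kryo serializer instead of Kryo's default object-graph serialization.
> env.getConfig()
>     .registerTypeWithKryoSerializer(MyProtobufType.class, ProtobufSerializer.class);
> {code}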