[ https://issues.apache.org/jira/browse/FLINK-11333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354957#comment-17354957 ]

Roman Grebennikov commented on FLINK-11333:
-------------------------------------------

For anyone interested in this issue: I've implemented first-class Protobuf 
support for Apache Flink as a separate library: 
[https://github.com/findify/flink-protobuf]

It works with both protobuf-java and ScalaPB generated classes.

 

As a usage example, given the following message format:
{code:java}
message Foo {
  required int32 value = 1;
}{code}
You can build a {{TypeInformation}} for ScalaPB-generated classes like this:
{code:java}
import io.findify.flinkpb.FlinkProtobuf

// Foo is the ScalaPB-generated companion object for the message above
implicit val ti = FlinkProtobuf.generateScala(Foo)
val result = env.fromCollection(List(Foo(1), Foo(2), Foo(3))){code}
For Java it looks a bit different:
{code:java}
import io.findify.flinkpb.FlinkProtobuf;
import io.findify.flinkprotobuf.java.Tests;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<Tests.Foo> ti = FlinkProtobuf.generateJava(Tests.Foo.class,
    Tests.Foo.getDefaultInstance());
env.fromCollection(List.of(Tests.Foo.newBuilder().setValue(1).build()), ti)
    .executeAndCollect(100);{code}
The main motivation for this library was our own use case:
 # Most of our Protobuf messages are wrapped in a oneof, so in Scala they 
become part of a sealed trait hierarchy. Flink falls back to Kryo in such 
cases, resulting in severe performance degradation.
 # Kryo-encoded messages can take around 10x more bytes for abstract classes, 
so it became too easy to saturate a 1 Gbit link between nodes.
 # There is no schema-evolution support for Scala case classes.
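To illustrate the first point, here is a sketch of how a oneof-based message could be registered through the library. The {{Event}}, {{Click}} and {{View}} names are hypothetical, invented for illustration; the general shape of ScalaPB's oneof codegen (a sealed {{Payload}} trait with one case per branch) is what matters:
{code:java}
// Hypothetical .proto (illustrative only):
// message Event {
//   oneof payload {
//     Click click = 1;
//     View  view  = 2;
//   }
// }

import io.findify.flinkpb.FlinkProtobuf

// ScalaPB generates Event.Payload as a sealed trait with a case per branch.
// Registering the Protobuf TypeInformation keeps Flink on the Protobuf
// serializer for the whole hierarchy instead of the Kryo fallback.
implicit val ti = FlinkProtobuf.generateScala(Event)
val events = env.fromCollection(List(
  Event(payload = Event.Payload.Click(Click(1))),
  Event(payload = Event.Payload.View(View(2)))
)){code}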

With this library we can simply rely on Protobuf itself to handle all of these problematic cases.

> First-class support for Protobuf types with evolvable schema
> ------------------------------------------------------------
>
>                 Key: FLINK-11333
>                 URL: https://issues.apache.org/jira/browse/FLINK-11333
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / Type Serialization System
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Major
>              Labels: auto-unassigned, pull-request-available, usability
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> I think we have more and more users who are thinking about using Protobuf for 
> their state types.
> Right now, Protobuf isn't supported directly in Flink. The only way to use 
> Protobuf for a type is to register it via Kryo: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/custom_serializers.html.
> As we do for Avro types, we should be able to natively support Protobuf, 
> with a {{ProtobufSerializer}} that handles serialization of Protobuf types. 
> The serializer should also write necessary information in its snapshot, to 
> enable schema evolution for it in the future. For Protobuf, this should 
> almost work out-of-the-box.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
