shuttie commented on pull request #7598: URL: https://github.com/apache/flink/pull/7598#issuecomment-851970419
For anyone interested in this issue, I've implemented first-class Protobuf support for Apache Flink as a separate library: https://github.com/findify/flink-protobuf

It works with both protobuf-java and ScalaPB generated classes. As a usage example, given that you have the following message format:

```proto
message Foo {
  required int32 value = 1;
}
```

you can build a TypeInformation for ScalaPB-generated classes like this:

```scala
import io.findify.flinkpb.FlinkProtobuf

implicit val ti = FlinkProtobuf.generateScala(Foo)
val result = env.fromCollection(List(Foo(1), Foo(2), Foo(3)))
```

For Java it's going to look a bit different:

```java
import io.findify.flinkpb.FlinkProtobuf;
import io.findify.flinkprotobuf.java.Tests;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<Tests.Foo> ti =
    FlinkProtobuf.generateJava(Tests.Foo.class, Tests.Foo.getDefaultInstance());
env.fromCollection(List.of(Tests.Foo.newBuilder().setValue(1).build()), ti)
    .executeAndCollect(100);
```

The main motivation for this library was our own use case, in which:
* Most of our Protobuf messages are encoded as a oneof, so in Scala they become parts of a sealed trait hierarchy, and Flink falls back to Kryo in such cases, resulting in severe performance degradation.
* Kryo-encoded messages can take roughly 10x more bytes for abstract classes, so it became too easy to saturate a 1 Gbit link between nodes.
* There is no schema-evolution support for Scala case classes; with this library we can just rely on Protobuf to handle all the problematic cases.
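For context, the oneof pattern mentioned in the first bullet looks roughly like this (message and field names here are hypothetical, not from the library). ScalaPB maps the oneof cases onto a sealed trait hierarchy, which is exactly the shape that makes stock Flink fall back to Kryo:

```proto
// Hypothetical sketch of the oneof pattern described above (proto2 syntax,
// matching the `required` style used in the Foo example).
message Event {
  oneof payload {
    Click click = 1;
    View view = 2;
  }
}

message Click {
  required int32 x = 1;
  required int32 y = 2;
}

message View {
  required string url = 1;
}
```

ScalaPB generates something like `sealed trait Payload` with `Click` and `View` as its cases; since Flink's type extractor has no dedicated serializer for such hierarchies, it resorts to Kryo's generic serialization unless a TypeInformation like the one above is supplied.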
