ruanhang1993 removed a comment on pull request #14376: URL: https://github.com/apache/flink/pull/14376#issuecomment-786451333
Hi @maosuhan, thanks a lot for your work. I have used this code on Flink 1.11.2, and I ran into two problems with the protobuf format:

1. The protobuf-java jar in flink-protobuf conflicts with the one in flink-dist and flink-sql-connector-hive.
2. The protobuf format fails when recovering from a checkpoint:

```
Caused by: java.io.IOException: Failed to deserialize PB object.
	at org.apache.flink.formats.protobuf.deserialize.PbRowDeserializationSchema.deserialize(PbRowDeserializationSchema.java:97) ~[xxx.jar:?]
	at org.apache.flink.formats.protobuf.deserialize.PbRowDeserializationSchema.deserialize(PbRowDeserializationSchema.java:39) ~[xxx.jar:?]
	at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[xxx.jar:?]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[xxx.jar:?]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[xxx.jar:?]
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[xxx.jar:?]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
Caused by: java.lang.NullPointerException
	at org.apache.flink.formats.protobuf.deserialize.PbRowDeserializationSchema.deserialize(PbRowDeserializationSchema.java:91) ~[xxx.jar:?]
	at org.apache.flink.formats.protobuf.deserialize.PbRowDeserializationSchema.deserialize(PbRowDeserializationSchema.java:39) ~[xxx.jar:?]
	at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[xxx.jar:?]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[xxx.jar:?]
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[xxx.jar:?]
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[xxx.jar:?]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
```

For problem 1, I resolved it by using the protobuf-java jar from flink-dist and setting the scope of protobuf-java in flink-protobuf to `provided`. Besides that, I shaded protobuf-java in flink-sql-connector-hive.
```xml
<!-- flink-protobuf: pom.xml -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>${protoc.version}</version>
    <scope>provided</scope>
</dependency>

<!-- flink-sql-connector-hive-1.2.2: pom.xml, shade-plugin -->
<relocations>
    <relocation>
        <pattern>com.google.protobuf</pattern>
        <shadedPattern>org.apache.flink.connector.sql.hive.com.google.protobuf</shadedPattern>
    </relocation>
</relocations>
```

For problem 2 (when submitting the job with `flink run -s <checkpoint>`), I simply used the following `deserialize` method in the `PbRowDeserializationSchema` class to fix the issue:

```java
@Override
public RowData deserialize(byte[] message) throws IOException {
    try {
        if (protoToRowConverter == null) {
            protoToRowConverter =
                    new ProtoToRowConverter(messageClassName, rowType, readDefaultValues);
        }
        return protoToRowConverter.convertProtoBinaryToRow(message);
    } catch (Throwable t) {
        if (ignoreParseErrors) {
            return null;
        }
        LOG.error("Failed to deserialize PB object.", t);
        throw new IOException("Failed to deserialize PB object.", t);
    }
}
```

I don't know whether the problem lies in the way I use the format or in the different Flink version (1.11.2); I hope these findings are helpful for this PR. Do you have any better advice on how to fix it? Thanks for reading.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
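The `NullPointerException` at `PbRowDeserializationSchema.java:91` suggests `protoToRowConverter` was still null because `open()` had not run before the first `deserialize()` call on restore, and the workaround above guards against exactly that. A minimal, Flink-free sketch of the failure mode and the lazy-init guard (all class and method names here are illustrative, not Flink's API):

```java
// Sketch (hypothetical names): a schema whose converter is only created in
// open() throws an NPE when deserialize() runs first, as can happen when a
// job is restored from a checkpoint; a null guard in deserialize() avoids it.
import java.util.Locale;

public class LazyInitSketch {
    static class Converter {
        String convert(byte[] msg) {
            return new String(msg).toUpperCase(Locale.ROOT);
        }
    }

    /** Converter is only created in open(); deserialize() assumes it exists. */
    static class EagerSchema {
        Converter converter;
        void open() { converter = new Converter(); }
        String deserialize(byte[] msg) {
            return converter.convert(msg); // NPE if open() was never called
        }
    }

    /** Null guard mirrors the workaround: create the converter on first use. */
    static class LazySchema {
        Converter converter;
        String deserialize(byte[] msg) {
            if (converter == null) {
                converter = new Converter();
            }
            return converter.convert(msg);
        }
    }

    public static void main(String[] args) {
        byte[] msg = "ok".getBytes();
        try {
            new EagerSchema().deserialize(msg); // open() skipped, as on restore
            System.out.println("no NPE");
        } catch (NullPointerException e) {
            System.out.println("NPE without open()");
        }
        System.out.println(new LazySchema().deserialize(msg)); // prints OK
    }
}
```

The guard makes `deserialize()` safe regardless of whether the runtime called `open()` first, which is why it fixes the restore case without changing the normal path.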