ruanhang1993 removed a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-786451333


   Hi @maosuhan, thanks a lot for your work. I have used the code on top of Flink 1.11.2, and I ran into some problems with the protobuf format:
   
   1. The protobuf-java jar in flink-protobuf conflicts with the one bundled in flink-dist and in flink-sql-connector-hive.
   2. Deserialization fails when the job is recovered from a checkpoint:
   ```
   Caused by: java.io.IOException: Failed to deserialize PB object.
        at org.apache.flink.formats.protobuf.deserialize.PbRowDeserializationSchema.deserialize(PbRowDeserializationSchema.java:97) ~[xxx.jar:?]
        at org.apache.flink.formats.protobuf.deserialize.PbRowDeserializationSchema.deserialize(PbRowDeserializationSchema.java:39) ~[xxx.jar:?]
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[xxx.jar:?]
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[xxx.jar:?]
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[xxx.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[xxx.jar:?]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
   Caused by: java.lang.NullPointerException
        at org.apache.flink.formats.protobuf.deserialize.PbRowDeserializationSchema.deserialize(PbRowDeserializationSchema.java:91) ~[xxx.jar:?]
        at org.apache.flink.formats.protobuf.deserialize.PbRowDeserializationSchema.deserialize(PbRowDeserializationSchema.java:39) ~[xxx.jar:?]
        at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[xxx.jar:?]
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[xxx.jar:?]
        at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[xxx.jar:?]
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[xxx.jar:?]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
   ```
   
   For problem 1, I resolved it by using the protobuf-java jar from flink-dist and setting the scope of protobuf-java in flink-protobuf to `provided`. Besides, I shaded protobuf-java in flink-sql-connector-hive:
   ```
   // flink-protobuf : pom.xml
   <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>${protoc.version}</version>
        <scope>provided</scope>
   </dependency>
   
   // flink-sql-connector-hive-1.2.2 : pom.xml : shade-plugin
   <relocations>
        <relocation>
                <pattern>com.google.protobuf</pattern>
                <shadedPattern>org.apache.flink.connector.sql.hive.com.google.protobuf</shadedPattern>
        </relocation>
   </relocations>
   ```
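   With the `provided` scope, flink-protobuf still compiles against protobuf-java but defers at runtime to the single copy already shipped in flink-dist, so only one version ends up on the classpath; relocating the copy inside flink-sql-connector-hive removes the remaining clash.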
   For problem 2 (which occurs when the job is submitted with `flink run -s <checkpoint>`), the NullPointerException indicates that `protoToRowConverter` is still null the first time `deserialize` is called after the restore, presumably because `open()` has not run yet on that path. I simply used the following code for the `deserialize` method in the `PbRowDeserializationSchema` class to fix this issue.
   ```java
   @Override
   public RowData deserialize(byte[] message) throws IOException {
       try {
           // Lazily create the converter: when the job is restored from a
           // checkpoint, deserialize() can be reached before open() has run,
           // leaving protoToRowConverter null.
           if (protoToRowConverter == null) {
               protoToRowConverter = new ProtoToRowConverter(messageClassName, rowType, readDefaultValues);
           }
           return protoToRowConverter.convertProtoBinaryToRow(message);
       } catch (Throwable t) {
           if (ignoreParseErrors) {
               return null;
           }
           LOG.error("Failed to deserialize PB object.", t);
           throw new IOException("Failed to deserialize PB object.", t);
       }
   }
   ```
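   As an alternative (a minimal sketch, assuming the `open(InitializationContext)` hook that `DeserializationSchema` gained in Flink 1.11 is actually invoked on the restore path; the NPE above suggests it may not always be in 1.11.2), the converter could also be created eagerly when the schema is opened:
   ```java
   @Override
   public void open(InitializationContext context) throws Exception {
       // Build the converter up front so deserialize() never sees a null field;
       // the constructor arguments mirror the lazy initialization above.
       protoToRowConverter = new ProtoToRowConverter(messageClassName, rowType, readDefaultValues);
   }
   ```
   If `open()` cannot be relied upon, the lazy null check in `deserialize` is still worth keeping as a safety net.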
   
   I don't know whether the cause is the way I use the format or the different Flink version (1.11.2); I hope these problems are helpful for this PR.
   Do you have any better advice for fixing them?
   
   Thanks for reading.
   

