ruanhang1993 edited a comment on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-786494390


   Hi @maosuhan, thanks a lot for your work. I have used your code with Flink 
1.11.2 and ran into two problems:
   
   1. The protobuf-java jar in the flink-protobuf module conflicts with the 
flink-dist and flink-sql-connector-hive modules.
   2. A NullPointerException is thrown for the field `protoToRowConverter` when 
recovering from a checkpoint.
   
   For problem 1, I set the scope of protobuf-java in the flink-protobuf module 
to `provided` and used the version set by `protoc.version` in flink-parent. 
Besides, I relocated protobuf-java in flink-sql-connector-hive.
   ```xml
   <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <version>${protoc.version}</version>
       <scope>provided</scope>
   </dependency>
   ```
   
   For problem 2, it seems that the `open` method of 
`PbRowDeserializationSchema` is not called when recovering from a checkpoint 
(submitting the job via the CLI with `flink run -s <dir>`), which leaves the 
field `protoToRowConverter` null. I worked around it with the following code in 
`PbRowDeserializationSchema.java`.
   ```java
   @Override
   public RowData deserialize(byte[] message) throws IOException {
       try {
           // Fallback: open() may not have run after a restore, so build the converter lazily.
           if (protoToRowConverter == null) {
               LOG.info(String.format("[protobuf new]: %s %s %s",
                       messageClassName, rowType.toString(), "" + readDefaultValues));
               protoToRowConverter = new ProtoToRowConverter(messageClassName,
                       rowType, readDefaultValues);
           }
           return protoToRowConverter.convertProtoBinaryToRow(message);
       } catch (Throwable t) {
           if (ignoreParseErrors) {
               return null;
           }
           LOG.error("Failed to deserialize PB object.", t);
           throw new IOException("Failed to deserialize PB object.", t);
       }
   }
   ```
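
One possible explanation, assuming `protoToRowConverter` is declared `transient` (I have not verified this against the PR): a transient field is dropped when the schema object is serialized, so it is null on the restored copy until something rebuilds it. The sketch below reproduces that with plain Java serialization. `LazyInitDemo`, `Schema` and `Converter` are hypothetical stand-ins, not Flink or flink-protobuf classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class LazyInitDemo {

    /** Hypothetical stand-in for ProtoToRowConverter; deliberately not Serializable. */
    static final class Converter {
        private final String messageClassName;

        Converter(String messageClassName) {
            this.messageClassName = messageClassName;
        }

        String convert(byte[] message) {
            return messageClassName + ":" + message.length;
        }
    }

    /** Hypothetical stand-in for PbRowDeserializationSchema. */
    static final class Schema implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String messageClassName;
        // Assumption: the converter is transient, so it is not part of the serialized form.
        private transient Converter converter;

        Schema(String messageClassName) {
            this.messageClassName = messageClassName;
        }

        /** Mirrors open(): builds the converter eagerly. */
        void open() {
            converter = new Converter(messageClassName);
        }

        boolean converterIsNull() {
            return converter == null;
        }

        /** Builds the converter lazily if open() was never called on this copy. */
        String deserialize(byte[] message) {
            if (converter == null) {
                converter = new Converter(messageClassName);
            }
            return converter.convert(message);
        }
    }

    /** Serializes and deserializes the schema, similar to shipping it to a task. */
    static Schema roundTrip(Schema schema) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(schema);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Schema) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Schema original = new Schema("demo.Message");
        original.open(); // converter exists on the original object only
        Schema restored = roundTrip(original);
        System.out.println(restored.converterIsNull());        // prints "true"
        System.out.println(restored.deserialize(new byte[3])); // prints "demo.Message:3"
    }
}
```

If that assumption holds, the null check in `deserialize` only hides the symptom. The underlying question is still why `open` is skipped on the restore path.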
   
   Will the same problems occur in newer Flink versions?
   And do you have any better ideas for fixing them?
   
   Thanks for reading.



