ruanhang1993 commented on pull request #14376:
URL: https://github.com/apache/flink/pull/14376#issuecomment-786451333
Hi @maosuhan, thanks a lot for your work. I have used this code on top of
Flink 1.11.2 and ran into some problems with the protobuf format:
1. The protobuf-java jar in flink-protobuf conflicts with the one bundled in
flink-dist and flink-sql-connector-hive.
2. There are problems using the protobuf format when recovering from a
checkpoint:
```
Caused by: java.io.IOException: Failed to deserialize PB object.
    at org.apache.flink.formats.protobuf.deserialize.PbRowDeserializationSchema.deserialize(PbRowDeserializationSchema.java:97) ~[xxx.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.PbRowDeserializationSchema.deserialize(PbRowDeserializationSchema.java:39) ~[xxx.jar:?]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[xxx.jar:?]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[xxx.jar:?]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[xxx.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[xxx.jar:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
Caused by: java.lang.NullPointerException
    at org.apache.flink.formats.protobuf.deserialize.PbRowDeserializationSchema.deserialize(PbRowDeserializationSchema.java:91) ~[xxx.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.PbRowDeserializationSchema.deserialize(PbRowDeserializationSchema.java:39) ~[xxx.jar:?]
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[xxx.jar:?]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[xxx.jar:?]
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[xxx.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[xxx.jar:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
```
For problem 1, I resolved it by using the protobuf-java jar from flink-dist
and setting the scope of protobuf-java in flink-protobuf to provided. Besides,
I shaded protobuf-java in flink-sql-connector-hive.
```
<!-- flink-protobuf: pom.xml -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>${protoc.version}</version>
    <scope>provided</scope>
</dependency>

<!-- flink-sql-connector-hive-1.2.2: pom.xml, shade-plugin -->
<relocations>
    <relocation>
        <pattern>com.google.protobuf</pattern>
        <shadedPattern>org.apache.flink.connector.sql.hive.com.google.protobuf</shadedPattern>
    </relocation>
</relocations>
```
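As a side note, when debugging jar conflicts like this, it can help to check at runtime which jar actually provides a given class on the task manager's classpath. The following is a small self-contained sketch (the class name `WhichJar` is my own, and `com.google.protobuf.Message` is just an example argument); it prints the `CodeSource` location of a class, which reveals which jar won on the classpath:

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper: print which jar a class was loaded from.
// Run e.g. `java -cp <flink classpath> WhichJar com.google.protobuf.Message`
// to see whether the flink-dist or the flink-protobuf copy wins.
public class WhichJar {
    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "java.lang.String";
        try {
            Class<?> c = Class.forName(name);
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // Classes on the bootstrap classpath (like java.lang.String) have no CodeSource.
            System.out.println(name + " loaded from "
                    + (src != null ? src.getLocation() : "bootstrap classpath"));
        } catch (ClassNotFoundException e) {
            System.out.println(name + " is not on the classpath");
        }
    }
}
```

With no arguments it inspects `java.lang.String` as a demonstration; pointed at `com.google.protobuf.Message` on the job's classpath, it shows which protobuf-java jar is actually in effect.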
For problem 2 (which occurs when submitting the job with `flink run -s
<checkpoint>`), I simply used the following `deserialize` method in the
`PbRowDeserializationSchema` class to fix the issue.
```java
@Override
public RowData deserialize(byte[] message) throws IOException {
    try {
        if (protoToRowConverter == null) {
            protoToRowConverter =
                    new ProtoToRowConverter(messageClassName, rowType, readDefaultValues);
        }
        return protoToRowConverter.convertProtoBinaryToRow(message);
    } catch (Throwable t) {
        if (ignoreParseErrors) {
            return null;
        }
        LOG.error("Failed to deserialize PB object.", t);
        throw new IOException("Failed to deserialize PB object.", t);
    }
}
```
I don't know whether these issues come from the way I use the format or from
the different Flink version (1.11.2); I hope reporting them is helpful for
this PR.
Do you have any better advice on how to fix them?
Thanks for reading.