[
https://issues.apache.org/jira/browse/FLINK-35034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
yufeng.sun updated FLINK-35034:
-------------------------------
Description:
The following error messages and stack were encountered When i using Flink SQL
with Kafka connector and protobuf format:
{code:java}
2024-03-23 23:23:38,852 ERROR
org.apache.flink.formats.protobuf.util.PbCodegenUtils [] - Protobuf
codegen compile error: package
org.apache.flink.formats.protobuf.deserialize;import
org.apache.flink.table.data.RowData;import
org.apache.flink.table.data.ArrayData;import
org.apache.flink.table.data.binary.BinaryStringData;import
org.apache.flink.table.data.GenericRowData;import
org.apache.flink.table.data.GenericMapData;import
org.apache.flink.table.data.GenericArrayData;import java.util.ArrayList;import
java.util.List;import java.util.Map;import java.util.HashMap;import
com.google.protobuf.ByteString;public class
GeneratedProtoToRow_916e09b8a900477390c1f944e4a36da6{public static RowData
decode(.UserProtoBuf.User message){RowData rowData=null;.UserProtoBuf.User
message0 = message;GenericRowData rowData0 = new GenericRowData(7);Object
elementDataVar1 = null;elementDataVar1 = message0.getAge();
rowData0.setField(0, elementDataVar1);Object elementDataVar2 =
null;elementDataVar2 = message0.getTimestamp();
rowData0.setField(1, elementDataVar2);Object elementDataVar3 =
null;elementDataVar3 = message0.getEnabled();
rowData0.setField(2, elementDataVar3);Object elementDataVar4 =
null;elementDataVar4 = message0.getHeight();
rowData0.setField(3, elementDataVar4);Object elementDataVar5 =
null;elementDataVar5 = message0.getWeight();
rowData0.setField(4, elementDataVar5);Object elementDataVar6 =
null;elementDataVar6 =
BinaryStringData.fromString(message0.getUserName().toString());
rowData0.setField(5, elementDataVar6);Object elementDataVar7 =
null;elementDataVar7 =
BinaryStringData.fromString(message0.getFullAddress().toString());
rowData0.setField(6, elementDataVar7);rowData = rowData0;
return rowData;}}
2024-03-23 23:23:38,856 WARN org.apache.flink.runtime.taskmanager.Task
[] - Source: simple_test[2153] -> Sink: print_sink[2154] (1/1)#0
(c4aaed5ad4c63a8ba82a47979ffce386_717c7b8afebbfb7137f6f0f99beb2a94_0_0)
switched from INITIALIZING to FAILED with failure
cause:org.apache.flink.formats.protobuf.PbCodegenException:
org.apache.flink.api.common.InvalidProgramException: Program cannot be
compiled. This is a bug. Please file an issue. at
org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:124)
~[protobufTest-1.0-SNAPSHOT-1.jar:?] at
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64)
~[protobufTest-1.0-SNAPSHOT-1.jar:?] at
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94)
~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
at
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47)
~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
at
org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144)
~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
at
org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135)
~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
at
org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:318)
~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93)
~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:778)
~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:745)
~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928)
[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]Caused by:
org.apache.flink.api.common.InvalidProgramException: Program cannot be
compiled. This is a bug. Please file an issue. at
org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:262)
~[protobufTest-1.0-SNAPSHOT-1.jar:?] at
org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116)
~[protobufTest-1.0-SNAPSHOT-1.jar:?] ... 14 moreCaused by:
org.codehaus.commons.compiler.CompileException: Line 14, Column 30: IDENTIFIER
expected instead of '.' at
org.codehaus.janino.TokenStreamImpl.read(TokenStreamImpl.java:195)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at org.codehaus.janino.Parser.read(Parser.java:3313)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.janino.Parser.parseQualifiedIdentifier(Parser.java:326)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at org.codehaus.janino.Parser.parseReferenceType(Parser.java:2342)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.janino.Parser.parseType(Parser.java:2326)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at org.codehaus.janino.Parser.parseFormalParameter(Parser.java:1519)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.janino.Parser.parseFormalParameters(Parser.java:1488)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at org.codehaus.janino.Parser.parseMethodDeclarationRest(Parser.java:1392)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.janino.Parser.parseClassBodyDeclaration(Parser.java:938)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at org.codehaus.janino.Parser.parseClassBody(Parser.java:736)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at org.codehaus.janino.Parser.parseClassDeclarationRest(Parser.java:642)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at
org.codehaus.janino.Parser.parsePackageMemberTypeDeclarationRest(Parser.java:370)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.janino.Parser.parseCompilationUnit(Parser.java:241)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at
org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:259)
~[protobufTest-1.0-SNAPSHOT-1.jar:?] at
org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116)
~[protobufTest-1.0-SNAPSHOT-1.jar:?] ... 14 more {code}
proto file:
{code:java}
syntax = "proto3";option java_outer_classname = "UserProtoBuf";message User {
int32 age = 1;
int64 timestamp = 2;
bool enabled = 3;
float height = 4;
double weight = 5;
string userName = 6;
string Full_Address = 7;
} {code}
Flink SQL:
{code:java}
CREATE TEMPORARY TABLE test (
...
) WITH (
'connector' = 'kafka',
'topic' = '',
'properties.bootstrap.servers' = '',
'properties.group.id' = '',
'scan.startup.mode' = 'earliest-offset',
'format' = 'protobuf',
'protobuf.message-class-name' = 'org.example.UserProtoBuf$User',
'protobuf.ignore-parse-errors' = 'true'
)
; {code}
according to the error message, the type of the parameter `message` which is
used in method `decode` was lost package info.
{code:java}
public static RowData decode(.UserProtoBuf.User message){} {code}
After analyzing the following method calls, i found that the above exception
will occur when neither `package` nor `option java_package` is specified in the
proto file, at this time, the variable `javaPackageName` in method
`getOuterProtoPrefix` will be an empty string.
!https://intranetproxy.alipay.com/skylark/lark/0/2024/png/59256556/1712473173927-c277b275-08cc-4bb3-8322-f0c8937700b3.png|width=657,height=349!
{code:java}
org.apache.flink.formats.protobuf.util.PbCodegenUtils#compileClass
org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter#ProtoToRowConverter
- Class generatedClass =
PbCodegenUtils.compileClass(Thread.currentThread().getContextClassLoader(),
generatedPackageName + "." + generatedClassName, codegenAppender.code());
- codegenAppender.appendSegment("public static RowData decode(" +
fullMessageClassName + " message){");
- String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
org.apache.flink.formats.protobuf.util.PbFormatUtils#getFullJavaName(com.google.protobuf.Descriptors.Descriptor)
org.apache.flink.formats.protobuf.util.PbFormatUtils#getOuterProtoPrefix {code}
was:
The following error messages and stack were encountered When i using Flink SQL
with Kafka connector and protobuf format:
{code:java}
2024-03-23 23:23:38,852 ERROR
org.apache.flink.formats.protobuf.util.PbCodegenUtils [] - Protobuf
codegen compile error: package
org.apache.flink.formats.protobuf.deserialize;import
org.apache.flink.table.data.RowData;import
org.apache.flink.table.data.ArrayData;import
org.apache.flink.table.data.binary.BinaryStringData;import
org.apache.flink.table.data.GenericRowData;import
org.apache.flink.table.data.GenericMapData;import
org.apache.flink.table.data.GenericArrayData;import java.util.ArrayList;import
java.util.List;import java.util.Map;import java.util.HashMap;import
com.google.protobuf.ByteString;public class
GeneratedProtoToRow_916e09b8a900477390c1f944e4a36da6{public static RowData
decode(.UserProtoBuf.User message){RowData rowData=null;.UserProtoBuf.User
message0 = message;GenericRowData rowData0 = new GenericRowData(7);Object
elementDataVar1 = null;elementDataVar1 = message0.getAge();
rowData0.setField(0, elementDataVar1);Object elementDataVar2 =
null;elementDataVar2 = message0.getTimestamp();
rowData0.setField(1, elementDataVar2);Object elementDataVar3 =
null;elementDataVar3 = message0.getEnabled();
rowData0.setField(2, elementDataVar3);Object elementDataVar4 =
null;elementDataVar4 = message0.getHeight();
rowData0.setField(3, elementDataVar4);Object elementDataVar5 =
null;elementDataVar5 = message0.getWeight();
rowData0.setField(4, elementDataVar5);Object elementDataVar6 =
null;elementDataVar6 =
BinaryStringData.fromString(message0.getUserName().toString());
rowData0.setField(5, elementDataVar6);Object elementDataVar7 =
null;elementDataVar7 =
BinaryStringData.fromString(message0.getFullAddress().toString());
rowData0.setField(6, elementDataVar7);rowData = rowData0;
return rowData;}}
2024-03-23 23:23:38,856 WARN org.apache.flink.runtime.taskmanager.Task
[] - Source: simple_test[2153] -> Sink: print_sink[2154] (1/1)#0
(c4aaed5ad4c63a8ba82a47979ffce386_717c7b8afebbfb7137f6f0f99beb2a94_0_0)
switched from INITIALIZING to FAILED with failure
cause:org.apache.flink.formats.protobuf.PbCodegenException:
org.apache.flink.api.common.InvalidProgramException: Program cannot be
compiled. This is a bug. Please file an issue. at
org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:124)
~[protobufTest-1.0-SNAPSHOT-1.jar:?] at
org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64)
~[protobufTest-1.0-SNAPSHOT-1.jar:?] at
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94)
~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
at
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47)
~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
at
org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144)
~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
at
org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135)
~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
at
org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:318)
~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93)
~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:778)
~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:745)
~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928)
[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]Caused by:
org.apache.flink.api.common.InvalidProgramException: Program cannot be
compiled. This is a bug. Please file an issue. at
org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:262)
~[protobufTest-1.0-SNAPSHOT-1.jar:?] at
org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116)
~[protobufTest-1.0-SNAPSHOT-1.jar:?] ... 14 moreCaused by:
org.codehaus.commons.compiler.CompileException: Line 14, Column 30: IDENTIFIER
expected instead of '.' at
org.codehaus.janino.TokenStreamImpl.read(TokenStreamImpl.java:195)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at org.codehaus.janino.Parser.read(Parser.java:3313)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.janino.Parser.parseQualifiedIdentifier(Parser.java:326)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at org.codehaus.janino.Parser.parseReferenceType(Parser.java:2342)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.janino.Parser.parseType(Parser.java:2326)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at org.codehaus.janino.Parser.parseFormalParameter(Parser.java:1519)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.janino.Parser.parseFormalParameters(Parser.java:1488)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at org.codehaus.janino.Parser.parseMethodDeclarationRest(Parser.java:1392)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.janino.Parser.parseClassBodyDeclaration(Parser.java:938)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at org.codehaus.janino.Parser.parseClassBody(Parser.java:736)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at org.codehaus.janino.Parser.parseClassDeclarationRest(Parser.java:642)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at
org.codehaus.janino.Parser.parsePackageMemberTypeDeclarationRest(Parser.java:370)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.janino.Parser.parseCompilationUnit(Parser.java:241)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
at
org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:259)
~[protobufTest-1.0-SNAPSHOT-1.jar:?] at
org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116)
~[protobufTest-1.0-SNAPSHOT-1.jar:?] ... 14 more {code}
proto file:
{code:java}
syntax = "proto3";option java_outer_classname = "UserProtoBuf";message User {
int32 age = 1;
int64 timestamp = 2;
bool enabled = 3;
float height = 4;
double weight = 5;
string userName = 6;
string Full_Address = 7;
} {code}
Flink SQL:
{code:java}
CREATE TEMPORARY TABLE test (
...
) WITH (
'connector' = 'kafka',
'topic' = '',
'properties.bootstrap.servers' = '',
'properties.group.id' = '',
'scan.startup.mode' = 'earliest-offset',
'format' = 'protobuf',
'protobuf.message-class-name' = 'org.example.UserProtoBuf$User',
'protobuf.ignore-parse-errors' = 'true'
)
; {code}
according to the error message, the type of the parameter `message` which is
used in method `decode` was lost package info.
{code:java}
public static RowData decode(.UserProtoBuf.User message){} {code}
After analyzing the following method calls, i found that the above exception
will occur when neither `package` nor `option java_package` is specified in the
proto file, at this time, the variable `javaPackageName` in method
`getOuterProtoPrefix` will be an empty string.
!https://intranetproxy.alipay.com/skylark/lark/0/2024/png/59256556/1712473173927-c277b275-08cc-4bb3-8322-f0c8937700b3.png!
{code:java}
org.apache.flink.formats.protobuf.util.PbCodegenUtils#compileClass
org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter#ProtoToRowConverter
- Class generatedClass =
PbCodegenUtils.compileClass(Thread.currentThread().getContextClassLoader(),
generatedPackageName + "." + generatedClassName, codegenAppender.code());
- codegenAppender.appendSegment("public static RowData decode(" +
fullMessageClassName + " message){");
- String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
org.apache.flink.formats.protobuf.util.PbFormatUtils#getFullJavaName(com.google.protobuf.Descriptors.Descriptor)
org.apache.flink.formats.protobuf.util.PbFormatUtils#getOuterProtoPrefix {code}
> codegen compile error raised when use kafka connector and protobuf format
> -------------------------------------------------------------------------
>
> Key: FLINK-35034
> URL: https://issues.apache.org/jira/browse/FLINK-35034
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC,
> SequenceFile)
> Affects Versions: 1.17.2
> Reporter: yufeng.sun
> Priority: Critical
>
> The following error messages and stack were encountered When i using Flink
> SQL with Kafka connector and protobuf format:
> {code:java}
> 2024-03-23 23:23:38,852 ERROR
> org.apache.flink.formats.protobuf.util.PbCodegenUtils [] - Protobuf
> codegen compile error: package
> org.apache.flink.formats.protobuf.deserialize;import
> org.apache.flink.table.data.RowData;import
> org.apache.flink.table.data.ArrayData;import
> org.apache.flink.table.data.binary.BinaryStringData;import
> org.apache.flink.table.data.GenericRowData;import
> org.apache.flink.table.data.GenericMapData;import
> org.apache.flink.table.data.GenericArrayData;import
> java.util.ArrayList;import java.util.List;import java.util.Map;import
> java.util.HashMap;import com.google.protobuf.ByteString;public class
> GeneratedProtoToRow_916e09b8a900477390c1f944e4a36da6{public static RowData
> decode(.UserProtoBuf.User message){RowData rowData=null;.UserProtoBuf.User
> message0 = message;GenericRowData rowData0 = new GenericRowData(7);Object
> elementDataVar1 = null;elementDataVar1 = message0.getAge();
> rowData0.setField(0, elementDataVar1);Object elementDataVar2 =
> null;elementDataVar2 = message0.getTimestamp();
> rowData0.setField(1, elementDataVar2);Object elementDataVar3 =
> null;elementDataVar3 = message0.getEnabled();
> rowData0.setField(2, elementDataVar3);Object elementDataVar4 =
> null;elementDataVar4 = message0.getHeight();
> rowData0.setField(3, elementDataVar4);Object elementDataVar5 =
> null;elementDataVar5 = message0.getWeight();
> rowData0.setField(4, elementDataVar5);Object elementDataVar6 =
> null;elementDataVar6 =
> BinaryStringData.fromString(message0.getUserName().toString());
> rowData0.setField(5, elementDataVar6);Object elementDataVar7 =
> null;elementDataVar7 =
> BinaryStringData.fromString(message0.getFullAddress().toString());
> rowData0.setField(6, elementDataVar7);rowData = rowData0;
> return rowData;}}
> 2024-03-23 23:23:38,856 WARN org.apache.flink.runtime.taskmanager.Task
> [] - Source: simple_test[2153] -> Sink: print_sink[2154] (1/1)#0
> (c4aaed5ad4c63a8ba82a47979ffce386_717c7b8afebbfb7137f6f0f99beb2a94_0_0)
> switched from INITIALIZING to FAILED with failure
> cause:org.apache.flink.formats.protobuf.PbCodegenException:
> org.apache.flink.api.common.InvalidProgramException: Program cannot be
> compiled. This is a bug. Please file an issue. at
> org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:124)
> ~[protobufTest-1.0-SNAPSHOT-1.jar:?] at
> org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64)
> ~[protobufTest-1.0-SNAPSHOT-1.jar:?] at
> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94)
>
> ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at
> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47)
>
> ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at
> org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144)
>
> ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at
> org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135)
>
> ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at
> org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:318)
> ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93)
> ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:778)
> ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:745)
> ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
> ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928)
> [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
> [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
> [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
> java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]Caused by:
> org.apache.flink.api.common.InvalidProgramException: Program cannot be
> compiled. This is a bug. Please file an issue. at
> org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:262)
> ~[protobufTest-1.0-SNAPSHOT-1.jar:?] at
> org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116)
> ~[protobufTest-1.0-SNAPSHOT-1.jar:?] ... 14 moreCaused by:
> org.codehaus.commons.compiler.CompileException: Line 14, Column 30:
> IDENTIFIER expected instead of '.' at
> org.codehaus.janino.TokenStreamImpl.read(TokenStreamImpl.java:195)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at org.codehaus.janino.Parser.read(Parser.java:3313)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
> org.codehaus.janino.Parser.parseQualifiedIdentifier(Parser.java:326)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at org.codehaus.janino.Parser.parseReferenceType(Parser.java:2342)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at org.codehaus.janino.Parser.parseType(Parser.java:2326)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at org.codehaus.janino.Parser.parseFormalParameter(Parser.java:1519)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT] at
> org.codehaus.janino.Parser.parseFormalParameters(Parser.java:1488)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at
> org.codehaus.janino.Parser.parseMethodDeclarationRest(Parser.java:1392)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at org.codehaus.janino.Parser.parseClassBodyDeclaration(Parser.java:938)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at org.codehaus.janino.Parser.parseClassBody(Parser.java:736)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at org.codehaus.janino.Parser.parseClassDeclarationRest(Parser.java:642)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at
> org.codehaus.janino.Parser.parsePackageMemberTypeDeclarationRest(Parser.java:370)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at org.codehaus.janino.Parser.parseCompilationUnit(Parser.java:241)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
> at
> org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:259)
> ~[protobufTest-1.0-SNAPSHOT-1.jar:?] at
> org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116)
> ~[protobufTest-1.0-SNAPSHOT-1.jar:?] ... 14 more {code}
> proto file:
> {code:java}
> syntax = "proto3";option java_outer_classname = "UserProtoBuf";message User {
> int32 age = 1;
> int64 timestamp = 2;
> bool enabled = 3;
> float height = 4;
> double weight = 5;
> string userName = 6;
> string Full_Address = 7;
> } {code}
> Flink SQL:
> {code:java}
> CREATE TEMPORARY TABLE test (
> ...
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = '',
> 'properties.bootstrap.servers' = '',
> 'properties.group.id' = '',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'protobuf',
> 'protobuf.message-class-name' = 'org.example.UserProtoBuf$User',
> 'protobuf.ignore-parse-errors' = 'true'
> )
> ; {code}
> according to the error message, the type of the parameter `message` which is
> used in method `decode` was lost package info.
> {code:java}
> public static RowData decode(.UserProtoBuf.User message){} {code}
> After analyzing the following method calls, i found that the above exception
> will occur when neither `package` nor `option java_package` is specified in
> the proto file, at this time, the variable `javaPackageName` in method
> `getOuterProtoPrefix` will be an empty string.
> !https://intranetproxy.alipay.com/skylark/lark/0/2024/png/59256556/1712473173927-c277b275-08cc-4bb3-8322-f0c8937700b3.png|width=657,height=349!
> {code:java}
> org.apache.flink.formats.protobuf.util.PbCodegenUtils#compileClass
> org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter#ProtoToRowConverter
> - Class generatedClass =
> PbCodegenUtils.compileClass(Thread.currentThread().getContextClassLoader(),
> generatedPackageName + "." + generatedClassName, codegenAppender.code());
> - codegenAppender.appendSegment("public static RowData decode(" +
> fullMessageClassName + " message){");
> - String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
> org.apache.flink.formats.protobuf.util.PbFormatUtils#getFullJavaName(com.google.protobuf.Descriptors.Descriptor)
> org.apache.flink.formats.protobuf.util.PbFormatUtils#getOuterProtoPrefix
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)