yufeng.sun created FLINK-35034:
----------------------------------

             Summary: codegen compile error raised when use kafka connector and 
protobuf format
                 Key: FLINK-35034
                 URL: https://issues.apache.org/jira/browse/FLINK-35034
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka, Formats (JSON, Avro, Parquet, ORC, 
SequenceFile)
    Affects Versions: 1.17.2
            Reporter: yufeng.sun


I encountered the following error message and stack trace when using Flink SQL 
with the Kafka connector and the protobuf format:
{code:java}
2024-03-23 23:23:38,852 ERROR 
org.apache.flink.formats.protobuf.util.PbCodegenUtils        [] - Protobuf 
codegen compile error: package 
org.apache.flink.formats.protobuf.deserialize;import 
org.apache.flink.table.data.RowData;import 
org.apache.flink.table.data.ArrayData;import 
org.apache.flink.table.data.binary.BinaryStringData;import 
org.apache.flink.table.data.GenericRowData;import 
org.apache.flink.table.data.GenericMapData;import 
org.apache.flink.table.data.GenericArrayData;import java.util.ArrayList;import 
java.util.List;import java.util.Map;import java.util.HashMap;import 
com.google.protobuf.ByteString;public class 
GeneratedProtoToRow_916e09b8a900477390c1f944e4a36da6{public static RowData 
decode(.UserProtoBuf.User message){RowData rowData=null;.UserProtoBuf.User 
message0 = message;GenericRowData rowData0 = new GenericRowData(7);Object 
elementDataVar1 = null;elementDataVar1 = message0.getAge();
rowData0.setField(0, elementDataVar1);Object elementDataVar2 = 
null;elementDataVar2 = message0.getTimestamp();
rowData0.setField(1, elementDataVar2);Object elementDataVar3 = 
null;elementDataVar3 = message0.getEnabled();
rowData0.setField(2, elementDataVar3);Object elementDataVar4 = 
null;elementDataVar4 = message0.getHeight();
rowData0.setField(3, elementDataVar4);Object elementDataVar5 = 
null;elementDataVar5 = message0.getWeight();
rowData0.setField(4, elementDataVar5);Object elementDataVar6 = 
null;elementDataVar6 = 
BinaryStringData.fromString(message0.getUserName().toString());
rowData0.setField(5, elementDataVar6);Object elementDataVar7 = 
null;elementDataVar7 = 
BinaryStringData.fromString(message0.getFullAddress().toString());
rowData0.setField(6, elementDataVar7);rowData = rowData0;
return rowData;}}
2024-03-23 23:23:38,856 WARN  org.apache.flink.runtime.taskmanager.Task [] - Source: simple_test[2153] -> Sink: print_sink[2154] (1/1)#0 (c4aaed5ad4c63a8ba82a47979ffce386_717c7b8afebbfb7137f6f0f99beb2a94_0_0) switched from INITIALIZING to FAILED with failure cause:
org.apache.flink.formats.protobuf.PbCodegenException: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:124) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.open(DynamicKafkaDeserializationSchema.java:94) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.open(KafkaDeserializationSchemaWrapper.java:47) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:144) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:135) ~[ververica-connector-kafka-1.17-vvr-8.0.5-SNAPSHOT-jar-with-dependencies.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:318) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:778) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:745) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959) ~[flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:928) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) [flink-dist-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:879) [?:1.8.0_372]
Caused by: org.apache.flink.api.common.InvalidProgramException: Program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:262) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    ... 14 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 14, Column 30: IDENTIFIER expected instead of '.'
    at org.codehaus.janino.TokenStreamImpl.read(TokenStreamImpl.java:195) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.read(Parser.java:3313) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseQualifiedIdentifier(Parser.java:326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseReferenceType(Parser.java:2342) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseType(Parser.java:2326) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseFormalParameter(Parser.java:1519) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseFormalParameters(Parser.java:1488) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseMethodDeclarationRest(Parser.java:1392) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseClassBodyDeclaration(Parser.java:938) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseClassBody(Parser.java:736) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseClassDeclarationRest(Parser.java:642) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parsePackageMemberTypeDeclarationRest(Parser.java:370) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.Parser.parseCompilationUnit(Parser.java:241) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) ~[flink-table-runtime-1.17-vvr-8.0.5-SNAPSHOT.jar:1.17-vvr-8.0.5-SNAPSHOT]
    at org.apache.flink.formats.protobuf.util.PbCodegenUtils.compileClass(PbCodegenUtils.java:259) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:116) ~[protobufTest-1.0-SNAPSHOT-1.jar:?]
    ... 14 more {code}
proto file:
{code:java}
syntax = "proto3";

option java_outer_classname = "UserProtoBuf";

message User {
  int32 age = 1;
  int64 timestamp = 2;
  bool enabled = 3;
  float height = 4;
  double weight = 5;
  string userName = 6;
  string Full_Address = 7;
} {code}
Flink SQL:
{code:java}
CREATE TEMPORARY TABLE test (
  ...
) WITH (
  'connector' = 'kafka',
  'topic' = '',
  'properties.bootstrap.servers' = '',
  'properties.group.id' = '',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'org.example.UserProtoBuf$User',
  'protobuf.ignore-parse-errors' = 'true'
)
; {code}
According to the error message, the type of the `message` parameter of the 
generated `decode` method has lost its package information and starts with a 
bare `.`:
{code:java}
public static RowData decode(.UserProtoBuf.User message){} {code}
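A type name with a leading `.` is not a valid Java qualified name, which is consistent with Janino reporting `IDENTIFIER expected instead of '.'`. As a rough illustration only, the check below uses the JDK's `javax.lang.model.SourceVersion.isName`; the class name `QualifiedNameCheck` and the helper `isValidTypeName` are hypothetical and are not part of Flink or Janino.

```java
import javax.lang.model.SourceVersion;

// Hypothetical helper, not Flink code: checks whether a string is a
// syntactically valid Java qualified name.
final class QualifiedNameCheck {

    static boolean isValidTypeName(String name) {
        // SourceVersion.isName splits on '.' and requires every segment
        // to be a non-empty identifier that is not a keyword.
        return SourceVersion.isName(name);
    }

    public static void main(String[] args) {
        // Type name taken from the generated decode(...) signature.
        System.out.println(isValidTypeName(".UserProtoBuf.User"));
        // The same name without the leading dot.
        System.out.println(isValidTypeName("UserProtoBuf.User"));
    }
}
```

The first name has an empty leading segment and is rejected; the second is accepted.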
After tracing the method calls below, I found that this exception occurs when 
neither `package` nor `option java_package` is specified in the proto file. In 
that case the variable `javaPackageName` in method `getOuterProtoPrefix` is an 
empty string, so the generated class name starts with a `.`.

!https://intranetproxy.alipay.com/skylark/lark/0/2024/png/59256556/1712473173927-c277b275-08cc-4bb3-8322-f0c8937700b3.png!
{code:java}
org.apache.flink.formats.protobuf.util.PbCodegenUtils#compileClass
org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter#ProtoToRowConverter
 - Class generatedClass = PbCodegenUtils.compileClass(Thread.currentThread().getContextClassLoader(), generatedPackageName + "." + generatedClassName, codegenAppender.code());
 - codegenAppender.appendSegment("public static RowData decode(" + fullMessageClassName + " message){");
 - String fullMessageClassName = PbFormatUtils.getFullJavaName(descriptor);
org.apache.flink.formats.protobuf.util.PbFormatUtils#getFullJavaName(com.google.protobuf.Descriptors.Descriptor)
org.apache.flink.formats.protobuf.util.PbFormatUtils#getOuterProtoPrefix {code}
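To make the failure mode concrete, here is a minimal sketch of how an empty `javaPackageName` can produce the leading `.`, together with one possible guard. It is not the actual Flink source: the class `OuterPrefixSketch` and the methods `naivePrefix` and `guardedPrefix` are hypothetical, and the sketch assumes the prefix is built by plain string concatenation of the package name and the outer class name.

```java
// Hypothetical sketch, not the Flink implementation of getOuterProtoPrefix.
final class OuterPrefixSketch {

    // Assumed current behavior: always join the package and the outer
    // class with '.', even when the package name is empty.
    static String naivePrefix(String javaPackageName, String outerClassName) {
        return javaPackageName + "." + outerClassName + ".";
    }

    // One possible guard: skip the package segment when it is empty, so
    // classes in the default package get a prefix without a leading dot.
    static String guardedPrefix(String javaPackageName, String outerClassName) {
        if (javaPackageName == null || javaPackageName.isEmpty()) {
            return outerClassName + ".";
        }
        return javaPackageName + "." + outerClassName + ".";
    }

    public static void main(String[] args) {
        // No `package` and no `option java_package` in the proto file.
        System.out.println(naivePrefix("", "UserProtoBuf") + "User");
        System.out.println(guardedPrefix("", "UserProtoBuf") + "User");
        // With a package, both variants agree.
        System.out.println(guardedPrefix("org.example", "UserProtoBuf") + "User");
    }
}
```

With an empty package, the naive variant yields `.UserProtoBuf.User`, matching the parameter type in the generated `decode` signature, while the guarded variant yields `UserProtoBuf.User`. Whether a guard like this is the right fix for Flink, or whether a missing package should be rejected earlier, is left to the maintainers.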



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
