[ 
https://issues.apache.org/jira/browse/FLINK-30093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17654822#comment-17654822
 ] 

Tomoyuki NAKAMURA edited comment on FLINK-30093 at 1/5/23 10:18 AM:
--------------------------------------------------------------------

I have investigated the Flink 1.16 implementation in detail.
The current implementation does not handle the case where a message uses a 
type imported from another package, so the way the result of 
getOuterProtoPrefix is determined needs to be improved.
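
To illustrate the problem, here is a minimal sketch. It is not the Flink code and not the code from the linked branch: the ProtoFile class and its javaClassFor method are hypothetical stand-ins for the Java-related options of a .proto file. The only point is that the Java package has to come from the file that defines the field's message type (google/protobuf/timestamp.proto, whose java_package is com.google.protobuf), not from the file that defines the enclosing message.

```java
final class ImportedTypeResolution {
    /** Hypothetical stand-in for the Java-related options of one .proto file. */
    static final class ProtoFile {
        final String javaPackage;     // effective Java package of the file
        final boolean multipleFiles;  // value of java_multiple_files
        final String outerClassName;  // outer class generated for the file

        ProtoFile(String javaPackage, boolean multipleFiles, String outerClassName) {
            this.javaPackage = javaPackage;
            this.multipleFiles = multipleFiles;
            this.outerClassName = outerClassName;
        }

        /** Fully qualified Java class generated for a top-level message of this file. */
        String javaClassFor(String messageName) {
            String prefix = javaPackage.isEmpty() ? "" : javaPackage + ".";
            // Without java_multiple_files, messages are nested inside the outer class.
            return multipleFiles
                    ? prefix + messageName
                    : prefix + outerClassName + "." + messageName;
        }
    }

    public static void main(String[] args) {
        ProtoFile testProto = new ProtoFile("com.example.message", true, "TestOuterClass");
        ProtoFile timestampProto = new ProtoFile("com.google.protobuf", true, "TimestampProto");

        // Resolving the field type against the enclosing message's file gives the
        // class name seen in the generated code: com.example.message.Timestamp
        System.out.println(testProto.javaClassFor("Timestamp"));

        // Resolving it against the file that defines Timestamp gives
        // com.google.protobuf.Timestamp
        System.out.println(timestampProto.javaClassFor("Timestamp"));
    }
}
```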

I made a fix in the following branch to try it out.
https://github.com/laughingman7743/flink/pull/3
https://github.com/laughingman7743/flink/pull/3/commits/f8ad68c401279b5911687473940918465d797692

It also handles the case where neither the java_multiple_files nor the 
java_outer_classname option is specified, in which the suffix OuterClass is 
added to the outer class name when the file name and a message name are the same.
This should probably cover most cases of protobuf definitions.
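
The naming rule described above can be sketched as follows. This is a simplified illustration, not the code from the linked branch: the class OuterClassNames and both of its methods are hypothetical, and the sketch assumes that only the top-level names passed in are checked for a conflict with the file-derived class name.

```java
import java.util.List;

final class OuterClassNames {
    /** Converts a proto file name such as "dir/foo_bar.proto" to "FooBar". */
    static String fileNameToCamelCase(String protoFileName) {
        String base = protoFileName.substring(protoFileName.lastIndexOf('/') + 1);
        if (base.endsWith(".proto")) {
            base = base.substring(0, base.length() - ".proto".length());
        }
        StringBuilder sb = new StringBuilder();
        boolean upperNext = true;
        for (char c : base.toCharArray()) {
            if (Character.isLetter(c)) {
                sb.append(upperNext ? Character.toUpperCase(c) : c);
                upperNext = false;
            } else if (Character.isDigit(c)) {
                sb.append(c);
                upperNext = true; // the letter after a digit is capitalized
            } else {
                upperNext = true; // separators such as '_' are dropped
            }
        }
        return sb.toString();
    }

    /**
     * Returns the outer class name for a file. An explicit java_outer_classname
     * wins; otherwise the name is derived from the file name, with "OuterClass"
     * appended when it collides with one of the given top-level names.
     */
    static String outerClassName(
            String protoFileName, String javaOuterClassname, List<String> topLevelNames) {
        if (javaOuterClassname != null && !javaOuterClassname.isEmpty()) {
            return javaOuterClassname;
        }
        String derived = fileNameToCamelCase(protoFileName);
        return topLevelNames.contains(derived) ? derived + "OuterClass" : derived;
    }

    public static void main(String[] args) {
        // test.proto defines message Test, so the derived name collides.
        System.out.println(outerClassName("example/message/test.proto", null, List.of("Test")));
        // No collision: the derived name is used as-is.
        System.out.println(outerClassName("foo_bar.proto", null, List.of("Test")));
    }
}
```

With java_multiple_files unset, a message Test from test.proto would then be referenced through this outer class, for example com.example.message.TestOuterClass.Test.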

If this change is acceptable I will create a pull request in the Flink main 
repository.


> [Flink SQL][Protobuf] CompileException when querying Kafka topic using 
> google.protobuf.Timestamp 
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30093
>                 URL: https://issues.apache.org/jira/browse/FLINK-30093
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table 
> SQL / Ecosystem
>    Affects Versions: 1.16.0
>         Environment: Mac OS Ventura
>            Reporter: James Mcguire
>            Assignee: hubert dulay
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: taskmanager_172.22.0 (1).4_46291-40eec2_log
>
>
> When I use Flink SQL to query a protobuf-serialized Kafka topic whose schema 
> uses {{google.protobuf.Timestamp}}, the query fails with 
> {{org.codehaus.commons.compiler.CompileException: Line 23, Column 5: Cannot 
> determine simple type name "com"}}.
>  
> *Replication steps:*
> 1. Use a protobuf definition that contains a 
> {{google.protobuf.Timestamp}}:
> {noformat}
> syntax = "proto3";
> package example.message;
> import "google/protobuf/timestamp.proto";
> option java_package = "com.example.message";
> option java_multiple_files = true;
> message Test {
>   int64 id = 1;
>   google.protobuf.Timestamp created_at = 5;
> }{noformat}
> 2. Use the protobuf definition to produce a message to the topic.
> 3. Confirm that the message can be decoded by protoc:
> {code:java}
> kcat -C -t development.example.message -b localhost:9092 -o -1 -e -q -D "" | \
>   protoc --decode=example.message.Test \
>   --proto_path=/Users/jamesmcguire/repos/flink-proto-example/schemas/ \
>   example/message/test.proto
> id: 123
> created_at {
>   seconds: 456
>   nanos: 789
> }{code}
> 4. Create a table in Flink SQL using the Kafka connector and the protobuf format:
> {code:java}
> CREATE TABLE tests (
>   id BIGINT,
>   created_at row<seconds BIGINT, nanos INT>
> )
> COMMENT ''
> WITH (
>   'connector' = 'kafka',
>   'format' = 'protobuf',
>   'protobuf.message-class-name' = 'com.example.message.Test',
>   'properties.auto.offset.reset' = 'earliest',
>   'properties.bootstrap.servers' = 'host.docker.internal:9092',
>   'properties.group.id' = 'test-1',
>   'topic' = 'development.example.message'
> );{code}
> 5. Run a query in Flink SQL and observe the error:
> {code:java}
> Flink SQL> select * from tests;
> [ERROR] Could not execute SQL statement. Reason:
> org.codehaus.commons.compiler.CompileException: Line 23, Column 5: Cannot 
> determine simple type name "com" {code}
> *NOTE*: If you repeat steps 4-5 without {{created_at row<seconds BIGINT, 
> nanos INT>}} in the table, step 5 completes successfully.
> 6. Observe in the attached log file that Flink appears to be using the 
> incorrect namespace (should be {{google.protobuf.Timestamp}}):
> {code:java}
> com.example.message.Timestamp message3 = message0.getCreatedAt(); {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
