[ 
https://issues.apache.org/jira/browse/FLINK-22925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374985#comment-17374985
 ] 

Stephan Ewen commented on FLINK-22925:
--------------------------------------

[~igal] What would be needed to support the routed ProtoBuf type again (for 
Kafka and others)?

Is the blocker the availability of a type description, or the ProtoBuf 
dependency?

> "FieldDescriptor does not match message type" ERROR  when use protobuf-router
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-22925
>                 URL: https://issues.apache.org/jira/browse/FLINK-22925
>             Project: Flink
>          Issue Type: Bug
>          Components: Stateful Functions
>    Affects Versions: statefun-3.0.0
>         Environment: Flink 1.12.2
> stateful function 3.0.0
>            Reporter: Bill lee
>            Priority: Major
>         Attachments: image-2021-06-08-21-22-01-748.png
>
>
> When use protobuf-router in I/O module as follow: 
> {code:java}
> version: "3.0"
> module:
>   meta:
>     type: remote
>   spec:
>     endpoints:
>       - endpoint:
>           meta:
>             kind: http
>           spec:
>             functions: dga/*
>             urlPathTemplate: http://cic02:9999/statefun
>             timeouts:
>               call: 2min
>     ingresses:
>       - ingress:
>           meta:
>             type: statefun.kafka.io/protobuf-ingress
>             id: dga/names
>           spec:
>             address: cic02:9092
>             consumerGroupId: my-group-id
>             topics:
>               - index_events
>             messageType: com.my.protobuf.XxMessage
>             descriptorSet: classpath:stream.desc    
>     egresses:
>       - egress:
>           meta:
>             type: io.statefun.kafka/egress
>             id: dga/greets
>           spec:
>             address: cic02:9092
>             deliverySemantic:
>               type: exactly-once
>               transactionTimeoutMillis: 100000    
>     routers:
>       - router:
>           meta:
>             type: org.apache.flink.statefun.sdk/protobuf-router
>           spec:
>             ingress: dga/names
>             target: "dga/person/{{$.src_ip}}"
>             messageType: com.my.protobuf.XxMessage
>             descriptorSet: classpath:stream.desc
> {code}
> I got  protobuf error : "FieldDescriptor does not match message type"
> !image-2021-06-08-21-22-01-748.png!
> And then I make a test like this:
> {code:java}
>   @Test
>   public void exampleUsage01() throws IOException {
>     Message originalMessage = 
> SimpleMessage.newBuilder().setName("bob").build();
>     ProtobufDescriptorMap descriptorPath01 = 
> ProtobufDescriptorMap.from(FILE_DESCRIPTOR_SET);
>     Optional<Descriptors.GenericDescriptor> maybeDescriptor01 =
>             
> descriptorPath01.getDescriptorByName("org.apache.flink.test.SimpleMessage");
>     Descriptors.Descriptor descriptor = (Descriptors.Descriptor) 
> maybeDescriptor01.get();
>     DynamicMessage dynamicMessage = 
> DynamicMessage.getDefaultInstance(descriptor);
>     Parser<? extends Message> parser = dynamicMessage.getParserForType();
>     Message message = parser.parseFrom(originalMessage.toByteArray());
>     ProtobufDescriptorMap descriptorPath = 
> ProtobufDescriptorMap.from(FILE_DESCRIPTOR_SET);    
> Optional<Descriptors.GenericDescriptor> maybeDescriptor =
>             
> descriptorPath.getDescriptorByName("org.apache.flink.test.SimpleMessage");
>     AddressResolver evaluator = 
> AddressResolver.fromAddressTemplate((Descriptors.Descriptor) 
> maybeDescriptor.get(), "dga/person/{{$.name}}");
>     Address targetAddress = evaluator.evaluate(message);
>     System.out.println(targetAddress);
>   }
> {code}
> also got the same error.
> I think the cause is that , descriptorSet is defied in both ingress and 
> router, and generated two different Descriptors for the message.
> Please correct me if I am wrong.  
> And any advise for this problem?  Thanks a lot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to