[
https://issues.apache.org/jira/browse/FLINK-22925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374985#comment-17374985
]
Stephan Ewen edited comment on FLINK-22925 at 7/5/21, 7:59 PM:
---------------------------------------------------------------
[~igal] What would be needed to support the routed ProtoBuf type again (for
Kafka and others)?
Is the blocker the availability of a TypeDescriptor, or the ProtoBuf dependency?
was (Author: stephanewen):
[~igal] What would be needed to support the routed ProtoBuf type again (for
Kafka and others)?
Is the blocker the availability of a type description, or the ProtoBuf
dependency?
> "FieldDescriptor does not match message type" ERROR when use protobuf-router
> -----------------------------------------------------------------------------
>
> Key: FLINK-22925
> URL: https://issues.apache.org/jira/browse/FLINK-22925
> Project: Flink
> Issue Type: Bug
> Components: Stateful Functions
> Affects Versions: statefun-3.0.0
> Environment: Flink 1.12.2
> stateful function 3.0.0
> Reporter: Bill lee
> Priority: Major
> Attachments: image-2021-06-08-21-22-01-748.png
>
>
> When use protobuf-router in I/O module as follow:
> {code:java}
> version: "3.0"
> module:
> meta:
> type: remote
> spec:
> endpoints:
> - endpoint:
> meta:
> kind: http
> spec:
> functions: dga/*
> urlPathTemplate: http://cic02:9999/statefun
> timeouts:
> call: 2min
> ingresses:
> - ingress:
> meta:
> type: statefun.kafka.io/protobuf-ingress
> id: dga/names
> spec:
> address: cic02:9092
> consumerGroupId: my-group-id
> topics:
> - index_events
> messageType: com.my.protobuf.XxMessage
> descriptorSet: classpath:stream.desc
> egresses:
> - egress:
> meta:
> type: io.statefun.kafka/egress
> id: dga/greets
> spec:
> address: cic02:9092
> deliverySemantic:
> type: exactly-once
> transactionTimeoutMillis: 100000
> routers:
> - router:
> meta:
> type: org.apache.flink.statefun.sdk/protobuf-router
> spec:
> ingress: dga/names
> target: "dga/person/{{$.src_ip}}"
> messageType: com.my.protobuf.XxMessage
> descriptorSet: classpath:stream.desc
> {code}
> I got protobuf error : "FieldDescriptor does not match message type"
> !image-2021-06-08-21-22-01-748.png!
> And then I make a test like this:
> {code:java}
> @Test
> public void exampleUsage01() throws IOException {
> Message originalMessage =
> SimpleMessage.newBuilder().setName("bob").build();
> ProtobufDescriptorMap descriptorPath01 =
> ProtobufDescriptorMap.from(FILE_DESCRIPTOR_SET);
> Optional<Descriptors.GenericDescriptor> maybeDescriptor01 =
>
> descriptorPath01.getDescriptorByName("org.apache.flink.test.SimpleMessage");
> Descriptors.Descriptor descriptor = (Descriptors.Descriptor)
> maybeDescriptor01.get();
> DynamicMessage dynamicMessage =
> DynamicMessage.getDefaultInstance(descriptor);
> Parser<? extends Message> parser = dynamicMessage.getParserForType();
> Message message = parser.parseFrom(originalMessage.toByteArray());
> ProtobufDescriptorMap descriptorPath =
> ProtobufDescriptorMap.from(FILE_DESCRIPTOR_SET);
> Optional<Descriptors.GenericDescriptor> maybeDescriptor =
>
> descriptorPath.getDescriptorByName("org.apache.flink.test.SimpleMessage");
> AddressResolver evaluator =
> AddressResolver.fromAddressTemplate((Descriptors.Descriptor)
> maybeDescriptor.get(), "dga/person/{{$.name}}");
> Address targetAddress = evaluator.evaluate(message);
> System.out.println(targetAddress);
> }
> {code}
> also got the same error.
> I think the cause is that , descriptorSet is defied in both ingress and
> router, and generated two different Descriptors for the message.
> Please correct me if I am wrong.
> And any advise for this problem? Thanks a lot.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)