Bill lee created FLINK-22925:
--------------------------------
Summary: "FieldDescriptor does not match message type" ERROR when
use protobuf-router
Key: FLINK-22925
URL: https://issues.apache.org/jira/browse/FLINK-22925
Project: Flink
Issue Type: Bug
Components: Stateful Functions
Affects Versions: statefun-3.0.0
Environment: Flink 1.12.2
stateful function 3.0.0
Reporter: Bill lee
Attachments: image-2021-06-08-21-22-01-748.png
When use protobuf-router in I/O module as follow:
{code:java}
version: "3.0"
module:
meta:
type: remote
spec:
endpoints:
- endpoint:
meta:
kind: http
spec:
functions: dga/*
urlPathTemplate: http://cic02:9999/statefun
timeouts:
call: 2min
ingresses:
- ingress:
meta:
type: statefun.kafka.io/protobuf-ingress
id: dga/names
spec:
address: cic02:9092
consumerGroupId: my-group-id
topics:
- index_events
messageType: com.my.protobuf.XxMessage
descriptorSet: classpath:stream.desc
egresses:
- egress:
meta:
type: io.statefun.kafka/egress
id: dga/greets
spec:
address: cic02:9092
deliverySemantic:
type: exactly-once
transactionTimeoutMillis: 100000
routers:
- router:
meta:
type: org.apache.flink.statefun.sdk/protobuf-router
spec:
ingress: dga/names
target: "dga/person/{{$.src_ip}}"
messageType: com.my.protobuf.XxMessage
descriptorSet: classpath:stream.desc
{code}
I got protobuf error : "FieldDescriptor does not match message type"
!image-2021-06-08-21-22-01-748.png!
And then I make a test like this:
{code:java}
@Test
public void exampleUsage01() throws IOException {
Message originalMessage = SimpleMessage.newBuilder().setName("bob").build();
ProtobufDescriptorMap descriptorPath01 =
ProtobufDescriptorMap.from(FILE_DESCRIPTOR_SET);
Optional<Descriptors.GenericDescriptor> maybeDescriptor01 =
descriptorPath01.getDescriptorByName("org.apache.flink.test.SimpleMessage");
Descriptors.Descriptor descriptor = (Descriptors.Descriptor)
maybeDescriptor01.get();
DynamicMessage dynamicMessage =
DynamicMessage.getDefaultInstance(descriptor);
Parser<? extends Message> parser = dynamicMessage.getParserForType();
Message message = parser.parseFrom(originalMessage.toByteArray());
ProtobufDescriptorMap descriptorPath =
ProtobufDescriptorMap.from(FILE_DESCRIPTOR_SET);
Optional<Descriptors.GenericDescriptor> maybeDescriptor =
descriptorPath.getDescriptorByName("org.apache.flink.test.SimpleMessage");
AddressResolver evaluator =
AddressResolver.fromAddressTemplate((Descriptors.Descriptor)
maybeDescriptor.get(), "dga/person/{{$.name}}");
Address targetAddress = evaluator.evaluate(message);
System.out.println(targetAddress);
}
{code}
also got the same error.
I think the cause is that , descriptorSet is defied in both ingress and router,
and generated two different Descriptors for the message.
Please correct me if I am wrong.
And any advise for this problem? Thanks a lot.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)