gaoran10 edited a comment on issue #9851:
URL: https://github.com/apache/pulsar/issues/9851#issuecomment-797149723
> I have added `forwardSourceMessageProperty: true` to the bottom of
> functions_worker.yml with pulsar running as a standalone instance for test
> purposes and there was no change. Properties are still not being sent.
>
> On my main cluster where I initially observed this problem, the function
> workers are running as threads on the brokers.

I think a source connector cannot pick up `forwardSourceMessageProperty`
from functions_worker.yml; that setting is only used for Functions.
Please check the code in the method `SourceConfigUtils.convert`, which
generates the sink spec for a source connector. The snippet below does not
set `forwardSourceMessageProperty` on `sinkSpecBuilder`, so the value should
stay at its default, `false`.
```
// set up sink spec.
// Sink spec classname should be empty so that the default pulsar sink will
// be used
Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder();
if (!org.apache.commons.lang3.StringUtils.isEmpty(sourceConfig.getSchemaType())) {
    sinkSpecBuilder.setSchemaType(sourceConfig.getSchemaType());
}
if (!org.apache.commons.lang3.StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
    sinkSpecBuilder.setSerDeClassName(sourceConfig.getSerdeClassName());
}

sinkSpecBuilder.setTopic(sourceConfig.getTopicName());

if (sourceDetails.getTypeArg() != null) {
    sinkSpecBuilder.setTypeClassName(sourceDetails.getTypeArg());
}

if (sourceConfig.getProducerConfig() != null) {
    sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(sourceConfig.getProducerConfig()));
}

functionDetailsBuilder.setSink(sinkSpecBuilder);
```
So currently, I think a source connector cannot pass message properties to
Pulsar through the `getProperties` method of `Record`. We need to make some
changes.
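
To make the point about the default concrete, here is a minimal, self-contained sketch. It does not use the real Pulsar classes: `SinkSpecSketch`, `convertAsQuoted`, `convertWithForwarding` and `outgoingProperties` are hypothetical stand-ins, and the sketch assumes the sink only copies record properties when the flag is `true`. The second conversion method is just one possible shape for a change, not the actual fix.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ForwardPropertySketch {

    // Hypothetical stand-in for the sink spec builder. The flag is a plain
    // boolean, so it stays false unless a setter is called explicitly.
    static final class SinkSpecSketch {
        private String topic = "";
        private boolean forwardSourceMessageProperty; // defaults to false

        SinkSpecSketch setTopic(String topic) {
            this.topic = topic;
            return this;
        }

        SinkSpecSketch setForwardSourceMessageProperty(boolean value) {
            this.forwardSourceMessageProperty = value;
            return this;
        }

        String getTopic() {
            return topic;
        }

        boolean isForwardSourceMessageProperty() {
            return forwardSourceMessageProperty;
        }
    }

    // Same shape as the quoted conversion code: the flag is never touched.
    static SinkSpecSketch convertAsQuoted(String topicName) {
        return new SinkSpecSketch().setTopic(topicName);
    }

    // One possible change (an assumption, not the real patch): set the flag.
    static SinkSpecSketch convertWithForwarding(String topicName) {
        return new SinkSpecSketch()
                .setTopic(topicName)
                .setForwardSourceMessageProperty(true);
    }

    // Assumed sink behaviour: forward record properties only when the flag is set.
    static Map<String, String> outgoingProperties(SinkSpecSketch spec,
                                                  Map<String, String> recordProperties) {
        if (!spec.isForwardSourceMessageProperty()) {
            return Collections.emptyMap();
        }
        return new HashMap<>(recordProperties);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("origin", "my-source");

        System.out.println(outgoingProperties(convertAsQuoted("out-topic"), props));
        System.out.println(outgoingProperties(convertWithForwarding("out-topic"), props));
    }
}
```

Under these assumptions the first call drops the properties and the second keeps them, which matches the behaviour reported in this issue.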