cbornet commented on code in PR #16816:
URL: https://github.com/apache/pulsar/pull/16816#discussion_r949932680
##########
site2/docs/functions-develop-api.md:
##########
@@ -102,6 +102,39 @@ public class ExclamationFunction implements
Function<String, String> {
For more details, see [code
example](https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java).
+The return type of the `Function` can be wrapped in a Pulsar `Record` generic
which gives more control on the output message (topic, schema, properties, ...)
+You can use the `Context::newOutputRecordBuilder` method to build this
`Record` output.
+
+```java
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+
+public class RecordFunction implements Function<String, Record<String>> {
+
+ @Override
+ public Record<String> process(String input, Context context) throws
Exception {
+ String publishTopic = (String)
context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
Review Comment:
I removed the destinationTopic part.
##########
site2/docs/functions-develop-api.md:
##########
@@ -102,6 +102,39 @@ public class ExclamationFunction implements
Function<String, String> {
For more details, see [code
example](https://github.com/apache/pulsar/blob/master/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ExclamationFunction.java).
+The return type of the function can be wrapped in a `Record` generic which
gives you more control over the output messages, such as topics, schemas,
properties, and so on.
+Use the `Context::newOutputRecordBuilder` method to build this `Record` output.
+
+```java
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+
+public class RecordFunction implements Function<String, Record<String>> {
+
+ @Override
+ public Record<String> process(String input, Context context) throws
Exception {
+ String publishTopic = (String)
context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
+ String output = String.format("%s!", input);
+
+ Map<String, String> properties = new
HashMap<>(context.getCurrentRecord().getProperties());
+ context.getCurrentRecord().getTopicName().ifPresent(topic ->
properties.put("input_topic", topic));
+
+ return context.<String>newOutputRecordBuilder()
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]