merlimat closed pull request #1382: Correlate Output of function with input
URL: https://github.com/apache/incubator-pulsar/pull/1382
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-cpp/python/pulsar.py b/pulsar-client-cpp/python/pulsar.py index b6e5faf2b..538191641 100644 --- a/pulsar-client-cpp/python/pulsar.py +++ b/pulsar-client-cpp/python/pulsar.py @@ -645,7 +645,7 @@ def _build_msg(self, content, properties, partition_key, sequence_id, mb = _pulsar.MessageBuilder() mb.content(content) if properties: - for k, v in properties: + for k, v in properties.items(): mb.property(k, v) if partition_key: mb.partition_key(partition_key) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 14f317123..faf00ddde 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -64,11 +64,6 @@ public JavaInstance(InstanceConfig config, Object userClassObject, public JavaExecutionResult handleMessage(MessageId messageId, String topicName, Object input) { context.setCurrentMessageContext(messageId, topicName); - return processMessage(input); - } - - private JavaExecutionResult processMessage(Object input) { - JavaExecutionResult executionResult = new JavaExecutionResult(); try { Object output; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index fffe8f56d..998cd6567 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -24,12 +24,8 @@ import com.google.common.collect.Maps; import io.netty.buffer.ByteBuf; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; + +import java.util.*; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; @@ -497,7 +493,9 @@ private void sendOutputMessage(InputMessage srcMsg, JavaExecutionResult result, byte[] output) { MessageBuilder msgBuilder = MessageBuilder.create() - .setContent(output); + .setContent(output) + .setProperty("__pfn_input_topic__", srcMsg.getTopicName()) + .setProperty("__pfn_input_msg_id__", new String(Base64.getEncoder().encode(srcMsg.getActualMessage().getMessageId().toByteArray()))); if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) { msgBuilder = msgBuilder .setSequenceId(Utils.getSequenceId(srcMsg.getActualMessage().getMessageId())); diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 077cb723d..88a1961e9 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -22,6 +22,7 @@ """python_instance.py: Python Instance for running python functions """ +import base64 import os import time import Queue @@ -223,8 +224,9 @@ def process_result(self, output, msg): self.current_stats.nserialization_exceptions += 1 self.total_stats.nserialization_exceptions += 1 if output_bytes is not None: + props = {"__pfn_input_topic__" : str(msg.topic), "__pfn_input_msg_id__" : str(base64.b64encode(msg.message.message_id().serialize()))} try: - self.producer.send_async(output_bytes, partial(self.done_producing, msg.consumer, msg.message)) + self.producer.send_async(output_bytes, partial(self.done_producing, 
msg.consumer, msg.message), properties=props) except Exception as e: self.current_stats.record_system_exception(e) self.total_stats.record_system_exception(e) diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java index e8e761f13..88d0c893d 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java @@ -98,7 +98,7 @@ */ @Slf4j @PrepareForTest({ JavaInstanceRunnable.class, StorageClientBuilder.class, MessageBuilder.class, Reflections.class }) -@PowerMockIgnore({ "javax.management.*", "org.apache.pulsar.common.api.proto.*", "org.apache.logging.log4j.*" }) +@PowerMockIgnore({ "javax.management.*", "org.apache.pulsar.common.api.proto.*", "org.apache.logging.log4j.*", "org/apache/pulsar/common/api/proto/PulsarApi*", "org.apache.pulsar.common.util.protobuf.*", "org.apache.pulsar.shade.*" }) public class JavaInstanceRunnableProcessTest { @ObjectFactory @@ -389,6 +389,13 @@ public void setup() throws Exception { when(msg.getSequenceId()).thenReturn(seqId); return builder; }); + when(builder.setProperty(anyString(), anyString())) + .thenAnswer(invocationOnMock1 -> { + String key = invocationOnMock1.getArgumentAt(0, String.class); + String value = invocationOnMock1.getArgumentAt(1, String.class); + when(msg.getProperty(eq(key))).thenReturn(value); + return builder; + }); when(builder.build()).thenReturn(msg); return builder; }); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: users@infra.apache.org

With regards,
Apache Git Services