merlimat closed pull request #1382: Correlate Output of function with input
URL: https://github.com/apache/incubator-pulsar/pull/1382
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/python/pulsar.py 
b/pulsar-client-cpp/python/pulsar.py
index b6e5faf2b..538191641 100644
--- a/pulsar-client-cpp/python/pulsar.py
+++ b/pulsar-client-cpp/python/pulsar.py
@@ -645,7 +645,7 @@ def _build_msg(self, content, properties, partition_key, 
sequence_id,
         mb = _pulsar.MessageBuilder()
         mb.content(content)
         if properties:
-            for k, v in properties:
+            for k, v in properties.items():
                 mb.property(k, v)
         if partition_key:
             mb.partition_key(partition_key)
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 14f317123..faf00ddde 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -64,11 +64,6 @@ public JavaInstance(InstanceConfig config, Object 
userClassObject,
 
     public JavaExecutionResult handleMessage(MessageId messageId, String 
topicName, Object input) {
         context.setCurrentMessageContext(messageId, topicName);
-        return processMessage(input);
-    }
-
-    private JavaExecutionResult processMessage(Object input) {
-
         JavaExecutionResult executionResult = new JavaExecutionResult();
         try {
             Object output;
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fffe8f56d..998cd6567 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -24,12 +24,8 @@
 
 import com.google.common.collect.Maps;
 import io.netty.buffer.ByteBuf;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+
+import java.util.*;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 
@@ -497,7 +493,9 @@ private void sendOutputMessage(InputMessage srcMsg,
                                    JavaExecutionResult result,
                                    byte[] output) {
         MessageBuilder msgBuilder = MessageBuilder.create()
-            .setContent(output);
+            .setContent(output)
+            .setProperty("__pfn_input_topic__", srcMsg.getTopicName())
+            .setProperty("__pfn_input_msg_id__", new 
String(Base64.getEncoder().encode(srcMsg.getActualMessage().getMessageId().toByteArray())));
         if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) {
             msgBuilder = msgBuilder
                 
.setSequenceId(Utils.getSequenceId(srcMsg.getActualMessage().getMessageId()));
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index 077cb723d..88a1961e9 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -22,6 +22,7 @@
 
 """python_instance.py: Python Instance for running python functions
 """
+import base64
 import os
 import time
 import Queue
@@ -223,8 +224,9 @@ def process_result(self, output, msg):
         self.current_stats.nserialization_exceptions += 1
         self.total_stats.nserialization_exceptions += 1
       if output_bytes is not None:
+        props = {"__pfn_input_topic__" : str(msg.topic), 
"__pfn_input_msg_id__" : 
str(base64.b64encode(msg.message.message_id().serialize()))}
         try:
-          self.producer.send_async(output_bytes, partial(self.done_producing, 
msg.consumer, msg.message))
+          self.producer.send_async(output_bytes, partial(self.done_producing, 
msg.consumer, msg.message), properties=props)
         except Exception as e:
           self.current_stats.record_system_exception(e)
           self.total_stats.record_system_exception(e)
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
index e8e761f13..88d0c893d 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
@@ -98,7 +98,7 @@
  */
 @Slf4j
 @PrepareForTest({ JavaInstanceRunnable.class, StorageClientBuilder.class, 
MessageBuilder.class, Reflections.class })
-@PowerMockIgnore({ "javax.management.*", 
"org.apache.pulsar.common.api.proto.*", "org.apache.logging.log4j.*" })
+@PowerMockIgnore({ "javax.management.*", 
"org.apache.pulsar.common.api.proto.*", "org.apache.logging.log4j.*", 
"org/apache/pulsar/common/api/proto/PulsarApi*", 
"org.apache.pulsar.common.util.protobuf.*", "org.apache.pulsar.shade.*" })
 public class JavaInstanceRunnableProcessTest {
 
     @ObjectFactory
@@ -389,6 +389,13 @@ public void setup() throws Exception {
                         when(msg.getSequenceId()).thenReturn(seqId);
                         return builder;
                     });
+                when(builder.setProperty(anyString(), anyString()))
+                        .thenAnswer(invocationOnMock1 -> {
+                            String key = invocationOnMock1.getArgumentAt(0, 
String.class);
+                            String value = invocationOnMock1.getArgumentAt(1, 
String.class);
+                            when(msg.getProperty(eq(key))).thenReturn(value);
+                            return builder;
+                            });
                 when(builder.build()).thenReturn(msg);
                 return builder;
             });


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to