freeznet commented on code in PR #18929:
URL: https://github.com/apache/pulsar/pull/18929#discussion_r1118486643


##########
pulsar-functions/instance/src/main/python/python_instance.py:
##########
@@ -302,14 +310,27 @@ def done_producing(self, consumer, orig_message, topic, 
result, sent_message):
       # If producer fails send output then send neg ack for input message back 
to broker
       consumer.negative_acknowledge(orig_message)
 
-
   def process_result(self, output, msg):
-    if output is not None and self.instance_config.function_details.sink.topic 
!= None and \
+    if output is not None and self.instance_config.function_details.sink.topic 
is not None and \
             len(self.instance_config.function_details.sink.topic) > 0:
       if self.output_serde is None:
         self.setup_output_serde()
+      if self.effectively_once:
+        if self.contextimpl.get_message_partition_index() is None or \
+                self.contextimpl.get_message_partition_index() >= 0:
+          Log.error("Partitioned topic is not available in effectively_once 
mode.")
+          return

Review Comment:
   Should raise an error here instead of `return` nothing.



##########
pulsar-functions/instance/src/main/python/python_instance.py:
##########
@@ -475,6 +509,22 @@ def get_function_status(self):
     status.lastInvocationTime = int(last_invocation) if sys.version_info.major 
>= 3 else long(last_invocation)
     return status
 
+  def can_enable_effectively_once(self):
+    """
+    The prerequisites for enabling effective-once semantics are as follows.
+    1. deduplication is enabled
+    2. set ProcessingGuarantees to EFFECTIVELY_ONCE
+    3. the function has only one source topic and one sink topic
+    4. (unsupported yet) if partitioned topic is enabled, ensure that the 
number of partitions
+      (of both source and sink topics) is the same
+    """
+    if self.instance_config.function_details.processingGuarantees == \
+            Function_pb2.ProcessingGuarantees.Value('EFFECTIVELY_ONCE') and \

Review Comment:
   Could you please add some log output here when `processingGuarantees` is set 
to `EFFECTIVELY_ONCE`, but other requirements do not meet?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to