jerrypeng closed pull request #3237: fix bug involving consumer producer
deadlock in functions
URL: https://github.com/apache/pulsar/pull/3237
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 4b634d28cf..1c61b45af9 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -88,6 +88,9 @@ protected PulsarSinkProcessorBase(Schema schema) {
.hashingScheme(HashingScheme.Murmur3_32Hash) //
.messageRoutingMode(MessageRoutingMode.CustomPartition)
.messageRouter(FunctionResultRouter.of())
+ // set send timeout to be infinity to prevent potential
deadlock with consumer
+ // that might happen when consumer is blocked due to
unacked messages
+ .sendTimeout(0, TimeUnit.SECONDS)
.topic(topic);
if (producerName != null) {
builder.producerName(producerName);
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py
b/pulsar-functions/instance/src/main/python/contextimpl.py
index 2797658c14..63332d0171 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -133,7 +133,7 @@ def get_output_topic(self):
def get_output_serde_class_name(self):
return self.instance_config.function_details.outputSerdeClassName
- def publish(self, topic_name, message,
serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None):
+ def publish(self, topic_name, message,
serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None,
callback=None):
# Just make sure that user supplied values are properly typed
topic_name = str(topic_name)
serde_class_name = str(serde_class_name)
@@ -155,7 +155,7 @@ def publish(self, topic_name, message,
serde_class_name="serde.IdentitySerDe", p
self.publish_serializers[serde_class_name] = serde_klass()
output_bytes =
bytes(self.publish_serializers[serde_class_name].serialize(message))
- self.publish_producers[topic_name].send_async(output_bytes, None,
properties=properties)
+ self.publish_producers[topic_name].send_async(output_bytes, callback,
properties=properties)
def ack(self, msgid, topic):
if topic not in self.consumers:
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py
b/pulsar-functions/instance/src/main/python/python_instance.py
index 815865b3a1..d4c3da5a95 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -262,11 +262,15 @@ def setup_producer(self):
if self.instance_config.function_details.sink.topic != None and \
len(self.instance_config.function_details.sink.topic) > 0:
Log.debug("Setting up producer for topic %s" %
self.instance_config.function_details.sink.topic)
+
self.producer = self.pulsar_client.create_producer(
str(self.instance_config.function_details.sink.topic),
block_if_queue_full=True,
batching_enabled=True,
batching_max_publish_delay_ms=1,
+ # set send timeout to be infinity to prevent potential deadlock with
consumer
+ # that might happen when consumer is blocked due to unacked messages
+ send_timeout_millis=0,
max_pending_messages=100000)
def message_listener(self, serde, consumer, message):
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 51fa4523aa..fb3d2c2af6 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -20,6 +20,7 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
@@ -99,7 +100,8 @@ private static PulsarClientImpl getPulsarClient() throws
PulsarClientException {
doReturn(producerBuilder).when(producerBuilder).topic(anyString());
doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
doReturn(producerBuilder).when(producerBuilder).property(anyString(),
anyString());
-
+ doReturn(producerBuilder).when(producerBuilder).sendTimeout(anyInt(),
any());
+
CompletableFuture completableFuture = new CompletableFuture<>();
completableFuture.complete(mock(MessageId.class));
TypedMessageBuilder typedMessageBuilder =
mock(TypedMessageBuilder.class);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services