This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2607c18 fix bug involving consumer producer deadlock in functions
(#3237)
2607c18 is described below
commit 2607c18d1b27c1df667d62d8343acd1b306c3c8f
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Fri Dec 21 16:04:36 2018 -0800
fix bug involving consumer producer deadlock in functions (#3237)
* fix bug involving consumer producer deadlock in functions
* fix unit test
---
.../src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java | 3 +++
pulsar-functions/instance/src/main/python/contextimpl.py | 4 ++--
pulsar-functions/instance/src/main/python/python_instance.py | 4 ++++
.../test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java | 4 +++-
4 files changed, 12 insertions(+), 3 deletions(-)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 4b634d2..1c61b45 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -88,6 +88,9 @@ public class PulsarSink<T> implements Sink<T> {
.hashingScheme(HashingScheme.Murmur3_32Hash) //
.messageRoutingMode(MessageRoutingMode.CustomPartition)
.messageRouter(FunctionResultRouter.of())
+ // set send timeout to be infinity to prevent potential
deadlock with consumer
+ // that might happen when consumer is blocked due to
unacked messages
+ .sendTimeout(0, TimeUnit.SECONDS)
.topic(topic);
if (producerName != null) {
builder.producerName(producerName);
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py
b/pulsar-functions/instance/src/main/python/contextimpl.py
index 2797658..63332d0 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -133,7 +133,7 @@ class ContextImpl(pulsar.Context):
def get_output_serde_class_name(self):
return self.instance_config.function_details.outputSerdeClassName
- def publish(self, topic_name, message,
serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None):
+ def publish(self, topic_name, message,
serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None,
callback=None):
# Just make sure that user supplied values are properly typed
topic_name = str(topic_name)
serde_class_name = str(serde_class_name)
@@ -155,7 +155,7 @@ class ContextImpl(pulsar.Context):
self.publish_serializers[serde_class_name] = serde_klass()
output_bytes =
bytes(self.publish_serializers[serde_class_name].serialize(message))
- self.publish_producers[topic_name].send_async(output_bytes, None,
properties=properties)
+ self.publish_producers[topic_name].send_async(output_bytes, callback,
properties=properties)
def ack(self, msgid, topic):
if topic not in self.consumers:
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py
b/pulsar-functions/instance/src/main/python/python_instance.py
index 815865b..d4c3da5 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -262,11 +262,15 @@ class PythonInstance(object):
if self.instance_config.function_details.sink.topic != None and \
len(self.instance_config.function_details.sink.topic) > 0:
Log.debug("Setting up producer for topic %s" %
self.instance_config.function_details.sink.topic)
+
self.producer = self.pulsar_client.create_producer(
str(self.instance_config.function_details.sink.topic),
block_if_queue_full=True,
batching_enabled=True,
batching_max_publish_delay_ms=1,
+ # set send timeout to be infinity to prevent potential deadlock with
consumer
+ # that might happen when consumer is blocked due to unacked messages
+ send_timeout_millis=0,
max_pending_messages=100000)
def message_listener(self, serde, consumer, message):
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 51fa452..fb3d2c2 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.sink;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
@@ -99,7 +100,8 @@ public class PulsarSinkTest {
doReturn(producerBuilder).when(producerBuilder).topic(anyString());
doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
doReturn(producerBuilder).when(producerBuilder).property(anyString(),
anyString());
-
+ doReturn(producerBuilder).when(producerBuilder).sendTimeout(anyInt(),
any());
+
CompletableFuture completableFuture = new CompletableFuture<>();
completableFuture.complete(mock(MessageId.class));
TypedMessageBuilder typedMessageBuilder =
mock(TypedMessageBuilder.class);