srkukarni closed pull request #2421: Fix bug during user publish
URL: https://github.com/apache/incubator-pulsar/pull/2421
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request
(one from a fork) once it is merged, the diff is reproduced below
for the sake of provenance:
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 18e1fe3514..e8461e0267 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -269,9 +269,11 @@ public ByteBuffer getState(String key) {
if (producer == null) {
try {
- Producer<O> newProducer = ((ProducerBuilderImpl<O>)
producerBuilder.clone()).schema(schema).create();
+ Producer<O> newProducer = ((ProducerBuilderImpl<O>)
producerBuilder.clone())
+ .schema(schema).topic(topicName).create();
Producer<O> existingProducer = (Producer<O>)
publishProducers.putIfAbsent(topicName, newProducer);
+
if (existingProducer != null) {
// The value in the map was not updated after the
concurrent put
newProducer.close();
@@ -281,6 +283,7 @@ public ByteBuffer getState(String key) {
}
} catch (PulsarClientException e) {
+ logger.error("Failed to create Producer while doing user
publish", e);
return FutureUtil.failedFuture(e);
}
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 05c57ab974..1802ee5e61 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -52,7 +52,7 @@ public TopicSchema(PulsarClient client) {
public static final String DEFAULT_SERDE =
"org.apache.pulsar.functions.api.utils.DefaultSerDe";
public Schema<?> getSchema(String topic, Object object) {
- return getSchema(topic, object.getClass());
+ return getSchema(topic, object.getClass(), "");
}
public Schema<?> getSchema(String topic, Object object, String
schemaTypeOrClassName) {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 4c8393d042..4380c63ff7 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -19,8 +19,7 @@
package org.apache.pulsar.functions.instance;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
+import static org.mockito.Matchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -28,15 +27,20 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Matchers;
import org.slf4j.Logger;
/**
@@ -48,6 +52,7 @@
private Logger logger;
private PulsarClientImpl client;
private ContextImpl context;
+ private Producer producer = mock(Producer.class);
@Before
public void setup() {
@@ -59,6 +64,10 @@ public void setup() {
logger = mock(Logger.class);
client = mock(PulsarClientImpl.class);
when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client,
Schema.BYTES));
+
when(client.createProducerAsync(Matchers.any(ProducerConfigurationData.class),
Matchers.any(Schema.class)))
+ .thenReturn(CompletableFuture.completedFuture(producer));
+
when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));
context = new ContextImpl(
config,
@@ -121,4 +130,8 @@ public void testGetStateStateEnabled() throws Exception {
verify(stateContext, times(1)).getValue(eq("test-key"));
}
-}
+ @Test
+ public void testPublishUsingDefaultSchema() throws Exception {
+ context.publish("sometopic", "Somevalue");
+ }
+ }
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services