This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 001445c Fix bug during user publish (#2421)
001445c is described below
commit 001445cdca43e85a6a8e34cf4c41c25de6ec0b83
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Wed Aug 22 17:03:03 2018 -0700
Fix bug during user publish (#2421)
* Fix bug during user publish
* Added unit test to cover the case of publishing using the default schema
---
.../apache/pulsar/functions/instance/ContextImpl.java | 5 ++++-
.../apache/pulsar/functions/source/TopicSchema.java | 2 +-
.../pulsar/functions/instance/ContextImplTest.java | 19 ++++++++++++++++---
3 files changed, 21 insertions(+), 5 deletions(-)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 7265e45..aec52fc 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -269,9 +269,11 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
if (producer == null) {
try {
- Producer<O> newProducer = ((ProducerBuilderImpl<O>)
producerBuilder.clone()).schema(schema).create();
+ Producer<O> newProducer = ((ProducerBuilderImpl<O>)
producerBuilder.clone())
+ .schema(schema).topic(topicName).create();
Producer<O> existingProducer = (Producer<O>)
publishProducers.putIfAbsent(topicName, newProducer);
+
if (existingProducer != null) {
// The value in the map was not updated after the
concurrent put
newProducer.close();
@@ -281,6 +283,7 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
}
} catch (PulsarClientException e) {
+ logger.error("Failed to create Producer while doing user
publish", e);
return FutureUtil.failedFuture(e);
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 05c57ab..1802ee5 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -52,7 +52,7 @@ public class TopicSchema {
public static final String DEFAULT_SERDE =
"org.apache.pulsar.functions.api.utils.DefaultSerDe";
public Schema<?> getSchema(String topic, Object object) {
- return getSchema(topic, object.getClass());
+ return getSchema(topic, object.getClass(), "");
}
public Schema<?> getSchema(String topic, Object object, String
schemaTypeOrClassName) {
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 4c8393d..4380c63 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -19,8 +19,7 @@
package org.apache.pulsar.functions.instance;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
+import static org.mockito.Matchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -28,15 +27,20 @@ import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Matchers;
import org.slf4j.Logger;
/**
@@ -48,6 +52,7 @@ public class ContextImplTest {
private Logger logger;
private PulsarClientImpl client;
private ContextImpl context;
+ private Producer producer = mock(Producer.class);
@Before
public void setup() {
@@ -59,6 +64,10 @@ public class ContextImplTest {
logger = mock(Logger.class);
client = mock(PulsarClientImpl.class);
when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client,
Schema.BYTES));
+
when(client.createProducerAsync(Matchers.any(ProducerConfigurationData.class),
Matchers.any(Schema.class)))
+ .thenReturn(CompletableFuture.completedFuture(producer));
+
when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+
when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));
context = new ContextImpl(
config,
@@ -121,4 +130,8 @@ public class ContextImplTest {
verify(stateContext, times(1)).getValue(eq("test-key"));
}
-}
+ @Test
+ public void testPublishUsingDefaultSchema() throws Exception {
+ context.publish("sometopic", "Somevalue");
+ }
+ }