This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 001445c  Fix bug during user publish (#2421)
001445c is described below

commit 001445cdca43e85a6a8e34cf4c41c25de6ec0b83
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Wed Aug 22 17:03:03 2018 -0700

    Fix bug during user publish (#2421)
    
    * Fix bug during user publish
    
    * Added unittest to cover the case of publish using default schema
---
 .../apache/pulsar/functions/instance/ContextImpl.java |  5 ++++-
 .../apache/pulsar/functions/source/TopicSchema.java   |  2 +-
 .../pulsar/functions/instance/ContextImplTest.java    | 19 ++++++++++++++++---
 3 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 7265e45..aec52fc 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -269,9 +269,11 @@ class ContextImpl implements Context, SinkContext, SourceContext {
 
         if (producer == null) {
             try {
-                Producer<O> newProducer = ((ProducerBuilderImpl<O>) producerBuilder.clone()).schema(schema).create();
+                Producer<O> newProducer = ((ProducerBuilderImpl<O>) producerBuilder.clone())
+                        .schema(schema).topic(topicName).create();
 
                 Producer<O> existingProducer = (Producer<O>) publishProducers.putIfAbsent(topicName, newProducer);
+
                 if (existingProducer != null) {
                     // The value in the map was not updated after the concurrent put
                     newProducer.close();
@@ -281,6 +283,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
                 }
 
             } catch (PulsarClientException e) {
+                logger.error("Failed to create Producer while doing user publish", e);
                 return FutureUtil.failedFuture(e);
             }
         }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 05c57ab..1802ee5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -52,7 +52,7 @@ public class TopicSchema {
     public static final String DEFAULT_SERDE = "org.apache.pulsar.functions.api.utils.DefaultSerDe";
 
     public Schema<?> getSchema(String topic, Object object) {
-        return getSchema(topic, object.getClass());
+        return getSchema(topic, object.getClass(), "");
     }
 
     public Schema<?> getSchema(String topic, Object object, String schemaTypeOrClassName) {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 4c8393d..4380c63 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -19,8 +19,7 @@
 package org.apache.pulsar.functions.instance;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.same;
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -28,15 +27,20 @@ import static org.mockito.Mockito.when;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Matchers;
 import org.slf4j.Logger;
 
 /**
@@ -48,6 +52,7 @@ public class ContextImplTest {
     private Logger logger;
     private PulsarClientImpl client;
     private ContextImpl context;
+    private Producer producer = mock(Producer.class);
 
     @Before
     public void setup() {
@@ -59,6 +64,10 @@ public class ContextImplTest {
         logger = mock(Logger.class);
         client = mock(PulsarClientImpl.class);
         when(client.newProducer()).thenReturn(new ProducerBuilderImpl(client, Schema.BYTES));
+        when(client.createProducerAsync(Matchers.any(ProducerConfigurationData.class), Matchers.any(Schema.class)))
+                .thenReturn(CompletableFuture.completedFuture(producer));
+        when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+        when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));
 
         context = new ContextImpl(
             config,
@@ -121,4 +130,8 @@ public class ContextImplTest {
         verify(stateContext, times(1)).getValue(eq("test-key"));
     }
 
-}
+    @Test
+    public void testPublishUsingDefaultSchema() throws Exception {
+        context.publish("sometopic", "Somevalue");
+    }
+ }

Reply via email to