This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 65fe863  [issue#4042] improve java functions API (#4093)
65fe863 is described below

commit 65fe863f84a0446f369e3b5a44f9851c1f53c23d
Author: 冉小龙 <[email protected]>
AuthorDate: Sun May 5 07:49:03 2019 +0800

    [issue#4042] improve java functions API (#4093)
    
    Master Issue: #4042
    
    Fixes #4042
    
    Motivation
    
    improve java functions API, when you need to publish the fields in the 
TypedMessageBuilder, there is no need to add a new publish method, just modify 
the interface in the TypedMessageBuilder.
---
 .../worker/PulsarFunctionPublishTest.java          |   2 +-
 pulsar-functions/api-java/pom.xml                  |   6 +
 .../org/apache/pulsar/functions/api/Context.java   |  31 ++-
 .../pulsar/functions/instance/ContextImpl.java     | 228 ++++++++++++++-------
 .../pulsar/functions/instance/ContextImplTest.java |   2 +-
 .../functions/api/examples/PublishFunction.java    |   9 +-
 ...geConf.java => TypedMessageBuilderPublish.java} |  19 +-
 .../api/examples/UserPublishFunction.java          |  10 +-
 ...ge_conf.py => typed_message_builder_publish.py} |   2 +-
 .../windowing/WindowFunctionExecutor.java          |   2 +-
 .../windowing/WindowFunctionExecutorTest.java      |   8 +
 .../functions/PulsarFunctionsTestBase.java         |   7 +-
 12 files changed, 221 insertions(+), 105 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index ef88fdd..e3b4131 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -279,7 +279,7 @@ public class PulsarFunctionPublishTest {
         functionConfig.setSubName(subscriptionName);
         functionConfig.setInputs(Collections.singleton(sourceTopic));
         functionConfig.setAutoAck(true);
-        
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.PublishFunctionWithMessageConf");
+        
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.TypedMessageBuilderPublish");
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
         Map<String, Object> userConfig = new HashMap<>();
         userConfig.put("publish-topic", publishTopic);
diff --git a/pulsar-functions/api-java/pom.xml 
b/pulsar-functions/api-java/pom.xml
index 6f0481d..33e2ea0 100644
--- a/pulsar-functions/api-java/pom.xml
+++ b/pulsar-functions/api-java/pom.xml
@@ -42,6 +42,12 @@
       <artifactId>typetools</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client-api</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
 
   </dependencies>
 
diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 556086a..013b5f2 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -19,6 +19,10 @@
 package org.apache.pulsar.functions.api;
 
 import java.nio.ByteBuffer;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.slf4j.Logger;
 
 import java.util.Collection;
@@ -232,6 +236,7 @@ public interface Context {
      * @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro", 
"json", "protobuf") or the class name
      *                               of the custom schema class
      * @return A future that completes when the framework is done publishing 
the message
+     * @deprecated in favor of using {@link #newOutputMessage(String, Schema)}
      */
     <O> CompletableFuture<Void> publish(String topicName, O object, String 
schemaOrSerdeClassName);
 
@@ -241,28 +246,18 @@ public interface Context {
      * @param topicName The name of the topic for publishing
      * @param object    The object that needs to be published
      * @return A future that completes when the framework is done publishing 
the message
+     * @deprecated in favor of using {@link #newOutputMessage(String, Schema)}
      */
     <O> CompletableFuture<Void> publish(String topicName, O object);
 
     /**
-     * Publish an object using serDe or schema class for serializing to the 
topic.
-     *
-     * @param topicName              The name of the topic for publishing
-     * @param object                 The object that needs to be published
-     * @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro", 
"json", "protobuf") or the class name
-     *                               of the custom schema class
-     * @param messageConf      A map of configurations to set for the message 
that will be published
-     *                         The available options are:
-     *
-     *                         "key" - Parition Key
-     *                         "properties" - Map of properties
-     *                         "eventTime"
-     *                         "sequenceId"
-     *                         "replicationClusters"
-     *                         "disableReplication"
+     * New output message using schema for serializing to the topic
      *
-     * @return A future that completes when the framework is done publishing 
the message
+     * @param topicName The name of the topic for output message
+     * @param schema provide a way to convert between serialized data and 
domain objects
+     * @param <O>
+     * @return the message builder instance
+     * @throws PulsarClientException
      */
-    <O> CompletableFuture<Void> publish(String topicName, O object, String 
schemaOrSerdeClassName, Map<String, Object> messageConf);
-
+    <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> 
schema) throws PulsarClientException;
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 5c06b49..f0d5ade 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -25,14 +25,7 @@ import io.prometheus.client.Summary;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.HashingScheme;
-import org.apache.pulsar.client.api.MessageRoutingMode;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Context;
@@ -52,12 +45,9 @@ import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -94,11 +84,13 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
     private final Summary userMetricsSummary;
 
     private final static String[] userMetricsLabelNames;
+
     static {
         // add label to indicate user metric
         userMetricsLabelNames = 
Arrays.copyOf(ComponentStatsManager.metricsLabelNames, 
ComponentStatsManager.metricsLabelNames.length + 1);
         userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] 
= "metric";
     }
+
     private final ComponentType componentType;
 
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient 
client,
@@ -259,7 +251,7 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
             return null;
         }
     }
-    
+
     private void ensureStateEnabled() {
         checkState(null != stateContext, "State is not enabled.");
     }
@@ -337,69 +329,26 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
     @SuppressWarnings("unchecked")
     @Override
     public <O> CompletableFuture<Void> publish(String topicName, O object, 
String schemaOrSerdeClassName) {
-        return publish(topicName, object, schemaOrSerdeClassName, null);
+        return publish(topicName, object, (Schema<O>) 
topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false));
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public <O> CompletableFuture<Void> publish(String topicName, O object, 
String schemaOrSerdeClassName, Map<String, Object> messageConf) {
-        return publish(topicName, object, (Schema<O>) 
topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false), 
messageConf);
+    public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, 
Schema<O> schema) throws PulsarClientException {
+        MessageBuilderImpl<O> messageBuilder = new MessageBuilderImpl<>();
+        TypedMessageBuilder<O> typedMessageBuilder = getProducer(topicName, 
schema).newMessage();
+        messageBuilder.setUnderlyingBuilder(typedMessageBuilder);
+        return messageBuilder;
     }
 
     @SuppressWarnings("unchecked")
-    public <O> CompletableFuture<Void> publish(String topicName, O object, 
Schema<O> schema, Map<String, Object> messageConf) {
-        Producer<O> producer = (Producer<O>) publishProducers.get(topicName);
-
-        if (producer == null) {
-            try {
-                Producer<O> newProducer = ((ProducerBuilderImpl<O>) 
producerBuilder.clone())
-                        .schema(schema)
-                        .blockIfQueueFull(true)
-                        .enableBatching(true)
-                        .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
-                        .compressionType(CompressionType.LZ4)
-                        .hashingScheme(HashingScheme.Murmur3_32Hash) //
-                        .messageRoutingMode(MessageRoutingMode.CustomPartition)
-                        .messageRouter(FunctionResultRouter.of())
-                        // set send timeout to be infinity to prevent 
potential deadlock with consumer
-                        // that might happen when consumer is blocked due to 
unacked messages
-                        .sendTimeout(0, TimeUnit.SECONDS)
-                        .topic(topicName)
-                        .properties(InstanceUtils.getProperties(componentType,
-                                FunctionCommon.getFullyQualifiedName(
-                                        
this.config.getFunctionDetails().getTenant(),
-                                        
this.config.getFunctionDetails().getNamespace(),
-                                        
this.config.getFunctionDetails().getName()),
-                                this.config.getInstanceId()))
-                        .create();
-
-                Producer<O> existingProducer = (Producer<O>) 
publishProducers.putIfAbsent(topicName, newProducer);
-
-                if (existingProducer != null) {
-                    // The value in the map was not updated after the 
concurrent put
-                    newProducer.close();
-                    producer = existingProducer;
-                } else {
-                    producer = newProducer;
-                }
-
-            } catch (PulsarClientException e) {
-                logger.error("Failed to create Producer while doing user 
publish", e);
-                return FutureUtil.failedFuture(e);
-            }
-        }
-
-        TypedMessageBuilder<O> messageBuilder = producer.newMessage();
-        if (messageConf != null) {
-            messageBuilder.loadConf(messageConf);
+    public <O> CompletableFuture<Void> publish(String topicName, O object, 
Schema<O> schema) {
+        try {
+            return newOutputMessage(topicName, 
schema).value(object).sendAsync().thenApply(msgId -> null);
+        } catch (PulsarClientException e) {
+            logger.error("Failed to create Producer while doing user publish", 
e);
+            return FutureUtil.failedFuture(e);
         }
-        CompletableFuture<Void> future = 
messageBuilder.value(object).sendAsync().thenApply(msgId -> null);
-        future.exceptionally(e -> {
-            this.statsManager.incrSysExceptions(e);
-            logger.error("Failed to publish to topic {} with error {}", 
topicName, e);
-            return null;
-        });
-        return future;
     }
 
     @Override
@@ -417,6 +366,46 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
         }
     }
 
+    private <O> Producer<O> getProducer(String topicName, Schema<O> schema) 
throws PulsarClientException {
+        Producer<O> producer = (Producer<O>) publishProducers.get(topicName);
+
+        if (producer == null) {
+
+            Producer<O> newProducer = ((ProducerBuilderImpl<O>) 
producerBuilder.clone())
+                    .schema(schema)
+                    .blockIfQueueFull(true)
+                    .enableBatching(true)
+                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
+                    .compressionType(CompressionType.LZ4)
+                    .hashingScheme(HashingScheme.Murmur3_32Hash) //
+                    .messageRoutingMode(MessageRoutingMode.CustomPartition)
+                    .messageRouter(FunctionResultRouter.of())
+                    // set send timeout to be infinity to prevent potential 
deadlock with consumer
+                    // that might happen when consumer is blocked due to 
unacked messages
+                    .sendTimeout(0, TimeUnit.SECONDS)
+                    .topic(topicName)
+                    .properties(InstanceUtils.getProperties(componentType,
+                            FunctionCommon.getFullyQualifiedName(
+                                    
this.config.getFunctionDetails().getTenant(),
+                                    
this.config.getFunctionDetails().getNamespace(),
+                                    
this.config.getFunctionDetails().getName()),
+                            this.config.getInstanceId()))
+                    .create();
+
+            Producer<O> existingProducer = (Producer<O>) 
publishProducers.putIfAbsent(topicName, newProducer);
+
+            if (existingProducer != null) {
+                // The value in the map was not updated after the concurrent 
put
+                newProducer.close();
+                producer = existingProducer;
+            } else {
+                producer = newProducer;
+            }
+
+        }
+        return producer;
+    }
+
     public Map<String, Double> getAndResetMetrics() {
         Map<String, Double> retval = getMetrics();
         resetMetrics();
@@ -443,4 +432,105 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
         }
         return metricsMap;
     }
+
+    class MessageBuilderImpl<O> implements TypedMessageBuilder<O> {
+        private TypedMessageBuilder<O> underlyingBuilder;
+        @Override
+        public MessageId send() throws PulsarClientException {
+            try {
+                return sendAsync().get();
+            } catch (ExecutionException e) {
+                Throwable t = e.getCause();
+                if (t instanceof PulsarClientException) {
+                    throw (PulsarClientException) t;
+                } else {
+                    throw new PulsarClientException(t);
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new PulsarClientException(e);
+            }
+        }
+
+        @Override
+        public CompletableFuture<MessageId> sendAsync() {
+            return underlyingBuilder.sendAsync()
+                    .whenComplete((result, cause) -> {
+                        if (null != cause) {
+                            statsManager.incrSysExceptions(cause);
+                            logger.error("Failed to publish to topic with 
error {}", cause);
+                        }
+                    });
+        }
+
+        @Override
+        public TypedMessageBuilder<O> key(String key) {
+            underlyingBuilder.key(key);
+            return this;
+        }
+
+        @Override
+        public TypedMessageBuilder<O> keyBytes(byte[] key) {
+            underlyingBuilder.keyBytes(key);
+            return this;
+        }
+
+        @Override
+        public TypedMessageBuilder<O> orderingKey(byte[] orderingKey) {
+            underlyingBuilder.orderingKey(orderingKey);
+            return this;
+        }
+
+        @Override
+        public TypedMessageBuilder<O> value(O value) {
+            underlyingBuilder.value(value);
+            return this;
+        }
+
+        @Override
+        public TypedMessageBuilder<O> property(String name, String value) {
+            underlyingBuilder.property(name, value);
+            return this;
+        }
+
+        @Override
+        public TypedMessageBuilder<O> properties(Map<String, String> 
properties) {
+            underlyingBuilder.properties(properties);
+            return this;
+        }
+
+        @Override
+        public TypedMessageBuilder<O> eventTime(long timestamp) {
+            underlyingBuilder.eventTime(timestamp);
+            return this;
+        }
+
+        @Override
+        public TypedMessageBuilder<O> sequenceId(long sequenceId) {
+            underlyingBuilder.sequenceId(sequenceId);
+            return this;
+        }
+
+        @Override
+        public TypedMessageBuilder<O> replicationClusters(List<String> 
clusters) {
+            underlyingBuilder.replicationClusters(clusters);
+            return this;
+        }
+
+        @Override
+        public TypedMessageBuilder<O> disableReplication() {
+            underlyingBuilder.disableReplication();
+            return this;
+        }
+
+        @Override
+        public TypedMessageBuilder<O> loadConf(Map<String, Object> config) {
+            underlyingBuilder.loadConf(config);
+            return this;
+        }
+
+        public void setUnderlyingBuilder(TypedMessageBuilder<O> 
underlyingBuilder) {
+            this.underlyingBuilder = underlyingBuilder;
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 9d2579c..7eb7aae 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -153,6 +153,6 @@ public class ContextImplTest {
 
     @Test
     public void testPublishUsingDefaultSchema() throws Exception {
-        context.publish("sometopic", "Somevalue");
+        context.newOutputMessage("sometopic", 
null).value("Somevalue").sendAsync();
     }
  }
diff --git 
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
 
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
index cda8331..149cce6 100644
--- 
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
+++ 
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.functions.api.examples;
 
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 
@@ -30,7 +33,11 @@ public class PublishFunction implements Function<String, 
Void> {
     public Void process(String input, Context context) {
         String publishTopic = (String) 
context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
         String output = String.format("%s!", input);
-        context.publish(publishTopic, output);
+        try {
+            context.newOutputMessage(publishTopic, 
Schema.STRING).value(output).sendAsync();
+        } catch (PulsarClientException e) {
+            context.getLogger().error(e.toString());
+        }
         return null;
     }
 }
diff --git 
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunctionWithMessageConf.java
 
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/TypedMessageBuilderPublish.java
similarity index 71%
rename from 
pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunctionWithMessageConf.java
rename to 
pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/TypedMessageBuilderPublish.java
index 9960552..327d371 100644
--- 
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunctionWithMessageConf.java
+++ 
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/TypedMessageBuilderPublish.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.functions.api.examples;
 
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
@@ -30,7 +32,7 @@ import java.util.Map;
  * to publish to a desired topic based on config and setting various message 
configurations to be passed along.
  *
  */
-public class PublishFunctionWithMessageConf implements Function<String, Void> {
+public class TypedMessageBuilderPublish implements Function<String, Void> {
     @Override
     public Void process(String input, Context context) {
         String publishTopic = (String) 
context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
@@ -40,13 +42,16 @@ public class PublishFunctionWithMessageConf implements 
Function<String, Void> {
         properties.put("input_topic", 
context.getCurrentRecord().getTopicName().get());
         properties.putAll(context.getCurrentRecord().getProperties());
 
-        Map<String, Object> messageConf = new HashMap<>();
-        messageConf.put(TypedMessageBuilder.CONF_PROPERTIES, properties);
-        if (context.getCurrentRecord().getKey().isPresent()) {
-            messageConf.put(TypedMessageBuilder.CONF_KEY, 
context.getCurrentRecord().getKey().get());
+        try {
+            TypedMessageBuilder messageBuilder = 
context.newOutputMessage(publishTopic, Schema.STRING).
+                    value(output).properties(properties);
+            if (context.getCurrentRecord().getKey().isPresent()){
+                messageBuilder.key(context.getCurrentRecord().getKey().get());
+            }
+            messageBuilder.eventTime(System.currentTimeMillis()).sendAsync();
+        } catch (PulsarClientException e) {
+            context.getLogger().error(e.toString());
         }
-        messageConf.put(TypedMessageBuilder.CONF_EVENT_TIME, 
System.currentTimeMillis());
-        context.publish(publishTopic, output, null, messageConf);
         return null;
     }
 }
diff --git 
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
 
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
index ec3fd95..41c4d7d 100644
--- 
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
+++ 
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.functions.api.examples;
 
 import java.util.Optional;
 
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 
@@ -31,8 +33,12 @@ public class UserPublishFunction implements Function<String, 
Void> {
     @Override
     public Void process(String input, Context context) {
         Optional<Object> topicToWrite = context.getUserConfigValue("topic");
-        if (topicToWrite.get() != null) {
-            context.publish((String) topicToWrite.get(), input);
+        if (topicToWrite.isPresent()) {
+            try {
+                context.newOutputMessage((String) topicToWrite.get(), 
Schema.STRING).value(input).sendAsync();
+            } catch (PulsarClientException e) {
+                e.printStackTrace();
+            }
         }
         return null;
     }
diff --git 
a/pulsar-functions/python-examples/publish_function_with_message_conf.py 
b/pulsar-functions/python-examples/typed_message_builder_publish.py
similarity index 97%
rename from 
pulsar-functions/python-examples/publish_function_with_message_conf.py
rename to pulsar-functions/python-examples/typed_message_builder_publish.py
index 79aac02..c6697a7 100644
--- a/pulsar-functions/python-examples/publish_function_with_message_conf.py
+++ b/pulsar-functions/python-examples/typed_message_builder_publish.py
@@ -23,7 +23,7 @@ from pulsar import Function
 
 # Example function that uses the built in publish function in the context
 # to publish to a desired topic based on config
-class PublishFunctionWithMessageConf(Function):
+class TypedMessageBuilderPublish(Function):
   def __init__(self):
     pass
 
diff --git 
a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
 
b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index 1945949..0fcc33e 100644
--- 
a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++ 
b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -282,7 +282,7 @@ public class WindowFunctionExecutor<I, O> implements 
Function<I, O> {
                 this.windowManager.add(record, ts, record);
             } else {
                 if (this.windowConfig.getLateDataTopic() != null) {
-                    context.publish(this.windowConfig.getLateDataTopic(), 
input);
+                    
context.newOutputMessage(this.windowConfig.getLateDataTopic(), 
null).value(input).sendAsync();
                 } else {
                     log.info(String.format(
                             "Received a late tuple %s with ts %d. This will 
not be " + "processed"
diff --git 
a/pulsar-functions/windowing/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
 
b/pulsar-functions/windowing/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
index 88ecebe..ac96b44 100644
--- 
a/pulsar-functions/windowing/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
+++ 
b/pulsar-functions/windowing/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.windowing;
 
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Record;
 
@@ -36,8 +37,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
@@ -204,6 +208,10 @@ public class WindowFunctionExecutorTest {
         windowConfig.setLateDataTopic("$late");
         Mockito.doReturn(Optional.of(new Gson().fromJson(new 
Gson().toJson(windowConfig), Map.class)))
                 
.when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
+        TypedMessageBuilder typedMessageBuilder = 
Mockito.mock(TypedMessageBuilder.class);
+        
Mockito.when(typedMessageBuilder.value(anyString())).thenReturn(typedMessageBuilder);
+        
Mockito.when(typedMessageBuilder.sendAsync()).thenReturn(CompletableFuture.anyOf());
+        Mockito.when(context.newOutputMessage(anyString(), 
anyObject())).thenReturn(typedMessageBuilder);
 
         long[] timestamps = {603, 605, 607, 618, 626, 636, 600};
         List<Long> events = new ArrayList<>(timestamps.length);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 823b7d1..48b3932 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -74,7 +74,7 @@ public abstract class PulsarFunctionsTestBase extends 
PulsarTestSuite {
         "org.apache.pulsar.functions.api.examples.ExclamationFunction";
 
     public static final String PUBLISH_JAVA_CLASS =
-            
"org.apache.pulsar.functions.api.examples.PublishFunctionWithMessageConf";
+            
"org.apache.pulsar.functions.api.examples.TypedMessageBuilderPublish";
 
     public static final String EXCEPTION_JAVA_CLASS =
             "org.apache.pulsar.tests.integration.functions.ExceptionFunction";
@@ -89,14 +89,13 @@ public abstract class PulsarFunctionsTestBase extends 
PulsarTestSuite {
     public static final String EXCLAMATION_PYTHONZIP_CLASS =
             "exclamation";
 
+    public static final String PUBLISH_PYTHON_CLASS = 
"typed_message_builder_publish.TypedMessageBuilderPublish";
     public static final String EXCEPTION_PYTHON_CLASS = "exception_function";
 
-    public static final String PUBLISH_PYTHON_CLASS = 
"publish_function_with_message_conf.PublishFunctionWithMessageConf";
-
     public static final String EXCLAMATION_PYTHON_FILE = 
"exclamation_function.py";
     public static final String EXCLAMATION_WITH_DEPS_PYTHON_FILE = 
"exclamation_with_extra_deps.py";
     public static final String EXCLAMATION_PYTHONZIP_FILE = "exclamation.zip";
-    public static final String PUBLISH_FUNCTION_PYTHON_FILE = 
"publish_function_with_message_conf.py";
+    public static final String PUBLISH_FUNCTION_PYTHON_FILE = 
"typed_message_builder_publish.py";
     public static final String EXCEPTION_FUNCTION_PYTHON_FILE = 
"exception_function.py";
 
     protected static String getExclamationClass(Runtime runtime,

Reply via email to