This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 65fe863 [issue#4042] improve java functions API (#4093)
65fe863 is described below
commit 65fe863f84a0446f369e3b5a44f9851c1f53c23d
Author: 冉小龙 <[email protected]>
AuthorDate: Sun May 5 07:49:03 2019 +0800
[issue#4042] improve java functions API (#4093)
Master Issue: #4042
Fixes #4042
Motivation
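Improve the Java Functions API. Context now exposes newOutputMessage(), which
returns a TypedMessageBuilder, so when an additional message field needs to be
set at publish time there is no need to add a new publish method to Context;
the field only has to be available on the TypedMessageBuilder interface.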
---
.../worker/PulsarFunctionPublishTest.java | 2 +-
pulsar-functions/api-java/pom.xml | 6 +
.../org/apache/pulsar/functions/api/Context.java | 31 ++-
.../pulsar/functions/instance/ContextImpl.java | 228 ++++++++++++++-------
.../pulsar/functions/instance/ContextImplTest.java | 2 +-
.../functions/api/examples/PublishFunction.java | 9 +-
...geConf.java => TypedMessageBuilderPublish.java} | 19 +-
.../api/examples/UserPublishFunction.java | 10 +-
...ge_conf.py => typed_message_builder_publish.py} | 2 +-
.../windowing/WindowFunctionExecutor.java | 2 +-
.../windowing/WindowFunctionExecutorTest.java | 8 +
.../functions/PulsarFunctionsTestBase.java | 7 +-
12 files changed, 221 insertions(+), 105 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index ef88fdd..e3b4131 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -279,7 +279,7 @@ public class PulsarFunctionPublishTest {
functionConfig.setSubName(subscriptionName);
functionConfig.setInputs(Collections.singleton(sourceTopic));
functionConfig.setAutoAck(true);
-
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.PublishFunctionWithMessageConf");
+
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.TypedMessageBuilderPublish");
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
Map<String, Object> userConfig = new HashMap<>();
userConfig.put("publish-topic", publishTopic);
diff --git a/pulsar-functions/api-java/pom.xml
b/pulsar-functions/api-java/pom.xml
index 6f0481d..33e2ea0 100644
--- a/pulsar-functions/api-java/pom.xml
+++ b/pulsar-functions/api-java/pom.xml
@@ -42,6 +42,12 @@
<artifactId>typetools</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-api</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
diff --git
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 556086a..013b5f2 100644
---
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -19,6 +19,10 @@
package org.apache.pulsar.functions.api;
import java.nio.ByteBuffer;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.slf4j.Logger;
import java.util.Collection;
@@ -232,6 +236,7 @@ public interface Context {
* @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro",
"json", "protobuf") or the class name
* of the custom schema class
* @return A future that completes when the framework is done publishing
the message
+ * @deprecated in favor of using {@link #newOutputMessage(String, Schema)}
*/
<O> CompletableFuture<Void> publish(String topicName, O object, String
schemaOrSerdeClassName);
@@ -241,28 +246,18 @@ public interface Context {
* @param topicName The name of the topic for publishing
* @param object The object that needs to be published
* @return A future that completes when the framework is done publishing
the message
+ * @deprecated in favor of using {@link #newOutputMessage(String, Schema)}
*/
<O> CompletableFuture<Void> publish(String topicName, O object);
/**
- * Publish an object using serDe or schema class for serializing to the
topic.
- *
- * @param topicName The name of the topic for publishing
- * @param object The object that needs to be published
- * @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro",
"json", "protobuf") or the class name
- * of the custom schema class
- * @param messageConf A map of configurations to set for the message
that will be published
- * The available options are:
- *
- * "key" - Parition Key
- * "properties" - Map of properties
- * "eventTime"
- * "sequenceId"
- * "replicationClusters"
- * "disableReplication"
+ * New output message using schema for serializing to the topic
*
- * @return A future that completes when the framework is done publishing
the message
+ * @param topicName The name of the topic for output message
+ * @param schema provide a way to convert between serialized data and
domain objects
+ * @param <O>
+ * @return the message builder instance
+ * @throws PulsarClientException
*/
- <O> CompletableFuture<Void> publish(String topicName, O object, String
schemaOrSerdeClassName, Map<String, Object> messageConf);
-
+ <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O>
schema) throws PulsarClientException;
}
\ No newline at end of file
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 5c06b49..f0d5ade 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -25,14 +25,7 @@ import io.prometheus.client.Summary;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.HashingScheme;
-import org.apache.pulsar.client.api.MessageRoutingMode;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
@@ -52,12 +45,9 @@ import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkState;
@@ -94,11 +84,13 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
private final Summary userMetricsSummary;
private final static String[] userMetricsLabelNames;
+
static {
// add label to indicate user metric
userMetricsLabelNames =
Arrays.copyOf(ComponentStatsManager.metricsLabelNames,
ComponentStatsManager.metricsLabelNames.length + 1);
userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length]
= "metric";
}
+
private final ComponentType componentType;
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient
client,
@@ -259,7 +251,7 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
return null;
}
}
-
+
private void ensureStateEnabled() {
checkState(null != stateContext, "State is not enabled.");
}
@@ -337,69 +329,26 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
@SuppressWarnings("unchecked")
@Override
public <O> CompletableFuture<Void> publish(String topicName, O object,
String schemaOrSerdeClassName) {
- return publish(topicName, object, schemaOrSerdeClassName, null);
+ return publish(topicName, object, (Schema<O>)
topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false));
}
@SuppressWarnings("unchecked")
@Override
- public <O> CompletableFuture<Void> publish(String topicName, O object,
String schemaOrSerdeClassName, Map<String, Object> messageConf) {
- return publish(topicName, object, (Schema<O>)
topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false),
messageConf);
+ public <O> TypedMessageBuilder<O> newOutputMessage(String topicName,
Schema<O> schema) throws PulsarClientException {
+ MessageBuilderImpl<O> messageBuilder = new MessageBuilderImpl<>();
+ TypedMessageBuilder<O> typedMessageBuilder = getProducer(topicName,
schema).newMessage();
+ messageBuilder.setUnderlyingBuilder(typedMessageBuilder);
+ return messageBuilder;
}
@SuppressWarnings("unchecked")
- public <O> CompletableFuture<Void> publish(String topicName, O object,
Schema<O> schema, Map<String, Object> messageConf) {
- Producer<O> producer = (Producer<O>) publishProducers.get(topicName);
-
- if (producer == null) {
- try {
- Producer<O> newProducer = ((ProducerBuilderImpl<O>)
producerBuilder.clone())
- .schema(schema)
- .blockIfQueueFull(true)
- .enableBatching(true)
- .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
- .compressionType(CompressionType.LZ4)
- .hashingScheme(HashingScheme.Murmur3_32Hash) //
- .messageRoutingMode(MessageRoutingMode.CustomPartition)
- .messageRouter(FunctionResultRouter.of())
- // set send timeout to be infinity to prevent
potential deadlock with consumer
- // that might happen when consumer is blocked due to
unacked messages
- .sendTimeout(0, TimeUnit.SECONDS)
- .topic(topicName)
- .properties(InstanceUtils.getProperties(componentType,
- FunctionCommon.getFullyQualifiedName(
-
this.config.getFunctionDetails().getTenant(),
-
this.config.getFunctionDetails().getNamespace(),
-
this.config.getFunctionDetails().getName()),
- this.config.getInstanceId()))
- .create();
-
- Producer<O> existingProducer = (Producer<O>)
publishProducers.putIfAbsent(topicName, newProducer);
-
- if (existingProducer != null) {
- // The value in the map was not updated after the
concurrent put
- newProducer.close();
- producer = existingProducer;
- } else {
- producer = newProducer;
- }
-
- } catch (PulsarClientException e) {
- logger.error("Failed to create Producer while doing user
publish", e);
- return FutureUtil.failedFuture(e);
- }
- }
-
- TypedMessageBuilder<O> messageBuilder = producer.newMessage();
- if (messageConf != null) {
- messageBuilder.loadConf(messageConf);
+ public <O> CompletableFuture<Void> publish(String topicName, O object,
Schema<O> schema) {
+ try {
+ return newOutputMessage(topicName,
schema).value(object).sendAsync().thenApply(msgId -> null);
+ } catch (PulsarClientException e) {
+ logger.error("Failed to create Producer while doing user publish",
e);
+ return FutureUtil.failedFuture(e);
}
- CompletableFuture<Void> future =
messageBuilder.value(object).sendAsync().thenApply(msgId -> null);
- future.exceptionally(e -> {
- this.statsManager.incrSysExceptions(e);
- logger.error("Failed to publish to topic {} with error {}",
topicName, e);
- return null;
- });
- return future;
}
@Override
@@ -417,6 +366,46 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
}
}
+ private <O> Producer<O> getProducer(String topicName, Schema<O> schema)
throws PulsarClientException {
+ Producer<O> producer = (Producer<O>) publishProducers.get(topicName);
+
+ if (producer == null) {
+
+ Producer<O> newProducer = ((ProducerBuilderImpl<O>)
producerBuilder.clone())
+ .schema(schema)
+ .blockIfQueueFull(true)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
+ .compressionType(CompressionType.LZ4)
+ .hashingScheme(HashingScheme.Murmur3_32Hash) //
+ .messageRoutingMode(MessageRoutingMode.CustomPartition)
+ .messageRouter(FunctionResultRouter.of())
+ // set send timeout to be infinity to prevent potential
deadlock with consumer
+ // that might happen when consumer is blocked due to
unacked messages
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .topic(topicName)
+ .properties(InstanceUtils.getProperties(componentType,
+ FunctionCommon.getFullyQualifiedName(
+
this.config.getFunctionDetails().getTenant(),
+
this.config.getFunctionDetails().getNamespace(),
+
this.config.getFunctionDetails().getName()),
+ this.config.getInstanceId()))
+ .create();
+
+ Producer<O> existingProducer = (Producer<O>)
publishProducers.putIfAbsent(topicName, newProducer);
+
+ if (existingProducer != null) {
+ // The value in the map was not updated after the concurrent
put
+ newProducer.close();
+ producer = existingProducer;
+ } else {
+ producer = newProducer;
+ }
+
+ }
+ return producer;
+ }
+
public Map<String, Double> getAndResetMetrics() {
Map<String, Double> retval = getMetrics();
resetMetrics();
@@ -443,4 +432,105 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
}
return metricsMap;
}
+
+ class MessageBuilderImpl<O> implements TypedMessageBuilder<O> {
+ private TypedMessageBuilder<O> underlyingBuilder;
+ @Override
+ public MessageId send() throws PulsarClientException {
+ try {
+ return sendAsync().get();
+ } catch (ExecutionException e) {
+ Throwable t = e.getCause();
+ if (t instanceof PulsarClientException) {
+ throw (PulsarClientException) t;
+ } else {
+ throw new PulsarClientException(t);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<MessageId> sendAsync() {
+ return underlyingBuilder.sendAsync()
+ .whenComplete((result, cause) -> {
+ if (null != cause) {
+ statsManager.incrSysExceptions(cause);
+ logger.error("Failed to publish to topic with
error {}", cause);
+ }
+ });
+ }
+
+ @Override
+ public TypedMessageBuilder<O> key(String key) {
+ underlyingBuilder.key(key);
+ return this;
+ }
+
+ @Override
+ public TypedMessageBuilder<O> keyBytes(byte[] key) {
+ underlyingBuilder.keyBytes(key);
+ return this;
+ }
+
+ @Override
+ public TypedMessageBuilder<O> orderingKey(byte[] orderingKey) {
+ underlyingBuilder.orderingKey(orderingKey);
+ return this;
+ }
+
+ @Override
+ public TypedMessageBuilder<O> value(O value) {
+ underlyingBuilder.value(value);
+ return this;
+ }
+
+ @Override
+ public TypedMessageBuilder<O> property(String name, String value) {
+ underlyingBuilder.property(name, value);
+ return this;
+ }
+
+ @Override
+ public TypedMessageBuilder<O> properties(Map<String, String>
properties) {
+ underlyingBuilder.properties(properties);
+ return this;
+ }
+
+ @Override
+ public TypedMessageBuilder<O> eventTime(long timestamp) {
+ underlyingBuilder.eventTime(timestamp);
+ return this;
+ }
+
+ @Override
+ public TypedMessageBuilder<O> sequenceId(long sequenceId) {
+ underlyingBuilder.sequenceId(sequenceId);
+ return this;
+ }
+
+ @Override
+ public TypedMessageBuilder<O> replicationClusters(List<String>
clusters) {
+ underlyingBuilder.replicationClusters(clusters);
+ return this;
+ }
+
+ @Override
+ public TypedMessageBuilder<O> disableReplication() {
+ underlyingBuilder.disableReplication();
+ return this;
+ }
+
+ @Override
+ public TypedMessageBuilder<O> loadConf(Map<String, Object> config) {
+ underlyingBuilder.loadConf(config);
+ return this;
+ }
+
+ public void setUnderlyingBuilder(TypedMessageBuilder<O>
underlyingBuilder) {
+ this.underlyingBuilder = underlyingBuilder;
+ }
+ }
}
\ No newline at end of file
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 9d2579c..7eb7aae 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -153,6 +153,6 @@ public class ContextImplTest {
@Test
public void testPublishUsingDefaultSchema() throws Exception {
- context.publish("sometopic", "Somevalue");
+ context.newOutputMessage("sometopic",
null).value("Somevalue").sendAsync();
}
}
diff --git
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
index cda8331..149cce6 100644
---
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
+++
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunction.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.functions.api.examples;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
@@ -30,7 +33,11 @@ public class PublishFunction implements Function<String,
Void> {
public Void process(String input, Context context) {
String publishTopic = (String)
context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
String output = String.format("%s!", input);
- context.publish(publishTopic, output);
+ try {
+ context.newOutputMessage(publishTopic,
Schema.STRING).value(output).sendAsync();
+ } catch (PulsarClientException e) {
+ context.getLogger().error(e.toString());
+ }
return null;
}
}
diff --git
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunctionWithMessageConf.java
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/TypedMessageBuilderPublish.java
similarity index 71%
rename from
pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunctionWithMessageConf.java
rename to
pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/TypedMessageBuilderPublish.java
index 9960552..327d371 100644
---
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunctionWithMessageConf.java
+++
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/TypedMessageBuilderPublish.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.functions.api.examples;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
@@ -30,7 +32,7 @@ import java.util.Map;
* to publish to a desired topic based on config and setting various message
configurations to be passed along.
*
*/
-public class PublishFunctionWithMessageConf implements Function<String, Void> {
+public class TypedMessageBuilderPublish implements Function<String, Void> {
@Override
public Void process(String input, Context context) {
String publishTopic = (String)
context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
@@ -40,13 +42,16 @@ public class PublishFunctionWithMessageConf implements
Function<String, Void> {
properties.put("input_topic",
context.getCurrentRecord().getTopicName().get());
properties.putAll(context.getCurrentRecord().getProperties());
- Map<String, Object> messageConf = new HashMap<>();
- messageConf.put(TypedMessageBuilder.CONF_PROPERTIES, properties);
- if (context.getCurrentRecord().getKey().isPresent()) {
- messageConf.put(TypedMessageBuilder.CONF_KEY,
context.getCurrentRecord().getKey().get());
+ try {
+ TypedMessageBuilder messageBuilder =
context.newOutputMessage(publishTopic, Schema.STRING).
+ value(output).properties(properties);
+ if (context.getCurrentRecord().getKey().isPresent()){
+ messageBuilder.key(context.getCurrentRecord().getKey().get());
+ }
+ messageBuilder.eventTime(System.currentTimeMillis()).sendAsync();
+ } catch (PulsarClientException e) {
+ context.getLogger().error(e.toString());
}
- messageConf.put(TypedMessageBuilder.CONF_EVENT_TIME,
System.currentTimeMillis());
- context.publish(publishTopic, output, null, messageConf);
return null;
}
}
diff --git
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
index ec3fd95..41c4d7d 100644
---
a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
+++
b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.functions.api.examples;
import java.util.Optional;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
@@ -31,8 +33,12 @@ public class UserPublishFunction implements Function<String,
Void> {
@Override
public Void process(String input, Context context) {
Optional<Object> topicToWrite = context.getUserConfigValue("topic");
- if (topicToWrite.get() != null) {
- context.publish((String) topicToWrite.get(), input);
+ if (topicToWrite.isPresent()) {
+ try {
+ context.newOutputMessage((String) topicToWrite.get(),
Schema.STRING).value(input).sendAsync();
+ } catch (PulsarClientException e) {
+ e.printStackTrace();
+ }
}
return null;
}
diff --git
a/pulsar-functions/python-examples/publish_function_with_message_conf.py
b/pulsar-functions/python-examples/typed_message_builder_publish.py
similarity index 97%
rename from
pulsar-functions/python-examples/publish_function_with_message_conf.py
rename to pulsar-functions/python-examples/typed_message_builder_publish.py
index 79aac02..c6697a7 100644
--- a/pulsar-functions/python-examples/publish_function_with_message_conf.py
+++ b/pulsar-functions/python-examples/typed_message_builder_publish.py
@@ -23,7 +23,7 @@ from pulsar import Function
# Example function that uses the built in publish function in the context
# to publish to a desired topic based on config
-class PublishFunctionWithMessageConf(Function):
+class TypedMessageBuilderPublish(Function):
def __init__(self):
pass
diff --git
a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index 1945949..0fcc33e 100644
---
a/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++
b/pulsar-functions/windowing/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -282,7 +282,7 @@ public class WindowFunctionExecutor<I, O> implements
Function<I, O> {
this.windowManager.add(record, ts, record);
} else {
if (this.windowConfig.getLateDataTopic() != null) {
- context.publish(this.windowConfig.getLateDataTopic(),
input);
+
context.newOutputMessage(this.windowConfig.getLateDataTopic(),
null).value(input).sendAsync();
} else {
log.info(String.format(
"Received a late tuple %s with ts %d. This will
not be " + "processed"
diff --git
a/pulsar-functions/windowing/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
b/pulsar-functions/windowing/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
index 88ecebe..ac96b44 100644
---
a/pulsar-functions/windowing/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
+++
b/pulsar-functions/windowing/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.windowing;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;
@@ -36,8 +37,11 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
@@ -204,6 +208,10 @@ public class WindowFunctionExecutorTest {
windowConfig.setLateDataTopic("$late");
Mockito.doReturn(Optional.of(new Gson().fromJson(new
Gson().toJson(windowConfig), Map.class)))
.when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
+ TypedMessageBuilder typedMessageBuilder =
Mockito.mock(TypedMessageBuilder.class);
+
Mockito.when(typedMessageBuilder.value(anyString())).thenReturn(typedMessageBuilder);
+
Mockito.when(typedMessageBuilder.sendAsync()).thenReturn(CompletableFuture.anyOf());
+ Mockito.when(context.newOutputMessage(anyString(),
anyObject())).thenReturn(typedMessageBuilder);
long[] timestamps = {603, 605, 607, 618, 626, 636, 600};
List<Long> events = new ArrayList<>(timestamps.length);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 823b7d1..48b3932 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -74,7 +74,7 @@ public abstract class PulsarFunctionsTestBase extends
PulsarTestSuite {
"org.apache.pulsar.functions.api.examples.ExclamationFunction";
public static final String PUBLISH_JAVA_CLASS =
-
"org.apache.pulsar.functions.api.examples.PublishFunctionWithMessageConf";
+
"org.apache.pulsar.functions.api.examples.TypedMessageBuilderPublish";
public static final String EXCEPTION_JAVA_CLASS =
"org.apache.pulsar.tests.integration.functions.ExceptionFunction";
@@ -89,14 +89,13 @@ public abstract class PulsarFunctionsTestBase extends
PulsarTestSuite {
public static final String EXCLAMATION_PYTHONZIP_CLASS =
"exclamation";
+ public static final String PUBLISH_PYTHON_CLASS =
"typed_message_builder_publish.TypedMessageBuilderPublish";
public static final String EXCEPTION_PYTHON_CLASS = "exception_function";
- public static final String PUBLISH_PYTHON_CLASS =
"publish_function_with_message_conf.PublishFunctionWithMessageConf";
-
public static final String EXCLAMATION_PYTHON_FILE =
"exclamation_function.py";
public static final String EXCLAMATION_WITH_DEPS_PYTHON_FILE =
"exclamation_with_extra_deps.py";
public static final String EXCLAMATION_PYTHONZIP_FILE = "exclamation.zip";
- public static final String PUBLISH_FUNCTION_PYTHON_FILE =
"publish_function_with_message_conf.py";
+ public static final String PUBLISH_FUNCTION_PYTHON_FILE =
"typed_message_builder_publish.py";
public static final String EXCEPTION_FUNCTION_PYTHON_FILE =
"exception_function.py";
protected static String getExclamationClass(Runtime runtime,