This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6142a4c Removed the `Context.ack(byte[] messageId)` and expose
`Record` instead (#2187)
6142a4c is described below
commit 6142a4c5b03f5a4e26a774fd57ab458844255786
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jul 19 16:08:16 2018 -0700
Removed the `Context.ack(byte[] messageId)` and expose `Record` instead
(#2187)
* Removed the `Context.ack(byte[] messageId)` and expose `Record` instead
* Unified all per-message function context into record
* Fixed compilation
* Fixed compilation
* Fix test
* Fixed mocked object
* Fixed test
---
.../org/apache/pulsar/functions/api/Context.java | 20 +----
.../org/apache/pulsar/functions/api/Record.java | 7 ++
.../pulsar/functions/instance/ContextImpl.java | 89 ++++++++--------------
.../pulsar/functions/instance/JavaInstance.java | 7 +-
.../functions/instance/JavaInstanceRunnable.java | 31 ++++----
.../apache/pulsar/functions/sink/PulsarSink.java | 2 +-
.../pulsar/functions/source/PulsarRecord.java | 5 ++
.../pulsar/functions/source/PulsarSource.java | 30 ++++++--
.../apache/pulsar/functions/windowing/Event.java | 15 ++--
.../pulsar/functions/windowing/EventImpl.java | 12 +--
.../windowing/WindowFunctionExecutor.java | 33 ++++----
.../pulsar/functions/windowing/WindowManager.java | 14 ++--
.../pulsar/functions/instance/ContextImplTest.java | 7 +-
.../functions/instance/JavaInstanceTest.java | 10 +--
.../windowing/WindowFunctionExecutorTest.java | 31 ++++++--
15 files changed, 164 insertions(+), 149 deletions(-)
diff --git
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 2fa513e..9869067 100644
---
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -34,17 +34,10 @@ import java.util.concurrent.CompletableFuture;
*/
public interface Context {
/**
- * Returns the messageId of the message that we are processing
- * This messageId is a stringified version of the actual MessageId
- * @return the messageId
+ * Access the record associated with the current input value
+ * @return
*/
- byte[] getMessageId();
-
- /**
- * The input topic that the message currently being processed belongs to
- * @return The input topic name
- */
- String getCurrentMessageTopicName();
+ Record<?> getCurrentRecord();
/**
* Get a list of all input topics
@@ -183,11 +176,4 @@ public interface Context {
*/
<O> CompletableFuture<Void> publish(String topicName, O object);
- /**
- * By default acknowledgement management is done transparently by Pulsar
Functions framework.
- * However users can disable that and do ack management by themselves by
using this API.
- * @param messageId The messageId that needs to be acknowledged
- * @return A future that completes when the framework is done acking the
message
- */
- CompletableFuture<Void> ack(byte[] messageId);
}
\ No newline at end of file
diff --git
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index 38d6ed3..2704909 100644
---
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
+++
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -28,6 +28,13 @@ import java.util.Optional;
public interface Record<T> {
/**
+ * If the record originated from a topic, report the topic name
+ */
+ default Optional<String> getTopicName() {
+ return Optional.empty();
+ }
+
+ /**
* Return a key if the key has one associated
*/
Optional<String> getKey();
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index b41d6f4..8e8f966 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -18,19 +18,33 @@
*/
package org.apache.pulsar.functions.instance;
+import static com.google.common.base.Preconditions.checkState;
+
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
import lombok.Getter;
import lombok.Setter;
+
import org.apache.commons.lang.StringUtils;
-import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
@@ -40,21 +54,6 @@ import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkState;
-
/**
* This class implements the Context interface exposed to the user.
*/
@@ -63,8 +62,7 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
private Logger logger;
// Per Message related
- private MessageId messageId;
- private String currentTopicName;
+ private Record<?> record;
@Getter
@Setter
@@ -98,15 +96,18 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
private Map<String, SerDe> publishSerializers;
private ProducerConfiguration producerConfiguration;
private PulsarClient pulsarClient;
- private ClassLoader classLoader;
- Consumer inputConsumer;
+ private final ClassLoader classLoader;
+
+ private final List<String> inputTopics;
+
+
@Getter
@Setter
private StateContextImpl stateContext;
private Map<String, Object> userConfigs;
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient
client,
- ClassLoader classLoader, Consumer inputConsumer) {
+ ClassLoader classLoader, List<String> inputTopics) {
this.config = config;
this.logger = logger;
this.pulsarClient = client;
@@ -115,7 +116,7 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
this.accumulatedMetrics = new ConcurrentHashMap<>();
this.publishProducers = new HashMap<>();
this.publishSerializers = new HashMap<>();
- this.inputConsumer = inputConsumer;
+ this.inputTopics = inputTopics;
producerConfiguration = new ProducerConfiguration();
producerConfiguration.setBlockIfQueueFull(true);
producerConfiguration.setBatchingEnabled(true);
@@ -129,31 +130,18 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
}
}
- public void setCurrentMessageContext(MessageId messageId, String
topicName) {
- this.messageId = messageId;
- this.currentTopicName = topicName;
+ public void setCurrentMessageContext(Record<?> record) {
+ this.record = record;
}
@Override
- public byte[] getMessageId() {
- return messageId.toByteArray();
- }
-
- @Override
- public String getCurrentMessageTopicName() {
- return currentTopicName;
+ public Record<?> getCurrentRecord() {
+ return record;
}
@Override
public Collection<String> getInputTopics() {
- if (inputConsumer == null) {
- return new LinkedList<>();
- }
- if (inputConsumer instanceof MultiTopicsConsumerImpl) {
- return ((MultiTopicsConsumerImpl) inputConsumer).getTopics();
- } else {
- return Arrays.asList(inputConsumer.getTopic());
- }
+ return inputTopics;
}
@Override
@@ -307,23 +295,6 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
.thenApply(msgId -> null);
}
- //TODO remove topic argument
- @Override
- public CompletableFuture<Void> ack(byte[] messageId) {
- // if inputConsumer is null, then ack is a no-op
- if (inputConsumer == null) {
- return CompletableFuture.completedFuture(null);
- }
- MessageId actualMessageId = null;
- try {
- actualMessageId = MessageId.fromByteArray(messageId);
-
- } catch (IOException e) {
- throw new RuntimeException("Invalid message id to ack", e);
- }
- return inputConsumer.acknowledgeAsync(actualMessageId);
- }
-
@Override
public void recordMetric(String metricName, double value) {
currentAccumulatedMetrics.putIfAbsent(metricName, new
AccumulatedMetricDatum());
@@ -341,7 +312,7 @@ class ContextImpl implements Context, SinkContext,
SourceContext {
this.accumulatedMetrics.putAll(currentAccumulatedMetrics);
this.currentAccumulatedMetrics.clear();
}
-
+
public MetricsData getMetrics() {
MetricsData.Builder metricsDataBuilder = MetricsData.newBuilder();
for (String metricName : accumulatedMetrics.keySet()) {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 42ade3f..39acdb5 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -21,8 +21,9 @@ package org.apache.pulsar.functions.instance;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.MessageId;
+
import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.proto.InstanceCommunication;
/**
@@ -50,9 +51,9 @@ public class JavaInstance implements AutoCloseable {
}
}
- public JavaExecutionResult handleMessage(MessageId messageId, String
topicName, Object input) {
+ public JavaExecutionResult handleMessage(Record<?> record, Object input) {
if (context != null) {
- context.setCurrentMessageContext(messageId, topicName);
+ context.setCurrentMessageContext(record);
}
JavaExecutionResult executionResult = new JavaExecutionResult();
try {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index b64224f..3480fd9 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -24,17 +24,19 @@ import static
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_ST
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
+
import io.netty.buffer.ByteBuf;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.clients.StorageClientBuilder;
@@ -48,32 +50,33 @@ import org.apache.logging.log4j.ThreadContext;
import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
-import org.apache.pulsar.functions.proto.Function.SourceSpec;
-import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
-import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import net.jodah.typetools.TypeResolver;
+
/**
* A function container implemented using java thread.
*/
@@ -109,7 +112,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
private Source source;
private Sink sink;
-
+
public static final String METRICS_TOTAL_PROCESSED = "__total_processed__";
public static final String METRICS_TOTAL_SUCCESS =
"__total_successfully_processed__";
public static final String METRICS_TOTAL_SYS_EXCEPTION =
"__total_system_exceptions__";
@@ -166,14 +169,14 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
}
ContextImpl setupContext() {
- Consumer consumer = null;
+ List<String> inputTopics = null;
if (source instanceof PulsarSource) {
- consumer = ((PulsarSource) source).getInputConsumer();
+ inputTopics = ((PulsarSource<?>) source).getInputTopics();
}
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client,
- Thread.currentThread().getContextClassLoader(), consumer);
+ Thread.currentThread().getContextClassLoader(), inputTopics);
}
/**
@@ -207,12 +210,12 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
String topicName = null;
if (currentRecord instanceof PulsarRecord) {
- PulsarRecord pulsarRecord = (PulsarRecord) currentRecord;
+ PulsarRecord<?> pulsarRecord = (PulsarRecord<?>)
currentRecord;
messageId = pulsarRecord.getMessageId();
- topicName = pulsarRecord.getTopicName();
+ topicName = pulsarRecord.getTopicName().get();
}
- result = javaInstance.handleMessage(messageId, topicName,
currentRecord.getValue());
+ result = javaInstance.handleMessage(currentRecord,
currentRecord.getValue());
removeLogTopicHandler();
@@ -424,7 +427,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
stats.resetCurrent();
javaInstance.resetMetrics();
}
-
+
private Builder createMetricsDataBuilder() {
InstanceCommunication.MetricsData.Builder bldr =
InstanceCommunication.MetricsData.newBuilder();
addSystemMetrics(METRICS_TOTAL_PROCESSED,
stats.getStats().getTotalProcessed(), bldr);
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 8ae9ad0..4be7301 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -240,7 +240,7 @@ public class PulsarSink<T> implements Sink<T> {
if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
PulsarRecord<T> pulsarRecord = (PulsarRecord<T>)
sinkRecord.getSourceRecord();
// forward user properties to sink-topic
- msgBuilder.setProperty("__pfn_input_topic__",
pulsarRecord.getTopicName()).setProperty(
+ msgBuilder.setProperty("__pfn_input_topic__",
pulsarRecord.getTopicName().get()).setProperty(
"__pfn_input_msg_id__",
new
String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index c4a1657..c662ad6 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -57,6 +57,11 @@ public class PulsarRecord<T> implements
RecordWithEncryptionContext<T> {
}
@Override
+ public Optional<String> getTopicName() {
+ return Optional.of(topicName);
+ }
+
+ @Override
public Optional<String> getPartitionId() {
return Optional.of(String.format("%s-%s", topicName, partition));
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index f70100d..7903267 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -18,16 +18,26 @@
*/
package org.apache.pulsar.functions.source;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.functions.api.Record;
@@ -36,14 +46,10 @@ import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import net.jodah.typetools.TypeResolver;
@Slf4j
public class PulsarSource<T> implements Source<T> {
@@ -52,6 +58,7 @@ public class PulsarSource<T> implements Source<T> {
private PulsarSourceConfig pulsarSourceConfig;
private Map<String, SerDe> topicToSerDeMap = new HashMap<>();
private boolean isTopicsPattern;
+ private List<String> inputTopics;
@Getter
private org.apache.pulsar.client.api.Consumer<byte[]> inputConsumer;
@@ -84,6 +91,11 @@ public class PulsarSource<T> implements Source<T> {
consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(),
TimeUnit.MILLISECONDS);
}
this.inputConsumer = consumerBuilder.subscribe();
+ if (inputConsumer instanceof MultiTopicsConsumerImpl) {
+ inputTopics = ((MultiTopicsConsumerImpl<?>)
inputConsumer).getTopics();
+ } else {
+ inputTopics = Collections.singletonList(inputConsumer.getTopic());
+ }
}
@Override
@@ -190,4 +202,8 @@ public class PulsarSource<T> implements Source<T> {
}
}
}
+
+ public List<String> getInputTopics() {
+ return inputTopics;
+ }
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java
index 19beb5e..1386f86 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java
@@ -18,12 +18,20 @@
*/
package org.apache.pulsar.functions.windowing;
+import org.apache.pulsar.functions.api.Record;
+
/**
* An event is a wrapper object that gets stored in the window.
*
* @param <T> the type of the object thats wrapped
*/
public interface Event<T> {
+
+ /**
+ * @return the record associated with the event
+ */
+ Record<?> getRecord();
+
/**
* The event timestamp in millis
*
@@ -46,11 +54,4 @@ public interface Event<T> {
*/
boolean isWatermark();
-
- /**
- * Get the message id of this event
- *
- * @return byte array of the message id
- */
- byte[] getMessageId();
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java
index 2497c85..201c051 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java
@@ -21,17 +21,19 @@ package org.apache.pulsar.functions.windowing;
import lombok.EqualsAndHashCode;
import lombok.ToString;
+import org.apache.pulsar.functions.api.Record;
+
@ToString
@EqualsAndHashCode
public class EventImpl<T> implements Event<T> {
+ private final Record<?> record;
private final T event;
private final long ts;
- private final byte[] messageId;
- EventImpl(T event, long ts, byte[] messageId) {
+ EventImpl(T event, long ts, Record<?> record) {
this.event = event;
this.ts = ts;
- this.messageId = messageId;
+ this.record = record;
}
@Override
@@ -50,7 +52,7 @@ public class EventImpl<T> implements Event<T> {
}
@Override
- public byte[] getMessageId() {
- return messageId;
+ public Record<?> getRecord() {
+ return record;
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index 8f16eac..eb3026d 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -19,10 +19,20 @@
package org.apache.pulsar.functions.windowing;
import com.google.gson.Gson;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.WindowConfig;
import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
@@ -35,13 +45,7 @@ import
org.apache.pulsar.functions.windowing.triggers.TimeTriggerPolicy;
import
org.apache.pulsar.functions.windowing.triggers.WatermarkCountTriggerPolicy;
import
org.apache.pulsar.functions.windowing.triggers.WatermarkTimeTriggerPolicy;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.stream.Collectors;
+import net.jodah.typetools.TypeResolver;
@Slf4j
public class WindowFunctionExecutor<I, O> implements Function<I, O> {
@@ -206,7 +210,7 @@ public class WindowFunctionExecutor<I, O> implements
Function<I, O> {
@Override
public void onExpiry(List<Event<I>> events) {
for (Event<I> event : events) {
- context.ack(event.getMessageId());
+ event.getRecord().ack();
}
}
@@ -275,10 +279,13 @@ public class WindowFunctionExecutor<I, O> implements
Function<I, O> {
if (!this.initialized) {
initialize(context);
}
+
+ Record<?> record = context.getCurrentRecord();
+
if (isEventTime()) {
long ts = this.timestampExtractor.extractTimestamp(input);
- if
(this.waterMarkEventGenerator.track(context.getCurrentMessageTopicName(), ts)) {
- this.windowManager.add(input, ts, context.getMessageId());
+ if
(this.waterMarkEventGenerator.track(record.getTopicName().get(), ts)) {
+ this.windowManager.add(input, ts, record);
} else {
if (this.windowConfig.getLateDataTopic() != null) {
context.publish(this.windowConfig.getLateDataTopic(),
input, context.getOutputSerdeClassName());
@@ -287,10 +294,10 @@ public class WindowFunctionExecutor<I, O> implements
Function<I, O> {
"Received a late tuple %s with ts %d. This will
not be " + "processed"
+ ".", input, ts));
}
- context.ack(context.getMessageId());
+ record.ack();
}
} else {
- this.windowManager.add(input, System.currentTimeMillis(),
context.getMessageId());
+ this.windowManager.add(input, System.currentTimeMillis(), record);
}
return null;
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
index f121e13..94c2362 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.functions.windowing;
-import lombok.extern.slf4j.Slf4j;
+import static
org.apache.pulsar.functions.windowing.EvictionPolicy.Action.EXPIRE;
+import static
org.apache.pulsar.functions.windowing.EvictionPolicy.Action.PROCESS;
+import static org.apache.pulsar.functions.windowing.EvictionPolicy.Action.STOP;
import java.util.ArrayList;
import java.util.Collection;
@@ -29,9 +31,9 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
-import static
org.apache.pulsar.functions.windowing.EvictionPolicy.Action.EXPIRE;
-import static
org.apache.pulsar.functions.windowing.EvictionPolicy.Action.PROCESS;
-import static org.apache.pulsar.functions.windowing.EvictionPolicy.Action.STOP;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.functions.api.Record;
/**
* Tracks a window of events and fires {@link WindowLifecycleListener}
callbacks
@@ -90,8 +92,8 @@ public class WindowManager<T> implements TriggerHandler {
* @param event the event to track
* @param ts the timestamp
*/
- public void add(T event, long ts, byte[] messageId) {
- add(new EventImpl<>(event, ts, messageId));
+ public void add(T event, long ts, Record<?> record) {
+ add(new EventImpl<>(event, ts, record));
}
/**
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 0fb027e..710a127 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -26,7 +26,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.nio.ByteBuffer;
-import org.apache.pulsar.client.api.Consumer;
+import java.util.ArrayList;
+
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -43,7 +44,6 @@ public class ContextImplTest {
private Logger logger;
private PulsarClient client;
private ClassLoader classLoader;
- private Consumer consumer;
private ContextImpl context;
@Before
@@ -56,13 +56,12 @@ public class ContextImplTest {
logger = mock(Logger.class);
client = mock(PulsarClient.class);
classLoader = getClass().getClassLoader();
- consumer = mock(Consumer.class);
context = new ContextImpl(
config,
logger,
client,
classLoader,
- consumer
+ new ArrayList<>()
);
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index f69172a..0cb361d 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -18,14 +18,14 @@
*/
package org.apache.pulsar.functions.instance;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.functions.api.Function;
-import org.testng.annotations.Test;
-
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+import org.testng.annotations.Test;
+
public class JavaInstanceTest {
/**
@@ -38,7 +38,7 @@ public class JavaInstanceTest {
mock(ContextImpl.class),
(Function<String, String>) (input, context) -> input + "-lambda");
String testString = "ABC123";
- JavaExecutionResult result =
instance.handleMessage(MessageId.earliest, "random", testString);
+ JavaExecutionResult result =
instance.handleMessage(mock(Record.class), testString);
assertNotNull(result.getResult());
assertEquals(new String(testString + "-lambda"), result.getResult());
instance.close();
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
index c5d5d4a..4d56239 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.windowing;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.utils.WindowConfig;
import org.mockito.Mockito;
@@ -101,6 +102,10 @@ public class WindowFunctionExecutorTest {
Mockito.doReturn("test-namespace").when(context).getNamespace();
Mockito.doReturn("test-tenant").when(context).getTenant();
+ Record<?> record = Mockito.mock(Record.class);
+
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+ Mockito.doReturn(record).when(context).getCurrentRecord();
+
windowConfig = new WindowConfig();
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
windowConfig.setWindowLengthDurationMs(20L);
@@ -111,7 +116,6 @@ public class WindowFunctionExecutorTest {
windowConfig.setActualWindowFunctionClassName(TestFunction.class.getName());
Mockito.doReturn(Optional.of(new Gson().fromJson(new
Gson().toJson(windowConfig),
Map.class))).when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
-
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
@@ -170,7 +174,6 @@ public class WindowFunctionExecutorTest {
Mockito.doReturn("test-function").when(context).getFunctionName();
Mockito.doReturn("test-namespace").when(context).getNamespace();
Mockito.doReturn("test-tenant").when(context).getTenant();
-
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
@@ -256,10 +259,12 @@ public class WindowFunctionExecutorTest {
Mockito.doReturn("test-function").when(context).getFunctionName();
Mockito.doReturn("test-namespace").when(context).getNamespace();
Mockito.doReturn("test-tenant").when(context).getTenant();
-
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+ Record<?> record = Mockito.mock(Record.class);
+
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+ Mockito.doReturn(record).when(context).getCurrentRecord();
WindowConfig windowConfig = new WindowConfig();
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
@@ -347,10 +352,12 @@ public class WindowFunctionExecutorTest {
Mockito.doReturn("test-function").when(context).getFunctionName();
Mockito.doReturn("test-namespace").when(context).getNamespace();
Mockito.doReturn("test-tenant").when(context).getTenant();
-
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+ Record<?> record = Mockito.mock(Record.class);
+
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+ Mockito.doReturn(record).when(context).getCurrentRecord();
WindowConfig windowConfig = new WindowConfig();
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
@@ -413,10 +420,12 @@ public class WindowFunctionExecutorTest {
Mockito.doReturn("test-function").when(context).getFunctionName();
Mockito.doReturn("test-namespace").when(context).getNamespace();
Mockito.doReturn("test-tenant").when(context).getTenant();
-
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+ Record<?> record = Mockito.mock(Record.class);
+
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+ Mockito.doReturn(record).when(context).getCurrentRecord();
WindowConfig windowConfig = new WindowConfig();
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
@@ -465,10 +474,12 @@ public class WindowFunctionExecutorTest {
Mockito.doReturn("test-function").when(context).getFunctionName();
Mockito.doReturn("test-namespace").when(context).getNamespace();
Mockito.doReturn("test-tenant").when(context).getTenant();
-
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+ Record<?> record = Mockito.mock(Record.class);
+
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+ Mockito.doReturn(record).when(context).getCurrentRecord();
WindowConfig windowConfig = new WindowConfig();
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
@@ -517,10 +528,12 @@ public class WindowFunctionExecutorTest {
Mockito.doReturn("test-function").when(context).getFunctionName();
Mockito.doReturn("test-namespace").when(context).getNamespace();
Mockito.doReturn("test-tenant").when(context).getTenant();
-
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+ Record<?> record = Mockito.mock(Record.class);
+
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+ Mockito.doReturn(record).when(context).getCurrentRecord();
WindowConfig windowConfig = new WindowConfig();
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
@@ -567,10 +580,12 @@ public class WindowFunctionExecutorTest {
Mockito.doReturn("test-function").when(context).getFunctionName();
Mockito.doReturn("test-namespace").when(context).getNamespace();
Mockito.doReturn("test-tenant").when(context).getTenant();
-
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+ Record<?> record = Mockito.mock(Record.class);
+
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+ Mockito.doReturn(record).when(context).getCurrentRecord();
WindowConfig windowConfig = new WindowConfig();
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());