This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6142a4c   Removed the `Context.ack(byte[] messageId)` and expose 
`Record` instead  (#2187)
6142a4c is described below

commit 6142a4c5b03f5a4e26a774fd57ab458844255786
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Thu Jul 19 16:08:16 2018 -0700

     Removed the `Context.ack(byte[] messageId)` and expose `Record` instead  
(#2187)
    
    * Removed the `Context.ack(byte[] messageId)` and expose `Record` instead
    
    * Unified all per-message function context into record
    
    * Fixed compilation
    
    * Fixed compilation
    
    * Fix test
    
    * Fixed mocked object
    
    * Fixed test
---
 .../org/apache/pulsar/functions/api/Context.java   | 20 +----
 .../org/apache/pulsar/functions/api/Record.java    |  7 ++
 .../pulsar/functions/instance/ContextImpl.java     | 89 ++++++++--------------
 .../pulsar/functions/instance/JavaInstance.java    |  7 +-
 .../functions/instance/JavaInstanceRunnable.java   | 31 ++++----
 .../apache/pulsar/functions/sink/PulsarSink.java   |  2 +-
 .../pulsar/functions/source/PulsarRecord.java      |  5 ++
 .../pulsar/functions/source/PulsarSource.java      | 30 ++++++--
 .../apache/pulsar/functions/windowing/Event.java   | 15 ++--
 .../pulsar/functions/windowing/EventImpl.java      | 12 +--
 .../windowing/WindowFunctionExecutor.java          | 33 ++++----
 .../pulsar/functions/windowing/WindowManager.java  | 14 ++--
 .../pulsar/functions/instance/ContextImplTest.java |  7 +-
 .../functions/instance/JavaInstanceTest.java       | 10 +--
 .../windowing/WindowFunctionExecutorTest.java      | 31 ++++++--
 15 files changed, 164 insertions(+), 149 deletions(-)

diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 2fa513e..9869067 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -34,17 +34,10 @@ import java.util.concurrent.CompletableFuture;
  */
 public interface Context {
     /**
-     * Returns the messageId of the message that we are processing
-     * This messageId is a stringified version of the actual MessageId
-     * @return the messageId
+     * Access the record associated with the current input value
+     * @return
      */
-    byte[] getMessageId();
-
-    /**
-     * The input topic that the message currently being processed belongs to
-     * @return The input topic name
-     */
-    String getCurrentMessageTopicName();
+    Record<?> getCurrentRecord();
 
     /**
      * Get a list of all input topics
@@ -183,11 +176,4 @@ public interface Context {
      */
     <O> CompletableFuture<Void> publish(String topicName, O object);
 
-    /**
-     * By default acknowledgement management is done transparently by Pulsar 
Functions framework.
-     * However users can disable that and do ack management by themselves by 
using this API.
-     * @param messageId The messageId that needs to be acknowledged
-     * @return A future that completes when the framework is done acking the 
message
-     */
-    CompletableFuture<Void> ack(byte[] messageId);
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index 38d6ed3..2704909 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -28,6 +28,13 @@ import java.util.Optional;
 public interface Record<T> {
 
     /**
+     * If the record originated from a topic, report the topic name
+     */
+    default Optional<String> getTopicName() {
+        return Optional.empty();
+    }
+
+    /**
      * Return a key if the key has one associated
      */
     Optional<String> getKey();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index b41d6f4..8e8f966 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -18,19 +18,33 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
 import lombok.Getter;
 import lombok.Setter;
+
 import org.apache.commons.lang.StringUtils;
-import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
@@ -40,21 +54,6 @@ import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkState;
-
 /**
  * This class implements the Context interface exposed to the user.
  */
@@ -63,8 +62,7 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
     private Logger logger;
 
     // Per Message related
-    private MessageId messageId;
-    private String currentTopicName;
+    private Record<?> record;
 
     @Getter
     @Setter
@@ -98,15 +96,18 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
     private Map<String, SerDe> publishSerializers;
     private ProducerConfiguration producerConfiguration;
     private PulsarClient pulsarClient;
-    private ClassLoader classLoader;
-    Consumer inputConsumer;
+    private final ClassLoader classLoader;
+
+    private final List<String> inputTopics;
+
+
     @Getter
     @Setter
     private StateContextImpl stateContext;
     private Map<String, Object> userConfigs;
 
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient 
client,
-                       ClassLoader classLoader, Consumer inputConsumer) {
+                       ClassLoader classLoader, List<String> inputTopics) {
         this.config = config;
         this.logger = logger;
         this.pulsarClient = client;
@@ -115,7 +116,7 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
         this.accumulatedMetrics = new ConcurrentHashMap<>();
         this.publishProducers = new HashMap<>();
         this.publishSerializers = new HashMap<>();
-        this.inputConsumer = inputConsumer;
+        this.inputTopics = inputTopics;
         producerConfiguration = new ProducerConfiguration();
         producerConfiguration.setBlockIfQueueFull(true);
         producerConfiguration.setBatchingEnabled(true);
@@ -129,31 +130,18 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
         }
     }
 
-    public void setCurrentMessageContext(MessageId messageId, String 
topicName) {
-        this.messageId = messageId;
-        this.currentTopicName = topicName;
+    public void setCurrentMessageContext(Record<?> record) {
+        this.record = record;
     }
 
     @Override
-    public byte[] getMessageId() {
-        return messageId.toByteArray();
-    }
-
-    @Override
-    public String getCurrentMessageTopicName() {
-        return currentTopicName;
+    public Record<?> getCurrentRecord() {
+        return record;
     }
 
     @Override
     public Collection<String> getInputTopics() {
-        if (inputConsumer == null) {
-            return new LinkedList<>();
-        }
-        if (inputConsumer instanceof MultiTopicsConsumerImpl) {
-            return ((MultiTopicsConsumerImpl) inputConsumer).getTopics();
-        } else {
-            return Arrays.asList(inputConsumer.getTopic());
-        }
+        return inputTopics;
     }
 
     @Override
@@ -307,23 +295,6 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
                 .thenApply(msgId -> null);
     }
 
-    //TODO remove topic argument
-    @Override
-    public CompletableFuture<Void> ack(byte[] messageId) {
-        // if inputConsumer is null, then ack is a no-op
-        if (inputConsumer == null) {
-            return CompletableFuture.completedFuture(null);
-        }
-        MessageId actualMessageId = null;
-        try {
-            actualMessageId = MessageId.fromByteArray(messageId);
-
-        } catch (IOException e) {
-            throw new RuntimeException("Invalid message id to ack", e);
-        }
-        return  inputConsumer.acknowledgeAsync(actualMessageId);
-    }
-
     @Override
     public void recordMetric(String metricName, double value) {
         currentAccumulatedMetrics.putIfAbsent(metricName, new 
AccumulatedMetricDatum());
@@ -341,7 +312,7 @@ class ContextImpl implements Context, SinkContext, 
SourceContext {
         this.accumulatedMetrics.putAll(currentAccumulatedMetrics);
         this.currentAccumulatedMetrics.clear();
     }
-    
+
     public MetricsData getMetrics() {
         MetricsData.Builder metricsDataBuilder = MetricsData.newBuilder();
         for (String metricName : accumulatedMetrics.keySet()) {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 42ade3f..39acdb5 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -21,8 +21,9 @@ package org.apache.pulsar.functions.instance;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.MessageId;
+
 import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 
 /**
@@ -50,9 +51,9 @@ public class JavaInstance implements AutoCloseable {
         }
     }
 
-    public JavaExecutionResult handleMessage(MessageId messageId, String 
topicName, Object input) {
+    public JavaExecutionResult handleMessage(Record<?> record, Object input) {
         if (context != null) {
-            context.setCurrentMessageContext(messageId, topicName);
+            context.setCurrentMessageContext(record);
         }
         JavaExecutionResult executionResult = new JavaExecutionResult();
         try {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index b64224f..3480fd9 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -24,17 +24,19 @@ import static 
org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_ST
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+
 import io.netty.buffer.ByteBuf;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
@@ -48,32 +50,33 @@ import org.apache.logging.log4j.ThreadContext;
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import 
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
-import org.apache.pulsar.functions.proto.Function.SourceSpec;
-import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.sink.PulsarSinkConfig;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
 import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
-import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import net.jodah.typetools.TypeResolver;
+
 /**
  * A function container implemented using java thread.
  */
@@ -109,7 +112,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
 
     private Source source;
     private Sink sink;
-    
+
     public static final String METRICS_TOTAL_PROCESSED = "__total_processed__";
     public static final String METRICS_TOTAL_SUCCESS = 
"__total_successfully_processed__";
     public static final String METRICS_TOTAL_SYS_EXCEPTION = 
"__total_system_exceptions__";
@@ -166,14 +169,14 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
     }
 
     ContextImpl setupContext() {
-        Consumer consumer = null;
+        List<String> inputTopics = null;
         if (source instanceof PulsarSource) {
-            consumer = ((PulsarSource) source).getInputConsumer();
+            inputTopics = ((PulsarSource<?>) source).getInputTopics();
         }
         Logger instanceLog = LoggerFactory.getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
         return new ContextImpl(instanceConfig, instanceLog, client,
-                Thread.currentThread().getContextClassLoader(), consumer);
+                Thread.currentThread().getContextClassLoader(), inputTopics);
     }
 
     /**
@@ -207,12 +210,12 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
                 String topicName = null;
 
                 if (currentRecord instanceof PulsarRecord) {
-                    PulsarRecord pulsarRecord = (PulsarRecord) currentRecord;
+                    PulsarRecord<?> pulsarRecord = (PulsarRecord<?>) 
currentRecord;
                     messageId = pulsarRecord.getMessageId();
-                    topicName = pulsarRecord.getTopicName();
+                    topicName = pulsarRecord.getTopicName().get();
                 }
 
-                result = javaInstance.handleMessage(messageId, topicName, 
currentRecord.getValue());
+                result = javaInstance.handleMessage(currentRecord, 
currentRecord.getValue());
 
                 removeLogTopicHandler();
 
@@ -424,7 +427,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         stats.resetCurrent();
         javaInstance.resetMetrics();
     }
-    
+
     private Builder createMetricsDataBuilder() {
         InstanceCommunication.MetricsData.Builder bldr = 
InstanceCommunication.MetricsData.newBuilder();
         addSystemMetrics(METRICS_TOTAL_PROCESSED, 
stats.getStats().getTotalProcessed(), bldr);
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 8ae9ad0..4be7301 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -240,7 +240,7 @@ public class PulsarSink<T> implements Sink<T> {
         if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
             PulsarRecord<T> pulsarRecord = (PulsarRecord<T>) 
sinkRecord.getSourceRecord();
             // forward user properties to sink-topic
-            msgBuilder.setProperty("__pfn_input_topic__", 
pulsarRecord.getTopicName()).setProperty(
+            msgBuilder.setProperty("__pfn_input_topic__", 
pulsarRecord.getTopicName().get()).setProperty(
                     "__pfn_input_msg_id__",
                     new 
String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
         }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index c4a1657..c662ad6 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -57,6 +57,11 @@ public class PulsarRecord<T> implements 
RecordWithEncryptionContext<T> {
     }
 
     @Override
+    public Optional<String> getTopicName() {
+        return Optional.of(topicName);
+    }
+
+    @Override
     public Optional<String> getPartitionId() {
         return Optional.of(String.format("%s-%s", topicName, partition));
     }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index f70100d..7903267 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -18,16 +18,26 @@
  */
 package org.apache.pulsar.functions.source;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
 import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
 
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.functions.api.Record;
@@ -36,14 +46,10 @@ import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import net.jodah.typetools.TypeResolver;
 
 @Slf4j
 public class PulsarSource<T> implements Source<T> {
@@ -52,6 +58,7 @@ public class PulsarSource<T> implements Source<T> {
     private PulsarSourceConfig pulsarSourceConfig;
     private Map<String, SerDe> topicToSerDeMap = new HashMap<>();
     private boolean isTopicsPattern;
+    private List<String> inputTopics;
 
     @Getter
     private org.apache.pulsar.client.api.Consumer<byte[]> inputConsumer;
@@ -84,6 +91,11 @@ public class PulsarSource<T> implements Source<T> {
             consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), 
TimeUnit.MILLISECONDS);
         }
         this.inputConsumer = consumerBuilder.subscribe();
+        if (inputConsumer instanceof MultiTopicsConsumerImpl) {
+            inputTopics = ((MultiTopicsConsumerImpl<?>) 
inputConsumer).getTopics();
+        } else {
+            inputTopics = Collections.singletonList(inputConsumer.getTopic());
+        }
     }
 
     @Override
@@ -190,4 +202,8 @@ public class PulsarSource<T> implements Source<T> {
             }
         }
     }
+
+    public List<String> getInputTopics() {
+        return inputTopics;
+    }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java
index 19beb5e..1386f86 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java
@@ -18,12 +18,20 @@
  */
 package org.apache.pulsar.functions.windowing;
 
+import org.apache.pulsar.functions.api.Record;
+
 /**
  * An event is a wrapper object that gets stored in the window.
  *
  * @param <T> the type of the object thats wrapped
  */
 public interface Event<T> {
+
+    /**
+     * @return the record associated with the event
+     */
+    Record<?> getRecord();
+
     /**
      * The event timestamp in millis
      *
@@ -46,11 +54,4 @@ public interface Event<T> {
      */
     boolean isWatermark();
 
-
-    /**
-     * Get the message id of this event
-     *
-     * @return byte array of the message id
-     */
-    byte[] getMessageId();
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java
index 2497c85..201c051 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java
@@ -21,17 +21,19 @@ package org.apache.pulsar.functions.windowing;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
+import org.apache.pulsar.functions.api.Record;
+
 @ToString
 @EqualsAndHashCode
 public class EventImpl<T> implements Event<T> {
+    private final Record<?> record;
     private final T event;
     private final long ts;
-    private final byte[] messageId;
 
-    EventImpl(T event, long ts, byte[] messageId) {
+    EventImpl(T event, long ts, Record<?> record) {
         this.event = event;
         this.ts = ts;
-        this.messageId = messageId;
+        this.record = record;
     }
 
     @Override
@@ -50,7 +52,7 @@ public class EventImpl<T> implements Event<T> {
     }
 
     @Override
-    public byte[] getMessageId() {
-        return messageId;
+    public Record<?> getRecord() {
+        return record;
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index 8f16eac..eb3026d 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -19,10 +19,20 @@
 package org.apache.pulsar.functions.windowing;
 
 import com.google.gson.Gson;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
 import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.WindowConfig;
 import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
@@ -35,13 +45,7 @@ import 
org.apache.pulsar.functions.windowing.triggers.TimeTriggerPolicy;
 import 
org.apache.pulsar.functions.windowing.triggers.WatermarkCountTriggerPolicy;
 import 
org.apache.pulsar.functions.windowing.triggers.WatermarkTimeTriggerPolicy;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.stream.Collectors;
+import net.jodah.typetools.TypeResolver;
 
 @Slf4j
 public class WindowFunctionExecutor<I, O> implements Function<I, O> {
@@ -206,7 +210,7 @@ public class WindowFunctionExecutor<I, O> implements 
Function<I, O> {
             @Override
             public void onExpiry(List<Event<I>> events) {
                 for (Event<I> event : events) {
-                    context.ack(event.getMessageId());
+                    event.getRecord().ack();
                 }
             }
 
@@ -275,10 +279,13 @@ public class WindowFunctionExecutor<I, O> implements 
Function<I, O> {
         if (!this.initialized) {
             initialize(context);
         }
+
+        Record<?> record = context.getCurrentRecord();
+
         if (isEventTime()) {
             long ts = this.timestampExtractor.extractTimestamp(input);
-            if 
(this.waterMarkEventGenerator.track(context.getCurrentMessageTopicName(), ts)) {
-                this.windowManager.add(input, ts, context.getMessageId());
+            if 
(this.waterMarkEventGenerator.track(record.getTopicName().get(), ts)) {
+                this.windowManager.add(input, ts, record);
             } else {
                 if (this.windowConfig.getLateDataTopic() != null) {
                     context.publish(this.windowConfig.getLateDataTopic(), 
input, context.getOutputSerdeClassName());
@@ -287,10 +294,10 @@ public class WindowFunctionExecutor<I, O> implements 
Function<I, O> {
                             "Received a late tuple %s with ts %d. This will 
not be " + "processed"
                                     + ".", input, ts));
                 }
-                context.ack(context.getMessageId());
+                record.ack();
             }
         } else {
-            this.windowManager.add(input, System.currentTimeMillis(), 
context.getMessageId());
+            this.windowManager.add(input, System.currentTimeMillis(), record);
         }
         return null;
     }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
index f121e13..94c2362 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.functions.windowing;
 
-import lombok.extern.slf4j.Slf4j;
+import static 
org.apache.pulsar.functions.windowing.EvictionPolicy.Action.EXPIRE;
+import static 
org.apache.pulsar.functions.windowing.EvictionPolicy.Action.PROCESS;
+import static org.apache.pulsar.functions.windowing.EvictionPolicy.Action.STOP;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -29,9 +31,9 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static 
org.apache.pulsar.functions.windowing.EvictionPolicy.Action.EXPIRE;
-import static 
org.apache.pulsar.functions.windowing.EvictionPolicy.Action.PROCESS;
-import static org.apache.pulsar.functions.windowing.EvictionPolicy.Action.STOP;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.functions.api.Record;
 
 /**
  * Tracks a window of events and fires {@link WindowLifecycleListener} 
callbacks
@@ -90,8 +92,8 @@ public class WindowManager<T> implements TriggerHandler {
      *  @param event the event to track
      * @param ts the timestamp
      */
-    public void add(T event, long ts, byte[] messageId) {
-        add(new EventImpl<>(event, ts, messageId));
+    public void add(T event, long ts, Record<?> record) {
+        add(new EventImpl<>(event, ts, record));
     }
 
     /**
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 0fb027e..710a127 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -26,7 +26,8 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import java.nio.ByteBuffer;
-import org.apache.pulsar.client.api.Consumer;
+import java.util.ArrayList;
+
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -43,7 +44,6 @@ public class ContextImplTest {
     private Logger logger;
     private PulsarClient client;
     private ClassLoader classLoader;
-    private Consumer consumer;
     private ContextImpl context;
 
     @Before
@@ -56,13 +56,12 @@ public class ContextImplTest {
         logger = mock(Logger.class);
         client = mock(PulsarClient.class);
         classLoader = getClass().getClassLoader();
-        consumer = mock(Consumer.class);
         context = new ContextImpl(
             config,
             logger,
             client,
             classLoader,
-            consumer
+            new ArrayList<>()
         );
     }
 
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index f69172a..0cb361d 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -18,14 +18,14 @@
  */
 package org.apache.pulsar.functions.instance;
 
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.functions.api.Function;
-import org.testng.annotations.Test;
-
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+import org.testng.annotations.Test;
+
 public class JavaInstanceTest {
 
     /**
@@ -38,7 +38,7 @@ public class JavaInstanceTest {
             mock(ContextImpl.class),
             (Function<String, String>) (input, context) -> input + "-lambda");
         String testString = "ABC123";
-        JavaExecutionResult result = 
instance.handleMessage(MessageId.earliest, "random", testString);
+        JavaExecutionResult result = 
instance.handleMessage(mock(Record.class), testString);
         assertNotNull(result.getResult());
         assertEquals(new String(testString + "-lambda"), result.getResult());
         instance.close();
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
index c5d5d4a..4d56239 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.windowing;
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.utils.WindowConfig;
 import org.mockito.Mockito;
@@ -101,6 +102,10 @@ public class WindowFunctionExecutorTest {
         Mockito.doReturn("test-namespace").when(context).getNamespace();
         Mockito.doReturn("test-tenant").when(context).getTenant();
 
+        Record<?> record = Mockito.mock(Record.class);
+        
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+        Mockito.doReturn(record).when(context).getCurrentRecord();
+
         windowConfig = new WindowConfig();
         
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
         windowConfig.setWindowLengthDurationMs(20L);
@@ -111,7 +116,6 @@ public class WindowFunctionExecutorTest {
         
windowConfig.setActualWindowFunctionClassName(TestFunction.class.getName());
         Mockito.doReturn(Optional.of(new Gson().fromJson(new 
Gson().toJson(windowConfig), 
Map.class))).when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
 
-        
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
         
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
         
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
         Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
@@ -170,7 +174,6 @@ public class WindowFunctionExecutorTest {
         Mockito.doReturn("test-function").when(context).getFunctionName();
         Mockito.doReturn("test-namespace").when(context).getNamespace();
         Mockito.doReturn("test-tenant").when(context).getTenant();
-        
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
         
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
         
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
         Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
@@ -256,10 +259,12 @@ public class WindowFunctionExecutorTest {
                 
Mockito.doReturn("test-function").when(context).getFunctionName();
                 
Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
                 
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+                Record<?> record = Mockito.mock(Record.class);
+                
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+                Mockito.doReturn(record).when(context).getCurrentRecord();
 
                 WindowConfig windowConfig = new WindowConfig();
                 
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
@@ -347,10 +352,12 @@ public class WindowFunctionExecutorTest {
                 
Mockito.doReturn("test-function").when(context).getFunctionName();
                 
Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
                 
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+                Record<?> record = Mockito.mock(Record.class);
+                
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+                Mockito.doReturn(record).when(context).getCurrentRecord();
 
                 WindowConfig windowConfig = new WindowConfig();
                 
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
@@ -413,10 +420,12 @@ public class WindowFunctionExecutorTest {
                 
Mockito.doReturn("test-function").when(context).getFunctionName();
                 
Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
                 
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+                Record<?> record = Mockito.mock(Record.class);
+                
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+                Mockito.doReturn(record).when(context).getCurrentRecord();
 
                 WindowConfig windowConfig = new WindowConfig();
                 
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
@@ -465,10 +474,12 @@ public class WindowFunctionExecutorTest {
                 
Mockito.doReturn("test-function").when(context).getFunctionName();
                 
Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
                 
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+                Record<?> record = Mockito.mock(Record.class);
+                
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+                Mockito.doReturn(record).when(context).getCurrentRecord();
 
                 WindowConfig windowConfig = new WindowConfig();
                 
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
@@ -517,10 +528,12 @@ public class WindowFunctionExecutorTest {
                 
Mockito.doReturn("test-function").when(context).getFunctionName();
                 
Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
                 
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+                Record<?> record = Mockito.mock(Record.class);
+                
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+                Mockito.doReturn(record).when(context).getCurrentRecord();
 
                 WindowConfig windowConfig = new WindowConfig();
                 
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
@@ -567,10 +580,12 @@ public class WindowFunctionExecutorTest {
                 
Mockito.doReturn("test-function").when(context).getFunctionName();
                 
Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
                 
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+                Record<?> record = Mockito.mock(Record.class);
+                
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+                Mockito.doReturn(record).when(context).getCurrentRecord();
 
                 WindowConfig windowConfig = new WindowConfig();
                 
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());

Reply via email to