merlimat closed pull request #2187:  Removed the `Context.ack(byte[] 
messageId)` and expose `Record` instead 
URL: https://github.com/apache/incubator-pulsar/pull/2187
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 2fa513e120..98690674b2 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -34,17 +34,10 @@
  */
 public interface Context {
     /**
-     * Returns the messageId of the message that we are processing
-     * This messageId is a stringified version of the actual MessageId
-     * @return the messageId
+     * Access the record associated with the current input value
+     * @return
      */
-    byte[] getMessageId();
-
-    /**
-     * The input topic that the message currently being processed belongs to
-     * @return The input topic name
-     */
-    String getCurrentMessageTopicName();
+    Record<?> getCurrentRecord();
 
     /**
      * Get a list of all input topics
@@ -183,11 +176,4 @@
      */
     <O> CompletableFuture<Void> publish(String topicName, O object);
 
-    /**
-     * By default acknowledgement management is done transparently by Pulsar 
Functions framework.
-     * However users can disable that and do ack management by themselves by 
using this API.
-     * @param messageId The messageId that needs to be acknowledged
-     * @return A future that completes when the framework is done acking the 
message
-     */
-    CompletableFuture<Void> ack(byte[] messageId);
 }
\ No newline at end of file
diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index 38d6ed36d7..2704909043 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -27,6 +27,13 @@
  */
 public interface Record<T> {
 
+    /**
+     * If the record originated from a topic, report the topic name
+     */
+    default Optional<String> getTopicName() {
+        return Optional.empty();
+    }
+
     /**
      * Return a key if the key has one associated
      */
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index b41d6f48ab..8e8f9665f0 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -18,19 +18,33 @@
  */
 package org.apache.pulsar.functions.instance;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
 import lombok.Getter;
 import lombok.Setter;
+
 import org.apache.commons.lang.StringUtils;
-import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConfiguration;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
@@ -40,21 +54,6 @@
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkState;
-
 /**
  * This class implements the Context interface exposed to the user.
  */
@@ -63,8 +62,7 @@
     private Logger logger;
 
     // Per Message related
-    private MessageId messageId;
-    private String currentTopicName;
+    private Record<?> record;
 
     @Getter
     @Setter
@@ -98,15 +96,18 @@ public void update(double value) {
     private Map<String, SerDe> publishSerializers;
     private ProducerConfiguration producerConfiguration;
     private PulsarClient pulsarClient;
-    private ClassLoader classLoader;
-    Consumer inputConsumer;
+    private final ClassLoader classLoader;
+
+    private final List<String> inputTopics;
+
+
     @Getter
     @Setter
     private StateContextImpl stateContext;
     private Map<String, Object> userConfigs;
 
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient 
client,
-                       ClassLoader classLoader, Consumer inputConsumer) {
+                       ClassLoader classLoader, List<String> inputTopics) {
         this.config = config;
         this.logger = logger;
         this.pulsarClient = client;
@@ -115,7 +116,7 @@ public ContextImpl(InstanceConfig config, Logger logger, 
PulsarClient client,
         this.accumulatedMetrics = new ConcurrentHashMap<>();
         this.publishProducers = new HashMap<>();
         this.publishSerializers = new HashMap<>();
-        this.inputConsumer = inputConsumer;
+        this.inputTopics = inputTopics;
         producerConfiguration = new ProducerConfiguration();
         producerConfiguration.setBlockIfQueueFull(true);
         producerConfiguration.setBatchingEnabled(true);
@@ -129,31 +130,18 @@ public ContextImpl(InstanceConfig config, Logger logger, 
PulsarClient client,
         }
     }
 
-    public void setCurrentMessageContext(MessageId messageId, String 
topicName) {
-        this.messageId = messageId;
-        this.currentTopicName = topicName;
+    public void setCurrentMessageContext(Record<?> record) {
+        this.record = record;
     }
 
     @Override
-    public byte[] getMessageId() {
-        return messageId.toByteArray();
-    }
-
-    @Override
-    public String getCurrentMessageTopicName() {
-        return currentTopicName;
+    public Record<?> getCurrentRecord() {
+        return record;
     }
 
     @Override
     public Collection<String> getInputTopics() {
-        if (inputConsumer == null) {
-            return new LinkedList<>();
-        }
-        if (inputConsumer instanceof MultiTopicsConsumerImpl) {
-            return ((MultiTopicsConsumerImpl) inputConsumer).getTopics();
-        } else {
-            return Arrays.asList(inputConsumer.getTopic());
-        }
+        return inputTopics;
     }
 
     @Override
@@ -307,23 +295,6 @@ public ByteBuffer getState(String key) {
                 .thenApply(msgId -> null);
     }
 
-    //TODO remove topic argument
-    @Override
-    public CompletableFuture<Void> ack(byte[] messageId) {
-        // if inputConsumer is null, then ack is a no-op
-        if (inputConsumer == null) {
-            return CompletableFuture.completedFuture(null);
-        }
-        MessageId actualMessageId = null;
-        try {
-            actualMessageId = MessageId.fromByteArray(messageId);
-
-        } catch (IOException e) {
-            throw new RuntimeException("Invalid message id to ack", e);
-        }
-        return  inputConsumer.acknowledgeAsync(actualMessageId);
-    }
-
     @Override
     public void recordMetric(String metricName, double value) {
         currentAccumulatedMetrics.putIfAbsent(metricName, new 
AccumulatedMetricDatum());
@@ -341,7 +312,7 @@ public void resetMetrics() {
         this.accumulatedMetrics.putAll(currentAccumulatedMetrics);
         this.currentAccumulatedMetrics.clear();
     }
-    
+
     public MetricsData getMetrics() {
         MetricsData.Builder metricsDataBuilder = MetricsData.newBuilder();
         for (String metricName : accumulatedMetrics.keySet()) {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 42ade3ffd1..39acdb57c9 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -21,8 +21,9 @@
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.MessageId;
+
 import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 
 /**
@@ -50,9 +51,9 @@ public JavaInstance(ContextImpl contextImpl, Object 
userClassObject) {
         }
     }
 
-    public JavaExecutionResult handleMessage(MessageId messageId, String 
topicName, Object input) {
+    public JavaExecutionResult handleMessage(Record<?> record, Object input) {
         if (context != null) {
-            context.setCurrentMessageContext(messageId, topicName);
+            context.setCurrentMessageContext(record);
         }
         JavaExecutionResult executionResult = new JavaExecutionResult();
         try {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index b479852b8c..92f129a637 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -24,17 +24,19 @@
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+
 import io.netty.buffer.ByteBuf;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
@@ -47,32 +49,33 @@
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.instance.state.StateContextImpl;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import 
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
-import org.apache.pulsar.functions.proto.Function.SourceSpec;
-import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.sink.PulsarSinkConfig;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
 import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
-import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import net.jodah.typetools.TypeResolver;
+
 /**
  * A function container implemented using java thread.
  */
@@ -108,7 +111,7 @@
 
     private Source source;
     private Sink sink;
-    
+
     public static final String METRICS_TOTAL_PROCESSED = "__total_processed__";
     public static final String METRICS_TOTAL_SUCCESS = 
"__total_successfully_processed__";
     public static final String METRICS_TOTAL_SYS_EXCEPTION = 
"__total_system_exceptions__";
@@ -165,14 +168,14 @@ JavaInstance setupJavaInstance(ContextImpl contextImpl) 
throws Exception {
     }
 
     ContextImpl setupContext() {
-        Consumer consumer = null;
+        List<String> inputTopics = null;
         if (source instanceof PulsarSource) {
-            consumer = ((PulsarSource) source).getInputConsumer();
+            inputTopics = ((PulsarSource<?>) source).getInputTopics();
         }
         Logger instanceLog = LoggerFactory.getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
         return new ContextImpl(instanceConfig, instanceLog, client,
-                Thread.currentThread().getContextClassLoader(), consumer);
+                Thread.currentThread().getContextClassLoader(), inputTopics);
     }
 
     /**
@@ -206,12 +209,12 @@ public void run() {
                 String topicName = null;
 
                 if (currentRecord instanceof PulsarRecord) {
-                    PulsarRecord pulsarRecord = (PulsarRecord) currentRecord;
+                    PulsarRecord<?> pulsarRecord = (PulsarRecord<?>) 
currentRecord;
                     messageId = pulsarRecord.getMessageId();
-                    topicName = pulsarRecord.getTopicName();
+                    topicName = pulsarRecord.getTopicName().get();
                 }
 
-                result = javaInstance.handleMessage(messageId, topicName, 
currentRecord.getValue());
+                result = javaInstance.handleMessage(currentRecord, 
currentRecord.getValue());
 
                 removeLogTopicHandler();
 
@@ -423,7 +426,7 @@ public void resetMetrics() {
         stats.resetCurrent();
         javaInstance.resetMetrics();
     }
-    
+
     private Builder createMetricsDataBuilder() {
         InstanceCommunication.MetricsData.Builder bldr = 
InstanceCommunication.MetricsData.newBuilder();
         addSystemMetrics(METRICS_TOTAL_PROCESSED, 
stats.getStats().getTotalProcessed(), bldr);
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 8ae9ad0074..4be73018d3 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -240,7 +240,7 @@ public void write(Record<T> record) throws Exception {
         if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
             PulsarRecord<T> pulsarRecord = (PulsarRecord<T>) 
sinkRecord.getSourceRecord();
             // forward user properties to sink-topic
-            msgBuilder.setProperty("__pfn_input_topic__", 
pulsarRecord.getTopicName()).setProperty(
+            msgBuilder.setProperty("__pfn_input_topic__", 
pulsarRecord.getTopicName().get()).setProperty(
                     "__pfn_input_msg_id__",
                     new 
String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
         }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index c4a1657414..c662ad63c4 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -56,6 +56,11 @@
         }
     }
 
+    @Override
+    public Optional<String> getTopicName() {
+        return Optional.of(topicName);
+    }
+
     @Override
     public Optional<String> getPartitionId() {
         return Optional.of(String.format("%s-%s", topicName, partition));
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index f70100d268..7903267aa9 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -18,16 +18,26 @@
  */
 package org.apache.pulsar.functions.source;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
 import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
 
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.functions.api.Record;
@@ -36,14 +46,10 @@
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import net.jodah.typetools.TypeResolver;
 
 @Slf4j
 public class PulsarSource<T> implements Source<T> {
@@ -52,6 +58,7 @@
     private PulsarSourceConfig pulsarSourceConfig;
     private Map<String, SerDe> topicToSerDeMap = new HashMap<>();
     private boolean isTopicsPattern;
+    private List<String> inputTopics;
 
     @Getter
     private org.apache.pulsar.client.api.Consumer<byte[]> inputConsumer;
@@ -84,6 +91,11 @@ public void open(Map<String, Object> config, SourceContext 
sourceContext) throws
             consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), 
TimeUnit.MILLISECONDS);
         }
         this.inputConsumer = consumerBuilder.subscribe();
+        if (inputConsumer instanceof MultiTopicsConsumerImpl) {
+            inputTopics = ((MultiTopicsConsumerImpl<?>) 
inputConsumer).getTopics();
+        } else {
+            inputTopics = Collections.singletonList(inputConsumer.getTopic());
+        }
     }
 
     @Override
@@ -190,4 +202,8 @@ void setupSerDe() throws ClassNotFoundException {
             }
         }
     }
+
+    public List<String> getInputTopics() {
+        return inputTopics;
+    }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java
index 19beb5eb65..1386f86da4 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java
@@ -18,12 +18,20 @@
  */
 package org.apache.pulsar.functions.windowing;
 
+import org.apache.pulsar.functions.api.Record;
+
 /**
  * An event is a wrapper object that gets stored in the window.
  *
  * @param <T> the type of the object thats wrapped
  */
 public interface Event<T> {
+
+    /**
+     * @return the record associated with the event
+     */
+    Record<?> getRecord();
+
     /**
      * The event timestamp in millis
      *
@@ -46,11 +54,4 @@
      */
     boolean isWatermark();
 
-
-    /**
-     * Get the message id of this event
-     *
-     * @return byte array of the message id
-     */
-    byte[] getMessageId();
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java
index 2497c8521c..201c0511fe 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java
@@ -21,17 +21,19 @@
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 
+import org.apache.pulsar.functions.api.Record;
+
 @ToString
 @EqualsAndHashCode
 public class EventImpl<T> implements Event<T> {
+    private final Record<?> record;
     private final T event;
     private final long ts;
-    private final byte[] messageId;
 
-    EventImpl(T event, long ts, byte[] messageId) {
+    EventImpl(T event, long ts, Record<?> record) {
         this.event = event;
         this.ts = ts;
-        this.messageId = messageId;
+        this.record = record;
     }
 
     @Override
@@ -50,7 +52,7 @@ public boolean isWatermark() {
     }
 
     @Override
-    public byte[] getMessageId() {
-        return messageId;
+    public Record<?> getRecord() {
+        return record;
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index 8f16eacd37..eb3026da57 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -19,10 +19,20 @@
 package org.apache.pulsar.functions.windowing;
 
 import com.google.gson.Gson;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
+
 import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
+
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.WindowConfig;
 import org.apache.pulsar.functions.utils.validation.ValidatorImpls;
@@ -35,13 +45,7 @@
 import 
org.apache.pulsar.functions.windowing.triggers.WatermarkCountTriggerPolicy;
 import 
org.apache.pulsar.functions.windowing.triggers.WatermarkTimeTriggerPolicy;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.stream.Collectors;
+import net.jodah.typetools.TypeResolver;
 
 @Slf4j
 public class WindowFunctionExecutor<I, O> implements Function<I, O> {
@@ -206,7 +210,7 @@ private WindowConfig getWindowConfigs(Context context) {
             @Override
             public void onExpiry(List<Event<I>> events) {
                 for (Event<I> event : events) {
-                    context.ack(event.getMessageId());
+                    event.getRecord().ack();
                 }
             }
 
@@ -275,10 +279,13 @@ public O process(I input, Context context) throws 
Exception {
         if (!this.initialized) {
             initialize(context);
         }
+
+        Record<?> record = context.getCurrentRecord();
+
         if (isEventTime()) {
             long ts = this.timestampExtractor.extractTimestamp(input);
-            if 
(this.waterMarkEventGenerator.track(context.getCurrentMessageTopicName(), ts)) {
-                this.windowManager.add(input, ts, context.getMessageId());
+            if 
(this.waterMarkEventGenerator.track(record.getTopicName().get(), ts)) {
+                this.windowManager.add(input, ts, record);
             } else {
                 if (this.windowConfig.getLateDataTopic() != null) {
                     context.publish(this.windowConfig.getLateDataTopic(), 
input, context.getOutputSerdeClassName());
@@ -287,10 +294,10 @@ public O process(I input, Context context) throws 
Exception {
                             "Received a late tuple %s with ts %d. This will 
not be " + "processed"
                                     + ".", input, ts));
                 }
-                context.ack(context.getMessageId());
+                record.ack();
             }
         } else {
-            this.windowManager.add(input, System.currentTimeMillis(), 
context.getMessageId());
+            this.windowManager.add(input, System.currentTimeMillis(), record);
         }
         return null;
     }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
index f121e13344..94c236212a 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.functions.windowing;
 
-import lombok.extern.slf4j.Slf4j;
+import static 
org.apache.pulsar.functions.windowing.EvictionPolicy.Action.EXPIRE;
+import static 
org.apache.pulsar.functions.windowing.EvictionPolicy.Action.PROCESS;
+import static org.apache.pulsar.functions.windowing.EvictionPolicy.Action.STOP;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -29,9 +31,9 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static 
org.apache.pulsar.functions.windowing.EvictionPolicy.Action.EXPIRE;
-import static 
org.apache.pulsar.functions.windowing.EvictionPolicy.Action.PROCESS;
-import static org.apache.pulsar.functions.windowing.EvictionPolicy.Action.STOP;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.functions.api.Record;
 
 /**
  * Tracks a window of events and fires {@link WindowLifecycleListener} 
callbacks
@@ -90,8 +92,8 @@ public void setTriggerPolicy(TriggerPolicy<T, ?> 
triggerPolicy) {
      *  @param event the event to track
      * @param ts the timestamp
      */
-    public void add(T event, long ts, byte[] messageId) {
-        add(new EventImpl<>(event, ts, messageId));
+    public void add(T event, long ts, Record<?> record) {
+        add(new EventImpl<>(event, ts, record));
     }
 
     /**
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index 0fb027e808..710a127354 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -26,7 +26,8 @@
 import static org.mockito.Mockito.verify;
 
 import java.nio.ByteBuffer;
-import org.apache.pulsar.client.api.Consumer;
+import java.util.ArrayList;
+
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -43,7 +44,6 @@
     private Logger logger;
     private PulsarClient client;
     private ClassLoader classLoader;
-    private Consumer consumer;
     private ContextImpl context;
 
     @Before
@@ -56,13 +56,12 @@ public void setup() {
         logger = mock(Logger.class);
         client = mock(PulsarClient.class);
         classLoader = getClass().getClassLoader();
-        consumer = mock(Consumer.class);
         context = new ContextImpl(
             config,
             logger,
             client,
             classLoader,
-            consumer
+            new ArrayList<>()
         );
     }
 
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index f69172ae05..0cb361de67 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -18,14 +18,14 @@
  */
 package org.apache.pulsar.functions.instance;
 
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.functions.api.Function;
-import org.testng.annotations.Test;
-
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
+import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.api.Record;
+import org.testng.annotations.Test;
+
 public class JavaInstanceTest {
 
     /**
@@ -38,7 +38,7 @@ public void testLambda() {
             mock(ContextImpl.class),
             (Function<String, String>) (input, context) -> input + "-lambda");
         String testString = "ABC123";
-        JavaExecutionResult result = 
instance.handleMessage(MessageId.earliest, "random", testString);
+        JavaExecutionResult result = 
instance.handleMessage(mock(Record.class), testString);
         assertNotNull(result.getResult());
         assertEquals(new String(testString + "-lambda"), result.getResult());
         instance.close();
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
index c5d5d4ac22..4d56239f1e 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java
@@ -21,6 +21,7 @@
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.utils.WindowConfig;
 import org.mockito.Mockito;
@@ -101,6 +102,10 @@ public void setUp() {
         Mockito.doReturn("test-namespace").when(context).getNamespace();
         Mockito.doReturn("test-tenant").when(context).getTenant();
 
+        Record<?> record = Mockito.mock(Record.class);
+        
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+        Mockito.doReturn(record).when(context).getCurrentRecord();
+
         windowConfig = new WindowConfig();
         
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
         windowConfig.setWindowLengthDurationMs(20L);
@@ -111,7 +116,6 @@ public void setUp() {
         
windowConfig.setActualWindowFunctionClassName(TestFunction.class.getName());
         Mockito.doReturn(Optional.of(new Gson().fromJson(new 
Gson().toJson(windowConfig), 
Map.class))).when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY);
 
-        
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
         
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
         
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
         Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
@@ -170,7 +174,6 @@ public void testPrepareLateTupleStreamWithoutTs() throws 
Exception {
         Mockito.doReturn("test-function").when(context).getFunctionName();
         Mockito.doReturn("test-namespace").when(context).getNamespace();
         Mockito.doReturn("test-tenant").when(context).getTenant();
-        
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
         
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
         
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
         Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
@@ -256,10 +259,12 @@ public void testSettingSlidingCountWindow() throws 
Exception {
                 
Mockito.doReturn("test-function").when(context).getFunctionName();
                 
Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
                 
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+                Record<?> record = Mockito.mock(Record.class);
+                
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+                Mockito.doReturn(record).when(context).getCurrentRecord();
 
                 WindowConfig windowConfig = new WindowConfig();
                 
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
@@ -347,10 +352,12 @@ public void testSettingSlidingTimeWindow() throws 
Exception {
                 
Mockito.doReturn("test-function").when(context).getFunctionName();
                 
Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
                 
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+                Record<?> record = Mockito.mock(Record.class);
+                
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+                Mockito.doReturn(record).when(context).getCurrentRecord();
 
                 WindowConfig windowConfig = new WindowConfig();
                 
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
@@ -413,10 +420,12 @@ public void testSettingTumblingCountWindow() throws 
Exception {
                 
Mockito.doReturn("test-function").when(context).getFunctionName();
                 
Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
                 
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+                Record<?> record = Mockito.mock(Record.class);
+                
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+                Mockito.doReturn(record).when(context).getCurrentRecord();
 
                 WindowConfig windowConfig = new WindowConfig();
                 
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
@@ -465,10 +474,12 @@ public void testSettingTumblingTimeWindow() throws 
Exception {
                 
Mockito.doReturn("test-function").when(context).getFunctionName();
                 
Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
                 
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+                Record<?> record = Mockito.mock(Record.class);
+                
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+                Mockito.doReturn(record).when(context).getCurrentRecord();
 
                 WindowConfig windowConfig = new WindowConfig();
                 
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
@@ -517,10 +528,12 @@ public void testSettingLagTime() throws Exception {
                 
Mockito.doReturn("test-function").when(context).getFunctionName();
                 
Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
                 
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+                Record<?> record = Mockito.mock(Record.class);
+                
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+                Mockito.doReturn(record).when(context).getCurrentRecord();
 
                 WindowConfig windowConfig = new WindowConfig();
                 
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());
@@ -567,10 +580,12 @@ public void testSettingWaterMarkInterval() throws 
Exception {
                 
Mockito.doReturn("test-function").when(context).getFunctionName();
                 
Mockito.doReturn("test-namespace").when(context).getNamespace();
                 Mockito.doReturn("test-tenant").when(context).getTenant();
-                
Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName();
                 
Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName();
                 
Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics();
                 
Mockito.doReturn("test-sink-topic").when(context).getOutputTopic();
+                Record<?> record = Mockito.mock(Record.class);
+                
Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName();
+                Mockito.doReturn(record).when(context).getCurrentRecord();
 
                 WindowConfig windowConfig = new WindowConfig();
                 
windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to