This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6142a4c Removed the `Context.ack(byte[] messageId)` and expose `Record` instead (#2187) 6142a4c is described below commit 6142a4c5b03f5a4e26a774fd57ab458844255786 Author: Matteo Merli <mme...@apache.org> AuthorDate: Thu Jul 19 16:08:16 2018 -0700 Removed the `Context.ack(byte[] messageId)` and expose `Record` instead (#2187) * Removed the `Context.ack(byte[] messageId)` and expose `Record` instead * Unified all per-message function context into record * Fixed compilation * Fixed compilation * Fix test * Fixed mocked object * Fixed test --- .../org/apache/pulsar/functions/api/Context.java | 20 +---- .../org/apache/pulsar/functions/api/Record.java | 7 ++ .../pulsar/functions/instance/ContextImpl.java | 89 ++++++++-------------- .../pulsar/functions/instance/JavaInstance.java | 7 +- .../functions/instance/JavaInstanceRunnable.java | 31 ++++---- .../apache/pulsar/functions/sink/PulsarSink.java | 2 +- .../pulsar/functions/source/PulsarRecord.java | 5 ++ .../pulsar/functions/source/PulsarSource.java | 30 ++++++-- .../apache/pulsar/functions/windowing/Event.java | 15 ++-- .../pulsar/functions/windowing/EventImpl.java | 12 +-- .../windowing/WindowFunctionExecutor.java | 33 ++++---- .../pulsar/functions/windowing/WindowManager.java | 14 ++-- .../pulsar/functions/instance/ContextImplTest.java | 7 +- .../functions/instance/JavaInstanceTest.java | 10 +-- .../windowing/WindowFunctionExecutorTest.java | 31 ++++++-- 15 files changed, 164 insertions(+), 149 deletions(-) diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java index 2fa513e..9869067 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java @@ -34,17 +34,10 @@ import java.util.concurrent.CompletableFuture; */ public interface Context { /** - * Returns the messageId of the message that we are processing - * This messageId is a stringified version of the actual MessageId - * @return the messageId + * Access the record associated with the current input value + * @return */ - byte[] getMessageId(); - - /** - * The input topic that the message currently being processed belongs to - * @return The input topic name - */ - String getCurrentMessageTopicName(); + Record<?> getCurrentRecord(); /** * Get a list of all input topics @@ -183,11 +176,4 @@ public interface Context { */ <O> CompletableFuture<Void> publish(String topicName, O object); - /** - * By default acknowledgement management is done transparently by Pulsar Functions framework. - * However users can disable that and do ack management by themselves by using this API. - * @param messageId The messageId that needs to be acknowledged - * @return A future that completes when the framework is done acking the message - */ - CompletableFuture<Void> ack(byte[] messageId); } \ No newline at end of file diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java index 38d6ed3..2704909 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java @@ -28,6 +28,13 @@ import java.util.Optional; public interface Record<T> { /** + * If the record originated from a topic, report the topic name + */ + default Optional<String> getTopicName() { + return Optional.empty(); + } + + /** * Return a key if the key has one associated */ Optional<String> getKey(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index b41d6f4..8e8f966 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -18,19 +18,33 @@ */ package org.apache.pulsar.functions.instance; +import static com.google.common.base.Preconditions.checkState; + import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + import lombok.Getter; import lombok.Setter; + import org.apache.commons.lang.StringUtils; -import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.instance.state.StateContextImpl; @@ -40,21 +54,6 @@ import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.core.SourceContext; import org.slf4j.Logger; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Preconditions.checkState; - /** * This class implements the Context interface exposed to the user. */ @@ -63,8 +62,7 @@ class ContextImpl implements Context, SinkContext, SourceContext { private Logger logger; // Per Message related - private MessageId messageId; - private String currentTopicName; + private Record<?> record; @Getter @Setter @@ -98,15 +96,18 @@ class ContextImpl implements Context, SinkContext, SourceContext { private Map<String, SerDe> publishSerializers; private ProducerConfiguration producerConfiguration; private PulsarClient pulsarClient; - private ClassLoader classLoader; - Consumer inputConsumer; + private final ClassLoader classLoader; + + private final List<String> inputTopics; + + @Getter @Setter private StateContextImpl stateContext; private Map<String, Object> userConfigs; public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, - ClassLoader classLoader, Consumer inputConsumer) { + ClassLoader classLoader, List<String> inputTopics) { this.config = config; this.logger = logger; this.pulsarClient = client; @@ -115,7 +116,7 @@ class ContextImpl implements Context, SinkContext, SourceContext { this.accumulatedMetrics = new ConcurrentHashMap<>(); this.publishProducers = new HashMap<>(); this.publishSerializers = new HashMap<>(); - this.inputConsumer = inputConsumer; + this.inputTopics = inputTopics; producerConfiguration = new ProducerConfiguration(); producerConfiguration.setBlockIfQueueFull(true); producerConfiguration.setBatchingEnabled(true); @@ -129,31 +130,18 @@ class ContextImpl implements Context, SinkContext, SourceContext { } } - public void setCurrentMessageContext(MessageId messageId, String topicName) { - this.messageId = messageId; - this.currentTopicName = topicName; + public void setCurrentMessageContext(Record<?> record) { + this.record = record; } @Override - public byte[] getMessageId() { - return messageId.toByteArray(); - } - - @Override - public String getCurrentMessageTopicName() { - return currentTopicName; + public Record<?> getCurrentRecord() { + return record; } @Override public Collection<String> getInputTopics() { - if (inputConsumer == null) { - return new LinkedList<>(); - } - if (inputConsumer instanceof MultiTopicsConsumerImpl) { - return ((MultiTopicsConsumerImpl) inputConsumer).getTopics(); - } else { - return Arrays.asList(inputConsumer.getTopic()); - } + return inputTopics; } @Override @@ -307,23 +295,6 @@ class ContextImpl implements Context, SinkContext, SourceContext { .thenApply(msgId -> null); } - //TODO remove topic argument - @Override - public CompletableFuture<Void> ack(byte[] messageId) { - // if inputConsumer is null, then ack is a no-op - if (inputConsumer == null) { - return CompletableFuture.completedFuture(null); - } - MessageId actualMessageId = null; - try { - actualMessageId = MessageId.fromByteArray(messageId); - - } catch (IOException e) { - throw new RuntimeException("Invalid message id to ack", e); - } - return inputConsumer.acknowledgeAsync(actualMessageId); - } - @Override public void recordMetric(String metricName, double value) { currentAccumulatedMetrics.putIfAbsent(metricName, new AccumulatedMetricDatum()); @@ -341,7 +312,7 @@ class ContextImpl implements Context, SinkContext, SourceContext { this.accumulatedMetrics.putAll(currentAccumulatedMetrics); this.currentAccumulatedMetrics.clear(); } - + public MetricsData getMetrics() { MetricsData.Builder metricsDataBuilder = MetricsData.newBuilder(); for (String metricName : accumulatedMetrics.keySet()) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 42ade3f..39acdb5 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -21,8 +21,9 @@ package org.apache.pulsar.functions.instance; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.MessageId; + import org.apache.pulsar.functions.api.Function; +import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.proto.InstanceCommunication; /** @@ -50,9 +51,9 @@ public class JavaInstance implements AutoCloseable { } } - public JavaExecutionResult handleMessage(MessageId messageId, String topicName, Object input) { + public JavaExecutionResult handleMessage(Record<?> record, Object input) { if (context != null) { - context.setCurrentMessageContext(messageId, topicName); + context.setCurrentMessageContext(record); } JavaExecutionResult executionResult = new JavaExecutionResult(); try { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index b64224f..3480fd9 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -24,17 +24,19 @@ import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_ST import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; + import io.netty.buffer.ByteBuf; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import net.jodah.typetools.TypeResolver; + import org.apache.bookkeeper.api.StorageClient; import org.apache.bookkeeper.api.kv.Table; import org.apache.bookkeeper.clients.StorageClientBuilder; @@ -48,32 +50,33 @@ import org.apache.logging.log4j.ThreadContext; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.LoggerConfig; -import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.functions.instance.state.StateContextImpl; +import org.apache.pulsar.functions.proto.Function.SinkSpec; +import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder; -import org.apache.pulsar.functions.proto.Function.SourceSpec; -import org.apache.pulsar.functions.proto.Function.SinkSpec; import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.sink.PulsarSinkConfig; import org.apache.pulsar.functions.source.PulsarRecord; import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.source.PulsarSourceConfig; import org.apache.pulsar.functions.utils.FunctionConfig; -import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; -import org.apache.pulsar.functions.instance.state.StateContextImpl; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.apache.pulsar.functions.utils.Reflections; +import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.Source; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import net.jodah.typetools.TypeResolver; + /** * A function container implemented using java thread. */ @@ -109,7 +112,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private Source source; private Sink sink; - + public static final String METRICS_TOTAL_PROCESSED = "__total_processed__"; public static final String METRICS_TOTAL_SUCCESS = "__total_successfully_processed__"; public static final String METRICS_TOTAL_SYS_EXCEPTION = "__total_system_exceptions__"; @@ -166,14 +169,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { } ContextImpl setupContext() { - Consumer consumer = null; + List<String> inputTopics = null; if (source instanceof PulsarSource) { - consumer = ((PulsarSource) source).getInputConsumer(); + inputTopics = ((PulsarSource<?>) source).getInputTopics(); } Logger instanceLog = LoggerFactory.getLogger( "function-" + instanceConfig.getFunctionDetails().getName()); return new ContextImpl(instanceConfig, instanceLog, client, - Thread.currentThread().getContextClassLoader(), consumer); + Thread.currentThread().getContextClassLoader(), inputTopics); } /** @@ -207,12 +210,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { String topicName = null; if (currentRecord instanceof PulsarRecord) { - PulsarRecord pulsarRecord = (PulsarRecord) currentRecord; + PulsarRecord<?> pulsarRecord = (PulsarRecord<?>) currentRecord; messageId = pulsarRecord.getMessageId(); - topicName = pulsarRecord.getTopicName(); + topicName = pulsarRecord.getTopicName().get(); } - result = javaInstance.handleMessage(messageId, topicName, currentRecord.getValue()); + result = javaInstance.handleMessage(currentRecord, currentRecord.getValue()); removeLogTopicHandler(); @@ -424,7 +427,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { stats.resetCurrent(); javaInstance.resetMetrics(); } - + private Builder createMetricsDataBuilder() { InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder(); addSystemMetrics(METRICS_TOTAL_PROCESSED, stats.getStats().getTotalProcessed(), bldr); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 8ae9ad0..4be7301 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -240,7 +240,7 @@ public class PulsarSink<T> implements Sink<T> { if (sinkRecord.getSourceRecord() instanceof PulsarRecord) { PulsarRecord<T> pulsarRecord = (PulsarRecord<T>) sinkRecord.getSourceRecord(); // forward user properties to sink-topic - msgBuilder.setProperty("__pfn_input_topic__", pulsarRecord.getTopicName()).setProperty( + msgBuilder.setProperty("__pfn_input_topic__", pulsarRecord.getTopicName().get()).setProperty( "__pfn_input_msg_id__", new String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray()))); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java index c4a1657..c662ad6 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java @@ -57,6 +57,11 @@ public class PulsarRecord<T> implements RecordWithEncryptionContext<T> { } @Override + public Optional<String> getTopicName() { + return Optional.of(topicName); + } + + @Override public Optional<String> getPartitionId() { return Optional.of(String.format("%s-%s", topicName, partition)); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index f70100d..7903267 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -18,16 +18,26 @@ */ package org.apache.pulsar.functions.source; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + import com.google.common.annotations.VisibleForTesting; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import net.jodah.typetools.TypeResolver; -import static org.apache.commons.lang3.StringUtils.isNotBlank; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.functions.api.Record; @@ -36,14 +46,10 @@ import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.instance.InstanceUtils; import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.utils.Reflections; -import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.Source; import org.apache.pulsar.io.core.SourceContext; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; +import net.jodah.typetools.TypeResolver; @Slf4j public class PulsarSource<T> implements Source<T> { @@ -52,6 +58,7 @@ public class PulsarSource<T> implements Source<T> { private PulsarSourceConfig pulsarSourceConfig; private Map<String, SerDe> topicToSerDeMap = new HashMap<>(); private boolean isTopicsPattern; + private List<String> inputTopics; @Getter private org.apache.pulsar.client.api.Consumer<byte[]> inputConsumer; @@ -84,6 +91,11 @@ public class PulsarSource<T> implements Source<T> { consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), TimeUnit.MILLISECONDS); } this.inputConsumer = consumerBuilder.subscribe(); + if (inputConsumer instanceof MultiTopicsConsumerImpl) { + inputTopics = ((MultiTopicsConsumerImpl<?>) inputConsumer).getTopics(); + } else { + inputTopics = Collections.singletonList(inputConsumer.getTopic()); + } } @Override @@ -190,4 +202,8 @@ public class PulsarSource<T> implements Source<T> { } } } + + public List<String> getInputTopics() { + return inputTopics; + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java index 19beb5e..1386f86 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/Event.java @@ -18,12 +18,20 @@ */ package org.apache.pulsar.functions.windowing; +import org.apache.pulsar.functions.api.Record; + /** * An event is a wrapper object that gets stored in the window. * * @param <T> the type of the object thats wrapped */ public interface Event<T> { + + /** + * @return the record associated with the event + */ + Record<?> getRecord(); + /** * The event timestamp in millis * @@ -46,11 +54,4 @@ public interface Event<T> { */ boolean isWatermark(); - - /** - * Get the message id of this event - * - * @return byte array of the message id - */ - byte[] getMessageId(); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java index 2497c85..201c051 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/EventImpl.java @@ -21,17 +21,19 @@ package org.apache.pulsar.functions.windowing; import lombok.EqualsAndHashCode; import lombok.ToString; +import org.apache.pulsar.functions.api.Record; + @ToString @EqualsAndHashCode public class EventImpl<T> implements Event<T> { + private final Record<?> record; private final T event; private final long ts; - private final byte[] messageId; - EventImpl(T event, long ts, byte[] messageId) { + EventImpl(T event, long ts, Record<?> record) { this.event = event; this.ts = ts; - this.messageId = messageId; + this.record = record; } @Override @@ -50,7 +52,7 @@ public class EventImpl<T> implements Event<T> { } @Override - public byte[] getMessageId() { - return messageId; + public Record<?> getRecord() { + return record; } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java index 8f16eac..eb3026d 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java @@ -19,10 +19,20 @@ package org.apache.pulsar.functions.windowing; import com.google.gson.Gson; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; + import lombok.extern.slf4j.Slf4j; -import net.jodah.typetools.TypeResolver; + import org.apache.pulsar.functions.api.Context; import org.apache.pulsar.functions.api.Function; +import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.WindowConfig; import org.apache.pulsar.functions.utils.validation.ValidatorImpls; @@ -35,13 +45,7 @@ import org.apache.pulsar.functions.windowing.triggers.TimeTriggerPolicy; import org.apache.pulsar.functions.windowing.triggers.WatermarkCountTriggerPolicy; import org.apache.pulsar.functions.windowing.triggers.WatermarkTimeTriggerPolicy; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.stream.Collectors; +import net.jodah.typetools.TypeResolver; @Slf4j public class WindowFunctionExecutor<I, O> implements Function<I, O> { @@ -206,7 +210,7 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> { @Override public void onExpiry(List<Event<I>> events) { for (Event<I> event : events) { - context.ack(event.getMessageId()); + event.getRecord().ack(); } } @@ -275,10 +279,13 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> { if (!this.initialized) { initialize(context); } + + Record<?> record = context.getCurrentRecord(); + if (isEventTime()) { long ts = this.timestampExtractor.extractTimestamp(input); - if (this.waterMarkEventGenerator.track(context.getCurrentMessageTopicName(), ts)) { - this.windowManager.add(input, ts, context.getMessageId()); + if (this.waterMarkEventGenerator.track(record.getTopicName().get(), ts)) { + this.windowManager.add(input, ts, record); } else { if (this.windowConfig.getLateDataTopic() != null) { context.publish(this.windowConfig.getLateDataTopic(), input, context.getOutputSerdeClassName()); @@ -287,10 +294,10 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> { "Received a late tuple %s with ts %d. This will not be " + "processed" + ".", input, ts)); } - context.ack(context.getMessageId()); + record.ack(); } } else { - this.windowManager.add(input, System.currentTimeMillis(), context.getMessageId()); + this.windowManager.add(input, System.currentTimeMillis(), record); } return null; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java index f121e13..94c2362 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowManager.java @@ -18,7 +18,9 @@ */ package org.apache.pulsar.functions.windowing; -import lombok.extern.slf4j.Slf4j; +import static org.apache.pulsar.functions.windowing.EvictionPolicy.Action.EXPIRE; +import static org.apache.pulsar.functions.windowing.EvictionPolicy.Action.PROCESS; +import static org.apache.pulsar.functions.windowing.EvictionPolicy.Action.STOP; import java.util.ArrayList; import java.util.Collection; @@ -29,9 +31,9 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; -import static org.apache.pulsar.functions.windowing.EvictionPolicy.Action.EXPIRE; -import static org.apache.pulsar.functions.windowing.EvictionPolicy.Action.PROCESS; -import static org.apache.pulsar.functions.windowing.EvictionPolicy.Action.STOP; +import lombok.extern.slf4j.Slf4j; + +import org.apache.pulsar.functions.api.Record; /** * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks @@ -90,8 +92,8 @@ public class WindowManager<T> implements TriggerHandler { * @param event the event to track * @param ts the timestamp */ - public void add(T event, long ts, byte[] messageId) { - add(new EventImpl<>(event, ts, messageId)); + public void add(T event, long ts, Record<?> record) { + add(new EventImpl<>(event, ts, record)); } /** diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index 0fb027e..710a127 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -26,7 +26,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.nio.ByteBuffer; -import org.apache.pulsar.client.api.Consumer; +import java.util.ArrayList; + import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.functions.instance.state.StateContextImpl; import org.apache.pulsar.functions.proto.Function.FunctionDetails; @@ -43,7 +44,6 @@ public class ContextImplTest { private Logger logger; private PulsarClient client; private ClassLoader classLoader; - private Consumer consumer; private ContextImpl context; @Before @@ -56,13 +56,12 @@ public class ContextImplTest { logger = mock(Logger.class); client = mock(PulsarClient.class); classLoader = getClass().getClassLoader(); - consumer = mock(Consumer.class); context = new ContextImpl( config, logger, client, classLoader, - consumer + new ArrayList<>() ); } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java index f69172a..0cb361d 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java @@ -18,14 +18,14 @@ */ package org.apache.pulsar.functions.instance; -import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.functions.api.Function; -import org.testng.annotations.Test; - import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import org.apache.pulsar.functions.api.Function; +import org.apache.pulsar.functions.api.Record; +import org.testng.annotations.Test; + public class JavaInstanceTest { /** @@ -38,7 +38,7 @@ public class JavaInstanceTest { mock(ContextImpl.class), (Function<String, String>) (input, context) -> input + "-lambda"); String testString = "ABC123"; - JavaExecutionResult result = instance.handleMessage(MessageId.earliest, "random", testString); + JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString); assertNotNull(result.getResult()); assertEquals(new String(testString + "-lambda"), result.getResult()); instance.close(); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java index c5d5d4a..4d56239 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutorTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.functions.windowing; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.utils.WindowConfig; import org.mockito.Mockito; @@ -101,6 +102,10 @@ public class WindowFunctionExecutorTest { Mockito.doReturn("test-namespace").when(context).getNamespace(); Mockito.doReturn("test-tenant").when(context).getTenant(); + Record<?> record = Mockito.mock(Record.class); + Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName(); + Mockito.doReturn(record).when(context).getCurrentRecord(); + windowConfig = new WindowConfig(); windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName()); windowConfig.setWindowLengthDurationMs(20L); @@ -111,7 +116,6 @@ public class WindowFunctionExecutorTest { windowConfig.setActualWindowFunctionClassName(TestFunction.class.getName()); Mockito.doReturn(Optional.of(new Gson().fromJson(new Gson().toJson(windowConfig), Map.class))).when(context).getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY); - Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName(); Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName(); Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics(); Mockito.doReturn("test-sink-topic").when(context).getOutputTopic(); @@ -170,7 +174,6 @@ public class WindowFunctionExecutorTest { Mockito.doReturn("test-function").when(context).getFunctionName(); Mockito.doReturn("test-namespace").when(context).getNamespace(); Mockito.doReturn("test-tenant").when(context).getTenant(); - Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName(); Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName(); Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics(); Mockito.doReturn("test-sink-topic").when(context).getOutputTopic(); @@ -256,10 +259,12 @@ public class WindowFunctionExecutorTest { Mockito.doReturn("test-function").when(context).getFunctionName(); Mockito.doReturn("test-namespace").when(context).getNamespace(); Mockito.doReturn("test-tenant").when(context).getTenant(); - Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName(); Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName(); Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics(); Mockito.doReturn("test-sink-topic").when(context).getOutputTopic(); + Record<?> record = Mockito.mock(Record.class); + Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName(); + Mockito.doReturn(record).when(context).getCurrentRecord(); WindowConfig windowConfig = new WindowConfig(); windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName()); @@ -347,10 +352,12 @@ public class WindowFunctionExecutorTest { Mockito.doReturn("test-function").when(context).getFunctionName(); Mockito.doReturn("test-namespace").when(context).getNamespace(); Mockito.doReturn("test-tenant").when(context).getTenant(); - Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName(); Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName(); Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics(); Mockito.doReturn("test-sink-topic").when(context).getOutputTopic(); + Record<?> record = Mockito.mock(Record.class); + Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName(); + Mockito.doReturn(record).when(context).getCurrentRecord(); WindowConfig windowConfig = new WindowConfig(); windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName()); @@ -413,10 +420,12 @@ public class WindowFunctionExecutorTest { Mockito.doReturn("test-function").when(context).getFunctionName(); Mockito.doReturn("test-namespace").when(context).getNamespace(); Mockito.doReturn("test-tenant").when(context).getTenant(); - Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName(); Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName(); Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics(); Mockito.doReturn("test-sink-topic").when(context).getOutputTopic(); + Record<?> record = Mockito.mock(Record.class); + Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName(); + Mockito.doReturn(record).when(context).getCurrentRecord(); WindowConfig windowConfig = new WindowConfig(); windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName()); @@ -465,10 +474,12 @@ public class WindowFunctionExecutorTest { Mockito.doReturn("test-function").when(context).getFunctionName(); Mockito.doReturn("test-namespace").when(context).getNamespace(); Mockito.doReturn("test-tenant").when(context).getTenant(); - Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName(); Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName(); Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics(); Mockito.doReturn("test-sink-topic").when(context).getOutputTopic(); + Record<?> record = Mockito.mock(Record.class); + Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName(); + Mockito.doReturn(record).when(context).getCurrentRecord(); WindowConfig windowConfig = new WindowConfig(); windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName()); @@ -517,10 +528,12 @@ public class WindowFunctionExecutorTest { Mockito.doReturn("test-function").when(context).getFunctionName(); Mockito.doReturn("test-namespace").when(context).getNamespace(); Mockito.doReturn("test-tenant").when(context).getTenant(); - Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName(); Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName(); Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics(); Mockito.doReturn("test-sink-topic").when(context).getOutputTopic(); + Record<?> record = Mockito.mock(Record.class); + Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName(); + Mockito.doReturn(record).when(context).getCurrentRecord(); WindowConfig windowConfig = new WindowConfig(); windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName()); @@ -567,10 +580,12 @@ public class WindowFunctionExecutorTest { Mockito.doReturn("test-function").when(context).getFunctionName(); Mockito.doReturn("test-namespace").when(context).getNamespace(); Mockito.doReturn("test-tenant").when(context).getTenant(); - Mockito.doReturn("test-source-topic").when(context).getCurrentMessageTopicName(); Mockito.doReturn(DefaultSerDe.class.getName()).when(context).getOutputSerdeClassName(); Mockito.doReturn(Collections.singleton("test-source-topic")).when(context).getInputTopics(); Mockito.doReturn("test-sink-topic").when(context).getOutputTopic(); + Record<?> record = Mockito.mock(Record.class); + Mockito.doReturn(Optional.of("test-topic")).when(record).getTopicName(); + Mockito.doReturn(record).when(context).getCurrentRecord(); WindowConfig windowConfig = new WindowConfig(); windowConfig.setTimestampExtractorClassName(TestTimestampExtractor.class.getName());