This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
The following commit(s) were added to refs/heads/master by this push:
new 641c251 [FLINK-16002] [testutil] Add stateful functions test utilities
641c251 is described below
commit 641c251c3023c8e242ac63158c9d0ea2e735574c
Author: Seth Wiesman <[email protected]>
AuthorDate: Mon Feb 10 13:07:13 2020 -0600
[FLINK-16002] [testutil] Add stateful functions test utilities
This closes #22.
---
pom.xml | 1 +
statefun-testutil/pom.xml | 50 ++++++
.../testutils/function/FunctionTestHarness.java | 137 ++++++++++++++
.../statefun/testutils/function/TestContext.java | 198 +++++++++++++++++++++
.../testutils/matchers/MatchersByAddress.java | 36 ++++
.../testutils/matchers/MessagesSentToAddress.java | 72 ++++++++
.../testutils/matchers/SentNothingMatcher.java | 40 +++++
.../matchers/StatefulFunctionMatchers.java | 72 ++++++++
.../function/FunctionTestHarnessTest.java | 145 +++++++++++++++
9 files changed, 751 insertions(+)
diff --git a/pom.xml b/pom.xml
index c5bd551..86319e1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@ under the License.
<module>statefun-flink</module>
<module>statefun-quickstart</module>
<module>statefun-docs</module>
+ <module>statefun-testutil</module>
</modules>
<properties>
diff --git a/statefun-testutil/pom.xml b/statefun-testutil/pom.xml
new file mode 100644
index 0000000..c00a85c
--- /dev/null
+++ b/statefun-testutil/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>statefun-parent</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>statefun-testutil</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/function/FunctionTestHarness.java
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/function/FunctionTestHarness.java
new file mode 100644
index 0000000..9c1fedf
--- /dev/null
+++
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/function/FunctionTestHarness.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.testutils.function;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+
+/**
+ * The {@link FunctionTestHarness} provides a thin convenience wrapper around
a {@link
+ * StatefulFunction} to capture its results under test.
+ *
+ * <p>The harness captures all messages sent using the {@link
org.apache.flink.statefun.sdk.Context}
+ * from within the function's {@link StatefulFunction#invoke(Context, Object)}
method and returns
+ * them. Values sent to an egress are also captured and can be queried via the
{@link
+ * #getEgress(EgressIdentifier)} method.
+ *
+ * <p><b>Important:</b> This test harness is intended strictly for basic unit
tests of functions. As
+ * such, {@link Context#registerAsyncOperation(Object, CompletableFuture)}
awaits all futures. If
+ * you want to test in an asynchronous environment, please consider using
the {@code
+ * statefun-flink-harness}.
+ *
+ * <pre>{@code
+ * {@code @Test}
+ * public void test() {
+ * FunctionType type = new FunctionType("flink", "testfunc");
+ * FunctionTestHarness harness = FunctionTestHarness.test(new
TestFunctionProvider(), type, "my-id");
+ *
+ * Assert.assertThat(
+ * harness.invoke("ping"),
+ * sent(
+ * messagesTo(
+ * new Address(new FunctionType("flink", "func"), "id"),
equalTo("pong"))));
+ * }
+ * }</pre>
+ */
+@SuppressWarnings("WeakerAccess")
+public class FunctionTestHarness {
+
+ private final TestContext context;
+
+ /**
+ * Creates a test harness, pinning the function to a particular address.
+ *
+ * @param provider A provider for the function under test.
+ * @param type The type of the function.
+ * @param id The static id of the function for the duration of the test.
+ * @param startTime The initial timestamp of the internal clock.
+ * @return A fully configured test harness.
+ */
+ public static FunctionTestHarness test(
+ StatefulFunctionProvider provider, FunctionType type, String id, Instant
startTime) {
+ Objects.requireNonNull(provider, "Function provider can not be null");
+ return new FunctionTestHarness(provider.functionOfType(type), type, id,
startTime);
+ }
+
+ /**
+ * Creates a test harness, pinning the function to a particular address.
+ *
+ * @param provider A provider for the function under test.
+ * @param type The type of the function.
+ * @param id The static id of the function for the duration of the test.
+ * @return A fully configured test harness.
+ */
+ public static FunctionTestHarness test(
+ StatefulFunctionProvider provider, FunctionType type, String id) {
+ Objects.requireNonNull(provider, "Function provider can not be null");
+ return new FunctionTestHarness(provider.functionOfType(type), type, id,
Instant.EPOCH);
+ }
+
+ private FunctionTestHarness(
+ StatefulFunction function, FunctionType type, String id, Instant
startTime) {
+ this.context = new TestContext(new Address(type, id), function, startTime);
+ }
+
+ /**
+ * @param message A message that will be sent to the function.
+ * @return The responses sent from the function after invocation using {@link
Context#send(Address,
+ * Object)}.
+ */
+ public Map<Address, List<Object>> invoke(Object message) {
+ return context.invoke(null, message);
+ }
+
+ /**
+ * @param message A message that will be sent to the function.
+ * @param from The address of the function that sent the message.
+ * @return The responses sent from the function after invocation using {@link
Context#send(Address,
+ * Object)}.
+ */
+ public Map<Address, List<Object>> invoke(Address from, Object message) {
+ Objects.requireNonNull(from);
+ return context.invoke(from, message);
+ }
+
+ /**
+ * Advances the internal clock of the harness and fires any pending timers.
+ *
+ * @param duration The amount of time to advance the internal clock by.
+ * @return The responses sent from the function after invocation.
+ */
+ public Map<Address, List<Object>> tick(Duration duration) {
+ Objects.requireNonNull(duration);
+ return context.tick(duration);
+ }
+
+ /**
+ * @param identifier An egress identifier
+ * @return All the messages sent to that egress.
+ */
+ public <T> List<T> getEgress(EgressIdentifier<T> identifier) {
+ return context.getEgress(identifier);
+ }
+}
diff --git
a/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/function/TestContext.java
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/function/TestContext.java
new file mode 100644
index 0000000..167f1c7
--- /dev/null
+++
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/function/TestContext.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.testutils.function;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.AsyncOperationResult;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+
+/** A simple context that is strictly synchronous and captures all responses.
*/
+class TestContext implements Context {
+
+ private final Address selfAddress;
+
+ private final StatefulFunction function;
+
+ private final Map<EgressIdentifier<?>, List<Object>> outputs;
+
+ private final Queue<Envelope> messages;
+
+ private final PriorityQueue<PendingMessage> pendingMessage;
+
+ private Map<Address, List<Object>> responses;
+
+ private Address from;
+
+ private long watermark;
+
+ TestContext(Address selfAddress, StatefulFunction function, Instant
startTime) {
+ this.selfAddress = Objects.requireNonNull(selfAddress);
+ this.function = Objects.requireNonNull(function);
+
+ this.watermark = startTime.toEpochMilli();
+ this.messages = new ArrayDeque<>();
+ this.pendingMessage = new PriorityQueue<>(Comparator.comparingLong(a ->
a.timer));
+ this.outputs = new HashMap<>();
+ }
+
+ @Override
+ public Address self() {
+ return selfAddress;
+ }
+
+ @Override
+ public Address caller() {
+ return from;
+ }
+
+ @Override
+ public void reply(Object message) {
+ Address to = caller();
+ if (to == null) {
+ throw new IllegalStateException("The caller address is null");
+ }
+
+ send(to, message);
+ }
+
+ @Override
+ public void send(Address to, Object message) {
+ if (to.equals(selfAddress)) {
+ messages.add(new Envelope(self(), to, message));
+ return;
+ }
+ responses.computeIfAbsent(to, ignore -> new ArrayList<>()).add(message);
+ }
+
+ @Override
+ public <T> void send(EgressIdentifier<T> egress, T message) {
+ outputs.computeIfAbsent(egress, ignore -> new ArrayList<>()).add(message);
+ }
+
+ @Override
+ public void sendAfter(Duration delay, Address to, Object message) {
+ pendingMessage.add(
+ new PendingMessage(new Envelope(self(), to, message), watermark +
delay.toMillis()));
+ }
+
+ @Override
+ public <M, T> void registerAsyncOperation(M metadata, CompletableFuture<T>
future) {
+ T value = null;
+ Throwable error = null;
+
+ try {
+ value = future.get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Failed to get results from async action", e);
+ } catch (ExecutionException e) {
+ error = e.getCause();
+ }
+
+ AsyncOperationResult.Status status;
+ if (error == null) {
+ status = AsyncOperationResult.Status.SUCCESS;
+ } else {
+ status = AsyncOperationResult.Status.FAILURE;
+ }
+
+ AsyncOperationResult<M, T> result = new AsyncOperationResult<>(metadata,
status, value, error);
+ messages.add(new Envelope(self(), self(), result));
+ }
+
+ @SuppressWarnings("unchecked")
+ <T> List<T> getEgress(EgressIdentifier<T> identifier) {
+ List<?> values = outputs.getOrDefault(identifier, Collections.emptyList());
+
+ // Because the type is part of the identifier key
+ // this cast is always safe.
+ return (List<T>) values;
+ }
+
+ Map<Address, List<Object>> invoke(Address from, Object message) {
+ messages.add(new Envelope(from, null, message));
+
+ return processAllMessages();
+ }
+
+ Map<Address, List<Object>> tick(Duration duration) {
+ watermark += duration.toMillis();
+
+ while (!pendingMessage.isEmpty() && pendingMessage.peek().timer <=
watermark) {
+ messages.add(pendingMessage.poll().envelope);
+ }
+
+ return processAllMessages();
+ }
+
+ private Map<Address, List<Object>> processAllMessages() {
+ responses = new HashMap<>();
+
+ while (!messages.isEmpty()) {
+ Envelope envelope = messages.poll();
+ if (envelope.to != null && !envelope.to.equals(self())) {
+ send(envelope.to, envelope.message);
+ } else {
+ from = envelope.from;
+ function.invoke(this, envelope.message);
+ }
+ }
+
+ return responses;
+ }
+
+ private static class Envelope {
+ Address from;
+
+ Address to;
+
+ Object message;
+
+ Envelope(Address from, Address to, Object message) {
+ this.from = from;
+ this.to = to;
+ this.message = message;
+ }
+ }
+
+ private static class PendingMessage {
+ Envelope envelope;
+
+ long timer;
+
+ PendingMessage(Envelope envelope, long timer) {
+ this.envelope = envelope;
+ this.timer = timer;
+ }
+ }
+}
diff --git
a/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/MatchersByAddress.java
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/MatchersByAddress.java
new file mode 100644
index 0000000..367f64e
--- /dev/null
+++
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/MatchersByAddress.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.testutils.matchers;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.Address;
+import org.hamcrest.Matcher;
+
+@SuppressWarnings("WeakerAccess")
+public class MatchersByAddress {
+
+ final Address address;
+
+ final List<Matcher<?>> matchers;
+
+ MatchersByAddress(Address address, List<Matcher<?>> messages) {
+ this.address = Objects.requireNonNull(address);
+ this.matchers = Objects.requireNonNull(messages);
+ }
+}
diff --git
a/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/MessagesSentToAddress.java
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/MessagesSentToAddress.java
new file mode 100644
index 0000000..19a8b70
--- /dev/null
+++
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/MessagesSentToAddress.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.testutils.matchers;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.statefun.sdk.Address;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/** A matcher for checking all the responses sent to a particular function
address. */
+public class MessagesSentToAddress extends TypeSafeMatcher<Map<Address,
List<Object>>> {
+
+ private final Map<Address, List<Matcher<?>>> matcherByAddress;
+
+ MessagesSentToAddress(Map<Address, List<Matcher<?>>> matcherByAddress) {
+ this.matcherByAddress = matcherByAddress;
+ }
+
+ @Override
+ protected boolean matchesSafely(Map<Address, List<Object>> item) {
+ for (Map.Entry<Address, List<Matcher<?>>> entry :
matcherByAddress.entrySet()) {
+ List<Object> messages = item.get(entry.getKey());
+ if (messages == null) {
+ return false;
+ }
+
+ if (messages.size() != entry.getValue().size()) {
+ return false;
+ }
+
+ for (int i = 0; i < messages.size(); i++) {
+ Matcher<?> matcher = entry.getValue().get(i);
+ if (!matcher.matches(messages.get(i))) {
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("<{");
+
+ for (Map.Entry<Address, List<Matcher<?>>> entry :
matcherByAddress.entrySet()) {
+ description
+ .appendText(entry.getKey().toString())
+ .appendText("=")
+ .appendList("[", ",", "]", entry.getValue());
+ }
+
+ description.appendText("}>");
+ }
+}
diff --git
a/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/SentNothingMatcher.java
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/SentNothingMatcher.java
new file mode 100644
index 0000000..3d99941
--- /dev/null
+++
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/SentNothingMatcher.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.testutils.matchers;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.statefun.sdk.Address;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+/** A matcher that checks that the function did not message any other functions. */
+public class SentNothingMatcher extends TypeSafeMatcher<Map<Address,
List<Object>>> {
+
+ SentNothingMatcher() {}
+
+ @Override
+ protected boolean matchesSafely(Map<Address, List<Object>> item) {
+ return item.isEmpty();
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("Nothing Sent");
+ }
+}
diff --git
a/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/StatefulFunctionMatchers.java
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/StatefulFunctionMatchers.java
new file mode 100644
index 0000000..5cf1644
--- /dev/null
+++
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/StatefulFunctionMatchers.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.testutils.matchers;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.hamcrest.Matcher;
+
+/**
+ * A set of Hamcrest matchers to help check the responses from a {@link
+ * org.apache.flink.statefun.testutils.function.FunctionTestHarness}.
+ *
+ * <p>See {@link org.apache.flink.statefun.testutils.function.FunctionTestHarness} for usage
+ * details.
+ */
+public final class StatefulFunctionMatchers {
+
+ private StatefulFunctionMatchers() {
+ throw new AssertionError();
+ }
+
+ public static MatchersByAddress messagesTo(
+ Address to, Matcher<?> matcher, Matcher<?>... matchers) {
+ List<Matcher<?>> allMatchers = new ArrayList<>(1 + matchers.length);
+ allMatchers.add(matcher);
+ allMatchers.addAll(Arrays.asList(matchers));
+
+ return new MatchersByAddress(to, allMatchers);
+ }
+
+ /**
+ * A matcher that checks all the responses sent to each given {@link
Address}.
+ *
+ * <p><b>Important:</b> This matcher expects an exact match on the number of
responses sent to
+ * each of those addresses.
+ */
+ public static MessagesSentToAddress sent(
+ MatchersByAddress matcher, MatchersByAddress... matchers) {
+ Map<Address, List<Matcher<?>>> messagesByAddress = new HashMap<>();
+ messagesByAddress.put(matcher.address, matcher.matchers);
+
+ for (MatchersByAddress match : matchers) {
+ messagesByAddress.put(match.address, match.matchers);
+ }
+
+ return new MessagesSentToAddress(messagesByAddress);
+ }
+
+ /** A matcher that checks the function did not send any messages. */
+ public static SentNothingMatcher sentNothing() {
+ return new SentNothingMatcher();
+ }
+}
diff --git
a/statefun-testutil/src/test/java/org/apache/flink/statefun/testutils/function/FunctionTestHarnessTest.java
b/statefun-testutil/src/test/java/org/apache/flink/statefun/testutils/function/FunctionTestHarnessTest.java
new file mode 100644
index 0000000..671e315
--- /dev/null
+++
b/statefun-testutil/src/test/java/org/apache/flink/statefun/testutils/function/FunctionTestHarnessTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.testutils.function;
+
+import static
org.apache.flink.statefun.testutils.matchers.StatefulFunctionMatchers.*;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.AsyncOperationResult;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Simple validation tests of the test harness. */
+public class FunctionTestHarnessTest {
+
+ private static final FunctionType UNDER_TEST = new FunctionType("flink",
"undertest");
+
+ private static final FunctionType OTHER_FUNCTION = new FunctionType("flink",
"function");
+
+ private static final Address CALLER = new Address(OTHER_FUNCTION, "id");
+
+ private static final Address SOME_ADDRESS = new Address(OTHER_FUNCTION,
"id2");
+
+ private static final EgressIdentifier<String> EGRESS =
+ new EgressIdentifier<>("flink", "egress", String.class);
+
+ @Test
+ public void basicMessageTest() {
+ FunctionTestHarness harness =
+ FunctionTestHarness.test(ignore -> new BasicFunction(), UNDER_TEST,
"id");
+
+ Assert.assertThat(harness.invoke(CALLER, "ping"), sent(messagesTo(CALLER,
equalTo("pong"))));
+ }
+
+ @Test
+ public void multiReturnTest() {
+ FunctionTestHarness harness =
+ FunctionTestHarness.test(ignore -> new MultiResponseFunction(),
UNDER_TEST, "id");
+
+ Assert.assertThat(
+ harness.invoke("hello"),
+ sent(
+ messagesTo(CALLER, equalTo("a"), equalTo("b")),
+ messagesTo(SOME_ADDRESS, equalTo("c"))));
+ }
+
+ @Test
+ public void egressTest() {
+ FunctionTestHarness harness =
+ FunctionTestHarness.test(ignore -> new EgressFunction(), UNDER_TEST,
"id");
+
+ Assert.assertThat(harness.invoke(CALLER, "ping"), sentNothing());
+ Assert.assertThat(harness.getEgress(EGRESS), contains(equalTo("pong")));
+ }
+
+ @Test
+ public void delayedMessageTest() {
+ FunctionTestHarness harness =
+ FunctionTestHarness.test(ignore -> new DelayedResponse(), UNDER_TEST,
"id");
+
+ Assert.assertThat(harness.invoke(CALLER, "ping"), sentNothing());
+ Assert.assertThat(
+ harness.tick(Duration.ofMinutes(1)), sent(messagesTo(CALLER,
equalTo("pong"))));
+ }
+
+ @Test
+ public void asyncMessageTest() {
+ FunctionTestHarness harness =
+ FunctionTestHarness.test(ignore -> new AsyncOperation(), UNDER_TEST,
"id");
+
+ Assert.assertThat(harness.invoke(CALLER, "ping"), sent(messagesTo(CALLER,
equalTo("pong"))));
+ }
+
+ private static class BasicFunction implements StatefulFunction {
+
+ @Override
+ public void invoke(Context context, Object input) {
+ context.reply("pong");
+ }
+ }
+
+ private static class MultiResponseFunction implements StatefulFunction {
+
+ @Override
+ public void invoke(Context context, Object input) {
+ context.send(CALLER, "a");
+ context.send(CALLER, "b");
+ context.send(SOME_ADDRESS, "c");
+ }
+ }
+
+ private static class EgressFunction implements StatefulFunction {
+ @Override
+ public void invoke(Context context, Object input) {
+ context.send(EGRESS, "pong");
+ }
+ }
+
+ private static class DelayedResponse implements StatefulFunction {
+
+ @Override
+ public void invoke(Context context, Object input) {
+ context.sendAfter(Duration.ofMinutes(1), context.caller(), "pong");
+ }
+ }
+
+ private static class AsyncOperation implements StatefulFunction {
+
+ @Override
+ public void invoke(Context context, Object input) {
+ if (input instanceof String) {
+ CompletableFuture<String> future =
CompletableFuture.completedFuture("pong");
+ context.registerAsyncOperation(context.caller(), future);
+ }
+
+ if (input instanceof AsyncOperationResult) {
+ AsyncOperationResult<Address, String> result =
+ (AsyncOperationResult<Address, String>) input;
+ context.send(result.metadata(), result.value());
+ }
+ }
+ }
+}