This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git


The following commit(s) were added to refs/heads/master by this push:
     new 641c251  [FLINK-16002] [testutil] Add stateful functions test utilities
641c251 is described below

commit 641c251c3023c8e242ac63158c9d0ea2e735574c
Author: Seth Wiesman <[email protected]>
AuthorDate: Mon Feb 10 13:07:13 2020 -0600

    [FLINK-16002] [testutil] Add stateful functions test utilities
    
    This closes #22.
---
 pom.xml                                            |   1 +
 statefun-testutil/pom.xml                          |  50 ++++++
 .../testutils/function/FunctionTestHarness.java    | 137 ++++++++++++++
 .../statefun/testutils/function/TestContext.java   | 198 +++++++++++++++++++++
 .../testutils/matchers/MatchersByAddress.java      |  36 ++++
 .../testutils/matchers/MessagesSentToAddress.java  |  72 ++++++++
 .../testutils/matchers/SentNothingMatcher.java     |  40 +++++
 .../matchers/StatefulFunctionMatchers.java         |  72 ++++++++
 .../function/FunctionTestHarnessTest.java          | 145 +++++++++++++++
 9 files changed, 751 insertions(+)

diff --git a/pom.xml b/pom.xml
index c5bd551..86319e1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@ under the License.
         <module>statefun-flink</module>
         <module>statefun-quickstart</module>
         <module>statefun-docs</module>
+        <module>statefun-testutil</module>
     </modules>
 
     <properties>
diff --git a/statefun-testutil/pom.xml b/statefun-testutil/pom.xml
new file mode 100644
index 0000000..c00a85c
--- /dev/null
+++ b/statefun-testutil/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xmlns="http://maven.apache.org/POM/4.0.0";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>statefun-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>1.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>statefun-testutil</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/function/FunctionTestHarness.java
 
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/function/FunctionTestHarness.java
new file mode 100644
index 0000000..9c1fedf
--- /dev/null
+++ 
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/function/FunctionTestHarness.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.testutils.function;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+
+/**
+ * The {@link FunctionTestHarness} provides a thin convenience wrapper around 
a {@link
+ * StatefulFunction} to capture its results under test.
+ *
+ * <p>The harness captures all messages sent using the {@link 
org.apache.flink.statefun.sdk.Context}
+ * from within the functions {@link StatefulFunction#invoke(Context, Object)} 
method and returns
+ * them. Values sent to an egress are also captured and can be queried via the 
{@link
+ * #getEgress(EgressIdentifier)} method.
+ *
+ * <p><b>Important</b>This test harness is intended strictly for basic unit 
tests of functions. As
+ * such, {@link Context#registerAsyncOperation(Object, CompletableFuture)} 
awaits all futures. If
+ * you want to test in an asyncronous environment please consider using the 
the {@code
+ * statefun-flink-harness}.
+ *
+ * <pre>{@code
+ * {@code @Test}
+ * public void test() {
+ *     FunctionType type = new FunctionType("flink", "testfunc");
+ *     FunctionTestHarness harness = TestHarness.test(new 
TestFunctionProvider(), type, "my-id");
+ *
+ *     Assert.assertThat(
+ *          harness.invoke("ping"),
+ *          sent(
+ *              messagesTo(
+ *                  new Address(new FunctionType("flink", "func"), "id"), 
equalTo("pong"));
+ * }
+ * }</pre>
+ */
+@SuppressWarnings("WeakerAccess")
+public class FunctionTestHarness {
+
+  private final TestContext context;
+
+  /**
+   * Creates a test harness, pinning the function to a particular address.
+   *
+   * @param provider A provider for the function under test.
+   * @param type The type of the function.
+   * @param id The static id of the function for the duration of the test.
+   * @param startTime The initial timestamp of the internal clock.
+   * @return A fully configured test harness.
+   */
+  public static FunctionTestHarness test(
+      StatefulFunctionProvider provider, FunctionType type, String id, Instant 
startTime) {
+    Objects.requireNonNull(provider, "Function provider can not be null");
+    return new FunctionTestHarness(provider.functionOfType(type), type, id, 
startTime);
+  }
+
+  /**
+   * Creates a test harness, pinning the function to a particular address.
+   *
+   * @param provider A provider for the function under test.
+   * @param type The type of the function.
+   * @param id The static id of the function for the duration of the test.
+   * @return A fully configured test harness.
+   */
+  public static FunctionTestHarness test(
+      StatefulFunctionProvider provider, FunctionType type, String id) {
+    Objects.requireNonNull(provider, "Function provider can not be null");
+    return new FunctionTestHarness(provider.functionOfType(type), type, id, 
Instant.EPOCH);
+  }
+
+  private FunctionTestHarness(
+      StatefulFunction function, FunctionType type, String id, Instant 
startTime) {
+    this.context = new TestContext(new Address(type, id), function, startTime);
+  }
+
+  /**
+   * @param message A message that will be sent to the function.
+   * @return A responses sent from the function after invocation using {@link 
Context#send(Address,
+   *     Object)}.
+   */
+  public Map<Address, List<Object>> invoke(Object message) {
+    return context.invoke(null, message);
+  }
+
+  /**
+   * @param message A message that will be sent to the function.
+   * @param from The address of the function that sent the message.
+   * @return A responses sent from the function after invocation using {@link 
Context#send(Address,
+   *     Object)}.
+   */
+  public Map<Address, List<Object>> invoke(Address from, Object message) {
+    Objects.requireNonNull(from);
+    return context.invoke(from, message);
+  }
+
+  /**
+   * Advances the internal clock the harness and fires and pending timers.
+   *
+   * @return A responses sent from the function after invocation.
+   */
+  public Map<Address, List<Object>> tick(Duration duration) {
+    Objects.requireNonNull(duration);
+    return context.tick(duration);
+  }
+
+  /**
+   * @param identifier An egress identifier
+   * @return All the messages sent to that egress.
+   */
+  public <T> List<T> getEgress(EgressIdentifier<T> identifier) {
+    return context.getEgress(identifier);
+  }
+}
diff --git 
a/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/function/TestContext.java
 
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/function/TestContext.java
new file mode 100644
index 0000000..167f1c7
--- /dev/null
+++ 
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/function/TestContext.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.testutils.function;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.AsyncOperationResult;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+
+/** A simple context that is strictly synchronous and captures all responses. 
*/
+class TestContext implements Context {
+
+  private final Address selfAddress;
+
+  private final StatefulFunction function;
+
+  private final Map<EgressIdentifier<?>, List<Object>> outputs;
+
+  private final Queue<Envelope> messages;
+
+  private final PriorityQueue<PendingMessage> pendingMessage;
+
+  private Map<Address, List<Object>> responses;
+
+  private Address from;
+
+  private long watermark;
+
+  TestContext(Address selfAddress, StatefulFunction function, Instant 
startTime) {
+    this.selfAddress = Objects.requireNonNull(selfAddress);
+    this.function = Objects.requireNonNull(function);
+
+    this.watermark = startTime.toEpochMilli();
+    this.messages = new ArrayDeque<>();
+    this.pendingMessage = new PriorityQueue<>(Comparator.comparingLong(a -> 
a.timer));
+    this.outputs = new HashMap<>();
+  }
+
+  @Override
+  public Address self() {
+    return selfAddress;
+  }
+
+  @Override
+  public Address caller() {
+    return from;
+  }
+
+  @Override
+  public void reply(Object message) {
+    Address to = caller();
+    if (to == null) {
+      throw new IllegalStateException("The caller address is null");
+    }
+
+    send(to, message);
+  }
+
+  @Override
+  public void send(Address to, Object message) {
+    if (to.equals(selfAddress)) {
+      messages.add(new Envelope(self(), to, message));
+      return;
+    }
+    responses.computeIfAbsent(to, ignore -> new ArrayList<>()).add(message);
+  }
+
+  @Override
+  public <T> void send(EgressIdentifier<T> egress, T message) {
+    outputs.computeIfAbsent(egress, ignore -> new ArrayList<>()).add(message);
+  }
+
+  @Override
+  public void sendAfter(Duration delay, Address to, Object message) {
+    pendingMessage.add(
+        new PendingMessage(new Envelope(self(), to, message), watermark + 
delay.toMillis()));
+  }
+
+  @Override
+  public <M, T> void registerAsyncOperation(M metadata, CompletableFuture<T> 
future) {
+    T value = null;
+    Throwable error = null;
+
+    try {
+      value = future.get();
+    } catch (InterruptedException e) {
+      throw new RuntimeException("Failed to get results from async action", e);
+    } catch (ExecutionException e) {
+      error = e.getCause();
+    }
+
+    AsyncOperationResult.Status status;
+    if (error == null) {
+      status = AsyncOperationResult.Status.SUCCESS;
+    } else {
+      status = AsyncOperationResult.Status.FAILURE;
+    }
+
+    AsyncOperationResult<M, T> result = new AsyncOperationResult<>(metadata, 
status, value, error);
+    messages.add(new Envelope(self(), self(), result));
+  }
+
+  @SuppressWarnings("unchecked")
+  <T> List<T> getEgress(EgressIdentifier<T> identifier) {
+    List<?> values = outputs.getOrDefault(identifier, Collections.emptyList());
+
+    // Because the type is part of the identifier key
+    // this cast is always safe.
+    return (List<T>) values;
+  }
+
+  Map<Address, List<Object>> invoke(Address from, Object message) {
+    messages.add(new Envelope(from, null, message));
+
+    return processAllMessages();
+  }
+
+  Map<Address, List<Object>> tick(Duration duration) {
+    watermark += duration.toMillis();
+
+    while (!pendingMessage.isEmpty() && pendingMessage.peek().timer <= 
watermark) {
+      messages.add(pendingMessage.poll().envelope);
+    }
+
+    return processAllMessages();
+  }
+
+  private Map<Address, List<Object>> processAllMessages() {
+    responses = new HashMap<>();
+
+    while (!messages.isEmpty()) {
+      Envelope envelope = messages.poll();
+      if (envelope.to != null && !envelope.to.equals(self())) {
+        send(envelope.to, envelope.message);
+      } else {
+        from = envelope.from;
+        function.invoke(this, envelope.message);
+      }
+    }
+
+    return responses;
+  }
+
+  private static class Envelope {
+    Address from;
+
+    Address to;
+
+    Object message;
+
+    Envelope(Address from, Address to, Object message) {
+      this.from = from;
+      this.to = to;
+      this.message = message;
+    }
+  }
+
+  private static class PendingMessage {
+    Envelope envelope;
+
+    long timer;
+
+    PendingMessage(Envelope envelope, long timer) {
+      this.envelope = envelope;
+      this.timer = timer;
+    }
+  }
+}
diff --git 
a/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/MatchersByAddress.java
 
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/MatchersByAddress.java
new file mode 100644
index 0000000..367f64e
--- /dev/null
+++ 
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/MatchersByAddress.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.testutils.matchers;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.Address;
+import org.hamcrest.Matcher;
+
+@SuppressWarnings("WeakerAccess")
+public class MatchersByAddress {
+
+  final Address address;
+
+  final List<Matcher<?>> matchers;
+
+  MatchersByAddress(Address address, List<Matcher<?>> messages) {
+    this.address = Objects.requireNonNull(address);
+    this.matchers = Objects.requireNonNull(messages);
+  }
+}
diff --git 
a/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/MessagesSentToAddress.java
 
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/MessagesSentToAddress.java
new file mode 100644
index 0000000..19a8b70
--- /dev/null
+++ 
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/MessagesSentToAddress.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.testutils.matchers;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.statefun.sdk.Address;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/** A matcher for checking all the responses sent to a particular function 
type. */
+public class MessagesSentToAddress extends TypeSafeMatcher<Map<Address, 
List<Object>>> {
+
+  private final Map<Address, List<Matcher<?>>> matcherByAddress;
+
+  MessagesSentToAddress(Map<Address, List<Matcher<?>>> matcherByAddress) {
+    this.matcherByAddress = matcherByAddress;
+  }
+
+  @Override
+  protected boolean matchesSafely(Map<Address, List<Object>> item) {
+    for (Map.Entry<Address, List<Matcher<?>>> entry : 
matcherByAddress.entrySet()) {
+      List<Object> messages = item.get(entry.getKey());
+      if (messages == null) {
+        return false;
+      }
+
+      if (messages.size() != entry.getValue().size()) {
+        return false;
+      }
+
+      for (int i = 0; i < messages.size(); i++) {
+        Matcher<?> matcher = entry.getValue().get(i);
+        if (!matcher.matches(messages.get(i))) {
+          return false;
+        }
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public void describeTo(Description description) {
+    description.appendText("<{");
+
+    for (Map.Entry<Address, List<Matcher<?>>> entry : 
matcherByAddress.entrySet()) {
+      description
+          .appendText(entry.getKey().toString())
+          .appendText("=")
+          .appendList("[", ",", "]", entry.getValue());
+    }
+
+    description.appendText("}>");
+  }
+}
diff --git 
a/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/SentNothingMatcher.java
 
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/SentNothingMatcher.java
new file mode 100644
index 0000000..3d99941
--- /dev/null
+++ 
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/SentNothingMatcher.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.testutils.matchers;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.statefun.sdk.Address;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+/** A matcher for that the function did not message any other functions. */
+public class SentNothingMatcher extends TypeSafeMatcher<Map<Address, 
List<Object>>> {
+
+  SentNothingMatcher() {}
+
+  @Override
+  protected boolean matchesSafely(Map<Address, List<Object>> item) {
+    return item.isEmpty();
+  }
+
+  @Override
+  public void describeTo(Description description) {
+    description.appendText("Nothing Sent");
+  }
+}
diff --git 
a/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/StatefulFunctionMatchers.java
 
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/StatefulFunctionMatchers.java
new file mode 100644
index 0000000..5cf1644
--- /dev/null
+++ 
b/statefun-testutil/src/main/java/org/apache/flink/statefun/testutils/matchers/StatefulFunctionMatchers.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.testutils.matchers;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.hamcrest.Matcher;
+
+/**
+ * A set of Hamcrest matchers to help check the responses from a {@link
+ * org.apache.flink.statefun.testutils.function.FunctionTestHarness}
+ *
+ * <p>{@see FunctionTestHarness} for usage details.
+ */
+public final class StatefulFunctionMatchers {
+
+  private StatefulFunctionMatchers() {
+    throw new AssertionError();
+  }
+
+  public static MatchersByAddress messagesTo(
+      Address to, Matcher<?> matcher, Matcher<?>... matchers) {
+    List<Matcher<?>> allMatchers = new ArrayList<>(1 + matchers.length);
+    allMatchers.add(matcher);
+    allMatchers.addAll(Arrays.asList(matchers));
+
+    return new MatchersByAddress(to, allMatchers);
+  }
+
+  /**
+   * A matcher that checks all the responses sent to a given {@link 
FunctionType}.
+   *
+   * <p><b>Important:</b> This matcher expects an exact match on the number of 
responses sent to
+   * this function.
+   */
+  public static MessagesSentToAddress sent(
+      MatchersByAddress matcher, MatchersByAddress... matchers) {
+    Map<Address, List<Matcher<?>>> messagesByAddress = new HashMap<>();
+    messagesByAddress.put(matcher.address, matcher.matchers);
+
+    for (MatchersByAddress match : matchers) {
+      messagesByAddress.put(match.address, match.matchers);
+    }
+
+    return new MessagesSentToAddress(messagesByAddress);
+  }
+
+  /** A matcher that checks the function did not send any messages. */
+  public static SentNothingMatcher sentNothing() {
+    return new SentNothingMatcher();
+  }
+}
diff --git 
a/statefun-testutil/src/test/java/org/apache/flink/statefun/testutils/function/FunctionTestHarnessTest.java
 
b/statefun-testutil/src/test/java/org/apache/flink/statefun/testutils/function/FunctionTestHarnessTest.java
new file mode 100644
index 0000000..671e315
--- /dev/null
+++ 
b/statefun-testutil/src/test/java/org/apache/flink/statefun/testutils/function/FunctionTestHarnessTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.testutils.function;
+
+import static 
org.apache.flink.statefun.testutils.matchers.StatefulFunctionMatchers.*;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.Address;
+import org.apache.flink.statefun.sdk.AsyncOperationResult;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Simple validation tests of the test harness. */
+public class FunctionTestHarnessTest {
+
+  private static final FunctionType UNDER_TEST = new FunctionType("flink", 
"undertest");
+
+  private static final FunctionType OTHER_FUNCTION = new FunctionType("flink", 
"function");
+
+  private static final Address CALLER = new Address(OTHER_FUNCTION, "id");
+
+  private static final Address SOME_ADDRESS = new Address(OTHER_FUNCTION, 
"id2");
+
+  private static final EgressIdentifier<String> EGRESS =
+      new EgressIdentifier<>("flink", "egress", String.class);
+
+  @Test
+  public void basicMessageTest() {
+    FunctionTestHarness harness =
+        FunctionTestHarness.test(ignore -> new BasicFunction(), UNDER_TEST, 
"id");
+
+    Assert.assertThat(harness.invoke(CALLER, "ping"), sent(messagesTo(CALLER, 
equalTo("pong"))));
+  }
+
+  @Test
+  public void multiReturnTest() {
+    FunctionTestHarness harness =
+        FunctionTestHarness.test(ignore -> new MultiResponseFunction(), 
UNDER_TEST, "id");
+
+    Assert.assertThat(
+        harness.invoke("hello"),
+        sent(
+            messagesTo(CALLER, equalTo("a"), equalTo("b")),
+            messagesTo(SOME_ADDRESS, equalTo("c"))));
+  }
+
+  @Test
+  public void egressTest() {
+    FunctionTestHarness harness =
+        FunctionTestHarness.test(ignore -> new EgressFunction(), UNDER_TEST, 
"id");
+
+    Assert.assertThat(harness.invoke(CALLER, "ping"), sentNothing());
+    Assert.assertThat(harness.getEgress(EGRESS), contains(equalTo("pong")));
+  }
+
+  @Test
+  public void delayedMessageTest() {
+    FunctionTestHarness harness =
+        FunctionTestHarness.test(ignore -> new DelayedResponse(), UNDER_TEST, 
"id");
+
+    Assert.assertThat(harness.invoke(CALLER, "ping"), sentNothing());
+    Assert.assertThat(
+        harness.tick(Duration.ofMinutes(1)), sent(messagesTo(CALLER, 
equalTo("pong"))));
+  }
+
+  @Test
+  public void asyncMessageTest() {
+    FunctionTestHarness harness =
+        FunctionTestHarness.test(ignore -> new AsyncOperation(), UNDER_TEST, 
"id");
+
+    Assert.assertThat(harness.invoke(CALLER, "ping"), sent(messagesTo(CALLER, 
equalTo("pong"))));
+  }
+
+  private static class BasicFunction implements StatefulFunction {
+
+    @Override
+    public void invoke(Context context, Object input) {
+      context.reply("pong");
+    }
+  }
+
+  private static class MultiResponseFunction implements StatefulFunction {
+
+    @Override
+    public void invoke(Context context, Object input) {
+      context.send(CALLER, "a");
+      context.send(CALLER, "b");
+      context.send(SOME_ADDRESS, "c");
+    }
+  }
+
+  private static class EgressFunction implements StatefulFunction {
+    @Override
+    public void invoke(Context context, Object input) {
+      context.send(EGRESS, "pong");
+    }
+  }
+
+  private static class DelayedResponse implements StatefulFunction {
+
+    @Override
+    public void invoke(Context context, Object input) {
+      context.sendAfter(Duration.ofMinutes(1), context.caller(), "pong");
+    }
+  }
+
+  private static class AsyncOperation implements StatefulFunction {
+
+    @Override
+    public void invoke(Context context, Object input) {
+      if (input instanceof String) {
+        CompletableFuture<String> future = 
CompletableFuture.completedFuture("pong");
+        context.registerAsyncOperation(context.caller(), future);
+      }
+
+      if (input instanceof AsyncOperationResult) {
+        AsyncOperationResult<Address, String> result =
+            (AsyncOperationResult<Address, String>) input;
+        context.send(result.metadata(), result.value());
+      }
+    }
+  }
+}

Reply via email to