knaufk commented on a change in pull request #236:
URL: https://github.com/apache/flink-statefun/pull/236#discussion_r663946418



##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {

Review comment:
       Done.

##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {
+
+  private final TestAddressScopedStorage storage;
+  private Address self;
+  private Optional<Address> caller;
+
+  private List<Envelope> sentMessages = new ArrayList<>();
+  private List<EgressMessage> sentEgressMessages = new ArrayList<>();
+
+  private TestContext(Address self, Optional<Address> caller) {
+    this.self = self;
+    this.caller = caller;
+    this.storage = new TestAddressScopedStorage();
+  }
+
+  public TestContext(Address self) {

Review comment:
       Done.

##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {
+
+  private final TestAddressScopedStorage storage;
+  private Address self;
+  private Optional<Address> caller;
+
+  private List<Envelope> sentMessages = new ArrayList<>();
+  private List<EgressMessage> sentEgressMessages = new ArrayList<>();
+
+  private TestContext(Address self, Optional<Address> caller) {
+    this.self = self;
+    this.caller = caller;
+    this.storage = new TestAddressScopedStorage();
+  }
+
+  public TestContext(Address self) {
+    this(self, Optional.empty());
+  }
+
+  public TestContext(Address self, Address caller) {
+    this(self, Optional.of(caller));

Review comment:
       Done

##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {
+
+  private final TestAddressScopedStorage storage;
+  private Address self;
+  private Optional<Address> caller;
+
+  private List<Envelope> sentMessages = new ArrayList<>();
+  private List<EgressMessage> sentEgressMessages = new ArrayList<>();
+
+  private TestContext(Address self, Optional<Address> caller) {
+    this.self = self;
+    this.caller = caller;
+    this.storage = new TestAddressScopedStorage();
+  }
+
+  public TestContext(Address self) {
+    this(self, Optional.empty());
+  }
+
+  public TestContext(Address self, Address caller) {
+    this(self, Optional.of(caller));
+  }
+
+  @Override
+  public Address self() {
+    return self;
+  }
+
+  @Override
+  public Optional<Address> caller() {
+    return caller;
+  }
+
+  @Override
+  public void send(Message message) {
+    sentMessages.add(new Envelope(Duration.ofMillis(0), message));

Review comment:
       Done

##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {
+
+  private final TestAddressScopedStorage storage;
+  private Address self;
+  private Optional<Address> caller;
+
+  private List<Envelope> sentMessages = new ArrayList<>();
+  private List<EgressMessage> sentEgressMessages = new ArrayList<>();
+
+  private TestContext(Address self, Optional<Address> caller) {
+    this.self = self;
+    this.caller = caller;
+    this.storage = new TestAddressScopedStorage();
+  }
+
+  public TestContext(Address self) {
+    this(self, Optional.empty());
+  }
+
+  public TestContext(Address self, Address caller) {
+    this(self, Optional.of(caller));
+  }
+
+  @Override
+  public Address self() {
+    return self;
+  }
+
+  @Override
+  public Optional<Address> caller() {
+    return caller;
+  }
+
+  @Override
+  public void send(Message message) {
+    sentMessages.add(new Envelope(Duration.ofMillis(0), message));
+  }
+
+  @Override
+  public void sendAfter(Duration duration, Message message) {
+    sentMessages.add(new Envelope(duration, message));
+  }
+
+  @Override
+  public void send(EgressMessage message) {
+    sentEgressMessages.add(message);
+  }
+
+  @Override
+  public AddressScopedStorage storage() {
+    return storage;
+  }
+
+  /**
+   * This method returns a list of all messages sent by this function via 
{@link
+   * Context#send(Message)} or {@link Context#sendAfter(Duration, Message)}.
+   *
+   * <p>Messages are wrapped in an {@link Envelope} that contains the message 
itself and the
+   * duration after which the message was sent. The Duration is {@link 
Duration#ZERO} for messages
+   * sent via {@link Context#send(Message)}.
+   *
+   * @return the list of sent messages wrapped in {@link Envelope}s
+   */
+  public List<Envelope> getSentMessages() {
+    return sentMessages;

Review comment:
       Done.

##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {
+
+  private final TestAddressScopedStorage storage;
+  private Address self;
+  private Optional<Address> caller;
+
+  private List<Envelope> sentMessages = new ArrayList<>();

Review comment:
       Done.

##########
File path: 
statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/testing/TestContextIntegrationTest.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.*;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
+import org.junit.Test;
+
+public class TestContextIntegrationTest {
+
+  private static class SimpleFunctionUnderTest implements StatefulFunction {
+
+    static final TypeName TYPE = 
TypeName.typeNameFromString("com.example.fns/simple-fn");
+
+    static final TypeName ANOTHER_TYPE = 
TypeName.typeNameFromString("com.example.fns/another-fn");
+
+    static final TypeName SOME_EGRESS = 
TypeName.typeNameFromString("com.example.fns/another-fn");
+
+    static final ValueSpec<Integer> NUM_INVOCATIONS = 
ValueSpec.named("seen").withIntType();
+
+    @Override
+    public CompletableFuture<Void> apply(Context context, Message message) 
throws Throwable {
+
+      String name = message.asUtf8String();

Review comment:
       Removed.

##########
File path: 
statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/testing/TestContextIntegrationTest.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.*;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
+import org.junit.Test;
+
+public class TestContextIntegrationTest {
+
+  private static class SimpleFunctionUnderTest implements StatefulFunction {
+
+    static final TypeName TYPE = 
TypeName.typeNameFromString("com.example.fns/simple-fn");
+
+    static final TypeName ANOTHER_TYPE = 
TypeName.typeNameFromString("com.example.fns/another-fn");
+
+    static final TypeName SOME_EGRESS = 
TypeName.typeNameFromString("com.example.fns/another-fn");
+
+    static final ValueSpec<Integer> NUM_INVOCATIONS = 
ValueSpec.named("seen").withIntType();
+
+    @Override
+    public CompletableFuture<Void> apply(Context context, Message message) 
throws Throwable {
+
+      String name = message.asUtf8String();
+
+      AddressScopedStorage storage = context.storage();
+      int numInvocations = storage.get(NUM_INVOCATIONS).orElse(0);
+      storage.set(NUM_INVOCATIONS, numInvocations + 1);
+
+      Message messageToSomeone =
+          MessageBuilder.forAddress(ANOTHER_TYPE, "someone")
+              .withValue("I have an important message!")
+              .build();
+      context.send(messageToSomeone);
+
+      context.send(
+          EgressMessageBuilder.forEgress(SOME_EGRESS)
+              .withValue("I have an important egress message!")
+              .build());
+
+      context.sendAfter(Duration.ofMillis(1000), messageToSomeone);
+
+      return context.done();
+    }
+  }
+
+  @Test
+  public void testSimpleFunction() throws Throwable {

Review comment:
       Thanks.

##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestAddressScopedStorage.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.util.HashMap;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+
+class TestAddressScopedStorage implements AddressScopedStorage {

Review comment:
       Do you suggest to use `ConcurrentAddressScopedStorage` instead of the 
TestAdressScopedStorage? 

##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/Envelope.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * A utility class that wraps a {@link Message} and the {@link Duration} after 
which it was sent by
+ * a {@link org.apache.flink.statefun.sdk.java.StatefulFunction}. It is used 
by the {@link
+ * TestContext}.
+ */
+public class Envelope {

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to