igalshilman commented on a change in pull request #236:
URL: https://github.com/apache/flink-statefun/pull/236#discussion_r641524541



##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {
+
+  private final TestAddressScopedStorage storage;
+  private Address self;
+  private Optional<Address> caller;
+
+  private List<Envelope> sentMessages = new ArrayList<>();

Review comment:
       @knaufk What do you think about making an explicit seperation between 
delayed messages and regular messages, along side with `getDelayedMessage()`. ?
   This would make the overloaded `Envelope` two separate classes 
`DelayedEnvelope` and `Envelope`?

##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestAddressScopedStorage.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.util.HashMap;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+
+class TestAddressScopedStorage implements AddressScopedStorage {

Review comment:
       It is definitely useful to keep the objects like that in their original 
form as a reference, but I'm wondering if we should go trough serde here. 
Otherwise the test might be a little misleading. 
   
   While it is true that testing serialization should happen else-where, but 
I'm just wondering how many people would follow these guidelines.
   

##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {
+
+  private final TestAddressScopedStorage storage;
+  private Address self;
+  private Optional<Address> caller;
+
+  private List<Envelope> sentMessages = new ArrayList<>();
+  private List<EgressMessage> sentEgressMessages = new ArrayList<>();
+
+  private TestContext(Address self, Optional<Address> caller) {
+    this.self = self;
+    this.caller = caller;
+    this.storage = new TestAddressScopedStorage();
+  }
+
+  public TestContext(Address self) {
+    this(self, Optional.empty());
+  }
+
+  public TestContext(Address self, Address caller) {
+    this(self, Optional.of(caller));
+  }
+
+  @Override
+  public Address self() {
+    return self;
+  }
+
+  @Override
+  public Optional<Address> caller() {
+    return caller;
+  }
+
+  @Override
+  public void send(Message message) {
+    sentMessages.add(new Envelope(Duration.ofMillis(0), message));
+  }
+
+  @Override
+  public void sendAfter(Duration duration, Message message) {
+    sentMessages.add(new Envelope(duration, message));
+  }
+
+  @Override
+  public void send(EgressMessage message) {
+    sentEgressMessages.add(message);
+  }
+
+  @Override
+  public AddressScopedStorage storage() {
+    return storage;
+  }
+
+  /**
+   * This method returns a list of all messages sent by this function via 
{@link
+   * Context#send(Message)} or {@link Context#sendAfter(Duration, Message)}.
+   *
+   * <p>Messages are wrapped in an {@link Envelope} that contains the message 
itself and the
+   * duration after which the message was sent. The Duration is {@link 
Duration#ZERO} for messages
+   * sent via {@link Context#send(Message)}.
+   *
+   * @return the list of sent messages wrapped in {@link Envelope}s
+   */
+  public List<Envelope> getSentMessages() {
+    return sentMessages;

Review comment:
       perhaps you'd like to make a copy here or wrap in a 
`Collections.unmodifiableList`

##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {
+
+  private final TestAddressScopedStorage storage;
+  private Address self;
+  private Optional<Address> caller;
+
+  private List<Envelope> sentMessages = new ArrayList<>();
+  private List<EgressMessage> sentEgressMessages = new ArrayList<>();
+
+  private TestContext(Address self, Optional<Address> caller) {
+    this.self = self;
+    this.caller = caller;
+    this.storage = new TestAddressScopedStorage();
+  }
+
+  public TestContext(Address self) {

Review comment:
       It would be probably a tad more nicer if there will be static factory 
methods like:
   
   `TestContext.forTarget(Address self)`
   
   `TestContext.forTargetWithCaller(Address self, Address caller)`
   
   Possibly later we can add `builder` as well. where we can configure few more 
things (like the `ValueSpec` for storage)

##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {
+
+  private final TestAddressScopedStorage storage;
+  private Address self;
+  private Optional<Address> caller;
+
+  private List<Envelope> sentMessages = new ArrayList<>();
+  private List<EgressMessage> sentEgressMessages = new ArrayList<>();
+
+  private TestContext(Address self, Optional<Address> caller) {
+    this.self = self;
+    this.caller = caller;
+    this.storage = new TestAddressScopedStorage();
+  }
+
+  public TestContext(Address self) {
+    this(self, Optional.empty());
+  }
+
+  public TestContext(Address self, Address caller) {
+    this(self, Optional.of(caller));
+  }
+
+  @Override
+  public Address self() {
+    return self;
+  }
+
+  @Override
+  public Optional<Address> caller() {
+    return caller;
+  }
+
+  @Override
+  public void send(Message message) {
+    sentMessages.add(new Envelope(Duration.ofMillis(0), message));

Review comment:
       Better to capture nullness in all these methods.

##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/Envelope.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * A utility class that wraps a {@link Message} and the {@link Duration} after 
which it was sent by
+ * a {@link org.apache.flink.statefun.sdk.java.StatefulFunction}. It is used 
by the {@link
+ * TestContext}.
+ */
+public class Envelope {

Review comment:
       I like the idea of an envelope, but I think that this class is a bit 
confusing as it, requires a delay, and generally a bit tricky to extend in the 
future (make it capture a `context` method).
   How about we'll just split it to `SentEgressMessage` , `SentMessage`, and 
`SentDelayedMessage` ?
   
   They don't have to implement a common interface) but just 3 separate classes 
to capture the result of 3 different `context` invocations.

##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {
+
+  private final TestAddressScopedStorage storage;
+  private Address self;
+  private Optional<Address> caller;
+
+  private List<Envelope> sentMessages = new ArrayList<>();
+  private List<EgressMessage> sentEgressMessages = new ArrayList<>();
+
+  private TestContext(Address self, Optional<Address> caller) {
+    this.self = self;
+    this.caller = caller;
+    this.storage = new TestAddressScopedStorage();
+  }
+
+  public TestContext(Address self) {
+    this(self, Optional.empty());
+  }
+
+  public TestContext(Address self, Address caller) {
+    this(self, Optional.of(caller));

Review comment:
       `self` should probably not be `null` here.

##########
File path: 
statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/testing/TestContextIntegrationTest.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.*;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
+import org.junit.Test;
+
+public class TestContextIntegrationTest {
+
+  private static class SimpleFunctionUnderTest implements StatefulFunction {
+
+    static final TypeName TYPE = 
TypeName.typeNameFromString("com.example.fns/simple-fn");
+
+    static final TypeName ANOTHER_TYPE = 
TypeName.typeNameFromString("com.example.fns/another-fn");
+
+    static final TypeName SOME_EGRESS = 
TypeName.typeNameFromString("com.example.fns/another-fn");
+
+    static final ValueSpec<Integer> NUM_INVOCATIONS = 
ValueSpec.named("seen").withIntType();
+
+    @Override
+    public CompletableFuture<Void> apply(Context context, Message message) 
throws Throwable {
+
+      String name = message.asUtf8String();
+
+      AddressScopedStorage storage = context.storage();
+      int numInvocations = storage.get(NUM_INVOCATIONS).orElse(0);
+      storage.set(NUM_INVOCATIONS, numInvocations + 1);
+
+      Message messageToSomeone =
+          MessageBuilder.forAddress(ANOTHER_TYPE, "someone")
+              .withValue("I have an important message!")
+              .build();
+      context.send(messageToSomeone);
+
+      context.send(
+          EgressMessageBuilder.forEgress(SOME_EGRESS)
+              .withValue("I have an important egress message!")
+              .build());
+
+      context.sendAfter(Duration.ofMillis(1000), messageToSomeone);
+
+      return context.done();
+    }
+  }
+
+  @Test
+  public void testSimpleFunction() throws Throwable {

Review comment:
       nice tests!

##########
File path: 
statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/testing/TestContextIntegrationTest.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.*;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
+import org.junit.Test;
+
+public class TestContextIntegrationTest {
+
+  private static class SimpleFunctionUnderTest implements StatefulFunction {
+
+    static final TypeName TYPE = 
TypeName.typeNameFromString("com.example.fns/simple-fn");
+
+    static final TypeName ANOTHER_TYPE = 
TypeName.typeNameFromString("com.example.fns/another-fn");
+
+    static final TypeName SOME_EGRESS = 
TypeName.typeNameFromString("com.example.fns/another-fn");
+
+    static final ValueSpec<Integer> NUM_INVOCATIONS = 
ValueSpec.named("seen").withIntType();
+
+    @Override
+    public CompletableFuture<Void> apply(Context context, Message message) 
throws Throwable {
+
+      String name = message.asUtf8String();

Review comment:
       This seems to be unused

##########
File path: 
statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/testing/TestContext.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.testing;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+/**
+ * An implementation of {@link Context} to to make it easier to test {@link
+ * org.apache.flink.statefun.sdk.java.StatefulFunction}s in isolation. It can 
be instantiated with
+ * the address of the function under test and optionally the address of the 
caller.
+ */
+public class TestContext implements Context {

Review comment:
       Can you make that `final`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to