tzulitai commented on a change in pull request #248: URL: https://github.com/apache/flink-statefun/pull/248#discussion_r684759568
##########
File path: statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreter.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;
+
+import static org.apache.flink.statefun.e2e.smoke.Constants.*;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.flink.statefun.e2e.smoke.generated.Command;
+import org.apache.flink.statefun.e2e.smoke.generated.Commands;
+import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult;
+import org.apache.flink.statefun.sdk.java.*;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageBuilder;
+
+public final class CommandInterpreter {
+  private final Ids ids;
+  private static final Duration sendAfterDelay = Duration.ofMillis(1);
+
+  public CommandInterpreter(Ids ids) {
+    this.ids = Objects.requireNonNull(ids);
+  }
+
+  public void interpret(ValueSpec<Long> state, Context context, Message message) {
+    if (message.is(SOURCE_COMMAND_TYPE)) {
+      interpret(state, context, message.as(SOURCE_COMMAND_TYPE).getCommands());
+    } else if (message.is(COMMANDS_TYPE)) {
+      interpret(state, context, message.as(COMMANDS_TYPE));
+    } else {
+      throw new IllegalArgumentException("Unrecognized message type " + message.valueTypeName());
+    }
+  }
+
+  private void interpret(ValueSpec<Long> state, Context context, Commands cmds) {
+    for (Command cmd : cmds.getCommandList()) {
+      if (cmd.hasIncrement()) {
+        modifyState(state, context, cmd.getIncrement());
+      } else if (cmd.hasSend()) {
+        send(state, context, cmd.getSend());
+      } else if (cmd.hasSendAfter()) {
+        sendAfter(state, context, cmd.getSendAfter());
+      } else if (cmd.hasSendEgress()) {
+        sendEgress(state, context, cmd.getSendEgress());
+      } else if (cmd.hasVerify()) {
+        verify(state, context, cmd.getVerify());
+      }
+    }
+  }
+
+  private void verify(
+      ValueSpec<Long> state, @SuppressWarnings("unused") Context context, Command.Verify verify) {
+    AddressScopedStorage storage = context.storage();
+    int selfId = Integer.parseInt(context.self().id());
+    long actual = storage.get(state).orElse(0L);
+    long expected = verify.getExpected();
+    VerificationResult verificationResult =
+        VerificationResult.newBuilder()
+            .setId(selfId)
+            .setActual(actual)
+            .setExpected(expected)
+            .build();
+    EgressMessage egressMessage =
+        EgressMessageBuilder.forEgress(VERIFICATION_EGRESS)
+            .withCustomType(VERIFICATION_RESULT_TYPE, verificationResult)
+            .build();
+    context.send(egressMessage);
+  }
+
+  private void sendEgress(
+      @SuppressWarnings("unused") ValueSpec<Long> state,
+      Context context,
+      @SuppressWarnings("unused") Command.SendEgress sendEgress) {
+    EgressMessage egressMessage =
+        EgressMessageBuilder.forEgress(DISCARD_EGRESS)
+            .withCustomType(COMMAND_TYPE, Command.getDefaultInstance())
+            .build();

Review comment:
This operation can be simplified by just sending a string, since the message is discarded anyway.
```suggestion
    EgressMessage egressMessage =
        EgressMessageBuilder.forEgress(DISCARD_EGRESS)
            .withValue("discarded-message")
            .build();
```

##########
File path: statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/generated/Command.java
##########
@@ -0,0 +1,5005 @@
+// Generated by the protocol buffer compiler. DO NOT EDIT!
+// source: commands.proto
+
+package org.apache.flink.statefun.e2e.smoke.generated;

Review comment:
Now that the `SimpleVerificationServer` no longer exposes `TypedValue` on the surface, I believe you no longer need to pre-generate these Protobuf messages, and can just use the Maven protoc plugin to do that for you.

##########
File path: statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/EmbeddedModule.java
##########
@@ -25,7 +25,7 @@
 @AutoService(StatefulFunctionModule.class)

Review comment:
I realized that the Java service provider configuration file generated by `AutoService` in the fat uber jar (i.e., the `META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule` file you'll see in the built jar) does not correctly list `EmbeddedModule` as an implementation. It only lists the `DriverModule`.

This should probably be addressed with a resource transformer via the Maven shade plugin. Both `EmbeddedModule` and `DriverModule` need to be listed in the service provider configuration file, so that the StateFun runtime picks both up as registered modules.

This only "coincidentally" works because you are adding two jars in the `Dockerfile` right now, with the non-uber jar containing a service provider file for `EmbeddedModule`, and the uber jar containing a service provider file for `DriverModule`. Strictly speaking, we only need to add the uber jar, which should contain a service provider file listing both modules.
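To make the expected outcome concrete, here is a rough sketch of what merging service provider files amounts to: the entries of every `META-INF/services/...StatefulFunctionModule` file that ends up in the uber jar are concatenated, instead of one file overwriting the other. This is only an illustration of the idea, not the shade plugin's actual code; the class name `ServiceFileMergeSketch` is made up, and the fully qualified name used for `DriverModule` is an assumption based on the shared package mentioned in this review.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; not part of the PR or of the Maven shade plugin.
final class ServiceFileMergeSketch {

  /**
   * Merges the lines of several service provider configuration files into one list,
   * skipping blank lines, comment lines, and duplicates, and keeping first-seen order.
   */
  static List<String> merge(List<List<String>> serviceFiles) {
    Set<String> merged = new LinkedHashSet<>();
    for (List<String> file : serviceFiles) {
      for (String line : file) {
        String trimmed = line.trim();
        if (!trimmed.isEmpty() && !trimmed.startsWith("#")) {
          merged.add(trimmed);
        }
      }
    }
    return new ArrayList<>(merged);
  }

  public static void main(String[] args) {
    // Content of the service file from the non-uber jar.
    List<String> embeddedJar =
        Collections.singletonList("org.apache.flink.statefun.e2e.smoke.EmbeddedModule");
    // Content of the service file from the uber jar (class name is an assumption).
    List<String> driverJar =
        Collections.singletonList("org.apache.flink.statefun.e2e.smoke.DriverModule");

    // The merged file should list both modules, one per line.
    for (String entry : merge(Arrays.asList(embeddedJar, driverJar))) {
      System.out.println(entry);
    }
  }
}
```

With a merged file like this inside the uber jar, the second jar in the `Dockerfile` should no longer be needed for `EmbeddedModule` to be discovered.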
##########
File path: statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/Utils.java
##########
@@ -62,12 +62,13 @@ public static SourceCommand aRelayedStateModificationCommand(
   /** Blocks the currently executing thread until enough successful verification results supply. */
   static void awaitVerificationSuccess(
-      Supplier<TypedValue> results, final int numberOfFunctionInstances) {
+      Supplier<TypedValue> results, final int numberOfFunctionInstances)
+      throws InvalidProtocolBufferException {
     Set<Integer> successfullyVerified = new HashSet<>();
     while (successfullyVerified.size() != numberOfFunctionInstances) {
       TypedValue typedValue = results.get();
       VerificationResult result =
-          TypedValueUtil.unpackProtobufMessage(typedValue, VerificationResult.parser());
+          VerificationResult.parser().parseFrom(typedValue.getValue().toByteArray());

Review comment:
I think the `StartedServer` and `SimpleVerificationServer` should really just work directly against `VerificationResult` instead of a `TypedValue` that wraps the result. I don't see any reason that `SimpleVerificationServer` should work against a generic type `T`. This will also help simplify the Protobuf dependency complication you mentioned in the PR description, since there will be no more `TypedValue` on the surface of these classes.

##########
File path: statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreterAppServer.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.smoke;

Review comment:
It seems like a lot of the dependency complications come from the fact that the classes of `statefun-smoke-e2e-driver`, `statefun-smoke-e2e-java`, and `statefun-smoke-e2e-embedded` all reside in the package `org.apache.flink.statefun.e2e.smoke`. Since we may be including more than one of these modules on the classpath at the same time, there are some class clashes that are not apparent right away. Let's be safe here and use different packages for the respective modules, e.g. `org.apache.flink.statefun.e2e.smoke.<driver/java/embedded>`.

##########
File path: statefun-e2e-tests/statefun-smoke-e2e-java/src/main/java/org/apache/flink/statefun/e2e/smoke/Constants.java
##########
@@ -0,0 +1,61 @@
+package org.apache.flink.statefun.e2e.smoke;
+
+import org.apache.flink.statefun.e2e.smoke.generated.Command;
+import org.apache.flink.statefun.e2e.smoke.generated.Commands;
+import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
+import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.types.SimpleType;
+import org.apache.flink.statefun.sdk.java.types.Type;
+
+final class Constants {
+  private Constants() {}
+
+  private static final String APP_NAMESPACE = "statefun.smoke.e2e";
+  private static final String PROTO_TYPES_NAMESPACE = "type.googleapis.com";

Review comment:
I don't think we need to strictly use this namespace for user-level messages.
This was only required because, on the driver side, you're using `TypedValueUtil` to wrap / unwrap Protobuf messages to / from `TypedValue`s. Since `SourceCommand`, `Commands`, and `VerificationResult` are pure user-space messages, let's reflect that by using custom typenames.
