This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit b86616bae4ce58dbdeebb263810de9a9c85fff61 Author: Igal Shilman <[email protected]> AuthorDate: Fri Nov 20 21:20:36 2020 +0100 [FLINK-20303] [e2e] Add a SmokeE2E test This closes #178. --- statefun-e2e-tests/pom.xml | 1 + statefun-e2e-tests/statefun-smoke-e2e/pom.xml | 143 +++++++++++++ .../flink/statefun/e2e/smoke/AsyncCompleter.java | 105 ++++++++++ .../statefun/e2e/smoke/CommandFlinkSource.java | 233 +++++++++++++++++++++ .../flink/statefun/e2e/smoke/CommandGenerator.java | 162 ++++++++++++++ .../statefun/e2e/smoke/CommandInterpreter.java | 144 +++++++++++++ .../flink/statefun/e2e/smoke/CommandRouter.java | 40 ++++ .../apache/flink/statefun/e2e/smoke/Constants.java | 35 ++++ .../org/apache/flink/statefun/e2e/smoke/Fn.java | 39 ++++ .../flink/statefun/e2e/smoke/FunctionProvider.java | 37 ++++ .../statefun/e2e/smoke/FunctionStateTracker.java | 78 +++++++ .../org/apache/flink/statefun/e2e/smoke/Ids.java | 38 ++++ .../apache/flink/statefun/e2e/smoke/Module.java | 78 +++++++ .../flink/statefun/e2e/smoke/ModuleParameters.java | 193 +++++++++++++++++ .../flink/statefun/e2e/smoke/ProtobufUtils.java | 34 +++ .../src/main/protobuf/commands.proto | 71 +++++++ .../src/main/protobuf/internal.proto | 35 ++++ .../statefun/e2e/smoke/CommandGeneratorTest.java | 40 ++++ .../statefun/e2e/smoke/CommandInterpreterTest.java | 73 +++++++ .../e2e/smoke/FunctionStateTrackerTest.java | 52 +++++ .../flink/statefun/e2e/smoke/HarnessTest.java | 90 ++++++++ .../statefun/e2e/smoke/ModuleParametersTest.java | 47 +++++ .../statefun/e2e/smoke/SimpleProtobufServer.java | 142 +++++++++++++ .../flink/statefun/e2e/smoke/SmokeRunner.java | 73 +++++++ .../statefun/e2e/smoke/SmokeVerificationE2E.java | 34 +++ .../org/apache/flink/statefun/e2e/smoke/Utils.java | 87 ++++++++ .../src/test/resources/Dockerfile | 20 ++ .../src/test/resources/log4j.properties | 24 +++ tools/maven/spotbugs-exclude.xml | 3 + 29 files changed, 2151 insertions(+) diff --git a/statefun-e2e-tests/pom.xml 
b/statefun-e2e-tests/pom.xml index b754f0e..e38a954 100644 --- a/statefun-e2e-tests/pom.xml +++ b/statefun-e2e-tests/pom.xml @@ -32,6 +32,7 @@ under the License. <module>statefun-e2e-tests-common</module> <module>statefun-sanity-e2e</module> <module>statefun-exactly-once-remote-e2e</module> + <module>statefun-smoke-e2e</module> </modules> <build> diff --git a/statefun-e2e-tests/statefun-smoke-e2e/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e/pom.xml new file mode 100644 index 0000000..71bb3c3 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/pom.xml @@ -0,0 +1,143 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>statefun-e2e-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>2.3-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>statefun-smoke-e2e</artifactId>
+
+    <properties>
+        <!-- NOTE(review): testcontainers.version is not referenced anywhere in this POM; confirm it is still needed. -->
+        <testcontainers.version>1.12.5</testcontainers.version>
+        <commons-math3.version>3.5</commons-math3.version>
+    </properties>
+
+    <dependencies>
+        <!-- Stateful Functions -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-flink-io</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-flink-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Smoke test logic: commons-math3 supplies the random generator and distributions used to generate commands -->
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+            <version>${commons-math3.version}</version>
+        </dependency>
+
+        <!-- Protobuf -->
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+
+        <!-- Flink streaming runtime -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <!--
+                This artifact transitively depends on different versions of slf4j-api.
+                To see the complete list, comment out this exclusion and run: mvn enforcer:enforce
+                -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <exclusions>
+                <!--
+                This artifact transitively depends on different versions of slf4j-api.
+                To see the complete list, comment out this exclusion and run: mvn enforcer:enforce
+                -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Logging. NOTE(review): these two versions are hard-coded here rather than taken from a property; confirm that is intended. -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.15</version>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+        </dependency>
+
+        <!-- End-to-end test common -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-e2e-tests-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <!-- conflicts with flink-core -->
+                <exclusion>
+                    <groupId>com.kohlschutter.junixsocket</groupId>
+                    <artifactId>junixsocket-native-common</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-flink-harness</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Protobuf compiler plugin; its version property is not defined in this POM (presumably inherited; verify against the parent). -->
+            <plugin>
+                <groupId>com.github.os72</groupId>
+                <artifactId>protoc-jar-maven-plugin</artifactId>
+                <version>${protoc-jar-maven-plugin.version}</version>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/AsyncCompleter.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/AsyncCompleter.java
new file mode 100644
index 0000000..e16bdc8
--- /dev/null
+++
b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/AsyncCompleter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.e2e.smoke; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * Creates {@link CompletableFuture}s that can be completed successfully or unsuccessfully, within 1 + * millisecond delay. 
+ */ +final class AsyncCompleter { + + private static final Throwable EXCEPTION; + + static { + Throwable t = new RuntimeException(); + t.setStackTrace(new StackTraceElement[0]); + EXCEPTION = t; + } + + private static final class Task { + final long time; + final CompletableFuture<Boolean> future; + final boolean success; + + public Task(boolean success) { + this.time = System.nanoTime(); + this.future = new CompletableFuture<>(); + this.success = success; + } + } + + private static final int ONE_MILLISECOND = Duration.ofMillis(1).getNano(); + private final LinkedBlockingDeque<Task> queue = new LinkedBlockingDeque<>(); + private boolean started; + + /** + * Returns a future that would be complete successfully, no sooner than 1 millisecond from now. + */ + CompletableFuture<Boolean> successfulFuture() { + return future(true); + } + + /** + * Returns a future that would be completed unsuccessfully, no sooner than 1 millisecond from now. + */ + CompletableFuture<Boolean> failedFuture() { + return future(false); + } + + private CompletableFuture<Boolean> future(boolean success) { + Task e = new Task(success); + queue.add(e); + return e.future; + } + + void start() { + if (started) { + return; + } + started = true; + Thread t = new Thread(this::run); + t.setDaemon(true); + t.start(); + } + + @SuppressWarnings({"InfiniteLoopStatement", "BusyWait"}) + void run() { + while (true) { + try { + Task e = queue.take(); + final long duration = System.nanoTime() - e.time; + if (duration < ONE_MILLISECOND) { + Thread.sleep(1); + } + CompletableFuture<Boolean> future = e.future; + if (e.success) { + future.complete(Boolean.TRUE); + } else { + future.completeExceptionally(EXCEPTION); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandFlinkSource.java 
b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandFlinkSource.java new file mode 100644 index 0000000..ea4ed39 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandFlinkSource.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.statefun.e2e.smoke; + +import static org.apache.flink.statefun.e2e.smoke.generated.Command.Verify; +import static org.apache.flink.statefun.e2e.smoke.generated.Command.newBuilder; + +import com.google.protobuf.Any; +import java.util.Iterator; +import java.util.Objects; +import java.util.OptionalInt; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Supplier; +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.statefun.e2e.smoke.generated.Command; +import org.apache.flink.statefun.e2e.smoke.generated.Commands; +import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand; +import org.apache.flink.statefun.e2e.smoke.generated.SourceSnapshot; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A Flink Source that Emits {@link SourceCommand}s. + * + * <p>This source is configured by {@link ModuleParameters} and would generate random commands, + * addressed to various functions. This source might also throw exceptions (kaboom) to simulate + * failures. + * + * <p>After generating {@link ModuleParameters#getMessageCount()} messages, this source will switch + * to {@code verification} step. At this step, it would keep sending (every 2 seconds) a {@link + * Verify} command to every function indefinitely. 
+ */ +final class CommandFlinkSource extends RichSourceFunction<Any> + implements CheckpointedFunction, CheckpointListener { + + private static final Logger LOG = LoggerFactory.getLogger(CommandFlinkSource.class); + + // ------------------------------------------------------------------------------------------------------------ + // Configuration + // ------------------------------------------------------------------------------------------------------------ + + private final ModuleParameters moduleParameters; + + // ------------------------------------------------------------------------------------------------------------ + // Runtime + // ------------------------------------------------------------------------------------------------------------ + + private transient ListState<SourceSnapshot> sourceSnapshotHandle; + private transient FunctionStateTracker functionStateTracker; + private transient int commandsSentSoFar; + private transient int failuresSoFar; + private transient boolean done; + private transient boolean atLeastOneCheckpointCompleted; + + public CommandFlinkSource(ModuleParameters moduleParameters) { + this.moduleParameters = Objects.requireNonNull(moduleParameters); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + OperatorStateStore store = context.getOperatorStateStore(); + sourceSnapshotHandle = + store.getUnionListState(new ListStateDescriptor<>("snapshot", SourceSnapshot.class)); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + SourceSnapshot sourceSnapshot = + getOnlyElement(sourceSnapshotHandle.get(), SourceSnapshot.getDefaultInstance()); + functionStateTracker = + new FunctionStateTracker(moduleParameters.getNumberOfFunctionInstances()) + .apply(sourceSnapshot.getTracker()); + commandsSentSoFar = sourceSnapshot.getCommandsSentSoFarHandle(); + failuresSoFar = sourceSnapshot.getFailuresGeneratedSoFar(); + } + + @Override + 
public void snapshotState(FunctionSnapshotContext context) throws Exception { + sourceSnapshotHandle.clear(); + sourceSnapshotHandle.add( + SourceSnapshot.newBuilder() + .setCommandsSentSoFarHandle(commandsSentSoFar) + .setTracker(functionStateTracker.snapshot()) + .setFailuresGeneratedSoFar(failuresSoFar) + .build()); + + if (commandsSentSoFar < moduleParameters.getMessageCount()) { + double perCent = 100.0d * (commandsSentSoFar) / moduleParameters.getMessageCount(); + LOG.info( + "Commands sent {} / {} ({} %)", + commandsSentSoFar, moduleParameters.getMessageCount(), perCent); + } + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + atLeastOneCheckpointCompleted = true; + } + + @Override + public void cancel() { + done = true; + } + + // ------------------------------------------------------------------------------------------------------------ + // Generation + // ------------------------------------------------------------------------------------------------------------ + + @Override + public void run(SourceContext<Any> ctx) { + generate(ctx); + do { + verify(ctx); + snooze(); + synchronized (ctx.getCheckpointLock()) { + if (done) { + return; + } + } + } while (true); + } + + private void generate(SourceContext<Any> ctx) { + final int startPosition = this.commandsSentSoFar; + final OptionalInt kaboomIndex = + computeFailureIndex(startPosition, failuresSoFar, moduleParameters.getMaxFailures()); + if (kaboomIndex.isPresent()) { + failuresSoFar++; + } + LOG.info( + "starting at {}, kaboom at {}, total messages {}", + startPosition, + kaboomIndex, + moduleParameters.getMessageCount()); + Supplier<SourceCommand> generator = + new CommandGenerator(new JDKRandomGenerator(), moduleParameters); + FunctionStateTracker functionStateTracker = this.functionStateTracker; + for (int i = startPosition; i < moduleParameters.getMessageCount(); i++) { + if (atLeastOneCheckpointCompleted && kaboomIndex.isPresent() && i >= kaboomIndex.getAsInt()) { + 
throw new RuntimeException("KABOOM!!!"); + } + SourceCommand command = generator.get(); + synchronized (ctx.getCheckpointLock()) { + if (done) { + return; + } + functionStateTracker.apply(command); + ctx.collect(Any.pack(command)); + this.commandsSentSoFar = i; + } + } + } + + private void verify(SourceContext<Any> ctx) { + FunctionStateTracker functionStateTracker = this.functionStateTracker; + + for (int i = 0; i < moduleParameters.getNumberOfFunctionInstances(); i++) { + final long expected = functionStateTracker.stateOf(i); + + Command.Builder verify = newBuilder().setVerify(Verify.newBuilder().setExpected(expected)); + + SourceCommand command = + SourceCommand.newBuilder() + .setTarget(i) + .setCommands(Commands.newBuilder().addCommand(verify)) + .build(); + synchronized (ctx.getCheckpointLock()) { + ctx.collect(Any.pack(command)); + } + } + } + + // --------------------------------------------------------------------------------------------------------------- + // Utils + // --------------------------------------------------------------------------------------------------------------- + + private OptionalInt computeFailureIndex(int startPosition, int failureSoFar, int maxFailures) { + if (failureSoFar >= maxFailures) { + return OptionalInt.empty(); + } + if (startPosition >= moduleParameters.getMessageCount()) { + return OptionalInt.empty(); + } + int index = + ThreadLocalRandom.current().nextInt(startPosition, moduleParameters.getMessageCount()); + return OptionalInt.of(index); + } + + private static void snooze() { + try { + Thread.sleep(2_000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private static <T> T getOnlyElement(Iterable<T> items, T def) { + Iterator<T> it = items.iterator(); + if (!it.hasNext()) { + return def; + } + T item = it.next(); + if (it.hasNext()) { + throw new IllegalStateException("Iterable has additional elements"); + } + return item; + } +} diff --git 
a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandGenerator.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandGenerator.java new file mode 100644 index 0000000..a062de2 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandGenerator.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.e2e.smoke; + +import static java.util.Arrays.asList; +import static org.apache.commons.math3.util.Pair.create; + +import java.util.List; +import java.util.Objects; +import java.util.function.Supplier; +import org.apache.commons.math3.distribution.EnumeratedDistribution; +import org.apache.commons.math3.random.RandomGenerator; +import org.apache.commons.math3.util.Pair; +import org.apache.flink.statefun.e2e.smoke.generated.Command; +import org.apache.flink.statefun.e2e.smoke.generated.Commands; +import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand; + +/** + * Generates random commands to be interpreted by {@linkplain CommandInterpreter}. 
+ * + * <p>see {src/main/protobuf/commands.proto} + */ +public final class CommandGenerator implements Supplier<SourceCommand> { + + private final RandomGenerator random; + private final EnumeratedDistribution<Gen> distribution; + private final ModuleParameters moduleParameters; + + public CommandGenerator(RandomGenerator random, ModuleParameters parameters) { + this.random = Objects.requireNonNull(random); + this.moduleParameters = Objects.requireNonNull(parameters); + this.distribution = new EnumeratedDistribution<>(random, randomCommandGenerators()); + } + + @Override + public SourceCommand get() { + final int depth = random.nextInt(moduleParameters.getCommandDepth()); + return SourceCommand.newBuilder().setTarget(address()).setCommands(commands(depth)).build(); + } + + private Commands.Builder commands(int depth) { + Commands.Builder builder = Commands.newBuilder(); + if (depth <= 0) { + StateModifyGen.instance().generate(builder, depth); + return builder; + } + final int n = random.nextInt(moduleParameters.getMaxCommandsPerDepth()); + for (int i = 0; i < n; i++) { + Gen gen = distribution.sample(); + gen.generate(builder, depth); + } + if (builder.getCommandCount() == 0) { + StateModifyGen.instance().generate(builder, depth); + } + return builder; + } + + private int address() { + return random.nextInt(moduleParameters.getNumberOfFunctionInstances()); + } + + private List<Pair<Gen, Double>> randomCommandGenerators() { + return asList( + create(new StateModifyGen(), moduleParameters.getStateModificationsPr()), + create(new SendGen(), moduleParameters.getSendPr()), + create(new SendAfterGen(), moduleParameters.getSendAfterPr()), + create(new SendAsyncOp(), moduleParameters.getAsyncSendPr()), + create(new Noop(), moduleParameters.getNoopPr()), + create(new SendEgress(), moduleParameters.getSendEgressPr())); + } + + interface Gen { + /** generates one or more commands with depth at most @depth. 
*/ + void generate(Commands.Builder builder, int depth); + } + + // ---------------------------------------------------------------------------------------------------- + // generators + // ---------------------------------------------------------------------------------------------------- + + private static final class SendEgress implements Gen { + + @Override + public void generate(Commands.Builder builder, int depth) { + builder.addCommand( + Command.newBuilder().setSendEgress(Command.SendEgress.getDefaultInstance())); + } + } + + private static final class Noop implements Gen { + @Override + public void generate(Commands.Builder builder, int depth) {} + } + + private static final class StateModifyGen implements Gen { + + static final Gen INSTANCE = new StateModifyGen(); + + static Gen instance() { + return INSTANCE; + } + + @Override + public void generate(Commands.Builder builder, int depth) { + builder.addCommand( + Command.newBuilder().setIncrement(Command.IncrementState.getDefaultInstance())); + } + } + + private final class SendAfterGen implements Gen { + + @Override + public void generate(Commands.Builder builder, int depth) { + builder.addCommand(Command.newBuilder().setSendAfter(sendAfter(depth))); + } + + private Command.SendAfter.Builder sendAfter(int depth) { + return Command.SendAfter.newBuilder().setTarget(address()).setCommands(commands(depth - 1)); + } + } + + private final class SendGen implements Gen { + + @Override + public void generate(Commands.Builder builder, int depth) { + builder.addCommand(Command.newBuilder().setSend(send(depth))); + } + + private Command.Send.Builder send(int depth) { + return Command.Send.newBuilder().setTarget(address()).setCommands(commands(depth - 1)); + } + } + + private final class SendAsyncOp implements Gen { + + @Override + public void generate(Commands.Builder builder, int depth) { + builder.addCommand(Command.newBuilder().setAsyncOperation(asyncOp(depth))); + } + + private Command.AsyncOperation.Builder 
asyncOp(int depth) { + return Command.AsyncOperation.newBuilder() + .setFailure(random.nextBoolean()) + .setResolvedCommands(commands(depth - 1)); + } + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreter.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreter.java new file mode 100644 index 0000000..343c8f2 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreter.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.statefun.e2e.smoke; + +import static org.apache.flink.statefun.e2e.smoke.ProtobufUtils.unpack; + +import com.google.protobuf.Any; +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import org.apache.flink.statefun.e2e.smoke.generated.Command; +import org.apache.flink.statefun.e2e.smoke.generated.Commands; +import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand; +import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult; +import org.apache.flink.statefun.sdk.AsyncOperationResult; +import org.apache.flink.statefun.sdk.Context; +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.state.PersistedValue; + +public final class CommandInterpreter { + private final AsyncCompleter asyncCompleter; + private final Ids ids; + private static final Duration sendAfterDelay = Duration.ofMillis(1); + + public CommandInterpreter(Ids ids) { + this.asyncCompleter = new AsyncCompleter(); + asyncCompleter.start(); + this.ids = Objects.requireNonNull(ids); + } + + public void interpret(PersistedValue<Long> state, Context context, Object message) { + if (message instanceof AsyncOperationResult) { + @SuppressWarnings("unchecked") + AsyncOperationResult<Commands, ?> res = (AsyncOperationResult<Commands, ?>) message; + interpret(state, context, res.metadata()); + return; + } + if (!(message instanceof Any)) { + throw new IllegalArgumentException("wtf " + message); + } + Any any = (Any) message; + if (any.is(SourceCommand.class)) { + SourceCommand sourceCommand = unpack(any, SourceCommand.class); + interpret(state, context, sourceCommand.getCommands()); + } else if (any.is(Commands.class)) { + Commands commands = unpack(any, Commands.class); + interpret(state, context, commands); + } else { + throw new IllegalArgumentException("Unknown message type " + any.getTypeUrl()); + } + } + + private void interpret(PersistedValue<Long> state, Context context, 
Commands command) { + for (Command cmd : command.getCommandList()) { + if (cmd.hasIncrement()) { + modifyState(state, context, cmd.getIncrement()); + } else if (cmd.hasAsyncOperation()) { + registerAsyncOps(state, context, cmd.getAsyncOperation()); + } else if (cmd.hasSend()) { + send(state, context, cmd.getSend()); + } else if (cmd.hasSendAfter()) { + sendAfter(state, context, cmd.getSendAfter()); + } else if (cmd.hasSendEgress()) { + sendEgress(state, context, cmd.getSendEgress()); + } else if (cmd.hasVerify()) { + verify(state, context, cmd.getVerify()); + } + } + } + + private void verify( + PersistedValue<Long> state, + @SuppressWarnings("unused") Context context, + Command.Verify verify) { + int selfId = Integer.parseInt(context.self().id()); + long actual = state.getOrDefault(0L); + long expected = verify.getExpected(); + VerificationResult verificationResult = + VerificationResult.newBuilder() + .setId(selfId) + .setActual(actual) + .setExpected(expected) + .build(); + context.send(Constants.VERIFICATION_RESULT, Any.pack(verificationResult)); + } + + private void sendEgress( + @SuppressWarnings("unused") PersistedValue<Long> state, + Context context, + @SuppressWarnings("unused") Command.SendEgress sendEgress) { + context.send(Constants.OUT, Any.getDefaultInstance()); + } + + private void sendAfter( + @SuppressWarnings("unused") PersistedValue<Long> state, + Context context, + Command.SendAfter send) { + FunctionType functionType = Constants.FN_TYPE; + String id = ids.idOf(send.getTarget()); + context.sendAfter(sendAfterDelay, functionType, id, Any.pack(send.getCommands())); + } + + private void send( + @SuppressWarnings("unused") PersistedValue<Long> state, Context context, Command.Send send) { + FunctionType functionType = Constants.FN_TYPE; + String id = ids.idOf(send.getTarget()); + context.send(functionType, id, Any.pack(send.getCommands())); + } + + private void registerAsyncOps( + @SuppressWarnings("unused") PersistedValue<Long> state, + Context 
context, + Command.AsyncOperation asyncOperation) { + CompletableFuture<Boolean> future = + asyncOperation.getFailure() + ? asyncCompleter.successfulFuture() + : asyncCompleter.failedFuture(); + + Commands next = asyncOperation.getResolvedCommands(); + context.registerAsyncOperation(next, future); + } + + private void modifyState( + PersistedValue<Long> state, + @SuppressWarnings("unused") Context context, + @SuppressWarnings("unused") Command.IncrementState incrementState) { + state.updateAndGet(n -> n == null ? 1 : n + 1); + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandRouter.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandRouter.java new file mode 100644 index 0000000..e08ae8d --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/CommandRouter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.statefun.e2e.smoke; + +import com.google.protobuf.Any; +import java.util.Objects; +import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand; +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.io.Router; + +public class CommandRouter implements Router<Any> { + private final Ids ids; + + public CommandRouter(Ids ids) { + this.ids = Objects.requireNonNull(ids); + } + + @Override + public void route(Any any, Downstream<Any> downstream) { + SourceCommand sourceCommand = ProtobufUtils.unpack(any, SourceCommand.class); + FunctionType type = Constants.FN_TYPE; + String id = ids.idOf(sourceCommand.getTarget()); + downstream.forward(type, id, any); + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Constants.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Constants.java new file mode 100644 index 0000000..f5cf262 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Constants.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.statefun.e2e.smoke; + +import com.google.protobuf.Any; +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.io.EgressIdentifier; +import org.apache.flink.statefun.sdk.io.IngressIdentifier; + +public class Constants { + + public static final IngressIdentifier<Any> IN = new IngressIdentifier<>(Any.class, "", "source"); + + public static final EgressIdentifier<Any> OUT = new EgressIdentifier<>("", "sink", Any.class); + + public static final FunctionType FN_TYPE = new FunctionType("v", "f1"); + + public static final EgressIdentifier<Any> VERIFICATION_RESULT = + new EgressIdentifier<>("", "verification", Any.class); +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Fn.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Fn.java new file mode 100644 index 0000000..369e966 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Fn.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.statefun.e2e.smoke; + +import java.util.Objects; +import org.apache.flink.statefun.sdk.Context; +import org.apache.flink.statefun.sdk.StatefulFunction; +import org.apache.flink.statefun.sdk.annotations.Persisted; +import org.apache.flink.statefun.sdk.state.PersistedValue; + +public class Fn implements StatefulFunction { + + @Persisted private final PersistedValue<Long> state = PersistedValue.of("state", Long.class); + private final CommandInterpreter interpreter; + + public Fn(CommandInterpreter interpreter) { + this.interpreter = Objects.requireNonNull(interpreter); + } + + @Override + public void invoke(Context context, Object message) { + interpreter.interpret(state, context, message); + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionProvider.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionProvider.java new file mode 100644 index 0000000..a7eda2a --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionProvider.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.statefun.e2e.smoke; + +import java.util.Objects; +import org.apache.flink.statefun.sdk.FunctionType; +import org.apache.flink.statefun.sdk.StatefulFunction; +import org.apache.flink.statefun.sdk.StatefulFunctionProvider; + +public class FunctionProvider implements StatefulFunctionProvider { + private final Ids ids; + + public FunctionProvider(Ids ids) { + this.ids = Objects.requireNonNull(ids); + } + + @Override + public StatefulFunction functionOfType(FunctionType functionType) { + CommandInterpreter interpreter = new CommandInterpreter(ids); + return new Fn(interpreter); + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTracker.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTracker.java new file mode 100644 index 0000000..d836094 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTracker.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.statefun.e2e.smoke; + +import org.apache.flink.statefun.e2e.smoke.generated.Command; +import org.apache.flink.statefun.e2e.smoke.generated.Commands; +import org.apache.flink.statefun.e2e.smoke.generated.FunctionTrackerSnapshot; +import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand; + +final class FunctionStateTracker { + private final long[] expectedStates; + + public FunctionStateTracker(int numberOfFunctionInstances) { + this.expectedStates = new long[numberOfFunctionInstances]; + } + + /** + * Find any state modification commands nested under @sourceCommand, and apply them in the + * internal state representation. + */ + public void apply(SourceCommand sourceCommand) { + updateInternally(sourceCommand.getTarget(), sourceCommand.getCommands()); + } + + /** Apply all the state modification stored in the snapshot represented by the snapshotBytes. */ + public FunctionStateTracker apply(FunctionTrackerSnapshot snapshot) { + for (int i = 0; i < snapshot.getStateCount(); i++) { + expectedStates[i] += snapshot.getState(i); + } + return this; + } + + /** Get the current expected state of a function instance. */ + public long stateOf(int id) { + return expectedStates[id]; + } + + public FunctionTrackerSnapshot.Builder snapshot() { + FunctionTrackerSnapshot.Builder snapshot = FunctionTrackerSnapshot.newBuilder(); + for (long state : expectedStates) { + snapshot.addState(state); + } + return snapshot; + } + + /** + * Recursively traverse the commands tree and look for {@link Command.IncrementState} commands. + * For each {@code ModifyState} command found update the corresponding expected state. 
+ */ + private void updateInternally(int currentAddress, Commands commands) { + for (Command command : commands.getCommandList()) { + if (command.hasIncrement()) { + expectedStates[currentAddress]++; + } else if (command.hasSend()) { + updateInternally(command.getSend().getTarget(), command.getSend().getCommands()); + } else if (command.hasSendAfter()) { + updateInternally(command.getSendAfter().getTarget(), command.getSendAfter().getCommands()); + } else if (command.hasAsyncOperation()) { + updateInternally(currentAddress, command.getAsyncOperation().getResolvedCommands()); + } + } + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Ids.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Ids.java new file mode 100644 index 0000000..b9fbc9f --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/Ids.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/** Precomputed lookup table from a numeric address to its decimal string id. */
final class Ids {
  private final String[] cache;

  /** @param maxIds number of ids to precompute; valid addresses are {@code 0 .. maxIds - 1}. */
  public Ids(int maxIds) {
    this.cache = createIds(maxIds);
  }

  /** Returns the cached string id of {@code address}. */
  public String idOf(int address) {
    return cache[address];
  }

  /** Builds the table whose entry at position {@code i} is the decimal text of {@code i}. */
  private static String[] createIds(int maxIds) {
    final String[] table = new String[maxIds];
    int next = 0;
    while (next < table.length) {
      table[next] = String.valueOf(next);
      next++;
    }
    return table;
  }
}
+ */ +package org.apache.flink.statefun.e2e.smoke; + +import static org.apache.flink.statefun.e2e.smoke.Constants.IN; + +import com.google.auto.service.AutoService; +import com.google.protobuf.Any; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Map; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec; +import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec; +import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.SocketClientSink; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@AutoService(StatefulFunctionModule.class) +public class Module implements StatefulFunctionModule { + public static final Logger LOG = LoggerFactory.getLogger(Module.class); + + @Override + public void configure(Map<String, String> globalConfiguration, Binder binder) { + ModuleParameters moduleParameters = ModuleParameters.from(globalConfiguration); + LOG.info(moduleParameters.toString()); + + Ids ids = new Ids(moduleParameters.getNumberOfFunctionInstances()); + + binder.bindIngress(new SourceFunctionSpec<>(IN, new CommandFlinkSource(moduleParameters))); + binder.bindEgress(new SinkFunctionSpec<>(Constants.OUT, new DiscardingSink<>())); + binder.bindIngressRouter(IN, new CommandRouter(ids)); + + FunctionProvider provider = new FunctionProvider(ids); + binder.bindFunctionProvider(Constants.FN_TYPE, provider); + + SocketClientSink<Any> client = + new SocketClientSink<>( + moduleParameters.getVerificationServerHost(), + moduleParameters.getVerificationServerPort(), + new VerificationResultSerializer(), + 3, + true); + + binder.bindEgress(new SinkFunctionSpec<>(Constants.VERIFICATION_RESULT, client)); + } + + private static final class VerificationResultSerializer implements SerializationSchema<Any> { + + 
@Override + public byte[] serialize(Any element) { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(element.getSerializedSize() + 8); + element.writeDelimitedTo(out); + return out.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ModuleParameters.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ModuleParameters.java new file mode 100644 index 0000000..0d2126e --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ModuleParameters.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.statefun.e2e.smoke; + +import java.io.Serializable; +import java.util.Map; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +@SuppressWarnings("unused") +public final class ModuleParameters implements Serializable { + + private static final long serialVersionUID = 1; + + private int numberOfFunctionInstances = 1_000; + private int commandDepth = 10; + private int messageCount = 100_000; + private int maxCommandsPerDepth = 3; + private double stateModificationsPr = 0.4; + private double sendPr = 0.9; + private double sendAfterPr = 0.1; + private double asyncSendPr = 0.1; + private double noopPr = 0.2; + private double sendEgressPr = 0.03; + private int maxFailures = 1; + private String verificationServerHost = "localhost"; + private int verificationServerPort = 5050; + + /** Creates an instance of ModuleParameters from a key-value map. 
*/ + public static ModuleParameters from(Map<String, String> globalConfiguration) { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return mapper.convertValue(globalConfiguration, ModuleParameters.class); + } + + public Map<String, String> asMap() { + ObjectMapper mapper = new ObjectMapper(); + return mapper.convertValue(this, new TypeReference<Map<String, String>>() {}); + } + + public int getNumberOfFunctionInstances() { + return numberOfFunctionInstances; + } + + public void setNumberOfFunctionInstances(int numberOfFunctionInstances) { + this.numberOfFunctionInstances = numberOfFunctionInstances; + } + + public int getCommandDepth() { + return commandDepth; + } + + public void setCommandDepth(int commandDepth) { + this.commandDepth = commandDepth; + } + + public int getMessageCount() { + return messageCount; + } + + public void setMessageCount(int messageCount) { + this.messageCount = messageCount; + } + + public int getMaxCommandsPerDepth() { + return maxCommandsPerDepth; + } + + public void setMaxCommandsPerDepth(int maxCommandsPerDepth) { + this.maxCommandsPerDepth = maxCommandsPerDepth; + } + + public double getStateModificationsPr() { + return stateModificationsPr; + } + + public void setStateModificationsPr(double stateModificationsPr) { + this.stateModificationsPr = stateModificationsPr; + } + + public double getSendPr() { + return sendPr; + } + + public void setSendPr(double sendPr) { + this.sendPr = sendPr; + } + + public double getSendAfterPr() { + return sendAfterPr; + } + + public void setSendAfterPr(double sendAfterPr) { + this.sendAfterPr = sendAfterPr; + } + + public double getAsyncSendPr() { + return asyncSendPr; + } + + public void setAsyncSendPr(double asyncSendPr) { + this.asyncSendPr = asyncSendPr; + } + + public double getNoopPr() { + return noopPr; + } + + public void setNoopPr(double noopPr) { + this.noopPr = noopPr; + } + + public double getSendEgressPr() { + 
return sendEgressPr; + } + + public void setSendEgressPr(double sendEgressPr) { + this.sendEgressPr = sendEgressPr; + } + + public void setMaxFailures(int maxFailures) { + this.maxFailures = maxFailures; + } + + public int getMaxFailures() { + return maxFailures; + } + + public String getVerificationServerHost() { + return verificationServerHost; + } + + public void setVerificationServerHost(String verificationServerHost) { + this.verificationServerHost = verificationServerHost; + } + + public int getVerificationServerPort() { + return verificationServerPort; + } + + public void setVerificationServerPort(int verificationServerPort) { + this.verificationServerPort = verificationServerPort; + } + + @Override + public String toString() { + return "ModuleParameters{" + + "numberOfFunctionInstances=" + + numberOfFunctionInstances + + ", commandDepth=" + + commandDepth + + ", messageCount=" + + messageCount + + ", maxCommandsPerDepth=" + + maxCommandsPerDepth + + ", stateModificationsPr=" + + stateModificationsPr + + ", sendPr=" + + sendPr + + ", sendAfterPr=" + + sendAfterPr + + ", asyncSendPr=" + + asyncSendPr + + ", noopPr=" + + noopPr + + ", sendEgressPr=" + + sendEgressPr + + ", maxFailures=" + + maxFailures + + ", verificationServerHost='" + + verificationServerHost + + '\'' + + ", verificationServerPort=" + + verificationServerPort + + '}'; + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ProtobufUtils.java b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ProtobufUtils.java new file mode 100644 index 0000000..25aec2a --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/java/org/apache/flink/statefun/e2e/smoke/ProtobufUtils.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.e2e.smoke; + +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; + +final class ProtobufUtils { + private ProtobufUtils() {} + + public static <T extends Message> T unpack(Any any, Class<T> messageType) { + try { + return any.unpack(messageType); + } catch (InvalidProtocolBufferException e) { + throw new IllegalStateException(e); + } + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/main/protobuf/commands.proto b/statefun-e2e-tests/statefun-smoke-e2e/src/main/protobuf/commands.proto new file mode 100644 index 0000000..e3912d1 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/main/protobuf/commands.proto @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
// Envelope entering through the ingress: a list of commands together with the numeric
// address of the function instance that should execute them.
message SourceCommand {
  int32 target = 1;
  Commands commands = 2;
}

// An ordered list of commands.
message Commands {
  repeated Command command = 1;
}

// A single instruction for a function instance; exactly one of the variants below is set.
message Command {
  // Increment the function's state by one.
  message IncrementState {
  }
  // Send the nested commands to the function instance at `target`.
  message Send {
    int32 target = 1;
    Commands commands = 2;
  }
  // Like Send, but delivered as a delayed message.
  message SendAfter {
    int32 target = 1;
    Commands commands = 2;
  }
  // Emit a message to the egress.
  message SendEgress {
  }
  // Register an asynchronous operation. `failure` selects whether the operation should fail,
  // and `resolved_commands` is attached to the registered operation.
  message AsyncOperation {
    bool failure = 1;
    Commands resolved_commands = 2;
  }
  // Report the function's current state alongside the `expected` value.
  message Verify {
    int64 expected = 1;
  }

  oneof command {
    IncrementState increment = 1;
    Send send = 2;
    SendAfter send_after = 3;
    SendEgress send_egress = 4;
    AsyncOperation async_operation = 5;
    Verify verify = 6;
  }
}

// Outcome of a Verify command: the numeric id of the reporting function instance, the
// expected value carried by the command and the actual value found in the function's state.
message VerificationResult {
  int32 id = 1;
  int64 expected = 2;
  int64 actual = 3;
}
// Snapshot of the expected per-instance states; entry i belongs to function instance i.
message FunctionTrackerSnapshot {
  repeated int64 state = 1;
}

// Snapshot of the command source.
// NOTE(review): judging by the field names this holds the source's progress counters plus the
// tracker snapshot; the code that reads and writes it is not visible here, so confirm there.
message SourceSnapshot {
  int32 commandsSentSoFarHandle = 1;
  int32 failuresGeneratedSoFar = 2;
  FunctionTrackerSnapshot tracker = 3;
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.e2e.smoke; + +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand; +import org.junit.Test; + +public class CommandGeneratorTest { + + @Test + public void usageExample() { + ModuleParameters parameters = new ModuleParameters(); + CommandGenerator generator = new CommandGenerator(new JDKRandomGenerator(), parameters); + + SourceCommand command = generator.get(); + + assertThat(command.getTarget(), notNullValue()); + assertThat(command.getCommands(), notNullValue()); + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreterTest.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreterTest.java new file mode 100644 index 0000000..1010666 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/CommandInterpreterTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.statefun.e2e.smoke; + +import static org.apache.flink.statefun.e2e.smoke.Utils.aStateModificationCommand; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import com.google.protobuf.Any; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand; +import org.apache.flink.statefun.sdk.Address; +import org.apache.flink.statefun.sdk.Context; +import org.apache.flink.statefun.sdk.io.EgressIdentifier; +import org.apache.flink.statefun.sdk.state.PersistedValue; +import org.junit.Test; + +public class CommandInterpreterTest { + + @Test + public void exampleUsage() { + CommandInterpreter interpreter = new CommandInterpreter(new Ids(10)); + + PersistedValue<Long> state = PersistedValue.of("state", Long.class); + Context context = new MockContext(); + SourceCommand sourceCommand = aStateModificationCommand(); + + interpreter.interpret(state, context, Any.pack(sourceCommand)); + + assertThat(state.get(), is(1L)); + } + + private static final class MockContext implements Context { + + @Override + public Address self() { + return null; + } + + @Override + public Address caller() { + return null; + } + + @Override + public void send(Address address, Object o) {} + + @Override + public <T> void send(EgressIdentifier<T> egressIdentifier, T t) {} + + @Override + public void sendAfter(Duration duration, Address address, Object o) {} + + @Override + public <M, T> void registerAsyncOperation(M m, 
CompletableFuture<T> completableFuture) {} + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTrackerTest.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTrackerTest.java new file mode 100644 index 0000000..97dba1f --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/FunctionStateTrackerTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.statefun.e2e.smoke; + +import static org.apache.flink.statefun.e2e.smoke.Utils.aRelayedStateModificationCommand; +import static org.apache.flink.statefun.e2e.smoke.Utils.aStateModificationCommand; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import org.junit.Test; + +public class FunctionStateTrackerTest { + + @Test + public void exampleUsage() { + FunctionStateTracker tracker = new FunctionStateTracker(1_000); + + tracker.apply(aStateModificationCommand(5)); + tracker.apply(aStateModificationCommand(5)); + tracker.apply(aStateModificationCommand(5)); + + assertThat(tracker.stateOf(5), is(3L)); + } + + @Test + public void testRelay() { + FunctionStateTracker tracker = new FunctionStateTracker(1_000); + + // send a layered state increment message, first to function 5, and then + // to function 6. + tracker.apply(aRelayedStateModificationCommand(5, 6)); + + assertThat(tracker.stateOf(5), is(0L)); + assertThat(tracker.stateOf(6), is(1L)); + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/HarnessTest.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/HarnessTest.java new file mode 100644 index 0000000..88864f8 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/HarnessTest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.e2e.smoke; + +import static org.apache.flink.statefun.e2e.smoke.Utils.awaitVerificationSuccess; +import static org.apache.flink.statefun.e2e.smoke.Utils.startProtobufServer; + +import com.google.protobuf.Any; +import org.apache.flink.statefun.flink.harness.Harness; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HarnessTest { + + private static final Logger LOG = LoggerFactory.getLogger(HarnessTest.class); + + @Ignore + @Test(timeout = 1_000 * 60 * 2) + public void miniClusterTest() throws Exception { + Harness harness = new Harness(); + + // set Flink related configuration. 
+ harness.withConfiguration( + "classloader.parent-first-patterns.additional", + "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"); + harness.withConfiguration("restart-strategy", "fixed-delay"); + harness.withConfiguration("restart-strategy.fixed-delay.attempts", "2147483647"); + harness.withConfiguration("restart-strategy.fixed-delay.delay", "1sec"); + harness.withConfiguration("execution.checkpointing.interval", "2sec"); + harness.withConfiguration("execution.checkpointing.mode", "EXACTLY_ONCE"); + harness.withConfiguration("execution.checkpointing.max-concurrent-checkpoints", "3"); + harness.withConfiguration("parallelism.default", "2"); + harness.withConfiguration("state.checkpoints.dir", "file:///tmp/checkpoints"); + + // start the Protobuf server + SimpleProtobufServer.StartedServer<Any> started = startProtobufServer(); + + // configure test parameters. + ModuleParameters parameters = new ModuleParameters(); + parameters.setMaxFailures(1); + parameters.setMessageCount(100_000); + parameters.setNumberOfFunctionInstances(128); + parameters.setVerificationServerHost("localhost"); + parameters.setVerificationServerPort(started.port()); + parameters.asMap().forEach(harness::withGlobalConfiguration); + + // run the harness. + try (AutoCloseable ignored = startHarnessInTheBackground(harness)) { + awaitVerificationSuccess(started.results(), parameters.getNumberOfFunctionInstances()); + } + + LOG.info("All done."); + } + + private static AutoCloseable startHarnessInTheBackground(Harness harness) { + Thread t = + new Thread( + () -> { + try { + harness.start(); + } catch (InterruptedException ignored) { + LOG.info("Harness Thread was interrupted. 
Exiting..."); + } catch (Exception exception) { + LOG.info("Something happened while trying to run the Harness.", exception); + } + }); + t.setName("harness-runner"); + t.setDaemon(true); + t.start(); + return t::interrupt; + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/ModuleParametersTest.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/ModuleParametersTest.java new file mode 100644 index 0000000..be5a9b5 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/ModuleParametersTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.statefun.e2e.smoke; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +import java.util.Collections; +import java.util.Map; +import org.junit.Test; + +public class ModuleParametersTest { + + @Test + public void exampleUsage() { + Map<String, String> keys = Collections.singletonMap("messageCount", "1"); + ModuleParameters parameters = ModuleParameters.from(keys); + + assertThat(parameters.getMessageCount(), is(1)); + } + + @Test + public void roundTrip() { + ModuleParameters original = new ModuleParameters(); + original.setCommandDepth(1234); + + ModuleParameters deserialized = ModuleParameters.from(original.asMap()); + + assertThat(deserialized.getCommandDepth(), is(1234)); + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SimpleProtobufServer.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SimpleProtobufServer.java new file mode 100644 index 0000000..58e18bf --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SimpleProtobufServer.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.e2e.smoke; + +import com.google.protobuf.Message; +import com.google.protobuf.Parser; +import java.io.IOException; +import java.io.InputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import javax.annotation.concurrent.ThreadSafe; +import org.apache.flink.util.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A simple threaded TCP server that is able to receive a specific Protocol Buffers message type. + * + * @param <T> input message type. + */ +@ThreadSafe +public final class SimpleProtobufServer<T extends Message> { + private static final Logger LOG = LoggerFactory.getLogger(SimpleProtobufServer.class); + + private final LinkedBlockingDeque<T> results = new LinkedBlockingDeque<>(); + private final ExecutorService executor; + private final AtomicBoolean started = new AtomicBoolean(false); + private final Parser<T> parser; + + public SimpleProtobufServer(Parser<T> parser) { + this.executor = MoreExecutors.newCachedDaemonThreadPool(); + this.parser = parser; + } + + StartedServer<T> start() { + if (!started.compareAndSet(false, true)) { + throw new IllegalArgumentException("Already started."); + } + try { + ServerSocket serverSocket = new ServerSocket(0); + serverSocket.setReuseAddress(true); + LOG.info("Starting server at " + serverSocket.getLocalPort()); + executor.submit(() -> acceptClients(serverSocket)); + return new StartedServer<>(serverSocket.getLocalPort(), results()); + } catch (IOException e) { + throw new IllegalStateException("Unable to bind the TCP server.", e); + } + } + + private Supplier<T> results() { + return () -> { + try { + return 
results.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }; + } + + @SuppressWarnings("InfiniteLoopStatement") + private void acceptClients(ServerSocket serverSocket) { + while (true) { + try { + Socket client = serverSocket.accept(); + InputStream input = client.getInputStream(); + executor.submit(() -> pumpVerificationResults(client, input)); + } catch (IOException e) { + LOG.info("Exception while trying to accept a connection.", e); + } + } + } + + private void pumpVerificationResults(Socket client, InputStream input) { + while (true) { + try { + T result = parser.parseDelimitedFrom(input); + if (result != null) { + results.add(result); + } + } catch (IOException e) { + LOG.info( + "Exception reading a verification result from " + + client.getRemoteSocketAddress() + + ", bye...", + e); + IOUtils.closeQuietly(client); + return; + } + } + } + + public static final class StartedServer<T extends Message> { + private final int port; + private final Supplier<T> results; + + public StartedServer(int port, Supplier<T> results) { + this.port = port; + this.results = results; + } + + public int port() { + return port; + } + + public Supplier<T> results() { + return results; + } + } + + private static final class MoreExecutors { + + static ExecutorService newCachedDaemonThreadPool() { + return Executors.newCachedThreadPool( + r -> { + Thread t = new Thread(r); + t.setDaemon(true); + return t; + }); + } + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java new file mode 100644 index 0000000..9f2065e --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.statefun.e2e.smoke; + +import static org.apache.flink.statefun.e2e.smoke.Utils.awaitVerificationSuccess; +import static org.apache.flink.statefun.e2e.smoke.Utils.startProtobufServer; + +import com.google.protobuf.Any; +import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers; +import org.apache.flink.util.function.ThrowingRunnable; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.Testcontainers; + +public final class SmokeRunner { + private static final Logger LOG = LoggerFactory.getLogger(SmokeRunner.class); + + public static void run(ModuleParameters parameters) throws Throwable { + SimpleProtobufServer.StartedServer<Any> server = startProtobufServer(); + parameters.setVerificationServerHost("host.testcontainers.internal"); + parameters.setVerificationServerPort(server.port()); + + StatefulFunctionsAppContainers.Builder builder = + StatefulFunctionsAppContainers.builder("smoke", 2); + builder.exposeMasterLogs(LOG); + + // set the test module parameters as global configurations, so that + // it can be deserialized at Module#configure() + parameters.asMap().forEach(builder::withModuleGlobalConfiguration); + + // 
run the test + Testcontainers.exposeHostPorts(server.port()); + StatefulFunctionsAppContainers app = builder.build(); + + run( + app, + () -> + awaitVerificationSuccess(server.results(), parameters.getNumberOfFunctionInstances())); + } + + private static void run(StatefulFunctionsAppContainers app, ThrowingRunnable<Throwable> r) + throws Throwable { + Statement statement = + app.apply( + new Statement() { + @Override + public void evaluate() throws Throwable { + r.run(); + } + }, + Description.EMPTY); + + statement.evaluate(); + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeVerificationE2E.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeVerificationE2E.java new file mode 100644 index 0000000..f1b5b6c --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeVerificationE2E.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.statefun.e2e.smoke; + +import org.junit.Test; + +public class SmokeVerificationE2E { + + @Test(timeout = 1_000 * 60 * 10) + public void runWith() throws Throwable { + ModuleParameters parameters = new ModuleParameters(); + parameters.setNumberOfFunctionInstances(128); + parameters.setMessageCount(100_000); + parameters.setMaxFailures(1); + + SmokeRunner.run(parameters); + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/Utils.java b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/Utils.java new file mode 100644 index 0000000..85f527d --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/Utils.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.statefun.e2e.smoke; + +import com.google.protobuf.Any; +import java.util.HashSet; +import java.util.Set; +import java.util.function.Supplier; +import org.apache.flink.statefun.e2e.smoke.generated.Command; +import org.apache.flink.statefun.e2e.smoke.generated.Commands; +import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand; +import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult; + +class Utils { + + public static SourceCommand aStateModificationCommand() { + return aStateModificationCommand(-1234); // the id doesn't matter + } + + public static SourceCommand aStateModificationCommand(int functionInstanceId) { + return SourceCommand.newBuilder() + .setTarget(functionInstanceId) + .setCommands(Commands.newBuilder().addCommand(modify())) + .build(); + } + + public static SourceCommand aRelayedStateModificationCommand( + int firstFunctionId, int secondFunctionId) { + return SourceCommand.newBuilder() + .setTarget(firstFunctionId) + .setCommands(Commands.newBuilder().addCommand(sendTo(secondFunctionId, modify()))) + .build(); + } + + private static Command.Builder sendTo(int id, Command.Builder body) { + return Command.newBuilder() + .setSend( + Command.Send.newBuilder() + .setTarget(id) + .setCommands(Commands.newBuilder().addCommand(body))); + } + + private static Command.Builder modify() { + return Command.newBuilder().setIncrement(Command.IncrementState.getDefaultInstance()); + } + + /** Blocks the currently executing thread until enough successful verification results supply. 
*/ + static void awaitVerificationSuccess(Supplier<Any> results, final int numberOfFunctionInstances) { + Set<Integer> successfullyVerified = new HashSet<>(); + while (successfullyVerified.size() != numberOfFunctionInstances) { + Any any = results.get(); + VerificationResult result = ProtobufUtils.unpack(any, VerificationResult.class); + if (result.getActual() == result.getExpected()) { + successfullyVerified.add(result.getId()); + } else if (result.getActual() > result.getExpected()) { + throw new AssertionError( + "Over counted. Expected: " + + result.getExpected() + + ", actual: " + + result.getActual() + + ", function: " + + result.getId()); + } + } + } + + /** starts a simple Protobuf TCP server that accepts {@link com.google.protobuf.Any}. */ + static SimpleProtobufServer.StartedServer<Any> startProtobufServer() { + SimpleProtobufServer<Any> server = new SimpleProtobufServer<>(Any.parser()); + return server.start(); + } +} diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/Dockerfile new file mode 100644 index 0000000..3166768 --- /dev/null +++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/Dockerfile @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM flink-statefun:2.3-SNAPSHOT
+# Install the smoke e2e module jar and the Flink configuration into the image.
+RUN mkdir -p /opt/statefun/modules/statefun-smoke-e2e
+COPY statefun-smoke-e2e*.jar /opt/statefun/modules/statefun-smoke-e2e/
+COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git a/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/log4j.properties b/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fb965d3
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################ + +log4j.rootLogger=INFO, console + +# Log all infos to the console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/tools/maven/spotbugs-exclude.xml b/tools/maven/spotbugs-exclude.xml index 694e52d..ea5ce29 100644 --- a/tools/maven/spotbugs-exclude.xml +++ b/tools/maven/spotbugs-exclude.xml @@ -101,5 +101,8 @@ under the License. <Package name="~org\.apache\.flink\.statefun\.examples\.ridesharing\.simulator.*" /> </Match> + <Match> + <Package name="~org\.apache\.flink\.statefun\.e2e\.smoke.*" /> + </Match> </FindBugsFilter>
