This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit e518ea0717dc54b9e4d6af96aa1839df38f3fa1b Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Thu May 14 12:50:35 2020 +0800 [FLINK-17516] [e2e] Allow restarting workers with StatefulFunctionsAppContainers --- .../e2e/common/StatefulFunctionsAppContainers.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java index 9894c2d..f03a5de 100644 --- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java +++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java @@ -168,6 +168,22 @@ public final class StatefulFunctionsAppContainers extends ExternalResource { return master.getMappedPort(8081); } + /** + * Restarts a single worker of this Stateful Functions application. + * + * @param workerIndex the index of the worker to restart. + */ + public void restartWorker(int workerIndex) { + if (workerIndex >= workers.size()) { + throw new IndexOutOfBoundsException( + "Invalid worker index; valid values are 0 to " + (workers.size() - 1)); + } + + final GenericContainer<?> worker = workers.get(workerIndex); + worker.stop(); + worker.start(); + } + private static File temporaryCheckpointDir() throws IOException { final Path currentWorkingDir = Paths.get(System.getProperty("user.dir")); return Files.createTempDirectory(currentWorkingDir, "statefun-app-checkpoints-").toFile();
