This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 60f4bd74cc7c88422dc795fda630d818943949e7 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Tue May 12 23:24:55 2020 +0800 [FLINK-17516] [e2e] Bind checkpoint dirs to temporary local directory --- .../e2e/common/StatefulFunctionsAppContainers.java | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java index 5880ae3..9894c2d 100644 --- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java +++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java @@ -19,6 +19,7 @@ package org.apache.flink.statefun.e2e.common; import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; @@ -33,9 +34,11 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig; +import org.apache.flink.util.FileUtils; import org.junit.rules.ExternalResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testcontainers.containers.BindMode; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.output.Slf4jLogConsumer; @@ -118,6 +121,8 @@ public final class StatefulFunctionsAppContainers extends ExternalResource { private GenericContainer<?> master; private List<GenericContainer<?>> workers; + private File checkpointDir; + private StatefulFunctionsAppContainers( GenericContainer<?> masterContainer, List<GenericContainer<?>> workerContainers) { this.master = Objects.requireNonNull(masterContainer); @@ -137,6 +142,15 @@ public final class StatefulFunctionsAppContainers extends ExternalResource { @Override protected void before() throws Throwable { + checkpointDir = temporaryCheckpointDir(); + + master.withFileSystemBind( + checkpointDir.getAbsolutePath(), "/checkpoint-dir", BindMode.READ_WRITE); + workers.forEach( + worker -> + worker.withFileSystemBind( + checkpointDir.getAbsolutePath(), "/checkpoint-dir", BindMode.READ_WRITE)); + master.start(); workers.forEach(GenericContainer::start); } @@ -145,6 +159,8 @@ public final class StatefulFunctionsAppContainers extends ExternalResource { protected void after() { master.stop(); workers.forEach(GenericContainer::stop); + + FileUtils.deleteDirectoryQuietly(checkpointDir); } /** @return the exposed port on master for calling REST APIs. */ @@ -152,6 +168,11 @@ public final class StatefulFunctionsAppContainers extends ExternalResource { return master.getMappedPort(8081); } + private static File temporaryCheckpointDir() throws IOException { + final Path currentWorkingDir = Paths.get(System.getProperty("user.dir")); + return Files.createTempDirectory(currentWorkingDir, "statefun-app-checkpoints-").toFile(); + } + public static final class Builder { private static final String MASTER_HOST = "statefun-app-master"; private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
