This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 3f3da7aa3417c1c2ff26ac918c6896d319a0f6ec Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Tue May 12 22:57:03 2020 +0800 [FLINK-17516] [e2e] Refactor StatefulFunctionsAppContainers to use Builder pattern --- .../e2e/common/StatefulFunctionsAppContainers.java | 328 +++++++++++---------- .../e2e/routablekafka/RoutableKafkaE2E.java | 5 +- .../statefun/e2e/sanity/SanityVerificationE2E.java | 5 +- 3 files changed, 183 insertions(+), 155 deletions(-) diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java index f648fcd..9a3f2a3 100644 --- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java +++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java @@ -55,7 +55,7 @@ import org.testcontainers.images.builder.ImageFromDockerfile; * * {@code @Rule} * public StatefulFunctionsAppContainers myApp = - * new StatefulFunctionsAppContainers("app-name", 3); + * StatefulFunctionsAppContainers.builder("app-name", 3).build(); * * {@code @Test} * public void runTest() { @@ -77,15 +77,16 @@ import org.testcontainers.images.builder.ImageFromDockerfile; * * {@code @Rule} * public StatefulFunctionsAppContainers myApp = - * new StatefulFunctionsAppContainers("app-name", 3) - * .dependsOn(kafka); + * StatefulFunctionsAppContainers.builder("app-name", 3) + * .dependsOn(kafka) + * .build(); * * ... * } * }</pre> * * <p>Application master and worker containers will always be started after containers that are - * added using {@link #dependsOn(GenericContainer)} have started. Moreover, containers being + * added using {@link Builder#dependsOn(GenericContainer)} have started. Moreover, containers being * depended on will also be setup such that they share the same network with the master and workers, * so that they can freely communicate with each other. * @@ -114,70 +115,28 @@ public final class StatefulFunctionsAppContainers extends ExternalResource { private static final Logger LOG = LoggerFactory.getLogger(StatefulFunctionsAppContainers.class); - private static final String MASTER_HOST = "statefun-app-master"; - private static final String WORKER_HOST_PREFIX = "statefun-app-worker"; - - private final String appName; - private final int numWorkers; - private final Network network; - - private final Configuration dynamicProperties = new Configuration(); - private final List<GenericContainer<?>> dependentContainers = new ArrayList<>(); - private final List<ClasspathBuildContextFile> classpathBuildContextFiles = new ArrayList<>(); - private Logger masterLogger; - private GenericContainer<?> master; private List<GenericContainer<?>> workers; - public StatefulFunctionsAppContainers(String appName, int numWorkers) { - if (appName == null || appName.isEmpty()) { - throw new IllegalArgumentException( - "App name must be non-empty. This is used as the application image name."); - } - if (numWorkers < 1) { - throw new IllegalArgumentException("Must have at least 1 worker."); - } - - this.network = Network.newNetwork(); - this.appName = appName; - this.numWorkers = numWorkers; - } - - public StatefulFunctionsAppContainers dependsOn(GenericContainer<?> container) { - container.withNetwork(network); - this.dependentContainers.add(container); - return this; - } - - public StatefulFunctionsAppContainers exposeMasterLogs(Logger logger) { - this.masterLogger = logger; - return this; + private StatefulFunctionsAppContainers( + GenericContainer<?> masterContainer, List<GenericContainer<?>> workerContainers) { + this.master = Objects.requireNonNull(masterContainer); + this.workers = Objects.requireNonNull(workerContainers); } - public StatefulFunctionsAppContainers withModuleGlobalConfiguration(String key, String value) { - this.dynamicProperties.setString(StatefulFunctionsConfig.MODULE_CONFIG_PREFIX + key, value); - return this; - } - - public <T> StatefulFunctionsAppContainers withConfiguration(ConfigOption<T> config, T value) { - this.dynamicProperties.set(config, value); - return this; - } - - public StatefulFunctionsAppContainers withBuildContextFileFromClasspath( - String buildContextPath, String resourcePath) { - this.classpathBuildContextFiles.add( - new ClasspathBuildContextFile(buildContextPath, resourcePath)); - return this; + /** + * Creates a builder for creating a {@link StatefulFunctionsAppContainers}. + * + * @param appName the name of the application. + * @param numWorkers the number of workers to run the application. + * @return a builder for creating a {@link StatefulFunctionsAppContainers}. + */ + public static Builder builder(String appName, int numWorkers) { + return new Builder(appName, numWorkers); } @Override protected void before() throws Throwable { - final ImageFromDockerfile appImage = - appImage(appName, dynamicProperties, classpathBuildContextFiles); - this.master = masterContainer(appImage, network, dependentContainers, numWorkers, masterLogger); - this.workers = workerContainers(appImage, numWorkers, network); - master.start(); workers.forEach(GenericContainer::start); } @@ -188,122 +147,189 @@ public final class StatefulFunctionsAppContainers extends ExternalResource { workers.forEach(GenericContainer::stop); } - private static ImageFromDockerfile appImage( - String appName, - Configuration dynamicProperties, - List<ClasspathBuildContextFile> classpathBuildContextFiles) { - final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/"); - LOG.info("Building app image with built artifacts located at: {}", targetDirPath); - - final ImageFromDockerfile appImage = - new ImageFromDockerfile(appName) - .withFileFromClasspath("Dockerfile", "Dockerfile") - .withFileFromPath(".", targetDirPath); - - Configuration flinkConf = resolveFlinkConf(dynamicProperties); - String flinkConfString = flinkConfigAsString(flinkConf); - LOG.info( - "Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}", - flinkConf); - appImage.withFileFromString("flink-conf.yaml", flinkConfString); - - for (ClasspathBuildContextFile classpathBuildContextFile : classpathBuildContextFiles) { - appImage.withFileFromClasspath( - classpathBuildContextFile.buildContextPath, classpathBuildContextFile.fromResourcePath); + public static final class Builder { + private static final String MASTER_HOST = "statefun-app-master"; + private static final String WORKER_HOST_PREFIX = "statefun-app-worker"; + + private final String appName; + private final int numWorkers; + private final Network network; + + private final Configuration dynamicProperties = new Configuration(); + private final List<GenericContainer<?>> dependentContainers = new ArrayList<>(); + private final List<ClasspathBuildContextFile> classpathBuildContextFiles = new ArrayList<>(); + private Logger masterLogger; + + private Builder(String appName, int numWorkers) { + if (appName == null || appName.isEmpty()) { + throw new IllegalArgumentException( + "App name must be non-empty. This is used as the application image name."); + } + if (numWorkers < 1) { + throw new IllegalArgumentException("Must have at least 1 worker."); + } + + this.network = Network.newNetwork(); + this.appName = appName; + this.numWorkers = numWorkers; } - return appImage; - } + public StatefulFunctionsAppContainers.Builder dependsOn(GenericContainer<?> container) { + container.withNetwork(network); + this.dependentContainers.add(container); + return this; + } - /** - * Merges set dynamic properties with configuration in the base flink-conf.yaml located in - * resources. - */ - private static Configuration resolveFlinkConf(Configuration dynamicProperties) { - final InputStream baseFlinkConfResourceInputStream = - StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml"); - if (baseFlinkConfResourceInputStream == null) { - throw new RuntimeException("Base flink-conf.yaml cannot be found."); + public StatefulFunctionsAppContainers.Builder exposeMasterLogs(Logger logger) { + this.masterLogger = logger; + return this; } - final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream); - return GlobalConfiguration.loadConfiguration( - tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties); - } + public StatefulFunctionsAppContainers.Builder withModuleGlobalConfiguration( + String key, String value) { + this.dynamicProperties.setString(StatefulFunctionsConfig.MODULE_CONFIG_PREFIX + key, value); + return this; + } - private static String flinkConfigAsString(Configuration configuration) { - StringBuilder yaml = new StringBuilder(); - for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) { - yaml.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n"); + public <T> StatefulFunctionsAppContainers.Builder withConfiguration( + ConfigOption<T> config, T value) { + this.dynamicProperties.set(config, value); + return this; } - return yaml.toString(); - } + public StatefulFunctionsAppContainers.Builder withBuildContextFileFromClasspath( + String buildContextPath, String resourcePath) { + this.classpathBuildContextFiles.add( + new ClasspathBuildContextFile(buildContextPath, resourcePath)); + return this; + } - private static File copyToTempFlinkConfFile(InputStream inputStream) { - try { - final File tempFile = - new File( - Files.createTempDirectory("statefun-app-containers").toString(), "flink-conf.yaml"); - Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING); - return tempFile; - } catch (Exception e) { - throw new RuntimeException(e); + public StatefulFunctionsAppContainers build() { + final ImageFromDockerfile appImage = + appImage(appName, dynamicProperties, classpathBuildContextFiles); + + return new StatefulFunctionsAppContainers( + masterContainer(appImage, network, dependentContainers, numWorkers, masterLogger), + workerContainers(appImage, numWorkers, network)); } - } - private static GenericContainer<?> masterContainer( - ImageFromDockerfile appImage, - Network network, - List<GenericContainer<?>> dependents, - int numWorkers, - @Nullable Logger masterLogger) { - final GenericContainer<?> master = - new GenericContainer(appImage) - .withNetwork(network) - .withNetworkAliases(MASTER_HOST) - .withEnv("ROLE", "master") - .withEnv("MASTER_HOST", MASTER_HOST) - .withCommand("-p " + numWorkers); - - for (GenericContainer<?> dependent : dependents) { - master.dependsOn(dependent); + private static ImageFromDockerfile appImage( + String appName, + Configuration dynamicProperties, + List<ClasspathBuildContextFile> classpathBuildContextFiles) { + final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/"); + LOG.info("Building app image with built artifacts located at: {}", targetDirPath); + + final ImageFromDockerfile appImage = + new ImageFromDockerfile(appName) + .withFileFromClasspath("Dockerfile", "Dockerfile") + .withFileFromPath(".", targetDirPath); + + Configuration flinkConf = resolveFlinkConf(dynamicProperties); + String flinkConfString = flinkConfigAsString(flinkConf); + LOG.info( + "Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}", + flinkConf); + appImage.withFileFromString("flink-conf.yaml", flinkConfString); + + for (ClasspathBuildContextFile classpathBuildContextFile : classpathBuildContextFiles) { + appImage.withFileFromClasspath( + classpathBuildContextFile.buildContextPath, classpathBuildContextFile.fromResourcePath); + } + + return appImage; } - if (masterLogger != null) { - master.withLogConsumer(new Slf4jLogConsumer(masterLogger, true)); + /** + * Merges set dynamic properties with configuration in the base flink-conf.yaml located in + * resources. + */ + private static Configuration resolveFlinkConf(Configuration dynamicProperties) { + final InputStream baseFlinkConfResourceInputStream = + StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml"); + if (baseFlinkConfResourceInputStream == null) { + throw new RuntimeException("Base flink-conf.yaml cannot be found."); + } + + final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream); + return GlobalConfiguration.loadConfiguration( + tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties); } - return master; - } + private static String flinkConfigAsString(Configuration configuration) { + StringBuilder yaml = new StringBuilder(); + for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) { + yaml.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n"); + } + + return yaml.toString(); + } - private static List<GenericContainer<?>> workerContainers( - ImageFromDockerfile appImage, int numWorkers, Network network) { - final List<GenericContainer<?>> workers = new ArrayList<>(numWorkers); + private static File copyToTempFlinkConfFile(InputStream inputStream) { + try { + final File tempFile = + new File( + Files.createTempDirectory("statefun-app-containers").toString(), "flink-conf.yaml"); + Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + return tempFile; + } catch (Exception e) { + throw new RuntimeException(e); + } + } - for (int i = 0; i < numWorkers; i++) { - workers.add( + private static GenericContainer<?> masterContainer( + ImageFromDockerfile appImage, + Network network, + List<GenericContainer<?>> dependents, + int numWorkers, + @Nullable Logger masterLogger) { + final GenericContainer<?> master = new GenericContainer(appImage) .withNetwork(network) - .withNetworkAliases(workerHostOf(i)) - .withEnv("ROLE", "worker") - .withEnv("MASTER_HOST", MASTER_HOST)); + .withNetworkAliases(MASTER_HOST) + .withEnv("ROLE", "master") + .withEnv("MASTER_HOST", MASTER_HOST) + .withCommand("-p " + numWorkers); + + for (GenericContainer<?> dependent : dependents) { + master.dependsOn(dependent); + } + + if (masterLogger != null) { + master.withLogConsumer(new Slf4jLogConsumer(masterLogger, true)); + } + + return master; } - return workers; - } + private static List<GenericContainer<?>> workerContainers( + ImageFromDockerfile appImage, int numWorkers, Network network) { + final List<GenericContainer<?>> workers = new ArrayList<>(numWorkers); - private static String workerHostOf(int workerIndex) { - return WORKER_HOST_PREFIX + "-" + workerIndex; - } + for (int i = 0; i < numWorkers; i++) { + workers.add( + new GenericContainer(appImage) + .withNetwork(network) + .withNetworkAliases(workerHostOf(i)) + .withEnv("ROLE", "worker") + .withEnv("MASTER_HOST", MASTER_HOST)); + } + + return workers; + } + + private static String workerHostOf(int workerIndex) { + return WORKER_HOST_PREFIX + "-" + workerIndex; + } - private static class ClasspathBuildContextFile { - private final String buildContextPath; - private final String fromResourcePath; + private static class ClasspathBuildContextFile { + private final String buildContextPath; + private final String fromResourcePath; - ClasspathBuildContextFile(String buildContextPath, String fromResourcePath) { - this.buildContextPath = Objects.requireNonNull(buildContextPath); - this.fromResourcePath = Objects.requireNonNull(fromResourcePath); + ClasspathBuildContextFile(String buildContextPath, String fromResourcePath) { + this.buildContextPath = Objects.requireNonNull(buildContextPath); + this.fromResourcePath = Objects.requireNonNull(fromResourcePath); + } } } } diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java index 9f4eda1..3869163 100644 --- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java +++ b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java @@ -64,13 +64,14 @@ public class RoutableKafkaE2E { @Rule public StatefulFunctionsAppContainers verificationApp = - new StatefulFunctionsAppContainers("routable-kafka-verification", 1) + StatefulFunctionsAppContainers.builder("routable-kafka-verification", 1) .dependsOn(kafka) .exposeMasterLogs(LOG) .withBuildContextFileFromClasspath( "routable-kafka-ingress-module", "/routable-kafka-ingress-module/") .withModuleGlobalConfiguration( - Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092"); + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092") + .build(); @Test(timeout = 60_000L) public void run() { diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java index 1bd08c7..473dc9b 100644 --- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java +++ b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java @@ -63,11 +63,12 @@ public class SanityVerificationE2E { @Rule public StatefulFunctionsAppContainers verificationApp = - new StatefulFunctionsAppContainers("sanity-verification", 2) + StatefulFunctionsAppContainers.builder("sanity-verification", 2) .dependsOn(kafka) .exposeMasterLogs(LOG) .withModuleGlobalConfiguration( - Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092"); + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092") + .build(); @Test(timeout = 60_000L) public void run() throws Exception {
