This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 3f3da7aa3417c1c2ff26ac918c6896d319a0f6ec
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Tue May 12 22:57:03 2020 +0800

    [FLINK-17516] [e2e] Refactor StatefulFunctionsAppContainers to use Builder pattern
---
 .../e2e/common/StatefulFunctionsAppContainers.java | 328 +++++++++++----------
 .../e2e/routablekafka/RoutableKafkaE2E.java        |   5 +-
 .../statefun/e2e/sanity/SanityVerificationE2E.java |   5 +-
 3 files changed, 183 insertions(+), 155 deletions(-)

diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index f648fcd..9a3f2a3 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -55,7 +55,7 @@ import org.testcontainers.images.builder.ImageFromDockerfile;
  *
  *     {@code @Rule}
  *     public StatefulFunctionsAppContainers myApp =
- *         new StatefulFunctionsAppContainers("app-name", 3);
+ *         StatefulFunctionsAppContainers.builder("app-name", 3).build();
  *
  *     {@code @Test}
  *     public void runTest() {
@@ -77,15 +77,16 @@ import org.testcontainers.images.builder.ImageFromDockerfile;
  *
  *     {@code @Rule}
  *     public StatefulFunctionsAppContainers myApp =
- *         new StatefulFunctionsAppContainers("app-name", 3)
- *             .dependsOn(kafka);
+ *         StatefulFunctionsAppContainers.builder("app-name", 3)
+ *             .dependsOn(kafka)
+ *             .build();
  *
  *     ...
  * }
  * }</pre>
  *
  * <p>Application master and worker containers will always be started after containers that are
- * added using {@link #dependsOn(GenericContainer)} have started. Moreover, containers being
+ * added using {@link Builder#dependsOn(GenericContainer)} have started. Moreover, containers being
  * depended on will also be setup such that they share the same network with the master and workers,
  * so that they can freely communicate with each other.
  *
@@ -114,70 +115,28 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(StatefulFunctionsAppContainers.class);
 
-  private static final String MASTER_HOST = "statefun-app-master";
-  private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
-
-  private final String appName;
-  private final int numWorkers;
-  private final Network network;
-
-  private final Configuration dynamicProperties = new Configuration();
-  private final List<GenericContainer<?>> dependentContainers = new 
ArrayList<>();
-  private final List<ClasspathBuildContextFile> classpathBuildContextFiles = 
new ArrayList<>();
-  private Logger masterLogger;
-
   private GenericContainer<?> master;
   private List<GenericContainer<?>> workers;
 
-  public StatefulFunctionsAppContainers(String appName, int numWorkers) {
-    if (appName == null || appName.isEmpty()) {
-      throw new IllegalArgumentException(
-          "App name must be non-empty. This is used as the application image 
name.");
-    }
-    if (numWorkers < 1) {
-      throw new IllegalArgumentException("Must have at least 1 worker.");
-    }
-
-    this.network = Network.newNetwork();
-    this.appName = appName;
-    this.numWorkers = numWorkers;
-  }
-
-  public StatefulFunctionsAppContainers dependsOn(GenericContainer<?> 
container) {
-    container.withNetwork(network);
-    this.dependentContainers.add(container);
-    return this;
-  }
-
-  public StatefulFunctionsAppContainers exposeMasterLogs(Logger logger) {
-    this.masterLogger = logger;
-    return this;
+  private StatefulFunctionsAppContainers(
+      GenericContainer<?> masterContainer, List<GenericContainer<?>> 
workerContainers) {
+    this.master = Objects.requireNonNull(masterContainer);
+    this.workers = Objects.requireNonNull(workerContainers);
   }
 
-  public StatefulFunctionsAppContainers withModuleGlobalConfiguration(String 
key, String value) {
-    
this.dynamicProperties.setString(StatefulFunctionsConfig.MODULE_CONFIG_PREFIX + 
key, value);
-    return this;
-  }
-
-  public <T> StatefulFunctionsAppContainers withConfiguration(ConfigOption<T> 
config, T value) {
-    this.dynamicProperties.set(config, value);
-    return this;
-  }
-
-  public StatefulFunctionsAppContainers withBuildContextFileFromClasspath(
-      String buildContextPath, String resourcePath) {
-    this.classpathBuildContextFiles.add(
-        new ClasspathBuildContextFile(buildContextPath, resourcePath));
-    return this;
+  /**
+   * Creates a builder for creating a {@link StatefulFunctionsAppContainers}.
+   *
+   * @param appName the name of the application.
+   * @param numWorkers the number of workers to run the application.
+   * @return a builder for creating a {@link StatefulFunctionsAppContainers}.
+   */
+  public static Builder builder(String appName, int numWorkers) {
+    return new Builder(appName, numWorkers);
   }
 
   @Override
   protected void before() throws Throwable {
-    final ImageFromDockerfile appImage =
-        appImage(appName, dynamicProperties, classpathBuildContextFiles);
-    this.master = masterContainer(appImage, network, dependentContainers, 
numWorkers, masterLogger);
-    this.workers = workerContainers(appImage, numWorkers, network);
-
     master.start();
     workers.forEach(GenericContainer::start);
   }
@@ -188,122 +147,189 @@ public final class StatefulFunctionsAppContainers extends ExternalResource {
     workers.forEach(GenericContainer::stop);
   }
 
-  private static ImageFromDockerfile appImage(
-      String appName,
-      Configuration dynamicProperties,
-      List<ClasspathBuildContextFile> classpathBuildContextFiles) {
-    final Path targetDirPath = Paths.get(System.getProperty("user.dir") + 
"/target/");
-    LOG.info("Building app image with built artifacts located at: {}", 
targetDirPath);
-
-    final ImageFromDockerfile appImage =
-        new ImageFromDockerfile(appName)
-            .withFileFromClasspath("Dockerfile", "Dockerfile")
-            .withFileFromPath(".", targetDirPath);
-
-    Configuration flinkConf = resolveFlinkConf(dynamicProperties);
-    String flinkConfString = flinkConfigAsString(flinkConf);
-    LOG.info(
-        "Resolved Flink configuration after merging dynamic properties with 
base flink-conf.yaml:\n\n{}",
-        flinkConf);
-    appImage.withFileFromString("flink-conf.yaml", flinkConfString);
-
-    for (ClasspathBuildContextFile classpathBuildContextFile : 
classpathBuildContextFiles) {
-      appImage.withFileFromClasspath(
-          classpathBuildContextFile.buildContextPath, 
classpathBuildContextFile.fromResourcePath);
+  public static final class Builder {
+    private static final String MASTER_HOST = "statefun-app-master";
+    private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
+
+    private final String appName;
+    private final int numWorkers;
+    private final Network network;
+
+    private final Configuration dynamicProperties = new Configuration();
+    private final List<GenericContainer<?>> dependentContainers = new 
ArrayList<>();
+    private final List<ClasspathBuildContextFile> classpathBuildContextFiles = 
new ArrayList<>();
+    private Logger masterLogger;
+
+    private Builder(String appName, int numWorkers) {
+      if (appName == null || appName.isEmpty()) {
+        throw new IllegalArgumentException(
+            "App name must be non-empty. This is used as the application image 
name.");
+      }
+      if (numWorkers < 1) {
+        throw new IllegalArgumentException("Must have at least 1 worker.");
+      }
+
+      this.network = Network.newNetwork();
+      this.appName = appName;
+      this.numWorkers = numWorkers;
     }
 
-    return appImage;
-  }
+    public StatefulFunctionsAppContainers.Builder 
dependsOn(GenericContainer<?> container) {
+      container.withNetwork(network);
+      this.dependentContainers.add(container);
+      return this;
+    }
 
-  /**
-   * Merges set dynamic properties with configuration in the base 
flink-conf.yaml located in
-   * resources.
-   */
-  private static Configuration resolveFlinkConf(Configuration 
dynamicProperties) {
-    final InputStream baseFlinkConfResourceInputStream =
-        
StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml");
-    if (baseFlinkConfResourceInputStream == null) {
-      throw new RuntimeException("Base flink-conf.yaml cannot be found.");
+    public StatefulFunctionsAppContainers.Builder exposeMasterLogs(Logger 
logger) {
+      this.masterLogger = logger;
+      return this;
     }
 
-    final File tempBaseFlinkConfFile = 
copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
-    return GlobalConfiguration.loadConfiguration(
-        tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), 
dynamicProperties);
-  }
+    public StatefulFunctionsAppContainers.Builder 
withModuleGlobalConfiguration(
+        String key, String value) {
+      
this.dynamicProperties.setString(StatefulFunctionsConfig.MODULE_CONFIG_PREFIX + 
key, value);
+      return this;
+    }
 
-  private static String flinkConfigAsString(Configuration configuration) {
-    StringBuilder yaml = new StringBuilder();
-    for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) {
-      yaml.append(entry.getKey()).append(": 
").append(entry.getValue()).append("\n");
+    public <T> StatefulFunctionsAppContainers.Builder withConfiguration(
+        ConfigOption<T> config, T value) {
+      this.dynamicProperties.set(config, value);
+      return this;
     }
 
-    return yaml.toString();
-  }
+    public StatefulFunctionsAppContainers.Builder 
withBuildContextFileFromClasspath(
+        String buildContextPath, String resourcePath) {
+      this.classpathBuildContextFiles.add(
+          new ClasspathBuildContextFile(buildContextPath, resourcePath));
+      return this;
+    }
 
-  private static File copyToTempFlinkConfFile(InputStream inputStream) {
-    try {
-      final File tempFile =
-          new File(
-              Files.createTempDirectory("statefun-app-containers").toString(), 
"flink-conf.yaml");
-      Files.copy(inputStream, tempFile.toPath(), 
StandardCopyOption.REPLACE_EXISTING);
-      return tempFile;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
+    public StatefulFunctionsAppContainers build() {
+      final ImageFromDockerfile appImage =
+          appImage(appName, dynamicProperties, classpathBuildContextFiles);
+
+      return new StatefulFunctionsAppContainers(
+          masterContainer(appImage, network, dependentContainers, numWorkers, 
masterLogger),
+          workerContainers(appImage, numWorkers, network));
     }
-  }
 
-  private static GenericContainer<?> masterContainer(
-      ImageFromDockerfile appImage,
-      Network network,
-      List<GenericContainer<?>> dependents,
-      int numWorkers,
-      @Nullable Logger masterLogger) {
-    final GenericContainer<?> master =
-        new GenericContainer(appImage)
-            .withNetwork(network)
-            .withNetworkAliases(MASTER_HOST)
-            .withEnv("ROLE", "master")
-            .withEnv("MASTER_HOST", MASTER_HOST)
-            .withCommand("-p " + numWorkers);
-
-    for (GenericContainer<?> dependent : dependents) {
-      master.dependsOn(dependent);
+    private static ImageFromDockerfile appImage(
+        String appName,
+        Configuration dynamicProperties,
+        List<ClasspathBuildContextFile> classpathBuildContextFiles) {
+      final Path targetDirPath = Paths.get(System.getProperty("user.dir") + 
"/target/");
+      LOG.info("Building app image with built artifacts located at: {}", 
targetDirPath);
+
+      final ImageFromDockerfile appImage =
+          new ImageFromDockerfile(appName)
+              .withFileFromClasspath("Dockerfile", "Dockerfile")
+              .withFileFromPath(".", targetDirPath);
+
+      Configuration flinkConf = resolveFlinkConf(dynamicProperties);
+      String flinkConfString = flinkConfigAsString(flinkConf);
+      LOG.info(
+          "Resolved Flink configuration after merging dynamic properties with 
base flink-conf.yaml:\n\n{}",
+          flinkConf);
+      appImage.withFileFromString("flink-conf.yaml", flinkConfString);
+
+      for (ClasspathBuildContextFile classpathBuildContextFile : 
classpathBuildContextFiles) {
+        appImage.withFileFromClasspath(
+            classpathBuildContextFile.buildContextPath, 
classpathBuildContextFile.fromResourcePath);
+      }
+
+      return appImage;
     }
 
-    if (masterLogger != null) {
-      master.withLogConsumer(new Slf4jLogConsumer(masterLogger, true));
+    /**
+     * Merges set dynamic properties with configuration in the base 
flink-conf.yaml located in
+     * resources.
+     */
+    private static Configuration resolveFlinkConf(Configuration 
dynamicProperties) {
+      final InputStream baseFlinkConfResourceInputStream =
+          
StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml");
+      if (baseFlinkConfResourceInputStream == null) {
+        throw new RuntimeException("Base flink-conf.yaml cannot be found.");
+      }
+
+      final File tempBaseFlinkConfFile = 
copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
+      return GlobalConfiguration.loadConfiguration(
+          tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), 
dynamicProperties);
     }
 
-    return master;
-  }
+    private static String flinkConfigAsString(Configuration configuration) {
+      StringBuilder yaml = new StringBuilder();
+      for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) 
{
+        yaml.append(entry.getKey()).append(": 
").append(entry.getValue()).append("\n");
+      }
+
+      return yaml.toString();
+    }
 
-  private static List<GenericContainer<?>> workerContainers(
-      ImageFromDockerfile appImage, int numWorkers, Network network) {
-    final List<GenericContainer<?>> workers = new ArrayList<>(numWorkers);
+    private static File copyToTempFlinkConfFile(InputStream inputStream) {
+      try {
+        final File tempFile =
+            new File(
+                
Files.createTempDirectory("statefun-app-containers").toString(), 
"flink-conf.yaml");
+        Files.copy(inputStream, tempFile.toPath(), 
StandardCopyOption.REPLACE_EXISTING);
+        return tempFile;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
 
-    for (int i = 0; i < numWorkers; i++) {
-      workers.add(
+    private static GenericContainer<?> masterContainer(
+        ImageFromDockerfile appImage,
+        Network network,
+        List<GenericContainer<?>> dependents,
+        int numWorkers,
+        @Nullable Logger masterLogger) {
+      final GenericContainer<?> master =
           new GenericContainer(appImage)
               .withNetwork(network)
-              .withNetworkAliases(workerHostOf(i))
-              .withEnv("ROLE", "worker")
-              .withEnv("MASTER_HOST", MASTER_HOST));
+              .withNetworkAliases(MASTER_HOST)
+              .withEnv("ROLE", "master")
+              .withEnv("MASTER_HOST", MASTER_HOST)
+              .withCommand("-p " + numWorkers);
+
+      for (GenericContainer<?> dependent : dependents) {
+        master.dependsOn(dependent);
+      }
+
+      if (masterLogger != null) {
+        master.withLogConsumer(new Slf4jLogConsumer(masterLogger, true));
+      }
+
+      return master;
     }
 
-    return workers;
-  }
+    private static List<GenericContainer<?>> workerContainers(
+        ImageFromDockerfile appImage, int numWorkers, Network network) {
+      final List<GenericContainer<?>> workers = new ArrayList<>(numWorkers);
 
-  private static String workerHostOf(int workerIndex) {
-    return WORKER_HOST_PREFIX + "-" + workerIndex;
-  }
+      for (int i = 0; i < numWorkers; i++) {
+        workers.add(
+            new GenericContainer(appImage)
+                .withNetwork(network)
+                .withNetworkAliases(workerHostOf(i))
+                .withEnv("ROLE", "worker")
+                .withEnv("MASTER_HOST", MASTER_HOST));
+      }
+
+      return workers;
+    }
+
+    private static String workerHostOf(int workerIndex) {
+      return WORKER_HOST_PREFIX + "-" + workerIndex;
+    }
 
-  private static class ClasspathBuildContextFile {
-    private final String buildContextPath;
-    private final String fromResourcePath;
+    private static class ClasspathBuildContextFile {
+      private final String buildContextPath;
+      private final String fromResourcePath;
 
-    ClasspathBuildContextFile(String buildContextPath, String 
fromResourcePath) {
-      this.buildContextPath = Objects.requireNonNull(buildContextPath);
-      this.fromResourcePath = Objects.requireNonNull(fromResourcePath);
+      ClasspathBuildContextFile(String buildContextPath, String 
fromResourcePath) {
+        this.buildContextPath = Objects.requireNonNull(buildContextPath);
+        this.fromResourcePath = Objects.requireNonNull(fromResourcePath);
+      }
     }
   }
 }
diff --git a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
index 9f4eda1..3869163 100644
--- a/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
+++ b/statefun-e2e-tests/statefun-routable-kafka-e2e/src/test/java/org/apache/flink/statefun/e2e/routablekafka/RoutableKafkaE2E.java
@@ -64,13 +64,14 @@ public class RoutableKafkaE2E {
 
   @Rule
   public StatefulFunctionsAppContainers verificationApp =
-      new StatefulFunctionsAppContainers("routable-kafka-verification", 1)
+      StatefulFunctionsAppContainers.builder("routable-kafka-verification", 1)
           .dependsOn(kafka)
           .exposeMasterLogs(LOG)
           .withBuildContextFileFromClasspath(
               "routable-kafka-ingress-module", 
"/routable-kafka-ingress-module/")
           .withModuleGlobalConfiguration(
-              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092");
+              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
+          .build();
 
   @Test(timeout = 60_000L)
   public void run() {
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
index 1bd08c7..473dc9b 100644
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
+++ b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
@@ -63,11 +63,12 @@ public class SanityVerificationE2E {
 
   @Rule
   public StatefulFunctionsAppContainers verificationApp =
-      new StatefulFunctionsAppContainers("sanity-verification", 2)
+      StatefulFunctionsAppContainers.builder("sanity-verification", 2)
           .dependsOn(kafka)
           .exposeMasterLogs(LOG)
           .withModuleGlobalConfiguration(
-              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092");
+              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
+          .build();
 
   @Test(timeout = 60_000L)
   public void run() throws Exception {

Reply via email to