This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new c1fe5715b [ISSUE-3108][Improve] Improve streampark-testcontainer module based on [3.1 Naming Style] (#3227)
c1fe5715b is described below

commit c1fe5715b02f79bd1ca077e8e95e101ce437785a
Author: Yuepeng Pan <[email protected]>
AuthorDate: Mon Oct 9 10:29:39 2023 -0500

    [ISSUE-3108][Improve] Improve streampark-testcontainer module based on [3.1 Naming Style] (#3227)
---
 .../testcontainer/flink/FlinkContainer.java        |  8 ++---
 .../flink/FlinkStandaloneSessionCluster.java       | 34 +++++++++++-----------
 .../flink/FlinkStandaloneSessionClusterITest.java  |  9 +++---
 3 files changed, 26 insertions(+), 25 deletions(-)

diff --git a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java
index d9814f0d2..2c85b86c8 100644
--- a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java
+++ b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java
@@ -33,7 +33,7 @@ import static org.apache.streampark.testcontainer.flink.FlinkComponent.TASKMANAG
 
 class FlinkContainer extends GenericContainer<FlinkContainer> {
 
-  public static final AtomicInteger TM_COUNT = new AtomicInteger(0);
+  public static final AtomicInteger TM_INDEX_SUFFIX = new AtomicInteger(0);
 
   public static final String FLINK_PROPS_KEY = "FLINK_PROPERTIES";
 
@@ -43,7 +43,7 @@ class FlinkContainer extends GenericContainer<FlinkContainer> {
       @Nonnull DockerImageName dockerImageName,
       @Nonnull FlinkComponent component,
       @Nonnull Network network,
-      @Nonnull String yamlPropStr,
+      @Nonnull String yamlPropContent,
       @Nullable Slf4jLogConsumer slf4jLogConsumer) {
     super(dockerImageName);
     this.component = component;
@@ -51,7 +51,7 @@ class FlinkContainer extends GenericContainer<FlinkContainer> {
     this.withCreateContainerCmdModifier(
         createContainerCmd -> createContainerCmd.withName(getFlinkContainerName()));
     this.withNetwork(network);
-    this.withEnv(FLINK_PROPS_KEY, yamlPropStr);
+    this.withEnv(FLINK_PROPS_KEY, yamlPropContent);
     Optional.ofNullable(slf4jLogConsumer).ifPresent(this::withLogConsumer);
   }
 
@@ -59,6 +59,6 @@ class FlinkContainer extends GenericContainer<FlinkContainer> {
     if (component == JOBMANAGER) {
       return JOBMANAGER.getName();
     }
-    return String.format("%s_%s", TASKMANAGER.getName(), TM_COUNT.incrementAndGet());
+    return String.format("%s_%s", TASKMANAGER.getName(), TM_INDEX_SUFFIX.incrementAndGet());
   }
 }
diff --git a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
index e5f01dea6..c238675d5 100644
--- a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
+++ b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
@@ -48,13 +48,13 @@ public class FlinkStandaloneSessionCluster implements Startable {
   public static final Network NETWORK = Network.newNetwork();
 
   public static final String JM_RPC_ADDR_KEY = "jobmanager.rpc.address";
-  public static final String SLOT_CONF_KEY = "taskmanager.numberOfTaskSlots";
-  public static final String SLOT_CONF_FORMAT = String.format("%s: %%s", SLOT_CONF_KEY);
+  public static final String TM_SLOT_NUM_KEY = "taskmanager.numberOfTaskSlots";
+  public static final String SLOT_CONF_FORMAT = String.format("%s: %%s", TM_SLOT_NUM_KEY);
 
   public static final int BLOB_SERVER_PORT = 6123;
   public static final int WEB_PORT = 8081;
 
-  private String yamlConfStr = String.format("%s: %s", JM_RPC_ADDR_KEY, JOBMANAGER.getName());
+  private String yamlConfContent = String.format("%s: %s", JM_RPC_ADDR_KEY, JOBMANAGER.getName());
 
   private final FlinkContainer jobManagerContainer;
 
@@ -64,17 +64,17 @@ public class FlinkStandaloneSessionCluster implements Startable {
       DockerImageName dockerImageName,
       int taskManagerNum,
       int slotsNumPerTm,
-      @Nullable String yamlConfStr,
+      @Nullable String yamlConfContent,
       Slf4jLogConsumer slf4jLogConsumer) {
 
-    renderJmRpcConfIfNeeded(yamlConfStr);
+    renderJmRpcConfIfNeeded(yamlConfContent);
 
     renderSlotNumIfNeeded(slotsNumPerTm);
 
     // Set for JM.
     this.jobManagerContainer =
         new FlinkContainer(
-            dockerImageName, JOBMANAGER, NETWORK, this.yamlConfStr, slf4jLogConsumer);
+            dockerImageName, JOBMANAGER, NETWORK, this.yamlConfContent, slf4jLogConsumer);
     this.jobManagerContainer.addExposedPort(BLOB_SERVER_PORT);
     this.jobManagerContainer.addExposedPort(WEB_PORT);
 
@@ -88,7 +88,7 @@ public class FlinkStandaloneSessionCluster implements Startable {
     for (int i = 0; i < taskManagerNum; i++) {
       FlinkContainer taskManager =
           new FlinkContainer(
-              dockerImageName, TASKMANAGER, NETWORK, this.yamlConfStr, slf4jLogConsumer);
+              dockerImageName, TASKMANAGER, NETWORK, this.yamlConfContent, slf4jLogConsumer);
       this.taskManagerContainers.add(taskManager);
     }
   }
@@ -119,20 +119,20 @@ public class FlinkStandaloneSessionCluster implements Startable {
   }
 
   private void renderSlotNumIfNeeded(int slotsNumPerTm) {
-    if (!this.yamlConfStr.contains(SLOT_CONF_KEY)) {
-      this.yamlConfStr =
+    if (!this.yamlConfContent.contains(TM_SLOT_NUM_KEY)) {
+      this.yamlConfContent =
           String.format(
-              "%s\n%s\n", this.yamlConfStr, String.format(SLOT_CONF_FORMAT, slotsNumPerTm));
+              "%s\n%s\n", this.yamlConfContent, String.format(SLOT_CONF_FORMAT, slotsNumPerTm));
     }
   }
 
   private void renderJmRpcConfIfNeeded(@Nullable String yamlConfStr) {
-    this.yamlConfStr =
+    this.yamlConfContent =
         yamlConfStr == null
-            ? this.yamlConfStr
+            ? this.yamlConfContent
             : (yamlConfStr.contains(JM_RPC_ADDR_KEY)
                 ? yamlConfStr
-                : String.format("%s\n%s\n", this.yamlConfStr, yamlConfStr));
+                : String.format("%s\n%s\n", this.yamlConfContent, yamlConfStr));
   }
 
   public static class Builder {
@@ -141,7 +141,7 @@ public class FlinkStandaloneSessionCluster implements Startable {
         DockerImageName.parse("apache/flink:1.17.1-scala_2.12");
     private int taskManagerNum = 1;
     private int slotsNumPerTm = 8;
-    private @Nullable String yamlConfStr;
+    private @Nullable String yamlConfContent;
     private Slf4jLogConsumer slf4jLogConsumer = new Slf4jLogConsumer(LOG, false);
 
     private Builder() {}
@@ -163,8 +163,8 @@ public class FlinkStandaloneSessionCluster implements Startable {
       return this;
     }
 
-    public Builder yamlConfStr(@Nullable String yamlConfStr) {
-      this.yamlConfStr = yamlConfStr;
+    public Builder yamlConfContent(@Nullable String yamlConfContent) {
+      this.yamlConfContent = yamlConfContent;
       return this;
     }
 
@@ -175,7 +175,7 @@ public class FlinkStandaloneSessionCluster implements Startable {
 
     public FlinkStandaloneSessionCluster build() {
       return new FlinkStandaloneSessionCluster(
-          dockerImageName, taskManagerNum, slotsNumPerTm, yamlConfStr, slf4jLogConsumer);
+          dockerImageName, taskManagerNum, slotsNumPerTm, yamlConfContent, slf4jLogConsumer);
     }
   }
 
diff --git a/streampark-tests/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java b/streampark-tests/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java
index 6f34e720f..f84ddf66c 100644
--- a/streampark-tests/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java
+++ b/streampark-tests/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java
@@ -33,22 +33,23 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Test for flink standalone session cluster env available. */
 class FlinkStandaloneSessionClusterITest {
 
-  private final FlinkStandaloneSessionCluster cluster =
+  private final FlinkStandaloneSessionCluster flinkStandaloneSessionCluster =
       FlinkStandaloneSessionCluster.builder().build();
 
   @BeforeEach
   void up() {
-    cluster.start();
+    flinkStandaloneSessionCluster.start();
   }
 
   @AfterEach
   void down() {
-    cluster.stop();
+    flinkStandaloneSessionCluster.stop();
   }
 
   @Test
   void testRestApiAvailable() throws IOException {
-    String url = String.format("%s/%s", cluster.getFlinkJobManagerUrl(), "config");
+    String url =
+        String.format("%s/%s", flinkStandaloneSessionCluster.getFlinkJobManagerUrl(), "config");
     CloseableHttpClient httpClient = HttpClients.createDefault();
     CloseableHttpResponse response = httpClient.execute(new HttpGet(url));
     assertThat(response.getCode()).isEqualTo(200);

Reply via email to