This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new c1fe5715b [ISSUE-3108][Improve] Improve streampark-testcontainer
module based on [3.1 Naming Style] (#3227)
c1fe5715b is described below
commit c1fe5715b02f79bd1ca077e8e95e101ce437785a
Author: Yuepeng Pan <[email protected]>
AuthorDate: Mon Oct 9 10:29:39 2023 -0500
[ISSUE-3108][Improve] Improve streampark-testcontainer module based on [3.1
Naming Style] (#3227)
---
.../testcontainer/flink/FlinkContainer.java | 8 ++---
.../flink/FlinkStandaloneSessionCluster.java | 34 +++++++++++-----------
.../flink/FlinkStandaloneSessionClusterITest.java | 9 +++---
3 files changed, 26 insertions(+), 25 deletions(-)
diff --git
a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java
b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java
index d9814f0d2..2c85b86c8 100644
---
a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java
+++
b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkContainer.java
@@ -33,7 +33,7 @@ import static
org.apache.streampark.testcontainer.flink.FlinkComponent.TASKMANAG
class FlinkContainer extends GenericContainer<FlinkContainer> {
- public static final AtomicInteger TM_COUNT = new AtomicInteger(0);
+ public static final AtomicInteger TM_INDEX_SUFFIX = new AtomicInteger(0);
public static final String FLINK_PROPS_KEY = "FLINK_PROPERTIES";
@@ -43,7 +43,7 @@ class FlinkContainer extends GenericContainer<FlinkContainer>
{
@Nonnull DockerImageName dockerImageName,
@Nonnull FlinkComponent component,
@Nonnull Network network,
- @Nonnull String yamlPropStr,
+ @Nonnull String yamlPropContent,
@Nullable Slf4jLogConsumer slf4jLogConsumer) {
super(dockerImageName);
this.component = component;
@@ -51,7 +51,7 @@ class FlinkContainer extends GenericContainer<FlinkContainer>
{
this.withCreateContainerCmdModifier(
createContainerCmd ->
createContainerCmd.withName(getFlinkContainerName()));
this.withNetwork(network);
- this.withEnv(FLINK_PROPS_KEY, yamlPropStr);
+ this.withEnv(FLINK_PROPS_KEY, yamlPropContent);
Optional.ofNullable(slf4jLogConsumer).ifPresent(this::withLogConsumer);
}
@@ -59,6 +59,6 @@ class FlinkContainer extends GenericContainer<FlinkContainer>
{
if (component == JOBMANAGER) {
return JOBMANAGER.getName();
}
- return String.format("%s_%s", TASKMANAGER.getName(),
TM_COUNT.incrementAndGet());
+ return String.format("%s_%s", TASKMANAGER.getName(),
TM_INDEX_SUFFIX.incrementAndGet());
}
}
diff --git
a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
index e5f01dea6..c238675d5 100644
---
a/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
+++
b/streampark-tests/streampark-testcontainer/src/main/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionCluster.java
@@ -48,13 +48,13 @@ public class FlinkStandaloneSessionCluster implements
Startable {
public static final Network NETWORK = Network.newNetwork();
public static final String JM_RPC_ADDR_KEY = "jobmanager.rpc.address";
- public static final String SLOT_CONF_KEY = "taskmanager.numberOfTaskSlots";
- public static final String SLOT_CONF_FORMAT = String.format("%s: %%s",
SLOT_CONF_KEY);
+ public static final String TM_SLOT_NUM_KEY = "taskmanager.numberOfTaskSlots";
+ public static final String SLOT_CONF_FORMAT = String.format("%s: %%s",
TM_SLOT_NUM_KEY);
public static final int BLOB_SERVER_PORT = 6123;
public static final int WEB_PORT = 8081;
- private String yamlConfStr = String.format("%s: %s", JM_RPC_ADDR_KEY,
JOBMANAGER.getName());
+ private String yamlConfContent = String.format("%s: %s", JM_RPC_ADDR_KEY,
JOBMANAGER.getName());
private final FlinkContainer jobManagerContainer;
@@ -64,17 +64,17 @@ public class FlinkStandaloneSessionCluster implements
Startable {
DockerImageName dockerImageName,
int taskManagerNum,
int slotsNumPerTm,
- @Nullable String yamlConfStr,
+ @Nullable String yamlConfContent,
Slf4jLogConsumer slf4jLogConsumer) {
- renderJmRpcConfIfNeeded(yamlConfStr);
+ renderJmRpcConfIfNeeded(yamlConfContent);
renderSlotNumIfNeeded(slotsNumPerTm);
// Set for JM.
this.jobManagerContainer =
new FlinkContainer(
- dockerImageName, JOBMANAGER, NETWORK, this.yamlConfStr,
slf4jLogConsumer);
+ dockerImageName, JOBMANAGER, NETWORK, this.yamlConfContent,
slf4jLogConsumer);
this.jobManagerContainer.addExposedPort(BLOB_SERVER_PORT);
this.jobManagerContainer.addExposedPort(WEB_PORT);
@@ -88,7 +88,7 @@ public class FlinkStandaloneSessionCluster implements
Startable {
for (int i = 0; i < taskManagerNum; i++) {
FlinkContainer taskManager =
new FlinkContainer(
- dockerImageName, TASKMANAGER, NETWORK, this.yamlConfStr,
slf4jLogConsumer);
+ dockerImageName, TASKMANAGER, NETWORK, this.yamlConfContent,
slf4jLogConsumer);
this.taskManagerContainers.add(taskManager);
}
}
@@ -119,20 +119,20 @@ public class FlinkStandaloneSessionCluster implements
Startable {
}
private void renderSlotNumIfNeeded(int slotsNumPerTm) {
- if (!this.yamlConfStr.contains(SLOT_CONF_KEY)) {
- this.yamlConfStr =
+ if (!this.yamlConfContent.contains(TM_SLOT_NUM_KEY)) {
+ this.yamlConfContent =
String.format(
- "%s\n%s\n", this.yamlConfStr, String.format(SLOT_CONF_FORMAT,
slotsNumPerTm));
+ "%s\n%s\n", this.yamlConfContent,
String.format(SLOT_CONF_FORMAT, slotsNumPerTm));
}
}
private void renderJmRpcConfIfNeeded(@Nullable String yamlConfStr) {
- this.yamlConfStr =
+ this.yamlConfContent =
yamlConfStr == null
- ? this.yamlConfStr
+ ? this.yamlConfContent
: (yamlConfStr.contains(JM_RPC_ADDR_KEY)
? yamlConfStr
- : String.format("%s\n%s\n", this.yamlConfStr, yamlConfStr));
+ : String.format("%s\n%s\n", this.yamlConfContent,
yamlConfStr));
}
public static class Builder {
@@ -141,7 +141,7 @@ public class FlinkStandaloneSessionCluster implements
Startable {
DockerImageName.parse("apache/flink:1.17.1-scala_2.12");
private int taskManagerNum = 1;
private int slotsNumPerTm = 8;
- private @Nullable String yamlConfStr;
+ private @Nullable String yamlConfContent;
private Slf4jLogConsumer slf4jLogConsumer = new Slf4jLogConsumer(LOG,
false);
private Builder() {}
@@ -163,8 +163,8 @@ public class FlinkStandaloneSessionCluster implements
Startable {
return this;
}
- public Builder yamlConfStr(@Nullable String yamlConfStr) {
- this.yamlConfStr = yamlConfStr;
+ public Builder yamlConfContent(@Nullable String yamlConfContent) {
+ this.yamlConfContent = yamlConfContent;
return this;
}
@@ -175,7 +175,7 @@ public class FlinkStandaloneSessionCluster implements
Startable {
public FlinkStandaloneSessionCluster build() {
return new FlinkStandaloneSessionCluster(
- dockerImageName, taskManagerNum, slotsNumPerTm, yamlConfStr,
slf4jLogConsumer);
+ dockerImageName, taskManagerNum, slotsNumPerTm, yamlConfContent,
slf4jLogConsumer);
}
}
diff --git
a/streampark-tests/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java
b/streampark-tests/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java
index 6f34e720f..f84ddf66c 100644
---
a/streampark-tests/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java
+++
b/streampark-tests/streampark-testcontainer/src/test/java/org/apache/streampark/testcontainer/flink/FlinkStandaloneSessionClusterITest.java
@@ -33,22 +33,23 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Test for flink standalone session cluster env available. */
class FlinkStandaloneSessionClusterITest {
- private final FlinkStandaloneSessionCluster cluster =
+ private final FlinkStandaloneSessionCluster flinkStandaloneSessionCluster =
FlinkStandaloneSessionCluster.builder().build();
@BeforeEach
void up() {
- cluster.start();
+ flinkStandaloneSessionCluster.start();
}
@AfterEach
void down() {
- cluster.stop();
+ flinkStandaloneSessionCluster.stop();
}
@Test
void testRestApiAvailable() throws IOException {
- String url = String.format("%s/%s", cluster.getFlinkJobManagerUrl(),
"config");
+ String url =
+ String.format("%s/%s",
flinkStandaloneSessionCluster.getFlinkJobManagerUrl(), "config");
CloseableHttpClient httpClient = HttpClients.createDefault();
CloseableHttpResponse response = httpClient.execute(new HttpGet(url));
assertThat(response.getCode()).isEqualTo(200);