PatrickRen commented on code in PR #19856:
URL: https://github.com/apache/flink/pull/19856#discussion_r892569574


##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersConfig.java:
##########
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+
+import static 
org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL;
+import static 
org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT;
+import static 
org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT;
+import static 
org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+
+/** The central configuration holder for Flink container-based test 
environments. */
+public class FlinkContainersConfig {
+    private final String baseImage;
+    private final int numTaskManagers;
+    private final int numSlotsPerTaskManager;
+    private final Collection<String> jarPaths;
+    private final Configuration flinkConfiguration;
+    private final String taskManagerHostnamePrefix;
+    private final Boolean buildFromFlinkDist;
+    private final String flinkDistLocation;
+    private final String flinkHome;
+    private final String checkpointPath;
+    private final String haStoragePath;
+    private final Boolean zookeeperHA;
+    private final String zookeeperHostname;
+    private final Properties logProperties;
+
+    // Defaults
+    private static final long DEFAULT_METRIC_FETCHER_UPDATE_INTERVAL_MS = 
1000L;
+    private static final long DEFAULT_SLOT_REQUEST_TIMEOUT_MS = 10_000L;
+    private static final long DEFAULT_HEARTBEAT_TIMEOUT_MS = 5_000L;
+    private static final long DEFAULT_HEARTBEAT_INTERVAL_MS = 1000L;
+    private static final MemorySize DEFAULT_TM_TOTAL_PROCESS_MEMORY = 
MemorySize.ofMebiBytes(1728);
+    private static final MemorySize DEFAULT_JM_TOTAL_PROCESS_MEMORY = 
MemorySize.ofMebiBytes(1600);
+
+    private static final int DEFAULT_NUM_TASK_MANAGERS = 1;
+    private static final int DEFAULT_NUM_SLOTS_PER_TASK_MANAGER = 1;
+    private static final String DEFAULT_TASK_MANAGERS_HOSTNAME_PREFIX = 
"taskmanager-";
+    private static final String DEFAULT_JOB_MANAGER_HOSTNAME = "jobmanager";
+    private static final String DEFAULT_BIND_ADDRESS = "0.0.0.0";
+    private static final String DEFAULT_FLINK_HOME = "/opt/flink";
+    private static final String DEFAULT_CHECKPOINT_PATH = DEFAULT_FLINK_HOME + 
"/checkpoint";
+    private static final String DEFAULT_HA_STORAGE_PATH = DEFAULT_FLINK_HOME + 
"/recovery";
+
+    private static final String DEFAULT_ZOOKEEPER_HOSTNAME = "zookeeper";
+    private static final String DEFAULT_ZOOKEEPER_QUORUM = 
DEFAULT_ZOOKEEPER_HOSTNAME + ":2181";
+    // --
+
+    private FlinkContainersConfig(Builder builder) {
+        baseImage = builder.baseImage;
+        numTaskManagers = builder.numTaskManagers;
+        numSlotsPerTaskManager = builder.numSlotsPerTaskManager;
+        jarPaths = builder.jarPaths;
+        flinkConfiguration = builder.flinkConfiguration;
+        taskManagerHostnamePrefix = builder.taskManagerHostnamePrefix;
+        buildFromFlinkDist = builder.buildFromFlinkDist;
+        flinkDistLocation = builder.flinkDistLocation;
+        flinkHome = builder.flinkHome;
+        checkpointPath = builder.checkpointPath;
+        haStoragePath = builder.haStoragePath;
+        zookeeperHA = builder.zookeeperHA;
+        zookeeperHostname = builder.zookeeperHostname;
+        logProperties = builder.logProperties;
+    }
+
+    /**
+     * Builder for {@code FlinkContainersConfig}.
+     *
+     * @return The builder.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * {@code FlinkContainersConfig} based on defaults.
+     *
+     * @return The Flink containers config.
+     */
+    public static FlinkContainersConfig defaultConfig() {
+        return builder().build();
+    }
+
+    /**
+     * {@code FlinkContainersConfig} based on provided Flink configuration.
+     *
+     * @param config The config.
+     * @return The flink containers config.
+     */
+    public static FlinkContainersConfig basedOn(Configuration config) {
+        return builder().basedOn(config).build();
+    }
+
+    /** {@code FlinkContainersConfig} builder static inner class. */
+    public static final class Builder {
+        private String baseImage;
+        private int numTaskManagers = DEFAULT_NUM_TASK_MANAGERS;
+        private int numSlotsPerTaskManager = 
DEFAULT_NUM_SLOTS_PER_TASK_MANAGER;
+        private Collection<String> jarPaths = new ArrayList<>();
+        private Configuration flinkConfiguration = defaultFlinkConfig();
+        private String taskManagerHostnamePrefix = 
DEFAULT_TASK_MANAGERS_HOSTNAME_PREFIX;
+        private Boolean buildFromFlinkDist = true;
+        private String flinkDistLocation;
+        private String flinkHome = DEFAULT_FLINK_HOME;
+        private String checkpointPath = DEFAULT_CHECKPOINT_PATH;
+        private String haStoragePath = DEFAULT_HA_STORAGE_PATH;
+        private Boolean zookeeperHA = false;
+        private String zookeeperHostname = DEFAULT_ZOOKEEPER_HOSTNAME;
+        private Properties logProperties = defaultLoggingProperties();
+
+        private Builder() {}
+
+        /**
+         * Sets the {@code baseImage} and returns a reference to this Builder 
enabling method
+         * chaining.
+         *
+         * @param baseImage The {@code baseImage} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder baseImage(String baseImage) {
+            this.baseImage = baseImage;
+            this.buildFromFlinkDist = false;
+            return this;
+        }
+
+        /**
+         * Sets the {@code flinkDistLocation} and returns a reference to this 
Builder enabling
+         * method chaining.
+         *
+         * @param flinkDistLocation The {@code flinkDistLocation} to set.
+         * @return A reference to this Builder.
+         */
+        public Builder flinkDistLocation(String flinkDistLocation) {

Review Comment:
   What about using `Path` as the parameter type? Same for `flinkHome`, 
`checkpointPath` and `haStoragePath`



##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkImageBuilder.java:
##########
@@ -149,15 +160,15 @@ public FlinkImageBuilder setTimeout(Duration timeout) {
     /** Use this image for building a JobManager. */
     public FlinkImageBuilder asJobManager() {
         checkStartupCommandNotSet();
-        this.startupCommand = "flink/bin/jobmanager.sh start-foreground && 
tail -f /dev/null";
+        this.startupCommand = "bin/jobmanager.sh start-foreground && tail -f 
/dev/null";

Review Comment:
   What about using absolute path here: `flinkHome + "bin/jobmanager.sh ...`? 
As users can pass customize base image here we are not sure where the workdir 
is. Same for the startup command of TaskManager.



##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainersConfig.java:
##########
@@ -0,0 +1,599 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+
+import static 
org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_INTERVAL;
+import static 
org.apache.flink.configuration.HeartbeatManagerOptions.HEARTBEAT_TIMEOUT;
+import static 
org.apache.flink.configuration.JobManagerOptions.SLOT_REQUEST_TIMEOUT;
+import static 
org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+
+/** The central configuration holder for Flink container-based test 
environments. */
+public class FlinkContainersConfig {

Review Comment:
   Actually I'm a little confused by the new `FlinkContainersConfig` 🤔 
   
   If I understand correctly, this new class is trying to unify the config 
across different container-based envs such as `FlinkContainers` and 
`FlinkContainersTestEnvironment`. This idea makes sense to me but I notice that 
`FlinkContainersBuilder` is kept. I think both the builder and the config are 
just syntax sugar to make the constructor of `FlinkContainers` and 
`FlinkContainersTestEnvironment` more human-readable, so is it possible to 
remove the `FlinkContainersBuilder` and move all paramters to 
`FlinkContainersConfig`?



##########
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkImageBuilder.java:
##########
@@ -80,12 +80,23 @@ public FlinkImageBuilder setTempDirectory(Path 
tempDirectory) {
     }
 
     /**
-     * Sets the name of building image.
+     * Sets flink home.
      *
-     * <p>If the name is not specified, {@link #DEFAULT_IMAGE_NAME} will be 
used.
+     * @param flinkHome The flink home.
+     * @return The flink home.
      */
-    public FlinkImageBuilder setImageName(String imageName) {
-        this.imageName = imageName;
+    public FlinkImageBuilder setFlinkHome(String flinkHome) {

Review Comment:
   Maybe use `Path` as parameter type here too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to