PatrickRen commented on a change in pull request #17892:
URL: https://github.com/apache/flink/pull/17892#discussion_r761632767



##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/container/FlinkContainerBuilder.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.flink.container;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+
+import org.slf4j.Logger;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.images.builder.ImageFromDockerfile;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A builder class for {@link FlinkContainer}. */
+public class FlinkContainerBuilder {
+
+    private final List<GenericContainer<?>> dependentContainers = new 
ArrayList<>();
+    private final Configuration conf = new Configuration();
+    private final Map<String, String> envVars = new HashMap<>();
+
+    private int numTaskManagers = 1;
+    private Network network = Network.newNetwork();
+    private Logger logger;
+    private boolean enableHAService = false;
+
+    /** Sets number of TaskManagers. */
+    public FlinkContainerBuilder numTaskManagers(int numTaskManagers) {
+        this.numTaskManagers = numTaskManagers;
+        return this;
+    }
+
+    /** Sets configuration of the cluster. */
+    public FlinkContainerBuilder setConfiguration(Configuration conf) {
+        this.conf.addAll(conf);
+        return this;
+    }
+
+    /**
+     * Lets Flink cluster depending on another container, and bind the network 
of Flink cluster to
+     * the dependent one.
+     */
+    public FlinkContainerBuilder dependsOn(GenericContainer<?> container) {
+        container.withNetwork(this.network);
+        this.dependentContainers.add(container);
+        return this;
+    }
+
+    /** Sets environment variable to containers. */
+    public FlinkContainerBuilder setEnvironmentVariable(String key, String 
value) {
+        this.envVars.put(key, value);
+        return this;
+    }
+
+    /** Sets network of the Flink cluster. */
+    public FlinkContainerBuilder setNetwork(Network network) {
+        this.network = network;
+        return this;
+    }
+
+    /** Sets a logger to the cluster in order to consume STDOUT of containers 
to the logger. */
+    public FlinkContainerBuilder setLogger(Logger logger) {
+        this.logger = logger;
+        return this;
+    }
+
+    /** Enables high availability service. */
+    public FlinkContainerBuilder enableHAService() {
+        this.enableHAService = true;
+        return this;
+    }
+
+    /** Builds {@link FlinkContainer}. */
+    public FlinkContainer build() {
+        GenericContainer<?> haService = null;
+        if (enableHAService) {
+            enableHAServiceConfigurations();
+            haService = buildZookeeperContainer();
+        }
+
+        this.conf.set(JobManagerOptions.ADDRESS, 
FlinkContainer.JOB_MANAGER_HOSTNAME);
+        this.conf.set(
+                CheckpointingOptions.CHECKPOINTS_DIRECTORY,
+                
FlinkContainer.CHECKPOINT_PATH.toAbsolutePath().toUri().toString());
+
+        final GenericContainer<?> jobManager = buildJobManagerContainer();
+        final List<GenericContainer<?>> taskManagers = 
buildTaskManagerContainers();
+
+        if (enableHAService) {
+            try {
+                Path recoveryPath = 
Files.createTempDirectory("flink-recovery");
+                jobManager.withFileSystemBind(
+                        recoveryPath.toAbsolutePath().toString(),
+                        
FlinkContainer.HA_STORAGE_PATH.toAbsolutePath().toString());
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to create temporary 
recovery directory", e);
+            }
+        }
+
+        try {
+            Path checkpointPath = 
Files.createTempDirectory("flink-checkpoint");
+            jobManager.withFileSystemBind(
+                    checkpointPath.toAbsolutePath().toString(),
+                    
FlinkContainer.CHECKPOINT_PATH.toAbsolutePath().toString());
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to create temporary 
checkpoint directory", e);
+        }
+
+        return new FlinkContainer(jobManager, taskManagers, haService, conf);
+    }
+
+    // --------------------------- Helper Functions 
-------------------------------------
+
+    private GenericContainer<?> buildJobManagerContainer() {
+        // Configure JobManager
+        final Configuration jobManagerConf = new Configuration();
+        jobManagerConf.addAll(this.conf);
+        // Build JobManager container
+        final ImageFromDockerfile jobManagerImage;
+        try {
+            jobManagerImage =
+                    new 
FlinkImageBuilder().setConfiguration(jobManagerConf).asJobManager().build();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to build JobManager image", e);
+        }
+        final GenericContainer<?> jobManager = buildContainer(jobManagerImage);
+        // Setup network for JobManager
+        jobManager
+                .withNetworkAliases(FlinkContainer.JOB_MANAGER_HOSTNAME)
+                .withExposedPorts(jobManagerConf.get(RestOptions.PORT));
+        // Setup logger
+        if (this.logger != null) {
+            jobManager.withLogConsumer(new 
Slf4jLogConsumer(this.logger).withPrefix("JobManager"));

Review comment:
       I think syncing with logger's level is not quite useful in this case. 
Instead I added a method in `FlinkContainerBuilder` for configuring log4j 
properties directly. By default it will use `log4j-console.properties` under 
flink-dist, with root logger  level set to INFO, and properties specified in 
builder will append or overwrite entries in `log4j-console.properties`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to