This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 6db4479  [FLINK-26328] Control Logging Behavior in Flink Deployments
6db4479 is described below

commit 6db4479a87a86342d48612e57051ac38c71f2875
Author: Matyas Orhidi <[email protected]>
AuthorDate: Fri Feb 25 13:21:29 2022 +0100

    [FLINK-26328] Control Logging Behavior in Flink Deployments
    
    Closes #27
---
 examples/custom-logging.yaml                       | 67 ++++++++++++++++++++++
 .../operator/crd/spec/FlinkDeploymentSpec.java     |  1 +
 .../operator/utils/FlinkConfigBuilder.java         | 32 +++++++++++
 .../validation/DefaultDeploymentValidator.java     | 21 +++++++
 .../operator/utils/FlinkConfigBuilderTest.java     | 25 ++++++++
 .../validation/DeploymentValidatorTest.java        | 16 ++++++
 .../crds/flinkdeployments.flink.apache.org-v1.yml  |  8 +++
 7 files changed, 170 insertions(+)

diff --git a/examples/custom-logging.yaml b/examples/custom-logging.yaml
new file mode 100644
index 0000000..27450a1
--- /dev/null
+++ b/examples/custom-logging.yaml
@@ -0,0 +1,67 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1alpha1
+kind: FlinkDeployment
+metadata:
+  namespace: default
+  name: custom-logging-example
+spec:
+  image: flink:1.14.3
+  flinkVersion: 1.14.3
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "2"
+  serviceAccount: flink-operator
+  jobManager:
+    replicas: 1
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  job:
+    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+    parallelism: 2
+    upgradeMode: stateless
+  logConfiguration:
+    "log4j-console.properties": |
+      rootLogger.level = DEBUG
+      rootLogger.appenderRef.file.ref = LogFile
+      rootLogger.appenderRef.console.ref = LogConsole
+      appender.file.name = LogFile
+      appender.file.type = File
+      appender.file.append = false
+      appender.file.fileName = ${sys:log.file}
+      appender.file.layout.type = PatternLayout
+      appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+      appender.console.name = LogConsole
+      appender.console.type = CONSOLE
+      appender.console.layout.type = PatternLayout
+      appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+      logger.akka.name = akka
+      logger.akka.level = INFO
+      logger.kafka.name= org.apache.kafka
+      logger.kafka.level = INFO
+      logger.hadoop.name = org.apache.hadoop
+      logger.hadoop.level = INFO
+      logger.zookeeper.name = org.apache.zookeeper
+      logger.zookeeper.level = INFO
+      logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
+      logger.netty.level = OFF
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index 900f7c2..c3e6719 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -41,4 +41,5 @@ public class FlinkDeploymentSpec {
     private JobManagerSpec jobManager;
     private TaskManagerSpec taskManager;
     private JobSpec job;
+    private Map<String, String> logConfiguration;
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
index 07ff83a..83a964f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
@@ -42,7 +42,10 @@ import java.net.URISyntaxException;
 import java.nio.file.Files;
 import java.util.Collections;
 
+import static org.apache.flink.configuration.DeploymentOptionsInternal.CONF_DIR;
 import static org.apache.flink.kubernetes.operator.utils.FlinkUtils.mergePodTemplates;
+import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
+import static org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
 
 /** Builder to get effective flink config from {@link FlinkDeployment}. */
 public class FlinkConfigBuilder {
@@ -80,6 +83,17 @@ public class FlinkConfigBuilder {
         return this;
     }
 
+    public FlinkConfigBuilder applyLogConfiguration() throws IOException {
+        if (spec.getLogConfiguration() != null) {
+            String confDir =
+                    createLogConfigFiles(
+                            spec.getLogConfiguration().get(CONFIG_FILE_LOG4J_NAME),
+                            spec.getLogConfiguration().get(CONFIG_FILE_LOGBACK_NAME));
+            effectiveConfig.setString(CONF_DIR, confDir);
+        }
+        return this;
+    }
+
     public FlinkConfigBuilder applyCommonPodTemplate() throws IOException {
         if (spec.getPodTemplate() != null) {
             effectiveConfig.set(
@@ -168,6 +182,7 @@ public class FlinkConfigBuilder {
             throws IOException, URISyntaxException {
         return new FlinkConfigBuilder(dep, flinkConfig)
                 .applyFlinkConfiguration()
+                .applyLogConfiguration()
                 .applyImage()
                 .applyImagePullPolicy()
                 .applyServiceAccount()
@@ -211,6 +226,23 @@ public class FlinkConfigBuilder {
                 podConfigOption, createTempFile(mergePodTemplates(basicPod, 
appendPod)));
     }
 
+    private static String createLogConfigFiles(String log4jConf, String logbackConf)
+            throws IOException {
+        File tmpDir = Files.createTempDirectory("conf").toFile();
+
+        if (log4jConf != null) {
+            File log4jConfFile = new File(tmpDir.getAbsolutePath(), CONFIG_FILE_LOG4J_NAME);
+            Files.write(log4jConfFile.toPath(), log4jConf.getBytes());
+        }
+
+        if (logbackConf != null) {
+            File logbackConfFile = new File(tmpDir.getAbsolutePath(), CONFIG_FILE_LOGBACK_NAME);
+            Files.write(logbackConfFile.toPath(), logbackConf.getBytes());
+        }
+        tmpDir.deleteOnExit();
+        return tmpDir.getAbsolutePath();
+    }
+
     private static String createTempFile(Pod podTemplate) throws IOException {
         final File tmp = File.createTempFile("podTemplate_", ".yaml");
         Files.write(tmp.toPath(), 
SerializationUtils.dumpAsYaml(podTemplate).getBytes());
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index b1208a5..9f8046b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -27,10 +27,12 @@ import 
org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.Resource;
 import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 /** Default validator implementation. */
 public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
@@ -38,11 +40,15 @@ public class DefaultDeploymentValidator implements 
FlinkDeploymentValidator {
     private static final String[] FORBIDDEN_CONF_KEYS =
             new String[] {"kubernetes.namespace", "kubernetes.cluster-id"};
 
+    private static final Set<String> ALLOWED_LOG_CONF_KEYS =
+            Set.of(Constants.CONFIG_FILE_LOG4J_NAME, 
Constants.CONFIG_FILE_LOGBACK_NAME);
+
     @Override
     public Optional<String> validate(FlinkDeployment deployment) {
         FlinkDeploymentSpec spec = deployment.getSpec();
         return firstPresent(
                 validateFlinkConfig(spec.getFlinkConfiguration()),
+                validateLogConfig(spec.getLogConfiguration()),
                 validateJobSpec(spec.getJob(), spec.getFlinkConfiguration()),
                 validateJmSpec(spec.getJobManager(), 
spec.getFlinkConfiguration()),
                 validateTmSpec(spec.getTaskManager()),
@@ -71,6 +77,21 @@ public class DefaultDeploymentValidator implements 
FlinkDeploymentValidator {
         return Optional.empty();
     }
 
+    private Optional<String> validateLogConfig(Map<String, String> confMap) {
+        if (confMap == null) {
+            return Optional.empty();
+        }
+        for (String key : confMap.keySet()) {
+            if (!ALLOWED_LOG_CONF_KEYS.contains(key)) {
+                return Optional.of(
+                        String.format(
+                                "Invalid log config key: %s. Allowed keys are 
%s",
+                                key, ALLOWED_LOG_CONF_KEYS));
+            }
+        }
+        return Optional.empty();
+    }
+
     private Optional<String> validateJobSpec(JobSpec job, Map<String, String> 
confMap) {
         if (job == null) {
             return Optional.empty();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
index ad9e798..fd93e7d 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.utils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
@@ -28,6 +29,7 @@ import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.utils.Constants;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -38,18 +40,23 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Map;
 
 import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE;
 import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE_POLICY;
 import static org.apache.flink.kubernetes.operator.TestUtils.SAMPLE_JAR;
 import static org.apache.flink.kubernetes.operator.TestUtils.SERVICE_ACCOUNT;
+import static 
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
 
 /** FlinkConfigBuilderTest. */
 public class FlinkConfigBuilderTest {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new 
YAMLFactory());
     private static FlinkDeployment flinkDeployment;
+    private static final String CUSTOM_LOG_CONFIG = "rootLogger.level = INFO";
 
     @BeforeAll
     public static void prepareFlinkDeployment() {
@@ -70,6 +77,9 @@ public class FlinkConfigBuilderTest {
         flinkDeployment.getSpec().getJobManager().setReplicas(2);
         flinkDeployment.getSpec().getTaskManager().setPodTemplate(pod2);
         flinkDeployment.getSpec().getJob().setParallelism(2);
+        flinkDeployment
+                .getSpec()
+                .setLogConfiguration(Map.of(Constants.CONFIG_FILE_LOG4J_NAME, 
CUSTOM_LOG_CONFIG));
     }
 
     @Test
@@ -100,6 +110,21 @@ public class FlinkConfigBuilderTest {
     }
 
     @Test
+    public void testApplyLogConfiguration() throws IOException {
+        final Configuration configuration =
+                new FlinkConfigBuilder(flinkDeployment, new Configuration())
+                        .applyLogConfiguration()
+                        .build();
+
+        final File log4jFile =
+                new File(
+                        configuration.get(DeploymentOptionsInternal.CONF_DIR),
+                        CONFIG_FILE_LOG4J_NAME);
+        Assert.assertTrue(log4jFile.exists() && log4jFile.isFile() && 
log4jFile.canRead());
+        Assert.assertEquals(CUSTOM_LOG_CONFIG, 
Files.readString(log4jFile.toPath()));
+    }
+
+    @Test
     public void testApplyCommonPodTemplate() throws Exception {
         final Configuration configuration =
                 new FlinkConfigBuilder(flinkDeployment, new Configuration())
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 6cc8d09..184d3aa 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -25,11 +25,13 @@ import 
org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.utils.Constants;
 
 import org.junit.Assert;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
 
@@ -74,6 +76,20 @@ public class DeploymentValidatorTest {
                                         Collections.singletonMap(
                                                 
KubernetesConfigOptions.NAMESPACE.key(), "myns")),
                 "Forbidden Flink config key");
+
+        // Test log config validation
+        testSuccess(
+                dep ->
+                        dep.getSpec()
+                                .setLogConfiguration(
+                                        Map.of(
+                                                
Constants.CONFIG_FILE_LOG4J_NAME,
+                                                "rootLogger.level = INFO")));
+
+        testError(
+                dep -> dep.getSpec().setLogConfiguration(Map.of("random", 
"config")),
+                "Invalid log config key");
+
         testError(
                 dep -> dep.getSpec().getJobManager().setReplicas(2),
                 "High availability should be enabled when starting standby 
JobManagers.");
diff --git a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml 
b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index e0cbbbf..5ae554c 100644
--- a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9057,6 +9057,10 @@ spec:
                     - stateless
                     type: string
                 type: object
+              logConfiguration:
+                additionalProperties:
+                  type: string
+                type: object
             type: object
           status:
             properties:
@@ -18119,6 +18123,10 @@ spec:
                             - stateless
                             type: string
                         type: object
+                      logConfiguration:
+                        additionalProperties:
+                          type: string
+                        type: object
                     type: object
                 type: object
             type: object

Reply via email to