This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 6db4479 [FLINK-26328] Control Logging Behavior in Flink Deployments
6db4479 is described below
commit 6db4479a87a86342d48612e57051ac38c71f2875
Author: Matyas Orhidi <[email protected]>
AuthorDate: Fri Feb 25 13:21:29 2022 +0100
[FLINK-26328] Control Logging Behavior in Flink Deployments
Closes #27
---
examples/custom-logging.yaml | 67 ++++++++++++++++++++++
.../operator/crd/spec/FlinkDeploymentSpec.java | 1 +
.../operator/utils/FlinkConfigBuilder.java | 32 +++++++++++
.../validation/DefaultDeploymentValidator.java | 21 +++++++
.../operator/utils/FlinkConfigBuilderTest.java | 25 ++++++++
.../validation/DeploymentValidatorTest.java | 16 ++++++
.../crds/flinkdeployments.flink.apache.org-v1.yml | 8 +++
7 files changed, 170 insertions(+)
diff --git a/examples/custom-logging.yaml b/examples/custom-logging.yaml
new file mode 100644
index 0000000..27450a1
--- /dev/null
+++ b/examples/custom-logging.yaml
@@ -0,0 +1,67 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+apiVersion: flink.apache.org/v1alpha1
+kind: FlinkDeployment
+metadata:
+ namespace: default
+ name: custom-logging-example
+spec:
+ image: flink:1.14.3
+ flinkVersion: 1.14.3
+ flinkConfiguration:
+ taskmanager.numberOfTaskSlots: "2"
+ serviceAccount: flink-operator
+ jobManager:
+ replicas: 1
+ resource:
+ memory: "2048m"
+ cpu: 1
+ taskManager:
+ resource:
+ memory: "2048m"
+ cpu: 1
+ job:
+ jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
+ parallelism: 2
+ upgradeMode: stateless
+ logConfiguration:
+ "log4j-console.properties": |
+ rootLogger.level = DEBUG
+ rootLogger.appenderRef.file.ref = LogFile
+ rootLogger.appenderRef.console.ref = LogConsole
+ appender.file.name = LogFile
+ appender.file.type = File
+ appender.file.append = false
+ appender.file.fileName = ${sys:log.file}
+ appender.file.layout.type = PatternLayout
+ appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
- %m%n
+ appender.console.name = LogConsole
+ appender.console.type = CONSOLE
+ appender.console.layout.type = PatternLayout
+ appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c
%x - %m%n
+ logger.akka.name = akka
+ logger.akka.level = INFO
+ logger.kafka.name= org.apache.kafka
+ logger.kafka.level = INFO
+ logger.hadoop.name = org.apache.hadoop
+ logger.hadoop.level = INFO
+ logger.zookeeper.name = org.apache.zookeeper
+ logger.zookeeper.level = INFO
+ logger.netty.name =
org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
+ logger.netty.level = OFF
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
index 900f7c2..c3e6719 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkDeploymentSpec.java
@@ -41,4 +41,5 @@ public class FlinkDeploymentSpec {
private JobManagerSpec jobManager;
private TaskManagerSpec taskManager;
private JobSpec job;
+ private Map<String, String> logConfiguration;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
index 07ff83a..83a964f 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilder.java
@@ -42,7 +42,10 @@ import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.Collections;
+import static
org.apache.flink.configuration.DeploymentOptionsInternal.CONF_DIR;
import static
org.apache.flink.kubernetes.operator.utils.FlinkUtils.mergePodTemplates;
+import static
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
+import static
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOGBACK_NAME;
/** Builder to get effective flink config from {@link FlinkDeployment}. */
public class FlinkConfigBuilder {
@@ -80,6 +83,17 @@ public class FlinkConfigBuilder {
return this;
}
+ public FlinkConfigBuilder applyLogConfiguration() throws IOException {
+ if (spec.getLogConfiguration() != null) {
+ String confDir =
+ createLogConfigFiles(
+
spec.getLogConfiguration().get(CONFIG_FILE_LOG4J_NAME),
+
spec.getLogConfiguration().get(CONFIG_FILE_LOGBACK_NAME));
+ effectiveConfig.setString(CONF_DIR, confDir);
+ }
+ return this;
+ }
+
public FlinkConfigBuilder applyCommonPodTemplate() throws IOException {
if (spec.getPodTemplate() != null) {
effectiveConfig.set(
@@ -168,6 +182,7 @@ public class FlinkConfigBuilder {
throws IOException, URISyntaxException {
return new FlinkConfigBuilder(dep, flinkConfig)
.applyFlinkConfiguration()
+ .applyLogConfiguration()
.applyImage()
.applyImagePullPolicy()
.applyServiceAccount()
@@ -211,6 +226,23 @@ public class FlinkConfigBuilder {
podConfigOption, createTempFile(mergePodTemplates(basicPod,
appendPod)));
}
+ private static String createLogConfigFiles(String log4jConf, String
logbackConf)
+ throws IOException {
+ File tmpDir = Files.createTempDirectory("conf").toFile();
+
+ if (log4jConf != null) {
+ File log4jConfFile = new File(tmpDir.getAbsolutePath(),
CONFIG_FILE_LOG4J_NAME);
+ Files.write(log4jConfFile.toPath(), log4jConf.getBytes());
+ }
+
+ if (logbackConf != null) {
+ File logbackConfFile = new File(tmpDir.getAbsolutePath(),
CONFIG_FILE_LOGBACK_NAME);
+ Files.write(logbackConfFile.toPath(), logbackConf.getBytes());
+ }
+ tmpDir.deleteOnExit();
+ return tmpDir.getAbsolutePath();
+ }
+
private static String createTempFile(Pod podTemplate) throws IOException {
final File tmp = File.createTempFile("podTemplate_", ".yaml");
Files.write(tmp.toPath(),
SerializationUtils.dumpAsYaml(podTemplate).getBytes());
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index b1208a5..9f8046b 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -27,10 +27,12 @@ import
org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.Resource;
import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
/** Default validator implementation. */
public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
@@ -38,11 +40,15 @@ public class DefaultDeploymentValidator implements
FlinkDeploymentValidator {
private static final String[] FORBIDDEN_CONF_KEYS =
new String[] {"kubernetes.namespace", "kubernetes.cluster-id"};
+ private static final Set<String> ALLOWED_LOG_CONF_KEYS =
+ Set.of(Constants.CONFIG_FILE_LOG4J_NAME,
Constants.CONFIG_FILE_LOGBACK_NAME);
+
@Override
public Optional<String> validate(FlinkDeployment deployment) {
FlinkDeploymentSpec spec = deployment.getSpec();
return firstPresent(
validateFlinkConfig(spec.getFlinkConfiguration()),
+ validateLogConfig(spec.getLogConfiguration()),
validateJobSpec(spec.getJob(), spec.getFlinkConfiguration()),
validateJmSpec(spec.getJobManager(),
spec.getFlinkConfiguration()),
validateTmSpec(spec.getTaskManager()),
@@ -71,6 +77,21 @@ public class DefaultDeploymentValidator implements
FlinkDeploymentValidator {
return Optional.empty();
}
+ private Optional<String> validateLogConfig(Map<String, String> confMap) {
+ if (confMap == null) {
+ return Optional.empty();
+ }
+ for (String key : confMap.keySet()) {
+ if (!ALLOWED_LOG_CONF_KEYS.contains(key)) {
+ return Optional.of(
+ String.format(
+ "Invalid log config key: %s. Allowed keys are
%s",
+ key, ALLOWED_LOG_CONF_KEYS));
+ }
+ }
+ return Optional.empty();
+ }
+
private Optional<String> validateJobSpec(JobSpec job, Map<String, String>
confMap) {
if (job == null) {
return Optional.empty();
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
index ad9e798..fd93e7d 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkConfigBuilderTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
@@ -28,6 +29,7 @@ import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.utils.Constants;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -38,18 +40,23 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Map;
import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE;
import static org.apache.flink.kubernetes.operator.TestUtils.IMAGE_POLICY;
import static org.apache.flink.kubernetes.operator.TestUtils.SAMPLE_JAR;
import static org.apache.flink.kubernetes.operator.TestUtils.SERVICE_ACCOUNT;
+import static
org.apache.flink.kubernetes.utils.Constants.CONFIG_FILE_LOG4J_NAME;
/** FlinkConfigBuilderTest. */
public class FlinkConfigBuilderTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new
YAMLFactory());
private static FlinkDeployment flinkDeployment;
+ private static final String CUSTOM_LOG_CONFIG = "rootLogger.level = INFO";
@BeforeAll
public static void prepareFlinkDeployment() {
@@ -70,6 +77,9 @@ public class FlinkConfigBuilderTest {
flinkDeployment.getSpec().getJobManager().setReplicas(2);
flinkDeployment.getSpec().getTaskManager().setPodTemplate(pod2);
flinkDeployment.getSpec().getJob().setParallelism(2);
+ flinkDeployment
+ .getSpec()
+ .setLogConfiguration(Map.of(Constants.CONFIG_FILE_LOG4J_NAME,
CUSTOM_LOG_CONFIG));
}
@Test
@@ -100,6 +110,21 @@ public class FlinkConfigBuilderTest {
}
@Test
+ public void testApplyLogConfiguration() throws IOException {
+ final Configuration configuration =
+ new FlinkConfigBuilder(flinkDeployment, new Configuration())
+ .applyLogConfiguration()
+ .build();
+
+ final File log4jFile =
+ new File(
+ configuration.get(DeploymentOptionsInternal.CONF_DIR),
+ CONFIG_FILE_LOG4J_NAME);
+ Assert.assertTrue(log4jFile.exists() && log4jFile.isFile() &&
log4jFile.canRead());
+ Assert.assertEquals(CUSTOM_LOG_CONFIG,
Files.readString(log4jFile.toPath()));
+ }
+
+ @Test
public void testApplyCommonPodTemplate() throws Exception {
final Configuration configuration =
new FlinkConfigBuilder(flinkDeployment, new Configuration())
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 6cc8d09..184d3aa 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -25,11 +25,13 @@ import
org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.kubernetes.utils.Constants;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import java.util.Collections;
+import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
@@ -74,6 +76,20 @@ public class DeploymentValidatorTest {
Collections.singletonMap(
KubernetesConfigOptions.NAMESPACE.key(), "myns")),
"Forbidden Flink config key");
+
+ // Test log config validation
+ testSuccess(
+ dep ->
+ dep.getSpec()
+ .setLogConfiguration(
+ Map.of(
+
Constants.CONFIG_FILE_LOG4J_NAME,
+ "rootLogger.level = INFO")));
+
+ testError(
+ dep -> dep.getSpec().setLogConfiguration(Map.of("random",
"config")),
+ "Invalid log config key");
+
testError(
dep -> dep.getSpec().getJobManager().setReplicas(2),
"High availability should be enabled when starting standby
JobManagers.");
diff --git a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index e0cbbbf..5ae554c 100644
--- a/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -9057,6 +9057,10 @@ spec:
- stateless
type: string
type: object
+ logConfiguration:
+ additionalProperties:
+ type: string
+ type: object
type: object
status:
properties:
@@ -18119,6 +18123,10 @@ spec:
- stateless
type: string
type: object
+ logConfiguration:
+ additionalProperties:
+ type: string
+ type: object
type: object
type: object
type: object