This is an automated email from the ASF dual-hosted git repository.
mbalassi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new b7fc9b71 [FLINK-29194] Add logging for ResourceListener
b7fc9b71 is described below
commit b7fc9b71458ca72364431635007945816a4e107f
Author: Matyas Orhidi <[email protected]>
AuthorDate: Wed Sep 7 21:01:56 2022 +0200
[FLINK-29194] Add logging for ResourceListener
---
.../kubernetes/operator/listener/AuditUtils.java | 60 ++++++++++++++++++++++
.../operator/listener/ListenerUtils.java | 3 +-
.../metrics/lifecycle/ResourceLifecycleState.java | 24 +++++----
.../kubernetes/operator/utils/EventRecorder.java | 58 +++++++++++----------
.../kubernetes/operator/utils/StatusRecorder.java | 2 +
.../src/main/resources/log4j2.properties | 3 ++
6 files changed, 111 insertions(+), 39 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java
new file mode 100644
index 00000000..0692dec9
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.listener;
+
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
+
+import io.fabric8.kubernetes.api.model.Event;
+import lombok.NonNull;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Responsible for logging resource event/status updates. */
+public class AuditUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(AuditUtils.class);
+
+ public static <R extends AbstractFlinkResource<?, S>, S extends CommonStatus<?>>
+ void logContext(FlinkResourceListener.StatusUpdateContext<R, S> ctx) {
+ LOG.info(format(ctx.getNewStatus()));
+ }
+
+ public static <R extends AbstractFlinkResource<?, ?>> void logContext(
+ FlinkResourceListener.ResourceEventContext<R> ctx) {
+ LOG.info(format(ctx.getEvent()));
+ }
+
+ private static String format(@NonNull CommonStatus<?> status) {
+ return String.format(
+ ">>> Status | %-7s | %-15s | %s ",
+ StringUtils.isEmpty(status.getError()) ? "Info" : "Error",
+ status.getLifecycleState(),
+ StringUtils.isEmpty(status.getError())
+ ? status.getLifecycleState().getDescription()
+ : status.getError());
+ }
+
+ private static String format(@NonNull Event event) {
+ return String.format(
+ ">>> Event | %-7s | %-15s | %s",
+ event.getType().equals("Normal") ? "Info" : event.getType(),
+ event.getReason().toUpperCase(),
+ event.getMessage());
+ }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
index f5ce71d4..843cdf26 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/ListenerUtils.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +44,7 @@ import java.util.regex.Pattern;
/** Flink resource listener utilities. */
public class ListenerUtils {
- private static final Logger LOG = LoggerFactory.getLogger(FlinkUtils.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ListenerUtils.class);
private static final String PREFIX = "kubernetes.operator.plugins.listeners.";
private static final String SUFFIX = ".class";
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java
index 19c0da36..324f5346 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleState.java
@@ -17,25 +17,31 @@
package org.apache.flink.kubernetes.operator.metrics.lifecycle;
+import lombok.Getter;
+
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
/** Enum encapsulating the lifecycle state of a Flink resource. */
public enum ResourceLifecycleState {
- CREATED(false),
- SUSPENDED(true),
- UPGRADING(false),
- DEPLOYED(false),
- STABLE(true),
- ROLLING_BACK(false),
- ROLLED_BACK(true),
- FAILED(true);
+ CREATED(false, "The resource was created in Kubernetes but not yet handled by the operator"),
+ SUSPENDED(true, "The resource (job) has been suspended"),
+ UPGRADING(false, "The resource is being upgraded"),
+ DEPLOYED(
+ false,
+ "The resource is deployed/submitted to Kubernetes, but it’s not yet considered to be stable and might be rolled back in the future"),
+ STABLE(true, "The resource deployment is considered to be stable and won’t be rolled back"),
+ ROLLING_BACK(false, "The resource is being rolled back to the last stable spec"),
+ ROLLED_BACK(true, "The resource is deployed with the last stable spec"),
+ FAILED(true, "The job terminally failed");
private final boolean terminal;
+ @Getter private final String description;
- ResourceLifecycleState(boolean terminal) {
+ ResourceLifecycleState(boolean terminal, String description) {
this.terminal = terminal;
+ this.description = description;
}
public Set<ResourceLifecycleState> getClearedStatesAfterTransition(
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index 9bc55762..a3baeb9a 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.operator.utils;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.listener.AuditUtils;
import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
import io.fabric8.kubernetes.api.model.Event;
@@ -69,34 +70,35 @@ public class EventRecorder {
KubernetesClient client, Collection<FlinkResourceListener> listeners) {
BiConsumer<AbstractFlinkResource<?, ?>, Event> biConsumer =
- (resource, event) ->
- listeners.forEach(
- listener -> {
- var ctx =
- new FlinkResourceListener.ResourceEventContext() {
- @Override
- public Event getEvent() {
- return event;
- }
-
- @Override
- public AbstractFlinkResource<?, ?>
- getFlinkResource() {
- return resource;
- }
-
- @Override
- public KubernetesClient getKubernetesClient() {
- return client;
- }
- };
-
- if (resource instanceof FlinkDeployment) {
- listener.onDeploymentEvent(ctx);
- } else {
- listener.onSessionJobEvent(ctx);
- }
- });
+ (resource, event) -> {
+ var ctx =
+ new FlinkResourceListener.ResourceEventContext() {
+ @Override
+ public Event getEvent() {
+ return event;
+ }
+
+ @Override
+ public AbstractFlinkResource<?, ?> getFlinkResource() {
+ return resource;
+ }
+
+ @Override
+ public KubernetesClient getKubernetesClient() {
+ return client;
+ }
+ };
+ listeners.forEach(
+ listener -> {
+ if (resource instanceof FlinkDeployment) {
+ listener.onDeploymentEvent(ctx);
+ } else {
+ listener.onSessionJobEvent(ctx);
+ }
+ });
+ AuditUtils.logContext(ctx);
+ };
+
return new EventRecorder(client, biConsumer);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
index c042e0de..bd196dd5 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/StatusRecorder.java
@@ -24,6 +24,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
+import org.apache.flink.kubernetes.operator.listener.AuditUtils;
import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
@@ -196,6 +197,7 @@ public class StatusRecorder<
listener.onSessionJobStatusUpdate(ctx);
}
});
+ AuditUtils.logContext(ctx);
};
return new StatusRecorder<>(kubernetesClient, metricManager, consumer);
diff --git a/flink-kubernetes-operator/src/main/resources/log4j2.properties b/flink-kubernetes-operator/src/main/resources/log4j2.properties
index f7f24f44..15b1eef7 100644
--- a/flink-kubernetes-operator/src/main/resources/log4j2.properties
+++ b/flink-kubernetes-operator/src/main/resources/log4j2.properties
@@ -27,3 +27,6 @@ appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %hi
logger.conf.name = org.apache.flink.configuration.GlobalConfiguration
logger.conf.level = WARN
+
+logger.event.name = org.apache.flink.kubernetes.operator.listener.AuditUtils
+logger.event.level = INFO