This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d6a4eb966fb [FLINK-34417] Log Job ID via MDC
d6a4eb966fb is described below
commit d6a4eb966fbc47277e07b79e7c64939a62eb1d54
Author: Roman Khachatryan <[email protected]>
AuthorDate: Sat Feb 3 13:17:36 2024 +0100
[FLINK-34417] Log Job ID via MDC
---
.../content.zh/docs/deployment/advanced/logging.md | 15 ++
docs/content/docs/deployment/advanced/logging.md | 14 ++
.../org/apache/flink/util/MdcAwareExecutor.java | 39 ++++
.../apache/flink/util/MdcAwareExecutorService.java | 114 +++++++++++
.../util/MdcAwareScheduledExecutorService.java | 61 ++++++
.../main/java/org/apache/flink/util/MdcUtils.java | 112 +++++++++++
.../java/org/apache/flink/util/MdcUtilsTest.java | 148 ++++++++++++++
.../runtime/rpc/pekko/FencedPekkoRpcActor.java | 7 +-
.../flink/runtime/rpc/pekko/PekkoRpcActor.java | 75 +++----
.../flink/runtime/rpc/pekko/PekkoRpcService.java | 11 +-
.../flink/runtime/rpc/FencedRpcEndpoint.java | 14 +-
.../org/apache/flink/runtime/rpc/RpcEndpoint.java | 33 +++-
.../org/apache/flink/runtime/rpc/RpcService.java | 7 +-
.../runtime/checkpoint/CheckpointCoordinator.java | 47 ++---
.../ChannelStateWriteRequestExecutorFactory.java | 3 +-
.../ChannelStateWriteRequestExecutorImpl.java | 60 +++---
.../flink/runtime/dispatcher/Dispatcher.java | 41 ++--
.../JobMasterServiceLeadershipRunnerFactory.java | 4 +-
.../executiongraph/DefaultExecutionGraph.java | 10 +-
.../apache/flink/runtime/jobmaster/JobMaster.java | 21 +-
.../runtime/resourcemanager/ResourceManager.java | 48 +++--
.../flink/runtime/taskexecutor/TaskExecutor.java | 183 +++++++++--------
.../org/apache/flink/runtime/taskmanager/Task.java | 28 ++-
.../ChannelStateWriteRequestExecutorImplTest.java | 29 +--
.../flink/runtime/rpc/TestingRpcService.java | 7 +-
.../flink/streaming/runtime/tasks/StreamTask.java | 24 ++-
.../testutils/logging/LoggerAuditingExtension.java | 21 +-
flink-tests/pom.xml | 2 +-
.../OperatorEventSendingCheckpointITCase.java | 6 +-
.../apache/flink/test/misc/JobIDLoggingITCase.java | 220 +++++++++++++++++++++
.../src/test/resources/log4j2-test.properties | 2 +-
31 files changed, 1148 insertions(+), 258 deletions(-)
diff --git a/docs/content.zh/docs/deployment/advanced/logging.md
b/docs/content.zh/docs/deployment/advanced/logging.md
index abb4b1025f0..432336946de 100644
--- a/docs/content.zh/docs/deployment/advanced/logging.md
+++ b/docs/content.zh/docs/deployment/advanced/logging.md
@@ -40,6 +40,21 @@ Flink 中的日志记录是使用 [SLF4J](http://www.slf4j.org/) 日志接口实
<a name="configuring-log4j-2"></a>
+### Structured logging
+
+Flink adds the following fields to
[MDC](https://www.slf4j.org/api/org/slf4j/MDC.html) of most of the relevant log
messages (experimental feature):
+- Job ID
+ - key: `flink-job-id`
+ - format: string
+ - length 32
+
+This is most useful in environments with structured logging and allows you to
quickly filter the relevant logs.
+
+The MDC is propagated by slf4j to the logging backend which usually adds it to
the log records automatically (e.g. in [log4j json
layout](https://logging.apache.org/log4j/2.x/manual/json-template-layout.html)).
+Alternatively, it can be configured explicitly; for example, a [Log4j 2 pattern layout](https://logging.apache.org/log4j/2.x/manual/pattern-layout.html) might look like this:
+
+`[%-32X{flink-job-id}] %c{0} %m%n`.
+
## 配置 Log4j 2
Log4j 2 是通过 property 配置文件进行配置的。
diff --git a/docs/content/docs/deployment/advanced/logging.md
b/docs/content/docs/deployment/advanced/logging.md
index 6c01e1ddff1..cc2d0201e17 100644
--- a/docs/content/docs/deployment/advanced/logging.md
+++ b/docs/content/docs/deployment/advanced/logging.md
@@ -38,6 +38,20 @@ This allows you to use any logging framework that supports
SLF4J, without having
By default, [Log4j 2](https://logging.apache.org/log4j/2.x/index.html) is used
as the underlying logging framework.
+### Structured logging
+
+Flink adds the following fields to
[MDC](https://www.slf4j.org/api/org/slf4j/MDC.html) of most of the relevant log
messages (experimental feature):
+- Job ID
+ - key: `flink-job-id`
+ - format: string
+ - length 32
+
+This is most useful in environments with structured logging and allows you to
quickly filter the relevant logs.
+
+The MDC is propagated by slf4j to the logging backend which usually adds it to
the log records automatically (e.g. in [log4j json
layout](https://logging.apache.org/log4j/2.x/manual/json-template-layout.html)).
+Alternatively, it can be configured explicitly; for example, a [Log4j 2 pattern layout](https://logging.apache.org/log4j/2.x/manual/pattern-layout.html) might look like this:
+
+`[%-32X{flink-job-id}] %c{0} %m%n`.
## Configuring Log4j 2
diff --git
a/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutor.java
b/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutor.java
new file mode 100644
index 00000000000..a6f9c662dab
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An {@link Executor} decorator that hands every submitted command to the delegate wrapped by
+ * {@link MdcUtils#wrapRunnable(Map, Runnable)}, using a fixed set of context entries.
+ *
+ * @param <T> type of the wrapped executor
+ */
+class MdcAwareExecutor<T extends Executor> implements Executor {
+    /** Read-only view of the context entries supplied at construction time. */
+    protected final Map<String, String> contextData;
+
+    /** Executor that receives the wrapped commands. */
+    protected final T delegate;
+
+    /**
+     * Creates the decorator.
+     *
+     * @param delegate executor to forward wrapped commands to; must not be {@code null}
+     * @param contextData entries passed to {@link MdcUtils#wrapRunnable(Map, Runnable)} for every
+     *     command; must not be {@code null}
+     */
+    protected MdcAwareExecutor(T delegate, Map<String, String> contextData) {
+        this.delegate = checkNotNull(delegate);
+        this.contextData = Collections.unmodifiableMap(checkNotNull(contextData));
+    }
+
+    /**
+     * Wraps the command with the context data and forwards it to the delegate.
+     *
+     * @param command the command to run
+     */
+    @Override
+    public void execute(Runnable command) {
+        delegate.execute(MdcUtils.wrapRunnable(contextData, command));
+    }
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutorService.java
b/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutorService.java
new file mode 100644
index 00000000000..693a247481b
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/util/MdcAwareExecutorService.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.MdcUtils.wrapCallable;
+import static org.apache.flink.util.MdcUtils.wrapRunnable;
+
+/**
+ * {@link ExecutorService} variant of {@link MdcAwareExecutor}.
+ *
+ * <p>Lifecycle calls are forwarded to the delegate unchanged. Every task handed in through {@code
+ * submit}, {@code invokeAll} or {@code invokeAny} is first wrapped with the context data via
+ * {@link MdcUtils#wrapRunnable} / {@link MdcUtils#wrapCallable}.
+ *
+ * @param <S> type of the wrapped executor service
+ */
+class MdcAwareExecutorService<S extends ExecutorService> extends MdcAwareExecutor<S>
+        implements ExecutorService {
+
+    public MdcAwareExecutorService(S delegate, Map<String, String> contextData) {
+        super(delegate, contextData);
+    }
+
+    // ------------------------------------------------------------------
+    //  Lifecycle: plain forwarding, nothing to wrap
+    // ------------------------------------------------------------------
+
+    @Override
+    public void shutdown() {
+        delegate.shutdown();
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        return delegate.shutdownNow();
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return delegate.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return delegate.isTerminated();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+        return delegate.awaitTermination(timeout, unit);
+    }
+
+    // ------------------------------------------------------------------
+    //  Task submission: wrap first, then forward
+    // ------------------------------------------------------------------
+
+    @Override
+    public <T> Future<T> submit(Callable<T> task) {
+        final Callable<T> scoped = wrapCallable(contextData, task);
+        return delegate.submit(scoped);
+    }
+
+    @Override
+    public <T> Future<T> submit(Runnable task, T result) {
+        final Runnable scoped = wrapRunnable(contextData, task);
+        return delegate.submit(scoped, result);
+    }
+
+    @Override
+    public Future<?> submit(Runnable task) {
+        final Runnable scoped = wrapRunnable(contextData, task);
+        return delegate.submit(scoped);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+            throws InterruptedException {
+        final List<Callable<T>> scoped = wrapCallables(tasks);
+        return delegate.invokeAll(scoped);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(
+            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException {
+        final List<Callable<T>> scoped = wrapCallables(tasks);
+        return delegate.invokeAll(scoped, timeout, unit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+            throws InterruptedException, ExecutionException {
+        final List<Callable<T>> scoped = wrapCallables(tasks);
+        return delegate.invokeAny(scoped);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        final List<Callable<T>> scoped = wrapCallables(tasks);
+        return delegate.invokeAny(scoped, timeout, unit);
+    }
+
+    /** Wraps each task with the context data, keeping the iteration order of the input. */
+    private <T> List<Callable<T>> wrapCallables(Collection<? extends Callable<T>> tasks) {
+        final List<Callable<T>> scoped = new ArrayList<>(tasks.size());
+        tasks.forEach(task -> scoped.add(wrapCallable(contextData, task)));
+        return scoped;
+    }
+}
diff --git
a/flink-core/src/main/java/org/apache/flink/util/MdcAwareScheduledExecutorService.java
b/flink-core/src/main/java/org/apache/flink/util/MdcAwareScheduledExecutorService.java
new file mode 100644
index 00000000000..1fa71dd659f
--- /dev/null
+++
b/flink-core/src/main/java/org/apache/flink/util/MdcAwareScheduledExecutorService.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.MdcUtils.wrapCallable;
+import static org.apache.flink.util.MdcUtils.wrapRunnable;
+
+/**
+ * {@link ScheduledExecutorService} variant of {@link MdcAwareExecutorService}.
+ *
+ * <p>Each scheduled command or callable is wrapped with the context data once, at scheduling time,
+ * and the wrapper is what the delegate schedules.
+ */
+class MdcAwareScheduledExecutorService extends MdcAwareExecutorService<ScheduledExecutorService>
+        implements ScheduledExecutorService {
+
+    public MdcAwareScheduledExecutorService(
+            ScheduledExecutorService delegate, Map<String, String> contextData) {
+        super(delegate, contextData);
+    }
+
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+        final Runnable scoped = wrapRunnable(contextData, command);
+        return delegate.schedule(scoped, delay, unit);
+    }
+
+    @Override
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+        final Callable<V> scoped = wrapCallable(contextData, callable);
+        return delegate.schedule(scoped, delay, unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(
+            Runnable command, long initialDelay, long period, TimeUnit unit) {
+        final Runnable scoped = wrapRunnable(contextData, command);
+        return delegate.scheduleAtFixedRate(scoped, initialDelay, period, unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(
+            Runnable command, long initialDelay, long delay, TimeUnit unit) {
+        final Runnable scoped = wrapRunnable(contextData, command);
+        return delegate.scheduleWithFixedDelay(scoped, initialDelay, delay, unit);
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/util/MdcUtils.java
b/flink-core/src/main/java/org/apache/flink/util/MdcUtils.java
new file mode 100644
index 00000000000..c448c9837db
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/MdcUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.api.common.JobID;
+
+import org.slf4j.MDC;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** Utility class to manage common Flink attributes in {@link MDC} (only {@link JobID} ATM). */
+public class MdcUtils {
+
+    /** MDC key under which the hexadecimal representation of the job ID is stored. */
+    public static final String JOB_ID = "flink-job-id";
+
+    /**
+     * Replace MDC contents with the provided one and return a closeable object that can be used to
+     * restore the original MDC.
+     *
+     * <p>{@link MDC#getCopyOfContextMap()} is documented as possibly returning {@code null}. In
+     * that case an empty map is restored on close, so that {@code null} is never passed to {@link
+     * MDC#setContextMap(Map)}.
+     *
+     * @param context to put into MDC
+     * @return handle whose {@link MdcCloseable#close()} re-installs the MDC contents captured by
+     *     this call
+     */
+    public static MdcCloseable withContext(Map<String, String> context) {
+        final Map<String, String> orig = MDC.getCopyOfContextMap();
+        // Nothing may have been set for this thread yet; normalize to an empty map for restore.
+        final Map<String, String> safeOrig = orig != null ? orig : Collections.emptyMap();
+        MDC.setContextMap(context);
+        return () -> MDC.setContextMap(safeOrig);
+    }
+
+    /** {@link AutoCloseable} that restores the {@link MDC} contents on close. */
+    @FunctionalInterface
+    public interface MdcCloseable extends AutoCloseable {
+        /** Restores the captured {@link MDC} contents; declared without checked exceptions. */
+        @Override
+        void close();
+    }
+
+    /**
+     * Wrap the given {@link Runnable} so that the {@link MDC} contents are replaced with the given
+     * data for the duration of each run and restored afterward, also when the run throws.
+     *
+     * @param contextData data to install in the MDC while the command runs
+     * @param command the command to wrap
+     * @return the wrapping runnable
+     */
+    public static Runnable wrapRunnable(Map<String, String> contextData, Runnable command) {
+        return () -> {
+            try (MdcCloseable ctx = withContext(contextData)) {
+                command.run();
+            }
+        };
+    }
+
+    /**
+     * Wrap the given {@link Callable} so that the {@link MDC} contents are replaced with the given
+     * data for the duration of each call and restored afterward, also when the call throws.
+     *
+     * @param contextData data to install in the MDC while the callable runs
+     * @param command the callable to wrap
+     * @return the wrapping callable, returning whatever {@code command} returns
+     */
+    public static <T> Callable<T> wrapCallable(
+            Map<String, String> contextData, Callable<T> command) {
+        return () -> {
+            try (MdcCloseable ctx = withContext(contextData)) {
+                return command.call();
+            }
+        };
+    }
+
+    /**
+     * Wrap the given {@link Executor} so that the given {@link JobID} is put into the {@link MDC}
+     * before it executes any submitted commands and the previous contents are restored afterward.
+     *
+     * @throws IllegalArgumentException if the executor is already an MDC-aware wrapper
+     */
+    public static Executor scopeToJob(JobID jobID, Executor executor) {
+        checkArgument(
+                !(executor instanceof MdcAwareExecutor), "The executor is already MDC-aware.");
+        return new MdcAwareExecutor<>(executor, asContextData(jobID));
+    }
+
+    /**
+     * Wrap the given {@link ExecutorService} so that the given {@link JobID} is put into the
+     * {@link MDC} before it executes any submitted commands and the previous contents are restored
+     * afterward.
+     *
+     * @throws IllegalArgumentException if the executor service is already an MDC-aware wrapper
+     */
+    public static ExecutorService scopeToJob(JobID jobID, ExecutorService delegate) {
+        checkArgument(
+                !(delegate instanceof MdcAwareExecutorService),
+                "The executor service is already MDC-aware.");
+        return new MdcAwareExecutorService<>(delegate, asContextData(jobID));
+    }
+
+    /**
+     * Wrap the given {@link ScheduledExecutorService} so that the given {@link JobID} is put into
+     * the {@link MDC} before it executes any submitted commands and the previous contents are
+     * restored afterward.
+     *
+     * @throws IllegalArgumentException if the executor service is already an MDC-aware wrapper
+     */
+    public static ScheduledExecutorService scopeToJob(JobID jobID, ScheduledExecutorService ses) {
+        checkArgument(
+                !(ses instanceof MdcAwareScheduledExecutorService),
+                "The scheduled executor service is already MDC-aware.");
+        return new MdcAwareScheduledExecutorService(ses, asContextData(jobID));
+    }
+
+    /**
+     * Build the MDC context data for a job.
+     *
+     * @param jobID the job to describe
+     * @return a single-entry map from {@link #JOB_ID} to {@code jobID.toHexString()}
+     */
+    public static Map<String, String> asContextData(JobID jobID) {
+        return Collections.singletonMap(JOB_ID, jobID.toHexString());
+    }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/util/MdcUtilsTest.java
b/flink-core/src/test/java/org/apache/flink/util/MdcUtilsTest.java
new file mode 100644
index 00000000000..35917d28c72
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/util/MdcUtilsTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
+import org.apache.flink.util.MdcUtils.MdcCloseable;
+import org.apache.flink.util.concurrent.Executors;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.apache.logging.log4j.core.LogEvent;
+import org.assertj.core.api.AbstractObjectAssert;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.MdcUtils.asContextData;
+import static org.apache.flink.util.MdcUtils.wrapCallable;
+import static org.apache.flink.util.MdcUtils.wrapRunnable;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.slf4j.event.Level.DEBUG;
+
+/** Tests for the {@link MdcUtils}. */
+class MdcUtilsTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MdcUtilsTest.class);
+
+    /** Emits exactly one log event; the message itself is irrelevant to the assertions. */
+    private static final Runnable LOGGING_RUNNABLE = () -> LOGGER.info("ignore");
+
+    /** Source of the log events inspected by the assertions below. */
+    @RegisterExtension
+    public final LoggerAuditingExtension loggerExtension =
+            new LoggerAuditingExtension(MdcUtilsTest.class, DEBUG);
+
+    @Test
+    public void testJobIDAsContext() {
+        JobID jobID = new JobID();
+        assertThat(asContextData(jobID))
+                .isEqualTo(Collections.singletonMap("flink-job-id", jobID.toHexString()));
+    }
+
+    @Test
+    public void testMdcCloseableAddsJobId() throws Exception {
+        runAndAssertJobIdLogged(
+                jobID -> {
+                    try (MdcCloseable ignored = MdcUtils.withContext(asContextData(jobID))) {
+                        LOGGER.warn("ignore");
+                    }
+                });
+    }
+
+    @Test
+    public void testMdcCloseableRemovesJobId() {
+        JobID jobID = new JobID();
+        try (MdcCloseable ignored = MdcUtils.withContext(asContextData(jobID))) {
+            // ...
+        }
+        // Logged after the closeable was closed, so no job ID is expected on the event.
+        LOGGER.warn("without-job");
+        assertLoggedJobId(null);
+    }
+
+    @Test
+    public void testWrapRunnable() throws Exception {
+        runAndAssertJobIdLogged(
+                jobID -> wrapRunnable(asContextData(jobID), LOGGING_RUNNABLE).run());
+    }
+
+    @Test
+    public void testWrapCallable() throws Exception {
+        runAndAssertJobIdLogged(
+                jobID ->
+                        wrapCallable(
+                                        asContextData(jobID),
+                                        () -> {
+                                            LOGGER.info("ignore");
+                                            return null;
+                                        })
+                                .call());
+    }
+
+    @Test
+    public void testScopeExecutor() throws Exception {
+        runAndAssertJobIdLogged(
+                jobID ->
+                        MdcUtils.scopeToJob(jobID, Executors.directExecutor())
+                                .execute(LOGGING_RUNNABLE));
+    }
+
+    @Test
+    public void testScopeExecutorService() throws Exception {
+        runAndAssertJobIdLogged(
+                jobID ->
+                        MdcUtils.scopeToJob(jobID, Executors.newDirectExecutorService())
+                                .submit(LOGGING_RUNNABLE)
+                                .get());
+    }
+
+    @Test
+    public void testScopeScheduledExecutorService() throws Exception {
+        ScheduledExecutorService ses =
+                java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
+        try {
+            runAndAssertJobIdLogged(
+                    jobID ->
+                            MdcUtils.scopeToJob(jobID, ses)
+                                    .schedule(LOGGING_RUNNABLE, 1L, TimeUnit.MILLISECONDS)
+                                    .get());
+        } finally {
+            ses.shutdownNow();
+        }
+    }
+
+    /**
+     * Runs the action with a fresh {@link JobID} and then asserts that the single recorded log
+     * event carries that job ID.
+     */
+    private void runAndAssertJobIdLogged(ThrowingConsumer<JobID, Exception> action)
+            throws Exception {
+        JobID jobID = new JobID();
+        action.accept(jobID);
+        assertLoggedJobId(jobID);
+    }
+
+    /**
+     * Asserts that exactly one event was recorded and that its {@code flink-job-id} context value
+     * matches the expectation.
+     *
+     * @param expectedJobId the expected job ID, or {@code null} if no job ID must be present
+     */
+    private void assertLoggedJobId(JobID expectedJobId) {
+        AbstractObjectAssert<?, Object> loggedJobId =
+                assertThat(loggerExtension.getEvents())
+                        .singleElement()
+                        .extracting(LogEvent::getContextData)
+                        .extracting(m -> m.getValue("flink-job-id"));
+        if (expectedJobId == null) {
+            loggedJobId.isNull();
+        } else {
+            loggedJobId.isEqualTo(expectedJobId.toHexString());
+        }
+    }
+}
diff --git
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/FencedPekkoRpcActor.java
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/FencedPekkoRpcActor.java
index 860a263f539..b2ebf0a7320 100644
---
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/FencedPekkoRpcActor.java
+++
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/FencedPekkoRpcActor.java
@@ -26,6 +26,7 @@ import
org.apache.flink.runtime.rpc.messages.LocalFencedMessage;
import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException;
import java.io.Serializable;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@@ -47,14 +48,16 @@ public class FencedPekkoRpcActor<
int version,
final long maximumFramesize,
final boolean forceSerialization,
- ClassLoader flinkClassLoader) {
+ ClassLoader flinkClassLoader,
+ final Map<String, String> loggingContext) {
super(
rpcEndpoint,
terminationFuture,
version,
maximumFramesize,
forceSerialization,
- flinkClassLoader);
+ flinkClassLoader,
+ loggingContext);
}
@Override
diff --git
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java
index dc4e342f35a..a9877796867 100644
---
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java
+++
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcActor.java
@@ -35,6 +35,7 @@ import
org.apache.flink.runtime.rpc.pekko.exceptions.RpcInvalidStateException;
import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MdcUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
@@ -52,6 +53,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -102,6 +104,7 @@ class PekkoRpcActor<T extends RpcEndpoint & RpcGateway>
extends AbstractActor {
private final AtomicBoolean rpcEndpointStopped;
private final boolean forceSerialization;
+ private final Map<String, String> loggingContext;
private volatile RpcEndpointTerminationResult rpcEndpointTerminationResult;
@@ -113,7 +116,9 @@ class PekkoRpcActor<T extends RpcEndpoint & RpcGateway>
extends AbstractActor {
final int version,
final long maximumFramesize,
final boolean forceSerialization,
- final ClassLoader flinkClassLoader) {
+ final ClassLoader flinkClassLoader,
+ final Map<String, String> loggingContext) {
+ this.loggingContext = loggingContext;
checkArgument(maximumFramesize > 0, "Maximum framesize must be
positive.");
this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
@@ -161,30 +166,32 @@ class PekkoRpcActor<T extends RpcEndpoint & RpcGateway>
extends AbstractActor {
}
private void handleMessage(final Object message) {
- if (state.isRunning()) {
- mainThreadValidator.enterMainThread();
+ try (MdcUtils.MdcCloseable ctx = MdcUtils.withContext(loggingContext))
{
+ if (state.isRunning()) {
+ mainThreadValidator.enterMainThread();
+
+ try {
+ handleRpcMessage(message);
+ } finally {
+ mainThreadValidator.exitMainThread();
+ }
+ } else {
+ log.info(
+ "The rpc endpoint {} has not been started yet.
Discarding message {} until processing is started.",
+ rpcEndpoint.getClass().getName(),
+ message);
- try {
- handleRpcMessage(message);
- } finally {
- mainThreadValidator.exitMainThread();
+ sendErrorIfSender(
+ new EndpointNotStartedException(
+ String.format(
+ "Discard message %s, because the rpc
endpoint %s has not been started yet.",
+ message, getSelf().path())));
}
- } else {
- log.info(
- "The rpc endpoint {} has not been started yet. Discarding
message {} until processing is started.",
- rpcEndpoint.getClass().getName(),
- message);
-
- sendErrorIfSender(
- new EndpointNotStartedException(
- String.format(
- "Discard message %s, because the rpc
endpoint %s has not been started yet.",
- message, getSelf().path())));
}
}
private void handleControlMessage(ControlMessages controlMessage) {
- try {
+ try (MdcUtils.MdcCloseable ctx = MdcUtils.withContext(loggingContext))
{
switch (controlMessage) {
case START:
state = state.start(this, flinkClassLoader);
@@ -237,20 +244,22 @@ class PekkoRpcActor<T extends RpcEndpoint & RpcGateway>
extends AbstractActor {
}
private void handleHandshakeMessage(RemoteHandshakeMessage
handshakeMessage) {
- if (!isCompatibleVersion(handshakeMessage.getVersion())) {
- sendErrorIfSender(
- new HandshakeException(
- String.format(
- "Version mismatch between source (%s) and
target (%s) rpc component. Please verify that all components have the same
version.",
- handshakeMessage.getVersion(),
getVersion())));
- } else if (!isGatewaySupported(handshakeMessage.getRpcGateway())) {
- sendErrorIfSender(
- new HandshakeException(
- String.format(
- "The rpc endpoint does not support the
gateway %s.",
-
handshakeMessage.getRpcGateway().getSimpleName())));
- } else {
- getSender().tell(new
Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
+ try (MdcUtils.MdcCloseable ctx = MdcUtils.withContext(loggingContext))
{
+ if (!isCompatibleVersion(handshakeMessage.getVersion())) {
+ sendErrorIfSender(
+ new HandshakeException(
+ String.format(
+ "Version mismatch between source (%s)
and target (%s) rpc component. Please verify that all components have the same
version.",
+ handshakeMessage.getVersion(),
getVersion())));
+ } else if (!isGatewaySupported(handshakeMessage.getRpcGateway())) {
+ sendErrorIfSender(
+ new HandshakeException(
+ String.format(
+ "The rpc endpoint does not support the
gateway %s.",
+
handshakeMessage.getRpcGateway().getSimpleName())));
+ } else {
+ getSender().tell(new
Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
+ }
}
}
diff --git
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcService.java
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcService.java
index c9d276685ed..ab8bee41822 100644
---
a/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcService.java
+++
b/flink-rpc/flink-rpc-akka/src/main/java/org/apache/flink/runtime/rpc/pekko/PekkoRpcService.java
@@ -260,10 +260,12 @@ public class PekkoRpcService implements RpcService {
}
@Override
- public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C
rpcEndpoint) {
+ public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(
+ C rpcEndpoint, Map<String, String> loggingContext) {
checkNotNull(rpcEndpoint, "rpc endpoint");
- final SupervisorActor.ActorRegistration actorRegistration =
registerRpcActor(rpcEndpoint);
+ final SupervisorActor.ActorRegistration actorRegistration =
+ registerRpcActor(rpcEndpoint, loggingContext);
final ActorRef actorRef = actorRegistration.getActorRef();
final CompletableFuture<Void> actorTerminationFuture =
actorRegistration.getTerminationFuture();
@@ -336,7 +338,7 @@ public class PekkoRpcService implements RpcService {
}
private <C extends RpcEndpoint & RpcGateway>
SupervisorActor.ActorRegistration registerRpcActor(
- C rpcEndpoint) {
+ C rpcEndpoint, Map<String, String> loggingContext) {
final Class<? extends AbstractActor> rpcActorType;
if (rpcEndpoint instanceof FencedRpcEndpoint) {
@@ -359,7 +361,8 @@ public class PekkoRpcService implements RpcService {
getVersion(),
configuration.getMaximumFramesize(),
configuration.isForceRpcInvocationSerialization(),
- flinkClassLoader),
+ flinkClassLoader,
+ loggingContext),
rpcEndpoint.getEndpointId());
final SupervisorActor.ActorRegistration actorRegistration =
diff --git
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
index e6ed59f737d..a6fbd2792a4 100644
---
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
+++
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/FencedRpcEndpoint.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.rpc;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
import java.util.UUID;
/**
@@ -34,8 +36,12 @@ public abstract class FencedRpcEndpoint<F extends
Serializable> extends RpcEndpo
private final F fencingToken;
- protected FencedRpcEndpoint(RpcService rpcService, String endpointId, F
fencingToken) {
- super(rpcService, endpointId);
+ protected FencedRpcEndpoint(
+ RpcService rpcService,
+ String endpointId,
+ F fencingToken,
+ Map<String, String> loggingContext) {
+ super(rpcService, endpointId, loggingContext);
Preconditions.checkNotNull(fencingToken, "The fence token should be
null");
Preconditions.checkNotNull(rpcServer, "The rpc server should be null");
@@ -43,6 +49,10 @@ public abstract class FencedRpcEndpoint<F extends
Serializable> extends RpcEndpo
this.fencingToken = fencingToken;
}
+ protected FencedRpcEndpoint(RpcService rpcService, String endpointId, F
fencingToken) {
+ this(rpcService, endpointId, fencingToken, Collections.emptyMap());
+ }
+
protected FencedRpcEndpoint(RpcService rpcService, F fencingToken) {
this(rpcService, UUID.randomUUID().toString(), fencingToken);
}
diff --git
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 47b64d1200e..b1fda4a0443 100644
---
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -19,10 +19,12 @@
package org.apache.flink.runtime.rpc;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ScheduledFutureAdapter;
import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.MdcUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -34,9 +36,12 @@ import javax.annotation.Nonnull;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
@@ -134,11 +139,12 @@ public abstract class RpcEndpoint implements RpcGateway,
AutoCloseableAsync {
* @param rpcService The RPC server that dispatches calls to this RPC
endpoint.
* @param endpointId Unique identifier for this endpoint
*/
- protected RpcEndpoint(final RpcService rpcService, final String
endpointId) {
+ protected RpcEndpoint(
+ RpcService rpcService, String endpointId, Map<String, String>
loggingContext) {
this.rpcService = checkNotNull(rpcService, "rpcService");
this.endpointId = checkNotNull(endpointId, "endpointId");
- this.rpcServer = rpcService.startServer(this);
+ this.rpcServer = rpcService.startServer(this, loggingContext);
this.resourceRegistry = new CloseableRegistry();
this.mainThreadExecutor =
@@ -146,6 +152,16 @@ public abstract class RpcEndpoint implements RpcGateway,
AutoCloseableAsync {
registerResource(this.mainThreadExecutor);
}
+ /**
+ * Initializes the RPC endpoint.
+ *
+ * @param rpcService The RPC server that dispatches calls to this RPC
endpoint.
+ * @param endpointId Unique identifier for this endpoint
+ */
+ protected RpcEndpoint(final RpcService rpcService, final String
endpointId) {
+ this(rpcService, endpointId, Collections.emptyMap());
+ }
+
/**
* Initializes the RPC endpoint with a random endpoint id.
*
@@ -342,6 +358,19 @@ public abstract class RpcEndpoint implements RpcGateway,
AutoCloseableAsync {
return mainThreadExecutor;
}
+ /**
+ * Gets the main thread execution context. The main thread execution
context can be used to
+ * execute tasks in the main thread of the underlying RPC endpoint.
+ *
+ * @param jobID the {@link JobID} to scope the returned {@link Executor} to, i.e. the job ID is
+ * added to the logging context (MDC) before, and removed after, each invocation it runs
+ * @return Main thread execution context
+ */
+ protected Executor getMainThreadExecutor(JobID jobID) {
+ // todo: consider caching
+ return MdcUtils.scopeToJob(jobID, getMainThreadExecutor());
+ }
+
/**
* Gets the endpoint's RPC service.
*
diff --git
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 9d60fd00534..788f2cb93e5 100644
---
a/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++
b/flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -23,6 +23,7 @@ import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import java.io.Serializable;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
@@ -93,11 +94,13 @@ public interface RpcService extends AutoCloseableAsync {
/**
* Start a rpc server which forwards the remote procedure calls to the
provided rpc endpoint.
*
- * @param rpcEndpoint Rpc protocol to dispatch the rpcs to
* @param <C> Type of the rpc endpoint
+ * @param rpcEndpoint Rpc protocol to dispatch the rpcs to
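+ * @param loggingContext key-value pairs (e.g. the job ID) to attach to the logging context (MDC) of the endpoint; may be empty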
* @return Self gateway to dispatch remote procedure calls to oneself
*/
- <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint);
+ <C extends RpcEndpoint & RpcGateway> RpcServer startServer(
+ C rpcEndpoint, Map<String, String> loggingContext);
/**
* Stop the underlying rpc server of the provided self gateway.
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index d74f1062767..25afade0239 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -47,6 +47,7 @@ import
org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.MdcUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.clock.Clock;
@@ -795,29 +796,31 @@ public class CheckpointCoordinator {
triggerTasks(request, timestamp, checkpoint)
.exceptionally(
failure -> {
- LOG.info(
- "Triggering Checkpoint {} for job {}
failed due to {}",
- checkpoint.getCheckpointID(),
- job,
- failure);
-
- final CheckpointException cause;
- if (failure instanceof CheckpointException) {
- cause = (CheckpointException) failure;
- } else {
- cause =
- new CheckpointException(
- CheckpointFailureReason
-
.TRIGGER_CHECKPOINT_FAILURE,
- failure);
+ try (MdcUtils.MdcCloseable ignored =
+
MdcUtils.withContext(MdcUtils.asContextData(job))) {
+ LOG.info(
+ "Triggering Checkpoint {} for job
{} failed due to {}",
+ checkpoint.getCheckpointID(),
+ job,
+ failure);
+ final CheckpointException cause;
+ if (failure instanceof
CheckpointException) {
+ cause = (CheckpointException) failure;
+ } else {
+ cause =
+ new CheckpointException(
+ CheckpointFailureReason
+
.TRIGGER_CHECKPOINT_FAILURE,
+ failure);
+ }
+ timer.execute(
+ () -> {
+ synchronized (lock) {
+
abortPendingCheckpoint(checkpoint, cause);
+ }
+ });
+ return null;
}
- timer.execute(
- () -> {
- synchronized (lock) {
-
abortPendingCheckpoint(checkpoint, cause);
- }
- });
- return null;
});
// It is possible that the tasks has finished
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java
index 8854c0976d1..a05e47a1916 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java
@@ -70,7 +70,8 @@ public class ChannelStateWriteRequestExecutorFactory {
checkState(this.executor == executor);
this.executor = null;
},
- lock);
+ lock,
+ jobID);
if (startExecutor) {
executor.start();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
index b20388e1be0..25fa06c9f2d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java
@@ -18,9 +18,11 @@
package org.apache.flink.runtime.checkpoint.channel;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MdcUtils;
import org.apache.flink.util.function.RunnableWithException;
import org.slf4j.Logger;
@@ -91,17 +93,21 @@ class ChannelStateWriteRequestExecutorImpl implements
ChannelStateWriteRequestEx
@GuardedBy("registerLock")
private final Consumer<ChannelStateWriteRequestExecutor> onRegistered;
+ private final JobID jobID;
+
ChannelStateWriteRequestExecutorImpl(
ChannelStateWriteRequestDispatcher dispatcher,
int maxSubtasksPerChannelStateFile,
Consumer<ChannelStateWriteRequestExecutor> onRegistered,
- Object registerLock) {
+ Object registerLock,
+ JobID jobID) {
this(
dispatcher,
new ArrayDeque<>(),
maxSubtasksPerChannelStateFile,
registerLock,
- onRegistered);
+ onRegistered,
+ jobID);
}
ChannelStateWriteRequestExecutorImpl(
@@ -109,44 +115,48 @@ class ChannelStateWriteRequestExecutorImpl implements
ChannelStateWriteRequestEx
Deque<ChannelStateWriteRequest> deque,
int maxSubtasksPerChannelStateFile,
Object registerLock,
- Consumer<ChannelStateWriteRequestExecutor> onRegistered) {
+ Consumer<ChannelStateWriteRequestExecutor> onRegistered,
+ JobID jobID) {
this.dispatcher = dispatcher;
this.deque = deque;
this.maxSubtasksPerChannelStateFile = maxSubtasksPerChannelStateFile;
this.registerLock = registerLock;
this.onRegistered = onRegistered;
- this.thread = new Thread(this::run, "Channel state writer ");
+ this.thread = new Thread(this::run, "Channel state writer");
this.subtasks = new HashSet<>();
this.thread.setDaemon(true);
+ this.jobID = jobID;
}
@VisibleForTesting
void run() {
- try {
- FileSystemSafetyNet.initializeSafetyNetForThread();
- loop();
- } catch (Exception ex) {
- thrown = ex;
- } finally {
+ try (MdcUtils.MdcCloseable ignored =
MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
try {
- closeAll(
- this::cleanupRequests,
- () -> {
- Throwable cause;
- synchronized (lock) {
- cause = thrown == null ? new
CancellationException() : thrown;
- }
- dispatcher.fail(cause);
- });
- } catch (Exception e) {
- synchronized (lock) {
- //noinspection NonAtomicOperationOnVolatileField
- thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+ FileSystemSafetyNet.initializeSafetyNetForThread();
+ loop();
+ } catch (Exception ex) {
+ thrown = ex;
+ } finally {
+ try {
+ closeAll(
+ this::cleanupRequests,
+ () -> {
+ Throwable cause;
+ synchronized (lock) {
+ cause = thrown == null ? new
CancellationException() : thrown;
+ }
+ dispatcher.fail(cause);
+ });
+ } catch (Exception e) {
+ synchronized (lock) {
+ //noinspection NonAtomicOperationOnVolatileField
+ thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+ }
}
+
FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
}
- FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
+ LOG.debug("loop terminated");
}
- LOG.debug("loop terminated");
}
private void loop() throws Exception {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index fed987e9819..5b89203095e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -96,6 +96,8 @@ import
org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.MdcUtils;
+import org.apache.flink.util.MdcUtils.MdcCloseable;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
@@ -403,7 +405,8 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
initJobClientExpiredTime(recoveredJob);
- try {
+ try (MdcCloseable ignored =
+
MdcUtils.withContext(MdcUtils.asContextData(recoveredJob.getJobID()))) {
runJob(createJobMasterRunner(recoveredJob),
ExecutionType.RECOVERY);
} catch (Throwable throwable) {
onFatalError(
@@ -431,7 +434,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
.getScheduledExecutor()
.scheduleWithFixedDelay(
() ->
- getMainThreadExecutor()
+ getMainThreadExecutor(jobID)
.execute(this::checkJobClientAliveness),
0L,
jobClientAlivenessCheckInterval,
@@ -513,7 +516,9 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time
timeout) {
final JobID jobID = jobGraph.getJobID();
- log.info("Received JobGraph submission '{}' ({}).",
jobGraph.getName(), jobID);
+ try (MdcCloseable ignored =
MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
+ log.info("Received JobGraph submission '{}' ({}).",
jobGraph.getName(), jobID);
+ }
return isInGloballyTerminalState(jobID)
.thenComposeAsync(
isTerminated -> {
@@ -547,7 +552,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
return internalSubmitJob(jobGraph);
}
},
- getMainThreadExecutor());
+ getMainThreadExecutor(jobID));
}
@Override
@@ -636,7 +641,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
new JobSubmissionException(
jobId, "Failed to submit
job.", strippedThrowable));
},
- getMainThreadExecutor());
+ getMainThreadExecutor(jobId));
}
return CompletableFuture.completedFuture(Acknowledge.get());
}
@@ -668,7 +673,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
dirtyJobResult,
highAvailabilityServices.getCheckpointRecoveryFactory(),
configuration,
- ioExecutor);
+ getIoExecutor(dirtyJobResult.getJobId()));
}
private void runJob(JobManagerRunner jobManagerRunner, ExecutionType
executionType)
@@ -698,7 +703,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
jobId,
JobStatus.FAILED, throwable));
}
},
- getMainThreadExecutor())
+ getMainThreadExecutor(jobId))
.thenCompose(Function.identity());
final CompletableFuture<Void> jobTerminationFuture =
@@ -1185,7 +1190,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
e));
}
},
- ioExecutor)
+ getIoExecutor(jobId))
.thenComposeAsync(
ignored ->
performOperationOnJobMasterGateway(
@@ -1193,7 +1198,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
jobMasterGateway ->
jobMasterGateway.updateJobResourceRequirements(
jobResourceRequirements)),
- getMainThreadExecutor())
+ getMainThreadExecutor(jobId))
.whenComplete(
(ack, error) -> {
if (error != null) {
@@ -1254,7 +1259,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
jobManagerRunnerTerminationFutures.put(jobId,
terminationFuture);
}
},
- getMainThreadExecutor());
+ getMainThreadExecutor(jobId));
}
private CompletableFuture<Void> removeJob(JobID jobId, CleanupJobState
cleanupJobState) {
@@ -1280,7 +1285,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
() ->
runPostJobGloballyTerminated(
jobId,
cleanupJobState.getJobStatus()),
- getMainThreadExecutor());
+ getMainThreadExecutor(jobId));
} else {
return localResourceCleaner.cleanupAsync(jobId);
}
@@ -1400,7 +1405,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
}
return
CleanupJobState.globalCleanup(terminalJobStatus);
},
- getMainThreadExecutor());
+ getMainThreadExecutor(jobId));
}
/**
@@ -1474,7 +1479,8 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
}
return Acknowledge.get();
},
- getMainThreadExecutor());
+ getMainThreadExecutor(
+
executionGraphInfo.getArchivedExecutionGraph().getJobID()));
}
private void jobMasterFailed(JobID jobId, Throwable cause) {
@@ -1562,7 +1568,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
return FutureUtils.thenAcceptAsyncIfNotDone(
jobManagerTerminationFuture,
- getMainThreadExecutor(),
+ getMainThreadExecutor(jobId),
FunctionUtils.uncheckedConsumer(
(ignored) -> {
jobManagerRunnerTerminationFutures.remove(jobId);
@@ -1586,7 +1592,7 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
}
public CompletableFuture<Void> onRemovedJobGraph(JobID jobId) {
- return CompletableFuture.runAsync(() -> terminateJob(jobId),
getMainThreadExecutor());
+ return CompletableFuture.runAsync(() -> terminateJob(jobId),
getMainThreadExecutor(jobId));
}
private void applyParallelismOverrides(JobGraph jobGraph) {
@@ -1607,4 +1613,9 @@ public abstract class Dispatcher extends
FencedRpcEndpoint<DispatcherId>
}
}
}
+
+ private Executor getIoExecutor(JobID jobID) {
+ // todo: consider caching
+ return MdcUtils.scopeToJob(jobID, ioExecutor);
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
index ecd67e1f3d4..bf7c69fb077 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java
@@ -39,6 +39,7 @@ import
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFact
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.MdcUtils;
import org.apache.flink.util.Preconditions;
import java.util.Collection;
@@ -98,7 +99,8 @@ public enum JobMasterServiceLeadershipRunnerFactory
implements JobManagerRunnerF
final DefaultJobMasterServiceFactory jobMasterServiceFactory =
new DefaultJobMasterServiceFactory(
- jobManagerServices.getIoExecutor(),
+ MdcUtils.scopeToJob(
+ jobGraph.getJobID(),
jobManagerServices.getIoExecutor()),
rpcService,
jobMasterConfiguration,
jobGraph,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 168a7436b28..1a853d851b6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -85,6 +85,7 @@ import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.IterableUtils;
+import org.apache.flink.util.MdcUtils;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TernaryBoolean;
@@ -484,9 +485,12 @@ public class DefaultExecutionGraph implements
ExecutionGraph, InternalExecutionG
checkState(checkpointCoordinatorTimer == null);
checkpointCoordinatorTimer =
- Executors.newSingleThreadScheduledExecutor(
- new DispatcherThreadFactory(
- Thread.currentThread().getThreadGroup(),
"Checkpoint Timer"));
+ MdcUtils.scopeToJob(
+ getJobID(),
+ Executors.newSingleThreadScheduledExecutor(
+ new DispatcherThreadFactory(
+
Thread.currentThread().getThreadGroup(),
+ "Checkpoint Timer")));
// create the coordinator that triggers and commits checkpoints and
holds the state
checkpointCoordinator =
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6edaa00ef07..3b288089a46 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -104,6 +104,7 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.MdcUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
@@ -250,7 +251,11 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId>
long initializationTimestamp)
throws Exception {
- super(rpcService, RpcServiceUtils.createRandomName(JOB_MANAGER_NAME),
jobMasterId);
+ super(
+ rpcService,
+ RpcServiceUtils.createRandomName(JOB_MANAGER_NAME),
+ jobMasterId,
+ MdcUtils.asContextData(jobGraph.getJobID()));
final ExecutionDeploymentReconciliationHandler
executionStateReconciliationHandler =
new ExecutionDeploymentReconciliationHandler() {
@@ -291,6 +296,10 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId>
}
}
};
+ final String jobName = jobGraph.getName();
+ final JobID jid = jobGraph.getJobID();
+
+ log.info("Initializing job '{}' ({}).", jobName, jid);
this.executionDeploymentTracker = executionDeploymentTracker;
this.executionDeploymentReconciler =
@@ -302,8 +311,9 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId>
this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
this.highAvailabilityServices = checkNotNull(highAvailabilityService);
this.blobWriter = jobManagerSharedServices.getBlobWriter();
- this.futureExecutor = jobManagerSharedServices.getFutureExecutor();
- this.ioExecutor = jobManagerSharedServices.getIoExecutor();
+ this.futureExecutor =
+ MdcUtils.scopeToJob(jid,
jobManagerSharedServices.getFutureExecutor());
+ this.ioExecutor = MdcUtils.scopeToJob(jid,
jobManagerSharedServices.getIoExecutor());
this.jobCompletionActions = checkNotNull(jobCompletionActions);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.userCodeLoader = checkNotNull(userCodeLoader);
@@ -313,11 +323,6 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId>
.getConfiguration()
.get(JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME);
- final String jobName = jobGraph.getName();
- final JobID jid = jobGraph.getJobID();
-
- log.info("Initializing job '{}' ({}).", jobName, jid);
-
resourceManagerLeaderRetriever =
highAvailabilityServices.getResourceManagerLeaderRetriever();
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 45aa27eca69..422bbecb30e 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -85,6 +85,8 @@ import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkExpectedException;
+import org.apache.flink.util.MdcUtils;
+import org.apache.flink.util.MdcUtils.MdcCloseable;
import org.apache.flink.util.concurrent.FutureUtils;
import javax.annotation.Nullable;
@@ -435,7 +437,7 @@ public abstract class ResourceManager<WorkerType extends
ResourceIDRetrievable>
new FlinkException(declineMessage));
}
},
- getMainThreadExecutor());
+ getMainThreadExecutor(jobId));
// handle exceptions which might have occurred in one of the futures
inputs of combine
return registrationResponseFuture.handleAsync(
@@ -572,30 +574,34 @@ public abstract class ResourceManager<WorkerType extends
ResourceIDRetrievable>
public CompletableFuture<Acknowledge> declareRequiredResources(
JobMasterId jobMasterId, ResourceRequirements
resourceRequirements, Time timeout) {
final JobID jobId = resourceRequirements.getJobId();
- final JobManagerRegistration jobManagerRegistration =
jobManagerRegistrations.get(jobId);
-
- if (null != jobManagerRegistration) {
- if (Objects.equals(jobMasterId,
jobManagerRegistration.getJobMasterId())) {
- return getReadyToServeFuture()
- .thenApply(
- acknowledge -> {
- validateRunsInMainThread();
-
slotManager.processResourceRequirements(resourceRequirements);
- return null;
- });
+ try (MdcCloseable ignored =
MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
+ final JobManagerRegistration jobManagerRegistration =
+ jobManagerRegistrations.get(jobId);
+
+ if (null != jobManagerRegistration) {
+ if (Objects.equals(jobMasterId,
jobManagerRegistration.getJobMasterId())) {
+ return getReadyToServeFuture()
+ .thenApply(
+ acknowledge -> {
+ validateRunsInMainThread();
+
slotManager.processResourceRequirements(
+ resourceRequirements);
+ return null;
+ });
+ } else {
+ return FutureUtils.completedExceptionally(
+ new ResourceManagerException(
+ "The job leader's id "
+ +
jobManagerRegistration.getJobMasterId()
+ + " does not match the received id
"
+ + jobMasterId
+ + '.'));
+ }
} else {
return FutureUtils.completedExceptionally(
new ResourceManagerException(
- "The job leader's id "
- +
jobManagerRegistration.getJobMasterId()
- + " does not match the received id "
- + jobMasterId
- + '.'));
+ "Could not find registered job manager for job
" + jobId + '.'));
}
- } else {
- return FutureUtils.completedExceptionally(
- new ResourceManagerException(
- "Could not find registered job manager for job " +
jobId + '.'));
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index e75714425a2..8be2e04d24a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -142,6 +142,8 @@ import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkExpectedException;
+import org.apache.flink.util.MdcUtils;
+import org.apache.flink.util.MdcUtils.MdcCloseable;
import org.apache.flink.util.OptionalConsumer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
@@ -640,8 +642,10 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
public CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time
timeout) {
- try {
- final JobID jobId = tdd.getJobId();
+ final JobID jobId = tdd.getJobId();
+ // todo: consider adding task info
+ try (MdcCloseable ignored =
MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
+
final ExecutionAttemptID executionAttemptID =
tdd.getExecutionAttemptId();
final JobTable.Connection jobManagerConnection =
@@ -817,7 +821,7 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
taskManagerConfiguration,
taskMetricGroup,
partitionStateChecker,
- getRpcService().getScheduledExecutor(),
+ MdcUtils.scopeToJob(jobId,
getRpcService().getScheduledExecutor()),
channelStateExecutorFactoryManager.getOrCreateExecutorFactory(jobId));
taskMetricGroup.gauge(MetricNames.IS_BACK_PRESSURED,
task::isBackPressured);
@@ -905,13 +909,17 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {
- try {
- task.cancelExecution();
- return CompletableFuture.completedFuture(Acknowledge.get());
- } catch (Throwable t) {
- return FutureUtils.completedExceptionally(
- new TaskException(
- "Cannot cancel task for execution " +
executionAttemptID + '.', t));
+ try (MdcCloseable ignored =
+
MdcUtils.withContext(MdcUtils.asContextData(task.getJobID()))) {
+ try {
+ task.cancelExecution();
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ } catch (Throwable t) {
+ return FutureUtils.completedExceptionally(
+ new TaskException(
+ "Cannot cancel task for execution " +
executionAttemptID + '.',
+ t));
+ }
}
} else {
final String message =
@@ -1039,18 +1047,19 @@ public class TaskExecutor extends RpcEndpoint
implements TaskExecutorGateway {
long checkpointId,
long checkpointTimestamp,
CheckpointOptions checkpointOptions) {
- log.debug(
- "Trigger checkpoint {}@{} for {}.",
- checkpointId,
- checkpointTimestamp,
- executionAttemptID);
-
final Task task = taskSlotTable.getTask(executionAttemptID);
-
if (task != null) {
- task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp,
checkpointOptions);
+ try (MdcCloseable ignored =
+
MdcUtils.withContext(MdcUtils.asContextData(task.getJobID()))) {
+ log.debug(
+ "Trigger checkpoint {}@{} for {}.",
+ checkpointId,
+ checkpointTimestamp,
+ executionAttemptID);
+ task.triggerCheckpointBarrier(checkpointId,
checkpointTimestamp, checkpointOptions);
- return CompletableFuture.completedFuture(Acknowledge.get());
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
} else {
final String message =
"TaskManager received a checkpoint request for unknown
task "
@@ -1070,20 +1079,21 @@ public class TaskExecutor extends RpcEndpoint
implements TaskExecutorGateway {
long completedCheckpointId,
long completedCheckpointTimestamp,
long lastSubsumedCheckpointId) {
- log.debug(
- "Confirm completed checkpoint {}@{} and last subsumed
checkpoint {} for {}.",
- completedCheckpointId,
- completedCheckpointTimestamp,
- lastSubsumedCheckpointId,
- executionAttemptID);
-
final Task task = taskSlotTable.getTask(executionAttemptID);
-
if (task != null) {
- task.notifyCheckpointComplete(completedCheckpointId);
-
- task.notifyCheckpointSubsumed(lastSubsumedCheckpointId);
- return CompletableFuture.completedFuture(Acknowledge.get());
+ try (MdcCloseable ignored =
+
MdcUtils.withContext(MdcUtils.asContextData(task.getJobID()))) {
+ log.debug(
+ "Confirm completed checkpoint {}@{} and last subsumed
checkpoint {} for {}.",
+ completedCheckpointId,
+ completedCheckpointTimestamp,
+ lastSubsumedCheckpointId,
+ executionAttemptID);
+ task.notifyCheckpointComplete(completedCheckpointId);
+
+ task.notifyCheckpointSubsumed(lastSubsumedCheckpointId);
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ }
} else {
final String message =
"TaskManager received a checkpoint confirmation for
unknown task "
@@ -1146,37 +1156,40 @@ public class TaskExecutor extends RpcEndpoint
implements TaskExecutorGateway {
// TODO: Filter invalid requests from the resource manager by using the
// instance/registration Id
- log.info(
- "Receive slot request {} for job {} from resource manager with
leader id {}.",
- allocationId,
- jobId,
- resourceManagerId);
+ try (MdcCloseable ignored =
MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
+ log.info(
+ "Receive slot request {} for job {} from resource manager
with leader id {}.",
+ allocationId,
+ jobId,
+ resourceManagerId);
- if (!isConnectedToResourceManager(resourceManagerId)) {
- final String message =
- String.format(
- "TaskManager is not connected to the resource
manager %s.",
- resourceManagerId);
- log.debug(message);
- return FutureUtils.completedExceptionally(new
TaskManagerException(message));
- }
+ if (!isConnectedToResourceManager(resourceManagerId)) {
+ final String message =
+ String.format(
+ "TaskManager is not connected to the resource
manager %s.",
+ resourceManagerId);
+ log.debug(message);
+ return FutureUtils.completedExceptionally(new
TaskManagerException(message));
+ }
- tryPersistAllocationSnapshot(
- new SlotAllocationSnapshot(
- slotId, jobId, targetAddress, allocationId,
resourceProfile));
+ tryPersistAllocationSnapshot(
+ new SlotAllocationSnapshot(
+ slotId, jobId, targetAddress, allocationId,
resourceProfile));
- try {
- final boolean isConnected =
- allocateSlotForJob(jobId, slotId, allocationId,
resourceProfile, targetAddress);
+ try {
+ final boolean isConnected =
+ allocateSlotForJob(
+ jobId, slotId, allocationId, resourceProfile,
targetAddress);
- if (isConnected) {
- offerSlotsToJobManager(jobId);
- }
+ if (isConnected) {
+ offerSlotsToJobManager(jobId);
+ }
- return CompletableFuture.completedFuture(Acknowledge.get());
- } catch (SlotAllocationException e) {
- log.debug("Could not allocate slot for allocation id {}.",
allocationId, e);
- return FutureUtils.completedExceptionally(e);
+ return CompletableFuture.completedFuture(Acknowledge.get());
+ } catch (SlotAllocationException e) {
+ log.debug("Could not allocate slot for allocation id {}.",
allocationId, e);
+ return FutureUtils.completedExceptionally(e);
+ }
}
}
@@ -1266,15 +1279,17 @@ public class TaskExecutor extends RpcEndpoint
implements TaskExecutorGateway {
@Override
public void freeInactiveSlots(JobID jobId, Time timeout) {
- log.debug("Freeing inactive slots for job {}.", jobId);
-
- // need a copy to prevent ConcurrentModificationExceptions
- final ImmutableList<TaskSlot<Task>> inactiveSlots =
- ImmutableList.copyOf(taskSlotTable.getAllocatedSlots(jobId));
- for (TaskSlot<Task> slot : inactiveSlots) {
- freeSlotInternal(
- slot.getAllocationId(),
- new FlinkException("Slot was re-claimed by resource
manager."));
+ try (MdcCloseable ignored =
MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
+ log.debug("Freeing inactive slots for job {}.", jobId);
+
+ // need a copy to prevent ConcurrentModificationExceptions
+ final ImmutableList<TaskSlot<Task>> inactiveSlots =
+
ImmutableList.copyOf(taskSlotTable.getAllocatedSlots(jobId));
+ for (TaskSlot<Task> slot : inactiveSlots) {
+ freeSlotInternal(
+ slot.getAllocationId(),
+ new FlinkException("Slot was re-claimed by resource
manager."));
+ }
}
}
@@ -1338,16 +1353,22 @@ public class TaskExecutor extends RpcEndpoint
implements TaskExecutorGateway {
@Override
public void disconnectJobManager(JobID jobId, Exception cause) {
- jobTable.getConnection(jobId)
- .ifPresent(
- jobManagerConnection ->
-
disconnectAndTryReconnectToJobManager(jobManagerConnection, cause));
+ try (MdcCloseable ignored =
MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
+ jobTable.getConnection(jobId)
+ .ifPresent(
+ jobManagerConnection ->
+ disconnectAndTryReconnectToJobManager(
+ jobManagerConnection, cause));
+ }
}
private void disconnectAndTryReconnectToJobManager(
JobTable.Connection jobManagerConnection, Exception cause) {
- disconnectJobManagerConnection(jobManagerConnection, cause);
- jobLeaderService.reconnect(jobManagerConnection.getJobId());
+ try (MdcCloseable ignored =
+
MdcUtils.withContext(MdcUtils.asContextData(jobManagerConnection.getJobId()))) {
+ disconnectJobManagerConnection(jobManagerConnection, cause);
+ jobLeaderService.reconnect(jobManagerConnection.getJobId());
+ }
}
@Override
@@ -1658,7 +1679,7 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
acceptedSlotsFuture.whenCompleteAsync(
handleAcceptedSlotOffers(
jobId, jobMasterGateway, jobMasterId,
reservedSlots, slotOfferId),
- getMainThreadExecutor());
+ getMainThreadExecutor(jobId));
} else {
log.debug("There are no unassigned slots for the job {}.", jobId);
}
@@ -2061,13 +2082,17 @@ public class TaskExecutor extends RpcEndpoint
implements TaskExecutorGateway {
// only respond to freeing slots when not shutting down to avoid
freeing slot allocation
// information
if (isRunning()) {
- log.debug(
- "Free slot with allocation id {} because: {}",
- allocationId,
- cause.getMessage());
+ final JobID jobId = taskSlotTable.getOwningJob(allocationId);
+ try (MdcCloseable ignored =
+ MdcUtils.withContext(
+ jobId == null
+ ? Collections.emptyMap()
+ : MdcUtils.asContextData(jobId))) {
- try {
- final JobID jobId = taskSlotTable.getOwningJob(allocationId);
+ log.debug(
+ "Free slot with allocation id {} because: {}",
+ allocationId,
+ cause.getMessage());
final int slotIndex = taskSlotTable.freeSlot(allocationId,
cause);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 877279326e9..f23ec1f3a3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -87,6 +87,8 @@ import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.MdcUtils;
+import org.apache.flink.util.MdcUtils.MdcCloseable;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TaskManagerExceptionUtils;
@@ -562,7 +564,7 @@ public class Task
/** The core work method that bootstraps the task and executes its code. */
@Override
public void run() {
- try {
+ try (MdcCloseable ignored =
MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
doRun();
} finally {
terminationFuture.complete(executionState);
@@ -1244,7 +1246,8 @@ public class Task
invokable,
executingThread,
taskNameWithSubtask,
- taskCancellationInterval);
+ taskCancellationInterval,
+ jobId);
Thread interruptingThread =
new Thread(
@@ -1266,7 +1269,8 @@ public class Task
taskInfo,
executingThread,
taskManagerActions,
- taskCancellationTimeout);
+ taskCancellationTimeout,
+ jobId);
Thread watchDogThread =
new Thread(
@@ -1661,7 +1665,7 @@ public class Task
@Override
public void run() {
- try {
+ try (MdcCloseable ignored =
MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
// the user-defined cancel method may throw errors.
// we need do continue despite that
try {
@@ -1708,23 +1712,27 @@ public class Task
/** The interval in which we interrupt. */
private final long interruptIntervalMillis;
+ private final JobID jobID;
+
TaskInterrupter(
Logger log,
TaskInvokable task,
Thread executorThread,
String taskName,
- long interruptIntervalMillis) {
+ long interruptIntervalMillis,
+ JobID jobID) {
this.log = log;
this.task = task;
this.executorThread = executorThread;
this.taskName = taskName;
this.interruptIntervalMillis = interruptIntervalMillis;
+ this.jobID = jobID;
}
@Override
public void run() {
- try {
+ try (MdcCloseable ignored =
MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
// we initially wait for one interval
// in most cases, the threads go away immediately (by the
cancellation thread)
// and we need not actually do anything
@@ -1765,11 +1773,14 @@ public class Task
private final TaskInfo taskInfo;
+ private final JobID jobID;
+
TaskCancelerWatchDog(
TaskInfo taskInfo,
Thread executorThread,
TaskManagerActions taskManager,
- long timeoutMillis) {
+ long timeoutMillis,
+ JobID jobID) {
checkArgument(timeoutMillis > 0);
@@ -1777,11 +1788,12 @@ public class Task
this.executorThread = executorThread;
this.taskManager = taskManager;
this.timeoutMillis = timeoutMillis;
+ this.jobID = jobID;
}
@Override
public void run() {
- try {
+ try (MdcCloseable ign =
MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
Deadline timeout =
Deadline.fromNow(Duration.ofMillis(timeoutMillis));
while (executorThread.isAlive() && timeout.hasTimeLeft()) {
try {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
index ea31130b3fc..8f9f073154c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java
@@ -87,7 +87,7 @@ class ChannelStateWriteRequestExecutorImplTest {
Object registerLock = new Object();
ChannelStateWriteRequestExecutorImpl worker =
new ChannelStateWriteRequestExecutorImpl(
- NO_OP, closingDeque, 5, registerLock, e -> {});
+ NO_OP, closingDeque, 5, registerLock, e -> {}, new
JobID());
closingDeque.setWorker(worker);
synchronized (registerLock) {
worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
@@ -109,7 +109,7 @@ class ChannelStateWriteRequestExecutorImplTest {
Object registerLock = new Object();
ChannelStateWriteRequestExecutorImpl executor =
new ChannelStateWriteRequestExecutorImpl(
- NO_OP, deque, 5, registerLock, e -> {});
+ NO_OP, deque, 5, registerLock, e -> {}, new
JobID());
synchronized (registerLock) {
executor.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
}
@@ -134,7 +134,7 @@ class ChannelStateWriteRequestExecutorImplTest {
Object registerLock = new Object();
ChannelStateWriteRequestExecutorImpl worker =
new ChannelStateWriteRequestExecutorImpl(
- requestProcessor, deque, 5, registerLock, e -> {});
+ requestProcessor, deque, 5, registerLock, e -> {}, new
JobID());
synchronized (registerLock) {
worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
}
@@ -153,7 +153,7 @@ class ChannelStateWriteRequestExecutorImplTest {
Object registerLock = new Object();
ChannelStateWriteRequestExecutorImpl worker =
new ChannelStateWriteRequestExecutorImpl(
- requestProcessor, deque, 5, registerLock, e -> {});
+ requestProcessor, deque, 5, registerLock, e -> {}, new
JobID());
synchronized (registerLock) {
worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
}
@@ -180,7 +180,8 @@ class ChannelStateWriteRequestExecutorImplTest {
new ChannelStateSerializerImpl());
Object registerLock = new Object();
ChannelStateWriteRequestExecutorImpl worker =
- new ChannelStateWriteRequestExecutorImpl(processor, 5, e ->
{}, registerLock);
+ new ChannelStateWriteRequestExecutorImpl(
+ processor, 5, e -> {}, registerLock, new JobID());
synchronized (registerLock) {
worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
}
@@ -273,7 +274,7 @@ class ChannelStateWriteRequestExecutorImplTest {
Object registerLock = new Object();
ChannelStateWriteRequestExecutorImpl worker =
new ChannelStateWriteRequestExecutorImpl(
- throwingRequestProcessor, 5, e -> {}, registerLock);
+ throwingRequestProcessor, 5, e -> {}, registerLock,
new JobID());
synchronized (registerLock) {
worker.registerSubtask(JOB_VERTEX_ID, subtaskIndex0);
worker.registerSubtask(JOB_VERTEX_ID, subtaskIndex1);
@@ -319,7 +320,7 @@ class ChannelStateWriteRequestExecutorImplTest {
Object registerLock = new Object();
ChannelStateWriteRequestExecutorImpl worker =
new ChannelStateWriteRequestExecutorImpl(
- throwingRequestProcessor, deque, 5, registerLock, e ->
{});
+ throwingRequestProcessor, deque, 5, registerLock, e ->
{}, new JobID());
synchronized (registerLock) {
worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
}
@@ -342,7 +343,8 @@ class ChannelStateWriteRequestExecutorImplTest {
void testSubmitRequestOfUnregisteredSubtask() throws Exception {
Object registerLock = new Object();
ChannelStateWriteRequestExecutorImpl worker =
- new ChannelStateWriteRequestExecutorImpl(NO_OP, 5, e -> {},
registerLock);
+ new ChannelStateWriteRequestExecutorImpl(
+ NO_OP, 5, e -> {}, registerLock, new JobID());
synchronized (registerLock) {
worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
}
@@ -366,7 +368,8 @@ class ChannelStateWriteRequestExecutorImplTest {
void testSubmitPriorityUnreadyRequest() throws Exception {
Object registerLock = new Object();
ChannelStateWriteRequestExecutorImpl worker =
- new ChannelStateWriteRequestExecutorImpl(NO_OP, 5, e -> {},
registerLock);
+ new ChannelStateWriteRequestExecutorImpl(
+ NO_OP, 5, e -> {}, registerLock, new JobID());
synchronized (registerLock) {
worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
}
@@ -390,7 +393,7 @@ class ChannelStateWriteRequestExecutorImplTest {
Object registerLock = new Object();
ChannelStateWriteRequestExecutorImpl worker =
new ChannelStateWriteRequestExecutorImpl(
- NO_OP, maxSubtasksPerChannelStateFile, e -> {},
registerLock);
+ NO_OP, maxSubtasksPerChannelStateFile, e -> {},
registerLock, new JobID());
synchronized (registerLock) {
for (int i = 0; i < maxSubtasksPerChannelStateFile; i++) {
assertThat(worker.isRegistering()).isTrue();
@@ -429,7 +432,8 @@ class ChannelStateWriteRequestExecutorImplTest {
dispatcher,
maxSubtasksPerChannelStateFile,
workerFuture::complete,
- registerLock);
+ registerLock,
+ new JobID());
worker.start();
synchronized (registerLock) {
worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
@@ -467,7 +471,8 @@ class ChannelStateWriteRequestExecutorImplTest {
new TestRequestDispatcher(),
maxSubtasksPerChannelStateFile,
workerFuture::complete,
- registerLock);
+ registerLock,
+ new JobID());
worker.start();
synchronized (registerLock) {
worker.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index d61ba418d0d..afabbeff592 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,6 +23,8 @@ import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
@@ -195,8 +197,9 @@ public class TestingRpcService implements RpcService {
}
@Override
- public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C
rpcEndpoint) {
- return backingRpcService.startServer(rpcEndpoint);
+ public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(
+ C rpcEndpoint, Map<String, String> loggingContext) {
+ return backingRpcService.startServer(rpcEndpoint,
Collections.emptyMap());
}
@Override
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 23912711562..d16bc40a8be 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -110,6 +110,7 @@ import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.MdcUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.clock.SystemClock;
@@ -416,8 +417,10 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
resourceCloser.registerCloseable(mailboxProcessor);
this.channelIOExecutor =
- Executors.newSingleThreadExecutor(
- new
ExecutorThreadFactory("channel-state-unspilling"));
+ MdcUtils.scopeToJob(
+ environment.getJobID(),
+ Executors.newSingleThreadExecutor(
+ new
ExecutorThreadFactory("channel-state-unspilling")));
resourceCloser.registerCloseable(channelIOExecutor::shutdown);
this.recordWriter = createRecordWriterDelegate(configuration,
environment);
@@ -436,13 +439,16 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
// for simultaneous N ongoing concurrent checkpoints and for
example clean up of one
// aborted one.
this.asyncOperationsThreadPool =
- new ThreadPoolExecutor(
- 0,
- configuration.getMaxConcurrentCheckpoints() + 1,
- 60L,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
- new ExecutorThreadFactory("AsyncOperations",
uncaughtExceptionHandler));
+ MdcUtils.scopeToJob(
+ getEnvironment().getJobID(),
+ new ThreadPoolExecutor(
+ 0,
+
configuration.getMaxConcurrentCheckpoints() + 1,
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new ExecutorThreadFactory(
+ "AsyncOperations",
uncaughtExceptionHandler)));
// Register all asynchronous checkpoint threads.
resourceCloser.registerCloseable(this::shutdownAsyncThreads);
diff --git
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java
index 8ca1b5cc1da..a1e7367713d 100644
---
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java
+++
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/testutils/logging/LoggerAuditingExtension.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.extension.ExtensionContext;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
/**
* Utility for auditing logged messages.(Junit5 extension)
@@ -47,17 +48,31 @@ public class LoggerAuditingExtension implements
BeforeEachCallback, AfterEachCal
private final String loggerName;
private final org.slf4j.event.Level level;
- private ConcurrentLinkedQueue<String> loggingEvents;
+ private ConcurrentLinkedQueue<LogEvent> loggingEvents;
public LoggerAuditingExtension(Class<?> clazz, org.slf4j.event.Level
level) {
- this.loggerName = clazz.getCanonicalName();
+ this(clazz.getCanonicalName(), level);
+ }
+
+ public LoggerAuditingExtension(String loggerName, org.slf4j.event.Level
level) {
+ this.loggerName = loggerName;
this.level = level;
}
public List<String> getMessages() {
+ return loggingEvents.stream()
+ .map(e -> e.getMessage().getFormattedMessage())
+ .collect(Collectors.toList());
+ }
+
+ public List<LogEvent> getEvents() {
return new ArrayList<>(loggingEvents);
}
+ public String getLoggerName() {
+ return loggerName;
+ }
+
@Override
public void beforeEach(ExtensionContext context) throws Exception {
loggingEvents = new ConcurrentLinkedQueue<>();
@@ -66,7 +81,7 @@ public class LoggerAuditingExtension implements
BeforeEachCallback, AfterEachCal
new AbstractAppender("test-appender", null, null, false,
Property.EMPTY_ARRAY) {
@Override
public void append(LogEvent event) {
-
loggingEvents.add(event.getMessage().getFormattedMessage());
+ loggingEvents.add(event.toImmutable());
}
};
testAppender.start();
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 6f10bdfd43d..8c46a28d0a7 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -44,7 +44,7 @@ under the License.
</properties>
<dependencies>
-
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
diff --git
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
index aceabcea7c4..2ef37eddd13 100644
---
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
@@ -56,6 +56,7 @@ import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -382,8 +383,9 @@ public class OperatorEventSendingCheckpointITCase extends
TestLogger {
}
@Override
- public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C
rpcEndpoint) {
- return rpcService.startServer(rpcEndpoint);
+ public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(
+ C rpcEndpoint, Map<String, String> loggingContext) {
+ return rpcService.startServer(rpcEndpoint, Collections.emptyMap());
}
@Override
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
new file mode 100644
index 00000000000..3380698feb7
--- /dev/null
+++
b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.misc;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.core.execution.CheckpointType;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.logging.LoggerAuditingExtension;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.MdcUtils;
+
+import org.apache.logging.log4j.core.LogEvent;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.slf4j.event.Level.DEBUG;
+
+/**
+ * Tests adding of {@link JobID} to logs (via {@link org.slf4j.MDC}) in the
most important cases.
+ */
+public class JobIDLoggingITCase {
+ private static final Logger logger =
LoggerFactory.getLogger(JobIDLoggingITCase.class);
+
+ @RegisterExtension
+ public final LoggerAuditingExtension checkpointCoordinatorLogging =
+ new LoggerAuditingExtension(CheckpointCoordinator.class, DEBUG);
+
+ @RegisterExtension
+ public final LoggerAuditingExtension streamTaskLogging =
+ new LoggerAuditingExtension(StreamTask.class, DEBUG);
+
+ @RegisterExtension
+ public final LoggerAuditingExtension taskExecutorLogging =
+ new LoggerAuditingExtension(TaskExecutor.class, DEBUG);
+
+ @RegisterExtension
+ public final LoggerAuditingExtension taskLogging =
+ new LoggerAuditingExtension(Task.class, DEBUG);
+
+ @RegisterExtension
+ public final LoggerAuditingExtension executionGraphLogging =
+ new LoggerAuditingExtension(ExecutionGraph.class, DEBUG);
+
+ @RegisterExtension
+ public final LoggerAuditingExtension jobMasterLogging =
+ new LoggerAuditingExtension(JobMaster.class, DEBUG);
+
+ @RegisterExtension
+ public final LoggerAuditingExtension asyncCheckpointRunnableLogging =
+ // this class is private
+ new LoggerAuditingExtension(
+
"org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable", DEBUG);
+
+ @RegisterExtension
+ public static MiniClusterExtension miniClusterResource =
+ new MiniClusterExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(1)
+ .build());
+
+ @Test
+ public void testJobIDLogging(@InjectClusterClient ClusterClient<?>
clusterClient)
+ throws Exception {
+ JobID jobID = runJob(clusterClient);
+ clusterClient.cancel(jobID).get();
+
+ // NOTE: most of the assertions are empirical, such as
+ // - which classes are important
+ // - how many messages to expect
+ // - which log patterns to ignore
+
+ assertJobIDPresent(jobID, 3, checkpointCoordinatorLogging);
+ assertJobIDPresent(jobID, 6, streamTaskLogging);
+ assertJobIDPresent(
+ jobID,
+ 9,
+ taskExecutorLogging,
+ "Un-registering task.*",
+ "Successful registration.*",
+ "Establish JobManager connection.*",
+ "Offer reserved slots.*",
+ ".*ResourceManager.*",
+ "Operator event.*");
+
+ assertJobIDPresent(jobID, 10, taskLogging);
+ assertJobIDPresent(jobID, 10, executionGraphLogging);
+ assertJobIDPresent(
+ jobID,
+ 15,
+ jobMasterLogging,
+ "Registration at ResourceManager.*",
+ "Registration with ResourceManager.*",
+ "Resolved ResourceManager address.*");
+ assertJobIDPresent(jobID, 1, asyncCheckpointRunnableLogging);
+ }
+
+ private static void assertJobIDPresent(
+ JobID jobID,
+ int expectedLogMessages,
+ LoggerAuditingExtension ext,
+ String... ignPatterns) {
+ String loggerName = ext.getLoggerName();
+ checkState(
+ ext.getEvents().size() >= expectedLogMessages,
+ "Too few log events recorded for %s (%s) - this must be a bug
in the test code",
+ loggerName,
+ ext.getEvents().size());
+
+ final List<LogEvent> eventsWithMissingJobId = new ArrayList<>();
+ final List<LogEvent> eventsWithWrongJobId = new ArrayList<>();
+ final List<LogEvent> ignoredEvents = new ArrayList<>();
+ final List<Pattern> ignorePatterns =
+
Arrays.stream(ignPatterns).map(Pattern::compile).collect(Collectors.toList());
+
+ for (LogEvent e : ext.getEvents()) {
+ if (e.getContextData().containsKey(MdcUtils.JOB_ID)) {
+ if (!Objects.equals(
+ e.getContextData().getValue(MdcUtils.JOB_ID),
jobID.toHexString())) {
+ eventsWithWrongJobId.add(e);
+ }
+ } else if (matchesAny(ignorePatterns,
e.getMessage().getFormattedMessage())) {
+ ignoredEvents.add(e);
+ } else {
+ eventsWithMissingJobId.add(e);
+ }
+ }
+ logger.debug(
+ "checked events for {}:\n {};\n ignored: {},\n wrong job
id: {},\n missing job id: {}",
+ loggerName,
+ ext.getEvents(),
+ ignoredEvents,
+ eventsWithWrongJobId,
+ eventsWithMissingJobId);
+ assertThat(eventsWithWrongJobId).as("events with a wrong Job
ID").isEmpty();
+ assertTrue(
+ eventsWithMissingJobId.isEmpty(),
+ "too many events without Job ID recorded for "
+ + loggerName
+ + ": "
+ + eventsWithMissingJobId);
+ }
+
+ private static boolean matchesAny(List<Pattern> patternStream, String
message) {
+ return patternStream.stream().anyMatch(p ->
p.matcher(message).matches());
+ }
+
+ private static JobID runJob(ClusterClient<?> clusterClient) throws
Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).addSink(new
DiscardingSink<>());
+ JobID jobId =
clusterClient.submitJob(env.getStreamGraph().getJobGraph()).get();
+ Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5));
+ while (deadline.hasTimeLeft()
+ && clusterClient.listJobs().get().stream()
+ .noneMatch(
+ m ->
+ m.getJobId().equals(jobId)
+ &&
m.getJobState().equals(JobStatus.RUNNING))) {
+ Thread.sleep(10);
+ }
+ // wait for all tasks ready and then checkpoint
+ while (true) {
+ try {
+ clusterClient.triggerCheckpoint(jobId,
CheckpointType.DEFAULT).get();
+ return jobId;
+ } catch (ExecutionException e) {
+ if (ExceptionUtils.findThrowable(e,
CheckpointException.class).isPresent()
+ && !deadline.isOverdue()) {
+ Thread.sleep(10);
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+}
diff --git a/flink-tests/src/test/resources/log4j2-test.properties
b/flink-tests/src/test/resources/log4j2-test.properties
index c5d9b0f65be..843e105b0ea 100644
--- a/flink-tests/src/test/resources/log4j2-test.properties
+++ b/flink-tests/src/test/resources/log4j2-test.properties
@@ -28,7 +28,7 @@ appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
-appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
+appender.testlogger.layout.pattern = [%-32X{flink-job-id}] %c{0} [%t] %-5p %m%n
logger.migration.name = org.apache.flink.test.migration
logger.migration.level = INFO