zentol commented on code in PR #24292: URL: https://github.com/apache/flink/pull/24292#discussion_r1514074926
########## flink-core/src/main/java/org/apache/flink/util/MdcAwareScheduledExecutorService.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.MdcUtils.wrapCallable; +import static org.apache.flink.util.MdcUtils.wrapRunnable; + +// todo: tests Review Comment: 🙈 ########## flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/concurrent/ComponentMainThreadExecutor.java: ########## @@ -30,7 +32,7 @@ * Interface for an executor that runs tasks in the main thread of an {@link * org.apache.flink.runtime.rpc.RpcEndpoint}. 
*/ -public interface ComponentMainThreadExecutor extends ScheduledExecutor { +public interface ComponentMainThreadExecutor extends ScheduledExecutor, Closeable { Review Comment: We may not need this change anymore. ########## flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java: ########## @@ -440,15 +470,16 @@ public void validateRunsInMainThread() { * @return true if all the resources are closed, otherwise false */ boolean validateResourceClosed() { - return mainThreadExecutor.validateScheduledExecutorClosed() && resourceRegistry.isClosed(); + return ((MainThreadExecutor) mainThreadExecutor).validateScheduledExecutorClosed() Review Comment: This can probably be reverted. ########## flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java: ########## @@ -1662,6 +1667,7 @@ private class TaskCanceler implements Runnable { @Override public void run() { try { Review Comment: This could be a good candidate for MdcUtils#withContext or `MDC.putCloseable()` (there are more instances in this file). ########## flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java: ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.test.misc; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.core.execution.CheckpointType; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.test.junit5.InjectClusterClient; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.testutils.logging.LoggerAuditingExtension; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.MdcUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.logging.log4j.core.LogEvent; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkState; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.slf4j.event.Level.DEBUG; + +/** + * Tests adding of {@link JobID} to logs (via 
{@link org.slf4j.MDC}) in the most important cases. + */ +public class JobIDLoggingITCase extends TestLogger { Review Comment: ```suggestion public class JobIDLoggingITCase { ``` This is both out-dated (JUnit 4 tech) and no longer required in flink-tests. ########## flink-rpc/flink-rpc-core/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java: ########## @@ -134,18 +139,30 @@ public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync { * @param rpcService The RPC server that dispatches calls to this RPC endpoint. * @param endpointId Unique identifier for this endpoint */ - protected RpcEndpoint(final RpcService rpcService, final String endpointId) { + protected RpcEndpoint( + RpcService rpcService, String endpointId, Map<String, String> loggingContext) { this.rpcService = checkNotNull(rpcService, "rpcService"); this.endpointId = checkNotNull(endpointId, "endpointId"); - this.rpcServer = rpcService.startServer(this); + this.rpcServer = rpcService.startServer(this, loggingContext); this.resourceRegistry = new CloseableRegistry(); this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread, endpointId); + Review Comment: revert ########## flink-core/src/main/java/org/apache/flink/util/MdcUtils.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.api.common.JobID; + +import org.slf4j.MDC; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; + +import static org.apache.flink.util.Preconditions.checkArgument; + +// todo: tests Review Comment: 🙈 ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ########## @@ -303,7 +308,8 @@ public void onUnknownDeploymentsOf( this.highAvailabilityServices = checkNotNull(highAvailabilityService); this.blobWriter = jobManagerSharedServices.getBlobWriter(); this.futureExecutor = jobManagerSharedServices.getFutureExecutor(); Review Comment: Is there a reason we don't wrap this one as well? ########## flink-tests/pom.xml: ########## @@ -44,7 +44,14 @@ under the License. </properties> <dependencies> - + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>2.17.1</version> + <scope>test</scope> + <type>test-jar</type> Review Comment: What do we need from the test jar? ########## flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java: ########## @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.misc; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.core.execution.CheckpointType; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.test.junit5.InjectClusterClient; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.testutils.logging.LoggerAuditingExtension; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.MdcUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.logging.log4j.core.LogEvent; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkState; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.slf4j.event.Level.DEBUG; + +/** + * Tests adding of {@link JobID} to logs (via {@link org.slf4j.MDC}) in the most important cases. + */ +public class JobIDLoggingITCase extends TestLogger { + private static final Logger logger = LoggerFactory.getLogger(JobIDLoggingITCase.class); + + @RegisterExtension + public final LoggerAuditingExtension checkpointCoordinatorLogging = + new LoggerAuditingExtension(CheckpointCoordinator.class, DEBUG); + + @RegisterExtension + public final LoggerAuditingExtension streamTaskLogging = + new LoggerAuditingExtension(StreamTask.class, DEBUG); + + @RegisterExtension + public final LoggerAuditingExtension taskExecutorLogging = + new LoggerAuditingExtension(TaskExecutor.class, DEBUG); + + @RegisterExtension + public final LoggerAuditingExtension taskLogging = + new LoggerAuditingExtension(Task.class, DEBUG); + + @RegisterExtension + public final LoggerAuditingExtension executionGraphLogging = + new LoggerAuditingExtension(ExecutionGraph.class, DEBUG); + + @RegisterExtension + public final LoggerAuditingExtension jobMasterLogging = + new LoggerAuditingExtension(JobMaster.class, DEBUG); + + @RegisterExtension + public final LoggerAuditingExtension asyncCheckpointRunnableLogging = + // this class is private + new LoggerAuditingExtension( + "org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable", DEBUG); + + @RegisterExtension + public static MiniClusterExtension miniClusterResource = + new MiniClusterExtension( + new 
MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build()); + + @Test + public void testJobIDLogging(@InjectClusterClient ClusterClient<?> clusterClient) + throws Exception { + JobID jobID = null; + try { + jobID = runJob(clusterClient); + } finally { + if (jobID != null) { + clusterClient.cancel(jobID).get(); + } + } Review Comment: ```suggestion JobID jobID = runJob(clusterClient); clusterClient.cancel(jobID).get(); ``` Maybe I'm missing something, but the finally branch will never do anything in the exceptional case, because jobID cannot have been assigned if runJob throws. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
