This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6b5ae445724 [FLINK-34643][tests] Fix JobIDLoggingITCase
6b5ae445724 is described below
commit 6b5ae445724b68db05a3f9687cff6dd68e2129d7
Author: Roman Khachatryan <[email protected]>
AuthorDate: Mon Mar 11 16:22:42 2024 +0100
[FLINK-34643][tests] Fix JobIDLoggingITCase
---
.../apache/flink/test/misc/JobIDLoggingITCase.java | 134 +++++++++++++++------
1 file changed, 98 insertions(+), 36 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
index 3380698feb7..e13bfce16e3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/misc/JobIDLoggingITCase.java
@@ -37,9 +37,9 @@ import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.MdcUtils;
import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.util.ReadOnlyStringMap;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
@@ -52,17 +52,14 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import static org.apache.flink.util.Preconditions.checkState;
+import static java.util.Arrays.asList;
+import static java.util.stream.Collectors.toList;
+import static org.apache.flink.util.MdcUtils.JOB_ID;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.slf4j.event.Level.DEBUG;
-/**
- * Tests adding of {@link JobID} to logs (via {@link org.slf4j.MDC}) in the most important cases.
- */
-public class JobIDLoggingITCase {
+class JobIDLoggingITCase {
private static final Logger logger =
LoggerFactory.getLogger(JobIDLoggingITCase.class);
@RegisterExtension
@@ -104,8 +101,7 @@ public class JobIDLoggingITCase {
.build());
@Test
- public void testJobIDLogging(@InjectClusterClient ClusterClient<?>
clusterClient)
- throws Exception {
+ void testJobIDLogging(@InjectClusterClient ClusterClient<?> clusterClient)
throws Exception {
JobID jobID = runJob(clusterClient);
clusterClient.cancel(jobID).get();
@@ -114,53 +110,113 @@ public class JobIDLoggingITCase {
// - how many messages to expect
// - which log patterns to ignore
- assertJobIDPresent(jobID, 3, checkpointCoordinatorLogging);
- assertJobIDPresent(jobID, 6, streamTaskLogging);
assertJobIDPresent(
jobID,
- 9,
+ checkpointCoordinatorLogging,
+ asList(
+ "No checkpoint found during restore.",
+ "Resetting the master hooks.",
+ "Triggering checkpoint .*",
+ "Received acknowledge message for checkpoint .*",
+ "Completed checkpoint .*",
+ "Checkpoint state: .*"));
+
+ assertJobIDPresent(
+ jobID,
+ streamTaskLogging,
+ asList(
+ "State backend is set to .*",
+ "Initializing Source: .*",
+ "Invoking Source: .*",
+ "Starting checkpoint .*",
+ "Notify checkpoint \\d+ complete .*"));
+
+ assertJobIDPresent(
+ jobID,
taskExecutorLogging,
+ asList(
+ "Received task .*",
+ "Trigger checkpoint .*",
+ "Confirm completed checkpoint .*"),
"Un-registering task.*",
"Successful registration.*",
"Establish JobManager connection.*",
"Offer reserved slots.*",
".*ResourceManager.*",
- "Operator event.*");
+ "Operator event.*",
+ "Recovered slot allocation snapshots.*",
+ ".*heartbeat.*");
+
+ assertJobIDPresent(
+ jobID,
+ taskLogging,
+ asList(
+ "Source: .* switched from CREATED to DEPLOYING.",
+ "Source: .* switched from DEPLOYING to INITIALIZING.",
+ "Source: .* switched from INITIALIZING to RUNNING."));
+
+ assertJobIDPresent(
+ jobID,
+ executionGraphLogging,
+ asList(
+ "Created execution graph .*",
+ "Deploying Source.*",
+ "Job .* switched from state CREATED to RUNNING.",
+ "Source: .* switched from CREATED to SCHEDULED.",
+ "Source: .* switched from SCHEDULED to DEPLOYING.",
+ "Source: .* switched from DEPLOYING to INITIALIZING.",
+ "Source: .* switched from INITIALIZING to RUNNING."));
- assertJobIDPresent(jobID, 10, taskLogging);
- assertJobIDPresent(jobID, 10, executionGraphLogging);
assertJobIDPresent(
jobID,
- 15,
jobMasterLogging,
+ asList(
+ "Checkpoint storage is set to .*",
+ "Initializing job .*",
+ "Running initialization on master for job .*",
+ "Starting execution of job .*",
+ "Starting scheduling.*",
+ "State backend is set to .*",
+ "Successfully created execution graph from job graph
.*",
+ "Successfully ran initialization on master.*",
+ "Triggering a manual checkpoint for job .*.",
+ "Using failover strategy .*",
+ "Using restart back off time strategy .*"),
"Registration at ResourceManager.*",
"Registration with ResourceManager.*",
"Resolved ResourceManager address.*");
- assertJobIDPresent(jobID, 1, asyncCheckpointRunnableLogging);
+
+ assertJobIDPresent(
+ jobID,
+ asyncCheckpointRunnableLogging,
+ asList(
+ ".* started executing asynchronous part of checkpoint
.*",
+ ".* finished asynchronous part of checkpoint .*"));
}
private static void assertJobIDPresent(
JobID jobID,
- int expectedLogMessages,
LoggerAuditingExtension ext,
+ List<String> expPatterns,
String... ignPatterns) {
- String loggerName = ext.getLoggerName();
- checkState(
- ext.getEvents().size() >= expectedLogMessages,
- "Too few log events recorded for %s (%s) - this must be a bug
in the test code",
- loggerName,
- ext.getEvents().size());
final List<LogEvent> eventsWithMissingJobId = new ArrayList<>();
final List<LogEvent> eventsWithWrongJobId = new ArrayList<>();
final List<LogEvent> ignoredEvents = new ArrayList<>();
+ final List<Pattern> expectedPatterns =
+ expPatterns.stream().map(Pattern::compile).collect(toList());
final List<Pattern> ignorePatterns =
-
Arrays.stream(ignPatterns).map(Pattern::compile).collect(Collectors.toList());
+
Arrays.stream(ignPatterns).map(Pattern::compile).collect(toList());
for (LogEvent e : ext.getEvents()) {
- if (e.getContextData().containsKey(MdcUtils.JOB_ID)) {
- if (!Objects.equals(
- e.getContextData().getValue(MdcUtils.JOB_ID),
jobID.toHexString())) {
+ ReadOnlyStringMap context = e.getContextData();
+ if (context.containsKey(JOB_ID)) {
+ if (Objects.equals(context.getValue(JOB_ID),
jobID.toHexString())) {
+ expectedPatterns.removeIf(
+ pattern ->
+
pattern.matcher(e.getMessage().getFormattedMessage())
+ .matches());
+ } else {
eventsWithWrongJobId.add(e);
}
} else if (matchesAny(ignorePatterns,
e.getMessage().getFormattedMessage())) {
@@ -169,20 +225,23 @@ public class JobIDLoggingITCase {
eventsWithMissingJobId.add(e);
}
}
+
logger.debug(
"checked events for {}:\n {};\n ignored: {},\n wrong job
id: {},\n missing job id: {}",
- loggerName,
+ ext.getLoggerName(),
ext.getEvents(),
ignoredEvents,
eventsWithWrongJobId,
eventsWithMissingJobId);
assertThat(eventsWithWrongJobId).as("events with a wrong Job
ID").isEmpty();
- assertTrue(
- eventsWithMissingJobId.isEmpty(),
- "too many events without Job ID recorded for "
- + loggerName
- + ": "
- + eventsWithMissingJobId);
+ assertThat(expectedPatterns)
+ .as(
+ "not all expected events logged by %s, logged:\n%s",
+ ext.getLoggerName(), ext.getEvents())
+ .isEmpty();
+ assertThat(eventsWithMissingJobId)
+ .as("too many events without Job ID logged by %s",
ext.getLoggerName())
+ .isEmpty();
}
private static boolean matchesAny(List<Pattern> patternStream, String
message) {
@@ -205,6 +264,9 @@ public class JobIDLoggingITCase {
// wait for all tasks ready and then checkpoint
while (true) {
try {
+ clusterClient.triggerCheckpoint(jobId,
CheckpointType.DEFAULT).get();
+ // to check the log message about checkpoint completion notification we need to
+ // either wait or trigger another checkpoint
clusterClient.triggerCheckpoint(jobId,
CheckpointType.DEFAULT).get();
return jobId;
} catch (ExecutionException e) {