zentol commented on a change in pull request #17387:
URL: https://github.com/apache/flink/pull/17387#discussion_r744329125
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
##########
@@ -682,7 +682,9 @@ private void stopTaskExecutorServices() throws Exception {
try {
changelogStorage =
changelogStoragesManager.stateChangelogStorageForJob(
- jobId,
taskManagerConfiguration.getConfiguration());
+ jobId,
+ taskManagerConfiguration.getConfiguration(),
+
taskManagerMetricGroup.getJobMetricsGroup(jobId));
Review comment:
Split `addTaskForJob` into 2 separate methods instead, and explicitly
create the TMJMG (`TaskManagerJobMetricGroup`) at the start of `submitTask`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
##########
@@ -39,6 +41,7 @@
* tasks any more
*/
@Internal
+@NotThreadSafe
Review comment:
Let's stick to the current threading model. While it is indeed, strictly
speaking, not necessary to synchronize here, dropping the synchronization adds
an odd inconsistency and just makes the code more difficult to understand.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
##########
@@ -163,4 +141,8 @@ protected void putVariables(Map<String, String> variables) {
protected String getGroupName(CharacterFilter filter) {
return "taskmanager";
}
+
+ public TaskManagerJobMetricGroup getJobMetricsGroup(JobID jobId) {
Review comment:
This seems to have slipped into the wrong commit because it is
referenced in the metrics lifecycle commit.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
##########
@@ -2764,6 +2821,14 @@ private TestingTaskExecutor
createTestingTaskExecutor(TaskManagerServices taskMa
private TestingTaskExecutor createTestingTaskExecutor(
TaskManagerServices taskManagerServices, HeartbeatServices
heartbeatServices) {
+ return createTestingTaskExecutor(
+ taskManagerServices, heartbeatServices,
createUnregisteredTaskManagerMetricGroup());
+ }
+
+ private TestingTaskExecutor createTestingTaskExecutor(
Review comment:
This change seems unnecessary, at least looking at this commit alone.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
##########
@@ -93,49 +96,24 @@ public TaskMetricGroup addTaskForJob(
String resolvedJobName = jobName == null || jobName.isEmpty() ?
jobId.toString() : jobName;
- // we cannot strictly lock both our map modification and the job group
modification
- // because it might lead to a deadlock
- while (true) {
- // get or create a jobs metric group
- TaskManagerJobMetricGroup currentJobGroup;
- synchronized (this) {
- currentJobGroup = jobs.get(jobId);
-
- if (currentJobGroup == null || currentJobGroup.isClosed()) {
- currentJobGroup =
- new TaskManagerJobMetricGroup(registry, this,
jobId, resolvedJobName);
- jobs.put(jobId, currentJobGroup);
- }
- }
-
- // try to add another task. this may fail if we found a
pre-existing job metrics
- // group and it is closed concurrently
- TaskMetricGroup taskGroup =
- currentJobGroup.addTask(
- jobVertexId, executionAttemptId, taskName,
subtaskIndex, attemptNumber);
-
- if (taskGroup != null) {
- // successfully added the next task
- return taskGroup;
- }
+ TaskManagerJobMetricGroup jobGroup = jobs.get(jobId);
- // else fall through the loop
- }
- }
-
- public void removeJobMetricsGroup(JobID jobId, TaskManagerJobMetricGroup
group) {
- if (jobId == null || group == null || !group.isClosed()) {
- return;
+ if (jobGroup == null) {
Review comment:
Why are you no longer checking whether the `jobGroup` is closed?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
##########
@@ -93,49 +96,24 @@ public TaskMetricGroup addTaskForJob(
String resolvedJobName = jobName == null || jobName.isEmpty() ?
jobId.toString() : jobName;
- // we cannot strictly lock both our map modification and the job group
modification
- // because it might lead to a deadlock
- while (true) {
- // get or create a jobs metric group
- TaskManagerJobMetricGroup currentJobGroup;
- synchronized (this) {
- currentJobGroup = jobs.get(jobId);
-
- if (currentJobGroup == null || currentJobGroup.isClosed()) {
- currentJobGroup =
- new TaskManagerJobMetricGroup(registry, this,
jobId, resolvedJobName);
- jobs.put(jobId, currentJobGroup);
- }
- }
-
- // try to add another task. this may fail if we found a
pre-existing job metrics
- // group and it is closed concurrently
- TaskMetricGroup taskGroup =
- currentJobGroup.addTask(
- jobVertexId, executionAttemptId, taskName,
subtaskIndex, attemptNumber);
-
- if (taskGroup != null) {
- // successfully added the next task
- return taskGroup;
- }
+ TaskManagerJobMetricGroup jobGroup = jobs.get(jobId);
- // else fall through the loop
- }
- }
-
- public void removeJobMetricsGroup(JobID jobId, TaskManagerJobMetricGroup
group) {
- if (jobId == null || group == null || !group.isClosed()) {
- return;
+ if (jobGroup == null) {
+ jobGroup = new TaskManagerJobMetricGroup(registry, this, jobId,
resolvedJobName);
+ jobs.put(jobId, jobGroup);
}
- synchronized (this) {
- // optimistically remove the currently contained group, and check
later if it was
- // correct
- TaskManagerJobMetricGroup containedGroup = jobs.remove(jobId);
+ // note that a pre-existing job group can NOT be closed concurrently -
this is done by the
+ // same TM thread in removeJobMetricsGroup
+ return jobGroup.addTask(
+ jobVertexId, executionAttemptId, taskName, subtaskIndex,
attemptNumber);
+ }
- // check if another group was actually contained, and restore that
one
- if (containedGroup != null && containedGroup != group) {
- jobs.put(jobId, containedGroup);
+ public void removeJobMetricsGroup(JobID jobId) {
Review comment:
There are no tests for this method.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java
##########
@@ -78,46 +80,35 @@ public TaskMetricGroup addTask(
checkNotNull(executionAttemptID);
checkNotNull(taskName);
- synchronized (this) {
- if (!isClosed()) {
- TaskMetricGroup prior = tasks.get(executionAttemptID);
- if (prior != null) {
- return prior;
- } else {
- TaskMetricGroup task =
- new TaskMetricGroup(
- registry,
- this,
- jobVertexId,
- executionAttemptID,
- taskName,
- subtaskIndex,
- attemptNumber);
- tasks.put(executionAttemptID, task);
- return task;
- }
+ if (!isClosed()) {
+ TaskMetricGroup prior = tasks.get(executionAttemptID);
+ if (prior != null) {
+ return prior;
} else {
- return null;
+ TaskMetricGroup task =
+ new TaskMetricGroup(
+ registry,
+ this,
+ jobVertexId,
+ executionAttemptID,
+ taskName,
+ subtaskIndex,
+ attemptNumber);
+ tasks.put(executionAttemptID, task);
+ return task;
}
+ } else {
+ return null;
}
}
public void removeTaskMetricGroup(ExecutionAttemptID executionId) {
checkNotNull(executionId);
- boolean removeFromParent = false;
- synchronized (this) {
- if (!isClosed() && tasks.remove(executionId) != null &&
tasks.isEmpty()) {
- // this call removed the last task. close this group.
- removeFromParent = true;
- close();
- }
- }
-
- // IMPORTANT: removing from the parent must not happen while holding
the this group's lock,
- // because it would violate the "first parent then subgroup" lock
acquisition order
- if (removeFromParent) {
- parent.removeJobMetricsGroup(jobId, this);
+ if (!isClosed()) {
Review comment:
This guard is no longer necessary. It was only relevant because
`close()` had side-effects on the parent.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]