dawidwys commented on a change in pull request #17946:
URL: https://github.com/apache/flink/pull/17946#discussion_r760226755
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionAttemptMappingProvider.java
##########
@@ -38,32 +38,34 @@
private final List<ExecutionVertex> tasks;
/** The cached mapping, which would only be updated on miss. */
- private final LinkedHashMap<ExecutionAttemptID, ExecutionVertex>
cachedTasksById;
+ private volatile Map<ExecutionAttemptID, ExecutionVertex> cachedTasksById;
public ExecutionAttemptMappingProvider(Iterable<ExecutionVertex>
tasksIterable) {
this.tasks = new ArrayList<>();
tasksIterable.forEach(this.tasks::add);
- this.cachedTasksById =
- new LinkedHashMap<ExecutionAttemptID,
ExecutionVertex>(tasks.size()) {
-
- @Override
- protected boolean removeEldestEntry(
- Map.Entry<ExecutionAttemptID, ExecutionVertex>
eldest) {
- return size() > tasks.size();
- }
- };
+ this.cachedTasksById = new ConcurrentHashMap<>(tasks.size());
Review comment:
That's too much synchronization, IMO.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionAttemptMappingProvider.java
##########
@@ -38,32 +38,34 @@
private final List<ExecutionVertex> tasks;
/** The cached mapping, which would only be updated on miss. */
- private final LinkedHashMap<ExecutionAttemptID, ExecutionVertex>
cachedTasksById;
+ private volatile Map<ExecutionAttemptID, ExecutionVertex> cachedTasksById;
Review comment:
Do we need it to be `volatile`? I believe we don't really care about the
read outside of the `synchronized` block. IMO it is fine if that read sees
an older copy of the map. If the thread's (possibly stale) view of the map
still has the entry, good for us, we return it. If not, we enter the
`synchronized` block and update the variable. Or am I missing something?
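
To make the question concrete, here is a minimal sketch of the pattern as I
read it: an unlocked read first, and a `synchronized` refresh only on a miss.
The names `SnapshotCache` and `loader` are hypothetical and not taken from the
PR. The sketch keeps `volatile`, since as far as I understand the Java Memory
Model a plain field holding a `HashMap` does not guarantee that a reader sees
a fully populated map; whether that matters here is exactly what I am asking.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical illustration only; not the code from the pull request.
final class SnapshotCache<K, V> {
    // Assumed stand-in for getCurrentAttemptMappings(): returns the current full mapping.
    private final Supplier<Map<K, V>> loader;

    // The map is never mutated after it is assigned; a refresh swaps in a new map.
    private volatile Map<K, V> cached = new HashMap<>();

    SnapshotCache(Supplier<Map<K, V>> loader) {
        this.loader = loader;
    }

    Optional<V> get(K key) {
        // Fast path: no lock; a stale snapshot is acceptable if it has the key.
        V value = cached.get(key);
        if (value != null) {
            return Optional.of(value);
        }
        return updateAndGet(key);
    }

    private synchronized Optional<V> updateAndGet(K key) {
        // Re-check under the lock: another thread may have refreshed already.
        V value = cached.get(key);
        if (value != null) {
            return Optional.of(value);
        }
        // Build the new snapshot fully, then publish it with a single write.
        Map<K, V> fresh = new HashMap<>(loader.get());
        cached = fresh;
        return Optional.ofNullable(fresh.get(key));
    }
}
```

In this sketch a miss always triggers a reload, so unknown keys are not
negatively cached.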
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionAttemptMappingProvider.java
##########
@@ -38,32 +38,34 @@
private final List<ExecutionVertex> tasks;
/** The cached mapping, which would only be updated on miss. */
- private final LinkedHashMap<ExecutionAttemptID, ExecutionVertex>
cachedTasksById;
+ private volatile Map<ExecutionAttemptID, ExecutionVertex> cachedTasksById;
public ExecutionAttemptMappingProvider(Iterable<ExecutionVertex>
tasksIterable) {
this.tasks = new ArrayList<>();
tasksIterable.forEach(this.tasks::add);
- this.cachedTasksById =
- new LinkedHashMap<ExecutionAttemptID,
ExecutionVertex>(tasks.size()) {
-
- @Override
- protected boolean removeEldestEntry(
- Map.Entry<ExecutionAttemptID, ExecutionVertex>
eldest) {
- return size() > tasks.size();
- }
- };
+ this.cachedTasksById = new ConcurrentHashMap<>(tasks.size());
}
public Optional<ExecutionVertex> getVertex(ExecutionAttemptID id) {
- if (!cachedTasksById.containsKey(id)) {
- cachedTasksById.putAll(getCurrentAttemptMappings());
- if (!cachedTasksById.containsKey(id)) {
- // the task probably gone after a restart
- cachedTasksById.put(id, null);
+ ExecutionVertex vertex = cachedTasksById.get(id);
+ if (vertex != null) {
+ return Optional.of(vertex);
+ }
+
+ return updateAndGet(id);
+ }
+
+ private Optional<ExecutionVertex> updateAndGet(ExecutionAttemptID id) {
+ synchronized (tasks) {
Review comment:
Why do you prefer synchronizing on a variable over making the method
`synchronized`? Is there a reason for that? Maybe I am missing something
:thinking:?
Personally I find it a bit misleading. (Why was this variable chosen, how
is it connected to the synchronized block, where else is it used for
synchronization, etc.?)
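
For reference, a small sketch of the two options I have in mind. The class
names are hypothetical and the bodies are placeholders; `Thread.holdsLock` is
used only to show which monitor each variant takes.

```java
// Hypothetical illustration only; not the code from the pull request.

// Variant 1: a synchronized instance method locks on `this`.
final class MethodLocked {
    private int updates;

    synchronized boolean update() {
        updates++;
        // Returns true: the monitor held here is the instance itself.
        return Thread.holdsLock(this);
    }
}

// Variant 2: a block that locks on a field. A dedicated, clearly named lock
// object makes the intent explicit; reusing a data field such as `tasks`
// leaves the reader guessing why that field was chosen.
final class FieldLocked {
    private final Object lock = new Object();
    private int updates;

    boolean update() {
        synchronized (lock) {
            updates++;
            // Returns false: the monitor held here is `lock`, not `this`.
            return Thread.holdsLock(this);
        }
    }
}
```

Both variants serialize `update()`; they differ in which object other code
would have to lock on to coordinate with it.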