asfgit closed pull request #6389: [FLINK-9917][JM] Remove superfluous lock from
SlotSharingManager
URL: https://github.com/apache/flink/pull/6389
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
from a fork) once it is merged, the diff is reproduced below for the
sake of provenance:
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
index afcd24f1064..ef288a26469 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
@@ -36,7 +36,6 @@
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-import javax.annotation.concurrent.GuardedBy;
import java.util.AbstractCollection;
import java.util.Collection;
@@ -82,9 +81,6 @@
private static final Logger LOG =
LoggerFactory.getLogger(SlotSharingManager.class);
- /** Lock for the internal data structures. */
- private final Object lock = new Object();
-
private final SlotSharingGroupId slotSharingGroupId;
/** Actions to release allocated slots after a complete multi task slot
hierarchy has been released. */
@@ -96,11 +92,9 @@
private final Map<SlotRequestId, TaskSlot> allTaskSlots;
/** Root nodes which have not been completed because the allocated slot
is still pending. */
- @GuardedBy("lock")
private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
/** Root nodes which have been completed (the underlying allocated slot
has been assigned). */
- @GuardedBy("lock")
private final Map<TaskManagerLocation, Set<MultiTaskSlot>>
resolvedRootSlots;
SlotSharingManager(
@@ -152,27 +146,23 @@ MultiTaskSlot createRootSlot(
allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
- synchronized (lock) {
- unresolvedRootSlots.put(slotRequestId,
rootMultiTaskSlot);
- }
+ unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
// add the root node to the set of resolved root nodes once the
SlotContext future has
// been completed and we know the slot's TaskManagerLocation
slotContextFuture.whenComplete(
(SlotContext slotContext, Throwable throwable) -> {
if (slotContext != null) {
- synchronized (lock) {
- final MultiTaskSlot
resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
+ final MultiTaskSlot resolvedRootNode =
unresolvedRootSlots.remove(slotRequestId);
- if (resolvedRootNode != null) {
- LOG.trace("Fulfill
multi task slot [{}] with slot [{}].", slotRequestId,
slotContext.getAllocationId());
+ if (resolvedRootNode != null) {
+ LOG.trace("Fulfill multi task
slot [{}] with slot [{}].", slotRequestId, slotContext.getAllocationId());
- final
Set<MultiTaskSlot> innerCollection = resolvedRootSlots.computeIfAbsent(
-
slotContext.getTaskManagerLocation(),
-
taskManagerLocation -> new HashSet<>(4));
+ final Set<MultiTaskSlot>
innerCollection = resolvedRootSlots.computeIfAbsent(
+
slotContext.getTaskManagerLocation(),
+ taskManagerLocation ->
new HashSet<>(4));
-
innerCollection.add(resolvedRootNode);
- }
+
innerCollection.add(resolvedRootNode);
}
} else {
rootMultiTaskSlot.release(throwable);
@@ -193,15 +183,13 @@ MultiTaskSlot createRootSlot(
*/
@Nullable
MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId,
SchedulingStrategy matcher, SlotProfile slotProfile) {
- synchronized (lock) {
- Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues
= this.resolvedRootSlots.values();
- return matcher.findMatchWithLocality(
- slotProfile,
-
resolvedRootSlotsValues.stream().flatMap(Collection::stream),
- (MultiTaskSlot multiTaskSlot) ->
multiTaskSlot.getSlotContextFuture().join(),
- (MultiTaskSlot multiTaskSlot) ->
!multiTaskSlot.contains(groupId),
- MultiTaskSlotLocality::of);
- }
+ Collection<Set<MultiTaskSlot>> resolvedRootSlotsValues =
this.resolvedRootSlots.values();
+ return matcher.findMatchWithLocality(
+ slotProfile,
+
resolvedRootSlotsValues.stream().flatMap(Collection::stream),
+ (MultiTaskSlot multiTaskSlot) ->
multiTaskSlot.getSlotContextFuture().join(),
+ (MultiTaskSlot multiTaskSlot) ->
!multiTaskSlot.contains(groupId),
+ MultiTaskSlotLocality::of);
}
/**
@@ -213,11 +201,9 @@ MultiTaskSlotLocality getResolvedRootSlot(AbstractID
groupId, SchedulingStrategy
*/
@Nullable
MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
- synchronized (lock) {
- for (MultiTaskSlot multiTaskSlot :
unresolvedRootSlots.values()) {
- if (!multiTaskSlot.contains(groupId)) {
- return multiTaskSlot;
- }
+ for (MultiTaskSlot multiTaskSlot :
unresolvedRootSlots.values()) {
+ if (!multiTaskSlot.contains(groupId)) {
+ return multiTaskSlot;
}
}
@@ -228,11 +214,9 @@ MultiTaskSlot getUnresolvedRootSlot(AbstractID groupId) {
public String toString() {
final StringBuilder builder = new
StringBuilder("{\n\tgroupId=").append(slotSharingGroupId).append('\n');
- synchronized (lock) {
-
builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
-
builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
-
builder.append("\tall=").append(allTaskSlots).append('\n');
- }
+
builder.append("\tunresolved=").append(unresolvedRootSlots).append('\n');
+
builder.append("\tresolved=").append(resolvedRootSlots).append('\n');
+ builder.append("\tall=").append(allTaskSlots).append('\n');
return builder.append('}').toString();
}
@@ -479,26 +463,20 @@ public void release(Throwable cause) {
parent.releaseChild(getGroupId());
} else if (allTaskSlots.remove(getSlotRequestId()) !=
null) {
// we are the root node --> remove the root
node from the list of task slots
+ final MultiTaskSlot unresolvedRootSlot =
unresolvedRootSlots.remove(getSlotRequestId());
- if (!slotContextFuture.isDone() ||
slotContextFuture.isCompletedExceptionally()) {
- synchronized (lock) {
- // the root node should still
be unresolved
-
unresolvedRootSlots.remove(getSlotRequestId());
- }
- } else {
+ if (unresolvedRootSlot == null) {
// the root node should be resolved -->
we can access the slot context
final SlotContext slotContext =
slotContextFuture.getNow(null);
if (slotContext != null) {
- synchronized (lock) {
- final
Set<MultiTaskSlot> multiTaskSlots =
resolvedRootSlots.get(slotContext.getTaskManagerLocation());
+ final Set<MultiTaskSlot>
multiTaskSlots = resolvedRootSlots.get(slotContext.getTaskManagerLocation());
- if (multiTaskSlots !=
null) {
-
multiTaskSlots.remove(this);
+ if (multiTaskSlots != null) {
+
multiTaskSlots.remove(this);
- if
(multiTaskSlots.isEmpty()) {
-
resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
- }
+ if
(multiTaskSlots.isEmpty()) {
+
resolvedRootSlots.remove(slotContext.getTaskManagerLocation());
}
}
}
@@ -637,9 +615,7 @@ public String toString() {
@VisibleForTesting
Collection<MultiTaskSlot> getUnresolvedRootSlots() {
- synchronized (lock) {
- return unresolvedRootSlots.values();
- }
+ return unresolvedRootSlots.values();
}
/**
@@ -649,19 +625,15 @@ public String toString() {
@Override
public Iterator<MultiTaskSlot> iterator() {
- synchronized (lock) {
- return new
ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
- }
+ return new
ResolvedRootSlotIterator(resolvedRootSlots.values().iterator());
}
@Override
public int size() {
int numberResolvedMultiTaskSlots = 0;
- synchronized (lock) {
- for (Set<MultiTaskSlot> multiTaskSlots :
resolvedRootSlots.values()) {
- numberResolvedMultiTaskSlots +=
multiTaskSlots.size();
- }
+ for (Set<MultiTaskSlot> multiTaskSlots :
resolvedRootSlots.values()) {
+ numberResolvedMultiTaskSlots +=
multiTaskSlots.size();
}
return numberResolvedMultiTaskSlots;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services