azagrebin commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r440693739



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -22,12 +22,16 @@
 
 import java.util.AbstractCollection;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Set;
 
 /**
- * Map which stores values under two different indices.
+ * Map which stores values under two different indices. The mapping of the 
primary key to the
+ * value is backed by {@link LinkedHashMap} so that the iteration order over 
the values and
+ * the primary key set is the insertion order. Note that there is no contract 
of the iteration
+ * order over the secondary key set.

Review comment:
       I would also explicitly document that `@param <A>` is the primary key.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -37,13 +41,13 @@
 
        private final LinkedHashMap<A, Tuple2<B, V>> aMap;
 
-       private final LinkedHashMap<B, A> bMap;
+       private final HashMap<B, A> bMap;

Review comment:
       Does it matter for this PR which type `bMap` has?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java
##########
@@ -85,4 +85,28 @@ public void 
ensuresOneToOneMappingBetweenKeysSameSecondaryKey() {
                assertThat(map.getByKeyB(1), is(secondValue));
                assertThat(map.getByKeyA(2), is(secondValue));
        }
+
+       @Test
+       public void 
testPrimaryKeyOrderIsNotAffectedIfReInsertedWithSameSecondaryKey() {
+               final DualKeyLinkedMap<Integer, Integer, String> map = new 
DualKeyLinkedMap<>(2);
+
+               final String value = "foobar";
+               map.put(1, 1, value);
+               map.put(2, 2, value);
+
+               map.put(1, 1, value);
+               assertThat(map.keySetA().iterator().next(), is(1));

Review comment:
       I think `values()` are also interesting to check for both added tests.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -74,6 +74,20 @@ public V getKeyB(B bKey) {
                }
        }
 
+       public A getKeyA(B bKey) {

Review comment:
       ```suggestion
        public A getKeyAByKeyB(B bKey) {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -64,7 +64,7 @@ public V getKeyA(A aKey) {
                }
        }
 
-       public V getKeyB(B bKey) {
+       public V getByKeyB(B bKey) {

Review comment:
       ```suggestion
        public V getValueByKeyB(B bKey) {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -54,7 +54,7 @@ public int size() {
                return aMap.size();
        }
 
-       public V getKeyA(A aKey) {
+       public V getByKeyA(A aKey) {

Review comment:
       ```suggestion
        public V getValueByKeyA(A aKey) {
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java
##########
@@ -85,4 +85,28 @@ public void 
ensuresOneToOneMappingBetweenKeysSameSecondaryKey() {
                assertThat(map.getByKeyB(1), is(secondValue));
                assertThat(map.getByKeyA(2), is(secondValue));
        }
+
+       @Test
+       public void 
testPrimaryKeyOrderIsNotAffectedIfReInsertedWithSameSecondaryKey() {
+               final DualKeyLinkedMap<Integer, Integer, String> map = new 
DualKeyLinkedMap<>(2);
+
+               final String value = "foobar";
+               map.put(1, 1, value);
+               map.put(2, 2, value);
+
+               map.put(1, 1, value);
+               assertThat(map.keySetA().iterator().next(), is(1));
+       }
+
+       @Test
+       public void 
testPrimaryKeyOrderIsNotAffectedIfReInsertedWithDifferentSecondaryKey() {
+               final DualKeyLinkedMap<Integer, Integer, String> map = new 
DualKeyLinkedMap<>(2);
+
+               final String value = "foobar";
+               map.put(1, 1, value);
+               map.put(2, 2, value);
+
+               map.put(1, 3, value);

Review comment:
       do we also want to check cleanup of key B `3` if it were in the map?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -592,6 +589,31 @@ private PendingRequest findMatchingPendingRequest(final 
AllocatedSlot slot) {
                return null;
        }
 
+       private void maybeRemapOrphanedAllocation(
+                       final AllocationID allocationIdOfRequest,
+                       final AllocationID allocationIdOfSlot) {
+
+               final AllocationID orphanedAllocationId = 
allocationIdOfRequest.equals(allocationIdOfSlot)
+                       ? null : allocationIdOfRequest;
+
+               // if the request that initiated the allocation is still 
pending, it should take over the orphaned allocation
+               // of the fulfilled request so that it can fail fast if the 
remapped allocation fails
+               if (orphanedAllocationId != null) {

Review comment:
       ```suggestion
                // if the request that initiated the allocation is still 
pending, it should take over the orphaned allocation
                // of the fulfilled request so that it can fail fast if the 
remapped allocation fails
                if (!allocationIdOfRequest.equals(allocationIdOfSlot)) {
                    final AllocationID orphanedAllocationId = 
allocationIdOfRequest;
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##########
@@ -74,6 +74,20 @@ public V getKeyB(B bKey) {
                }
        }
 
+       public A getKeyA(B bKey) {
+               return bMap.get(bKey);
+       }
+
+       public B getKeyB(A aKey) {

Review comment:
       ```suggestion
        public B getKeyBByKeyA(A aKey) {
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##########
@@ -592,6 +589,31 @@ private PendingRequest findMatchingPendingRequest(final 
AllocatedSlot slot) {
                return null;
        }
 
+       private void maybeRemapOrphanedAllocation(
+                       final AllocationID allocationIdOfRequest,
+                       final AllocationID allocationIdOfSlot) {
+
+               final AllocationID orphanedAllocationId = 
allocationIdOfRequest.equals(allocationIdOfSlot)
+                       ? null : allocationIdOfRequest;
+
+               // if the request that initiated the allocation is still 
pending, it should take over the orphaned allocation
+               // of the fulfilled request so that it can fail fast if the 
remapped allocation fails
+               if (orphanedAllocationId != null) {
+                       final SlotRequestId requestIdOfAllocatedSlot = 
pendingRequests.getKeyA(allocationIdOfSlot);
+                       if (requestIdOfAllocatedSlot != null) {
+                               final PendingRequest requestOfAllocatedSlot = 
pendingRequests.getByKeyA(requestIdOfAllocatedSlot);
+                               
requestOfAllocatedSlot.setAllocationId(orphanedAllocationId);
+
+                               // this re-insertion of initiatedRequestId will 
not affect its original insertion order
+                               pendingRequests.put(requestIdOfAllocatedSlot, 
orphanedAllocationId, requestOfAllocatedSlot);
+                       } else {
+                               // cancel the slot request if the orphaned 
allocation is not remapped to a pending request.
+                               // the request id can be null if the slot is 
returned by scheduler
+                               
resourceManagerGateway.cancelSlotRequest(orphanedAllocationId);

Review comment:
       is it about previously allocated slots returned by scheduler after e.g. 
finishing tasks?
   do they also have to be cancelled in the RM?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to