zentol commented on a change in pull request #14851:
URL: https://github.com/apache/flink/pull/14851#discussion_r570141334



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocator.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** {@link SlotAllocator} implementation that supports slot sharing. */
+public class SlotSharingSlotAllocator implements 
SlotAllocator<SlotSharingAssignments> {
+
+    private final ReserveSlotFunction reserveSlot;
+    private final FreeSlotFunction freeSlot;
+
+    public SlotSharingSlotAllocator(ReserveSlotFunction reserveSlot, 
FreeSlotFunction freeSlot) {
+        this.reserveSlot = reserveSlot;
+        this.freeSlot = freeSlot;
+    }
+
+    @Override
+    public ResourceCounter calculateRequiredSlots(
+            Iterable<JobInformation.VertexInformation> vertices) {
+        int numTotalRequiredSlots = 0;
+        for (Integer requiredSlots : 
getMaxParallelismForSlotSharingGroups(vertices).values()) {
+            numTotalRequiredSlots += requiredSlots;
+        }
+        return ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
numTotalRequiredSlots);
+    }
+
+    private static Map<SlotSharingGroupId, Integer> 
getMaxParallelismForSlotSharingGroups(
+            Iterable<JobInformation.VertexInformation> vertices) {
+        final Map<SlotSharingGroupId, Integer> 
maxParallelismForSlotSharingGroups = new HashMap<>();
+        for (JobInformation.VertexInformation vertex : vertices) {
+            maxParallelismForSlotSharingGroups.compute(
+                    vertex.getSlotSharingGroup().getSlotSharingGroupId(),
+                    (slotSharingGroupId, currentMaxParallelism) ->
+                            currentMaxParallelism == null
+                                    ? vertex.getParallelism()
+                                    : Math.max(currentMaxParallelism, 
vertex.getParallelism()));
+        }
+        return maxParallelismForSlotSharingGroups;
+    }
+
+    @Override
+    public Optional<SlotSharingAssignments> determineParallelism(
+            JobInformation jobInformation, Collection<? extends SlotInfo> 
freeSlots) {
+        // TODO: This can waste slots if the max parallelism for slot sharing 
groups is not equal
+        final int slotsPerSlotSharingGroup =
+                freeSlots.size() / 
jobInformation.getSlotSharingGroups().size();
+
+        if (slotsPerSlotSharingGroup == 0) {
+            // => less slots than slot-sharing groups
+            return Optional.empty();
+        }
+
+        final Iterator<? extends SlotInfo> slotIterator = freeSlots.iterator();
+        final Collection<ExecutionSlotSharingGroupAndSlot> assignments = new 
ArrayList<>();
+
+        for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
+            final List<JobInformation.VertexInformation> containedJobVertices =
+                    slotSharingGroup.getJobVertexIds().stream()
+                            .map(jobInformation::getVertexInformation)
+                            .collect(Collectors.toList());
+
+            final Iterable<ExecutionSlotSharingGroup> 
sharedSlotToVertexAssignment =
+                    determineParallelismAndAssignToFutureSlotIndex(
+                            containedJobVertices, slotsPerSlotSharingGroup);
+
+            for (ExecutionSlotSharingGroup executionSlotSharingGroup :
+                    sharedSlotToVertexAssignment) {
+                final SlotInfo slotInfo = slotIterator.next();
+
+                assignments.add(
+                        new 
ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotInfo));
+            }
+        }
+
+        return Optional.of(new SlotSharingAssignments(assignments));
+    }
+
+    private static Iterable<ExecutionSlotSharingGroup>
+            determineParallelismAndAssignToFutureSlotIndex(
+                    Collection<JobInformation.VertexInformation> 
containedJobVertices,
+                    int availableSlots) {
+        final Map<Integer, Set<ExecutionVertexID>> 
sharedSlotToVertexAssignment = new HashMap<>();
+        for (JobInformation.VertexInformation jobVertex : 
containedJobVertices) {
+            final int parallelism = Math.min(jobVertex.getParallelism(), 
availableSlots);
+
+            for (int i = 0; i < parallelism; i++) {
+                sharedSlotToVertexAssignment
+                        .computeIfAbsent(i, ignored -> new HashSet<>())
+                        .add(new ExecutionVertexID(jobVertex.getJobVertexID(), 
i));
+            }
+        }
+
+        return sharedSlotToVertexAssignment.values().stream()
+                .map(ExecutionSlotSharingGroup::new)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Map<ExecutionVertexID, LogicalSlot> reserveResources(
+            SlotSharingAssignments parallelism) {

Review comment:
       yes, the naming surrounding this parameter has been a bit difficult 😩

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocator.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** {@link SlotAllocator} implementation that supports slot sharing. */
+public class SlotSharingSlotAllocator implements 
SlotAllocator<SlotSharingAssignments> {
+
+    private final ReserveSlotFunction reserveSlot;
+    private final FreeSlotFunction freeSlot;
+
+    public SlotSharingSlotAllocator(ReserveSlotFunction reserveSlot, 
FreeSlotFunction freeSlot) {
+        this.reserveSlot = reserveSlot;
+        this.freeSlot = freeSlot;
+    }
+
+    @Override
+    public ResourceCounter calculateRequiredSlots(
+            Iterable<JobInformation.VertexInformation> vertices) {
+        int numTotalRequiredSlots = 0;
+        for (Integer requiredSlots : 
getMaxParallelismForSlotSharingGroups(vertices).values()) {
+            numTotalRequiredSlots += requiredSlots;
+        }
+        return ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
numTotalRequiredSlots);
+    }
+
+    private static Map<SlotSharingGroupId, Integer> 
getMaxParallelismForSlotSharingGroups(
+            Iterable<JobInformation.VertexInformation> vertices) {
+        final Map<SlotSharingGroupId, Integer> 
maxParallelismForSlotSharingGroups = new HashMap<>();
+        for (JobInformation.VertexInformation vertex : vertices) {
+            maxParallelismForSlotSharingGroups.compute(
+                    vertex.getSlotSharingGroup().getSlotSharingGroupId(),
+                    (slotSharingGroupId, currentMaxParallelism) ->
+                            currentMaxParallelism == null
+                                    ? vertex.getParallelism()
+                                    : Math.max(currentMaxParallelism, 
vertex.getParallelism()));
+        }
+        return maxParallelismForSlotSharingGroups;
+    }
+
+    @Override
+    public Optional<SlotSharingAssignments> determineParallelism(
+            JobInformation jobInformation, Collection<? extends SlotInfo> 
freeSlots) {
+        // TODO: This can waste slots if the max parallelism for slot sharing 
groups is not equal
+        final int slotsPerSlotSharingGroup =
+                freeSlots.size() / 
jobInformation.getSlotSharingGroups().size();
+
+        if (slotsPerSlotSharingGroup == 0) {
+            // => less slots than slot-sharing groups
+            return Optional.empty();
+        }
+
+        final Iterator<? extends SlotInfo> slotIterator = freeSlots.iterator();
+        final Collection<ExecutionSlotSharingGroupAndSlot> assignments = new 
ArrayList<>();
+
+        for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
+            final List<JobInformation.VertexInformation> containedJobVertices =
+                    slotSharingGroup.getJobVertexIds().stream()
+                            .map(jobInformation::getVertexInformation)
+                            .collect(Collectors.toList());
+
+            final Iterable<ExecutionSlotSharingGroup> 
sharedSlotToVertexAssignment =
+                    determineParallelismAndAssignToFutureSlotIndex(
+                            containedJobVertices, slotsPerSlotSharingGroup);
+
+            for (ExecutionSlotSharingGroup executionSlotSharingGroup :
+                    sharedSlotToVertexAssignment) {
+                final SlotInfo slotInfo = slotIterator.next();
+
+                assignments.add(
+                        new 
ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotInfo));
+            }
+        }
+
+        return Optional.of(new SlotSharingAssignments(assignments));
+    }
+
+    private static Iterable<ExecutionSlotSharingGroup>
+            determineParallelismAndAssignToFutureSlotIndex(
+                    Collection<JobInformation.VertexInformation> 
containedJobVertices,
+                    int availableSlots) {
+        final Map<Integer, Set<ExecutionVertexID>> 
sharedSlotToVertexAssignment = new HashMap<>();
+        for (JobInformation.VertexInformation jobVertex : 
containedJobVertices) {
+            final int parallelism = Math.min(jobVertex.getParallelism(), 
availableSlots);
+
+            for (int i = 0; i < parallelism; i++) {
+                sharedSlotToVertexAssignment
+                        .computeIfAbsent(i, ignored -> new HashSet<>())
+                        .add(new ExecutionVertexID(jobVertex.getJobVertexID(), 
i));
+            }
+        }
+
+        return sharedSlotToVertexAssignment.values().stream()
+                .map(ExecutionSlotSharingGroup::new)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Map<ExecutionVertexID, LogicalSlot> reserveResources(
+            SlotSharingAssignments parallelism) {
+        final Map<ExecutionVertexID, LogicalSlot> assignedSlots = new 
HashMap<>();
+
+        for (ExecutionSlotSharingGroupAndSlot executionSlotSharingGroup :
+                parallelism.getAssignments()) {
+            final SharedSlot sharedSlot =
+                    reserveSharedSlot(executionSlotSharingGroup.getSlotInfo());
+
+            for (ExecutionVertexID executionVertexId :
+                    executionSlotSharingGroup
+                            .getExecutionSlotSharingGroup()
+                            .getContainedExecutionVertices()) {
+                final LogicalSlot logicalSlot = 
sharedSlot.allocateLogicalSlot();
+                assignedSlots.put(executionVertexId, logicalSlot);
+            }
+        }
+
+        return assignedSlots;
+    }
+
+    private SharedSlot reserveSharedSlot(SlotInfo slotInfo) {
+        final PhysicalSlot physicalSlot =
+                reserveSlot.reserveSlot(
+                        slotInfo.getAllocationId(),
+                        ResourceProfile.fromResourceSpec(ResourceSpec.DEFAULT, 
MemorySize.ZERO));

Review comment:
       that might make things simpler, yes...

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link SharedSlot}. */
+public class SharedSlotTest extends TestLogger {
+
+    @Test
+    public void testConstructorAssignsPayload() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+
+        new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+
+        assertThat(physicalSlot.getPayload(), not(nullValue()));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testConstructorFailsIfSlotAlreadyHasAssignedPayload() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+        physicalSlot.tryAssignPayload(new TestPhysicalSlotPayload());
+
+        new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+    }
+
+    @Test
+    public void testAllocateLogicalSlot() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> 
{});
+
+        final LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot();
+
+        assertThat(logicalSlot.getAllocationId(), 
equalTo(physicalSlot.getAllocationId()));
+        assertThat(logicalSlot.getSlotSharingGroupId(), nullValue());
+        assertThat(logicalSlot.getLocality(), is(Locality.UNKNOWN));
+        assertThat(logicalSlot.getPayload(), nullValue());
+        assertThat(logicalSlot.getPhysicalSlotNumber(), 
is(physicalSlot.getPhysicalSlotNumber()));

Review comment:
       The slot number can probably be removed fairly easily; it isn't used on 
the TM side. The `AllocationID` is currently required by the Execution because 
it is encoded into the TDD and the TM needs to know which slot should be used.
   
   I'll open a follow-up for the slot number.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link SharedSlot}. */
+public class SharedSlotTest extends TestLogger {
+
+    @Test
+    public void testConstructorAssignsPayload() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+
+        new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+
+        assertThat(physicalSlot.getPayload(), not(nullValue()));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testConstructorFailsIfSlotAlreadyHasAssignedPayload() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+        physicalSlot.tryAssignPayload(new TestPhysicalSlotPayload());
+
+        new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+    }
+
+    @Test
+    public void testAllocateLogicalSlot() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> 
{});
+
+        final LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot();
+
+        assertThat(logicalSlot.getAllocationId(), 
equalTo(physicalSlot.getAllocationId()));
+        assertThat(logicalSlot.getSlotSharingGroupId(), nullValue());
+        assertThat(logicalSlot.getLocality(), is(Locality.UNKNOWN));
+        assertThat(logicalSlot.getPayload(), nullValue());
+        assertThat(logicalSlot.getPhysicalSlotNumber(), 
is(physicalSlot.getPhysicalSlotNumber()));
+        assertThat(
+                logicalSlot.getTaskManagerLocation(),
+                equalTo(physicalSlot.getTaskManagerLocation()));
+        assertThat(
+                logicalSlot.getTaskManagerGateway(), 
equalTo(physicalSlot.getTaskManagerGateway()));
+    }
+
+    @Test
+    public void testAllocateLogicalSlotIssuesUniqueSlotRequestIds() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> 
{});
+
+        final LogicalSlot logicalSlot1 = sharedSlot.allocateLogicalSlot();
+        final LogicalSlot logicalSlot2 = sharedSlot.allocateLogicalSlot();
+
+        assertThat(logicalSlot1.getSlotRequestId(), 
not(equalTo(logicalSlot2.getSlotRequestId())));

Review comment:
       It is only really used in the SharedSlot implementations to identify the 
slot during book-keeping.
   We could maybe just use a Set.
   

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link SharedSlot}. */
+public class SharedSlotTest extends TestLogger {
+
+    @Test
+    public void testConstructorAssignsPayload() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+
+        new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+
+        assertThat(physicalSlot.getPayload(), not(nullValue()));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testConstructorFailsIfSlotAlreadyHasAssignedPayload() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+        physicalSlot.tryAssignPayload(new TestPhysicalSlotPayload());
+
+        new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+    }
+
+    @Test
+    public void testAllocateLogicalSlot() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> 
{});
+
+        final LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot();
+
+        assertThat(logicalSlot.getAllocationId(), 
equalTo(physicalSlot.getAllocationId()));
+        assertThat(logicalSlot.getSlotSharingGroupId(), nullValue());
+        assertThat(logicalSlot.getLocality(), is(Locality.UNKNOWN));
+        assertThat(logicalSlot.getPayload(), nullValue());
+        assertThat(logicalSlot.getPhysicalSlotNumber(), 
is(physicalSlot.getPhysicalSlotNumber()));
+        assertThat(
+                logicalSlot.getTaskManagerLocation(),
+                equalTo(physicalSlot.getTaskManagerLocation()));
+        assertThat(
+                logicalSlot.getTaskManagerGateway(), 
equalTo(physicalSlot.getTaskManagerGateway()));
+    }
+
+    @Test
+    public void testAllocateLogicalSlotIssuesUniqueSlotRequestIds() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> 
{});
+
+        final LogicalSlot logicalSlot1 = sharedSlot.allocateLogicalSlot();
+        final LogicalSlot logicalSlot2 = sharedSlot.allocateLogicalSlot();
+
+        assertThat(logicalSlot1.getSlotRequestId(), 
not(equalTo(logicalSlot2.getSlotRequestId())));

Review comment:
       It is only really used in the SharedSlot implementations to identify the 
slot during book-keeping.
   We could maybe just use a Set internally.
   

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Shared slot implementation for the declarative scheduler. */
+class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SharedSlot.class);
+
+    private final SlotRequestId physicalSlotRequestId;
+
+    private final PhysicalSlot physicalSlot;
+
+    private final Runnable externalReleaseCallback;
+
+    private final Map<SlotRequestId, LogicalSlot> allocatedLogicalSlots;
+
+    private final boolean slotWillBeOccupiedIndefinitely;
+
+    private State state;
+
+    public SharedSlot(
+            SlotRequestId physicalSlotRequestId,

Review comment:
       hmm...it is only used during logging; maybe we could use the AllocationID of 
the physical slot instead?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocatorTest.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link SlotSharingSlotAllocator}. */
+public class SlotSharingSlotAllocatorTest extends TestLogger {
+
+    private static final FreeSlotFunction TEST_FREE_SLOT_FUNCTION = (a, c, t) 
-> {};
+    private static final ReserveSlotFunction TEST_RESERVE_SLOT_FUNCTION =
+            (allocationId, resourceProfile) ->
+                    TestingPhysicalSlot.builder()
+                            .withAllocationID(allocationId)
+                            .withResourceProfile(resourceProfile)
+                            .build();
+
+    private static final SlotSharingGroup slotSharingGroup1 = new 
SlotSharingGroup();
+    private static final SlotSharingGroup slotSharingGroup2 = new 
SlotSharingGroup();
+    private static final JobInformation.VertexInformation vertex1 =
+            new TestVertexInformation(new JobVertexID(), 4, slotSharingGroup1);
+    private static final JobInformation.VertexInformation vertex2 =
+            new TestVertexInformation(new JobVertexID(), 2, slotSharingGroup1);
+    private static final JobInformation.VertexInformation vertex3 =
+            new TestVertexInformation(new JobVertexID(), 3, slotSharingGroup2);
+
+    @Test
+    public void testCalculateRequiredSlots() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+
+        final ResourceCounter resourceCounter =
+                slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1, 
vertex2, vertex3));
+
+        assertThat(resourceCounter.getResources(), 
contains(ResourceProfile.UNKNOWN));
+        assertThat(
+                resourceCounter.getResourceCount(ResourceProfile.UNKNOWN),
+                is(
+                        Math.max(vertex1.getParallelism(), 
vertex2.getParallelism())
+                                + vertex3.getParallelism()));
+    }
+
+    @Test
+    public void testDetermineParallelismWithMinimumSlots() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
+
+        final VertexParallelism slotSharingAssignments =
+                slotAllocator.determineParallelism(jobInformation, 
getSlots(2)).get();
+
+        final Map<JobVertexID, Integer> maxParallelismForVertices =
+                slotSharingAssignments.getMaxParallelismForVertices();
+
+        assertThat(maxParallelismForVertices.get(vertex1.getJobVertexID()), 
is(1));
+        assertThat(maxParallelismForVertices.get(vertex2.getJobVertexID()), 
is(1));
+        assertThat(maxParallelismForVertices.get(vertex3.getJobVertexID()), 
is(1));
+    }
+
+    @Test
+    public void testDetermineParallelismWithManySlots() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
+
+        final VertexParallelism slotSharingAssignments =
+                slotAllocator.determineParallelism(jobInformation, 
getSlots(50)).get();
+
+        final Map<JobVertexID, Integer> maxParallelismForVertices =
+                slotSharingAssignments.getMaxParallelismForVertices();
+
+        assertThat(
+                maxParallelismForVertices.get(vertex1.getJobVertexID()),
+                is(vertex1.getParallelism()));
+        assertThat(
+                maxParallelismForVertices.get(vertex2.getJobVertexID()),
+                is(vertex2.getParallelism()));
+        assertThat(
+                maxParallelismForVertices.get(vertex3.getJobVertexID()),
+                is(vertex3.getParallelism()));
+    }
+
+    @Test
+    public void 
testDetermineParallelismUnsuccessfulWithLessSlotsThanSlotSharingGroups() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
+
+        final Optional<SlotSharingAssignments> slotSharingAssignments =
+                slotAllocator.determineParallelism(jobInformation, 
getSlots(1));
+
+        assertThat(slotSharingAssignments.isPresent(), is(false));
+    }
+
+    @Test
+    public void testReserveResources() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
+
+        final SlotSharingAssignments slotAssignments =
+                slotAllocator.determineParallelism(jobInformation, 
getSlots(50)).get();
+
+        final Map<ExecutionVertexID, LogicalSlot> assignedResources =
+                slotAllocator.reserveResources(slotAssignments);
+
+        final Map<ExecutionVertexID, SlotInfo> expectedAssignments = new 
HashMap<>();
+        for (SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot 
assignment :
+                slotAssignments.getAssignments()) {
+            for (ExecutionVertexID containedExecutionVertex :
+                    
assignment.getExecutionSlotSharingGroup().getContainedExecutionVertices()) {
+                expectedAssignments.put(containedExecutionVertex, 
assignment.getSlotInfo());
+            }
+        }
+
+        for (Map.Entry<ExecutionVertexID, SlotInfo> expectedAssignment :
+                expectedAssignments.entrySet()) {
+            final LogicalSlot assignedSlot = 
assignedResources.get(expectedAssignment.getKey());
+
+            final SlotInfo backingSlot = expectedAssignment.getValue();
+
+            assertThat(assignedSlot.getAllocationId(), 
is(backingSlot.getAllocationId()));
+        }
+    }
+
+    private static Collection<SlotInfo> getSlots(int count) {
+        final Collection<SlotInfo> slotInfo = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            slotInfo.add(new TestSlotInfo());
+        }
+        return slotInfo;
+    }
+
+    private static class TestJobInformation implements JobInformation {
+
+        private final Map<JobVertexID, VertexInformation> vertexInformation;
+        private final Collection<SlotSharingGroup> slotSharingGroups;
+
+        private TestJobInformation(Collection<VertexInformation> 
vertexInformation) {
+            this.vertexInformation =
+                    vertexInformation.stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            VertexInformation::getJobVertexID,
+                                            Function.identity()));
+            this.slotSharingGroups =
+                    vertexInformation.stream()
+                            .map(VertexInformation::getSlotSharingGroup)
+                            .collect(
+                                    Collectors.toMap(
+                                            
SlotSharingGroup::getSlotSharingGroupId,
+                                            Function.identity(),
+                                            (slotSharingGroup1, 
slotSharingGroup2) ->
+                                                    slotSharingGroup1))

Review comment:
       why did I do this in such a complicated way...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to