tillrohrmann commented on a change in pull request #14851:
URL: https://github.com/apache/flink/pull/14851#discussion_r569640208



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Shared slot implementation for the declarative scheduler. */
+class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SharedSlot.class);
+
+    private final SlotRequestId physicalSlotRequestId;
+
+    private final PhysicalSlot physicalSlot;
+
+    private final Runnable externalReleaseCallback;
+
+    private final Map<SlotRequestId, LogicalSlot> allocatedLogicalSlots;
+
+    private final boolean slotWillBeOccupiedIndefinitely;
+
+    private State state;
+
+    public SharedSlot(
+            SlotRequestId physicalSlotRequestId,
+            PhysicalSlot physicalSlot,
+            boolean slotWillBeOccupiedIndefinitely,
+            Runnable externalReleaseCallback) {
+        this.physicalSlotRequestId = physicalSlotRequestId;
+        this.physicalSlot = physicalSlot;
+        this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
+        this.externalReleaseCallback = externalReleaseCallback;
+        this.allocatedLogicalSlots = new HashMap<>();
+
+        Preconditions.checkState(
+                physicalSlot.tryAssignPayload(this),
+                "The provided slot (%s) was not free.",
+                physicalSlot.getAllocationId());
+        this.state = State.ALLOCATED;
+    }
+
+    /**
+     * Registers an allocation request for a logical slot.
+     *
+     * <p>The logical slot request is complete once the underlying physical 
slot request is
+     * complete.

Review comment:
       This Javadoc paragraph no longer seems to be needed, because the physical 
slot request is already complete when an instance of this class is created.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotAllocator.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+
+/** Component for calculating the slot requirements and mapping of vertices to 
slots. */
+public interface SlotAllocator<T extends VertexParallelism> {
+
+    /**
+     * Calculates the total resources required for scheduling the given 
vertices.
+     *
+     * @param vertices vertices to schedule
+     * @return required resources
+     */
+    ResourceCounter 
calculateRequiredSlots(Iterable<JobInformation.VertexInformation> vertices);
+
+    /**
+     * Determines the parallelism at which the vertices could be scheduled 
given the collection of
+     * slots. This method may be called with any number of slots providing any 
amount of resources,
+     * irrespective of what {@link #calculateRequiredSlots(Iterable)} returned.
+     *
+     * <p>If a {@link VertexParallelism} is returned then it covers all 
vertices contained in the
+     * given job information.
+     *
+     * <p>A returned {@link VertexParallelism} should be directly consumed 
afterwards (by either
+     * discarding it or calling {@link #reserveResources(VertexParallelism)}, 
as there is no
+     * guarantee that the assignment remains valid over time (because slots 
can be lost).
+     *
+     * <p>Implementations of this method must be side-effect free. There is no 
guarantee that the
+     * result of this method is ever passed to {@link 
#reserveResources(VertexParallelism)}.
+     *
+     * @param jobInformation information about the job graph
+     * @param slots Slots to consider for determining the parallelism

Review comment:
       ```suggestion
        * @param slots slots to consider for determining the parallelism
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingAssignments.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** {@link VertexParallelism} implementation for the {@link 
SlotSharingSlotAllocator}. */
+public class SlotSharingAssignments implements VertexParallelism {

Review comment:
       Maybe we can call it `VertexParallelismWithSlotSharing`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/FreeSlotFunction.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+/** A function for freeing slots. */
+@FunctionalInterface
+public interface FreeSlotFunction {
+    /**
+     * Frees the slot identified by the given {@link AllocationID} due to the 
given {@link
+     * Throwable} cause.
+     *
+     * @param allocationId identifies the slot
+     * @param cause reason for why the slot was freed
+     * @param timestamp when the slot was freed
+     */
+    void freeSlot(AllocationID allocationId, Throwable cause, long timestamp);

Review comment:
       I think `@Nullable` is missing for the cause. Please also update the 
JavaDocs.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Shared slot implementation for the declarative scheduler. */
+class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SharedSlot.class);
+
+    private final SlotRequestId physicalSlotRequestId;
+
+    private final PhysicalSlot physicalSlot;
+
+    private final Runnable externalReleaseCallback;
+
+    private final Map<SlotRequestId, LogicalSlot> allocatedLogicalSlots;
+
+    private final boolean slotWillBeOccupiedIndefinitely;
+
+    private State state;
+
+    public SharedSlot(
+            SlotRequestId physicalSlotRequestId,

Review comment:
       Do we really need the `SlotRequestId`?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocatorTest.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link SlotSharingSlotAllocator}. */
+public class SlotSharingSlotAllocatorTest extends TestLogger {
+
+    private static final FreeSlotFunction TEST_FREE_SLOT_FUNCTION = (a, c, t) 
-> {};
+    private static final ReserveSlotFunction TEST_RESERVE_SLOT_FUNCTION =
+            (allocationId, resourceProfile) ->
+                    TestingPhysicalSlot.builder()
+                            .withAllocationID(allocationId)
+                            .withResourceProfile(resourceProfile)
+                            .build();
+
+    private static final SlotSharingGroup slotSharingGroup1 = new 
SlotSharingGroup();
+    private static final SlotSharingGroup slotSharingGroup2 = new 
SlotSharingGroup();
+    private static final JobInformation.VertexInformation vertex1 =
+            new TestVertexInformation(new JobVertexID(), 4, slotSharingGroup1);
+    private static final JobInformation.VertexInformation vertex2 =
+            new TestVertexInformation(new JobVertexID(), 2, slotSharingGroup1);
+    private static final JobInformation.VertexInformation vertex3 =
+            new TestVertexInformation(new JobVertexID(), 3, slotSharingGroup2);
+
+    @Test
+    public void testCalculateRequiredSlots() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+
+        final ResourceCounter resourceCounter =
+                slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1, 
vertex2, vertex3));
+
+        assertThat(resourceCounter.getResources(), 
contains(ResourceProfile.UNKNOWN));
+        assertThat(
+                resourceCounter.getResourceCount(ResourceProfile.UNKNOWN),
+                is(
+                        Math.max(vertex1.getParallelism(), 
vertex2.getParallelism())
+                                + vertex3.getParallelism()));
+    }
+
+    @Test
+    public void testDetermineParallelismWithMinimumSlots() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
+
+        final VertexParallelism slotSharingAssignments =
+                slotAllocator.determineParallelism(jobInformation, 
getSlots(2)).get();
+
+        final Map<JobVertexID, Integer> maxParallelismForVertices =
+                slotSharingAssignments.getMaxParallelismForVertices();
+
+        assertThat(maxParallelismForVertices.get(vertex1.getJobVertexID()), 
is(1));
+        assertThat(maxParallelismForVertices.get(vertex2.getJobVertexID()), 
is(1));
+        assertThat(maxParallelismForVertices.get(vertex3.getJobVertexID()), 
is(1));
+    }
+
+    @Test
+    public void testDetermineParallelismWithManySlots() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
+
+        final VertexParallelism slotSharingAssignments =
+                slotAllocator.determineParallelism(jobInformation, 
getSlots(50)).get();
+
+        final Map<JobVertexID, Integer> maxParallelismForVertices =
+                slotSharingAssignments.getMaxParallelismForVertices();
+
+        assertThat(
+                maxParallelismForVertices.get(vertex1.getJobVertexID()),
+                is(vertex1.getParallelism()));
+        assertThat(
+                maxParallelismForVertices.get(vertex2.getJobVertexID()),
+                is(vertex2.getParallelism()));
+        assertThat(
+                maxParallelismForVertices.get(vertex3.getJobVertexID()),
+                is(vertex3.getParallelism()));
+    }
+
+    @Test
+    public void 
testDetermineParallelismUnsuccessfulWithLessSlotsThanSlotSharingGroups() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
+
+        final Optional<SlotSharingAssignments> slotSharingAssignments =
+                slotAllocator.determineParallelism(jobInformation, 
getSlots(1));
+
+        assertThat(slotSharingAssignments.isPresent(), is(false));
+    }
+
+    @Test
+    public void testReserveResources() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
+
+        final SlotSharingAssignments slotAssignments =
+                slotAllocator.determineParallelism(jobInformation, 
getSlots(50)).get();
+
+        final Map<ExecutionVertexID, LogicalSlot> assignedResources =
+                slotAllocator.reserveResources(slotAssignments);
+
+        final Map<ExecutionVertexID, SlotInfo> expectedAssignments = new 
HashMap<>();
+        for (SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot 
assignment :
+                slotAssignments.getAssignments()) {
+            for (ExecutionVertexID containedExecutionVertex :
+                    
assignment.getExecutionSlotSharingGroup().getContainedExecutionVertices()) {
+                expectedAssignments.put(containedExecutionVertex, 
assignment.getSlotInfo());
+            }
+        }
+
+        for (Map.Entry<ExecutionVertexID, SlotInfo> expectedAssignment :
+                expectedAssignments.entrySet()) {
+            final LogicalSlot assignedSlot = 
assignedResources.get(expectedAssignment.getKey());
+
+            final SlotInfo backingSlot = expectedAssignment.getValue();
+
+            assertThat(assignedSlot.getAllocationId(), 
is(backingSlot.getAllocationId()));
+        }
+    }
+
+    private static Collection<SlotInfo> getSlots(int count) {
+        final Collection<SlotInfo> slotInfo = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            slotInfo.add(new TestSlotInfo());
+        }
+        return slotInfo;
+    }
+
+    private static class TestJobInformation implements JobInformation {
+
+        private final Map<JobVertexID, VertexInformation> vertexInformation;
+        private final Collection<SlotSharingGroup> slotSharingGroups;
+
+        private TestJobInformation(Collection<VertexInformation> 
vertexInformation) {
+            this.vertexInformation =
+                    vertexInformation.stream()
+                            .collect(
+                                    Collectors.toMap(
+                                            VertexInformation::getJobVertexID,
+                                            Function.identity()));
+            this.slotSharingGroups =
+                    vertexInformation.stream()
+                            .map(VertexInformation::getSlotSharingGroup)
+                            .collect(
+                                    Collectors.toMap(
+                                            
SlotSharingGroup::getSlotSharingGroupId,
+                                            Function.identity(),
+                                            (slotSharingGroup1, 
slotSharingGroup2) ->
+                                                    slotSharingGroup1))

Review comment:
       Maybe collect to a `Set<SlotSharingGroup>` instead of a map and then a 
`Collection`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link SharedSlot}. */
+public class SharedSlotTest extends TestLogger {
+
+    @Test
+    public void testConstructorAssignsPayload() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+
+        new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+
+        assertThat(physicalSlot.getPayload(), not(nullValue()));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testConstructorFailsIfSlotAlreadyHasAssignedPayload() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+        physicalSlot.tryAssignPayload(new TestPhysicalSlotPayload());
+
+        new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+    }
+
+    @Test
+    public void testAllocateLogicalSlot() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> 
{});
+
+        final LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot();
+
+        assertThat(logicalSlot.getAllocationId(), 
equalTo(physicalSlot.getAllocationId()));
+        assertThat(logicalSlot.getSlotSharingGroupId(), nullValue());
+        assertThat(logicalSlot.getLocality(), is(Locality.UNKNOWN));
+        assertThat(logicalSlot.getPayload(), nullValue());
+        assertThat(logicalSlot.getPhysicalSlotNumber(), 
is(physicalSlot.getPhysicalSlotNumber()));
+        assertThat(
+                logicalSlot.getTaskManagerLocation(),
+                equalTo(physicalSlot.getTaskManagerLocation()));
+        assertThat(
+                logicalSlot.getTaskManagerGateway(), 
equalTo(physicalSlot.getTaskManagerGateway()));
+    }
+
+    @Test
+    public void testAllocateLogicalSlotIssuesUniqueSlotRequestIds() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> 
{});
+
+        final LogicalSlot logicalSlot1 = sharedSlot.allocateLogicalSlot();
+        final LogicalSlot logicalSlot2 = sharedSlot.allocateLogicalSlot();
+
+        assertThat(logicalSlot1.getSlotRequestId(), 
not(equalTo(logicalSlot2.getSlotRequestId())));

Review comment:
       Also, as a follow-up, we should check whether we really need the 
`SlotRequestId` or whether we can get rid of it.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocator.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** {@link SlotAllocator} implementation that supports slot sharing. */
+public class SlotSharingSlotAllocator implements 
SlotAllocator<SlotSharingAssignments> {
+
+    private final ReserveSlotFunction reserveSlot;
+    private final FreeSlotFunction freeSlot;
+
+    public SlotSharingSlotAllocator(ReserveSlotFunction reserveSlot, 
FreeSlotFunction freeSlot) {
+        this.reserveSlot = reserveSlot;
+        this.freeSlot = freeSlot;
+    }
+
+    @Override
+    public ResourceCounter calculateRequiredSlots(
+            Iterable<JobInformation.VertexInformation> vertices) {
+        int numTotalRequiredSlots = 0;
+        for (Integer requiredSlots : 
getMaxParallelismForSlotSharingGroups(vertices).values()) {
+            numTotalRequiredSlots += requiredSlots;
+        }
+        return ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
numTotalRequiredSlots);
+    }
+
+    private static Map<SlotSharingGroupId, Integer> 
getMaxParallelismForSlotSharingGroups(
+            Iterable<JobInformation.VertexInformation> vertices) {
+        final Map<SlotSharingGroupId, Integer> 
maxParallelismForSlotSharingGroups = new HashMap<>();
+        for (JobInformation.VertexInformation vertex : vertices) {
+            maxParallelismForSlotSharingGroups.compute(
+                    vertex.getSlotSharingGroup().getSlotSharingGroupId(),
+                    (slotSharingGroupId, currentMaxParallelism) ->
+                            currentMaxParallelism == null
+                                    ? vertex.getParallelism()
+                                    : Math.max(currentMaxParallelism, 
vertex.getParallelism()));
+        }
+        return maxParallelismForSlotSharingGroups;
+    }
+
+    @Override
+    public Optional<SlotSharingAssignments> determineParallelism(
+            JobInformation jobInformation, Collection<? extends SlotInfo> 
freeSlots) {
+        // TODO: This can waste slots if the max parallelism for slot sharing 
groups is not equal
+        final int slotsPerSlotSharingGroup =
+                freeSlots.size() / 
jobInformation.getSlotSharingGroups().size();
+
+        if (slotsPerSlotSharingGroup == 0) {
+            // => less slots than slot-sharing groups
+            return Optional.empty();
+        }
+
+        final Iterator<? extends SlotInfo> slotIterator = freeSlots.iterator();
+        final Collection<ExecutionSlotSharingGroupAndSlot> assignments = new 
ArrayList<>();
+
+        for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
+            final List<JobInformation.VertexInformation> containedJobVertices =
+                    slotSharingGroup.getJobVertexIds().stream()
+                            .map(jobInformation::getVertexInformation)
+                            .collect(Collectors.toList());
+
+            final Iterable<ExecutionSlotSharingGroup> 
sharedSlotToVertexAssignment =
+                    determineParallelismAndAssignToFutureSlotIndex(
+                            containedJobVertices, slotsPerSlotSharingGroup);
+
+            for (ExecutionSlotSharingGroup executionSlotSharingGroup :
+                    sharedSlotToVertexAssignment) {
+                final SlotInfo slotInfo = slotIterator.next();
+
+                assignments.add(
+                        new 
ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotInfo));
+            }
+        }
+
+        return Optional.of(new SlotSharingAssignments(assignments));
+    }
+
+    private static Iterable<ExecutionSlotSharingGroup>
+            determineParallelismAndAssignToFutureSlotIndex(
+                    Collection<JobInformation.VertexInformation> 
containedJobVertices,
+                    int availableSlots) {
+        final Map<Integer, Set<ExecutionVertexID>> 
sharedSlotToVertexAssignment = new HashMap<>();
+        for (JobInformation.VertexInformation jobVertex : 
containedJobVertices) {
+            final int parallelism = Math.min(jobVertex.getParallelism(), 
availableSlots);
+
+            for (int i = 0; i < parallelism; i++) {
+                sharedSlotToVertexAssignment
+                        .computeIfAbsent(i, ignored -> new HashSet<>())
+                        .add(new ExecutionVertexID(jobVertex.getJobVertexID(), 
i));
+            }
+        }
+
+        return sharedSlotToVertexAssignment.values().stream()
+                .map(ExecutionSlotSharingGroup::new)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Map<ExecutionVertexID, LogicalSlot> reserveResources(
+            SlotSharingAssignments parallelism) {
+        final Map<ExecutionVertexID, LogicalSlot> assignedSlots = new 
HashMap<>();
+
+        for (ExecutionSlotSharingGroupAndSlot executionSlotSharingGroup :
+                parallelism.getAssignments()) {
+            final SharedSlot sharedSlot =
+                    reserveSharedSlot(executionSlotSharingGroup.getSlotInfo());
+
+            for (ExecutionVertexID executionVertexId :
+                    executionSlotSharingGroup
+                            .getExecutionSlotSharingGroup()
+                            .getContainedExecutionVertices()) {
+                final LogicalSlot logicalSlot = 
sharedSlot.allocateLogicalSlot();
+                assignedSlots.put(executionVertexId, logicalSlot);
+            }
+        }
+
+        return assignedSlots;
+    }
+
+    private SharedSlot reserveSharedSlot(SlotInfo slotInfo) {
+        final PhysicalSlot physicalSlot =
+                reserveSlot.reserveSlot(
+                        slotInfo.getAllocationId(),
+                        ResourceProfile.fromResourceSpec(ResourceSpec.DEFAULT, 
MemorySize.ZERO));
+
+        return new SharedSlot(
+                new SlotRequestId(),
+                physicalSlot,
+                slotInfo.willBeOccupiedIndefinitely(),
+                () ->
+                        freeSlot.freeSlot(

Review comment:
       Maybe give `freeSlot` a different name because `freeSlot.freeSlot` looks 
a bit strange.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocatorTest.java
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link SlotSharingSlotAllocator}. */
+public class SlotSharingSlotAllocatorTest extends TestLogger {
+
+    private static final FreeSlotFunction TEST_FREE_SLOT_FUNCTION = (a, c, t) 
-> {};
+    private static final ReserveSlotFunction TEST_RESERVE_SLOT_FUNCTION =
+            (allocationId, resourceProfile) ->
+                    TestingPhysicalSlot.builder()
+                            .withAllocationID(allocationId)
+                            .withResourceProfile(resourceProfile)
+                            .build();
+
+    private static final SlotSharingGroup slotSharingGroup1 = new 
SlotSharingGroup();
+    private static final SlotSharingGroup slotSharingGroup2 = new 
SlotSharingGroup();
+    private static final JobInformation.VertexInformation vertex1 =
+            new TestVertexInformation(new JobVertexID(), 4, slotSharingGroup1);
+    private static final JobInformation.VertexInformation vertex2 =
+            new TestVertexInformation(new JobVertexID(), 2, slotSharingGroup1);
+    private static final JobInformation.VertexInformation vertex3 =
+            new TestVertexInformation(new JobVertexID(), 3, slotSharingGroup2);
+
+    @Test
+    public void testCalculateRequiredSlots() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+
+        final ResourceCounter resourceCounter =
+                slotAllocator.calculateRequiredSlots(Arrays.asList(vertex1, 
vertex2, vertex3));
+
+        assertThat(resourceCounter.getResources(), 
contains(ResourceProfile.UNKNOWN));
+        assertThat(
+                resourceCounter.getResourceCount(ResourceProfile.UNKNOWN),
+                is(
+                        Math.max(vertex1.getParallelism(), 
vertex2.getParallelism())
+                                + vertex3.getParallelism()));
+    }
+
+    @Test
+    public void testDetermineParallelismWithMinimumSlots() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
+
+        final VertexParallelism slotSharingAssignments =
+                slotAllocator.determineParallelism(jobInformation, 
getSlots(2)).get();
+
+        final Map<JobVertexID, Integer> maxParallelismForVertices =
+                slotSharingAssignments.getMaxParallelismForVertices();
+
+        assertThat(maxParallelismForVertices.get(vertex1.getJobVertexID()), 
is(1));
+        assertThat(maxParallelismForVertices.get(vertex2.getJobVertexID()), 
is(1));
+        assertThat(maxParallelismForVertices.get(vertex3.getJobVertexID()), 
is(1));
+    }
+
+    @Test
+    public void testDetermineParallelismWithManySlots() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
+
+        final VertexParallelism slotSharingAssignments =
+                slotAllocator.determineParallelism(jobInformation, 
getSlots(50)).get();
+
+        final Map<JobVertexID, Integer> maxParallelismForVertices =
+                slotSharingAssignments.getMaxParallelismForVertices();
+
+        assertThat(
+                maxParallelismForVertices.get(vertex1.getJobVertexID()),
+                is(vertex1.getParallelism()));
+        assertThat(
+                maxParallelismForVertices.get(vertex2.getJobVertexID()),
+                is(vertex2.getParallelism()));
+        assertThat(
+                maxParallelismForVertices.get(vertex3.getJobVertexID()),
+                is(vertex3.getParallelism()));
+    }
+
+    @Test
+    public void 
testDetermineParallelismUnsuccessfulWithLessSlotsThanSlotSharingGroups() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
+
+        final Optional<SlotSharingAssignments> slotSharingAssignments =
+                slotAllocator.determineParallelism(jobInformation, 
getSlots(1));
+
+        assertThat(slotSharingAssignments.isPresent(), is(false));
+    }
+
+    @Test
+    public void testReserveResources() {
+        final SlotSharingSlotAllocator slotAllocator =
+                new SlotSharingSlotAllocator(TEST_RESERVE_SLOT_FUNCTION, 
TEST_FREE_SLOT_FUNCTION);
+
+        final JobInformation jobInformation =
+                new TestJobInformation(Arrays.asList(vertex1, vertex2, 
vertex3));
+
+        final SlotSharingAssignments slotAssignments =
+                slotAllocator.determineParallelism(jobInformation, 
getSlots(50)).get();
+
+        final Map<ExecutionVertexID, LogicalSlot> assignedResources =
+                slotAllocator.reserveResources(slotAssignments);
+
+        final Map<ExecutionVertexID, SlotInfo> expectedAssignments = new 
HashMap<>();
+        for (SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot 
assignment :
+                slotAssignments.getAssignments()) {
+            for (ExecutionVertexID containedExecutionVertex :
+                    
assignment.getExecutionSlotSharingGroup().getContainedExecutionVertices()) {
+                expectedAssignments.put(containedExecutionVertex, 
assignment.getSlotInfo());
+            }
+        }
+
+        for (Map.Entry<ExecutionVertexID, SlotInfo> expectedAssignment :
+                expectedAssignments.entrySet()) {
+            final LogicalSlot assignedSlot = 
assignedResources.get(expectedAssignment.getKey());
+
+            final SlotInfo backingSlot = expectedAssignment.getValue();
+
+            assertThat(assignedSlot.getAllocationId(), 
is(backingSlot.getAllocationId()));
+        }
+    }
+
+    private static Collection<SlotInfo> getSlots(int count) {
+        final Collection<SlotInfo> slotInfo = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            slotInfo.add(new TestSlotInfo());
+        }
+        return slotInfo;
+    }
+
+    private static class TestJobInformation implements JobInformation {
+
+        private final Map<JobVertexID, VertexInformation> vertexInformation;
+        private final Collection<SlotSharingGroup> slotSharingGroups;
+
+        private TestJobInformation(Collection<VertexInformation> 
vertexInformation) {
+            this.vertexInformation =

Review comment:
       Nit: It is a bit confusing that this field has the same name as the 
parameter but a different type. Maybe name it `vertexInformationPerJobVertexId`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocator.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** {@link SlotAllocator} implementation that supports slot sharing. */
+public class SlotSharingSlotAllocator implements 
SlotAllocator<SlotSharingAssignments> {
+
+    private final ReserveSlotFunction reserveSlot;
+    private final FreeSlotFunction freeSlot;
+
+    public SlotSharingSlotAllocator(ReserveSlotFunction reserveSlot, 
FreeSlotFunction freeSlot) {
+        this.reserveSlot = reserveSlot;
+        this.freeSlot = freeSlot;
+    }
+
+    @Override
+    public ResourceCounter calculateRequiredSlots(
+            Iterable<JobInformation.VertexInformation> vertices) {
+        int numTotalRequiredSlots = 0;
+        for (Integer requiredSlots : 
getMaxParallelismForSlotSharingGroups(vertices).values()) {
+            numTotalRequiredSlots += requiredSlots;
+        }
+        return ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
numTotalRequiredSlots);
+    }
+
+    private static Map<SlotSharingGroupId, Integer> 
getMaxParallelismForSlotSharingGroups(
+            Iterable<JobInformation.VertexInformation> vertices) {
+        final Map<SlotSharingGroupId, Integer> 
maxParallelismForSlotSharingGroups = new HashMap<>();
+        for (JobInformation.VertexInformation vertex : vertices) {
+            maxParallelismForSlotSharingGroups.compute(
+                    vertex.getSlotSharingGroup().getSlotSharingGroupId(),
+                    (slotSharingGroupId, currentMaxParallelism) ->
+                            currentMaxParallelism == null
+                                    ? vertex.getParallelism()
+                                    : Math.max(currentMaxParallelism, 
vertex.getParallelism()));
+        }
+        return maxParallelismForSlotSharingGroups;
+    }
+
+    @Override
+    public Optional<SlotSharingAssignments> determineParallelism(
+            JobInformation jobInformation, Collection<? extends SlotInfo> 
freeSlots) {
+        // TODO: This can waste slots if the max parallelism for slot sharing 
groups is not equal
+        final int slotsPerSlotSharingGroup =
+                freeSlots.size() / 
jobInformation.getSlotSharingGroups().size();
+
+        if (slotsPerSlotSharingGroup == 0) {
+            // => less slots than slot-sharing groups
+            return Optional.empty();
+        }
+
+        final Iterator<? extends SlotInfo> slotIterator = freeSlots.iterator();
+        final Collection<ExecutionSlotSharingGroupAndSlot> assignments = new 
ArrayList<>();
+
+        for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
+            final List<JobInformation.VertexInformation> containedJobVertices =
+                    slotSharingGroup.getJobVertexIds().stream()
+                            .map(jobInformation::getVertexInformation)
+                            .collect(Collectors.toList());
+
+            final Iterable<ExecutionSlotSharingGroup> 
sharedSlotToVertexAssignment =
+                    determineParallelismAndAssignToFutureSlotIndex(
+                            containedJobVertices, slotsPerSlotSharingGroup);
+
+            for (ExecutionSlotSharingGroup executionSlotSharingGroup :
+                    sharedSlotToVertexAssignment) {
+                final SlotInfo slotInfo = slotIterator.next();
+
+                assignments.add(
+                        new 
ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotInfo));
+            }
+        }
+
+        return Optional.of(new SlotSharingAssignments(assignments));
+    }
+
+    private static Iterable<ExecutionSlotSharingGroup>
+            determineParallelismAndAssignToFutureSlotIndex(
+                    Collection<JobInformation.VertexInformation> 
containedJobVertices,
+                    int availableSlots) {
+        final Map<Integer, Set<ExecutionVertexID>> 
sharedSlotToVertexAssignment = new HashMap<>();
+        for (JobInformation.VertexInformation jobVertex : 
containedJobVertices) {
+            final int parallelism = Math.min(jobVertex.getParallelism(), 
availableSlots);
+
+            for (int i = 0; i < parallelism; i++) {
+                sharedSlotToVertexAssignment
+                        .computeIfAbsent(i, ignored -> new HashSet<>())
+                        .add(new ExecutionVertexID(jobVertex.getJobVertexID(), 
i));
+            }
+        }
+
+        return sharedSlotToVertexAssignment.values().stream()
+                .map(ExecutionSlotSharingGroup::new)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Map<ExecutionVertexID, LogicalSlot> reserveResources(
+            SlotSharingAssignments parallelism) {

Review comment:
       Nit: The name `parallelism` might be a bit confusing since it is not a 
numeric value, as I would expect from this name.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link SharedSlot}. */
+public class SharedSlotTest extends TestLogger {
+
+    @Test
+    public void testConstructorAssignsPayload() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+
+        new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+
+        assertThat(physicalSlot.getPayload(), not(nullValue()));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testConstructorFailsIfSlotAlreadyHasAssignedPayload() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+        physicalSlot.tryAssignPayload(new TestPhysicalSlotPayload());
+
+        new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+    }
+
+    @Test
+    public void testAllocateLogicalSlot() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> 
{});
+
+        final LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot();
+
+        assertThat(logicalSlot.getAllocationId(), 
equalTo(physicalSlot.getAllocationId()));
+        assertThat(logicalSlot.getSlotSharingGroupId(), nullValue());
+        assertThat(logicalSlot.getLocality(), is(Locality.UNKNOWN));
+        assertThat(logicalSlot.getPayload(), nullValue());
+        assertThat(logicalSlot.getPhysicalSlotNumber(), 
is(physicalSlot.getPhysicalSlotNumber()));

Review comment:
       We don't have to fix it right now but we should rethink whether the 
`LogicalSlot` really needs to know about the `AllocationID` and the physical 
slot number. At least we should update the JavaDocs stating that multiple 
`LogicalSlot`s might have the same physical slot number and `AllocationID`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingSlotAllocator.java
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** {@link SlotAllocator} implementation that supports slot sharing. */
+public class SlotSharingSlotAllocator implements 
SlotAllocator<SlotSharingAssignments> {
+
+    private final ReserveSlotFunction reserveSlot;
+    private final FreeSlotFunction freeSlot;
+
+    public SlotSharingSlotAllocator(ReserveSlotFunction reserveSlot, 
FreeSlotFunction freeSlot) {
+        this.reserveSlot = reserveSlot;
+        this.freeSlot = freeSlot;
+    }
+
+    @Override
+    public ResourceCounter calculateRequiredSlots(
+            Iterable<JobInformation.VertexInformation> vertices) {
+        int numTotalRequiredSlots = 0;
+        for (Integer requiredSlots : 
getMaxParallelismForSlotSharingGroups(vertices).values()) {
+            numTotalRequiredSlots += requiredSlots;
+        }
+        return ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
numTotalRequiredSlots);
+    }
+
+    private static Map<SlotSharingGroupId, Integer> 
getMaxParallelismForSlotSharingGroups(
+            Iterable<JobInformation.VertexInformation> vertices) {
+        final Map<SlotSharingGroupId, Integer> 
maxParallelismForSlotSharingGroups = new HashMap<>();
+        for (JobInformation.VertexInformation vertex : vertices) {
+            maxParallelismForSlotSharingGroups.compute(
+                    vertex.getSlotSharingGroup().getSlotSharingGroupId(),
+                    (slotSharingGroupId, currentMaxParallelism) ->
+                            currentMaxParallelism == null
+                                    ? vertex.getParallelism()
+                                    : Math.max(currentMaxParallelism, 
vertex.getParallelism()));
+        }
+        return maxParallelismForSlotSharingGroups;
+    }
+
+    @Override
+    public Optional<SlotSharingAssignments> determineParallelism(
+            JobInformation jobInformation, Collection<? extends SlotInfo> 
freeSlots) {
+        // TODO: This can waste slots if the max parallelism for slot sharing 
groups is not equal
+        final int slotsPerSlotSharingGroup =
+                freeSlots.size() / 
jobInformation.getSlotSharingGroups().size();
+
+        if (slotsPerSlotSharingGroup == 0) {
+            // => less slots than slot-sharing groups
+            return Optional.empty();
+        }
+
+        final Iterator<? extends SlotInfo> slotIterator = freeSlots.iterator();
+        final Collection<ExecutionSlotSharingGroupAndSlot> assignments = new 
ArrayList<>();
+
+        for (SlotSharingGroup slotSharingGroup : 
jobInformation.getSlotSharingGroups()) {
+            final List<JobInformation.VertexInformation> containedJobVertices =
+                    slotSharingGroup.getJobVertexIds().stream()
+                            .map(jobInformation::getVertexInformation)
+                            .collect(Collectors.toList());
+
+            final Iterable<ExecutionSlotSharingGroup> 
sharedSlotToVertexAssignment =
+                    determineParallelismAndAssignToFutureSlotIndex(
+                            containedJobVertices, slotsPerSlotSharingGroup);
+
+            for (ExecutionSlotSharingGroup executionSlotSharingGroup :
+                    sharedSlotToVertexAssignment) {
+                final SlotInfo slotInfo = slotIterator.next();
+
+                assignments.add(
+                        new 
ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotInfo));
+            }
+        }
+
+        return Optional.of(new SlotSharingAssignments(assignments));
+    }
+
+    private static Iterable<ExecutionSlotSharingGroup>
+            determineParallelismAndAssignToFutureSlotIndex(
+                    Collection<JobInformation.VertexInformation> 
containedJobVertices,
+                    int availableSlots) {
+        final Map<Integer, Set<ExecutionVertexID>> 
sharedSlotToVertexAssignment = new HashMap<>();
+        for (JobInformation.VertexInformation jobVertex : 
containedJobVertices) {
+            final int parallelism = Math.min(jobVertex.getParallelism(), 
availableSlots);
+
+            for (int i = 0; i < parallelism; i++) {
+                sharedSlotToVertexAssignment
+                        .computeIfAbsent(i, ignored -> new HashSet<>())
+                        .add(new ExecutionVertexID(jobVertex.getJobVertexID(), 
i));
+            }
+        }
+
+        return sharedSlotToVertexAssignment.values().stream()
+                .map(ExecutionSlotSharingGroup::new)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Map<ExecutionVertexID, LogicalSlot> reserveResources(
+            SlotSharingAssignments parallelism) {
+        final Map<ExecutionVertexID, LogicalSlot> assignedSlots = new 
HashMap<>();
+
+        for (ExecutionSlotSharingGroupAndSlot executionSlotSharingGroup :
+                parallelism.getAssignments()) {
+            final SharedSlot sharedSlot =
+                    reserveSharedSlot(executionSlotSharingGroup.getSlotInfo());
+
+            for (ExecutionVertexID executionVertexId :
+                    executionSlotSharingGroup
+                            .getExecutionSlotSharingGroup()
+                            .getContainedExecutionVertices()) {
+                final LogicalSlot logicalSlot = 
sharedSlot.allocateLogicalSlot();
+                assignedSlots.put(executionVertexId, logicalSlot);
+            }
+        }
+
+        return assignedSlots;
+    }
+
+    private SharedSlot reserveSharedSlot(SlotInfo slotInfo) {
+        final PhysicalSlot physicalSlot =
+                reserveSlot.reserveSlot(
+                        slotInfo.getAllocationId(),
+                        ResourceProfile.fromResourceSpec(ResourceSpec.DEFAULT, 
MemorySize.ZERO));

Review comment:
       Should we say that we pass `ResourceProfile.UNKNOWN`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SlotSharingAssignments.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative.allocator;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** {@link VertexParallelism} implementation for the {@link 
SlotSharingSlotAllocator}. */
+public class SlotSharingAssignments implements VertexParallelism {
+
+    private final 
Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot> 
assignments;
+
+    SlotSharingAssignments(
+            
Collection<SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot> 
assignments) {
+        this.assignments = Preconditions.checkNotNull(assignments);
+    }
+
+    Iterable<SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot> 
getAssignments() {
+        return assignments;
+    }
+
+    @Override
+    public Map<JobVertexID, Integer> getMaxParallelismForVertices() {
+        return assignments.stream()
+                .map(
+                        
SlotSharingSlotAllocator.ExecutionSlotSharingGroupAndSlot
+                                ::getExecutionSlotSharingGroup)
+                .flatMap(x -> x.getContainedExecutionVertices().stream())
+                .collect(
+                        Collectors.toMap(
+                                ExecutionVertexID::getJobVertexId,
+                                v -> v.getSubtaskIndex() + 1,
+                                Math::max));

Review comment:
       Not that we have to change it right now but the implementation looks a 
bit inefficient.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to