tillrohrmann commented on a change in pull request #14921:
URL: https://github.com/apache/flink/pull/14921#discussion_r576321108



##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -132,8 +132,28 @@
                             "Defines whether the cluster uses fine-grained resource management.");
 
     public static boolean isDeclarativeResourceManagementEnabled(Configuration configuration) {
-        return configuration.get(ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT)
-                && !System.getProperties().containsKey("flink.tests.disable-declarative");
+        if (configuration.contains(ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT)) {
+            return configuration.get(ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT);
+        } else {
+            return !System.getProperties().containsKey("flink.tests.disable-declarative");
+        }
+    }
+
+    public static JobManagerOptions.SchedulerType getSchedulerType(Configuration configuration) {
+        if (isDeclarativeSchedulerEnabled(configuration)) {
+            return JobManagerOptions.SchedulerType.Declarative;
+        } else {
+            return configuration.get(JobManagerOptions.SCHEDULER);
+        }
+    }
+
+    public static boolean isDeclarativeSchedulerEnabled(Configuration configuration) {
+        if (configuration.contains(JobManagerOptions.SCHEDULER)) {
+            return configuration.get(JobManagerOptions.SCHEDULER)
+                    == JobManagerOptions.SchedulerType.Declarative;
+        } else {
+            return System.getProperties().containsKey("flink.tests.enable-declarative-scheduler");
+        }

Review comment:
       Let's try to quickly get rid of this logic. I find this a bit confusing.
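For illustration, a minimal sketch of what the lookups could collapse to once the test-only system property fallbacks are dropped (assuming the defaults of the config options are good enough; this is only an illustration, not the agreed-upon design):

    public static boolean isDeclarativeResourceManagementEnabled(Configuration configuration) {
        return configuration.get(ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT);
    }

    public static boolean isDeclarativeSchedulerEnabled(Configuration configuration) {
        return configuration.get(JobManagerOptions.SCHEDULER)
                == JobManagerOptions.SchedulerType.Declarative;
    }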

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
##########
@@ -197,20 +196,21 @@ private void signalPayloadRelease(Throwable cause) {
     }
 
     private void returnSlotToOwner(CompletableFuture<?> terminalStateFuture) {
-        terminalStateFuture.whenComplete(
-                (Object ignored, Throwable throwable) -> {
-                    if (state == State.RELEASING) {
-                        slotOwner.returnLogicalSlot(this);
-                    }
-
-                    markReleased();
-
-                    if (throwable != null) {
-                        releaseFuture.completeExceptionally(throwable);
-                    } else {
-                        releaseFuture.complete(null);
-                    }
-                });
+        FutureUtils.assertNoException(
+                terminalStateFuture.whenComplete(

Review comment:
       Can `terminalStateFuture` be completed exceptionally? If so, then we 
need a `handle` call that consumes the expected exception, so that 
`FutureUtils.assertNoException` does not fail for valid exceptions.
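A minimal sketch of the `handle` variant, assuming an exceptional completion of `terminalStateFuture` counts as a valid outcome here. Returning `null` from the handler keeps the resulting future from completing exceptionally, so `FutureUtils.assertNoException` only trips on unexpected errors thrown inside the handler:

    FutureUtils.assertNoException(
            terminalStateFuture.handle(
                    (Object ignored, Throwable throwable) -> {
                        if (state == State.RELEASING) {
                            slotOwner.returnLogicalSlot(this);
                        }

                        markReleased();

                        if (throwable != null) {
                            // the expected exception is consumed here instead of being re-thrown
                            releaseFuture.completeExceptionally(throwable);
                        } else {
                            releaseFuture.complete(null);
                        }

                        return null;
                    }));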

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerBuilder.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import 
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+/** Builder for {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerBuilder {
+    private static final Time DEFAULT_TIMEOUT = Time.seconds(300);
+
+    private final JobGraph jobGraph;
+
+    private final ComponentMainThreadExecutor mainThreadExecutor;
+
+    private Executor ioExecutor = TestingUtils.defaultExecutor();
+    private Configuration jobMasterConfiguration = new Configuration();
+    private ScheduledExecutorService futureExecutor = 
TestingUtils.defaultExecutor();
+    private ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader();
+    private CheckpointRecoveryFactory checkpointRecoveryFactory =
+            new StandaloneCheckpointRecoveryFactory();
+    private DeclarativeSlotPool declarativeSlotPool;
+    private Time rpcTimeout = DEFAULT_TIMEOUT;
+    private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+    private JobManagerJobMetricGroup jobManagerJobMetricGroup =
+            
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
+    private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
+    private JobMasterPartitionTracker partitionTracker = 
NoOpJobMasterPartitionTracker.INSTANCE;
+    private RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+            NoRestartBackoffTimeStrategy.INSTANCE;
+    private JobStatusListener jobStatusListener = (ignoredA, ignoredB, 
ignoredC, ignoredD) -> {};
+
+    public DeclarativeSchedulerBuilder(
+            final JobGraph jobGraph, ComponentMainThreadExecutor 
mainThreadExecutor) {
+        this.jobGraph = jobGraph;
+        this.mainThreadExecutor = mainThreadExecutor;
+
+        this.declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        DEFAULT_TIMEOUT,
+                        rpcTimeout);
+    }
+
+    public DeclarativeSchedulerBuilder setIoExecutor(final Executor 
ioExecutor) {
+        this.ioExecutor = ioExecutor;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setJobMasterConfiguration(
+            final Configuration jobMasterConfiguration) {
+        this.jobMasterConfiguration = jobMasterConfiguration;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setFutureExecutor(
+            final ScheduledExecutorService futureExecutor) {
+        this.futureExecutor = futureExecutor;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setUserCodeLoader(final ClassLoader 
userCodeLoader) {
+        this.userCodeLoader = userCodeLoader;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setCheckpointRecoveryFactory(
+            final CheckpointRecoveryFactory checkpointRecoveryFactory) {
+        this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setRpcTimeout(final Time rpcTimeout) {
+        this.rpcTimeout = rpcTimeout;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setBlobWriter(final BlobWriter 
blobWriter) {
+        this.blobWriter = blobWriter;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setJobManagerJobMetricGroup(
+            final JobManagerJobMetricGroup jobManagerJobMetricGroup) {
+        this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setShuffleMaster(final ShuffleMaster<?> 
shuffleMaster) {
+        this.shuffleMaster = shuffleMaster;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setPartitionTracker(
+            final JobMasterPartitionTracker partitionTracker) {
+        this.partitionTracker = partitionTracker;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setDeclarativeSlotPool(
+            DeclarativeSlotPool declarativeSlotPool) {
+        this.declarativeSlotPool = declarativeSlotPool;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setRestartBackoffTimeStrategy(
+            final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+        this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+        return this;
+    }
+
+    public DeclarativeSchedulerBuilder setJobStatusListener(JobStatusListener 
jobStatusListener) {
+        this.jobStatusListener = jobStatusListener;
+        return this;
+    }
+
+    public DeclarativeScheduler build() throws Exception {
+        return new DeclarativeScheduler(
+                jobGraph,
+                jobMasterConfiguration,
+                declarativeSlotPool,
+                futureExecutor,
+                ioExecutor,
+                userCodeLoader,
+                checkpointRecoveryFactory,
+                rpcTimeout,
+                blobWriter,
+                jobManagerJobMetricGroup,
+                shuffleMaster,
+                partitionTracker,
+                restartBackoffTimeStrategy,
+                new DefaultExecutionDeploymentTracker(),
+                System.currentTimeMillis(),
+                mainThreadExecutor,
+                error ->
+                        FatalExitExceptionHandler.INSTANCE.uncaughtException(
+                                Thread.currentThread(), error),

Review comment:
       I guess we would also want to make the fatal error handler configurable 
in order to test the `runIfState` method properly.
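A sketch of what such a builder extension could look like; the setter name and the default field are assumptions for illustration, not part of the PR (it would also require importing `org.apache.flink.runtime.rpc.FatalErrorHandler`):

    private FatalErrorHandler fatalErrorHandler =
            error ->
                    FatalExitExceptionHandler.INSTANCE.uncaughtException(
                            Thread.currentThread(), error);

    public DeclarativeSchedulerBuilder setFatalErrorHandler(final FatalErrorHandler fatalErrorHandler) {
        this.fatalErrorHandler = fatalErrorHandler;
        return this;
    }

`build()` would then pass `fatalErrorHandler` to the `DeclarativeScheduler` constructor instead of the hard-coded `FatalExitExceptionHandler` lambda.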

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.offerSlots;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerTest extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+    private static final JobVertex JOB_VERTEX;
+
+    static {
+        JOB_VERTEX = new JobVertex("v1");
+        JOB_VERTEX.setParallelism(PARALLELISM);
+        JOB_VERTEX.setInvokableClass(AbstractInvokable.class);
+    }
+
+    private final ComponentMainThreadExecutor mainThreadExecutor =
+            new 
ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
+
+    @Test
+    public void testInitialState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        assertThat(scheduler.getState(), instanceOf(Created.class));
+    }
+
+    @Test
+    public void testIsState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        final State state = scheduler.getState();
+
+        assertThat(scheduler.isState(state), is(true));
+        assertThat(scheduler.isState(new DummyState()), is(false));
+    }
+
+    @Test
+    public void testRunIfState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(scheduler.getState(), () -> ran.set(true));
+        assertThat(ran.get(), is(true));
+    }
+
+    @Test
+    public void testRunIfStateWithStateMismatch() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(new DummyState(), () -> ran.set(true));
+        assertThat(ran.get(), is(false));
+    }
+
+    @Test
+    public void testHasEnoughResources() throws Exception {
+        final JobGraph jobGraph = getJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final ResourceCounter resourceRequirement =
+                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), 
is(false));
+
+        offerSlots(
+                declarativeSlotPool, 
createSlotOffersForResourceRequirements(resourceRequirement));
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), 
is(true));

Review comment:
       I would split this into two tests.
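For example (sketch only; the test names and the shared setup are illustrative), the two assertions could become:

    @Test
    public void testHasEnoughResourcesReturnsFalseIfRequirementsAreNotFulfilled() throws Exception {
        // same jobGraph / declarativeSlotPool / scheduler setup as above
        scheduler.startScheduling();

        final ResourceCounter resourceRequirement =
                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);

        assertThat(scheduler.hasEnoughResources(resourceRequirement), is(false));
    }

    @Test
    public void testHasEnoughResourcesReturnsTrueIfRequirementsAreFulfilled() throws Exception {
        // same jobGraph / declarativeSlotPool / scheduler setup as above
        scheduler.startScheduling();

        final ResourceCounter resourceRequirement =
                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);

        offerSlots(
                declarativeSlotPool,
                createSlotOffersForResourceRequirements(resourceRequirement));

        assertThat(scheduler.hasEnoughResources(resourceRequirement), is(true));
    }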

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
##########
@@ -34,14 +35,19 @@
     @Nonnull
     SlotPoolService createSlotPoolService(@Nonnull JobID jobId);
 
-    static SlotPoolServiceFactory fromConfiguration(Configuration configuration) {
+    static SlotPoolServiceFactory fromConfiguration(Configuration configuration, JobType jobType) {
         final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
         final Time slotIdleTimeout =
                 Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
         final Time batchSlotTimeout =
                 Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
 
         if (ClusterOptions.isDeclarativeResourceManagementEnabled(configuration)) {
+            if (ClusterOptions.isDeclarativeSchedulerEnabled(configuration)
+                    && jobType == JobType.STREAMING) {
+                return new DeclarativeSlotPoolServiceFactory(
+                        SystemClock.getInstance(), slotIdleTimeout, rpcTimeout);
+            }
 
             return new DeclarativeSlotPoolBridgeServiceFactory(
                     SystemClock.getInstance(), rpcTimeout, slotIdleTimeout, batchSlotTimeout);

Review comment:
       Maybe we should create a joint Scheduler/SlotPoolService factory. Right 
now, setting up the right combination of scheduler and slot pool service is 
spread over multiple classes, which makes it hard to reason about.
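A rough sketch of such a joint factory; the interface name and the reduced `createScheduler` signature are assumptions for illustration, not existing Flink API:

    /** Bundles the creation of a matching scheduler and slot pool service. */
    public interface SlotPoolServiceSchedulerFactory {

        SlotPoolService createSlotPoolService(JobID jobId);

        SchedulerNG createScheduler(
                JobGraph jobGraph,
                SlotPoolService slotPoolService,
                ComponentMainThreadExecutor mainThreadExecutor)
                throws Exception;
    }

The `fromConfiguration(Configuration, JobType)` decision about which combination to instantiate would then live in a single place.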

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.offerSlots;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerTest extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+    private static final JobVertex JOB_VERTEX;
+
+    static {
+        JOB_VERTEX = new JobVertex("v1");
+        JOB_VERTEX.setParallelism(PARALLELISM);
+        JOB_VERTEX.setInvokableClass(AbstractInvokable.class);
+    }
+
+    private final ComponentMainThreadExecutor mainThreadExecutor =
+            new 
ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
+
+    @Test
+    public void testInitialState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        assertThat(scheduler.getState(), instanceOf(Created.class));
+    }
+
+    @Test
+    public void testIsState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        final State state = scheduler.getState();
+
+        assertThat(scheduler.isState(state), is(true));
+        assertThat(scheduler.isState(new DummyState()), is(false));
+    }
+
+    @Test
+    public void testRunIfState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(scheduler.getState(), () -> ran.set(true));
+        assertThat(ran.get(), is(true));
+    }
+
+    @Test
+    public void testRunIfStateWithStateMismatch() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(new DummyState(), () -> ran.set(true));
+        assertThat(ran.get(), is(false));
+    }

Review comment:
       I think we are missing a case where we test that failures are forwarded 
to the fatal error handler.
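A sketch of such a test, assuming the builder gains a (hypothetical) `setFatalErrorHandler` setter and that `runIfState` forwards exceptions thrown by the runnable to that handler:

    @Test
    public void testRunIfStateForwardsFailuresToFatalErrorHandler() throws Exception {
        final AtomicReference<Throwable> fatalError = new AtomicReference<>();
        final DeclarativeScheduler scheduler =
                new DeclarativeSchedulerBuilder(getJobGraph(), mainThreadExecutor)
                        .setFatalErrorHandler(fatalError::set) // hypothetical setter
                        .build();

        final RuntimeException expectedException = new RuntimeException("test");
        scheduler.runIfState(
                scheduler.getState(),
                () -> {
                    throw expectedException;
                });

        assertThat(fatalError.get(), is((Throwable) expectedException));
    }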

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/JobGraphJobInformation.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.declarative.allocator.JobInformation;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/** {@link JobInformation} created from a {@link JobGraph}. */
+public class JobGraphJobInformation implements JobInformation {
+
+    private final JobGraph jobGraph;
+    private final JobID jobID;
+    private final String name;
+
+    public JobGraphJobInformation(JobGraph jobGraph) {
+        this.jobGraph = jobGraph;
+        this.jobID = jobGraph.getJobID();
+        this.name = jobGraph.getName();
+    }
+
+    @Override
+    public Collection<SlotSharingGroup> getSlotSharingGroups() {
+        return jobGraph.getSlotSharingGroups();
+    }
+
+    @Override
+    public JobInformation.VertexInformation getVertexInformation(JobVertexID 
jobVertexId) {
+        return new JobVertexInformation(jobGraph.findVertexByID(jobVertexId));
+    }
+
+    public JobID getJobID() {
+        return jobID;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    // note: the SlotSharingGroup returned is mutable.

Review comment:
       which `SlotSharingGroup`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
##########
@@ -0,0 +1,909 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import 
org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import 
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
+import org.apache.flink.runtime.scheduler.declarative.allocator.SlotAllocator;
+import 
org.apache.flink.runtime.scheduler.declarative.allocator.SlotSharingSlotAllocator;
+import 
org.apache.flink.runtime.scheduler.declarative.allocator.VertexParallelism;
+import 
org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ReactiveScaleUpController;
+import 
org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ScaleUpController;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/** Declarative scheduler. */
+public class DeclarativeScheduler
+        implements SchedulerNG,
+                Created.Context,
+                WaitingForResources.Context,
+                Executing.Context,
+                Restarting.Context,
+                Failing.Context,
+                Finished.Context {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DeclarativeScheduler.class);
+
+    private final JobGraphJobInformation jobInformation;
+
+    private final DeclarativeSlotPool declarativeSlotPool;
+
+    private final long initializationTimestamp;
+
+    private final Configuration configuration;
+    private final ScheduledExecutorService futureExecutor;
+    private final Executor ioExecutor;
+    private final ClassLoader userCodeClassLoader;
+    private final Time rpcTimeout;
+    private final BlobWriter blobWriter;
+    private final ShuffleMaster<?> shuffleMaster;
+    private final JobMasterPartitionTracker partitionTracker;
+    private final ExecutionDeploymentTracker executionDeploymentTracker;
+    private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
+
+    private final CompletedCheckpointStore completedCheckpointStore;
+    private final CheckpointIDCounter checkpointIdCounter;
+    private final CheckpointsCleaner checkpointsCleaner;
+
+    private final CompletableFuture<Void> terminationFuture = new 
CompletableFuture<>();
+    private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;

Review comment:
       nit: Why are these two fields grouped together? Looks a bit odd.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/JobGraphJobInformation.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.declarative.allocator.JobInformation;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/** {@link JobInformation} created from a {@link JobGraph}. */
+public class JobGraphJobInformation implements JobInformation {
+
+    private final JobGraph jobGraph;
+    private final JobID jobID;
+    private final String name;
+
+    public JobGraphJobInformation(JobGraph jobGraph) {
+        this.jobGraph = jobGraph;
+        this.jobID = jobGraph.getJobID();
+        this.name = jobGraph.getName();
+    }
+
+    @Override
+    public Collection<SlotSharingGroup> getSlotSharingGroups() {
+        return jobGraph.getSlotSharingGroups();
+    }
+
+    @Override
+    public JobInformation.VertexInformation getVertexInformation(JobVertexID 
jobVertexId) {
+        return new JobVertexInformation(jobGraph.findVertexByID(jobVertexId));
+    }
+
+    public JobID getJobID() {
+        return jobID;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    // note: the SlotSharingGroup returned is mutable.
+    public Iterable<JobInformation.VertexInformation> getVertices() {
+        return jobGraphVerticesToVertexInformation(jobGraph.getVertices());
+    }
+
+    public static Iterable<JobInformation.VertexInformation> 
jobGraphVerticesToVertexInformation(
+            Iterable<JobVertex> verticesIterable) {
+        return () -> {
+            Iterator<JobVertex> iterator = verticesIterable.iterator();
+            return new Iterator<JobInformation.VertexInformation>() {
+                @Override
+                public boolean hasNext() {
+                    return iterator.hasNext();
+                }
+
+                @Override
+                public JobInformation.VertexInformation next() {
+                    return new JobVertexInformation(iterator.next());
+                }
+            };
+        };

Review comment:
       `Iterables.transform` might be simpler.
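For comparison, a sketch of the `Iterables.transform` variant (assuming Flink's relocated Guava is used; the exact shaded package prefix depends on the Flink version, so treat the import as illustrative):

    import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;

    public static Iterable<JobInformation.VertexInformation> jobGraphVerticesToVertexInformation(
            Iterable<JobVertex> verticesIterable) {
        return Iterables.transform(verticesIterable, JobVertexInformation::new);
    }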

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
##########
@@ -0,0 +1,909 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import 
org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import 
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
+import org.apache.flink.runtime.scheduler.declarative.allocator.SlotAllocator;
+import 
org.apache.flink.runtime.scheduler.declarative.allocator.SlotSharingSlotAllocator;
+import 
org.apache.flink.runtime.scheduler.declarative.allocator.VertexParallelism;
+import 
org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ReactiveScaleUpController;
+import 
org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ScaleUpController;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/** Declarative scheduler. */

Review comment:
       Maybe add some more expressive JavaDoc to this class. We could state 
the overall idea and how it works, and maybe even link to the FLIP and the 
state machine diagram.
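For instance, the class comment could be expanded roughly along these lines (the wording is only a suggestion, and the FLIP reference would have to point at the actual design document):

    /**
     * A {@link SchedulerNG} implementation that declares the resources it needs for the given job
     * and adjusts the job's parallelism to the resources that are actually available. Internally
     * the scheduler is organized as a state machine with the states {@link Created},
     * {@link WaitingForResources}, {@link Executing}, {@link Restarting}, {@link Failing} and
     * {@link Finished}. See the corresponding FLIP for the overall design and the state machine
     * diagram.
     */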

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
##########
@@ -0,0 +1,909 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import 
org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import 
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
+import org.apache.flink.runtime.scheduler.declarative.allocator.SlotAllocator;
+import 
org.apache.flink.runtime.scheduler.declarative.allocator.SlotSharingSlotAllocator;
+import 
org.apache.flink.runtime.scheduler.declarative.allocator.VertexParallelism;
+import 
org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ReactiveScaleUpController;
+import 
org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ScaleUpController;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/** Declarative scheduler. */
+public class DeclarativeScheduler
+        implements SchedulerNG,
+                Created.Context,
+                WaitingForResources.Context,
+                Executing.Context,
+                Restarting.Context,
+                Failing.Context,
+                Finished.Context {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DeclarativeScheduler.class);
+
+    private final JobGraphJobInformation jobInformation;
+
+    private final DeclarativeSlotPool declarativeSlotPool;
+
+    private final long initializationTimestamp;
+
+    private final Configuration configuration;
+    private final ScheduledExecutorService futureExecutor;
+    private final Executor ioExecutor;
+    private final ClassLoader userCodeClassLoader;
+    private final Time rpcTimeout;
+    private final BlobWriter blobWriter;
+    private final ShuffleMaster<?> shuffleMaster;
+    private final JobMasterPartitionTracker partitionTracker;
+    private final ExecutionDeploymentTracker executionDeploymentTracker;
+    private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
+
+    private final CompletedCheckpointStore completedCheckpointStore;
+    private final CheckpointIDCounter checkpointIdCounter;
+    private final CheckpointsCleaner checkpointsCleaner;
+
+    private final CompletableFuture<Void> terminationFuture = new 
CompletableFuture<>();
+    private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;
+
+    private final ComponentMainThreadExecutor componentMainThreadExecutor;
+    private final FatalErrorHandler fatalErrorHandler;
+
+    private final JobStatusListener jobStatusListener;
+
+    private final SlotAllocator<?> slotAllocator;
+
+    private final ScaleUpController scaleUpController;
+
+    private State state = new Created(this, LOG);
+
+    public DeclarativeScheduler(
+            JobGraph jobGraph,
+            Configuration configuration,
+            DeclarativeSlotPool declarativeSlotPool,
+            ScheduledExecutorService futureExecutor,
+            Executor ioExecutor,
+            ClassLoader userCodeClassLoader,
+            CheckpointRecoveryFactory checkpointRecoveryFactory,
+            Time rpcTimeout,
+            BlobWriter blobWriter,
+            JobManagerJobMetricGroup jobManagerJobMetricGroup,
+            ShuffleMaster<?> shuffleMaster,
+            JobMasterPartitionTracker partitionTracker,
+            RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+            ExecutionDeploymentTracker executionDeploymentTracker,
+            long initializationTimestamp,
+            ComponentMainThreadExecutor mainThreadExecutor,
+            FatalErrorHandler fatalErrorHandler,
+            JobStatusListener jobStatusListener)
+            throws JobExecutionException {
+
+        ensureFullyPipelinedStreamingJob(jobGraph);
+
+        this.jobInformation = new JobGraphJobInformation(jobGraph);
+        this.declarativeSlotPool = declarativeSlotPool;
+        this.initializationTimestamp = initializationTimestamp;
+        this.configuration = configuration;
+        this.futureExecutor = futureExecutor;
+        this.ioExecutor = ioExecutor;
+        this.userCodeClassLoader = userCodeClassLoader;
+        this.rpcTimeout = rpcTimeout;
+        this.blobWriter = blobWriter;
+        this.shuffleMaster = shuffleMaster;
+        this.partitionTracker = partitionTracker;
+        this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+        this.executionDeploymentTracker = executionDeploymentTracker;
+        this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+        this.fatalErrorHandler = fatalErrorHandler;
+        this.completedCheckpointStore =
+                
SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(
+                        jobGraph,
+                        configuration,
+                        userCodeClassLoader,
+                        checkpointRecoveryFactory,
+                        LOG);
+        this.checkpointIdCounter =
+                
SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(
+                        jobGraph, checkpointRecoveryFactory);
+        this.checkpointsCleaner = new CheckpointsCleaner();
+
+        this.slotAllocator =
+                new SlotSharingSlotAllocator(
+                        declarativeSlotPool::reserveFreeSlot,
+                        declarativeSlotPool::freeReservedSlot);
+
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            if (vertex.getParallelism() == 
ExecutionConfig.PARALLELISM_DEFAULT) {
+                vertex.setParallelism(1);
+            }
+        }
+
+        
declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable);
+
+        this.componentMainThreadExecutor = mainThreadExecutor;
+        this.jobStatusListener = jobStatusListener;
+
+        this.scaleUpController = new ReactiveScaleUpController(configuration);
+    }
+
+    private static void ensureFullyPipelinedStreamingJob(JobGraph jobGraph)
+            throws RuntimeException {
+        Preconditions.checkState(
+                jobGraph.getJobType() == JobType.STREAMING,
+                "The declarative scheduler only supports streaming jobs.");
+        Preconditions.checkState(
+                jobGraph.getScheduleMode()
+                        != 
ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+                "The declarative schedules does not support batch slot 
requests.");
+
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            for (JobEdge jobEdge : vertex.getInputs()) {
+                Preconditions.checkState(
+                        jobEdge.getSource().getResultType().isPipelined(),
+                        "The declarative scheduler supports pipelined data 
exchanges (violated by %s -> %s).",
+                        jobEdge.getSource().getProducer(),
+                        jobEdge.getTarget().getID());
+            }
+        }
+    }
+
+    private void newResourcesAvailable(Collection<? extends PhysicalSlot> 
physicalSlots) {
+        state.tryRun(
+                ResourceConsumer.class,
+                ResourceConsumer::notifyNewResourcesAvailable,
+                "newResourcesAvailable");
+    }
+
+    @Override
+    public void startScheduling() {
+        state.as(Created.class)
+                .orElseThrow(
+                        () ->
+                                new IllegalStateException(
+                                        "Can only start scheduling when being 
in Created state."))
+                .startScheduling();
+    }
+
+    @Override
+    public void suspend(Throwable cause) {
+        state.suspend(cause);
+    }
+
+    @Override
+    public void cancel() {
+        state.cancel();
+    }
+
+    @Override
+    public CompletableFuture<Void> getTerminationFuture() {
+        return terminationFuture;
+    }
+
+    @Override
+    public void handleGlobalFailure(Throwable cause) {
+        state.handleGlobalFailure(cause);
+    }
+
+    @Override
+    public boolean updateTaskExecutionState(TaskExecutionStateTransition 
taskExecutionState) {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                
stateWithExecutionGraph.updateTaskExecutionState(
+                                        taskExecutionState),
+                        "updateTaskExecutionState")
+                .orElse(false);
+    }
+
+    @Override
+    public SerializedInputSplit requestNextInputSplit(
+            JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws 
IOException {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.requestNextInputSplit(
+                                        vertexID, executionAttempt),
+                        "requestNextInputSplit")
+                .orElseThrow(
+                        () -> new IOException("Scheduler is currently not 
executing the job."));
+    }
+
+    @Override
+    public ExecutionState requestPartitionState(
+            IntermediateDataSetID intermediateResultId, ResultPartitionID 
resultPartitionId)
+            throws PartitionProducerDisposedException {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.requestPartitionState(
+                                        intermediateResultId, 
resultPartitionId),
+                        "requestPartitionState")
+                .orElseThrow(() -> new 
PartitionProducerDisposedException(resultPartitionId));
+    }
+
+    @Override
+    public void notifyPartitionDataAvailable(ResultPartitionID partitionID) {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        
stateWithExecutionGraph.notifyPartitionDataAvailable(partitionID),
+                "notifyPartitionDataAvailable");
+    }
+
+    @Override
+    public ArchivedExecutionGraph requestJob() {
+        return state.getJob();
+    }
+
+    @Override
+    public JobStatus requestJobStatus() {
+        return state.getJobStatus();
+    }
+
+    @Override
+    public JobDetails requestJobDetails() {
+        return JobDetails.createDetailsForJob(state.getJob());
+    }
+
+    @Override
+    public KvStateLocation requestKvStateLocation(JobID jobId, String 
registrationName)
+            throws UnknownKvStateLocation, FlinkJobNotFoundException {
+        final Optional<StateWithExecutionGraph> asOptional =
+                state.as(StateWithExecutionGraph.class);
+
+        if (asOptional.isPresent()) {
+            return asOptional.get().requestKvStateLocation(jobId, 
registrationName);
+        } else {
+            throw new UnknownKvStateLocation(registrationName);
+        }
+    }
+
+    @Override
+    public void notifyKvStateRegistered(
+            JobID jobId,
+            JobVertexID jobVertexId,
+            KeyGroupRange keyGroupRange,
+            String registrationName,
+            KvStateID kvStateId,
+            InetSocketAddress kvStateServerAddress)
+            throws FlinkJobNotFoundException {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.notifyKvStateRegistered(
+                                jobId,
+                                jobVertexId,
+                                keyGroupRange,
+                                registrationName,
+                                kvStateId,
+                                kvStateServerAddress),
+                "notifyKvStateRegistered");
+    }
+
+    @Override
+    public void notifyKvStateUnregistered(
+            JobID jobId,
+            JobVertexID jobVertexId,
+            KeyGroupRange keyGroupRange,
+            String registrationName)
+            throws FlinkJobNotFoundException {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.notifyKvStateUnregistered(
+                                jobId, jobVertexId, keyGroupRange, 
registrationName),
+                "notifyKvStateUnregistered");
+    }
+
+    @Override
+    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        
stateWithExecutionGraph.updateAccumulators(accumulatorSnapshot),
+                "updateAccumulators");
+    }
+
+    @Override
+    public CompletableFuture<String> triggerSavepoint(
+            @Nullable String targetDirectory, boolean cancelJob) {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.triggerSavepoint(
+                                        targetDirectory, cancelJob),
+                        "triggerSavepoint")
+                .orElse(
+                        FutureUtils.completedExceptionally(
+                                new CheckpointException(
+                                        "The Flink job is currently not 
executing.",
+                                        
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE)));
+    }
+
+    @Override
+    public void acknowledgeCheckpoint(
+            JobID jobID,
+            ExecutionAttemptID executionAttemptID,
+            long checkpointId,
+            CheckpointMetrics checkpointMetrics,
+            TaskStateSnapshot checkpointState) {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.acknowledgeCheckpoint(
+                                jobID,
+                                executionAttemptID,
+                                checkpointId,
+                                checkpointMetrics,
+                                checkpointState),
+                "acknowledgeCheckpoint");
+    }
+
+    @Override
+    public void reportCheckpointMetrics(
+            JobID jobID,
+            ExecutionAttemptID executionAttemptID,
+            long checkpointId,
+            CheckpointMetrics checkpointMetrics) {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph ->
+                        stateWithExecutionGraph.reportCheckpointMetrics(
+                                executionAttemptID, checkpointId, 
checkpointMetrics),
+                "reportCheckpointMetrics");
+    }
+
+    @Override
+    public void declineCheckpoint(DeclineCheckpoint decline) {
+        state.tryRun(
+                StateWithExecutionGraph.class,
+                stateWithExecutionGraph -> 
stateWithExecutionGraph.declineCheckpoint(decline),
+                "declineCheckpoint");
+    }
+
+    @Override
+    public CompletableFuture<String> stopWithSavepoint(
+            String targetDirectory, boolean advanceToEndOfEventTime) {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.stopWithSavepoint(
+                                        targetDirectory, 
advanceToEndOfEventTime),
+                        "stopWithSavepoint")
+                .orElse(
+                        FutureUtils.completedExceptionally(
+                                new CheckpointException(
+                                        "The Flink job is currently not 
executing.",
+                                        
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE)));
+    }
+
+    @Override
+    public void deliverOperatorEventToCoordinator(
+            ExecutionAttemptID taskExecution, OperatorID operator, 
OperatorEvent evt)
+            throws FlinkException {
+        final StateWithExecutionGraph stateWithExecutionGraph =
+                state.as(StateWithExecutionGraph.class)
+                        .orElseThrow(
+                                () ->
+                                        new TaskNotRunningException(
+                                                "Task is not known or in state 
running on the JobManager."));
+
+        
stateWithExecutionGraph.deliverOperatorEventToCoordinator(taskExecution, 
operator, evt);
+    }
+
+    @Override
+    public CompletableFuture<CoordinationResponse> 
deliverCoordinationRequestToCoordinator(
+            OperatorID operator, CoordinationRequest request) throws 
FlinkException {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                
stateWithExecutionGraph.deliverCoordinationRequestToCoordinator(
+                                        operator, request),
+                        "deliverCoordinationRequestToCoordinator")
+                .orElseGet(
+                        () ->
+                                FutureUtils.completedExceptionally(
+                                        new FlinkException(
+                                                "Coordinator of operator "
+                                                        + operator
+                                                        + " does not exist")));
+    }
+
+    // ----------------------------------------------------------------
+
+    @Override
+    public boolean hasEnoughResources(ResourceCounter desiredResources) {
+        final Collection<? extends SlotInfo> allSlots =
+                declarativeSlotPool.getFreeSlotsInformation();
+        ResourceCounter outstandingResources = desiredResources;
+
+        final Iterator<? extends SlotInfo> slotIterator = allSlots.iterator();
+
+        while (!outstandingResources.isEmpty() && slotIterator.hasNext()) {
+            final SlotInfo slotInfo = slotIterator.next();
+            final ResourceProfile resourceProfile = 
slotInfo.getResourceProfile();
+
+            if (outstandingResources.containsResource(resourceProfile)) {
+                outstandingResources = 
outstandingResources.subtract(resourceProfile, 1);
+            } else {
+                outstandingResources = 
outstandingResources.subtract(ResourceProfile.UNKNOWN, 1);
+            }
+        }
+
+        return outstandingResources.isEmpty();
+    }
+
+    private <T extends VertexParallelism>
+            ParallelismAndResourceAssignments 
determineParallelismAndAssignResources(
+                    SlotAllocator<T> slotAllocator) throws 
JobExecutionException {
+
+        final T vertexParallelism =
+                slotAllocator
+                        .determineParallelism(
+                                jobInformation, 
declarativeSlotPool.getFreeSlotsInformation())
+                        .orElseThrow(
+                                () ->
+                                        new JobExecutionException(
+                                                jobInformation.getJobID(),
+                                                "Not enough resources 
available for scheduling."));
+
+        final Map<ExecutionVertexID, LogicalSlot> slotAssignments =
+                slotAllocator.reserveResources(vertexParallelism);
+
+        return new ParallelismAndResourceAssignments(
+                slotAssignments, 
vertexParallelism.getMaxParallelismForVertices());
+    }
+
+    @Override
+    public ExecutionGraph createExecutionGraphWithAvailableResources() throws 
Exception {
+        final ParallelismAndResourceAssignments 
parallelismAndResourceAssignments =
+                determineParallelismAndAssignResources(slotAllocator);
+
+        JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
+        for (JobVertex vertex : adjustedJobGraph.getVertices()) {
+            
vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID()));
+        }
+
+        final ExecutionGraph executionGraph = 
createExecutionGraphAndRestoreState(adjustedJobGraph);
+
+        executionGraph.start(componentMainThreadExecutor);
+        executionGraph.transitionToRunning();
+
+        executionGraph.setInternalTaskFailuresListener(
+                new UpdateSchedulerNgOnInternalFailuresListener(this, 
jobInformation.getJobID()));
+
+        for (ExecutionVertex executionVertex : 
executionGraph.getAllExecutionVertices()) {
+            final LogicalSlot assignedSlot =
+                    
parallelismAndResourceAssignments.getAssignedSlot(executionVertex.getID());
+            executionVertex
+                    .getCurrentExecutionAttempt()
+                    
.registerProducedPartitions(assignedSlot.getTaskManagerLocation(), false);
+            executionVertex.tryAssignResource(assignedSlot);
+        }
+        return executionGraph;
+    }
+
+    private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph 
adjustedJobGraph)
+            throws Exception {
+        ExecutionDeploymentListener executionDeploymentListener =
+                new 
ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
+        ExecutionStateUpdateListener executionStateUpdateListener =
+                (execution, newState) -> {
+                    if (newState.isTerminal()) {
+                        
executionDeploymentTracker.stopTrackingDeploymentOf(execution);
+                    }
+                };
+
+        final ExecutionGraph newExecutionGraph =
+                ExecutionGraphBuilder.buildGraph(
+                        adjustedJobGraph,
+                        configuration,
+                        futureExecutor,
+                        ioExecutor,
+                        userCodeClassLoader,
+                        completedCheckpointStore,
+                        checkpointsCleaner,
+                        checkpointIdCounter,
+                        rpcTimeout,
+                        jobManagerJobMetricGroup,
+                        blobWriter,
+                        LOG,
+                        shuffleMaster,
+                        partitionTracker,
+                        executionDeploymentListener,
+                        executionStateUpdateListener,
+                        initializationTimestamp);
+
+        final CheckpointCoordinator checkpointCoordinator =
+                newExecutionGraph.getCheckpointCoordinator();
+
+        if (checkpointCoordinator != null) {
+            // check whether we find a valid checkpoint
+            if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
+                    new 
HashSet<>(newExecutionGraph.getAllVertices().values()))) {
+
+                // check whether we can restore from a savepoint
+                tryRestoreExecutionGraphFromSavepoint(
+                        newExecutionGraph, 
adjustedJobGraph.getSavepointRestoreSettings());
+            }
+        }
+
+        return newExecutionGraph;
+    }
+
+    /**
+     * Tries to restore the given {@link ExecutionGraph} from the provided 
{@link
+     * SavepointRestoreSettings}.

Review comment:
       Maybe add that we only try to restore the EG if checkpointing has been 
enabled.
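
       For reference, one way the Javadoc could be phrased (just a sketch, the exact wording is up to the author; it mirrors the `checkpointCoordinator != null` guard in `createExecutionGraphAndRestoreState`):
       ```java
    /**
     * Tries to restore the given {@link ExecutionGraph} from the provided {@link
     * SavepointRestoreSettings}. A restore is only attempted if checkpointing is
     * enabled for the job, i.e. if the graph has a {@link CheckpointCoordinator}.
     */
       ```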

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
##########
@@ -34,7 +34,20 @@
 import java.util.List;
 import java.util.Map;
 
-/** Shared slot implementation for the declarative scheduler. */
+/**
+ * Shared slot implementation for the declarative scheduler.
+ *
+ * <p>The release process of a shared slot follows one of 2 code paths:
+ *
+ * <p>1)During normal execution all allocated logical slots will be returned, 
with the last return

Review comment:
       ```suggestion
    * <p>1) During normal execution all allocated logical slots will be 
returned, with the last return
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.offerSlots;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerTest extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+    private static final JobVertex JOB_VERTEX;
+
+    static {
+        JOB_VERTEX = new JobVertex("v1");
+        JOB_VERTEX.setParallelism(PARALLELISM);
+        JOB_VERTEX.setInvokableClass(AbstractInvokable.class);
+    }
+
+    private final ComponentMainThreadExecutor mainThreadExecutor =
+            new 
ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
+
+    @Test
+    public void testInitialState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        assertThat(scheduler.getState(), instanceOf(Created.class));
+    }
+
+    @Test
+    public void testIsState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        final State state = scheduler.getState();
+
+        assertThat(scheduler.isState(state), is(true));
+        assertThat(scheduler.isState(new DummyState()), is(false));
+    }
+
+    @Test
+    public void testRunIfState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(scheduler.getState(), () -> ran.set(true));
+        assertThat(ran.get(), is(true));
+    }
+
+    @Test
+    public void testRunIfStateWithStateMismatch() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(new DummyState(), () -> ran.set(true));
+        assertThat(ran.get(), is(false));
+    }
+
+    @Test
+    public void testHasEnoughResources() throws Exception {
+        final JobGraph jobGraph = getJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final ResourceCounter resourceRequirement =
+                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), 
is(false));
+
+        offerSlots(
+                declarativeSlotPool, 
createSlotOffersForResourceRequirements(resourceRequirement));
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), 
is(true));
+    }
+
+    @Test
+    public void testExecutionGraphGenerationWithAvailableResources() throws 
Exception {
+        final JobGraph jobGraph = getJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final int numAvailableSlots = 1;
+
+        offerSlots(
+                declarativeSlotPool,
+                createSlotOffersForResourceRequirements(
+                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
numAvailableSlots)));
+
+        final ExecutionGraph executionGraph =
+                scheduler.createExecutionGraphWithAvailableResources();
+
+        assertThat(
+                
executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism(),
+                is(numAvailableSlots));
+    }

Review comment:
       Maybe also test that we try to restore from a given savepoint.
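
       For reference, the setup side of such a test would roughly look like the sketch below. Producing an actual savepoint and asserting on the restored state are elided; `savepointPath` and `checkpointingSettings` are assumed to be provided by the test.
       ```java
final JobGraph jobGraph = getJobGraph();
// checkpointing must be enabled, otherwise the scheduler never looks at the
// savepoint settings when building the ExecutionGraph
jobGraph.setSnapshotSettings(checkpointingSettings);
jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));

final DeclarativeScheduler scheduler =
        new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor).build();
// ...offer slots, call createExecutionGraphWithAvailableResources() and assert
// that the savepoint was picked up, e.g. via the CompletedCheckpointStore
       ```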

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
##########
@@ -34,7 +34,20 @@
 import java.util.List;
 import java.util.Map;
 
-/** Shared slot implementation for the declarative scheduler. */
+/**
+ * Shared slot implementation for the declarative scheduler.
+ *
+ * <p>The release process of a shared slot follows one of 2 code paths:
+ *
+ * <p>1)During normal execution all allocated logical slots will be returned, 
with the last return
+ * triggering the {@code externalReleaseCallback} which must eventually result 
in a {@link
+ * #release(Throwable)} call. 2)

Review comment:
       ```suggestion
    * #release(Throwable)} call.
   ```
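
       For context, the two release paths described here roughly play out as in this sketch (based only on the `SharedSlot` methods visible in this PR; `physicalSlot` as in the tests):
       ```java
// Path 1: normal execution – every allocated logical slot is returned; returning the
// last one fires the externalReleaseCallback, whose owner must eventually call release().
final AtomicBoolean callbackFired = new AtomicBoolean(false);
final SharedSlot sharedSlot =
        new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> callbackFired.set(true));

final LogicalSlot logicalSlot = sharedSlot.allocateLogicalSlot();
logicalSlot.releaseSlot(null);            // last logical slot returned
// callbackFired is now true; the owner reacts by calling sharedSlot.release(...)

// Path 2: abnormal termination – release(...) is called directly and in turn releases
// all logical slots that are still alive.
sharedSlot.release(new Exception("TaskManager lost"));
       ```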

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
##########
@@ -166,6 +168,48 @@ public void 
testReleaseForbidsSubsequentLogicalSlotAllocations() {
         sharedSlot.allocateLogicalSlot();
     }
 
+    @Test
+    public void testCanReturnLogicalSlotDuringRelease() {
+        final TestingPhysicalSlot physicalSlot = 
TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> 
{});
+        final LogicalSlot logicalSlot1 = sharedSlot.allocateLogicalSlot();
+        final LogicalSlot logicalSlot2 = sharedSlot.allocateLogicalSlot();
+
+        // both slots try to release the other one, simulating that the 
failure of one execution due
+        // to the release also fails others
+        logicalSlot1.tryAssignPayload(
+                new TestLogicalSlotPayload(
+                        cause -> {
+                            if (logicalSlot2.isAlive()) {
+                                logicalSlot2.releaseSlot(cause);
+                            }
+                        }));
+        logicalSlot2.tryAssignPayload(
+                new TestLogicalSlotPayload(
+                        cause -> {
+                            if (logicalSlot1.isAlive()) {
+                                logicalSlot1.releaseSlot(cause);
+                            }
+                        }));
+
+        sharedSlot.release(new Exception("test"));

Review comment:
       What are we asserting here?
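
       A couple of assertions would make the intent explicit, e.g. (sketch, exact expectations up to the author):
       ```java
// after release(...) both logical slots should have been released, even though
// each payload tried to release the other one while the shared slot was shutting down
assertThat(logicalSlot1.isAlive(), is(false));
assertThat(logicalSlot2.isAlive(), is(false));
       ```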

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java
##########
@@ -106,14 +107,17 @@
                                 context.getMainThreadExecutor()));
     }
 
+    @VisibleForTesting

Review comment:
       I guess it should be `protected` and `@VisibleForTesting`.
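
       i.e., something along these lines (sketch; assuming the annotated member is the ExecutionGraph accessor):
       ```java
    @VisibleForTesting
    protected ExecutionGraph getExecutionGraph() {
        return executionGraph;
    }
       ```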

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.offerSlots;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerTest extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+    private static final JobVertex JOB_VERTEX;
+
+    static {
+        JOB_VERTEX = new JobVertex("v1");
+        JOB_VERTEX.setParallelism(PARALLELISM);
+        JOB_VERTEX.setInvokableClass(AbstractInvokable.class);
+    }
+
+    private final ComponentMainThreadExecutor mainThreadExecutor =
+            new 
ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
+
+    @Test
+    public void testInitialState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        assertThat(scheduler.getState(), instanceOf(Created.class));
+    }
+
+    @Test
+    public void testIsState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        final State state = scheduler.getState();
+
+        assertThat(scheduler.isState(state), is(true));
+        assertThat(scheduler.isState(new DummyState()), is(false));
+    }
+
+    @Test
+    public void testRunIfState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(scheduler.getState(), () -> ran.set(true));
+        assertThat(ran.get(), is(true));
+    }
+
+    @Test
+    public void testRunIfStateWithStateMismatch() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(new DummyState(), () -> ran.set(true));
+        assertThat(ran.get(), is(false));
+    }
+
+    @Test
+    public void testHasEnoughResources() throws Exception {
+        final JobGraph jobGraph = getJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final ResourceCounter resourceRequirement =
+                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), 
is(false));
+
+        offerSlots(
+                declarativeSlotPool, 
createSlotOffersForResourceRequirements(resourceRequirement));
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), 
is(true));
+    }
+
+    @Test
+    public void testExecutionGraphGenerationWithAvailableResources() throws 
Exception {
+        final JobGraph jobGraph = getJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final int numAvailableSlots = 1;
+
+        offerSlots(
+                declarativeSlotPool,
+                createSlotOffersForResourceRequirements(
+                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
numAvailableSlots)));
+
+        final ExecutionGraph executionGraph =
+                scheduler.createExecutionGraphWithAvailableResources();
+
+        assertThat(
+                
executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism(),
+                is(numAvailableSlots));
+    }
+
+    @Test
+    public void testStartScheduling() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        scheduler.startScheduling();
+
+        assertThat(scheduler.getState(), 
instanceOf(WaitingForResources.class));
+    }
+
+    @Test
+    public void testStartSchedulingSetsResourceRequirements() throws Exception 
{
+        final JobGraph jobGraph = getJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        assertThat(
+                declarativeSlotPool.getResourceRequirements(),
+                contains(ResourceRequirement.create(ResourceProfile.UNKNOWN, 
PARALLELISM)));
+    }
+
+    /** Tests that the listener for new slots is properly set up. */
+    @Test
+    public void testResourceAcquisitionTriggersExecution() throws Exception {

Review comment:
       ```suggestion
       public void testResourceAcquisitionTriggersJobExecution() throws 
Exception {
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.offerSlots;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerTest extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+    private static final JobVertex JOB_VERTEX;
+
+    static {
+        JOB_VERTEX = new JobVertex("v1");
+        JOB_VERTEX.setParallelism(PARALLELISM);
+        JOB_VERTEX.setInvokableClass(AbstractInvokable.class);
+    }
+
+    private final ComponentMainThreadExecutor mainThreadExecutor =
+            new 
ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
+
+    @Test
+    public void testInitialState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        assertThat(scheduler.getState(), instanceOf(Created.class));
+    }
+
+    @Test
+    public void testIsState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        final State state = scheduler.getState();
+
+        assertThat(scheduler.isState(state), is(true));
+        assertThat(scheduler.isState(new DummyState()), is(false));
+    }
+
+    @Test
+    public void testRunIfState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(scheduler.getState(), () -> ran.set(true));
+        assertThat(ran.get(), is(true));
+    }
+
+    @Test
+    public void testRunIfStateWithStateMismatch() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(new DummyState(), () -> ran.set(true));
+        assertThat(ran.get(), is(false));
+    }
+
+    @Test
+    public void testHasEnoughResources() throws Exception {
+        final JobGraph jobGraph = getJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final ResourceCounter resourceRequirement =
+                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), 
is(false));
+
+        offerSlots(
+                declarativeSlotPool, 
createSlotOffersForResourceRequirements(resourceRequirement));
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), 
is(true));
+    }
+
+    @Test
+    public void testExecutionGraphGenerationWithAvailableResources() throws 
Exception {
+        final JobGraph jobGraph = getJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final int numAvailableSlots = 1;
+
+        offerSlots(
+                declarativeSlotPool,
+                createSlotOffersForResourceRequirements(
+                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, 
numAvailableSlots)));
+
+        final ExecutionGraph executionGraph =
+                scheduler.createExecutionGraphWithAvailableResources();
+
+        assertThat(
+                
executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism(),
+                is(numAvailableSlots));
+    }
+
+    @Test
+    public void testStartScheduling() throws Exception {

Review comment:
       ```suggestion
       public void testStartSchedulingTransitionsToWaitingForResources() throws 
Exception {
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import 
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static 
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.offerSlots;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerTest extends TestLogger {
+
+    private static final int PARALLELISM = 4;
+    private static final JobVertex JOB_VERTEX;
+
+    static {
+        JOB_VERTEX = new JobVertex("v1");
+        JOB_VERTEX.setParallelism(PARALLELISM);
+        JOB_VERTEX.setInvokableClass(AbstractInvokable.class);
+    }
+
+    private final ComponentMainThreadExecutor mainThreadExecutor =
+            new 
ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
+
+    @Test
+    public void testInitialState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        assertThat(scheduler.getState(), instanceOf(Created.class));
+    }
+
+    @Test
+    public void testIsState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        final State state = scheduler.getState();
+
+        assertThat(scheduler.isState(state), is(true));
+        assertThat(scheduler.isState(new DummyState()), is(false));
+    }
+
+    @Test
+    public void testRunIfState() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(scheduler.getState(), () -> ran.set(true));
+        assertThat(ran.get(), is(true));
+    }
+
+    @Test
+    public void testRunIfStateWithStateMismatch() throws Exception {
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), 
mainThreadExecutor).build();
+
+        AtomicBoolean ran = new AtomicBoolean(false);
+        scheduler.runIfState(new DummyState(), () -> ran.set(true));
+        assertThat(ran.get(), is(false));
+    }
+
+    @Test
+    public void testHasEnoughResources() throws Exception {
+        final JobGraph jobGraph = getJobGraph();
+
+        final DefaultDeclarativeSlotPool declarativeSlotPool =
+                new DefaultDeclarativeSlotPool(
+                        jobGraph.getJobID(),
+                        new DefaultAllocatedSlotPool(),
+                        ignored -> {},
+                        Time.minutes(10),
+                        Time.minutes(10));
+
+        final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .build();
+
+        scheduler.startScheduling();
+
+        final ResourceCounter resourceRequirement =
+                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), 
is(false));
+
+        offerSlots(
+                declarativeSlotPool, 
createSlotOffersForResourceRequirements(resourceRequirement));
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), 
is(true));

Review comment:
       I think we should also test that non-matching resources decrease the `UNKNOWN` resources.
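
       A sketch of what such a test could look like, reusing the setup from `testHasEnoughResources`; the only API used beyond this hunk is `ResourceProfile.fromResources` to build a concrete, non-UNKNOWN profile (assumed to be available to the test):
       ```java
@Test
public void testUnmatchedResourcesAreCountedAgainstUnknownRequirements() throws Exception {
    final JobGraph jobGraph = getJobGraph();

    final DefaultDeclarativeSlotPool declarativeSlotPool =
            new DefaultDeclarativeSlotPool(
                    jobGraph.getJobID(),
                    new DefaultAllocatedSlotPool(),
                    ignored -> {},
                    Time.minutes(10),
                    Time.minutes(10));

    final DeclarativeScheduler scheduler =
            new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
                    .setDeclarativeSlotPool(declarativeSlotPool)
                    .build();

    scheduler.startScheduling();

    // the requirement is expressed as UNKNOWN ...
    final ResourceCounter resourceRequirement =
            ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);

    // ... but the offered slot carries a concrete profile that does not match it
    final ResourceProfile concreteProfile = ResourceProfile.fromResources(1.0, 100);
    offerSlots(
            declarativeSlotPool,
            createSlotOffersForResourceRequirements(
                    ResourceCounter.withResource(concreteProfile, 1)));

    // the non-matching slot should still be subtracted from the UNKNOWN requirements
    assertThat(scheduler.hasEnoughResources(resourceRequirement), is(true));
}
       ```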

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
##########
@@ -54,16 +57,29 @@ public void createSchedulerNGFactoryIfConfigured() {
     @Test
     public void throwsExceptionIfSchedulerNameIsInvalid() {
         final Configuration configuration = new Configuration();
-        configuration.setString(JobManagerOptions.SCHEDULER, 
"invalid-scheduler-name");
+        configuration.setString(JobManagerOptions.SCHEDULER.key(), 
"invalid-scheduler-name");
 
         try {
             createSchedulerNGFactory(configuration);
         } catch (IllegalArgumentException e) {
-            assertThat(e.getMessage(), containsString("Illegal value 
[invalid-scheduler-name]"));
+            assertThat(
+                    e.getMessage(),
+                    containsString("Could not parse value 
'invalid-scheduler-name'"));
         }
     }
 
+    @Test
+    public void fallBackIfBatchAndDeclarative() {

Review comment:
       Fall back to what? It is a good idea to state the intended behaviour in the test name.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

