tillrohrmann commented on a change in pull request #14921:
URL: https://github.com/apache/flink/pull/14921#discussion_r576321108
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -132,8 +132,28 @@
                            "Defines whether the cluster uses fine-grained resource management.");

    public static boolean isDeclarativeResourceManagementEnabled(Configuration configuration) {
-        return configuration.get(ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT)
-                && !System.getProperties().containsKey("flink.tests.disable-declarative");
+        if (configuration.contains(ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT)) {
+            return configuration.get(ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT);
+        } else {
+            return !System.getProperties().containsKey("flink.tests.disable-declarative");
+        }
+    }
+
+    public static JobManagerOptions.SchedulerType getSchedulerType(Configuration configuration) {
+        if (isDeclarativeSchedulerEnabled(configuration)) {
+            return JobManagerOptions.SchedulerType.Declarative;
+        } else {
+            return configuration.get(JobManagerOptions.SCHEDULER);
+        }
+    }
+
+    public static boolean isDeclarativeSchedulerEnabled(Configuration configuration) {
+        if (configuration.contains(JobManagerOptions.SCHEDULER)) {
+            return configuration.get(JobManagerOptions.SCHEDULER)
+                    == JobManagerOptions.SchedulerType.Declarative;
+        } else {
+            return System.getProperties().containsKey("flink.tests.enable-declarative-scheduler");
+        }
Review comment:
Let's try to quickly get rid of this logic. I find this a bit confusing.
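For illustration, a minimal sketch of what the lookup could collapse to once the test-only system properties are removed (a hypothetical simplification, not code from this PR):

```java
// Hypothetical simplification: the scheduler choice is driven purely by
// JobManagerOptions.SCHEDULER, without any system-property escape hatches.
public static JobManagerOptions.SchedulerType getSchedulerType(Configuration configuration) {
    return configuration.get(JobManagerOptions.SCHEDULER);
}

public static boolean isDeclarativeSchedulerEnabled(Configuration configuration) {
    return getSchedulerType(configuration) == JobManagerOptions.SchedulerType.Declarative;
}
```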
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.java
##########
@@ -197,20 +196,21 @@ private void signalPayloadRelease(Throwable cause) {
}
private void returnSlotToOwner(CompletableFuture<?> terminalStateFuture) {
- terminalStateFuture.whenComplete(
- (Object ignored, Throwable throwable) -> {
- if (state == State.RELEASING) {
- slotOwner.returnLogicalSlot(this);
- }
-
- markReleased();
-
- if (throwable != null) {
- releaseFuture.completeExceptionally(throwable);
- } else {
- releaseFuture.complete(null);
- }
- });
+ FutureUtils.assertNoException(
+ terminalStateFuture.whenComplete(
Review comment:
Can `terminalStateFuture` be completed exceptionally? If yes, then we
need a `handle` call to consume the valid exception so that
`FutureUtils.assertNoException` does not fail for valid exceptions.
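Roughly, a sketch of the suggested `handle` variant, reusing the fields from the hunk above (`state`, `slotOwner`, `releaseFuture`); whether the exception really is valid here is exactly the open question:

```java
FutureUtils.assertNoException(
        terminalStateFuture.handle(
                (ignored, throwable) -> {
                    if (state == State.RELEASING) {
                        slotOwner.returnLogicalSlot(this);
                    }

                    markReleased();

                    if (throwable != null) {
                        // the exception is considered valid; forward it to the
                        // release future instead of letting it propagate
                        releaseFuture.completeExceptionally(throwable);
                    } else {
                        releaseFuture.complete(null);
                    }
                    // returning normally means assertNoException only reacts to
                    // failures thrown inside this callback itself
                    return null;
                }));
```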
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerBuilder.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import
org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.FatalExitExceptionHandler;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+/** Builder for {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerBuilder {
+ private static final Time DEFAULT_TIMEOUT = Time.seconds(300);
+
+ private final JobGraph jobGraph;
+
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+
+ private Executor ioExecutor = TestingUtils.defaultExecutor();
+ private Configuration jobMasterConfiguration = new Configuration();
+ private ScheduledExecutorService futureExecutor =
TestingUtils.defaultExecutor();
+ private ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader();
+ private CheckpointRecoveryFactory checkpointRecoveryFactory =
+ new StandaloneCheckpointRecoveryFactory();
+ private DeclarativeSlotPool declarativeSlotPool;
+ private Time rpcTimeout = DEFAULT_TIMEOUT;
+ private BlobWriter blobWriter = VoidBlobWriter.getInstance();
+ private JobManagerJobMetricGroup jobManagerJobMetricGroup =
+
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
+ private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
+ private JobMasterPartitionTracker partitionTracker =
NoOpJobMasterPartitionTracker.INSTANCE;
+ private RestartBackoffTimeStrategy restartBackoffTimeStrategy =
+ NoRestartBackoffTimeStrategy.INSTANCE;
+ private JobStatusListener jobStatusListener = (ignoredA, ignoredB,
ignoredC, ignoredD) -> {};
+
+ public DeclarativeSchedulerBuilder(
+ final JobGraph jobGraph, ComponentMainThreadExecutor
mainThreadExecutor) {
+ this.jobGraph = jobGraph;
+ this.mainThreadExecutor = mainThreadExecutor;
+
+ this.declarativeSlotPool =
+ new DefaultDeclarativeSlotPool(
+ jobGraph.getJobID(),
+ new DefaultAllocatedSlotPool(),
+ ignored -> {},
+ DEFAULT_TIMEOUT,
+ rpcTimeout);
+ }
+
+ public DeclarativeSchedulerBuilder setIoExecutor(final Executor
ioExecutor) {
+ this.ioExecutor = ioExecutor;
+ return this;
+ }
+
+ public DeclarativeSchedulerBuilder setJobMasterConfiguration(
+ final Configuration jobMasterConfiguration) {
+ this.jobMasterConfiguration = jobMasterConfiguration;
+ return this;
+ }
+
+ public DeclarativeSchedulerBuilder setFutureExecutor(
+ final ScheduledExecutorService futureExecutor) {
+ this.futureExecutor = futureExecutor;
+ return this;
+ }
+
+ public DeclarativeSchedulerBuilder setUserCodeLoader(final ClassLoader
userCodeLoader) {
+ this.userCodeLoader = userCodeLoader;
+ return this;
+ }
+
+ public DeclarativeSchedulerBuilder setCheckpointRecoveryFactory(
+ final CheckpointRecoveryFactory checkpointRecoveryFactory) {
+ this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+ return this;
+ }
+
+ public DeclarativeSchedulerBuilder setRpcTimeout(final Time rpcTimeout) {
+ this.rpcTimeout = rpcTimeout;
+ return this;
+ }
+
+ public DeclarativeSchedulerBuilder setBlobWriter(final BlobWriter
blobWriter) {
+ this.blobWriter = blobWriter;
+ return this;
+ }
+
+ public DeclarativeSchedulerBuilder setJobManagerJobMetricGroup(
+ final JobManagerJobMetricGroup jobManagerJobMetricGroup) {
+ this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+ return this;
+ }
+
+ public DeclarativeSchedulerBuilder setShuffleMaster(final ShuffleMaster<?>
shuffleMaster) {
+ this.shuffleMaster = shuffleMaster;
+ return this;
+ }
+
+ public DeclarativeSchedulerBuilder setPartitionTracker(
+ final JobMasterPartitionTracker partitionTracker) {
+ this.partitionTracker = partitionTracker;
+ return this;
+ }
+
+ public DeclarativeSchedulerBuilder setDeclarativeSlotPool(
+ DeclarativeSlotPool declarativeSlotPool) {
+ this.declarativeSlotPool = declarativeSlotPool;
+ return this;
+ }
+
+ public DeclarativeSchedulerBuilder setRestartBackoffTimeStrategy(
+ final RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
+ this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+ return this;
+ }
+
+ public DeclarativeSchedulerBuilder setJobStatusListener(JobStatusListener
jobStatusListener) {
+ this.jobStatusListener = jobStatusListener;
+ return this;
+ }
+
+ public DeclarativeScheduler build() throws Exception {
+ return new DeclarativeScheduler(
+ jobGraph,
+ jobMasterConfiguration,
+ declarativeSlotPool,
+ futureExecutor,
+ ioExecutor,
+ userCodeLoader,
+ checkpointRecoveryFactory,
+ rpcTimeout,
+ blobWriter,
+ jobManagerJobMetricGroup,
+ shuffleMaster,
+ partitionTracker,
+ restartBackoffTimeStrategy,
+ new DefaultExecutionDeploymentTracker(),
+ System.currentTimeMillis(),
+ mainThreadExecutor,
+ error ->
+ FatalExitExceptionHandler.INSTANCE.uncaughtException(
+ Thread.currentThread(), error),
Review comment:
I guess we would also want to make the fatal error handler configurable
so that we can test the `runIfState` method properly.
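For example, a sketch of what that could look like in `DeclarativeSchedulerBuilder` (the setter name and default are illustrative, not part of the PR as quoted):

```java
// Hypothetical addition to DeclarativeSchedulerBuilder; requires importing
// org.apache.flink.runtime.rpc.FatalErrorHandler.
private FatalErrorHandler fatalErrorHandler =
        error ->
                FatalExitExceptionHandler.INSTANCE.uncaughtException(
                        Thread.currentThread(), error);

public DeclarativeSchedulerBuilder setFatalErrorHandler(
        final FatalErrorHandler fatalErrorHandler) {
    this.fatalErrorHandler = fatalErrorHandler;
    return this;
}
```

`build()` would then pass `fatalErrorHandler` instead of the hard-coded lambda.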
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import
org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.offerSlots;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerTest extends TestLogger {
+
+ private static final int PARALLELISM = 4;
+ private static final JobVertex JOB_VERTEX;
+
+ static {
+ JOB_VERTEX = new JobVertex("v1");
+ JOB_VERTEX.setParallelism(PARALLELISM);
+ JOB_VERTEX.setInvokableClass(AbstractInvokable.class);
+ }
+
+ private final ComponentMainThreadExecutor mainThreadExecutor =
+ new
ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
+
+ @Test
+ public void testInitialState() throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(getJobGraph(),
mainThreadExecutor).build();
+
+ assertThat(scheduler.getState(), instanceOf(Created.class));
+ }
+
+ @Test
+ public void testIsState() throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(getJobGraph(),
mainThreadExecutor).build();
+
+ final State state = scheduler.getState();
+
+ assertThat(scheduler.isState(state), is(true));
+ assertThat(scheduler.isState(new DummyState()), is(false));
+ }
+
+ @Test
+ public void testRunIfState() throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(getJobGraph(),
mainThreadExecutor).build();
+
+ AtomicBoolean ran = new AtomicBoolean(false);
+ scheduler.runIfState(scheduler.getState(), () -> ran.set(true));
+ assertThat(ran.get(), is(true));
+ }
+
+ @Test
+ public void testRunIfStateWithStateMismatch() throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(getJobGraph(),
mainThreadExecutor).build();
+
+ AtomicBoolean ran = new AtomicBoolean(false);
+ scheduler.runIfState(new DummyState(), () -> ran.set(true));
+ assertThat(ran.get(), is(false));
+ }
+
+ @Test
+ public void testHasEnoughResources() throws Exception {
+ final JobGraph jobGraph = getJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ new DefaultDeclarativeSlotPool(
+ jobGraph.getJobID(),
+ new DefaultAllocatedSlotPool(),
+ ignored -> {},
+ Time.minutes(10),
+ Time.minutes(10));
+
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ scheduler.startScheduling();
+
+ final ResourceCounter resourceRequirement =
+ ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
+
+ assertThat(scheduler.hasEnoughResources(resourceRequirement),
is(false));
+
+ offerSlots(
+ declarativeSlotPool,
createSlotOffersForResourceRequirements(resourceRequirement));
+
+ assertThat(scheduler.hasEnoughResources(resourceRequirement),
is(true));
Review comment:
I would split this into two tests.
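As a sketch, the split could look roughly like this; the test names and the `createDeclarativeSlotPool` helper are hypothetical, everything else reuses the names from the test above:

```java
private static DefaultDeclarativeSlotPool createDeclarativeSlotPool(JobGraph jobGraph) {
    return new DefaultDeclarativeSlotPool(
            jobGraph.getJobID(),
            new DefaultAllocatedSlotPool(),
            ignored -> {},
            Time.minutes(10),
            Time.minutes(10));
}

@Test
public void testHasEnoughResourcesReturnsFalseIfUnsatisfied() throws Exception {
    final JobGraph jobGraph = getJobGraph();
    final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph);
    final DeclarativeScheduler scheduler =
            new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
                    .setDeclarativeSlotPool(declarativeSlotPool)
                    .build();

    scheduler.startScheduling();

    assertThat(
            scheduler.hasEnoughResources(
                    ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1)),
            is(false));
}

@Test
public void testHasEnoughResourcesReturnsTrueIfSatisfied() throws Exception {
    final JobGraph jobGraph = getJobGraph();
    final DefaultDeclarativeSlotPool declarativeSlotPool = createDeclarativeSlotPool(jobGraph);
    final DeclarativeScheduler scheduler =
            new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
                    .setDeclarativeSlotPool(declarativeSlotPool)
                    .build();

    scheduler.startScheduling();

    final ResourceCounter resourceRequirement =
            ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);

    offerSlots(
            declarativeSlotPool, createSlotOffersForResourceRequirements(resourceRequirement));

    assertThat(scheduler.hasEnoughResources(resourceRequirement), is(true));
}
```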
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
##########
@@ -34,14 +35,19 @@
    @Nonnull
    SlotPoolService createSlotPoolService(@Nonnull JobID jobId);
-    static SlotPoolServiceFactory fromConfiguration(Configuration configuration) {
+    static SlotPoolServiceFactory fromConfiguration(Configuration configuration, JobType jobType) {
        final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
        final Time slotIdleTimeout =
                Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
        final Time batchSlotTimeout =
                Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
        if (ClusterOptions.isDeclarativeResourceManagementEnabled(configuration)) {
+            if (ClusterOptions.isDeclarativeSchedulerEnabled(configuration)
+                    && jobType == JobType.STREAMING) {
+                return new DeclarativeSlotPoolServiceFactory(
+                        SystemClock.getInstance(), slotIdleTimeout, rpcTimeout);
+            }
            return new DeclarativeSlotPoolBridgeServiceFactory(
                    SystemClock.getInstance(), rpcTimeout, slotIdleTimeout, batchSlotTimeout);
Review comment:
Maybe we should create a joint Scheduler-SlotPoolService factory. Right now,
setting up the correct combination of scheduler and slot pool service is spread
over multiple classes, which makes it hard to reason about.
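As a sketch of the idea (all names here are illustrative, not from this PR), the pairing decision could live behind a single interface:

```java
/**
 * Hypothetical combined factory: one place decides which scheduler goes with
 * which slot pool service.
 */
public interface SlotPoolServiceSchedulerFactory {

    SlotPoolService createSlotPoolService(JobID jobId);

    SchedulerNGFactory getSchedulerNGFactory();
}
```

A static `fromConfiguration(Configuration, JobType)` on such an interface could then return the declarative-scheduler/`DeclarativeSlotPoolService` pairing for streaming jobs and the default pairing otherwise.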
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import
org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static
org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.offerSlots;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerTest extends TestLogger {
+
+ private static final int PARALLELISM = 4;
+ private static final JobVertex JOB_VERTEX;
+
+ static {
+ JOB_VERTEX = new JobVertex("v1");
+ JOB_VERTEX.setParallelism(PARALLELISM);
+ JOB_VERTEX.setInvokableClass(AbstractInvokable.class);
+ }
+
+ private final ComponentMainThreadExecutor mainThreadExecutor =
+ new
ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
+
+ @Test
+ public void testInitialState() throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(getJobGraph(),
mainThreadExecutor).build();
+
+ assertThat(scheduler.getState(), instanceOf(Created.class));
+ }
+
+ @Test
+ public void testIsState() throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(getJobGraph(),
mainThreadExecutor).build();
+
+ final State state = scheduler.getState();
+
+ assertThat(scheduler.isState(state), is(true));
+ assertThat(scheduler.isState(new DummyState()), is(false));
+ }
+
+ @Test
+ public void testRunIfState() throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(getJobGraph(),
mainThreadExecutor).build();
+
+ AtomicBoolean ran = new AtomicBoolean(false);
+ scheduler.runIfState(scheduler.getState(), () -> ran.set(true));
+ assertThat(ran.get(), is(true));
+ }
+
+ @Test
+ public void testRunIfStateWithStateMismatch() throws Exception {
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(getJobGraph(),
mainThreadExecutor).build();
+
+ AtomicBoolean ran = new AtomicBoolean(false);
+ scheduler.runIfState(new DummyState(), () -> ran.set(true));
+ assertThat(ran.get(), is(false));
+ }
Review comment:
I think we are missing a case where we test that failures are forwarded
to the fatal error handler.
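A sketch of such a test, assuming the builder is extended with a `setFatalErrorHandler` method as suggested further above (the test name and the setter are hypothetical):

```java
@Test
public void testRunIfStateFailureIsForwardedToFatalErrorHandler() throws Exception {
    final AtomicReference<Throwable> caughtError = new AtomicReference<>();

    final DeclarativeScheduler scheduler =
            new DeclarativeSchedulerBuilder(getJobGraph(), mainThreadExecutor)
                    // hypothetical setter, see the builder comment above
                    .setFatalErrorHandler(caughtError::set)
                    .build();

    final RuntimeException expectedException = new RuntimeException("test");
    scheduler.runIfState(
            scheduler.getState(),
            () -> {
                throw expectedException;
            });

    assertThat(caughtError.get(), is(expectedException));
}
```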
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/JobGraphJobInformation.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.declarative.allocator.JobInformation;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/** {@link JobInformation} created from a {@link JobGraph}. */
+public class JobGraphJobInformation implements JobInformation {
+
+ private final JobGraph jobGraph;
+ private final JobID jobID;
+ private final String name;
+
+ public JobGraphJobInformation(JobGraph jobGraph) {
+ this.jobGraph = jobGraph;
+ this.jobID = jobGraph.getJobID();
+ this.name = jobGraph.getName();
+ }
+
+ @Override
+ public Collection<SlotSharingGroup> getSlotSharingGroups() {
+ return jobGraph.getSlotSharingGroups();
+ }
+
+ @Override
+ public JobInformation.VertexInformation getVertexInformation(JobVertexID
jobVertexId) {
+ return new JobVertexInformation(jobGraph.findVertexByID(jobVertexId));
+ }
+
+ public JobID getJobID() {
+ return jobID;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ // note: the SlotSharingGroup returned is mutable.
Review comment:
which `SlotSharingGroup`?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
##########
@@ -0,0 +1,909 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import
org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
+import org.apache.flink.runtime.scheduler.declarative.allocator.SlotAllocator;
+import
org.apache.flink.runtime.scheduler.declarative.allocator.SlotSharingSlotAllocator;
+import
org.apache.flink.runtime.scheduler.declarative.allocator.VertexParallelism;
+import
org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ReactiveScaleUpController;
+import
org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ScaleUpController;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/** Declarative scheduler. */
+public class DeclarativeScheduler
+ implements SchedulerNG,
+ Created.Context,
+ WaitingForResources.Context,
+ Executing.Context,
+ Restarting.Context,
+ Failing.Context,
+ Finished.Context {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DeclarativeScheduler.class);
+
+ private final JobGraphJobInformation jobInformation;
+
+ private final DeclarativeSlotPool declarativeSlotPool;
+
+ private final long initializationTimestamp;
+
+ private final Configuration configuration;
+ private final ScheduledExecutorService futureExecutor;
+ private final Executor ioExecutor;
+ private final ClassLoader userCodeClassLoader;
+ private final Time rpcTimeout;
+ private final BlobWriter blobWriter;
+ private final ShuffleMaster<?> shuffleMaster;
+ private final JobMasterPartitionTracker partitionTracker;
+ private final ExecutionDeploymentTracker executionDeploymentTracker;
+ private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
+
+ private final CompletedCheckpointStore completedCheckpointStore;
+ private final CheckpointIDCounter checkpointIdCounter;
+ private final CheckpointsCleaner checkpointsCleaner;
+
+ private final CompletableFuture<Void> terminationFuture = new
CompletableFuture<>();
+ private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;
Review comment:
nit: Why are these two fields grouped together? Looks a bit odd.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/JobGraphJobInformation.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.declarative.allocator.JobInformation;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/** {@link JobInformation} created from a {@link JobGraph}. */
+public class JobGraphJobInformation implements JobInformation {
+
+ private final JobGraph jobGraph;
+ private final JobID jobID;
+ private final String name;
+
+ public JobGraphJobInformation(JobGraph jobGraph) {
+ this.jobGraph = jobGraph;
+ this.jobID = jobGraph.getJobID();
+ this.name = jobGraph.getName();
+ }
+
+ @Override
+ public Collection<SlotSharingGroup> getSlotSharingGroups() {
+ return jobGraph.getSlotSharingGroups();
+ }
+
+ @Override
+ public JobInformation.VertexInformation getVertexInformation(JobVertexID
jobVertexId) {
+ return new JobVertexInformation(jobGraph.findVertexByID(jobVertexId));
+ }
+
+ public JobID getJobID() {
+ return jobID;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ // note: the SlotSharingGroup returned is mutable.
+ public Iterable<JobInformation.VertexInformation> getVertices() {
+ return jobGraphVerticesToVertexInformation(jobGraph.getVertices());
+ }
+
+ public static Iterable<JobInformation.VertexInformation>
jobGraphVerticesToVertexInformation(
+ Iterable<JobVertex> verticesIterable) {
+ return () -> {
+ Iterator<JobVertex> iterator = verticesIterable.iterator();
+ return new Iterator<JobInformation.VertexInformation>() {
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public JobInformation.VertexInformation next() {
+ return new JobVertexInformation(iterator.next());
+ }
+ };
+ };
Review comment:
`Iterables.transform` might be simpler.
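For example (a sketch, assuming the Guava version shaded into flink-runtime, e.g. `org.apache.flink.shaded.guava18.com.google.common.collect.Iterables`):

```java
public static Iterable<JobInformation.VertexInformation> jobGraphVerticesToVertexInformation(
        Iterable<JobVertex> verticesIterable) {
    // Iterables.transform returns a lazily transforming view, just like the
    // hand-written iterator above
    return Iterables.transform(verticesIterable, JobVertexInformation::new);
}
```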
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
##########
@@ -0,0 +1,909 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import
org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
+import org.apache.flink.runtime.scheduler.declarative.allocator.SlotAllocator;
+import
org.apache.flink.runtime.scheduler.declarative.allocator.SlotSharingSlotAllocator;
+import
org.apache.flink.runtime.scheduler.declarative.allocator.VertexParallelism;
+import
org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ReactiveScaleUpController;
+import
org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ScaleUpController;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/** Declarative scheduler. */
Review comment:
Maybe add more expressive JavaDoc to this class. We could state the overall
idea and how it works, and maybe even link to the FLIP and the state machine
diagram.
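Something along these lines, as a rough sketch (the exact wording and the FLIP/diagram links would of course come from you):

```java
/**
 * A {@link SchedulerNG} implementation that declares the required resources to the
 * {@link DeclarativeSlotPool} and adapts the parallelism of the job to the resources that
 * actually become available.
 *
 * <p>The scheduler is organized as a state machine: at any point in time it is in exactly one
 * {@link State} ({@link Created}, {@link WaitingForResources}, {@link Executing},
 * {@link Restarting}, {@link Failing} or {@link Finished}) and all incoming calls are
 * dispatched to the current state.
 *
 * <p>See the corresponding FLIP and the state machine diagram for the overall design.
 */
```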
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeScheduler.java
##########
@@ -0,0 +1,909 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+import org.apache.flink.runtime.blob.BlobWriter;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionDeploymentListener;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionStateUpdateListener;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
+import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobEdge;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
+import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker;
+import
org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
+import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
+import org.apache.flink.runtime.scheduler.SchedulerNG;
+import org.apache.flink.runtime.scheduler.SchedulerUtils;
+import
org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
+import org.apache.flink.runtime.scheduler.declarative.allocator.SlotAllocator;
+import
org.apache.flink.runtime.scheduler.declarative.allocator.SlotSharingSlotAllocator;
+import
org.apache.flink.runtime.scheduler.declarative.allocator.VertexParallelism;
+import
org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ReactiveScaleUpController;
+import
org.apache.flink.runtime.scheduler.declarative.scalingpolicy.ScaleUpController;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/** Declarative scheduler. */
+public class DeclarativeScheduler
+ implements SchedulerNG,
+ Created.Context,
+ WaitingForResources.Context,
+ Executing.Context,
+ Restarting.Context,
+ Failing.Context,
+ Finished.Context {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DeclarativeScheduler.class);
+
+ private final JobGraphJobInformation jobInformation;
+
+ private final DeclarativeSlotPool declarativeSlotPool;
+
+ private final long initializationTimestamp;
+
+ private final Configuration configuration;
+ private final ScheduledExecutorService futureExecutor;
+ private final Executor ioExecutor;
+ private final ClassLoader userCodeClassLoader;
+ private final Time rpcTimeout;
+ private final BlobWriter blobWriter;
+ private final ShuffleMaster<?> shuffleMaster;
+ private final JobMasterPartitionTracker partitionTracker;
+ private final ExecutionDeploymentTracker executionDeploymentTracker;
+ private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
+
+ private final CompletedCheckpointStore completedCheckpointStore;
+ private final CheckpointIDCounter checkpointIdCounter;
+ private final CheckpointsCleaner checkpointsCleaner;
+
+ private final CompletableFuture<Void> terminationFuture = new
CompletableFuture<>();
+ private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;
+
+ private final ComponentMainThreadExecutor componentMainThreadExecutor;
+ private final FatalErrorHandler fatalErrorHandler;
+
+ private final JobStatusListener jobStatusListener;
+
+ private final SlotAllocator<?> slotAllocator;
+
+ private final ScaleUpController scaleUpController;
+
+ private State state = new Created(this, LOG);
+
+ public DeclarativeScheduler(
+ JobGraph jobGraph,
+ Configuration configuration,
+ DeclarativeSlotPool declarativeSlotPool,
+ ScheduledExecutorService futureExecutor,
+ Executor ioExecutor,
+ ClassLoader userCodeClassLoader,
+ CheckpointRecoveryFactory checkpointRecoveryFactory,
+ Time rpcTimeout,
+ BlobWriter blobWriter,
+ JobManagerJobMetricGroup jobManagerJobMetricGroup,
+ ShuffleMaster<?> shuffleMaster,
+ JobMasterPartitionTracker partitionTracker,
+ RestartBackoffTimeStrategy restartBackoffTimeStrategy,
+ ExecutionDeploymentTracker executionDeploymentTracker,
+ long initializationTimestamp,
+ ComponentMainThreadExecutor mainThreadExecutor,
+ FatalErrorHandler fatalErrorHandler,
+ JobStatusListener jobStatusListener)
+ throws JobExecutionException {
+
+ ensureFullyPipelinedStreamingJob(jobGraph);
+
+ this.jobInformation = new JobGraphJobInformation(jobGraph);
+ this.declarativeSlotPool = declarativeSlotPool;
+ this.initializationTimestamp = initializationTimestamp;
+ this.configuration = configuration;
+ this.futureExecutor = futureExecutor;
+ this.ioExecutor = ioExecutor;
+ this.userCodeClassLoader = userCodeClassLoader;
+ this.rpcTimeout = rpcTimeout;
+ this.blobWriter = blobWriter;
+ this.shuffleMaster = shuffleMaster;
+ this.partitionTracker = partitionTracker;
+ this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
+ this.executionDeploymentTracker = executionDeploymentTracker;
+ this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+ this.fatalErrorHandler = fatalErrorHandler;
+ this.completedCheckpointStore =
+
SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(
+ jobGraph,
+ configuration,
+ userCodeClassLoader,
+ checkpointRecoveryFactory,
+ LOG);
+ this.checkpointIdCounter =
+
SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(
+ jobGraph, checkpointRecoveryFactory);
+ this.checkpointsCleaner = new CheckpointsCleaner();
+
+ this.slotAllocator =
+ new SlotSharingSlotAllocator(
+ declarativeSlotPool::reserveFreeSlot,
+ declarativeSlotPool::freeReservedSlot);
+
+ for (JobVertex vertex : jobGraph.getVertices()) {
+ if (vertex.getParallelism() ==
ExecutionConfig.PARALLELISM_DEFAULT) {
+ vertex.setParallelism(1);
+ }
+ }
+
+
declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable);
+
+ this.componentMainThreadExecutor = mainThreadExecutor;
+ this.jobStatusListener = jobStatusListener;
+
+ this.scaleUpController = new ReactiveScaleUpController(configuration);
+ }
+
+ private static void ensureFullyPipelinedStreamingJob(JobGraph jobGraph)
+ throws RuntimeException {
+ Preconditions.checkState(
+ jobGraph.getJobType() == JobType.STREAMING,
+ "The declarative scheduler only supports streaming jobs.");
+ Preconditions.checkState(
+ jobGraph.getScheduleMode()
+ !=
ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+ "The declarative schedules does not support batch slot
requests.");
+
+ for (JobVertex vertex : jobGraph.getVertices()) {
+ for (JobEdge jobEdge : vertex.getInputs()) {
+ Preconditions.checkState(
+ jobEdge.getSource().getResultType().isPipelined(),
+ "The declarative scheduler supports pipelined data
exchanges (violated by %s -> %s).",
+ jobEdge.getSource().getProducer(),
+ jobEdge.getTarget().getID());
+ }
+ }
+ }
+
+ private void newResourcesAvailable(Collection<? extends PhysicalSlot>
physicalSlots) {
+ state.tryRun(
+ ResourceConsumer.class,
+ ResourceConsumer::notifyNewResourcesAvailable,
+ "newResourcesAvailable");
+ }
+
+ @Override
+ public void startScheduling() {
+ state.as(Created.class)
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "Can only start scheduling when being
in Created state."))
+ .startScheduling();
+ }
+
+ @Override
+ public void suspend(Throwable cause) {
+ state.suspend(cause);
+ }
+
+ @Override
+ public void cancel() {
+ state.cancel();
+ }
+
+ @Override
+ public CompletableFuture<Void> getTerminationFuture() {
+ return terminationFuture;
+ }
+
+ @Override
+ public void handleGlobalFailure(Throwable cause) {
+ state.handleGlobalFailure(cause);
+ }
+
+ @Override
+ public boolean updateTaskExecutionState(TaskExecutionStateTransition
taskExecutionState) {
+ return state.tryCall(
+ StateWithExecutionGraph.class,
+ stateWithExecutionGraph ->
+
stateWithExecutionGraph.updateTaskExecutionState(
+ taskExecutionState),
+ "updateTaskExecutionState")
+ .orElse(false);
+ }
+
+ @Override
+ public SerializedInputSplit requestNextInputSplit(
+ JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws
IOException {
+ return state.tryCall(
+ StateWithExecutionGraph.class,
+ stateWithExecutionGraph ->
+ stateWithExecutionGraph.requestNextInputSplit(
+ vertexID, executionAttempt),
+ "requestNextInputSplit")
+ .orElseThrow(
+ () -> new IOException("Scheduler is currently not
executing the job."));
+ }
+
+ @Override
+ public ExecutionState requestPartitionState(
+ IntermediateDataSetID intermediateResultId, ResultPartitionID
resultPartitionId)
+ throws PartitionProducerDisposedException {
+ return state.tryCall(
+ StateWithExecutionGraph.class,
+ stateWithExecutionGraph ->
+ stateWithExecutionGraph.requestPartitionState(
+ intermediateResultId,
resultPartitionId),
+ "requestPartitionState")
+ .orElseThrow(() -> new
PartitionProducerDisposedException(resultPartitionId));
+ }
+
+ @Override
+ public void notifyPartitionDataAvailable(ResultPartitionID partitionID) {
+ state.tryRun(
+ StateWithExecutionGraph.class,
+ stateWithExecutionGraph ->
+
stateWithExecutionGraph.notifyPartitionDataAvailable(partitionID),
+ "notifyPartitionDataAvailable");
+ }
+
+ @Override
+ public ArchivedExecutionGraph requestJob() {
+ return state.getJob();
+ }
+
+ @Override
+ public JobStatus requestJobStatus() {
+ return state.getJobStatus();
+ }
+
+ @Override
+ public JobDetails requestJobDetails() {
+ return JobDetails.createDetailsForJob(state.getJob());
+ }
+
+ @Override
+    public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName)
+            throws UnknownKvStateLocation, FlinkJobNotFoundException {
+        final Optional<StateWithExecutionGraph> asOptional =
+                state.as(StateWithExecutionGraph.class);
+
+        if (asOptional.isPresent()) {
+            return asOptional.get().requestKvStateLocation(jobId, registrationName);
+ } else {
+ throw new UnknownKvStateLocation(registrationName);
+ }
+ }
+
+ @Override
+ public void notifyKvStateRegistered(
+ JobID jobId,
+ JobVertexID jobVertexId,
+ KeyGroupRange keyGroupRange,
+ String registrationName,
+ KvStateID kvStateId,
+ InetSocketAddress kvStateServerAddress)
+ throws FlinkJobNotFoundException {
+ state.tryRun(
+ StateWithExecutionGraph.class,
+ stateWithExecutionGraph ->
+ stateWithExecutionGraph.notifyKvStateRegistered(
+ jobId,
+ jobVertexId,
+ keyGroupRange,
+ registrationName,
+ kvStateId,
+ kvStateServerAddress),
+ "notifyKvStateRegistered");
+ }
+
+ @Override
+ public void notifyKvStateUnregistered(
+ JobID jobId,
+ JobVertexID jobVertexId,
+ KeyGroupRange keyGroupRange,
+ String registrationName)
+ throws FlinkJobNotFoundException {
+ state.tryRun(
+ StateWithExecutionGraph.class,
+ stateWithExecutionGraph ->
+ stateWithExecutionGraph.notifyKvStateUnregistered(
+                                jobId, jobVertexId, keyGroupRange, registrationName),
+ "notifyKvStateUnregistered");
+ }
+
+ @Override
+ public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
+ state.tryRun(
+ StateWithExecutionGraph.class,
+ stateWithExecutionGraph ->
+                        stateWithExecutionGraph.updateAccumulators(accumulatorSnapshot),
+ "updateAccumulators");
+ }
+
+ @Override
+ public CompletableFuture<String> triggerSavepoint(
+ @Nullable String targetDirectory, boolean cancelJob) {
+ return state.tryCall(
+ StateWithExecutionGraph.class,
+ stateWithExecutionGraph ->
+ stateWithExecutionGraph.triggerSavepoint(
+ targetDirectory, cancelJob),
+ "triggerSavepoint")
+ .orElse(
+ FutureUtils.completedExceptionally(
+                                new CheckpointException(
+                                        "The Flink job is currently not executing.",
+                                        CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE)));
+ }
+
+ @Override
+ public void acknowledgeCheckpoint(
+ JobID jobID,
+ ExecutionAttemptID executionAttemptID,
+ long checkpointId,
+ CheckpointMetrics checkpointMetrics,
+ TaskStateSnapshot checkpointState) {
+ state.tryRun(
+ StateWithExecutionGraph.class,
+ stateWithExecutionGraph ->
+ stateWithExecutionGraph.acknowledgeCheckpoint(
+ jobID,
+ executionAttemptID,
+ checkpointId,
+ checkpointMetrics,
+ checkpointState),
+ "acknowledgeCheckpoint");
+ }
+
+ @Override
+ public void reportCheckpointMetrics(
+ JobID jobID,
+ ExecutionAttemptID executionAttemptID,
+ long checkpointId,
+ CheckpointMetrics checkpointMetrics) {
+ state.tryRun(
+ StateWithExecutionGraph.class,
+ stateWithExecutionGraph ->
+ stateWithExecutionGraph.reportCheckpointMetrics(
+                                        executionAttemptID, checkpointId, checkpointMetrics),
+ "reportCheckpointMetrics");
+ }
+
+ @Override
+ public void declineCheckpoint(DeclineCheckpoint decline) {
+ state.tryRun(
+ StateWithExecutionGraph.class,
+                stateWithExecutionGraph -> stateWithExecutionGraph.declineCheckpoint(decline),
+ "declineCheckpoint");
+ }
+
+ @Override
+ public CompletableFuture<String> stopWithSavepoint(
+ String targetDirectory, boolean advanceToEndOfEventTime) {
+ return state.tryCall(
+ StateWithExecutionGraph.class,
+ stateWithExecutionGraph ->
+ stateWithExecutionGraph.stopWithSavepoint(
+                                        targetDirectory, advanceToEndOfEventTime),
+                        "stopWithSavepoint")
+                .orElse(
+                        FutureUtils.completedExceptionally(
+                                new CheckpointException(
+                                        "The Flink job is currently not executing.",
+                                        CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE)));
+ }
+
+ @Override
+ public void deliverOperatorEventToCoordinator(
+            ExecutionAttemptID taskExecution, OperatorID operator, OperatorEvent evt)
+            throws FlinkException {
+        final StateWithExecutionGraph stateWithExecutionGraph =
+                state.as(StateWithExecutionGraph.class)
+                        .orElseThrow(
+                                () ->
+                                        new TaskNotRunningException(
+                                                "Task is not known or in state running on the JobManager."));
+
+        stateWithExecutionGraph.deliverOperatorEventToCoordinator(taskExecution, operator, evt);
+ }
+
+ @Override
+    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
+            OperatorID operator, CoordinationRequest request) throws FlinkException {
+        return state.tryCall(
+                        StateWithExecutionGraph.class,
+                        stateWithExecutionGraph ->
+                                stateWithExecutionGraph.deliverCoordinationRequestToCoordinator(
+ operator, request),
+ "deliverCoordinationRequestToCoordinator")
+ .orElseGet(
+ () ->
+ FutureUtils.completedExceptionally(
+ new FlinkException(
+ "Coordinator of operator "
+ + operator
+ + " does not exist")));
+ }
+
+ // ----------------------------------------------------------------
+
+ @Override
+ public boolean hasEnoughResources(ResourceCounter desiredResources) {
+        final Collection<? extends SlotInfo> allSlots =
+                declarativeSlotPool.getFreeSlotsInformation();
+        ResourceCounter outstandingResources = desiredResources;
+
+        final Iterator<? extends SlotInfo> slotIterator = allSlots.iterator();
+
+        while (!outstandingResources.isEmpty() && slotIterator.hasNext()) {
+            final SlotInfo slotInfo = slotIterator.next();
+            final ResourceProfile resourceProfile = slotInfo.getResourceProfile();
+
+            if (outstandingResources.containsResource(resourceProfile)) {
+                outstandingResources = outstandingResources.subtract(resourceProfile, 1);
+            } else {
+                outstandingResources = outstandingResources.subtract(ResourceProfile.UNKNOWN, 1);
+ }
+ }
+
+ return outstandingResources.isEmpty();
+ }
+
+    private <T extends VertexParallelism>
+            ParallelismAndResourceAssignments determineParallelismAndAssignResources(
+                    SlotAllocator<T> slotAllocator) throws JobExecutionException {
+
+        final T vertexParallelism =
+                slotAllocator
+                        .determineParallelism(
+                                jobInformation, declarativeSlotPool.getFreeSlotsInformation())
+                        .orElseThrow(
+                                () ->
+                                        new JobExecutionException(
+                                                jobInformation.getJobID(),
+                                                "Not enough resources available for scheduling."));
+
+        final Map<ExecutionVertexID, LogicalSlot> slotAssignments =
+                slotAllocator.reserveResources(vertexParallelism);
+
+        return new ParallelismAndResourceAssignments(
+                slotAssignments, vertexParallelism.getMaxParallelismForVertices());
+ }
+
+ @Override
+    public ExecutionGraph createExecutionGraphWithAvailableResources() throws Exception {
+        final ParallelismAndResourceAssignments parallelismAndResourceAssignments =
+                determineParallelismAndAssignResources(slotAllocator);
+
+        JobGraph adjustedJobGraph = jobInformation.copyJobGraph();
+        for (JobVertex vertex : adjustedJobGraph.getVertices()) {
+            vertex.setParallelism(parallelismAndResourceAssignments.getParallelism(vertex.getID()));
+        }
+
+        final ExecutionGraph executionGraph = createExecutionGraphAndRestoreState(adjustedJobGraph);
+
+        executionGraph.start(componentMainThreadExecutor);
+        executionGraph.transitionToRunning();
+
+        executionGraph.setInternalTaskFailuresListener(
+                new UpdateSchedulerNgOnInternalFailuresListener(this, jobInformation.getJobID()));
+
+        for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
+            final LogicalSlot assignedSlot =
+                    parallelismAndResourceAssignments.getAssignedSlot(executionVertex.getID());
+            executionVertex
+                    .getCurrentExecutionAttempt()
+                    .registerProducedPartitions(assignedSlot.getTaskManagerLocation(), false);
+            executionVertex.tryAssignResource(assignedSlot);
+        }
+        return executionGraph;
+ }
+
+    private ExecutionGraph createExecutionGraphAndRestoreState(JobGraph adjustedJobGraph)
+            throws Exception {
+        ExecutionDeploymentListener executionDeploymentListener =
+                new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
+ ExecutionStateUpdateListener executionStateUpdateListener =
+ (execution, newState) -> {
+ if (newState.isTerminal()) {
+                        executionDeploymentTracker.stopTrackingDeploymentOf(execution);
+ }
+ };
+
+ final ExecutionGraph newExecutionGraph =
+ ExecutionGraphBuilder.buildGraph(
+ adjustedJobGraph,
+ configuration,
+ futureExecutor,
+ ioExecutor,
+ userCodeClassLoader,
+ completedCheckpointStore,
+ checkpointsCleaner,
+ checkpointIdCounter,
+ rpcTimeout,
+ jobManagerJobMetricGroup,
+ blobWriter,
+ LOG,
+ shuffleMaster,
+ partitionTracker,
+ executionDeploymentListener,
+ executionStateUpdateListener,
+ initializationTimestamp);
+
+ final CheckpointCoordinator checkpointCoordinator =
+ newExecutionGraph.getCheckpointCoordinator();
+
+ if (checkpointCoordinator != null) {
+ // check whether we find a valid checkpoint
+ if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
+                    new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
+
+                // check whether we can restore from a savepoint
+                tryRestoreExecutionGraphFromSavepoint(
+                        newExecutionGraph, adjustedJobGraph.getSavepointRestoreSettings());
+ }
+ }
+
+ return newExecutionGraph;
+ }
+
+ /**
+     * Tries to restore the given {@link ExecutionGraph} from the provided {@link
+     * SavepointRestoreSettings}.
Review comment:
Maybe add that we only try to restore the EG if checkpointing has been
enabled.
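   Something along these lines, maybe (just a sketch of the Javadoc addition, not an exact suggestion, since the surrounding comment is only partially quoted here):
   ```java
    /**
     * Tries to restore the given {@link ExecutionGraph} from the provided {@link
     * SavepointRestoreSettings}.
     *
     * <p>Note: The restore is only attempted if checkpointing is enabled for the job, i.e. if the
     * {@link ExecutionGraph} has a {@link CheckpointCoordinator}.
     */
   ```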
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
##########
@@ -34,7 +34,20 @@
import java.util.List;
import java.util.Map;
-/** Shared slot implementation for the declarative scheduler. */
+/**
+ * Shared slot implementation for the declarative scheduler.
+ *
+ * <p>The release process of a shared slot follows one of 2 code paths:
+ *
+ * <p>1)During normal execution all allocated logical slots will be returned, with the last return
Review comment:
```suggestion
 * <p>1) During normal execution all allocated logical slots will be returned, with the last return
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.declarative;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
+import org.apache.flink.runtime.jobmaster.slotpool.ResourceCounter;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.slots.ResourceRequirement;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements;
+import static org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolTest.offerSlots;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+/** Tests for the {@link DeclarativeScheduler}. */
+public class DeclarativeSchedulerTest extends TestLogger {
+
+ private static final int PARALLELISM = 4;
+ private static final JobVertex JOB_VERTEX;
+
+ static {
+ JOB_VERTEX = new JobVertex("v1");
+ JOB_VERTEX.setParallelism(PARALLELISM);
+ JOB_VERTEX.setInvokableClass(AbstractInvokable.class);
+ }
+
+ private final ComponentMainThreadExecutor mainThreadExecutor =
+            new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
+
+ @Test
+ public void testInitialState() throws Exception {
+ final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), mainThreadExecutor).build();
+
+ assertThat(scheduler.getState(), instanceOf(Created.class));
+ }
+
+ @Test
+ public void testIsState() throws Exception {
+ final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), mainThreadExecutor).build();
+
+ final State state = scheduler.getState();
+
+ assertThat(scheduler.isState(state), is(true));
+ assertThat(scheduler.isState(new DummyState()), is(false));
+ }
+
+ @Test
+ public void testRunIfState() throws Exception {
+ final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), mainThreadExecutor).build();
+
+ AtomicBoolean ran = new AtomicBoolean(false);
+ scheduler.runIfState(scheduler.getState(), () -> ran.set(true));
+ assertThat(ran.get(), is(true));
+ }
+
+ @Test
+ public void testRunIfStateWithStateMismatch() throws Exception {
+ final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), mainThreadExecutor).build();
+
+ AtomicBoolean ran = new AtomicBoolean(false);
+ scheduler.runIfState(new DummyState(), () -> ran.set(true));
+ assertThat(ran.get(), is(false));
+ }
+
+ @Test
+ public void testHasEnoughResources() throws Exception {
+ final JobGraph jobGraph = getJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ new DefaultDeclarativeSlotPool(
+ jobGraph.getJobID(),
+ new DefaultAllocatedSlotPool(),
+ ignored -> {},
+ Time.minutes(10),
+ Time.minutes(10));
+
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ scheduler.startScheduling();
+
+ final ResourceCounter resourceRequirement =
+ ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), is(false));
+
+        offerSlots(
+                declarativeSlotPool, createSlotOffersForResourceRequirements(resourceRequirement));
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), is(true));
+ }
+
+ @Test
+    public void testExecutionGraphGenerationWithAvailableResources() throws Exception {
+ final JobGraph jobGraph = getJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ new DefaultDeclarativeSlotPool(
+ jobGraph.getJobID(),
+ new DefaultAllocatedSlotPool(),
+ ignored -> {},
+ Time.minutes(10),
+ Time.minutes(10));
+
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ scheduler.startScheduling();
+
+ final int numAvailableSlots = 1;
+
+ offerSlots(
+ declarativeSlotPool,
+ createSlotOffersForResourceRequirements(
+                        ResourceCounter.withResource(ResourceProfile.UNKNOWN, numAvailableSlots)));
+
+ final ExecutionGraph executionGraph =
+ scheduler.createExecutionGraphWithAvailableResources();
+
+ assertThat(
+                executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism(),
+ is(numAvailableSlots));
+ }
Review comment:
Maybe also test that we try to restore from a given savepoint.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlot.java
##########
@@ -34,7 +34,20 @@
import java.util.List;
import java.util.Map;
-/** Shared slot implementation for the declarative scheduler. */
+/**
+ * Shared slot implementation for the declarative scheduler.
+ *
+ * <p>The release process of a shared slot follows one of 2 code paths:
+ *
+ * <p>1)During normal execution all allocated logical slots will be returned, with the last return
+ * triggering the {@code externalReleaseCallback} which must eventually result in a {@link
+ * #release(Throwable)} call. 2)
Review comment:
```suggestion
* #release(Throwable)} call.
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/allocator/SharedSlotTest.java
##########
@@ -166,6 +168,48 @@ public void testReleaseForbidsSubsequentLogicalSlotAllocations() {
sharedSlot.allocateLogicalSlot();
}
+ @Test
+ public void testCanReturnLogicalSlotDuringRelease() {
+        final TestingPhysicalSlot physicalSlot = TestingPhysicalSlot.builder().build();
+        final SharedSlot sharedSlot =
+                new SharedSlot(new SlotRequestId(), physicalSlot, false, () -> {});
+        final LogicalSlot logicalSlot1 = sharedSlot.allocateLogicalSlot();
+        final LogicalSlot logicalSlot2 = sharedSlot.allocateLogicalSlot();
+
+        // both slots try to release the other one, simulating that the failure of one execution due
+ // to the release also fails others
+ logicalSlot1.tryAssignPayload(
+ new TestLogicalSlotPayload(
+ cause -> {
+ if (logicalSlot2.isAlive()) {
+ logicalSlot2.releaseSlot(cause);
+ }
+ }));
+ logicalSlot2.tryAssignPayload(
+ new TestLogicalSlotPayload(
+ cause -> {
+ if (logicalSlot1.isAlive()) {
+ logicalSlot1.releaseSlot(cause);
+ }
+ }));
+
+ sharedSlot.release(new Exception("test"));
Review comment:
What are we asserting here?
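   If the intent is that both logical slots end up released despite the mutual release callbacks, it would help to make that explicit, e.g. (a sketch only, assuming the usual Hamcrest `assertThat`/`is` helpers are available in this test):
   ```java
        // the mutual release calls must not prevent either logical slot from being released
        assertThat(logicalSlot1.isAlive(), is(false));
        assertThat(logicalSlot2.isAlive(), is(false));
   ```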
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/declarative/StateWithExecutionGraph.java
##########
@@ -106,14 +107,17 @@
context.getMainThreadExecutor()));
}
+ @VisibleForTesting
Review comment:
I guess it should be `protected` and `@VisibleForTesting`.
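   i.e. something along these lines (the member below is purely illustrative, since the annotated declaration is not quoted here):
   ```java
    @VisibleForTesting
    protected ExecutionGraph getExecutionGraph() {
        return executionGraph;
    }
   ```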
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -0,0 +1,326 @@
+
+ @Test
+ public void testStartScheduling() throws Exception {
+ final DeclarativeScheduler scheduler =
+                new DeclarativeSchedulerBuilder(getJobGraph(), mainThreadExecutor).build();
+
+ scheduler.startScheduling();
+
+        assertThat(scheduler.getState(), instanceOf(WaitingForResources.class));
+ }
+
+ @Test
+    public void testStartSchedulingSetsResourceRequirements() throws Exception {
+ final JobGraph jobGraph = getJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ new DefaultDeclarativeSlotPool(
+ jobGraph.getJobID(),
+ new DefaultAllocatedSlotPool(),
+ ignored -> {},
+ Time.minutes(10),
+ Time.minutes(10));
+
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ scheduler.startScheduling();
+
+ assertThat(
+ declarativeSlotPool.getResourceRequirements(),
+                contains(ResourceRequirement.create(ResourceProfile.UNKNOWN, PARALLELISM)));
+ }
+
+ /** Tests that the listener for new slots is properly set up. */
+ @Test
+ public void testResourceAcquisitionTriggersExecution() throws Exception {
Review comment:
```suggestion
    public void testResourceAcquisitionTriggersJobExecution() throws Exception {
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -0,0 +1,326 @@
+
+ @Test
+ public void testStartScheduling() throws Exception {
Review comment:
```suggestion
    public void testStartSchedulingTransitionsToWaitingForResources() throws Exception {
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/declarative/DeclarativeSchedulerTest.java
##########
@@ -0,0 +1,326 @@
+ @Test
+ public void testHasEnoughResources() throws Exception {
+ final JobGraph jobGraph = getJobGraph();
+
+ final DefaultDeclarativeSlotPool declarativeSlotPool =
+ new DefaultDeclarativeSlotPool(
+ jobGraph.getJobID(),
+ new DefaultAllocatedSlotPool(),
+ ignored -> {},
+ Time.minutes(10),
+ Time.minutes(10));
+
+ final DeclarativeScheduler scheduler =
+ new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
+ .setDeclarativeSlotPool(declarativeSlotPool)
+ .build();
+
+ scheduler.startScheduling();
+
+ final ResourceCounter resourceRequirement =
+ ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), is(false));
+
+        offerSlots(
+                declarativeSlotPool, createSlotOffersForResourceRequirements(resourceRequirement));
+
+        assertThat(scheduler.hasEnoughResources(resourceRequirement), is(true));
Review comment:
I think we should also test that non-matched resources decrease the `UNKNOWN` resources.
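   A sketch of what such a test could look like, mirroring `testHasEnoughResources` above (the `ResourceProfile.fromResources(...)` factory and the exact profiles produced by the slot-offer helpers are assumptions and may need adjusting):
   ```java
    @Test
    public void testHasEnoughResourcesCountsUnmatchedSlotsAgainstUnknown() throws Exception {
        final JobGraph jobGraph = getJobGraph();

        final DefaultDeclarativeSlotPool declarativeSlotPool =
                new DefaultDeclarativeSlotPool(
                        jobGraph.getJobID(),
                        new DefaultAllocatedSlotPool(),
                        ignored -> {},
                        Time.minutes(10),
                        Time.minutes(10));

        final DeclarativeScheduler scheduler =
                new DeclarativeSchedulerBuilder(jobGraph, mainThreadExecutor)
                        .setDeclarativeSlotPool(declarativeSlotPool)
                        .build();

        scheduler.startScheduling();

        // require a generic (UNKNOWN) slot, but offer a slot with a specific profile
        final ResourceCounter requiredResources =
                ResourceCounter.withResource(ResourceProfile.UNKNOWN, 1);
        final ResourceCounter offeredResources =
                ResourceCounter.withResource(ResourceProfile.fromResources(1, 100), 1);

        offerSlots(declarativeSlotPool, createSlotOffersForResourceRequirements(offeredResources));

        // the non-matching slot profile should still be counted against the UNKNOWN requirement
        assertThat(scheduler.hasEnoughResources(requiredResources), is(true));
    }
   ```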
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java
##########
@@ -54,16 +57,29 @@ public void createSchedulerNGFactoryIfConfigured() {
@Test
public void throwsExceptionIfSchedulerNameIsInvalid() {
final Configuration configuration = new Configuration();
-        configuration.setString(JobManagerOptions.SCHEDULER, "invalid-scheduler-name");
+        configuration.setString(JobManagerOptions.SCHEDULER.key(), "invalid-scheduler-name");
         try {
             createSchedulerNGFactory(configuration);
         } catch (IllegalArgumentException e) {
-            assertThat(e.getMessage(), containsString("Illegal value [invalid-scheduler-name]"));
+            assertThat(
+                    e.getMessage(),
+                    containsString("Could not parse value 'invalid-scheduler-name'"));
}
}
+ @Test
+ public void fallBackIfBatchAndDeclarative() {
Review comment:
Fall back to what? It is a good idea to state in the test name what the intended behaviour is.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]