azagrebin commented on a change in pull request #13641:
URL: https://github.com/apache/flink/pull/13641#discussion_r530185201



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -917,15 +917,15 @@ private void resetAndStartScheduler() throws Exception {
 
                if (schedulerNG.requestJobStatus() == JobStatus.CREATED) {
                        schedulerAssignedFuture = CompletableFuture.completedFuture(null);
-                       schedulerNG.setMainThreadExecutor(getMainThreadExecutor());
+                       schedulerNG.start(getMainThreadExecutor());

Review comment:
       If this method does initialization, maybe it would be better to call it
       `initialize` or `setup`? We already have `startScheduling`, and the two
       names are easy to confuse.
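
To illustrate the naming concern, here is a minimal sketch of a scheduler lifecycle in which setup and scheduling are separate calls. `SchedulerLifecycleSketch` and `RecordingSchedulerSketch` are hypothetical names invented for this example and are not Flink classes; the only idea taken from the comment is that the one-time setup call is named `initialize` so it cannot be mistaken for `startScheduling`.

```java
import java.util.Objects;
import java.util.concurrent.Executor;

// Hypothetical stand-in for the scheduler; only the two lifecycle calls are modeled.
interface SchedulerLifecycleSketch {
    // One-time setup: hands over the main thread executor, schedules nothing.
    void initialize(Executor mainThreadExecutor);

    // Begins scheduling; only valid after initialize(...).
    void startScheduling();
}

// Hypothetical implementation that only records whether scheduling was started.
final class RecordingSchedulerSketch implements SchedulerLifecycleSketch {
    private Executor mainThreadExecutor;
    private boolean scheduling;

    @Override
    public void initialize(Executor mainThreadExecutor) {
        if (this.mainThreadExecutor != null) {
            throw new IllegalStateException("already initialized");
        }
        this.mainThreadExecutor = Objects.requireNonNull(mainThreadExecutor);
    }

    @Override
    public void startScheduling() {
        if (mainThreadExecutor == null) {
            throw new IllegalStateException("initialize(...) must be called first");
        }
        // Run the state change on the executor supplied during initialization.
        mainThreadExecutor.execute(() -> scheduling = true);
    }

    boolean isScheduling() {
        return scheduling;
    }
}
```

With this split, a caller that invokes `startScheduling()` before `initialize(...)` fails fast with an `IllegalStateException`, which makes the ordering requirement explicit.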

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphNotEnoughResourceTest.java
##########
@@ -113,29 +119,28 @@ public void testRestartWithSlotSharingAndNotEnoughResources() throws Exception {
                        final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Test Job", source, sink);
                        jobGraph.setScheduleMode(ScheduleMode.EAGER);
 
-                       TestRestartStrategy restartStrategy = new TestRestartStrategy(numRestarts, false);
+                       RestartBackoffTimeStrategy restartStrategy = new FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory(numRestarts, 0).create();
 
-                       final ExecutionGraph eg = TestingExecutionGraphBuilder
-                               .newBuilder()
-                               .setJobGraph(jobGraph)
-                               .setSlotProvider(scheduler)
-                               .setRestartStrategy(restartStrategy)
-                               .setAllocationTimeout(Time.milliseconds(1L))
+                       final SchedulerBase schedulerNG = SchedulerTestingUtils
+                               .newSchedulerBuilderWithDefaultSlotAllocator(jobGraph, scheduler, Time.milliseconds(1))
+                               .setRestartBackoffTimeStrategy(restartStrategy)
+                               .setSchedulingStrategyFactory(new EagerSchedulingStrategy.Factory())
+                               .setFailoverStrategyFactory(new RestartAllFailoverStrategy.Factory())
                                .build();
+                       final ExecutionGraph eg = schedulerNG.getExecutionGraph();

Review comment:
       I am wondering whether this test should keep depending on the EG at all
       if the EG is planned to become more like a topology in the future. Maybe
       `eg::getState` and `eg::getFailureCause` belong to the scheduler in the
       long term? `eg::getState` can already be replaced with
       `schedulerNG::requestJobStatus`. The test could then also be renamed to
       e.g. `EagerSchedulingNotEnoughResourceTest`. Or is the idea that the test
       is going to be removed anyway once pipelined region scheduling is stable?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
##########
@@ -361,11 +168,7 @@ public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
                assertThat(execution.getTaskRestore(), is(notNullValue()));
 
                // schedule the execution vertex and wait for its deployment
-               executionVertex.scheduleForExecution(
-                       executionGraph.getSlotProviderStrategy(),
-                       LocationPreferenceConstraint.ANY,
-                       Collections.emptySet())
-                       .get();
+               scheduler.startScheduling();

Review comment:
       `scheduler.startScheduling` does not appear to wait for anything. Would
       it be easier to call `Execution::deploy`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -188,7 +188,7 @@ public void start(ComponentMainThreadExecutor mainThreadExecutor) {
        }
 
        @Override
-       protected long getNumberOfRestarts() {
+       public long getNumberOfRestarts() {

Review comment:
       Maybe we could query this from `JobManagerMetricGroup` somehow. I am
       also wondering whether the `JobManagerMetricGroup` registration/query
       code belongs in the scheduler at all, rather than in a separate
       component, e.g. `SchedulingMetrics`.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
##########
@@ -20,86 +20,31 @@
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
-import org.apache.flink.runtime.execution.SuppressRestartsException;
-import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
+import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-
 public class ExecutionGraphVariousFailuesTest extends TestLogger {
 
-       /**
-        * Test that failing in state restarting will retrigger the restarting logic. This means that
-        * it only goes into the state FAILED after the restart strategy says the job is no longer
-        * restartable.
-        */
-       @Test
-       public void testFailureWhileRestarting() throws Exception {
-               final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(2));
-               eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
-               eg.scheduleForExecution();
-
-               assertEquals(JobStatus.RUNNING, eg.getState());
-               ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
-
-               eg.failGlobal(new Exception("Test 1"));
-               assertEquals(JobStatus.FAILING, eg.getState());
-               ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
-
-               // we should restart since we have two restart attempts left
-               assertEquals(JobStatus.RESTARTING, eg.getState());
-
-               eg.failGlobal(new Exception("Test 2"));
-
-               // we should restart since we have one restart attempts left
-               assertEquals(JobStatus.RESTARTING, eg.getState());
-
-               eg.failGlobal(new Exception("Test 3"));
-
-               // after depleting all our restart attempts we should go into Failed
-               assertEquals(JobStatus.FAILED, eg.getState());
-       }
-
-       /**
-        * Tests that a {@link SuppressRestartsException} in state RESTARTING stops the restarting
-        * immediately and sets the execution graph's state to FAILED.
-        */
-       @Test
-       public void testSuppressRestartFailureWhileRestarting() throws Exception {
-               final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
-               eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
-               eg.scheduleForExecution();
-
-               assertEquals(JobStatus.RUNNING, eg.getState());
-               ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);
-
-               eg.failGlobal(new Exception("test"));
-               assertEquals(JobStatus.FAILING, eg.getState());
-
-               ExecutionGraphTestUtils.completeCancellingForAllVertices(eg);
-               assertEquals(JobStatus.RESTARTING, eg.getState());
-
-               // suppress a possible restart
-               eg.failGlobal(new SuppressRestartsException(new Exception("Test")));
-
-               assertEquals(JobStatus.FAILED, eg.getState());
-       }
-
        /**
         * Tests that a failing scheduleOrUpdateConsumers call with a non-existing execution attempt
         * id, will not fail the execution graph.
         */
        @Test
        public void testFailingScheduleOrUpdateConsumers() throws Exception {
-               final ExecutionGraph eg = ExecutionGraphTestUtils.createSimpleTestGraph(new InfiniteDelayRestartStrategy(10));
-               eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
-               eg.scheduleForExecution();
+               final SchedulerBase scheduler = SchedulerTestingUtils.newSchedulerBuilder(new JobGraph()).build();
+               scheduler.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+               scheduler.startScheduling();
+
+               final ExecutionGraph eg = scheduler.getExecutionGraph();

Review comment:
       Do we need to test `eg.scheduleOrUpdateConsumers` here?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/RestartTimeGauge.java
##########
@@ -39,19 +41,21 @@
 
        // ------------------------------------------------------------------------
 
-       private final ExecutionGraph eg;
+       private final Supplier<JobStatus> statusSupplier;
+       private final Function<JobStatus, Long> statusTimestampRetriever;
 
-       public RestartTimeGauge(ExecutionGraph executionGraph) {
-               this.eg = checkNotNull(executionGraph);
+       public RestartTimeGauge(Supplier<JobStatus> statusSupplier, Function<JobStatus, Long> statusTimestampRetriever) {

Review comment:
       It looks like there are other gauges with the same JobStatus/timestamp
       pattern. Maybe we can segregate an interface from the EG:
   ```
   interface JobStatusProvider {
       JobStatus getJobStatus();
       long getJobStatusTimestamp(JobStatus status);
   }
   ```
   and then add a `TestingJobStatusProvider` for tests.
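
As a rough sketch of how that suggestion could look, the block below pairs the proposed `JobStatusProvider` with a `TestingJobStatusProvider` and a gauge that depends only on the interface. Everything here is an assumption made for illustration: the `JobStatus` enum is a three-value stand-in for Flink's own, `RestartTimeGaugeSketch` uses simplified logic that is not the real `RestartTimeGauge`, and passing the current time as a parameter is a choice made to keep the example deterministic.

```java
import java.util.EnumMap;
import java.util.Map;

// Stand-in for org.apache.flink.api.common.JobStatus, reduced to three states for this sketch.
enum JobStatus { CREATED, RUNNING, RESTARTING }

// The interface proposed in the review comment (parameter name added).
interface JobStatusProvider {
    JobStatus getJobStatus();

    long getJobStatusTimestamp(JobStatus status);
}

// Test double with settable state; timestamps that were never set read as 0.
final class TestingJobStatusProvider implements JobStatusProvider {
    private final Map<JobStatus, Long> timestamps = new EnumMap<>(JobStatus.class);
    private JobStatus status = JobStatus.CREATED;

    void setJobStatus(JobStatus newStatus, long timestamp) {
        this.status = newStatus;
        timestamps.put(newStatus, timestamp);
    }

    @Override
    public JobStatus getJobStatus() {
        return status;
    }

    @Override
    public long getJobStatusTimestamp(JobStatus queried) {
        return timestamps.getOrDefault(queried, 0L);
    }
}

// Hypothetical, simplified gauge; it only needs the interface, not an ExecutionGraph.
final class RestartTimeGaugeSketch {
    private final JobStatusProvider provider;

    RestartTimeGaugeSketch(JobStatusProvider provider) {
        this.provider = provider;
    }

    long getValue(long nowMillis) {
        long restartingTs = provider.getJobStatusTimestamp(JobStatus.RESTARTING);
        if (restartingTs <= 0) {
            return 0L; // the job never entered RESTARTING
        }
        if (provider.getJobStatus() == JobStatus.RESTARTING) {
            return Math.max(nowMillis - restartingTs, 0L); // restart still in progress
        }
        long runningTs = provider.getJobStatusTimestamp(JobStatus.RUNNING);
        return Math.max(runningTs - restartingTs, 0L); // duration of the last restart
    }
}
```

A gauge test can then drive the provider directly, for example by setting `RESTARTING` at one timestamp and `RUNNING` at a later one, without building a scheduler or an `ExecutionGraph`.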

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
##########
@@ -256,7 +256,7 @@ public boolean equals(Object obj) {
                if (obj == this) {
                        return true;
                }
-               else if (obj != null && obj.getClass() == TaskManagerLocation.class) {
+               else if (obj != null && obj.getClass() == getClass()) {

Review comment:
       Maybe revert this nit change to keep the git history simpler.
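
For context on what the two forms of the check do, here is a small self-contained sketch. The classes are hypothetical stand-ins, not `TaskManagerLocation`, and the thread does not say whether subclasses are involved; the sketch only shows that the two checks behave the same for the base class and differ once a subclass exists.

```java
import java.util.Objects;

// Old form from the diff: compares against the class literal.
class LiteralLocation {
    final String id;

    LiteralLocation(String id) {
        this.id = id;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        return obj != null
            && obj.getClass() == LiteralLocation.class
            && id.equals(((LiteralLocation) obj).id);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(id);
    }
}

class LiteralSubLocation extends LiteralLocation {
    LiteralSubLocation(String id) {
        super(id);
    }
}

// New form from the diff: compares against the runtime class of the receiver.
class RuntimeLocation {
    final String id;

    RuntimeLocation(String id) {
        this.id = id;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        return obj != null
            && obj.getClass() == getClass()
            && id.equals(((RuntimeLocation) obj).id);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(id);
    }
}

class RuntimeSubLocation extends RuntimeLocation {
    RuntimeSubLocation(String id) {
        super(id);
    }
}
```

In this sketch, two distinct `LiteralSubLocation` instances with the same id are never equal, and a subclass instance can equal a base instance while the reverse comparison fails. With `getClass()`, two `RuntimeSubLocation` instances with the same id are equal and comparisons across base and subclass are false in both directions.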

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
##########
@@ -159,7 +164,7 @@ private ExecutionGraph createExecutionGraphAndEnableCheckpointing(
                        false,
                        0);
 
-               executionGraph.enableCheckpointing(
+               scheduler.getExecutionGraph().enableCheckpointing(

Review comment:
       Do we want to call `enableCheckpointing` on the EG? Should we instead
       set `JobCheckpointingSettings` in the `JobGraph`, as in
       `ArchivedExecutionGraphTest`?



