XComp commented on code in PR #19351:
URL: https://github.com/apache/flink/pull/19351#discussion_r843921237
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java:
##########
@@ -132,19 +139,28 @@ public void teardown() throws Exception {
@Test
public void testCleanupWhenRestoreFails() throws Exception {
-
createTaskBuilder().setInvokable(InvokableWithExceptionInRestore.class).build().run();
+ createTaskBuilder()
+ .setInvokable(InvokableWithExceptionInRestore.class)
+ .build(Executors.directExecutor())
Review Comment:
I'm wondering whether we should also add a `buildWithDirectExecutor()`
method that uses `Executors.directExecutor` as a default.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java:
##########
@@ -315,7 +334,7 @@ private void testExecutionFailsInNetworkRegistration(
.setPartitionProducerStateChecker(partitionProducerStateChecker)
.setResultPartitions(resultPartitions)
.setInputGates(inputGates)
- .build();
+ .build(EXECUTOR_RESOURCE.getExecutor());
Review Comment:
Could we add a comment here why we can't use the `directExecutor` here? It's
still not clear to me why we need to handle this test case differently... 🤔
##########
flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java:
##########
@@ -1342,7 +1347,7 @@ public void close() throws Exception {
Time.milliseconds(50),
deadline,
(jobStatus) ->
jobStatus.equals(JobStatus.CANCELED),
- TestingUtils.defaultScheduledExecutor());
+ new
ScheduledExecutorServiceAdapter(EXECUTOR_RESOURCE.getExecutor()));
Review Comment:
We have so many locations where we put a `ScheduledExecutorServiceAdapter`.
Couldn't we move this somehow into `TestingUtils`?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java:
##########
@@ -78,8 +74,6 @@ public static TestingDefaultExecutionGraphBuilder
newBuilder() {
(execution, previousState, newState) -> {};
private VertexParallelismStore vertexParallelismStore;
- private TestingDefaultExecutionGraphBuilder() {}
Review Comment:
Why do we remove that one?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerSharedServicesBuilder.java:
##########
@@ -76,7 +59,10 @@ public void setBlobWriter(BlobWriter blobWriter) {
}
public JobManagerSharedServices build() {
+ final ScheduledExecutorService executorService =
+ Executors.newSingleThreadScheduledExecutor();
Review Comment:
As far as I can see, the executor is closed in the `JobMaster.shutdown()`
method which is not necessarily called in each test (e.g.
`JobMasterExecutionDeploymentReconcilliationTest.testExecutionDeploymentReconciliationForPendingExecution`).
Or am I missing something? 🤔 Hence, we would have an open thread pool here...
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/JobMasterBuilder.java:
##########
@@ -188,7 +187,9 @@ public JobMaster createJobMaster() throws Exception {
? slotPoolServiceSchedulerFactory
:
DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
configuration, jobGraph.getJobType()),
- jobManagerSharedServices,
+ jobManagerSharedServices != null
+ ? jobManagerSharedServices
+ : new TestingJobManagerSharedServicesBuilder().build(),
Review Comment:
This is added lazily to only start the internally used threadpool if
actually needed, right? (not sure whether we can actually instantiate the
thread pool in the build method of
`TestingJobManagerSharedServicesBuilder.build` as mentioned in my previous
comment...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]