1996fanrui commented on code in PR #25113:
URL: https://github.com/apache/flink/pull/25113#discussion_r1690742563


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##########
@@ -292,7 +299,7 @@ public void connectToResourceManager(ResourceManagerGateway 
resourceManagerGatew
         resourceRequirementServiceConnectionManager.connect(
                 resourceRequirements ->
                         resourceManagerGateway.declareRequiredResources(
-                                jobMasterId, resourceRequirements, 
rpcTimeout));
+                                jobMasterId, resourceRequirements, 
Time.fromDuration(rpcTimeout)));

Review Comment:
   It's better to update the parameter type from `Time` to `Duration` for 
`declareRequiredResources`.
   
   That way, we could avoid the unnecessary type conversion.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridgeBuilder.java:
##########
@@ -80,22 +88,25 @@ public DeclarativeSlotPoolBridgeBuilder 
setRequestSlotMatchingStrategy(
         return this;
     }
 
-    public DeclarativeSlotPoolBridge build() {
+    public DeclarativeSlotPoolBridge build(
+            ComponentMainThreadExecutor componentMainThreadExecutor) {

Review Comment:
   In general, a build method doesn't take any parameters; all parameters are 
set via setters.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java:
##########
@@ -299,14 +308,18 @@ public BlocklistDeclarativeSlotPoolBuilder 
setBlockedTaskManagerChecker(
             return this;
         }
 
-        public BlocklistDeclarativeSlotPool build() {
+        public BlocklistDeclarativeSlotPool build(
+                Duration slotRequestMaxInterval,
+                ComponentMainThreadExecutor componentMainThreadExecutor) {

Review Comment:
   In general, a build method doesn't take any parameters; all parameters are 
set via setters.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.util.clock.SystemClock;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread;
+
+/** Test base class for {@link DeclarativeSlotPoolBridge}. */
+abstract class AbstractDeclarativeSlotPoolBridgeTest {
+
+    protected static final Duration rpcTimeout = Duration.ofSeconds(20);
+    protected static final JobID jobId = new JobID();
+    protected static final JobMasterId jobMasterId = JobMasterId.generate();
+    protected static final ComponentMainThreadExecutor mainThreadExecutor = 
forMainThread();

Review Comment:
   The `mainThreadExecutor` shouldn't be static, right?
   
   It will use the current thread as the executor.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTest.java:
##########
@@ -58,45 +61,66 @@
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link DefaultDeclarativeSlotPool}. */
-class DefaultDeclarativeSlotPoolTest {
+@ExtendWith(ParameterizedTestExtension.class)
+class DefaultDeclarativeSlotPoolTest extends 
DefaultDeclarativeSlotPoolTestBase {

Review Comment:
   Do we have a test to check that all requests are sent in a batch when interval > 0?
   
   In other words, it's better to verify that batch requests take effect when interval > 
0.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPoolTestBase.java:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.assertj.core.util.Lists;
+
+import java.time.Duration;
+import java.util.List;
+
+/** Tests base class for the {@link DefaultDeclarativeSlotPool}. */
+abstract class DefaultDeclarativeSlotPoolTestBase {
+
+    // Enabled the slot request interval if the duration is greater than 
Duration.ZERO, disabled

Review Comment:
   ```suggestion
       // Enabled the slot batch request if the interval is greater than 
Duration.ZERO, disabled
   ```



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/BlocklistDeclarativeSlotPoolTest.java:
##########
@@ -154,7 +163,7 @@ private void testRegisterSlots(boolean isBlocked) {
                 BlocklistDeclarativeSlotPoolBuilder.builder()
                         .setBlockedTaskManagerChecker(
                                 isBlocked ? 
taskManager.getResourceID()::equals : ignore -> false)
-                        .build();
+                        .build(Duration.ZERO, componentMainThreadExecutor);

Review Comment:
   Why is the interval hard-coded to 0?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##########
@@ -118,6 +131,8 @@ public DefaultDeclarativeSlotPool(
         this.totalResourceRequirements = ResourceCounter.empty();
         this.fulfilledResourceRequirements = ResourceCounter.empty();
         this.slotToRequirementProfileMappings = new HashMap<>();
+        this.componentMainThreadExecutor = 
Preconditions.checkNotNull(componentMainThreadExecutor);
+        this.slotRequestMaxInterval = 
Preconditions.checkNotNull(slotRequestMaxInterval);

Review Comment:
   nit: move these 2 lines before ` this.totalResourceRequirements `.
   
   it's better to keep the order based on constructor parameters.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.util.clock.SystemClock;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread;
+
+/** Test base class for {@link DeclarativeSlotPoolBridge}. */
+abstract class AbstractDeclarativeSlotPoolBridgeTest {
+
+    protected static final Duration rpcTimeout = Duration.ofSeconds(20);
+    protected static final JobID jobId = new JobID();
+    protected static final JobMasterId jobMasterId = JobMasterId.generate();
+    protected static final ComponentMainThreadExecutor mainThreadExecutor = 
forMainThread();
+
+    @Parameter protected RequestSlotMatchingStrategy 
requestSlotMatchingStrategy;
+
+    @Parameter(1)
+    protected Duration slotRequestMaxInterval;
+
+    @Parameters(name = "requestSlotMatchingStrategy: {0}, 
slotRequestMaxInterval: {1}")
+    static Collection<Object[]> data() throws IOException {

Review Comment:
   ```suggestion
       private static Collection<Object[]> data() {
   ```
   
   `private` is enough, and the `throws IOException` isn't needed.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/AbstractDeclarativeSlotPoolBridgeTest.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.util.clock.SystemClock;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter.forMainThread;
+
+/** Test base class for {@link DeclarativeSlotPoolBridge}. */
+abstract class AbstractDeclarativeSlotPoolBridgeTest {
+
+    protected static final Duration rpcTimeout = Duration.ofSeconds(20);
+    protected static final JobID jobId = new JobID();
+    protected static final JobMasterId jobMasterId = JobMasterId.generate();
+    protected static final ComponentMainThreadExecutor mainThreadExecutor = 
forMainThread();
+
+    @Parameter protected RequestSlotMatchingStrategy 
requestSlotMatchingStrategy;
+
+    @Parameter(1)
+    protected Duration slotRequestMaxInterval;
+
+    @Parameters(name = "requestSlotMatchingStrategy: {0}, 
slotRequestMaxInterval: {1}")
+    static Collection<Object[]> data() throws IOException {
+        return Arrays.asList(
+                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, 
Duration.ZERO},
+                new Object[] {SimpleRequestSlotMatchingStrategy.INSTANCE, 
Duration.ofMillis(50)},
+                new Object[] {
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, 
Duration.ZERO
+                },
+                new Object[] {
+                    PreferredAllocationRequestSlotMatchingStrategy.INSTANCE, 
Duration.ofMillis(50)
+                });
+    }
+
+    @Nonnull
+    static DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
+            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+            RequestSlotMatchingStrategy requestSlotMatchingStrategy,
+            Duration slotRequestMaxInterval) {
+        return new DeclarativeSlotPoolBridge(
+                jobId,
+                declarativeSlotPoolFactory,
+                SystemClock.getInstance(),
+                rpcTimeout,
+                Duration.ofSeconds(20),
+                Duration.ofSeconds(20),
+                requestSlotMatchingStrategy,
+                slotRequestMaxInterval,
+                mainThreadExecutor);
+    }
+
+    @Nonnull
+    static DeclarativeSlotPoolBridge createDeclarativeSlotPoolBridge(
+            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+            RequestSlotMatchingStrategy requestSlotMatchingStrategy,
+            Duration slotRequestMaxInterval,
+            ComponentMainThreadExecutor mainThreadExecutor) {
+        return new DeclarativeSlotPoolBridge(
+                jobId,
+                declarativeSlotPoolFactory,
+                SystemClock.getInstance(),
+                rpcTimeout,
+                Duration.ofSeconds(20),
+                Duration.ofSeconds(20),
+                requestSlotMatchingStrategy,
+                slotRequestMaxInterval,
+                mainThreadExecutor);
+    }

Review Comment:
   The first `createDeclarativeSlotPoolBridge` could call the second 
`createDeclarativeSlotPoolBridge` to create the `DeclarativeSlotPoolBridge`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to