tillrohrmann commented on a change in pull request #14921:
URL: https://github.com/apache/flink/pull/14921#discussion_r577669074



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
##########
@@ -34,14 +35,19 @@
     @Nonnull
     SlotPoolService createSlotPoolService(@Nonnull JobID jobId);
 
-    static SlotPoolServiceFactory fromConfiguration(Configuration 
configuration) {
+    static SlotPoolServiceFactory fromConfiguration(Configuration 
configuration, JobType jobType) {
         final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
         final Time slotIdleTimeout =
                 
Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
         final Time batchSlotTimeout =
                 
Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
 
         if 
(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration)) {
+            if (ClusterOptions.isDeclarativeSchedulerEnabled(configuration)
+                    && jobType == JobType.STREAMING) {
+                return new DeclarativeSlotPoolServiceFactory(
+                        SystemClock.getInstance(), slotIdleTimeout, 
rpcTimeout);
+            }
 
             return new DeclarativeSlotPoolBridgeServiceFactory(
                     SystemClock.getInstance(), rpcTimeout, slotIdleTimeout, 
batchSlotTimeout);

Review comment:
       Sounds good.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to