zentol commented on a change in pull request #14921:
URL: https://github.com/apache/flink/pull/14921#discussion_r577563314
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolServiceFactory.java
##########
@@ -34,14 +35,19 @@
@Nonnull
SlotPoolService createSlotPoolService(@Nonnull JobID jobId);
- static SlotPoolServiceFactory fromConfiguration(Configuration
configuration) {
+ static SlotPoolServiceFactory fromConfiguration(Configuration
configuration, JobType jobType) {
final Time rpcTimeout = AkkaUtils.getTimeoutAsTime(configuration);
final Time slotIdleTimeout =
Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_IDLE_TIMEOUT));
final Time batchSlotTimeout =
Time.milliseconds(configuration.getLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
if
(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration)) {
+ if (ClusterOptions.isDeclarativeSchedulerEnabled(configuration)
+ && jobType == JobType.STREAMING) {
+ return new DeclarativeSlotPoolServiceFactory(
+ SystemClock.getInstance(), slotIdleTimeout,
rpcTimeout);
+ }
return new DeclarativeSlotPoolBridgeServiceFactory(
SystemClock.getInstance(), rpcTimeout, slotIdleTimeout,
batchSlotTimeout);
Review comment:
Because the approach is not obvious to me and I don't want to delay the
skeleton further.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]