zhuzhurk commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r483462252
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
##########
@@ -95,12 +97,15 @@
private final Set<ExecutionVertexID> verticesWaitingForRestart;
+ private final Consumer<ComponentMainThreadExecutor> slartUpAction;
Review comment:
`slartUpAction` -> `startUpAction `
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
##########
@@ -62,11 +51,13 @@ default void start(ComponentMainThreadExecutor
mainThreadExecutor) {
* @param allocationTimeout after which the allocation fails with a
timeout exception
* @return The future of the allocation
*/
- CompletableFuture<LogicalSlot> allocateSlot(
+ default CompletableFuture<LogicalSlot> allocateSlot(
SlotRequestId slotRequestId,
ScheduledUnit scheduledUnit,
SlotProfile slotProfile,
- Time allocationTimeout);
+ Time allocationTimeout) {
Review comment:
Looks to me default body is not needed. I tried removing the default
body and flink-runtime still compiles.
Correct me if I miss anything.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
##########
@@ -125,24 +153,50 @@ static SchedulingStrategyFactory
createSchedulingStrategyFactory(final ScheduleM
}
}
- private static ExecutionSlotAllocatorFactory
createExecutionSlotAllocatorFactory(
+ private static DefaultSchedulerComponents
createPipelinedRegionSchedulerComponents(
final ScheduleMode scheduleMode,
- final SlotProvider slotProvider,
- final Time slotRequestTimeout,
- final SchedulingStrategyFactory
schedulingStrategyFactory) {
-
- if (schedulingStrategyFactory instanceof
PipelinedRegionSchedulingStrategy.Factory) {
- return new OneSlotPerExecutionSlotAllocatorFactory(
- slotProvider,
- scheduleMode !=
ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
- slotRequestTimeout);
- } else {
- final SlotProviderStrategy slotProviderStrategy =
SlotProviderStrategy.from(
- scheduleMode,
- slotProvider,
- slotRequestTimeout);
-
- return new
DefaultExecutionSlotAllocatorFactory(slotProviderStrategy);
+ final Configuration jobMasterConfiguration,
+ final SlotPool slotPool,
+ final Time slotRequestTimeout) {
+ final SlotSelectionStrategy slotSelectionStrategy =
selectSlotSelectionStrategy(jobMasterConfiguration);
+ final BulkSlotProvider bulkSlotProvider = new
BulkSlotProviderImpl(slotSelectionStrategy, slotPool);
+ final ExecutionSlotAllocatorFactory allocatorFactory = new
OneSlotPerExecutionSlotAllocatorFactory(
+ bulkSlotProvider,
+ scheduleMode !=
ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+ slotRequestTimeout);
+ return new DefaultSchedulerComponents(
+ new PipelinedRegionSchedulingStrategy.Factory(),
+ bulkSlotProvider::start,
+ allocatorFactory);
+ }
+
+ @Nonnull
+ private static SlotSelectionStrategy
selectSlotSelectionStrategy(@Nonnull Configuration configuration) {
+ final boolean evenlySpreadOutSlots =
configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+
+ final SlotSelectionStrategy
locationPreferenceSlotSelectionStrategy;
+
+ locationPreferenceSlotSelectionStrategy = evenlySpreadOutSlots ?
+
LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut() :
+ LocationPreferenceSlotSelectionStrategy.createDefault();
+
+ return
configuration.getBoolean(CheckpointingOptions.LOCAL_RECOVERY) ?
+
PreviousAllocationSlotSelectionStrategy.create(locationPreferenceSlotSelectionStrategy)
:
+ locationPreferenceSlotSelectionStrategy;
+ }
+
+ private static class DefaultSchedulerComponents {
+ private final SchedulingStrategyFactory
schedulingStrategyFactory;
+ private final Consumer<ComponentMainThreadExecutor>
slartUpAction;
Review comment:
`startUpAction` -> `startUpAction`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
##########
@@ -125,24 +153,50 @@ static SchedulingStrategyFactory
createSchedulingStrategyFactory(final ScheduleM
}
}
- private static ExecutionSlotAllocatorFactory
createExecutionSlotAllocatorFactory(
+ private static DefaultSchedulerComponents
createPipelinedRegionSchedulerComponents(
final ScheduleMode scheduleMode,
- final SlotProvider slotProvider,
- final Time slotRequestTimeout,
- final SchedulingStrategyFactory
schedulingStrategyFactory) {
-
- if (schedulingStrategyFactory instanceof
PipelinedRegionSchedulingStrategy.Factory) {
- return new OneSlotPerExecutionSlotAllocatorFactory(
- slotProvider,
- scheduleMode !=
ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
- slotRequestTimeout);
- } else {
- final SlotProviderStrategy slotProviderStrategy =
SlotProviderStrategy.from(
- scheduleMode,
- slotProvider,
- slotRequestTimeout);
-
- return new
DefaultExecutionSlotAllocatorFactory(slotProviderStrategy);
+ final Configuration jobMasterConfiguration,
+ final SlotPool slotPool,
+ final Time slotRequestTimeout) {
+ final SlotSelectionStrategy slotSelectionStrategy =
selectSlotSelectionStrategy(jobMasterConfiguration);
+ final BulkSlotProvider bulkSlotProvider = new
BulkSlotProviderImpl(slotSelectionStrategy, slotPool);
+ final ExecutionSlotAllocatorFactory allocatorFactory = new
OneSlotPerExecutionSlotAllocatorFactory(
+ bulkSlotProvider,
+ scheduleMode !=
ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST,
+ slotRequestTimeout);
+ return new DefaultSchedulerComponents(
+ new PipelinedRegionSchedulingStrategy.Factory(),
+ bulkSlotProvider::start,
+ allocatorFactory);
+ }
+
+ @Nonnull
Review comment:
We can remove the `@Nonnull` annotation I think.
This is because that we will by default assume it to be non-null,
and only `@Nullable` annotations are required when needed.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java
##########
@@ -102,31 +93,19 @@ default void start(ComponentMainThreadExecutor
mainThreadExecutor) {
allocationTimeout);
}
- /**
- * Allocates a bulk of physical slots. The allocation will be completed
- * normally only when all the requests are fulfilled.
- *
- * @param physicalSlotRequests requests for physical slots
- * @param timeout indicating how long it is accepted that the slot
requests can be unfulfillable
- * @return future of the results of slot requests
- */
- default CompletableFuture<Collection<PhysicalSlotRequest.Result>>
allocatePhysicalSlots(
- Collection<PhysicalSlotRequest> physicalSlotRequests,
- Time timeout) {
- throw new UnsupportedOperationException("Not properly
implemented.");
- }
-
/**
* Cancels the slot request with the given {@link SlotRequestId} and
{@link SlotSharingGroupId}.
*
* @param slotRequestId identifying the slot request to cancel
* @param slotSharingGroupId identifying the slot request to cancel
* @param cause of the cancellation
*/
- void cancelSlotRequest(
- SlotRequestId slotRequestId,
- @Nullable SlotSharingGroupId slotSharingGroupId,
- Throwable cause);
+ default void cancelSlotRequest(
Review comment:
Looks to me default body is not needed. I tried removing the default
body and flink-runtime still compiles.
Correct me if I miss anything.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]