Caizhi Weng created FLINK-16543:
-----------------------------------
Summary: Support setting schedule mode by config for Blink planner
in batch mode
Key: FLINK-16543
URL: https://issues.apache.org/jira/browse/FLINK-16543
Project: Flink
Issue Type: Improvement
Components: Runtime / Configuration, Table SQL / Runtime
Reporter: Caizhi Weng
Currently Blink planner is bound to use the
{{LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST}} schedule mode in batch mode. It
is hard coded in the {{ExecutorUtils.setBatchProperties}} method.
{code:java}
public static void setBatchProperties(StreamGraph streamGraph, TableConfig
tableConfig) {
streamGraph.getStreamNodes().forEach(
sn -> sn.setResources(ResourceSpec.UNKNOWN,
ResourceSpec.UNKNOWN));
streamGraph.setChaining(true);
streamGraph.setAllVerticesInSameSlotSharingGroupByDefault(false);
streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
streamGraph.setStateBackend(null);
if (streamGraph.getCheckpointConfig().isCheckpointingEnabled()) {
throw new IllegalArgumentException("Checkpoint is not supported
for batch jobs.");
}
if (ExecutorUtils.isShuffleModeAllBatch(tableConfig)) {
streamGraph.setBlockingConnectionsBetweenChains(true);
}
}
{code}
By under certain use cases where execution time is short, especially under OLAP
use cases, {{LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST}} might not be the best
choice, as it will cause data to be spilled onto disks when shuffling. Under
such use cases, {{EAGER}} schedule mode with {{PIPELINED}} shuffle mode is
preferred.
Currently we can set shuffle mode by the {{table.exec.shuffle-mode}} table
config, and we would like to add another config to change the schedule mode for
Blink planner in batch mode.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)