kl0u commented on a change in pull request #13647:
URL: https://github.com/apache/flink/pull/13647#discussion_r506181718
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -239,13 +261,28 @@ private void configureStreamGraph(final StreamGraph graph) {
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
setDefaultBufferTimeout(-1);
+ setBatchStateBackendAndTimerService(graph);
} else {
graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
graph.setScheduleMode(ScheduleMode.EAGER);
}
}
+ private void setBatchStateBackendAndTimerService(StreamGraph graph) {
+ boolean useStateBackend = configuration.get(ExecutionOptions.USE_BATCH_STATE_BACKEND);
+ boolean sortInputs = configuration.get(ExecutionOptions.SORT_INPUTS);
+ checkState(
+ !useStateBackend || sortInputs,
+ "Batch state backend requires the sorted inputs to be enabled!");
+
+ if (useStateBackend) {
+ LOG.debug("Using BATCH execution state backend.");
Review comment:
Nit: Can't we move
https://github.com/apache/flink/pull/13647/files#diff-54c8fe1971ffb5aa55b3f829f43aa02c7765b62c397f0c943b4049a4fd1e3a62R253
to the `else {}` block in lines 266...? I find that clearer than writing a value and
then overwriting it.
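To illustrate the nit, here is a minimal sketch of the two shapes being compared. It assumes the linked line is an unconditional assignment that the batch branch later replaces; the link target is not reproduced here, so `FakeGraph`, `setStateBackend`, and the string values are hypothetical stand-ins, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

public class BranchAssignmentSketch {

    /** Hypothetical stand-in for the graph; records every assignment it receives. */
    static final class FakeGraph {
        final List<String> assignments = new ArrayList<>();

        void setStateBackend(String backend) {
            assignments.add(backend);
        }
    }

    /** "Write and then overwrite": a default is set first, then replaced for batch. */
    static void configureWithOverwrite(FakeGraph graph, boolean batch) {
        graph.setStateBackend("streaming");
        if (batch) {
            graph.setStateBackend("batch");
        }
    }

    /** Suggested shape: each branch sets its own value exactly once. */
    static void configurePerBranch(FakeGraph graph, boolean batch) {
        if (batch) {
            graph.setStateBackend("batch");
        } else {
            graph.setStateBackend("streaming");
        }
    }

    public static void main(String[] args) {
        FakeGraph overwritten = new FakeGraph();
        configureWithOverwrite(overwritten, true);

        FakeGraph perBranch = new FakeGraph();
        configurePerBranch(perBranch, true);

        // Both end with the same final value; only the number of writes differs.
        System.out.println(overwritten.assignments);
        System.out.println(perBranch.assignments);
    }
}
```

Both variants leave the graph in the same final state; the per-branch form just makes it obvious which value applies in which mode. The sketch ignores the case where the batch branch keeps the default (for example when the batch state backend option is disabled), which the real change would have to handle.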
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -239,13 +261,28 @@ private void configureStreamGraph(final StreamGraph graph) {
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
setDefaultBufferTimeout(-1);
+ setBatchStateBackendAndTimerService(graph);
} else {
graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
graph.setScheduleMode(ScheduleMode.EAGER);
}
}
+ private void setBatchStateBackendAndTimerService(StreamGraph graph) {
+ boolean useStateBackend = configuration.get(ExecutionOptions.USE_BATCH_STATE_BACKEND);
+ boolean sortInputs = configuration.get(ExecutionOptions.SORT_INPUTS);
+ checkState(
+ !useStateBackend || sortInputs,
+ "Batch state backend requires the sorted inputs to be enabled!");
+
+ if (useStateBackend) {
+ LOG.debug("Using BATCH execution state backend.");
Review comment:
"Using BATCH execution state backend _and timer service_."