This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d5cb1b9  [FLINK-13221][table-planner-blink] Blink planner should set 
ScheduleMode to LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs
d5cb1b9 is described below

commit d5cb1b9e5aefdc73026985f67a991f099e8f4364
Author: liyafan82 <[email protected]>
AuthorDate: Mon Jul 22 10:59:11 2019 +0800

    [FLINK-13221][table-planner-blink] Blink planner should set ScheduleMode to 
LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST for batch jobs
    
    This closes #9101
---
 .../AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java     | 9 +++++++++
 .../main/java/org/apache/flink/table/executor/BatchExecutor.java | 2 +-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
index c203062..3c1ca2b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java
@@ -456,6 +456,15 @@ public class 
AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLog
                }
 
                @Override
+               public CompletableFuture<LogicalSlot> allocateBatchSlot(
+                       final SlotRequestId slotRequestId,
+                       final ScheduledUnit scheduledUnit,
+                       final SlotProfile slotProfile,
+                       final boolean allowQueuedScheduling) {
+                       return allocateSlot(slotRequestId, scheduledUnit, 
slotProfile, allowQueuedScheduling, null);
+               }
+
+               @Override
                public CompletableFuture<LogicalSlot> allocateSlot(
                                final SlotRequestId slotRequestId,
                                final ScheduledUnit scheduledUnit,
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
index f4bb2ee..adafab1 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/executor/BatchExecutor.java
@@ -89,7 +89,7 @@ public class BatchExecutor extends ExecutorBase {
                        }
                });
                streamGraph.setChaining(true);
-               streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+               
streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
                streamGraph.setStateBackend(null);
                
streamGraph.getCheckpointConfig().setCheckpointInterval(Long.MAX_VALUE);
                if (isShuffleModeAllBatch()) {

Reply via email to