This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1aa9074e8a14765825f56e84caca806d4c17e227
Author: Zhu Zhu <[email protected]>
AuthorDate: Tue Dec 22 17:42:17 2020 +0800

    [FLINK-20439][runtime] Rename `scheduleOrUpdateConsumers` to 
`notifyPartitionDataAvailable` to avoid confusion
---
 .../deployment/ResultPartitionDeploymentDescriptor.java    | 12 ++++++------
 .../org/apache/flink/runtime/executiongraph/Execution.java |  8 ++++----
 .../java/org/apache/flink/runtime/jobmaster/JobMaster.java |  4 ++--
 .../apache/flink/runtime/jobmaster/JobMasterGateway.java   |  4 ++--
 .../apache/flink/runtime/scheduler/DefaultScheduler.java   |  8 +++++---
 .../apache/flink/runtime/scheduler/DeploymentOption.java   | 12 ++++++------
 .../org/apache/flink/runtime/scheduler/SchedulerBase.java  |  6 +++---
 .../org/apache/flink/runtime/scheduler/SchedulerNG.java    |  2 +-
 .../rpc/RpcResultPartitionConsumableNotifier.java          |  7 ++++---
 .../ConsumableNotifyingResultPartitionWriterDecorator.java |  4 ++--
 .../ResultPartitionDeploymentDescriptorTest.java           |  2 +-
 .../executiongraph/ExecutionGraphPartitionReleaseTest.java |  2 +-
 .../executiongraph/ExecutionGraphVariousFailuesTest.java   |  8 ++++----
 .../executiongraph/ExecutionVertexDeploymentTest.java      |  2 +-
 .../runtime/io/network/partition/ResultPartitionTest.java  |  8 ++++----
 .../runtime/jobmaster/utils/TestingJobMasterGateway.java   | 10 +++++-----
 .../jobmaster/utils/TestingJobMasterGatewayBuilder.java    |  8 ++++----
 .../apache/flink/runtime/scheduler/TestingSchedulerNG.java |  2 +-
 .../runtime/taskexecutor/TaskExecutorSubmissionTest.java   | 14 +++++++-------
 19 files changed, 63 insertions(+), 60 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
index 0720191..2bd12f4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptor.java
@@ -45,19 +45,19 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
 
        private final int maxParallelism;
 
-       /** Flag whether the result partition should send 
scheduleOrUpdateConsumer messages. */
-       private final boolean sendScheduleOrUpdateConsumersMessage;
+       /** Flag whether the result partition should notify master when its 
data is available. */
+       private final boolean notifyPartitionDataAvailable;
 
        public ResultPartitionDeploymentDescriptor(
                        PartitionDescriptor partitionDescriptor,
                        ShuffleDescriptor shuffleDescriptor,
                        int maxParallelism,
-                       boolean sendScheduleOrUpdateConsumersMessage) {
+                       boolean notifyPartitionDataAvailable) {
                this.partitionDescriptor = checkNotNull(partitionDescriptor);
                this.shuffleDescriptor = checkNotNull(shuffleDescriptor);
                
KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
                this.maxParallelism = maxParallelism;
-               this.sendScheduleOrUpdateConsumersMessage = 
sendScheduleOrUpdateConsumersMessage;
+               this.notifyPartitionDataAvailable = 
notifyPartitionDataAvailable;
        }
 
        public IntermediateDataSetID getResultId() {
@@ -88,8 +88,8 @@ public class ResultPartitionDeploymentDescriptor implements 
Serializable {
                return shuffleDescriptor;
        }
 
-       public boolean sendScheduleOrUpdateConsumersMessage() {
-               return sendScheduleOrUpdateConsumersMessage;
+       public boolean notifyPartitionDataAvailable() {
+               return notifyPartitionDataAvailable;
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 98d428c..df60708 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -397,12 +397,12 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
        public CompletableFuture<Execution> registerProducedPartitions(
                        TaskManagerLocation location,
-                       boolean sendScheduleOrUpdateConsumersMessage) {
+                       boolean notifyPartitionDataAvailable) {
 
                assertRunningInJobMasterMainThread();
 
                return FutureUtils.thenApplyAsyncIfNotDone(
-                       registerProducedPartitions(vertex, location, attemptId, 
sendScheduleOrUpdateConsumersMessage),
+                       registerProducedPartitions(vertex, location, attemptId, 
notifyPartitionDataAvailable),
                        
vertex.getExecutionGraph().getJobMasterMainThreadExecutor(),
                        producedPartitionsCache -> {
                                producedPartitions = producedPartitionsCache;
@@ -436,7 +436,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        ExecutionVertex vertex,
                        TaskManagerLocation location,
                        ExecutionAttemptID attemptId,
-                       boolean sendScheduleOrUpdateConsumersMessage) {
+                       boolean notifyPartitionDataAvailable) {
 
                ProducerDescriptor producerDescriptor = 
ProducerDescriptor.create(location, attemptId);
 
@@ -460,7 +460,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                        partitionDescriptor,
                                        shuffleDescriptor,
                                        maxParallelism,
-                                       sendScheduleOrUpdateConsumersMessage));
+                                       notifyPartitionDataAvailable));
                        partitionRegistrations.add(partitionRegistration);
                }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index a14d681..19906d5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -453,11 +453,11 @@ public class JobMaster extends 
PermanentlyFencedRpcEndpoint<JobMasterId> impleme
        }
 
        @Override
-       public CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(
+       public CompletableFuture<Acknowledge> notifyPartitionDataAvailable(
                        final ResultPartitionID partitionID,
                        final Time timeout) {
 
-               schedulerNG.scheduleOrUpdateConsumers(partitionID);
+               schedulerNG.notifyPartitionDataAvailable(partitionID);
                return CompletableFuture.completedFuture(Acknowledge.get());
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 26c337c..0798d97 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -118,9 +118,9 @@ public interface JobMasterGateway extends
         *
         * @param partitionID     The partition which has already produced data
         * @param timeout         before the rpc call fails
-        * @return Future acknowledge of the schedule or update operation
+        * @return Future acknowledge of the notification
         */
-       CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(
+       CompletableFuture<Acknowledge> notifyPartitionDataAvailable(
                        final ResultPartitionID partitionID,
                        @RpcTimeout final Time timeout);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 2dc9987..7f4a4bd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -313,7 +313,7 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
        }
 
        @Override
-       protected void scheduleOrUpdateConsumersInternal(final 
IntermediateResultPartitionID partitionId) {
+       protected void notifyPartitionDataAvailableInternal(final 
IntermediateResultPartitionID partitionId) {
                schedulingStrategy.onPartitionConsumable(partitionId);
        }
 
@@ -442,10 +442,12 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
 
                        if (throwable == null) {
                                final ExecutionVertex executionVertex = 
getExecutionVertex(executionVertexId);
-                               final boolean 
sendScheduleOrUpdateConsumerMessage = 
deploymentHandle.getDeploymentOption().sendScheduleOrUpdateConsumerMessage();
+                               final boolean notifyPartitionDataAvailable = 
deploymentHandle
+                                       .getDeploymentOption()
+                                       .notifyPartitionDataAvailable();
                                executionVertex
                                        .getCurrentExecutionAttempt()
-                                       
.registerProducedPartitions(logicalSlot.getTaskManagerLocation(), 
sendScheduleOrUpdateConsumerMessage);
+                                       
.registerProducedPartitions(logicalSlot.getTaskManagerLocation(), 
notifyPartitionDataAvailable);
                                executionVertex.tryAssignResource(logicalSlot);
                        } else {
                                handleTaskDeploymentFailure(executionVertexId, 
maybeWrapWithNoResourceAvailableException(throwable));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java
index 9fb9ace..40892ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java
@@ -19,17 +19,17 @@
 package org.apache.flink.runtime.scheduler;
 
 /**
- * Deployment option which indicates whether the task should send 
scheduleOrUpdateConsumer message to master.
+ * Deployment option which indicates whether the task should notify master 
when its data is available.
  */
 public class DeploymentOption {
 
-       private final boolean sendScheduleOrUpdateConsumerMessage;
+       private final boolean notifyPartitionDataAvailable;
 
-       public DeploymentOption(boolean sendScheduleOrUpdateConsumerMessage) {
-               this.sendScheduleOrUpdateConsumerMessage = 
sendScheduleOrUpdateConsumerMessage;
+       public DeploymentOption(boolean notifyPartitionDataAvailable) {
+               this.notifyPartitionDataAvailable = 
notifyPartitionDataAvailable;
        }
 
-       public boolean sendScheduleOrUpdateConsumerMessage() {
-               return sendScheduleOrUpdateConsumerMessage;
+       public boolean notifyPartitionDataAvailable() {
+               return notifyPartitionDataAvailable;
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index a5738cb..ffcd763 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -685,15 +685,15 @@ public abstract class SchedulerBase implements 
SchedulerNG {
        }
 
        @Override
-       public final void scheduleOrUpdateConsumers(final ResultPartitionID 
partitionId) {
+       public final void notifyPartitionDataAvailable(final ResultPartitionID 
partitionId) {
                mainThreadExecutor.assertRunningInMainThread();
 
                executionGraph.notifyPartitionDataAvailable(partitionId);
 
-               scheduleOrUpdateConsumersInternal(partitionId.getPartitionId());
+               
notifyPartitionDataAvailableInternal(partitionId.getPartitionId());
        }
 
-       protected void 
scheduleOrUpdateConsumersInternal(IntermediateResultPartitionID 
resultPartitionId) {
+       protected void 
notifyPartitionDataAvailableInternal(IntermediateResultPartitionID 
resultPartitionId) {
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
index ff877db..6430522 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNG.java
@@ -95,7 +95,7 @@ public interface SchedulerNG {
 
        ExecutionState requestPartitionState(IntermediateDataSetID 
intermediateResultId, ResultPartitionID resultPartitionId) throws 
PartitionProducerDisposedException;
 
-       void scheduleOrUpdateConsumers(ResultPartitionID partitionID);
+       void notifyPartitionDataAvailable(ResultPartitionID partitionID);
 
        ArchivedExecutionGraph requestJob();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
index 82a6fbc..4be083f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
@@ -48,16 +48,17 @@ public class RpcResultPartitionConsumableNotifier 
implements ResultPartitionCons
                this.executor = Preconditions.checkNotNull(executor);
                this.timeout = Preconditions.checkNotNull(timeout);
        }
+
        @Override
        public void notifyPartitionConsumable(JobID jobId, ResultPartitionID 
partitionId, final TaskActions taskActions) {
-               CompletableFuture<Acknowledge> acknowledgeFuture = 
jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout);
+               CompletableFuture<Acknowledge> acknowledgeFuture = 
jobMasterGateway.notifyPartitionDataAvailable(partitionId, timeout);
 
                acknowledgeFuture.whenCompleteAsync(
                        (Acknowledge ack, Throwable throwable) -> {
                                if (throwable != null) {
-                                       LOG.error("Could not schedule or update 
consumers at the JobManager.", throwable);
+                                       LOG.error("Could not notify partition 
data available to JobManager.", throwable);
 
-                                       taskActions.failExternally(new 
RuntimeException("Could not notify JobManager to schedule or update 
consumers.", throwable));
+                                       taskActions.failExternally(new 
RuntimeException("Could not notify partition data available to JobManager.", 
throwable));
                                }
                        },
                        executor);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
index 2212455..660e4f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
@@ -49,7 +49,7 @@ public class 
ConsumableNotifyingResultPartitionWriterDecorator {
        /**
         * Optionally decorate the ResultPartitionWriter to call
         * {@link 
ResultPartitionConsumableNotifier#notifyPartitionConsumable(JobID, 
ResultPartitionID, TaskActions)}
-        * on the first record, iff {@link 
ResultPartitionDeploymentDescriptor#sendScheduleOrUpdateConsumersMessage()}
+        * on the first record, iff {@link 
ResultPartitionDeploymentDescriptor#notifyPartitionDataAvailable()}
         * is true.
         */
        public static ResultPartitionWriter[] decorate(
@@ -62,7 +62,7 @@ public class 
ConsumableNotifyingResultPartitionWriterDecorator {
                ResultPartitionWriter[] consumableNotifyingPartitionWriters = 
new ResultPartitionWriter[partitionWriters.length];
                int counter = 0;
                for (ResultPartitionDeploymentDescriptor desc : descs) {
-                       if (desc.sendScheduleOrUpdateConsumersMessage() && 
desc.getPartitionType().isPipelined()) {
+                       if (desc.notifyPartitionDataAvailable() && 
desc.getPartitionType().isPipelined()) {
                                consumableNotifyingPartitionWriters[counter] = 
new ConsumableNotifyingResultPartitionWriter(
                                        taskActions,
                                        jobId,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 040a055..9e4404e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -121,6 +121,6 @@ public class ResultPartitionDeploymentDescriptorTest 
extends TestLogger {
                assertThat(copy.getPartitionId(), is(partitionId));
                assertThat(copy.getPartitionType(), is(partitionType));
                assertThat(copy.getNumberOfSubpartitions(), 
is(numberOfSubpartitions));
-               assertThat(copy.sendScheduleOrUpdateConsumersMessage(), 
is(true));
+               assertThat(copy.notifyPartitionDataAvailable(), is(true));
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
index b37af38..8691316 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
@@ -142,7 +142,7 @@ public class ExecutionGraphPartitionReleaseTest extends 
TestLogger {
                        final Execution operator1Execution = 
getCurrentExecution(operator1Vertex, executionGraph);
                        // finish o1 and schedule the consumers (o2,o3); this 
should not result in any release calls since not all operators of the pipelined 
region are finished
                        for (final IntermediateResultPartitionID partitionId : 
operator1Execution.getVertex().getProducedPartitions().keySet()) {
-                               scheduler.scheduleOrUpdateConsumers(new 
ResultPartitionID(partitionId, operator1Execution.getAttemptId()));
+                               scheduler.notifyPartitionDataAvailable(new 
ResultPartitionID(partitionId, operator1Execution.getAttemptId()));
                        }
                        scheduler.updateTaskExecutionState(new 
TaskExecutionState(executionGraph.getJobID(), 
operator1Execution.getAttemptId(), ExecutionState.FINISHED));
                        assertThat(releasedPartitions, empty());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
index 493415b..ebecbe1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphVariousFailuesTest.java
@@ -37,11 +37,11 @@ import static org.junit.Assert.fail;
 public class ExecutionGraphVariousFailuesTest extends TestLogger {
 
        /**
-        * Tests that a failing scheduleOrUpdateConsumers call with a 
non-existing execution attempt
+        * Tests that a failing notifyPartitionDataAvailable call with a 
non-existing execution attempt
         * id, will not fail the execution graph.
         */
        @Test
-       public void testFailingScheduleOrUpdateConsumers() throws Exception {
+       public void testFailingNotifyPartitionDataAvailable() throws Exception {
                final SchedulerBase scheduler = 
SchedulerTestingUtils.newSchedulerBuilder(new JobGraph()).build();
                
scheduler.initialize(ComponentMainThreadExecutorServiceAdapter.forMainThread());
                scheduler.startScheduling();
@@ -55,11 +55,11 @@ public class ExecutionGraphVariousFailuesTest extends 
TestLogger {
                ExecutionAttemptID producerId = new ExecutionAttemptID();
                ResultPartitionID resultPartitionId = new 
ResultPartitionID(intermediateResultPartitionId, producerId);
 
-               // The execution attempt id does not exist and thus the 
scheduleOrUpdateConsumers call
+               // The execution attempt id does not exist and thus the 
notifyPartitionDataAvailable call
                // should fail
 
                try {
-                       scheduler.scheduleOrUpdateConsumers(resultPartitionId);
+                       
scheduler.notifyPartitionDataAvailable(resultPartitionId);
                        fail("Error expected.");
                } catch (IllegalStateException e) {
                        // we've expected this exception to occur
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 269c935..44e6cce 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -294,7 +294,7 @@ public class ExecutionVertexDeploymentTest extends 
TestLogger {
 
                        assertEquals(1, producedPartitions.size());
                        ResultPartitionDeploymentDescriptor desc = 
producedPartitions.iterator().next();
-                       assertEquals(scheduleMode.allowLazyDeployment(), 
desc.sendScheduleOrUpdateConsumersMessage());
+                       assertEquals(scheduleMode.allowLazyDeployment(), 
desc.notifyPartitionDataAvailable());
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 6fd8278..6743b20 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -99,10 +99,10 @@ public class ResultPartitionTest {
        }
 
        /**
-        * Tests the schedule or update consumers message sending behaviour 
depending on the relevant flags.
+        * Tests notifyPartitionDataAvailable behaviour depending on the 
relevant flags.
         */
        @Test
-       public void testSendScheduleOrUpdateConsumersMessage() throws Exception 
{
+       public void testNotifyPartitionDataAvailable() throws Exception {
                FutureConsumerWithException[] notificationCalls = new 
FutureConsumerWithException[] {
                        writer -> ((ResultPartitionWriter) writer).finish(),
                        writer -> ((ResultPartitionWriter) 
writer).emitRecord(ByteBuffer.allocate(bufferSize), 0),
@@ -111,11 +111,11 @@ public class ResultPartitionTest {
                };
 
                for (FutureConsumerWithException notificationCall: 
notificationCalls) {
-                       
testSendScheduleOrUpdateConsumersMessage(notificationCall);
+                       testNotifyPartitionDataAvailable(notificationCall);
                }
        }
 
-       private void testSendScheduleOrUpdateConsumersMessage(
+       private void testNotifyPartitionDataAvailable(
                        FutureConsumerWithException<ResultPartitionWriter, 
Exception> notificationCall) throws Exception {
                JobID jobId = new JobID();
                TaskActions taskActions = new NoOpTaskActions();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index fc119d6..f403425 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -95,7 +95,7 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
        private final BiFunction<IntermediateDataSetID, ResultPartitionID, 
CompletableFuture<ExecutionState>> requestPartitionStateFunction;
 
        @Nonnull
-       private final Function<ResultPartitionID, 
CompletableFuture<Acknowledge>> scheduleOrUpdateConsumersFunction;
+       private final Function<ResultPartitionID, 
CompletableFuture<Acknowledge>> notifyPartitionDataAvailableFunction;
 
        @Nonnull
        private final Function<ResourceID, CompletableFuture<Acknowledge>> 
disconnectTaskManagerFunction;
@@ -172,7 +172,7 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
                        @Nonnull Function<TaskExecutionState, 
CompletableFuture<Acknowledge>> updateTaskExecutionStateFunction,
                        @Nonnull BiFunction<JobVertexID, ExecutionAttemptID, 
CompletableFuture<SerializedInputSplit>> requestNextInputSplitFunction,
                        @Nonnull BiFunction<IntermediateDataSetID, 
ResultPartitionID, CompletableFuture<ExecutionState>> 
requestPartitionStateFunction,
-                       @Nonnull Function<ResultPartitionID, 
CompletableFuture<Acknowledge>> scheduleOrUpdateConsumersFunction,
+                       @Nonnull Function<ResultPartitionID, 
CompletableFuture<Acknowledge>> notifyPartitionDataAvailableFunction,
                        @Nonnull Function<ResourceID, 
CompletableFuture<Acknowledge>> disconnectTaskManagerFunction,
                        @Nonnull Consumer<ResourceManagerId> 
disconnectResourceManagerConsumer,
                        @Nonnull BiFunction<ResourceID, Collection<SlotOffer>, 
CompletableFuture<Collection<SlotOffer>>> offerSlotsFunction,
@@ -202,7 +202,7 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
                this.updateTaskExecutionStateFunction = 
updateTaskExecutionStateFunction;
                this.requestNextInputSplitFunction = 
requestNextInputSplitFunction;
                this.requestPartitionStateFunction = 
requestPartitionStateFunction;
-               this.scheduleOrUpdateConsumersFunction = 
scheduleOrUpdateConsumersFunction;
+               this.notifyPartitionDataAvailableFunction = 
notifyPartitionDataAvailableFunction;
                this.disconnectTaskManagerFunction = 
disconnectTaskManagerFunction;
                this.disconnectResourceManagerConsumer = 
disconnectResourceManagerConsumer;
                this.offerSlotsFunction = offerSlotsFunction;
@@ -249,8 +249,8 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
        }
 
        @Override
-       public CompletableFuture<Acknowledge> 
scheduleOrUpdateConsumers(ResultPartitionID partitionID, Time timeout) {
-               return scheduleOrUpdateConsumersFunction.apply(partitionID);
+       public CompletableFuture<Acknowledge> 
notifyPartitionDataAvailable(ResultPartitionID partitionID, Time timeout) {
+               return notifyPartitionDataAvailableFunction.apply(partitionID);
        }
 
        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index a55b4ca..f10e3d7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -83,7 +83,7 @@ public class TestingJobMasterGatewayBuilder {
        private Function<TaskExecutionState, CompletableFuture<Acknowledge>> 
updateTaskExecutionStateFunction = ignored -> 
CompletableFuture.completedFuture(Acknowledge.get());
        private BiFunction<JobVertexID, ExecutionAttemptID, 
CompletableFuture<SerializedInputSplit>> requestNextInputSplitFunction = 
(ignoredA, ignoredB) -> CompletableFuture.completedFuture(new 
SerializedInputSplit(null));
        private BiFunction<IntermediateDataSetID, ResultPartitionID, 
CompletableFuture<ExecutionState>> requestPartitionStateFunction = (ignoredA, 
ignoredB) -> CompletableFuture.completedFuture(ExecutionState.RUNNING);
-       private Function<ResultPartitionID, CompletableFuture<Acknowledge>> 
scheduleOrUpdateConsumersFunction = ignored -> 
CompletableFuture.completedFuture(Acknowledge.get());
+       private Function<ResultPartitionID, CompletableFuture<Acknowledge>> 
notifyPartitionDataAvailableFunction = ignored -> 
CompletableFuture.completedFuture(Acknowledge.get());
        private Function<ResourceID, CompletableFuture<Acknowledge>> 
disconnectTaskManagerFunction = ignored -> 
CompletableFuture.completedFuture(Acknowledge.get());
        private Consumer<ResourceManagerId> disconnectResourceManagerConsumer = 
ignored -> {};
        private BiFunction<ResourceID, Collection<SlotOffer>, 
CompletableFuture<Collection<SlotOffer>>> offerSlotsFunction = (ignoredA, 
ignoredB) -> CompletableFuture.completedFuture(Collections.emptyList());
@@ -138,8 +138,8 @@ public class TestingJobMasterGatewayBuilder {
                return this;
        }
 
-       public TestingJobMasterGatewayBuilder 
setScheduleOrUpdateConsumersFunction(Function<ResultPartitionID, 
CompletableFuture<Acknowledge>> scheduleOrUpdateConsumersFunction) {
-               this.scheduleOrUpdateConsumersFunction = 
scheduleOrUpdateConsumersFunction;
+       public TestingJobMasterGatewayBuilder 
setNotifyPartitionDataAvailableFunction(Function<ResultPartitionID, 
CompletableFuture<Acknowledge>> notifyPartitionDataAvailableFunction) {
+               this.notifyPartitionDataAvailableFunction = 
notifyPartitionDataAvailableFunction;
                return this;
        }
 
@@ -266,7 +266,7 @@ public class TestingJobMasterGatewayBuilder {
                        updateTaskExecutionStateFunction,
                        requestNextInputSplitFunction,
                        requestPartitionStateFunction,
-                       scheduleOrUpdateConsumersFunction,
+                       notifyPartitionDataAvailableFunction,
                        disconnectTaskManagerFunction,
                        disconnectResourceManagerConsumer,
                        offerSlotsFunction,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
index 2675a84..676ddb9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TestingSchedulerNG.java
@@ -131,7 +131,7 @@ public class TestingSchedulerNG implements SchedulerNG {
        }
 
        @Override
-       public void scheduleOrUpdateConsumers(ResultPartitionID partitionID) {
+       public void notifyPartitionDataAvailable(ResultPartitionID partitionID) 
{
                failOperation();
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
index 36ebd46..2d76290 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
@@ -269,7 +269,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
                TestingJobMasterGateway testingJobMasterGateway =
                        new TestingJobMasterGatewayBuilder()
                        .setFencingTokenSupplier(() -> jobMasterId)
-                       .setScheduleOrUpdateConsumersFunction(
+                       .setNotifyPartitionDataAvailableFunction(
                                resultPartitionID -> 
CompletableFuture.completedFuture(Acknowledge.get()))
                        .build();
 
@@ -501,7 +501,7 @@ public class TaskExecutorSubmissionTest extends TestLogger {
        }
 
        /**
-        * Test that a failing schedule or update consumers call leads to the 
failing of the respective
+        * Test that a failing notifyPartitionDataAvailable call leads to the 
failing of the respective
         * task.
         *
         * <p>IMPORTANT: We have to make sure that the invokable's cancel 
method is called, because only
@@ -509,14 +509,14 @@ public class TaskExecutorSubmissionTest extends 
TestLogger {
         * the invokable to fill one memory segment. The completed memory 
segment will trigger the
         * scheduling of the downstream operator since it is in pipeline mode. 
After we've filled the
         * memory segment, we'll block the invokable and wait for the task 
failure due to the failed
-        * schedule or update consumers call.
+        * notifyPartitionDataAvailable call.
         */
        @Test(timeout = TEST_TIMEOUT)
-       public void testFailingScheduleOrUpdateConsumers() throws Exception {
+       public void testFailingNotifyPartitionDataAvailable() throws Exception {
                final Configuration configuration = new Configuration();
 
                // set the memory segment to the smallest size possible, 
because we have to fill one
-               // memory buffer to trigger the schedule or update consumers 
message to the downstream
+               // memory buffer to trigger notifyPartitionDataAvailable to the 
downstream
                // operators
                configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 
MemorySize.parse("4096"));
 
@@ -527,13 +527,13 @@ public class TaskExecutorSubmissionTest extends 
TestLogger {
 
                final CompletableFuture<Void> taskRunningFuture = new 
CompletableFuture<>();
 
-               final Exception exception = new Exception("Failed schedule or 
update consumers");
+               final Exception exception = new Exception("Failed 
notifyPartitionDataAvailable");
 
                final JobMasterId jobMasterId = JobMasterId.generate();
                TestingJobMasterGateway testingJobMasterGateway =
                        new TestingJobMasterGatewayBuilder()
                                .setFencingTokenSupplier(() -> jobMasterId)
-                               
.setUpdateTaskExecutionStateFunction(resultPartitionID -> 
FutureUtils.completedExceptionally(exception))
+                               
.setNotifyPartitionDataAvailableFunction(resultPartitionID -> 
FutureUtils.completedExceptionally(exception))
                                .build();
 
                try (TaskSubmissionTestEnvironment env =

Reply via email to