This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 39ebaae65a9a2f55f50f8714597fbc1c2eafc7c0
Author: Gary Yao <g...@apache.org>
AuthorDate: Thu Apr 9 10:38:46 2020 +0200

    [FLINK-17050][runtime] Rename methods getVertexOrThrow() and getResultPartitionOrThrow()
    
    Rename methods
    
    - SchedulingTopology#getVertexOrThrow(ExecutionVertexID)
    - SchedulingTopology#getResultPartitionOrThrow(IntermediateResultPartitionID)
    
    to
    
    - SchedulingTopology#getVertex(ExecutionVertexID)
    - SchedulingTopology#getResultPartition(IntermediateResultPartitionID)
    
    respectively.
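    
    For illustration, a minimal sketch of calling code after the rename follows.
    The helper class, its method, and the import packages for the ID types are
    assumptions made for this example; only SchedulingTopology and its renamed
    methods come from this change:
    
        import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
        import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
        import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
        import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
        import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
    
        // Hypothetical helper class, not part of this commit.
        class TopologyLookupExample {
            static void lookUp(
                    SchedulingTopology<?, ?> topology,
                    ExecutionVertexID vertexId,
                    IntermediateResultPartitionID partitionId) {
                // Formerly getVertexOrThrow(); still throws IllegalArgumentException
                // if no vertex exists for the given ID.
                SchedulingExecutionVertex<?, ?> vertex = topology.getVertex(vertexId);
                // Formerly getResultPartitionOrThrow(); same exception contract.
                SchedulingResultPartition<?, ?> partition = topology.getResultPartition(partitionId);
            }
        }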
    
    This closes #11684.
---
 .../org/apache/flink/runtime/executiongraph/ExecutionGraph.java   | 2 +-
 .../flip1/partitionrelease/RegionPartitionReleaseStrategy.java    | 4 ++--
 .../flink/runtime/scheduler/adapter/DefaultExecutionTopology.java | 4 ++--
 .../scheduler/strategy/LazyFromSourcesSchedulingStrategy.java     | 6 +++---
 .../flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java | 2 +-
 .../flink/runtime/scheduler/strategy/SchedulingTopology.java      | 4 ++--
 .../org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java  | 2 +-
 .../runtime/scheduler/adapter/DefaultExecutionTopologyTest.java   | 8 ++++----
 .../runtime/scheduler/strategy/TestingSchedulingTopology.java     | 4 ++--
 9 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index d3aa0e2..4ef2bbf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1547,7 +1547,7 @@ public class ExecutionGraph implements AccessExecutionGraph {
 
        ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartitionId) {
                final SchedulingResultPartition<?, ?> schedulingResultPartition =
-                       getSchedulingTopology().getResultPartitionOrThrow(resultPartitionId);
+                       getSchedulingTopology().getResultPartition(resultPartitionId);
                final SchedulingExecutionVertex<?, ?> producer = schedulingResultPartition.getProducer();
                final ExecutionVertexID producerId = producer.getId();
                final JobVertexID jobVertexId = producerId.getJobVertexId();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
index a05a24a..d5529b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java
@@ -88,7 +88,7 @@ public class RegionPartitionReleaseStrategy implements PartitionReleaseStrategy
                final Set<SchedulingResultPartition<?, ?>> allConsumedPartitionsInRegion = pipelinedRegion
                        .getExecutionVertexIds()
                        .stream()
-                       .map(schedulingTopology::getVertexOrThrow)
+                       .map(schedulingTopology::getVertex)
                        .flatMap(vertex -> IterableUtils.toStream(vertex.getConsumedResults()))
                        .collect(Collectors.toSet());
 
@@ -157,7 +157,7 @@ public class RegionPartitionReleaseStrategy implements PartitionReleaseStrategy
        }
 
        private boolean areConsumerRegionsFinished(final IntermediateResultPartitionID resultPartitionId) {
-               final SchedulingResultPartition<?, ?> resultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartitionId);
+               final SchedulingResultPartition<?, ?> resultPartition = schedulingTopology.getResultPartition(resultPartitionId);
                return IterableUtils.toStream(resultPartition.getConsumers())
                        .map(SchedulingExecutionVertex::getId)
                        .allMatch(this::isRegionOfVertexFinished);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
index 162a888..a535fdc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
@@ -87,7 +87,7 @@ public class DefaultExecutionTopology implements SchedulingTopology<DefaultExecu
        }
 
        @Override
-       public DefaultExecutionVertex getVertexOrThrow(final ExecutionVertexID executionVertexId) {
+       public DefaultExecutionVertex getVertex(final ExecutionVertexID executionVertexId) {
                final DefaultExecutionVertex executionVertex = executionVerticesById.get(executionVertexId);
                if (executionVertex == null) {
                        throw new IllegalArgumentException("can not find vertex: " + executionVertexId);
@@ -96,7 +96,7 @@ public class DefaultExecutionTopology implements SchedulingTopology<DefaultExecu
        }
 
        @Override
-       public DefaultResultPartition getResultPartitionOrThrow(final IntermediateResultPartitionID intermediateResultPartitionId) {
+       public DefaultResultPartition getResultPartition(final IntermediateResultPartitionID intermediateResultPartitionId) {
                final DefaultResultPartition resultPartition = resultPartitionsById.get(intermediateResultPartitionId);
                if (resultPartition == null) {
                        throw new IllegalArgumentException("can not find partition: " + intermediateResultPartitionId);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
index da4c148..6520100 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java
@@ -86,7 +86,7 @@ public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
                // increase counter of the dataset first
                verticesToRestart
                        .stream()
-                       .map(schedulingTopology::getVertexOrThrow)
+                       .map(schedulingTopology::getVertex)
                        .flatMap(vertex -> IterableUtils.toStream(vertex.getProducedResults()))
                        .forEach(inputConstraintChecker::resetSchedulingResultPartition);
 
@@ -101,7 +101,7 @@ public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
                }
 
                final Set<SchedulingExecutionVertex<?, ?>> verticesToSchedule = IterableUtils
-                       .toStream(schedulingTopology.getVertexOrThrow(executionVertexId).getProducedResults())
+                       .toStream(schedulingTopology.getVertex(executionVertexId).getProducedResults())
                        .filter(partition -> partition.getResultType().isBlocking())
                        .flatMap(partition -> inputConstraintChecker.markSchedulingResultPartitionFinished(partition).stream())
                        .flatMap(partition -> IterableUtils.toStream(partition.getConsumers()))
@@ -113,7 +113,7 @@ public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy {
        @Override
        public void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId) {
                final SchedulingResultPartition<?, ?> resultPartition = schedulingTopology
-                       .getResultPartitionOrThrow(resultPartitionId);
+                       .getResultPartition(resultPartitionId);
 
                if (!resultPartition.getResultType().isPipelined()) {
                        return;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java
index 807217e..b1e9dd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java
@@ -43,7 +43,7 @@ class SchedulingStrategyUtils {
                        final Set<ExecutionVertexID> vertexIds) {
 
                return vertexIds.stream()
-                       .map(topology::getVertexOrThrow)
+                       .map(topology::getVertex)
                        .collect(Collectors.toSet());
        }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java
index fef82cf..ae26be9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java
@@ -34,7 +34,7 @@ public interface SchedulingTopology<V extends SchedulingExecutionVertex<V, R>, R
         * @return The respective scheduling vertex
         * @throws IllegalArgumentException If the vertex does not exist
         */
-       V getVertexOrThrow(ExecutionVertexID executionVertexId);
+       V getVertex(ExecutionVertexID executionVertexId);
 
        /**
         * Looks up the {@link SchedulingResultPartition} for the given {@link IntermediateResultPartitionID}.
@@ -43,5 +43,5 @@ public interface SchedulingTopology<V extends SchedulingExecutionVertex<V, R>, R
         * @return The respective scheduling result partition
         * @throws IllegalArgumentException If the partition does not exist
         */
-       R getResultPartitionOrThrow(IntermediateResultPartitionID intermediateResultPartitionId);
+       R getResultPartition(IntermediateResultPartitionID intermediateResultPartitionId);
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
index 2f20f40..0b854e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
@@ -654,7 +654,7 @@ public class DefaultSchedulerTest extends TestLogger {
 
                scheduler.updateTaskExecutionState(new TaskExecutionState(jobid, attemptId1, ExecutionState.FAILED, new RuntimeException("expected")));
                scheduler.cancel();
-               final ExecutionState vertex2StateAfterCancel = topology.getVertexOrThrow(executionVertex2).getState();
+               final ExecutionState vertex2StateAfterCancel = topology.getVertex(executionVertex2).getState();
                final JobStatus statusAfterCancelWhileRestarting = scheduler.requestJobStatus();
                scheduler.updateTaskExecutionState(new TaskExecutionState(jobid, attemptId2, ExecutionState.CANCELED, new RuntimeException("expected")));
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
index 8dcdfff..f0edb10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
@@ -98,7 +98,7 @@ public class DefaultExecutionTopologyTest extends TestLogger {
                for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
                        for (Map.Entry<IntermediateResultPartitionID, IntermediateResultPartition> entry : vertex.getProducedPartitions().entrySet()) {
                                IntermediateResultPartition partition = entry.getValue();
-                               DefaultResultPartition schedulingResultPartition = adapter.getResultPartitionOrThrow(entry.getKey());
+                               DefaultResultPartition schedulingResultPartition = adapter.getResultPartition(entry.getKey());
 
                                assertPartitionEquals(partition, schedulingResultPartition);
                        }
@@ -114,7 +114,7 @@ public class DefaultExecutionTopologyTest extends TestLogger {
                        .get();
 
                final DefaultResultPartition schedulingResultPartition = adapter
-                       .getResultPartitionOrThrow(intermediateResultPartition.getPartitionId());
+                       .getResultPartition(intermediateResultPartition.getPartitionId());
 
                assertEquals(ResultPartitionState.CREATED, schedulingResultPartition.getState());
 
@@ -125,7 +125,7 @@ public class DefaultExecutionTopologyTest extends TestLogger {
        @Test
        public void testGetVertexOrThrow() {
                try {
-                       adapter.getVertexOrThrow(new ExecutionVertexID(new JobVertexID(), 0));
+                       adapter.getVertex(new ExecutionVertexID(new JobVertexID(), 0));
                        fail("get not exist vertex");
                } catch (IllegalArgumentException exception) {
                        // expected
@@ -135,7 +135,7 @@ public class DefaultExecutionTopologyTest extends TestLogger {
        @Test
        public void testResultPartitionOrThrow() {
                try {
-                       adapter.getResultPartitionOrThrow(new IntermediateResultPartitionID());
+                       adapter.getResultPartition(new IntermediateResultPartitionID());
                        fail("get not exist result partition");
                } catch (IllegalArgumentException exception) {
                        // expected
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
index dd2fa88..5e930b4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
@@ -62,7 +62,7 @@ public class TestingSchedulingTopology
        }
 
        @Override
-       public TestingSchedulingExecutionVertex getVertexOrThrow(final ExecutionVertexID executionVertexId) {
+       public TestingSchedulingExecutionVertex getVertex(final ExecutionVertexID executionVertexId) {
                final TestingSchedulingExecutionVertex executionVertex = schedulingExecutionVertices.get(executionVertexId);
                if (executionVertex == null) {
                        throw new IllegalArgumentException("can not find vertex: " + executionVertexId);
@@ -71,7 +71,7 @@ public class TestingSchedulingTopology
        }
 
        @Override
-       public TestingSchedulingResultPartition getResultPartitionOrThrow(final IntermediateResultPartitionID intermediateResultPartitionId) {
+       public TestingSchedulingResultPartition getResultPartition(final IntermediateResultPartitionID intermediateResultPartitionId) {
                final TestingSchedulingResultPartition resultPartition = schedulingResultPartitions.get(intermediateResultPartitionId);
                if (resultPartition == null) {
                        throw new IllegalArgumentException("can not find partition: " + intermediateResultPartitionId);
