This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 39ebaae65a9a2f55f50f8714597fbc1c2eafc7c0 Author: Gary Yao <g...@apache.org> AuthorDate: Thu Apr 9 10:38:46 2020 +0200 [FLINK-17050][runtime] Rename methods getVertexOrThrow() and getResultPartitionOrThrow() Rename methods - SchedulingTopology#getVertexOrThrow(ExecutionVertexID) - SchedulingTopology#getResultPartitionOrThrow(IntermediateResultPartitionID) to - SchedulingTopology#getVertex(ExecutionVertexID) - SchedulingTopology#getResultPartition(IntermediateResultPartitionID) respectively. This closes #11684. --- .../org/apache/flink/runtime/executiongraph/ExecutionGraph.java | 2 +- .../flip1/partitionrelease/RegionPartitionReleaseStrategy.java | 4 ++-- .../flink/runtime/scheduler/adapter/DefaultExecutionTopology.java | 4 ++-- .../scheduler/strategy/LazyFromSourcesSchedulingStrategy.java | 6 +++--- .../flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java | 2 +- .../flink/runtime/scheduler/strategy/SchedulingTopology.java | 4 ++-- .../org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java | 2 +- .../runtime/scheduler/adapter/DefaultExecutionTopologyTest.java | 8 ++++---- .../runtime/scheduler/strategy/TestingSchedulingTopology.java | 4 ++-- 9 files changed, 18 insertions(+), 18 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index d3aa0e2..4ef2bbf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -1547,7 +1547,7 @@ public class ExecutionGraph implements AccessExecutionGraph { ResultPartitionID createResultPartitionId(final IntermediateResultPartitionID resultPartitionId) { final SchedulingResultPartition<?, ?> schedulingResultPartition = - getSchedulingTopology().getResultPartitionOrThrow(resultPartitionId); + getSchedulingTopology().getResultPartition(resultPartitionId); final SchedulingExecutionVertex<?, ?> producer = schedulingResultPartition.getProducer(); final ExecutionVertexID producerId = producer.getId(); final JobVertexID jobVertexId = producerId.getJobVertexId(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java index a05a24a..d5529b7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionReleaseStrategy.java @@ -88,7 +88,7 @@ public class RegionPartitionReleaseStrategy implements PartitionReleaseStrategy final Set<SchedulingResultPartition<?, ?>> allConsumedPartitionsInRegion = pipelinedRegion .getExecutionVertexIds() .stream() - .map(schedulingTopology::getVertexOrThrow) + .map(schedulingTopology::getVertex) .flatMap(vertex -> IterableUtils.toStream(vertex.getConsumedResults())) .collect(Collectors.toSet()); @@ -157,7 +157,7 @@ public class RegionPartitionReleaseStrategy implements PartitionReleaseStrategy } private boolean areConsumerRegionsFinished(final IntermediateResultPartitionID resultPartitionId) { - final SchedulingResultPartition<?, ?> resultPartition = schedulingTopology.getResultPartitionOrThrow(resultPartitionId); + final SchedulingResultPartition<?, ?> resultPartition = schedulingTopology.getResultPartition(resultPartitionId); return IterableUtils.toStream(resultPartition.getConsumers()) .map(SchedulingExecutionVertex::getId) .allMatch(this::isRegionOfVertexFinished); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java index 162a888..a535fdc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java @@ -87,7 +87,7 @@ public class DefaultExecutionTopology implements SchedulingTopology<DefaultExecu } @Override - public DefaultExecutionVertex getVertexOrThrow(final ExecutionVertexID executionVertexId) { + public DefaultExecutionVertex getVertex(final ExecutionVertexID executionVertexId) { final DefaultExecutionVertex executionVertex = executionVerticesById.get(executionVertexId); if (executionVertex == null) { throw new IllegalArgumentException("can not find vertex: " + executionVertexId); @@ -96,7 +96,7 @@ public class DefaultExecutionTopology implements SchedulingTopology<DefaultExecu } @Override - public DefaultResultPartition getResultPartitionOrThrow(final IntermediateResultPartitionID intermediateResultPartitionId) { + public DefaultResultPartition getResultPartition(final IntermediateResultPartitionID intermediateResultPartitionId) { final DefaultResultPartition resultPartition = resultPartitionsById.get(intermediateResultPartitionId); if (resultPartition == null) { throw new IllegalArgumentException("can not find partition: " + intermediateResultPartitionId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java index da4c148..6520100 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/LazyFromSourcesSchedulingStrategy.java @@ -86,7 +86,7 @@ public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { // increase counter of the dataset first verticesToRestart .stream() - .map(schedulingTopology::getVertexOrThrow) + .map(schedulingTopology::getVertex) .flatMap(vertex -> IterableUtils.toStream(vertex.getProducedResults())) .forEach(inputConstraintChecker::resetSchedulingResultPartition); @@ -101,7 +101,7 @@ public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { } final Set<SchedulingExecutionVertex<?, ?>> verticesToSchedule = IterableUtils - .toStream(schedulingTopology.getVertexOrThrow(executionVertexId).getProducedResults()) + .toStream(schedulingTopology.getVertex(executionVertexId).getProducedResults()) .filter(partition -> partition.getResultType().isBlocking()) .flatMap(partition -> inputConstraintChecker.markSchedulingResultPartitionFinished(partition).stream()) .flatMap(partition -> IterableUtils.toStream(partition.getConsumers())) @@ -113,7 +113,7 @@ public class LazyFromSourcesSchedulingStrategy implements SchedulingStrategy { @Override public void onPartitionConsumable(IntermediateResultPartitionID resultPartitionId) { final SchedulingResultPartition<?, ?> resultPartition = schedulingTopology - .getResultPartitionOrThrow(resultPartitionId); + .getResultPartition(resultPartitionId); if (!resultPartition.getResultType().isPipelined()) { return; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java index 807217e..b1e9dd9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyUtils.java @@ -43,7 +43,7 @@ class SchedulingStrategyUtils { final Set<ExecutionVertexID> vertexIds) { return vertexIds.stream() - .map(topology::getVertexOrThrow) + .map(topology::getVertex) .collect(Collectors.toSet()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java index fef82cf..ae26be9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java @@ -34,7 +34,7 @@ public interface SchedulingTopology<V extends SchedulingExecutionVertex<V, R>, R * @return The respective scheduling vertex * @throws IllegalArgumentException If the vertex does not exist */ - V getVertexOrThrow(ExecutionVertexID executionVertexId); + V getVertex(ExecutionVertexID executionVertexId); /** * Looks up the {@link SchedulingResultPartition} for the given {@link IntermediateResultPartitionID}. @@ -43,5 +43,5 @@ public interface SchedulingTopology<V extends SchedulingExecutionVertex<V, R>, R * @return The respective scheduling result partition * @throws IllegalArgumentException If the partition does not exist */ - R getResultPartitionOrThrow(IntermediateResultPartitionID intermediateResultPartitionId); + R getResultPartition(IntermediateResultPartitionID intermediateResultPartitionId); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index 2f20f40..0b854e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -654,7 +654,7 @@ public class DefaultSchedulerTest extends TestLogger { scheduler.updateTaskExecutionState(new TaskExecutionState(jobid, attemptId1, ExecutionState.FAILED, new RuntimeException("expected"))); scheduler.cancel(); - final ExecutionState vertex2StateAfterCancel = topology.getVertexOrThrow(executionVertex2).getState(); + final ExecutionState vertex2StateAfterCancel = topology.getVertex(executionVertex2).getState(); final JobStatus statusAfterCancelWhileRestarting = scheduler.requestJobStatus(); scheduler.updateTaskExecutionState(new TaskExecutionState(jobid, attemptId2, ExecutionState.CANCELED, new RuntimeException("expected"))); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java index 8dcdfff..f0edb10 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java @@ -98,7 +98,7 @@ public class DefaultExecutionTopologyTest extends TestLogger { for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) { for (Map.Entry<IntermediateResultPartitionID, IntermediateResultPartition> entry : vertex.getProducedPartitions().entrySet()) { IntermediateResultPartition partition = entry.getValue(); - DefaultResultPartition schedulingResultPartition = adapter.getResultPartitionOrThrow(entry.getKey()); + DefaultResultPartition schedulingResultPartition = adapter.getResultPartition(entry.getKey()); assertPartitionEquals(partition, schedulingResultPartition); } @@ -114,7 +114,7 @@ public class DefaultExecutionTopologyTest extends TestLogger { .get(); final DefaultResultPartition schedulingResultPartition = adapter - .getResultPartitionOrThrow(intermediateResultPartition.getPartitionId()); + .getResultPartition(intermediateResultPartition.getPartitionId()); assertEquals(ResultPartitionState.CREATED, schedulingResultPartition.getState()); @@ -125,7 +125,7 @@ public class DefaultExecutionTopologyTest extends TestLogger { @Test public void testGetVertexOrThrow() { try { - adapter.getVertexOrThrow(new ExecutionVertexID(new JobVertexID(), 0)); + adapter.getVertex(new ExecutionVertexID(new JobVertexID(), 0)); fail("get not exist vertex"); } catch (IllegalArgumentException exception) { // expected @@ -135,7 +135,7 @@ public class DefaultExecutionTopologyTest extends TestLogger { @Test public void testResultPartitionOrThrow() { try { - adapter.getResultPartitionOrThrow(new IntermediateResultPartitionID()); + adapter.getResultPartition(new IntermediateResultPartitionID()); fail("get not exist result partition"); } catch (IllegalArgumentException exception) { // expected diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java index dd2fa88..5e930b4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java @@ -62,7 +62,7 @@ public class TestingSchedulingTopology } @Override - public TestingSchedulingExecutionVertex getVertexOrThrow(final ExecutionVertexID executionVertexId) { + public TestingSchedulingExecutionVertex getVertex(final ExecutionVertexID executionVertexId) { final TestingSchedulingExecutionVertex executionVertex = schedulingExecutionVertices.get(executionVertexId); if (executionVertex == null) { throw new IllegalArgumentException("can not find vertex: " + executionVertexId); @@ -71,7 +71,7 @@ public class TestingSchedulingTopology } @Override - public TestingSchedulingResultPartition getResultPartitionOrThrow(final IntermediateResultPartitionID intermediateResultPartitionId) { + public TestingSchedulingResultPartition getResultPartition(final IntermediateResultPartitionID intermediateResultPartitionId) { final TestingSchedulingResultPartition resultPartition = schedulingResultPartitions.get(intermediateResultPartitionId); if (resultPartition == null) { throw new IllegalArgumentException("can not find partition: " + intermediateResultPartitionId);