zentol commented on a change in pull request #8446: [FLINK-12414] [runtime]
Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r284697765
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
##########
@@ -62,121 +49,73 @@
/**
* Unit tests for {@link DefaultResultPartition}.
*/
-public class DefaultResultPartitionTest {
+public class DefaultResultPartitionTest extends TestLogger {
- private final SimpleAckingTaskManagerGateway taskManagerGateway = new
SimpleAckingTaskManagerGateway();
+ private final DefaultExecutionVertexTest.ExecutionStateProviderTest
stateProvider = new DefaultExecutionVertexTest.ExecutionStateProviderTest();
- private final TestRestartStrategy triggeredRestartStrategy =
TestRestartStrategy.manuallyTriggered();
+ private List<SchedulingExecutionVertex> schedulingExecutionVertices;
- private ExecutionGraph executionGraph;
-
- private ExecutionGraphToSchedulingTopologyAdapter adapter;
-
- private List<IntermediateResultPartition> intermediateResultPartitions;
-
- private List<SchedulingResultPartition> schedulingResultPartitions;
+ private DefaultResultPartition resultPartition;
@Before
- public void setUp() throws Exception {
- final int parallelism = 3;
- JobVertex[] jobVertices = new JobVertex[2];
- jobVertices[0] = createNoOpVertex(parallelism);
- jobVertices[1] = createNoOpVertex(parallelism);
- jobVertices[1].connectNewDataSetAsInput(jobVertices[0],
ALL_TO_ALL, BLOCKING);
- jobVertices[0].setInputDependencyConstraint(ALL);
- jobVertices[1].setInputDependencyConstraint(ANY);
- executionGraph = createSimpleTestGraph(
- new JobID(),
- taskManagerGateway,
- triggeredRestartStrategy,
- jobVertices);
- adapter = new
ExecutionGraphToSchedulingTopologyAdapter(executionGraph);
-
- intermediateResultPartitions = new ArrayList<>();
- schedulingResultPartitions = new ArrayList<>();
-
- for (ExecutionVertex vertex :
executionGraph.getAllExecutionVertices()) {
- for (Map.Entry<IntermediateResultPartitionID,
IntermediateResultPartition> entry
- : vertex.getProducedPartitions().entrySet()) {
-
intermediateResultPartitions.add(entry.getValue());
-
schedulingResultPartitions.add(adapter.getResultPartition(entry.getKey())
- .orElseThrow(() -> new
IllegalArgumentException("can not find partition" + entry.getKey())));
- }
- }
- assertEquals(parallelism, intermediateResultPartitions.size());
- }
-
- @Test
- public void testBasicInfo() {
- for (int idx = 0; idx < intermediateResultPartitions.size();
idx++) {
- final IntermediateResultPartition partition =
intermediateResultPartitions.get(idx);
- final SchedulingResultPartition
schedulingResultPartition = schedulingResultPartitions.get(idx);
- assertEquals(partition.getPartitionId(),
schedulingResultPartition.getId());
- assertEquals(partition.getIntermediateResult().getId(),
schedulingResultPartition.getResultId());
- assertEquals(partition.getResultType(),
schedulingResultPartition.getPartitionType());
- }
+ public void setUp() {
+ schedulingExecutionVertices = new ArrayList<>(2);
+ resultPartition = new DefaultResultPartition(
+ new IntermediateResultPartitionID(),
+ new IntermediateDataSetID(),
+ BLOCKING);
+
+ DefaultExecutionVertex vertex1 = new DefaultExecutionVertex(
+ new ExecutionVertexID(new JobVertexID(), 0),
+ Collections.singletonList(resultPartition),
+ ALL,
+ stateProvider);
+ resultPartition.setProducer(vertex1);
+ DefaultExecutionVertex vertex2 = new DefaultExecutionVertex(
+ new ExecutionVertexID(new JobVertexID(), 0),
+ java.util.Collections.emptyList(),
+ ALL,
+ stateProvider);
+ resultPartition.addConsumer(vertex2);
+ schedulingExecutionVertices.add(vertex1);
+ schedulingExecutionVertices.add(vertex2);
}
@Test
public void testGetConsumers() {
- for (int idx = 0; idx < intermediateResultPartitions.size();
idx++) {
- Collection<ExecutionVertexID> schedulingConsumers =
schedulingResultPartitions.get(idx).getConsumers()
-
.stream().map(SchedulingExecutionVertex::getId).collect(Collectors.toList());
+ Collection<ExecutionVertexID> schedulingConsumers =
resultPartition.getConsumers()
+
.stream().map(SchedulingExecutionVertex::getId).collect(Collectors.toList());
- Set<ExecutionVertexID> executionConsumers = new
HashSet<>();
- for (List<ExecutionEdge> list
:intermediateResultPartitions.get(idx).getConsumers()) {
- for (ExecutionEdge edge : list) {
- final ExecutionVertex vertex =
edge.getTarget();
- executionConsumers.add(new
ExecutionVertexID(vertex.getJobvertexId(), vertex.getParallelSubtaskIndex()));
- }
- }
- assertThat(schedulingConsumers,
containsInAnyOrder(executionConsumers.toArray()));
- }
+ List<ExecutionVertexID> executionConsumers =
Collections.singletonList(schedulingExecutionVertices.get(1).getId());
+ assertThat(schedulingConsumers,
containsInAnyOrder(executionConsumers.toArray()));
}
@Test
public void testGetProducer() {
- for (int idx = 0; idx < intermediateResultPartitions.size();
idx++) {
- final ExecutionVertex vertex =
intermediateResultPartitions.get(idx).getProducer();
-
assertEquals(schedulingResultPartitions.get(idx).getProducer().getId(),
- new ExecutionVertexID(vertex.getJobvertexId(),
vertex.getParallelSubtaskIndex()));
- }
+ assertEquals(resultPartition.getProducer().getId(),
schedulingExecutionVertices.get(0).getId());
}
@Test
public void testGetPartitionState() {
- List<SchedulingExecutionVertex> schedulingExecutionVertices =
new ArrayList<>();
- executionGraph.getAllExecutionVertices().forEach(
- vertex -> schedulingExecutionVertices.add(new
DefaultExecutionVertex(vertex)));
-
final ExecutionState[] states = ExecutionState.values();
Review comment:
in-line, as above
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services