zentol commented on a change in pull request #8446: [FLINK-12414] [runtime]
Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r284697592
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
##########
@@ -62,121 +49,73 @@
/**
* Unit tests for {@link DefaultResultPartition}.
*/
-public class DefaultResultPartitionTest {
+public class DefaultResultPartitionTest extends TestLogger {
- private final SimpleAckingTaskManagerGateway taskManagerGateway = new
SimpleAckingTaskManagerGateway();
+ private final DefaultExecutionVertexTest.ExecutionStateProviderTest
stateProvider = new DefaultExecutionVertexTest.ExecutionStateProviderTest();
- private final TestRestartStrategy triggeredRestartStrategy =
TestRestartStrategy.manuallyTriggered();
+ private List<SchedulingExecutionVertex> schedulingExecutionVertices;
- private ExecutionGraph executionGraph;
-
- private ExecutionGraphToSchedulingTopologyAdapter adapter;
-
- private List<IntermediateResultPartition> intermediateResultPartitions;
-
- private List<SchedulingResultPartition> schedulingResultPartitions;
+ private DefaultResultPartition resultPartition;
@Before
- public void setUp() throws Exception {
- final int parallelism = 3;
- JobVertex[] jobVertices = new JobVertex[2];
- jobVertices[0] = createNoOpVertex(parallelism);
- jobVertices[1] = createNoOpVertex(parallelism);
- jobVertices[1].connectNewDataSetAsInput(jobVertices[0],
ALL_TO_ALL, BLOCKING);
- jobVertices[0].setInputDependencyConstraint(ALL);
- jobVertices[1].setInputDependencyConstraint(ANY);
- executionGraph = createSimpleTestGraph(
- new JobID(),
- taskManagerGateway,
- triggeredRestartStrategy,
- jobVertices);
- adapter = new
ExecutionGraphToSchedulingTopologyAdapter(executionGraph);
-
- intermediateResultPartitions = new ArrayList<>();
- schedulingResultPartitions = new ArrayList<>();
-
- for (ExecutionVertex vertex :
executionGraph.getAllExecutionVertices()) {
- for (Map.Entry<IntermediateResultPartitionID,
IntermediateResultPartition> entry
- : vertex.getProducedPartitions().entrySet()) {
-
intermediateResultPartitions.add(entry.getValue());
-
schedulingResultPartitions.add(adapter.getResultPartition(entry.getKey())
- .orElseThrow(() -> new
IllegalArgumentException("can not find partition" + entry.getKey())));
- }
- }
- assertEquals(parallelism, intermediateResultPartitions.size());
- }
-
- @Test
- public void testBasicInfo() {
- for (int idx = 0; idx < intermediateResultPartitions.size();
idx++) {
- final IntermediateResultPartition partition =
intermediateResultPartitions.get(idx);
- final SchedulingResultPartition
schedulingResultPartition = schedulingResultPartitions.get(idx);
- assertEquals(partition.getPartitionId(),
schedulingResultPartition.getId());
- assertEquals(partition.getIntermediateResult().getId(),
schedulingResultPartition.getResultId());
- assertEquals(partition.getResultType(),
schedulingResultPartition.getPartitionType());
- }
+ public void setUp() {
+ schedulingExecutionVertices = new ArrayList<>(2);
Review comment:
we never iterate over the array, so we could just have 2 fields (producer
and consumer), is a bit easier to read
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services