This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a1c9a30e322b0b3adfeadc3d01a2cbfced9fa74e Author: Zhu Zhu <[email protected]> AuthorDate: Thu Jun 11 10:37:44 2020 +0800 [hotfix][runtime] Move shared static test methods of slot allocator into ExecutionSlotAllocatorTestUtils --- .../DefaultExecutionSlotAllocatorTest.java | 23 +------- .../scheduler/ExecutionSlotAllocatorTestUtils.java | 61 ++++++++++++++++++++++ 2 files changed, 63 insertions(+), 21 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java index a6a559e..ad74357 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java @@ -54,6 +54,8 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.createSchedulingRequirements; +import static org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorTestUtils.findSlotAssignmentByExecutionVertexId; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; @@ -280,27 +282,6 @@ public class DefaultExecutionSlotAllocatorTest extends TestLogger { new DefaultPreferredLocationsRetriever(stateLocationRetriever, inputsLocationsRetriever)); } - private List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements(ExecutionVertexID... executionVertexIds) { - List<ExecutionVertexSchedulingRequirements> schedulingRequirements = new ArrayList<>(executionVertexIds.length); - - for (ExecutionVertexID executionVertexId : executionVertexIds) { - schedulingRequirements.add(new ExecutionVertexSchedulingRequirements.Builder() - .withExecutionVertexId(executionVertexId).build()); - } - return schedulingRequirements; - } - - private SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId( - ExecutionVertexID executionVertexId, - Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments) { - return slotExecutionVertexAssignments.stream() - .filter(slotExecutionVertexAssignment -> slotExecutionVertexAssignment.getExecutionVertexId().equals(executionVertexId)) - .findFirst() - .orElseThrow(() -> new IllegalArgumentException(String.format( - "SlotExecutionVertexAssignment with execution vertex id %s not found", - executionVertexId))); - } - private static class AllocationToggableSlotProvider implements SlotProvider { private final List<Tuple3<SlotRequestId, ScheduledUnit, SlotProfile>> slotAllocationRequests = new ArrayList<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorTestUtils.java new file mode 100644 index 0000000..426d5f7 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionSlotAllocatorTestUtils.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Test utils for {@link ExecutionSlotAllocator}. + */ +class ExecutionSlotAllocatorTestUtils { + + static List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements( + final ExecutionVertexID... executionVertexIds) { + + final List<ExecutionVertexSchedulingRequirements> schedulingRequirements = + new ArrayList<>(executionVertexIds.length); + + for (ExecutionVertexID executionVertexId : executionVertexIds) { + schedulingRequirements.add(new ExecutionVertexSchedulingRequirements.Builder() + .withExecutionVertexId(executionVertexId).build()); + } + return schedulingRequirements; + } + + static SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId( + final ExecutionVertexID executionVertexId, + final Collection<SlotExecutionVertexAssignment> slotExecutionVertexAssignments) { + + return slotExecutionVertexAssignments.stream() + .filter(assignment -> assignment.getExecutionVertexId().equals(executionVertexId)) + .findFirst() + .orElseThrow( + () -> + new IllegalArgumentException(String.format( + "SlotExecutionVertexAssignment with execution vertex id %s not found", + executionVertexId))); + } + + private ExecutionSlotAllocatorTestUtils() { + } +}
