This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 359ab037349145b8eaadbd476d2f1a98bdab45ed Author: Xintong Song <[email protected]> AuthorDate: Thu May 28 16:11:31 2020 +0800 [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread. --- .../TaskSubmissionTestEnvironment.java | 9 +- .../runtime/taskexecutor/TestingTaskExecutor.java | 5 + .../taskexecutor/slot/ThreadSafeTaskSlotTable.java | 206 +++++++++++++++++++++ 3 files changed, 217 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java index 824e0f0..14adabf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNot import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils; import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable; +import org.apache.flink.runtime.taskexecutor.slot.ThreadSafeTaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions; import org.apache.flink.runtime.taskmanager.Task; @@ -91,7 +92,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { private final TestingHighAvailabilityServices haServices; private final TemporaryFolder temporaryFolder; - private final TaskSlotTable<Task> taskSlotTable; + private final ThreadSafeTaskSlotTable<Task> threadSafeTaskSlotTable; private final JobMasterId jobMasterId; private TestingTaskExecutor taskExecutor; @@ -116,7 +117,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { this.jobMasterId = jobMasterId; - this.taskSlotTable = slotSize > 0 ? + final TaskSlotTable<Task> taskSlotTable = slotSize > 0 ? TaskSlotUtils.createTaskSlotTable(slotSize) : TestingTaskSlotTable .<Task>newBuilder() @@ -167,6 +168,8 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { taskExecutor.start(); taskExecutor.waitUntilStarted(); + + this.threadSafeTaskSlotTable = new ThreadSafeTaskSlotTable<>(taskSlotTable, taskExecutor.getMainThreadExecutableForTesting()); } static void registerJobMasterConnection( @@ -198,7 +201,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { } public TaskSlotTable<Task> getTaskSlotTable() { - return taskSlotTable; + return threadSafeTaskSlotTable; } public JobMasterId getJobMasterId() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java index 7a4a01f..95b2f2d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.MainThreadExecutable; import org.apache.flink.runtime.rpc.RpcService; import javax.annotation.Nullable; @@ -80,4 +81,8 @@ class TestingTaskExecutor extends TaskExecutor { void waitUntilStarted() { startFuture.join(); } + + MainThreadExecutable getMainThreadExecutableForTesting() { + return this.rpcServer; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java new file mode 100644 index 0000000..0b9b668 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.slot; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.rpc.MainThreadExecutable; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.Iterator; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * Testing implementation of {@link TaskSlotTable}. + * This class wraps a given {@link TaskSlotTable}, guarantees all the accesses are invoked on the given {@link MainThreadExecutable}. + */ +public class ThreadSafeTaskSlotTable<T extends TaskSlotPayload> implements TaskSlotTable<T> { + + private final TaskSlotTable<T> taskSlotTable; + private final MainThreadExecutable mainThreadExecutable; + + public ThreadSafeTaskSlotTable( + final TaskSlotTable<T> taskSlotTable, + final MainThreadExecutable mainThreadExecutable) { + this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable); + this.mainThreadExecutable = Preconditions.checkNotNull(mainThreadExecutable); + } + + private void runAsync(Runnable runnable) { + mainThreadExecutable.runAsync(runnable); + } + + private <V> V callAsync(Callable<V> callable) { + try { + return mainThreadExecutable.callAsync( + callable, + Time.days(1) // practically infinite timeout + ).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public void start(SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor) { + runAsync(() -> taskSlotTable.start(initialSlotActions, mainThreadExecutor)); + } + + @Override + public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) { + return callAsync(() -> taskSlotTable.getAllocationIdsPerJob(jobId)); + } + + @Override + public SlotReport createSlotReport(ResourceID resourceId) { + return callAsync(() -> taskSlotTable.createSlotReport(resourceId)); + } + + @Override + public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, Time slotTimeout) { + return callAsync(() -> taskSlotTable.allocateSlot(index, jobId, allocationId, slotTimeout)); + } + + @Override + public boolean allocateSlot(int index, JobID jobId, AllocationID allocationId, ResourceProfile resourceProfile, Time slotTimeout) { + return callAsync(() -> taskSlotTable.allocateSlot(index, jobId, allocationId, resourceProfile, slotTimeout)); + } + + @Override + public boolean markSlotActive(AllocationID allocationId) throws SlotNotFoundException { + return callAsync(() -> taskSlotTable.markSlotActive(allocationId)); + } + + @Override + public boolean markSlotInactive(AllocationID allocationId, Time slotTimeout) throws SlotNotFoundException { + return callAsync(() -> taskSlotTable.markSlotInactive(allocationId, slotTimeout)); + } + + @Override + public int freeSlot(AllocationID allocationId) throws SlotNotFoundException { + return callAsync(() -> taskSlotTable.freeSlot(allocationId)); + } + + @Override + public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException { + return callAsync(() -> taskSlotTable.freeSlot(allocationId, cause)); + } + + @Override + public boolean isValidTimeout(AllocationID allocationId, UUID ticket) { + return callAsync(() -> taskSlotTable.isValidTimeout(allocationId, ticket)); + } + + @Override + public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) { + return callAsync(() -> taskSlotTable.isAllocated(index, jobId, allocationId)); + } + + @Override + public boolean tryMarkSlotActive(JobID jobId, AllocationID allocationId) { + return callAsync(() -> taskSlotTable.tryMarkSlotActive(jobId, allocationId)); + } + + @Override + public boolean isSlotFree(int index) { + return callAsync(() -> taskSlotTable.isSlotFree(index)); + } + + @Override + public boolean hasAllocatedSlots(JobID jobId) { + return callAsync(() -> taskSlotTable.hasAllocatedSlots(jobId)); + } + + @Override + public Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId) { + return callAsync(() -> taskSlotTable.getAllocatedSlots(jobId)); + } + + @Override + public Iterator<AllocationID> getActiveSlots(JobID jobId) { + return callAsync(() -> taskSlotTable.getActiveSlots(jobId)); + } + + @Nullable + @Override + public JobID getOwningJob(AllocationID allocationId) { + return callAsync(() -> taskSlotTable.getOwningJob(allocationId)); + } + + @Override + public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException { + return callAsync(() -> taskSlotTable.addTask(task)); + } + + @Override + public T removeTask(ExecutionAttemptID executionAttemptID) { + return callAsync(() -> taskSlotTable.removeTask(executionAttemptID)); + } + + @Override + public T getTask(ExecutionAttemptID executionAttemptID) { + return callAsync(() -> taskSlotTable.getTask(executionAttemptID)); + } + + @Override + public Iterator<T> getTasks(JobID jobId) { + return callAsync(() -> taskSlotTable.getTasks(jobId)); + } + + @Override + public AllocationID getCurrentAllocation(int index) { + return callAsync(() -> taskSlotTable.getCurrentAllocation(index)); + } + + @Override + public MemoryManager getTaskMemoryManager(AllocationID allocationID) throws SlotNotFoundException { + return callAsync(() -> taskSlotTable.getTaskMemoryManager(allocationID)); + } + + @Override + public void notifyTimeout(AllocationID key, UUID ticket) { + runAsync(() -> taskSlotTable.notifyTimeout(key, ticket)); + } + + @Override + public CompletableFuture<Void> closeAsync() { + return callAsync(taskSlotTable::closeAsync); + } + + @Override + public void close() throws Exception { + callAsync(() -> { + taskSlotTable.close(); + return null; + }); + } +}
