This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 359ab037349145b8eaadbd476d2f1a98bdab45ed
Author: Xintong Song <[email protected]>
AuthorDate: Thu May 28 16:11:31 2020 +0800

    [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via 
TaskSubmissionTestEnvironment not in RPC main thread.
---
 .../TaskSubmissionTestEnvironment.java             |   9 +-
 .../runtime/taskexecutor/TestingTaskExecutor.java  |   5 +
 .../taskexecutor/slot/ThreadSafeTaskSlotTable.java | 206 +++++++++++++++++++++
 3 files changed, 217 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 824e0f0..14adabf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -51,6 +51,7 @@ import 
org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNot
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.ThreadSafeTaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -91,7 +92,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 
        private final TestingHighAvailabilityServices haServices;
        private final TemporaryFolder temporaryFolder;
-       private final TaskSlotTable<Task> taskSlotTable;
+       private final ThreadSafeTaskSlotTable<Task> threadSafeTaskSlotTable;
        private final JobMasterId jobMasterId;
 
        private TestingTaskExecutor taskExecutor;
@@ -116,7 +117,7 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
 
                this.jobMasterId = jobMasterId;
 
-               this.taskSlotTable = slotSize > 0 ?
+               final TaskSlotTable<Task> taskSlotTable = slotSize > 0 ?
                        TaskSlotUtils.createTaskSlotTable(slotSize) :
                        TestingTaskSlotTable
                                .<Task>newBuilder()
@@ -167,6 +168,8 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
 
                taskExecutor.start();
                taskExecutor.waitUntilStarted();
+
+               this.threadSafeTaskSlotTable = new 
ThreadSafeTaskSlotTable<>(taskSlotTable, 
taskExecutor.getMainThreadExecutableForTesting());
        }
 
        static void registerJobMasterConnection(
@@ -198,7 +201,7 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
        }
 
        public TaskSlotTable<Task> getTaskSlotTable() {
-               return taskSlotTable;
+               return threadSafeTaskSlotTable;
        }
 
        public JobMasterId getJobMasterId() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
index 7a4a01f..95b2f2d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import javax.annotation.Nullable;
@@ -80,4 +81,8 @@ class TestingTaskExecutor extends TaskExecutor {
        void waitUntilStarted() {
                startFuture.join();
        }
+
+       MainThreadExecutable getMainThreadExecutableForTesting() {
+               return this.rpcServer;
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
new file mode 100644
index 0000000..0b9b668
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Testing implementation of {@link TaskSlotTable}.
+ * This class wraps a given {@link TaskSlotTable}, guarantees all the accesses 
are invoked on the given {@link MainThreadExecutable}.
+ */
+public class ThreadSafeTaskSlotTable<T extends TaskSlotPayload> implements 
TaskSlotTable<T> {
+
+       private final TaskSlotTable<T> taskSlotTable;
+       private final MainThreadExecutable mainThreadExecutable;
+
+       public ThreadSafeTaskSlotTable(
+                       final TaskSlotTable<T> taskSlotTable,
+                       final MainThreadExecutable mainThreadExecutable) {
+               this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
+               this.mainThreadExecutable = 
Preconditions.checkNotNull(mainThreadExecutable);
+       }
+
+       private void runAsync(Runnable runnable) {
+               mainThreadExecutable.runAsync(runnable);
+       }
+
+       private <V> V callAsync(Callable<V> callable) {
+               try {
+                       return mainThreadExecutable.callAsync(
+                               callable,
+                               Time.days(1) // practically infinite timeout
+                       ).get();
+               } catch (InterruptedException | ExecutionException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       @Override
+       public void start(SlotActions initialSlotActions, 
ComponentMainThreadExecutor mainThreadExecutor) {
+               runAsync(() -> taskSlotTable.start(initialSlotActions, 
mainThreadExecutor));
+       }
+
+       @Override
+       public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) {
+               return callAsync(() -> 
taskSlotTable.getAllocationIdsPerJob(jobId));
+       }
+
+       @Override
+       public SlotReport createSlotReport(ResourceID resourceId) {
+               return callAsync(() -> 
taskSlotTable.createSlotReport(resourceId));
+       }
+
+       @Override
+       public boolean allocateSlot(int index, JobID jobId, AllocationID 
allocationId, Time slotTimeout) {
+               return callAsync(() -> taskSlotTable.allocateSlot(index, jobId, 
allocationId, slotTimeout));
+       }
+
+       @Override
+       public boolean allocateSlot(int index, JobID jobId, AllocationID 
allocationId, ResourceProfile resourceProfile, Time slotTimeout) {
+               return callAsync(() -> taskSlotTable.allocateSlot(index, jobId, 
allocationId, resourceProfile, slotTimeout));
+       }
+
+       @Override
+       public boolean markSlotActive(AllocationID allocationId) throws 
SlotNotFoundException {
+               return callAsync(() -> 
taskSlotTable.markSlotActive(allocationId));
+       }
+
+       @Override
+       public boolean markSlotInactive(AllocationID allocationId, Time 
slotTimeout) throws SlotNotFoundException {
+               return callAsync(() -> 
taskSlotTable.markSlotInactive(allocationId, slotTimeout));
+       }
+
+       @Override
+       public int freeSlot(AllocationID allocationId) throws 
SlotNotFoundException {
+               return callAsync(() -> taskSlotTable.freeSlot(allocationId));
+       }
+
+       @Override
+       public int freeSlot(AllocationID allocationId, Throwable cause) throws 
SlotNotFoundException {
+               return callAsync(() -> taskSlotTable.freeSlot(allocationId, 
cause));
+       }
+
+       @Override
+       public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
+               return callAsync(() -> 
taskSlotTable.isValidTimeout(allocationId, ticket));
+       }
+
+       @Override
+       public boolean isAllocated(int index, JobID jobId, AllocationID 
allocationId) {
+               return callAsync(() -> taskSlotTable.isAllocated(index, jobId, 
allocationId));
+       }
+
+       @Override
+       public boolean tryMarkSlotActive(JobID jobId, AllocationID 
allocationId) {
+               return callAsync(() -> taskSlotTable.tryMarkSlotActive(jobId, 
allocationId));
+       }
+
+       @Override
+       public boolean isSlotFree(int index) {
+               return callAsync(() -> taskSlotTable.isSlotFree(index));
+       }
+
+       @Override
+       public boolean hasAllocatedSlots(JobID jobId) {
+               return callAsync(() -> taskSlotTable.hasAllocatedSlots(jobId));
+       }
+
+       @Override
+       public Iterator<TaskSlot<T>> getAllocatedSlots(JobID jobId) {
+               return callAsync(() -> taskSlotTable.getAllocatedSlots(jobId));
+       }
+
+       @Override
+       public Iterator<AllocationID> getActiveSlots(JobID jobId) {
+               return callAsync(() -> taskSlotTable.getActiveSlots(jobId));
+       }
+
+       @Nullable
+       @Override
+       public JobID getOwningJob(AllocationID allocationId) {
+               return callAsync(() -> 
taskSlotTable.getOwningJob(allocationId));
+       }
+
+       @Override
+       public boolean addTask(T task) throws SlotNotFoundException, 
SlotNotActiveException {
+               return callAsync(() -> taskSlotTable.addTask(task));
+       }
+
+       @Override
+       public T removeTask(ExecutionAttemptID executionAttemptID) {
+               return callAsync(() -> 
taskSlotTable.removeTask(executionAttemptID));
+       }
+
+       @Override
+       public T getTask(ExecutionAttemptID executionAttemptID) {
+               return callAsync(() -> 
taskSlotTable.getTask(executionAttemptID));
+       }
+
+       @Override
+       public Iterator<T> getTasks(JobID jobId) {
+               return callAsync(() -> taskSlotTable.getTasks(jobId));
+       }
+
+       @Override
+       public AllocationID getCurrentAllocation(int index) {
+               return callAsync(() -> 
taskSlotTable.getCurrentAllocation(index));
+       }
+
+       @Override
+       public MemoryManager getTaskMemoryManager(AllocationID allocationID) 
throws SlotNotFoundException {
+               return callAsync(() -> 
taskSlotTable.getTaskMemoryManager(allocationID));
+       }
+
+       @Override
+       public void notifyTimeout(AllocationID key, UUID ticket) {
+               runAsync(() -> taskSlotTable.notifyTimeout(key, ticket));
+       }
+
+       @Override
+       public CompletableFuture<Void> closeAsync() {
+               return callAsync(taskSlotTable::closeAsync);
+       }
+
+       @Override
+       public void close() throws Exception {
+               callAsync(() -> {
+                       taskSlotTable.close();
+                       return null;
+               });
+       }
+}

Reply via email to