Repository: flink
Updated Branches:
  refs/heads/master 8e1775afc -> d6aed38b3


[FLINK-5836] [flip6] Fix race condition between offer slot and submit task

Streamline test case

This closes #3371.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d6aed38b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d6aed38b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d6aed38b

Branch: refs/heads/master
Commit: d6aed38b3a15946d383d762030b5f5c1418388de
Parents: 8e1775a
Author: wenlong.lwl <[email protected]>
Authored: Fri Jan 6 16:32:08 2017 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Wed Feb 22 14:25:38 2017 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java      |  35 ++--
 .../runtime/taskexecutor/TaskExecutorTest.java  | 168 +++++++++++++++++++
 2 files changed, 186 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d6aed38b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 58bbfac..2980376 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -659,7 +659,22 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                                final Collection<SlotOffer> reservedSlots = new 
HashSet<>(2);
 
                                while (reservedSlotsIterator.hasNext()) {
-                                       
reservedSlots.add(reservedSlotsIterator.next().generateSlotOffer());
+                                       SlotOffer offer = 
reservedSlotsIterator.next().generateSlotOffer();
+                                       try {
+                                               if 
(!taskSlotTable.markSlotActive(offer.getAllocationId())) {
+                                                       // the slot is either 
free or releasing at the moment
+                                                       final String message = 
"Could not mark slot " + jobId + " active.";
+                                                       log.debug(message);
+                                                       
jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(),
+                                                               leaderId, new 
Exception(message));
+                                               }
+                                       } catch (SlotNotFoundException e) {
+                                               final String message = "Could 
not mark slot " + jobId + " active.";
+                                               
jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(),
+                                                       leaderId, new 
Exception(message));
+                                               continue;
+                                       }
+                                       reservedSlots.add(offer);
                                }
 
                                Future<Iterable<SlotOffer>> acceptedSlotsFuture 
= jobMasterGateway.offerSlots(
@@ -674,22 +689,8 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                                                // check if the response is 
still valid
                                                if 
(isJobManagerConnectionValid(jobId, leaderId)) {
                                                        // mark accepted slots 
active
-                                                       for (SlotOffer 
acceptedSlot: acceptedSlots) {
-                                                               try {
-                                                                       if 
(!taskSlotTable.markSlotActive(acceptedSlot.getAllocationId())) {
-                                                                               
// the slot is either free or releasing at the moment
-                                                                               
final String message = "Could not mark slot " + jobId + " active.";
-                                                                               
log.debug(message);
-                                                                               
jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(),
-                                                                               
                leaderId, new Exception(message));
-                                                                       }
-
-                                                                       // 
remove the assigned slots so that we can free the left overs
-                                                                       
reservedSlots.remove(acceptedSlot);
-                                                               } catch 
(SlotNotFoundException e) {
-                                                                       
log.debug("Could not mark slot {} active.", acceptedSlot,  e);
-                                                                       
jobMasterGateway.failSlot(getResourceID(), acceptedSlot.getAllocationId(), 
leaderId, e);
-                                                               }
+                                                       for (SlotOffer 
acceptedSlot : acceptedSlots) {
+                                                               
reservedSlots.remove(acceptedSlot);
                                                        }
 
                                                        final Exception e = new 
Exception("The slot was rejected by the JobManager.");

http://git-wip-us.apache.org/repos/asf/flink/blob/d6aed38b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index aacd329..31bf9b8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -696,4 +696,172 @@ public class TaskExecutorTest extends TestLogger {
                }
 
        }
+
+       /**
+        * Tests that the task executor handles a SubmitTask call that arrives 
before the OfferSlot response.
+        */
+       @Test
+       public void testSubmitTaskBeforeAcceptSlot() throws Exception {
+               final JobID jobId = new JobID();
+
+               final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
+               final Configuration configuration = new Configuration();
+               final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
+               final ResourceID resourceId = new ResourceID("foobar");
+               final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
+               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+               final TimerService<AllocationID> timerService = 
mock(TimerService.class);
+               final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), 
mock(ResourceProfile.class)), timerService);
+               final JobManagerTable jobManagerTable = new JobManagerTable();
+               final JobLeaderService jobLeaderService = new 
JobLeaderService(taskManagerLocation);
+               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
+
+               final String resourceManagerAddress = "rm";
+               final UUID resourceManagerLeaderId = UUID.randomUUID();
+
+               final String jobManagerAddress = "jm";
+               final UUID jobManagerLeaderId = UUID.randomUUID();
+
+               final LeaderRetrievalService 
resourceManagerLeaderRetrievalService = new 
TestingLeaderRetrievalService(resourceManagerAddress, resourceManagerLeaderId);
+               final LeaderRetrievalService jobManagerLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobManagerAddress, jobManagerLeaderId);
+               
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
+               haServices.setJobMasterLeaderRetriever(jobId, 
jobManagerLeaderRetrievalService);
+
+               final ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
+               final InstanceID registrationId = new InstanceID();
+
+               when(resourceManagerGateway.registerTaskExecutor(
+                       eq(resourceManagerLeaderId),
+                       any(String.class),
+                       eq(resourceId),
+                       any(SlotReport.class),
+                       any(Time.class))).thenReturn(
+                               
FlinkCompletableFuture.<RegistrationResponse>completed(new 
TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+
+               final ResourceID jmResourceId = new 
ResourceID(jobManagerAddress);
+               final int blobPort = 42;
+
+               final AllocationID allocationId1 = new AllocationID();
+               final AllocationID allocationId2 = new AllocationID();
+
+               final SlotOffer offer1 = new SlotOffer(allocationId1, 0, 
ResourceProfile.UNKNOWN);
+
+               final JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
+
+               when(jobMasterGateway.registerTaskManager(
+                       any(String.class),
+                       eq(taskManagerLocation),
+                       eq(jobManagerLeaderId),
+                       any(Time.class)
+               
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
+               
when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
+
+
+               rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
+               rpc.registerGateway(jobManagerAddress, jobMasterGateway);
+
+               final LibraryCacheManager libraryCacheManager = 
mock(LibraryCacheManager.class);
+               
when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
+
+               final JobManagerConnection jobManagerConnection = new 
JobManagerConnection(
+                       jobMasterGateway,
+                       jobManagerLeaderId,
+                       mock(TaskManagerActions.class),
+                       mock(CheckpointResponder.class),
+                       libraryCacheManager,
+                       mock(ResultPartitionConsumableNotifier.class),
+                       mock(PartitionProducerStateChecker.class));
+
+               jobManagerTable.put(jobId, jobManagerConnection);
+
+               try {
+                       final TaskExecutor taskManager = new TaskExecutor(
+                               taskManagerConfiguration,
+                               taskManagerLocation,
+                               rpc,
+                               mock(MemoryManager.class),
+                               mock(IOManager.class),
+                               mock(NetworkEnvironment.class),
+                               haServices,
+                               mock(MetricRegistry.class),
+                               mock(TaskManagerMetricGroup.class),
+                               mock(BroadcastVariableManager.class),
+                               mock(FileCache.class),
+                               taskSlotTable,
+                               jobManagerTable,
+                               jobLeaderService,
+                               testingFatalErrorHandler);
+                       taskManager.start();
+                       taskSlotTable.allocateSlot(0, jobId, allocationId1, 
Time.milliseconds(10000L));
+                       taskSlotTable.allocateSlot(1, jobId, allocationId2, 
Time.milliseconds(10000L));
+
+                       final JobVertexID jobVertexId = new JobVertexID();
+
+                       JobInformation jobInformation = new JobInformation(
+                               jobId,
+                               name.getMethodName(),
+                               new SerializedValue<>(new ExecutionConfig()),
+                               new Configuration(),
+                               Collections.<BlobKey>emptyList(),
+                               Collections.<URL>emptyList());
+
+                       TaskInformation taskInformation = new TaskInformation(
+                               jobVertexId,
+                               "test task",
+                               1,
+                               1,
+                               TestInvokable.class.getName(),
+                               new Configuration());
+
+                       SerializedValue<JobInformation> 
serializedJobInformation = new SerializedValue<>(jobInformation);
+                       SerializedValue<TaskInformation> 
serializedJobVertexInformation = new SerializedValue<>(taskInformation);
+
+                       final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(
+                               serializedJobInformation,
+                               serializedJobVertexInformation,
+                               new ExecutionAttemptID(),
+                               allocationId1,
+                               0,
+                               0,
+                               0,
+                               null,
+                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                               
Collections.<InputGateDeploymentDescriptor>emptyList());
+
+                       CompletableFuture<Iterable<SlotOffer>> 
offerResultFuture = new FlinkCompletableFuture<>();
+
+                       // submit task first and then return acceptance response
+                       when(
+                               jobMasterGateway.offerSlots(
+                                       any(ResourceID.class),
+                                       any(Iterable.class),
+                                       eq(jobManagerLeaderId),
+                                       any(Time.class)))
+                               .thenReturn(offerResultFuture);
+
+                       // we have to add the job after the TaskExecutor, 
because otherwise the service has not
+                       // been properly started. This will also offer the 
slots to the job master
+                       jobLeaderService.addJob(jobId, jobManagerAddress);
+
+                       
verify(jobMasterGateway).offerSlots(any(ResourceID.class), any(Iterable.class), 
eq(jobManagerLeaderId), any(Time.class));
+
+                       // submit the task without having acknowledged the 
offered slots
+                       taskManager.submitTask(tdd, jobManagerLeaderId);
+
+                       // acknowledge the offered slots
+                       
offerResultFuture.complete(Collections.singleton(offer1));
+
+                       
verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), 
eq(registrationId), eq(new SlotID(resourceId, 1)));
+
+                       assertTrue(taskSlotTable.existsActiveSlot(jobId, 
allocationId1));
+                       assertFalse(taskSlotTable.existsActiveSlot(jobId, 
allocationId2));
+                       assertTrue(taskSlotTable.isSlotFree(1));
+
+                       // check if a concurrent error occurred
+                       testingFatalErrorHandler.rethrowError();
+               } finally {
+                       rpc.stopService();
+               }
+
+       }
 }

Reply via email to