[
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15467390#comment-15467390
]
ASF GitHub Bot commented on FLINK-4538:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2463#discussion_r77630918
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest {
+
+ private static TestingRpcService testRpcService;
+
+ @BeforeClass
+ public static void beforeClass() {
+ testRpcService = new TestingRpcService();
+
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ testRpcService.stopService();
+ testRpcService = null;
+ }
+
+ @Before
+ public void beforeTest(){
+ testRpcService.clearGateways();
+ }
+
+ /**
+ * Tests whether
+ * 1) SlotRequest is routed to the SlotManager
+ * 2) SlotRequest leads to a container allocation
+ * 3) SlotRequest is confirmed
+ * 4) Slot becomes available and TaskExecutor gets a SlotRequest
+ */
+ @Test
+ public void testSlotsUnavailableRequest() throws Exception {
+ final String rmAddress = "/rm1";
+ final String jmAddress = "/jm1";
+ final JobID jobID = new JobID();
+
+ testRpcService.registerGateway(jmAddress,
mock(JobMasterGateway.class));
+
+
+ TestingSlotManager slotManager = Mockito.spy(new
TestingSlotManager());
+ ResourceManager resourceManager =
+ new ResourceManager(testRpcService, new
NonHaServices(rmAddress), slotManager);
+ resourceManager.start();
+
+ Future<RegistrationResponse> registrationFuture =
+ resourceManager.registerJobMaster(new
JobMasterRegistration(jmAddress, jobID));
+ try {
+ Await.ready(registrationFuture, Duration.create(5,
TimeUnit.SECONDS));
+ } catch (Exception e) {
+ Assert.fail("JobManager registration Future didn't
become ready.");
+ }
+
+ final AllocationID allocationID = new AllocationID();
+ final ResourceProfile resourceProfile = new
ResourceProfile(1.0, 100);
+
+ SlotRequest slotRequest = new SlotRequest(jobID, allocationID,
resourceProfile);
+ Future<SlotRequestRegistered> slotRequestFuture =
+ resourceManager.getSelf().requestSlot(slotRequest);
+
+ // 1) SlotRequest is routed to the SlotManager
+ verify(slotManager, timeout(5000)).requestSlot(slotRequest);
+
+ // 2) SlotRequest leads to a container allocation
+ verify(slotManager,
timeout(5000)).allocateContainer(resourceProfile);
+
+ // 3) SlotRequest is confirmed
+ Assert.assertEquals(
+ Await.result(slotRequestFuture, Duration.create(5,
TimeUnit.SECONDS)).getAllocationID(),
+ allocationID);
+
+ Assert.assertFalse(slotManager.isAllocated(allocationID));
+
+ // slot becomes available
+ final String tmAddress = "/tm1";
+ TaskExecutorGateway taskExecutorGateway =
mock(TaskExecutorGateway.class);
+ testRpcService.registerGateway(tmAddress, taskExecutorGateway);
+
+ final ResourceID resourceID = ResourceID.generate();
+ final SlotID slotID = new SlotID(resourceID, 0);
+
+ final SlotStatus slotStatus =
+ new SlotStatus(slotID, resourceProfile,
taskExecutorGateway);
+ final SlotReport slotReport =
+ new SlotReport(Collections.singletonList(slotStatus),
resourceID);
+ // register slot at SlotManager
+ slotManager.updateSlotStatus(slotReport);
+
+ // 4) Slot becomes available and TaskExecutor gets a SlotRequest
+ verify(taskExecutorGateway,
timeout(5000)).requestSlot(eq(allocationID), any(UUID.class));
+ }
+
+ /**
+ * Tests whether
+ * 1) a SlotRequest is routed to the SlotManager
+ * 2) a SlotRequest leads to an allocation of a registered slot
+ * 3) a SlotRequest is confirmed
+ * 4) a SlotRequest is routed to the TaskExecutor
+ */
+ @Test
+ public void testSlotAvailableRequest() throws Exception {
+ final String rmAddress = "/rm1";
+ final String jmAddress = "/jm1";
+ final String tmAddress = "/tm1";
+ final JobID jobID = new JobID();
+
+ testRpcService.registerGateway(jmAddress,
mock(JobMasterGateway.class));
+
+ TaskExecutorGateway taskExecutorGateway =
mock(TaskExecutorGateway.class);
+ testRpcService.registerGateway(tmAddress, taskExecutorGateway);
+
+ TestingSlotManager slotManager = Mockito.spy(new
TestingSlotManager());
+ ResourceManager resourceManager =
+ new ResourceManager(testRpcService, new
NonHaServices(rmAddress), slotManager);
+ resourceManager.start();
+
+ Future<RegistrationResponse> registrationFuture =
+ resourceManager.registerJobMaster(new
JobMasterRegistration(jmAddress, jobID));
+ try {
+ Await.ready(registrationFuture, Duration.create(5,
TimeUnit.SECONDS));
+ } catch (Exception e) {
+ Assert.fail("JobManager registration Future didn't
become ready.");
+ }
+
+ final ResourceID resourceID = ResourceID.generate();
+ final AllocationID allocationID = new AllocationID();
+ final ResourceProfile resourceProfile = new
ResourceProfile(1.0, 100);
+ final SlotID slotID = new SlotID(resourceID, 0);
+
+ final SlotStatus slotStatus =
+ new SlotStatus(slotID, resourceProfile,
taskExecutorGateway);
+ final SlotReport slotReport =
+ new SlotReport(Collections.singletonList(slotStatus),
resourceID);
+ // register slot at SlotManager
+ slotManager.updateSlotStatus(slotReport);
+
+ SlotRequest slotRequest = new SlotRequest(jobID, allocationID,
resourceProfile);
+ Future<SlotRequestRegistered> slotRequestFuture =
+ resourceManager.getSelf().requestSlot(slotRequest);
+
+ // 1) a SlotRequest is routed to the SlotManager
+ verify(slotManager, timeout(5000)).requestSlot(slotRequest);
+
+ // 2) a SlotRequest leads to an allocation of a registered slot
+ Assert.assertTrue(slotManager.isAllocated(slotID));
+ Assert.assertTrue(slotManager.isAllocated(allocationID));
+
+ // 3) a SlotRequest is confirmed
+ Assert.assertEquals(
+ Await.result(slotRequestFuture, Duration.create(5,
TimeUnit.SECONDS)).getAllocationID(),
+ allocationID);
+
+ // 4) a SlotRequest is routed to the TaskExecutor
+ verify(taskExecutorGateway,
timeout(5000)).requestSlot(eq(allocationID), any(UUID.class));
--- End diff --
We're not really waiting here but executing asynchronous. The whole test
completes in less than 250 milliseconds.
> Implement slot allocation protocol with JobMaster
> -------------------------------------------------
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)