[
https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15467609#comment-15467609
]
ASF GitHub Bot commented on FLINK-4538:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2463#discussion_r77651201
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
---
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.LeaderIdRegistry;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest {
+
+ private static TestingRpcService testRpcService;
+
+ @BeforeClass
+ public static void beforeClass() {
+ testRpcService = new TestingRpcService();
+
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ testRpcService.stopService();
+ testRpcService = null;
+ }
+
+ @Before
+ public void beforeTest(){
+ testRpcService.clearGateways();
+ }
+
+ /**
+ * Tests whether
+ * 1) SlotRequest is routed to the SlotManager
+ * 2) SlotRequest leads to a container allocation
+ * 3) SlotRequest is confirmed
+ * 4) Slot becomes available and TaskExecutor gets a SlotRequest
+ */
+ @Test
+ public void testSlotsUnavailableRequest() throws Exception {
+ final String rmAddress = "/rm1";
+ final String jmAddress = "/jm1";
+ final JobID jobID = new JobID();
+
+ testRpcService.registerGateway(jmAddress,
mock(JobMasterGateway.class));
+
+
+ TestingSlotManager slotManager = Mockito.spy(new
TestingSlotManager());
+ ResourceManager resourceManager =
+ new ResourceManager(testRpcService, new
NonHaServices(rmAddress), slotManager);
+ resourceManager.start();
+
+ Future<RegistrationResponse> registrationFuture =
+ resourceManager.registerJobMaster(new
JobMasterRegistration(jmAddress, jobID));
+ try {
+ Await.ready(registrationFuture, Duration.create(5,
TimeUnit.SECONDS));
+ } catch (Exception e) {
+ Assert.fail("JobManager registration Future didn't
become ready.");
+ }
+
+ final AllocationID allocationID = new AllocationID();
+ final ResourceProfile resourceProfile = new
ResourceProfile(1.0, 100);
+
+ SlotRequest slotRequest = new SlotRequest(jobID, allocationID,
resourceProfile);
+ Future<SlotRequestRegistered> slotRequestFuture =
+ resourceManager.getSelf().requestSlot(slotRequest);
--- End diff --
But you're not relying on asynchronous execution in this test. I think
these kind of unit test should be as light-weight as possible. Thus, one should
not start actor systems if that's not really necessary. I think in this test,
it is not necessary.
> Implement slot allocation protocol with JobMaster
> -------------------------------------------------
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)