[ 
https://issues.apache.org/jira/browse/FLINK-1952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14560860#comment-14560860
 ] 

ASF GitHub Bot commented on FLINK-1952:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/731#discussion_r31127537
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
 ---
    @@ -0,0 +1,665 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.instance;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
    +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
    +import org.apache.flink.runtime.jobmanager.scheduler.Locality;
    +import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
    +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
    +import org.apache.flink.util.AbstractID;
    +
    +import org.junit.Test;
    +
    +import java.util.Collections;
    +
    +import static org.junit.Assert.*;
    +
    +/**
    + * Tests for the allocation, properties, and release of shared slots.
    + */
    +public class SharedSlotsTest {
    +   
    +   @Test
    +   public void allocateAndReleaseEmptySlot() {
    +           try {
    +                   JobID jobId = new JobID();
    +                   JobVertexID vertexId = new JobVertexID();
    +                   
    +                   SlotSharingGroup sharingGroup = new 
SlotSharingGroup(vertexId);
    +                   SlotSharingGroupAssignment assignment = 
sharingGroup.getTaskAssignment();
    +                   
    +                   assertEquals(0, assignment.getNumberOfSlots());
    +                   assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vertexId));
    +                   
    +                   Instance instance = 
SchedulerTestUtils.getRandomInstance(2);
    +                   
    +                   assertEquals(2, instance.getTotalNumberOfSlots());
    +                   assertEquals(0, instance.getNumberOfAllocatedSlots());
    +                   assertEquals(2, instance.getNumberOfAvailableSlots());
    +                   
    +                   // allocate a shared slot
    +                   SharedSlot slot = instance.allocateSharedSlot(jobId, 
assignment);
    +                   assertEquals(2, instance.getTotalNumberOfSlots());
    +                   assertEquals(1, instance.getNumberOfAllocatedSlots());
    +                   assertEquals(1, instance.getNumberOfAvailableSlots());
    +                   
    +                   // check that the new slot is fresh
    +                   assertTrue(slot.isAlive());
    +                   assertFalse(slot.isCanceled());
    +                   assertFalse(slot.isReleased());
    +                   assertEquals(0, slot.getNumberLeaves());
    +                   assertFalse(slot.hasChildren());
    +                   assertTrue(slot.isRootAndEmpty());
    +                   assertNotNull(slot.toString());
    +                   assertTrue(slot.getSubSlots().isEmpty());
    +                   assertEquals(0, slot.getSlotNumber());
    +                   assertEquals(0, slot.getRootSlotNumber());
    +                   
    +                   // release the slot immediately.
    +                   slot.releaseSlot();
    +
    +                   assertTrue(slot.isCanceled());
    +                   assertTrue(slot.isReleased());
    +                   
    +                   // the slot sharing group and instance should not
    +                   assertEquals(2, instance.getTotalNumberOfSlots());
    +                   assertEquals(0, instance.getNumberOfAllocatedSlots());
    +                   assertEquals(2, instance.getNumberOfAvailableSlots());
    +
    +                   assertEquals(0, assignment.getNumberOfSlots());
    +                   assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(vertexId));
    +                   
    +                   // we should not be able to allocate any children from 
this released slot
    +                   assertNull(slot.allocateSharedSlot(new AbstractID()));
    +                   assertNull(slot.allocateSubSlot(new AbstractID()));
    +                   
    +                   // we cannot add this slot to the assignment group
    +                   assignment.addSharedSlotAndAllocateSubSlot(slot, 
Locality.NON_LOCAL, vertexId);
    --- End diff --
    
    Is an assertion missing here that checks that the slot was not added to the assignment?


> Cannot run ConnectedComponents example: Could not allocate a slot on instance
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-1952
>                 URL: https://issues.apache.org/jira/browse/FLINK-1952
>             Project: Flink
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Priority: Blocker
>
> Steps to reproduce:
> {code}
> ./bin/yarn-session.sh -n 350 
> {code}
> ... wait until they are connected ...
> {code}
> Number of connected TaskManagers changed to 266. Slots available: 266
> Number of connected TaskManagers changed to 323. Slots available: 323
> Number of connected TaskManagers changed to 334. Slots available: 334
> Number of connected TaskManagers changed to 343. Slots available: 343
> Number of connected TaskManagers changed to 350. Slots available: 350
> {code}
> Start CC
> {code}
> ./bin/flink run -p 350 
> ./examples/flink-java-examples-0.9-SNAPSHOT-ConnectedComponents.jar
> {code}
> ---> it runs
> Run KMeans and let it fail with:
> {code}
> Failed to deploy the task Map (Map at main(KMeans.java:100)) (1/350) - 
> execution #0 to slot SimpleSlot (2)(2)(0) - 182b7661ca9547a84591de940c47a200 
> - ALLOCATED/ALIVE: java.io.IOException: Insufficient number of network 
> buffers: required 350, but only 254 available. The total number of network 
> buffers is currently set to 2048. You can increase this number by setting the 
> configuration key 'taskmanager.network.numberOfBuffers'.
> {code}
> ... as expected.
> (I've waited for 10 minutes between the two submissions)
> Starting CC now will fail:
> {code}
> ./bin/flink run -p 350 
> ./examples/flink-java-examples-0.9-SNAPSHOT-ConnectedComponents.jar 
> {code}
> Error message(s):
> {code}
> Caused by: java.lang.IllegalStateException: Could not schedule consumer 
> vertex IterationHead(WorksetIteration (Unnamed Delta Iteration)) (19/350)
>       at 
> org.apache.flink.runtime.executiongraph.Execution$3.call(Execution.java:479)
>       at 
> org.apache.flink.runtime.executiongraph.Execution$3.call(Execution.java:469)
>       at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:94)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>       at 
> scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
>       ... 4 more
> Caused by: 
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
> Could not allocate a slot on instance 4a6d761cb084c32310ece1f849556faf @ 
> cloud-19 - 1 slots - URL: 
> akka.tcp://[email protected]:51400/user/taskmanager, as required by the 
> co-location constraint.
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:247)
>       at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:110)
>       at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:262)
>       at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:436)
>       at 
> org.apache.flink.runtime.executiongraph.Execution$3.call(Execution.java:475)
>       ... 9 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to