Repository: flink Updated Branches: refs/heads/master c6715b7f7 -> 2648bc1a5
http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java new file mode 100644 index 0000000..a3ff6c4 --- /dev/null +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.testkit.JavaTestKit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; +import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered; +import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.NMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import scala.Option; +import scala.concurrent.Await; +import scala.concurrent.Future; +import 
scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class YarnFlinkResourceManagerTest extends TestLogger { + + private static ActorSystem system; + + @BeforeClass + public static void setup() { + system = AkkaUtils.createLocalActorSystem(new Configuration()); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(system); + } + + @Test + public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Exception { + new JavaTestKit(system) {{ + + final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow(); + + Configuration flinkConfig = new Configuration(); + YarnConfiguration yarnConfig = new YarnConfiguration(); + TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(); + String applicationMasterHostName = "localhost"; + String webInterfaceURL = "foobar"; + ContaineredTaskManagerParameters taskManagerParameters = new ContaineredTaskManagerParameters( + 1l, 1l, 1l, 1, new HashMap<String, String>()); + ContainerLaunchContext taskManagerLaunchContext = mock(ContainerLaunchContext.class); + int yarnHeartbeatIntervalMillis = 1000; + int maxFailedContainers = 10; + int numInitialTaskManagers = 5; + final YarnResourceManagerCallbackHandler callbackHandler = new YarnResourceManagerCallbackHandler(); + AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient = mock(AMRMClientAsync.class); + NMClient nodeManagerClient = mock(NMClient.class); + UUID leaderSessionID = UUID.randomUUID(); + + final List<Container> containerList = new ArrayList<>(); + + for (int i = 0; i < 
numInitialTaskManagers; i++) { + containerList.add(new TestingContainer("container_" + i, "localhost")); + } + + doAnswer(new Answer() { + int counter = 0; + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + if (counter < containerList.size()) { + callbackHandler.onContainersAllocated( + Collections.singletonList( + containerList.get(counter++) + )); + } + return null; + } + }).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class)); + + ActorRef resourceManager = null; + ActorRef leader1; + + try { + leader1 = system.actorOf( + Props.create( + TestingUtils.ForwardingActor.class, + getRef(), + Option.apply(leaderSessionID) + )); + + resourceManager = system.actorOf( + Props.create( + TestingYarnFlinkResourceManager.class, + flinkConfig, + yarnConfig, + leaderRetrievalService, + applicationMasterHostName, + webInterfaceURL, + taskManagerParameters, + taskManagerLaunchContext, + yarnHeartbeatIntervalMillis, + maxFailedContainers, + numInitialTaskManagers, + callbackHandler, + resourceManagerClient, + nodeManagerClient + )); + + leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID); + + final AkkaActorGateway leader1Gateway = new AkkaActorGateway(leader1, leaderSessionID); + final AkkaActorGateway resourceManagerGateway = new AkkaActorGateway(resourceManager, leaderSessionID); + + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + Container container = (Container) invocation.getArguments()[0]; + resourceManagerGateway.tell(new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)), + leader1Gateway); + return null; + } + }).when(nodeManagerClient).startContainer(Matchers.any(Container.class), Matchers.any(ContainerLaunchContext.class)); + + expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class); + + resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, 
Collections.EMPTY_LIST)); + + for (int i = 0; i < containerList.size(); i++) { + expectMsgClass(deadline.timeLeft(), Acknowledge.class); + } + + Future<Object> taskManagerRegisteredFuture = resourceManagerGateway.ask(new NotifyWhenResourcesRegistered(numInitialTaskManagers), deadline.timeLeft()); + + Await.ready(taskManagerRegisteredFuture, deadline.timeLeft()); + + leaderRetrievalService.notifyListener(null, null); + + leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID); + + expectMsgClass(deadline.timeLeft(), RegisterResourceManager.class); + + resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.EMPTY_LIST)); + + for (Container container: containerList) { + resourceManagerGateway.tell( + new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container)), + leader1Gateway); + } + + for (int i = 0; i < containerList.size(); i++) { + expectMsgClass(deadline.timeLeft(), Acknowledge.class); + } + + Future<Object> numberOfRegisteredResourcesFuture = resourceManagerGateway.ask(RequestNumberOfRegisteredResources.Instance, deadline.timeLeft()); + + int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft()); + + assertEquals(numInitialTaskManagers, numberOfRegisteredResources); + } finally { + if (resourceManager != null) { + resourceManager.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + } + }}; + } + + static class TestingContainer extends Container { + + private final String id; + private final String host; + + TestingContainer(String id, String host) { + this.id = id; + this.host = host; + } + + @Override + public ContainerId getId() { + ContainerId containerId = mock(ContainerId.class); + when(containerId.toString()).thenReturn(id); + + return containerId; + } + + @Override + public void setId(ContainerId containerId) { + + } + + @Override + public NodeId getNodeId() { + NodeId nodeId = mock(NodeId.class); + 
when(nodeId.getHost()).thenReturn(host); + + return nodeId; + } + + @Override + public void setNodeId(NodeId nodeId) { + + } + + @Override + public String getNodeHttpAddress() { + return null; + } + + @Override + public void setNodeHttpAddress(String s) { + + } + + @Override + public Resource getResource() { + return null; + } + + @Override + public void setResource(Resource resource) { + + } + + @Override + public Priority getPriority() { + return null; + } + + @Override + public void setPriority(Priority priority) { + + } + + @Override + public Token getContainerToken() { + return null; + } + + @Override + public void setContainerToken(Token token) { + + } + + @Override + public int compareTo(Container o) { + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/test/java/org/apache/flink/yarn/messages/NotifyWhenResourcesRegistered.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/messages/NotifyWhenResourcesRegistered.java b/flink-yarn/src/test/java/org/apache/flink/yarn/messages/NotifyWhenResourcesRegistered.java new file mode 100644 index 0000000..ad5e683 --- /dev/null +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/messages/NotifyWhenResourcesRegistered.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.messages; + +public class NotifyWhenResourcesRegistered { + + private final int numberResources; + + public NotifyWhenResourcesRegistered(int numberResources) { + this.numberResources = numberResources; + } + + public int getNumberResources() { + return numberResources; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/test/java/org/apache/flink/yarn/messages/RequestNumberOfRegisteredResources.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/messages/RequestNumberOfRegisteredResources.java b/flink-yarn/src/test/java/org/apache/flink/yarn/messages/RequestNumberOfRegisteredResources.java new file mode 100644 index 0000000..ccccbab --- /dev/null +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/messages/RequestNumberOfRegisteredResources.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn.messages; + +public class RequestNumberOfRegisteredResources { + public static RequestNumberOfRegisteredResources Instance = new RequestNumberOfRegisteredResources(); + + private RequestNumberOfRegisteredResources() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/resources/log4j-test.properties b/flink-yarn/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..2226f68 --- /dev/null +++ b/flink-yarn/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger + +# testlogger is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/2648bc1a/flink-yarn/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/resources/logback-test.xml b/flink-yarn/src/test/resources/logback-test.xml new file mode 100644 index 0000000..1c4ea08 --- /dev/null +++ b/flink-yarn/src/test/resources/logback-test.xml @@ -0,0 +1,34 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + + <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/> + <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/> + <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/> + <logger name="org.apache.flink.configuration.Configuration" level="OFF"/> +</configuration> \ No newline at end of file
