[
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15197268#comment-15197268
]
ASF GitHub Bot commented on FLINK-3544:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/1741#discussion_r56328991
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
---
@@ -0,0 +1,801 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import
org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
+import
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import
org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
+import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
+import
org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed;
+import
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
+import
org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful;
+import
org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
+import
org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
+import
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
+import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
+import
org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import
org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
+import
org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+
+import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.util.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ *
+ * <h1>Worker allocation steps</h1>
+ *
+ * <ol>
+ * <li>The resource manager decides to request more workers. This can
happen in order
+ * to fill the initial pool, or as a result of the JobManager
requesting more workers.</li>
+ *
+ * <li>The resource manager calls {@link #requestNewWorkers(int)},
which triggers requests
+ * for more containers. After that, the {@link
#getNumWorkerRequestsPending()}
+ * should reflect the pending requests.</li>
+ *
+ * <li>The concrete framework may acquire containers and then trigger
to start TaskManagers
+ * in those containers. That should be reflected in {@link
#getNumWorkersPendingRegistration()}.</li>
+ *
+ * <li>At some point, the TaskManager processes will have started and
will send a registration
+ * message to the JobManager. The JobManager will perform
+ * a lookup with the ResourceManager to check if it really started
this TaskManager.
+ * The method {@link #workerRegistered(ResourceID)} will be called
+ * to inform about a registered worker.</li>
+ * </ol>
+ *
+ */
+public abstract class FlinkResourceManager<WorkerType extends ResourceID>
extends FlinkUntypedActor {
+
+ /** The exit code with which the process is stopped in case of a fatal
error */
+ protected static final int EXIT_CODE_FATAL_ERROR = -13;
+
+ /** The default name of the resource manager actor */
+ public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
+
+ //
------------------------------------------------------------------------
+
+ /** The logger, named for the actual implementing class */
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ /** The Flink configuration object */
+ protected final Configuration config;
+
+ /** The timeout for actor messages sent to the JobManager /
TaskManagers */
+ private final FiniteDuration messageTimeout;
+
+ /** The service to find the right leader JobManager (to support high
availability) */
+ private final LeaderRetrievalService leaderRetriever;
+
+ /** The currently registered resources */
+ private final Map<ResourceID, WorkerType> registeredWorkers;
+
+ /** List of listeners for info messages */
+ private final Set<ActorRef> infoMessageListeners;
+
+ /** The JobManager that the framework master manages resources for */
+ private ActorRef jobManager;
+
+ /** Our JobManager's leader session */
+ private UUID leaderSessionID;
+
+ /** The size of the worker pool that the resource master strives to
maintain */
+ private int designatedPoolSize;
+
+ //
------------------------------------------------------------------------
+
+ /**
+ * Creates a FlinkResourceManager actor.
+ *
+ * <p>The message timeout is taken from {@code AkkaUtils.getLookupTimeout(config)};
+ * if that call throws, the timeout falls back to
+ * {@code ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT}. The sets of registered
+ * workers and info message listeners start out empty.
+ *
+ * @param numInitialTaskManagers The initial value of the designated worker pool size.
+ * @param flinkConfig The Flink configuration object. Must not be null.
+ * @param leaderRetriever The service used to retrieve the leading JobManager.
+ *                        Must not be null.
+ * @throws NullPointerException If flinkConfig or leaderRetriever is null.
+ */
+ protected FlinkResourceManager(
+ int numInitialTaskManagers,
+ Configuration flinkConfig,
+ LeaderRetrievalService leaderRetriever) {
+ this.config = requireNonNull(flinkConfig);
+ this.leaderRetriever = requireNonNull(leaderRetriever);
+ this.registeredWorkers = new HashMap<>();
+
+ FiniteDuration lt;
+ try {
+ lt = AkkaUtils.getLookupTimeout(config);
+ }
+ catch (Exception e) {
+ lt = new FiniteDuration(
+
Duration.apply(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT).toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+ this.messageTimeout = lt;
+ this.designatedPoolSize = numInitialTaskManagers;
+ this.infoMessageListeners = new HashSet<>();
+ }
+
+ //
------------------------------------------------------------------------
+ // Actor Behavior
+ //
------------------------------------------------------------------------
+
+ @Override
+ public void preStart() {
+ // we start our leader retrieval service to make sure we get
informed
+ // about JobManager leader changes
+ try {
+ leaderRetriever.start(new LeaderRetrievalListener() {
+
+ @Override
+ public void notifyLeaderAddress(String
leaderAddress, UUID leaderSessionID) {
+ self().tell(
+ new
NewLeaderAvailable(leaderAddress, leaderSessionID),
+ ActorRef.noSender());
+ }
+
+ @Override
+ public void handleError(Exception e) {
+ self().tell(
+ new FatalErrorOccurred("Leader
retrieval service failed", e),
+ ActorRef.noSender());
+ }
+ });
+ }
+ catch (Throwable t) {
+ self().tell(
+ new FatalErrorOccurred("Could not start leader
retrieval service", t),
+ ActorRef.noSender());
+ }
+ // framework specific initialization
+ try {
+ initialize();
--- End diff --
Good catch. I consolidated this into one try/catch block.
> ResourceManager runtime components
> ----------------------------------
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
> Issue Type: Sub-task
> Components: ResourceManager
> Affects Versions: 1.1.0
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)