[FLINK-4505] [cluster mngt] Separate TaskManager service configuration from TaskManagerConfiguration; Implement TaskManagerRunner
Refactors the startup logic so that it is easier to reuse. This closes #2461. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d25ea85 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d25ea85 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d25ea85 Branch: refs/heads/flip-6 Commit: 1d25ea85e8f10f6ab67b4ebe7274236778ee4b0f Parents: a64e818 Author: Till Rohrmann <[email protected]> Authored: Wed Sep 21 12:33:15 2016 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu Oct 20 19:46:25 2016 +0200 ---------------------------------------------------------------------- .../HighAvailabilityServicesUtils.java | 41 + .../flink/runtime/rpc/RpcServiceUtils.java | 73 ++ .../flink/runtime/rpc/akka/AkkaRpcService.java | 2 + .../runtime/taskexecutor/TaskExecutor.java | 51 +- .../taskexecutor/TaskExecutorConfiguration.java | 142 ---- .../taskexecutor/TaskManagerConfiguration.java | 205 +++++ .../runtime/taskexecutor/TaskManagerRunner.java | 172 +++++ .../taskexecutor/TaskManagerServices.java | 320 ++++++++ .../TaskManagerServicesConfiguration.java | 325 ++++++++ .../runtime/taskmanager/TaskManagerRunner.java | 749 ------------------- .../runtime/util/LeaderRetrievalUtils.java | 7 + .../apache/flink/runtime/akka/AkkaUtils.scala | 4 + .../NetworkEnvironmentConfiguration.scala | 2 +- .../flink/runtime/taskmanager/TaskManager.scala | 6 +- .../io/network/NetworkEnvironmentTest.java | 4 +- .../runtime/rpc/TestingSerialRpcService.java | 1 - .../runtime/taskexecutor/TaskExecutorTest.java | 29 +- ...askManagerComponentsStartupShutdownTest.java | 3 +- .../TaskManagerConfigurationTest.java | 1 - 19 files changed, 1195 insertions(+), 942 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java new file mode 100644 index 0000000..f3da847 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServicesUtils.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.highavailability; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; + +public class HighAvailabilityServicesUtils { + + public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration) throws Exception { + HighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration); + + switch(highAvailabilityMode) { + case NONE: + final String resourceManagerAddress = null; + return new NonHaServices(resourceManagerAddress); + case ZOOKEEPER: + throw new UnsupportedOperationException("ZooKeeper high availability services " + + "have not been implemented yet."); + default: + throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported."); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java new file mode 100644 index 0000000..d40e336 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcServiceUtils.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import akka.actor.ActorSystem; +import com.typesafe.config.Config; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.util.NetUtils; +import org.jboss.netty.channel.ChannelException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class RpcServiceUtils { + private static final Logger LOG = LoggerFactory.getLogger(RpcServiceUtils.class); + + /** + * Utility method to create RPC service from configuration and hostname, port. + * + * @param hostname The hostname/address the actor system of the RPC service binds to. + * @param port The port the actor system of the RPC service binds to. + * @param configuration The configuration for the TaskManager. + * @return The rpc service which is used to start and connect to the TaskManager RpcEndpoint. 
+ * @throws IOException Thrown, if the actor system can not bind to the address + * @throws Exception Thrown if some other error occurs while creating akka actor system + */ + public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception { + LOG.info("Starting AkkaRpcService at {}.", NetUtils.hostAndPortToUrlString(hostname, port)); + + final ActorSystem actorSystem; + + try { + Config akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port); + + LOG.debug("Using akka configuration \n {}.", akkaConfig); + + actorSystem = AkkaUtils.createActorSystem(akkaConfig); + } catch (Throwable t) { + if (t instanceof ChannelException) { + Throwable cause = t.getCause(); + if (cause != null && t.getCause() instanceof java.net.BindException) { + String address = NetUtils.hostAndPortToUrlString(hostname, port); + throw new IOException("Unable to bind AkkaRpcService actor system to address " + + address + " - " + cause.getMessage(), t); + } + } + throw new Exception("Could not create TaskManager actor system", t); + } + + final Time timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis()); + return new AkkaRpcService(actorSystem, timeout); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 6825557..fb7896a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -93,6 +93,8 @@ public class AkkaRpcService implements RpcService { Address actorSystemAddress = AkkaUtils.getAddress(actorSystem); + + if 
(actorSystemAddress.host().isDefined()) { address = actorSystemAddress.host().get(); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 8ce2780..7df0a91 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -18,16 +18,14 @@ package org.apache.flink.runtime.taskexecutor; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; import org.apache.flink.runtime.resourcemanager.SlotRequestReply; +import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.jboss.netty.channel.ChannelException; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; @@ -39,7 +37,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.BindException; + import java.util.UUID; import static 
org.apache.flink.util.Preconditions.checkArgument; @@ -60,7 +58,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { private final HighAvailabilityServices haServices; /** The task manager configuration */ - private final TaskExecutorConfiguration taskExecutorConfig; + private final TaskManagerConfiguration taskManagerConfiguration; /** The I/O manager component in the task manager */ private final IOManager ioManager; @@ -71,9 +69,15 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { /** The network component in the task manager */ private final NetworkEnvironment networkEnvironment; + /** The metric registry in the task manager */ + private final MetricRegistry metricRegistry; + /** The number of slots in the task manager, should be 1 for YARN */ private final int numberOfSlots; + /** The fatal error handler to use in case of a fatal error */ + private final FatalErrorHandler fatalErrorHandler; + // --------- resource manager -------- private TaskExecutorToResourceManagerConnection resourceManagerConnection; @@ -81,26 +85,30 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { // ------------------------------------------------------------------------ public TaskExecutor( - TaskExecutorConfiguration taskExecutorConfig, + TaskManagerConfiguration taskManagerConfiguration, TaskManagerLocation taskManagerLocation, RpcService rpcService, MemoryManager memoryManager, IOManager ioManager, NetworkEnvironment networkEnvironment, - HighAvailabilityServices haServices) { + HighAvailabilityServices haServices, + MetricRegistry metricRegistry, + FatalErrorHandler fatalErrorHandler) { super(rpcService); - checkArgument(taskExecutorConfig.getNumberOfSlots() > 0, "The number of slots has to be larger than 0."); + checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0."); - this.taskExecutorConfig = checkNotNull(taskExecutorConfig); + this.taskManagerConfiguration = 
checkNotNull(taskManagerConfiguration); this.taskManagerLocation = checkNotNull(taskManagerLocation); this.memoryManager = checkNotNull(memoryManager); this.ioManager = checkNotNull(ioManager); this.networkEnvironment = checkNotNull(networkEnvironment); this.haServices = checkNotNull(haServices); + this.metricRegistry = checkNotNull(metricRegistry); + this.fatalErrorHandler = checkNotNull(fatalErrorHandler); - this.numberOfSlots = taskExecutorConfig.getNumberOfSlots(); + this.numberOfSlots = taskManagerConfiguration.getNumberSlots(); } // ------------------------------------------------------------------------ @@ -158,6 +166,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } } + /** * Requests a slot from the TaskManager * * @param allocationID id for the request @@ -169,22 +178,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { return new SlotRequestRegistered(allocationID); } - /** - public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception { - return null; - } - - @Override - return null; - } - - @Override - public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws Exception { - return null; - } - - @Override - public SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception { // ------------------------------------------------------------------------ // Properties // ------------------------------------------------------------------------ @@ -222,7 +215,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { void onFatalError(Throwable t) { // to be determined, probably delegate to a fatal error handler that // would either log (mini cluster) ot kill the process (yarn, mesos, ...) 
- log.error("FATAL ERROR", t); + fatalErrorHandler.onFatalError(t); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java deleted file mode 100644 index c97c893..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorConfiguration.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.taskexecutor; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; - -import scala.concurrent.duration.FiniteDuration; - -import java.io.Serializable; -import java.util.concurrent.TimeUnit; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * {@link TaskExecutor} Configuration - */ -public class TaskExecutorConfiguration implements Serializable { - - private static final long serialVersionUID = 1L; - - private final String[] tmpDirPaths; - - private final long cleanupInterval; - - private final int numberOfSlots; - - private final Configuration configuration; - - private final FiniteDuration timeout; - private final FiniteDuration maxRegistrationDuration; - private final FiniteDuration initialRegistrationPause; - private final FiniteDuration maxRegistrationPause; - private final FiniteDuration refusedRegistrationPause; - - private final NetworkEnvironmentConfiguration networkConfig; - - public TaskExecutorConfiguration( - String[] tmpDirPaths, - long cleanupInterval, - NetworkEnvironmentConfiguration networkConfig, - FiniteDuration timeout, - FiniteDuration maxRegistrationDuration, - int numberOfSlots, - Configuration configuration) { - - this (tmpDirPaths, - cleanupInterval, - networkConfig, - timeout, - maxRegistrationDuration, - numberOfSlots, - configuration, - new FiniteDuration(500, TimeUnit.MILLISECONDS), - new FiniteDuration(30, TimeUnit.SECONDS), - new FiniteDuration(10, TimeUnit.SECONDS)); - } - - public TaskExecutorConfiguration( - String[] tmpDirPaths, - long cleanupInterval, - NetworkEnvironmentConfiguration networkConfig, - FiniteDuration timeout, - FiniteDuration maxRegistrationDuration, - int numberOfSlots, - Configuration configuration, - FiniteDuration initialRegistrationPause, - FiniteDuration maxRegistrationPause, - FiniteDuration refusedRegistrationPause) { - - this.tmpDirPaths = 
checkNotNull(tmpDirPaths); - this.cleanupInterval = checkNotNull(cleanupInterval); - this.networkConfig = checkNotNull(networkConfig); - this.timeout = checkNotNull(timeout); - this.maxRegistrationDuration = maxRegistrationDuration; - this.numberOfSlots = checkNotNull(numberOfSlots); - this.configuration = checkNotNull(configuration); - this.initialRegistrationPause = checkNotNull(initialRegistrationPause); - this.maxRegistrationPause = checkNotNull(maxRegistrationPause); - this.refusedRegistrationPause = checkNotNull(refusedRegistrationPause); - } - - // -------------------------------------------------------------------------------------------- - // Properties - // -------------------------------------------------------------------------------------------- - - public String[] getTmpDirPaths() { - return tmpDirPaths; - } - - public long getCleanupInterval() { - return cleanupInterval; - } - - public NetworkEnvironmentConfiguration getNetworkConfig() { return networkConfig; } - - public FiniteDuration getTimeout() { - return timeout; - } - - public FiniteDuration getMaxRegistrationDuration() { - return maxRegistrationDuration; - } - - public int getNumberOfSlots() { - return numberOfSlots; - } - - public Configuration getConfiguration() { - return configuration; - } - - public FiniteDuration getInitialRegistrationPause() { - return initialRegistrationPause; - } - - public FiniteDuration getMaxRegistrationPause() { - return maxRegistrationPause; - } - - public FiniteDuration getRefusedRegistrationPause() { - return refusedRegistrationPause; - } - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java new file mode 100644 index 0000000..32eb8c1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.Duration; + +import java.io.File; + +/** + * Configuration object for {@link TaskExecutor}. 
+ */ +public class TaskManagerConfiguration { + + private static final Logger LOG = LoggerFactory.getLogger(TaskManagerConfiguration.class); + + private final int numberSlots; + + private final String[] tmpDirPaths; + + private final Time timeout; + private final Time maxRegistrationDuration; + private final Time initialRegistrationPause; + private final Time maxRegistrationPause; + private final Time refusedRegistrationPause; + + private final long cleanupInterval; + + public TaskManagerConfiguration( + int numberSlots, + String[] tmpDirPaths, + Time timeout, + Time maxRegistrationDuration, + Time initialRegistrationPause, + Time maxRegistrationPause, + Time refusedRegistrationPause, + long cleanupInterval) { + + this.numberSlots = numberSlots; + this.tmpDirPaths = Preconditions.checkNotNull(tmpDirPaths); + this.timeout = Preconditions.checkNotNull(timeout); + this.maxRegistrationDuration = maxRegistrationDuration; // may be null: fromConfiguration passes null for an infinite duration + this.initialRegistrationPause = Preconditions.checkNotNull(initialRegistrationPause); + this.maxRegistrationPause = Preconditions.checkNotNull(maxRegistrationPause); + this.refusedRegistrationPause = Preconditions.checkNotNull(refusedRegistrationPause); + this.cleanupInterval = Preconditions.checkNotNull(cleanupInterval); + } + + public int getNumberSlots() { + return numberSlots; + } + + public String[] getTmpDirPaths() { + return tmpDirPaths; + } + + public Time getTimeout() { + return timeout; + } + + public Time getMaxRegistrationDuration() { + return maxRegistrationDuration; + } + + public Time getInitialRegistrationPause() { + return initialRegistrationPause; + } + + public Time getMaxRegistrationPause() { + return maxRegistrationPause; + } + + public Time getRefusedRegistrationPause() { + return refusedRegistrationPause; + } + + public long getCleanupInterval() { + return cleanupInterval; + } + + // 
-------------------------------------------------------------------------------------------- + // Static factory 
methods + // -------------------------------------------------------------------------------------------- + + public static TaskManagerConfiguration fromConfiguration(Configuration configuration) { + int numberSlots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); + + if (numberSlots == -1) { + numberSlots = 1; + } + + final String[] tmpDirPaths = configuration.getString( + ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, + ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator); + + final Time timeout; + + try { + timeout = Time.milliseconds(AkkaUtils.getTimeout(configuration).toMillis()); + } catch (Exception e) { + throw new IllegalArgumentException( + "Invalid format for '" + ConfigConstants.AKKA_ASK_TIMEOUT + + "'.Use formats like '50 s' or '1 min' to specify the timeout.", e); + } + + LOG.info("Messages have a max timeout of " + timeout); + + final long cleanupInterval = configuration.getLong( + ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL, + ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000; + + final Time finiteRegistrationDuration; + + try { + Duration maxRegistrationDuration = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, + ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_DURATION)); + if (maxRegistrationDuration.isFinite()) { + finiteRegistrationDuration = Time.milliseconds(maxRegistrationDuration.toMillis()); + } else { + finiteRegistrationDuration = null; // null encodes an infinite registration duration + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, e); + } + + final Time initialRegistrationPause; + try { + Duration pause = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, + ConfigConstants.DEFAULT_TASK_MANAGER_INITIAL_REGISTRATION_PAUSE)); + if 
(pause.isFinite()) { + initialRegistrationPause = 
Time.milliseconds(pause.toMillis()); // millisecond resolution keeps sub-second pauses from truncating to 0 + } else { + throw new IllegalArgumentException("The initial registration pause must be finite: " + pause); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_INITIAL_REGISTRATION_PAUSE, e); + } + + final Time maxRegistrationPause; + try { + Duration pause = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, + ConfigConstants.DEFAULT_TASK_MANAGER_MAX_REGISTRATION_PAUSE)); + if (pause.isFinite()) { + maxRegistrationPause = Time.milliseconds(pause.toMillis()); + } else { + throw new IllegalArgumentException("The maximum registration pause must be finite: " + pause); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_MAX_REGISTARTION_PAUSE, e); + } + + final Time refusedRegistrationPause; + try { + Duration pause = Duration.create(configuration.getString( + ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, + ConfigConstants.DEFAULT_TASK_MANAGER_REFUSED_REGISTRATION_PAUSE)); + if (pause.isFinite()) { + refusedRegistrationPause = Time.milliseconds(pause.toMillis()); + } else { + throw new IllegalArgumentException("The refused registration pause must be finite: " + pause); + } + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid format for parameter " + + ConfigConstants.TASK_MANAGER_REFUSED_REGISTRATION_PAUSE, e); + } + + return new TaskManagerConfiguration( + numberSlots, + tmpDirPaths, + timeout, + finiteRegistrationDuration, + initialRegistrationPause, + maxRegistrationPause, + refusedRegistrationPause, + cleanupInterval); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java new file mode 100644 index 0000000..8ac0ddd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcServiceUtils; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; + +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class is the executable entry point for the task manager in yarn or standalone mode. + * It constructs the related components (network, I/O manager, memory manager, RPC service, HA service) + * and starts them. 
+ */ +public class TaskManagerRunner implements FatalErrorHandler { + + private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class); + + private final Object lock = new Object(); + + private final Configuration configuration; + + private final ResourceID resourceID; + + private final RpcService rpcService; + + private final HighAvailabilityServices highAvailabilityServices; + + /** Executor used to run future callbacks */ + private final Executor executor; + + private final TaskExecutor taskManager; + + public TaskManagerRunner( + Configuration configuration, + ResourceID resourceID, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + Executor executor) throws Exception { + + this.configuration = Preconditions.checkNotNull(configuration); + this.resourceID = Preconditions.checkNotNull(resourceID); + this.rpcService = Preconditions.checkNotNull(rpcService); + this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); + this.executor = rpcService.getExecutor(); + + InetAddress remoteAddress = InetAddress.getByName(rpcService.getAddress()); + + TaskManagerServicesConfiguration taskManagerServicesConfiguration = TaskManagerServicesConfiguration.fromConfiguration( + configuration, + remoteAddress, + false); + + TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, resourceID); + + TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); + + this.taskManager = new TaskExecutor( + taskManagerConfiguration, + taskManagerServices.getTaskManagerLocation(), + rpcService, + taskManagerServices.getMemoryManager(), + taskManagerServices.getIOManager(), + taskManagerServices.getNetworkEnvironment(), + highAvailabilityServices, + taskManagerServices.getMetricRegistry(), + this); + } + + // -------------------------------------------------------------------------------------------- + // 
Lifecycle management + // -------------------------------------------------------------------------------------------- + + public void start() { + taskManager.start(); + } + + public void shutDown(Throwable cause) { + shutDownInternally(); + } + + protected void shutDownInternally() { + synchronized(lock) { + taskManager.shutDown(); + } + } + + // -------------------------------------------------------------------------------------------- + // FatalErrorHandler methods + // -------------------------------------------------------------------------------------------- + + @Override + public void onFatalError(Throwable exception) { + LOG.error("Fatal error occurred while executing the TaskManager. Shutting it down...", exception); + shutDown(exception); + } + + // -------------------------------------------------------------------------------------------- + // Static utilities + // -------------------------------------------------------------------------------------------- + + /** + * Create a RPC service for the task manager. + * + * @param configuration The configuration for the TaskManager. 
+ * @param haServices to use for the task manager hostname retrieval + */ + public static RpcService createRpcService( + final Configuration configuration, + final HighAvailabilityServices haServices) throws Exception { + + checkNotNull(configuration); + checkNotNull(haServices); + + String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null); + + if (taskManagerHostname != null) { + LOG.info("Using configured hostname/address for TaskManager: {}.", taskManagerHostname); + } else { + Time lookupTimeout = Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis()); + + InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress( + haServices.getResourceManagerLeaderRetriever(), + lookupTimeout); + + taskManagerHostname = taskManagerAddress.getHostName(); + + LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.", + taskManagerHostname, taskManagerAddress.getHostAddress()); + } + + final int rpcPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0); + + Preconditions.checkState(rpcPort < 0 || rpcPort >65535, "Invalid value for " + + "'%s' (port for the TaskManager actor system) : %d - Leave config parameter empty or " + + "use 0 to let the system choose port automatically.", + ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, rpcPort); + + return RpcServiceUtils.createRpcService(taskManagerHostname, rpcPort, configuration); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java new file mode 100644 index 0000000..ff7f7d5 --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.LocalConnectionManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.TaskEventDispatcher; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.netty.NettyConfig; +import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; +import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.query.KvStateRegistry; +import 
org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; +import org.apache.flink.runtime.query.netty.KvStateServer; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +/** + * Container for {@link TaskExecutor} services such as the {@link MemoryManager}, {@link IOManager}, + * {@link NetworkEnvironment} and the {@link MetricRegistry}. + */ +public class TaskManagerServices { + private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class); + + /** TaskManager services */ + private final TaskManagerLocation taskManagerLocation; + private final MemoryManager memoryManager; + private final IOManager ioManager; + private final NetworkEnvironment networkEnvironment; + private final MetricRegistry metricRegistry; + + private TaskManagerServices( + TaskManagerLocation taskManagerLocation, + MemoryManager memoryManager, + IOManager ioManager, + NetworkEnvironment networkEnvironment, + MetricRegistry metricRegistry) { + + this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation); + this.memoryManager = Preconditions.checkNotNull(memoryManager); + this.ioManager = Preconditions.checkNotNull(ioManager); + this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment); + this.metricRegistry = Preconditions.checkNotNull(metricRegistry); + } + + // -------------------------------------------------------------------------------------------- + // Getter/Setter + // -------------------------------------------------------------------------------------------- + + public MemoryManager getMemoryManager() { + return memoryManager; + } + + public IOManager getIOManager() { + return ioManager; + } + + public 
NetworkEnvironment getNetworkEnvironment() { + return networkEnvironment; + } + + public TaskManagerLocation getTaskManagerLocation() { + return taskManagerLocation; + } + + public MetricRegistry getMetricRegistry() { + return metricRegistry; + } + + // -------------------------------------------------------------------------------------------- + // Static factory methods for task manager services + // -------------------------------------------------------------------------------------------- + + /** + * Creates and returns the task manager services. + * + * @param resourceID resource ID of the task manager + * @param taskManagerServicesConfiguration task manager configuration + * @return task manager components + * @throws Exception + */ + public static TaskManagerServices fromConfiguration( + TaskManagerServicesConfiguration taskManagerServicesConfiguration, + ResourceID resourceID) throws Exception { + + final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration); + + network.start(); + + final TaskManagerLocation taskManagerLocation = new TaskManagerLocation( + resourceID, + taskManagerServicesConfiguration.getTaskManagerAddress(), + network.getConnectionManager().getDataPort()); + + // this call has to happen strictly after the network stack has been initialized + final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration); + + // start the I/O manager, it will create some temp directories. + final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths()); + + MetricRegistry metricsRegistry = new MetricRegistry(taskManagerServicesConfiguration.getMetricRegistryConfiguration()); + + return new TaskManagerServices(taskManagerLocation, memoryManager, ioManager, network, metricsRegistry); + } + + /** + * Creates a {@link MemoryManager} from the given {@link TaskManagerServicesConfiguration}. 
+ * + * @param taskManagerServicesConfiguration to create the memory manager from + * @return Memory manager + * @throws Exception + */ + private static MemoryManager createMemoryManager(TaskManagerServicesConfiguration taskManagerServicesConfiguration) throws Exception { + // computing the amount of memory to use depends on how much memory is available + // it strictly needs to happen AFTER the network stack has been initialized + + MemoryType memType = taskManagerServicesConfiguration.getNetworkConfig().memoryType(); + + // check if a value has been configured + long configuredMemory = taskManagerServicesConfiguration.getConfiguredMemory(); + + final long memorySize; + + boolean preAllocateMemory = taskManagerServicesConfiguration.isPreAllocateMemory(); + + if (configuredMemory > 0) { + if (preAllocateMemory) { + LOG.info("Using {} MB for managed memory." , configuredMemory); + } else { + LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily." , configuredMemory); + } + memorySize = configuredMemory << 20; // megabytes to bytes + } else { + float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction(); + + if (memType == MemoryType.HEAP) { + long relativeMemSize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * memoryFraction); + if (preAllocateMemory) { + LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB)." , + memoryFraction , relativeMemSize >> 20); + } else { + LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), " + + "memory will be allocated lazily." 
, memoryFraction , relativeMemSize >> 20); + } + memorySize = relativeMemSize; + } else if (memType == MemoryType.OFF_HEAP) { + // The maximum heap memory has been adjusted according to the fraction + long maxMemory = EnvironmentInformation.getMaxJvmHeapMemory(); + long directMemorySize = (long) (maxMemory / (1.0 - memoryFraction) * memoryFraction); + if (preAllocateMemory) { + LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB)." , + memoryFraction, directMemorySize >> 20); + } else { + LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB)," + + " memory will be allocated lazily.", memoryFraction, directMemorySize >> 20); + } + memorySize = directMemorySize; + } else { + throw new RuntimeException("No supported memory type detected."); + } + } + + // now start the memory manager + final MemoryManager memoryManager; + try { + memoryManager = new MemoryManager( + memorySize, + taskManagerServicesConfiguration.getNumberOfSlots(), + taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(), + memType, + preAllocateMemory); + } catch (OutOfMemoryError e) { + if (memType == MemoryType.HEAP) { + throw new Exception("OutOfMemory error (" + e.getMessage() + + ") while allocating the TaskManager heap memory (" + memorySize + " bytes).", e); + } else if (memType == MemoryType.OFF_HEAP) { + throw new Exception("OutOfMemory error (" + e.getMessage() + + ") while allocating the TaskManager off-heap memory (" + memorySize + + " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e); + } else { + throw e; + } + } + return memoryManager; + } + + /** + * Creates the {@link NetworkEnvironment} from the given {@link TaskManagerServicesConfiguration}. 
+ * + * @param taskManagerServicesConfiguration to construct the network environment from + * @return Network environment + * @throws IOException + */ + private static NetworkEnvironment createNetworkEnvironment(TaskManagerServicesConfiguration taskManagerServicesConfiguration) throws IOException { + // pre-start checks + checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths()); + + NetworkEnvironmentConfiguration networkEnvironmentConfiguration = taskManagerServicesConfiguration.getNetworkConfig(); + + NetworkBufferPool networkBufferPool = new NetworkBufferPool( + networkEnvironmentConfiguration.numNetworkBuffers(), + networkEnvironmentConfiguration.networkBufferSize(), + networkEnvironmentConfiguration.memoryType()); + + ConnectionManager connectionManager; + + if (networkEnvironmentConfiguration.nettyConfig() != null) { + connectionManager = new NettyConnectionManager(networkEnvironmentConfiguration.nettyConfig()); + } else { + connectionManager = new LocalConnectionManager(); + } + + ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); + TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); + + KvStateRegistry kvStateRegistry = new KvStateRegistry(); + + KvStateServer kvStateServer; + + if (networkEnvironmentConfiguration.nettyConfig() != null) { + NettyConfig nettyConfig = networkEnvironmentConfiguration.nettyConfig(); + + int numNetworkThreads = networkEnvironmentConfiguration.queryServerNetworkThreads() == 0 ? + nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerNetworkThreads(); + + int numQueryThreads = networkEnvironmentConfiguration.queryServerQueryThreads() == 0 ? 
+ nettyConfig.getNumberOfSlots() : networkEnvironmentConfiguration.queryServerQueryThreads(); + + kvStateServer = new KvStateServer( + taskManagerServicesConfiguration.getTaskManagerAddress(), + networkEnvironmentConfiguration.queryServerPort(), + numNetworkThreads, + numQueryThreads, + kvStateRegistry, + new DisabledKvStateRequestStats()); + } else { + kvStateServer = null; + } + + // we start the network first, to make sure it can allocate its buffers first + final NetworkEnvironment network = new NetworkEnvironment( + networkBufferPool, + connectionManager, + resultPartitionManager, + taskEventDispatcher, + kvStateRegistry, + kvStateServer, + networkEnvironmentConfiguration.ioMode(), + networkEnvironmentConfiguration.partitionRequestInitialBackoff(), + networkEnvironmentConfiguration.partitinRequestMaxBackoff()); + + return network; + } + + /** + * Validates that all the directories denoted by the strings do actually exist, are proper + * directories (not files), and are writable. + * + * @param tmpDirs The array of directory paths to check. + * @throws Exception Thrown if any of the directories does not exist or is not writable + * or is a file, rather than a directory. 
+ */ + private static void checkTempDirs(String[] tmpDirs) throws IOException { + for (String dir : tmpDirs) { + if (dir != null && !dir.equals("")) { + File file = new File(dir); + if (!file.exists()) { + throw new IOException("Temporary file directory " + file.getAbsolutePath() + " does not exist."); + } + if (!file.isDirectory()) { + throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not a directory."); + } + if (!file.canWrite()) { + throw new IOException("Temporary file directory " + file.getAbsolutePath() + " is not writable."); + } + + if (LOG.isInfoEnabled()) { + long totalSpaceGb = file.getTotalSpace() >> 30; + long usableSpaceGb = file.getUsableSpace() >> 30; + double usablePercentage = (double)usableSpaceGb / totalSpaceGb * 100; + String path = file.getAbsolutePath(); + LOG.info(String.format("Temporary file directory '%s': total %d GB, " + "usable %d GB (%.2f%% usable)", + path, totalSpaceGb, usableSpaceGb, usablePercentage)); + } + } else { + throw new IllegalArgumentException("Temporary file directory #$id is null."); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/1d25ea85/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java new file mode 100644 index 0000000..66d969a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.taskexecutor;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.memory.HeapMemorySegment;
import org.apache.flink.core.memory.HybridMemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.util.MathUtils;

import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * Configuration for the task manager services such as the network environment, the memory manager,
 * the io manager and the metric registry
 */
public class TaskManagerServicesConfiguration {

    private final InetAddress taskManagerAddress;

    private final String[] tmpDirPaths;

    private final int numberOfSlots;

    private final NetworkEnvironmentConfiguration networkConfig;

    private final long configuredMemory;

    private final boolean preAllocateMemory;

    private final float memoryFraction;

    private final MetricRegistryConfiguration metricRegistryConfiguration;

    /**
     * Creates a configuration from already parsed values. The reference typed arguments must not
     * be null; the primitive values are stored without further validation.
     *
     * @param taskManagerAddress address of the task manager
     * @param tmpDirPaths paths of the temporary directories
     * @param networkConfig configuration of the network environment
     * @param numberOfSlots number of task slots
     * @param configuredMemory configured managed memory size, see {@link #fromConfiguration}
     * @param preAllocateMemory whether the managed memory is pre-allocated
     * @param memoryFraction memory fraction used if no memory size is configured
     * @param metricRegistryConfiguration configuration of the metric registry
     */
    public TaskManagerServicesConfiguration(
            InetAddress taskManagerAddress,
            String[] tmpDirPaths,
            NetworkEnvironmentConfiguration networkConfig,
            int numberOfSlots,
            long configuredMemory,
            boolean preAllocateMemory,
            float memoryFraction,
            MetricRegistryConfiguration metricRegistryConfiguration) {

        this.taskManagerAddress = checkNotNull(taskManagerAddress);
        this.tmpDirPaths = checkNotNull(tmpDirPaths);
        this.networkConfig = checkNotNull(networkConfig);

        // primitives cannot be null, a null check would only box and unbox the value
        this.numberOfSlots = numberOfSlots;
        this.configuredMemory = configuredMemory;
        this.preAllocateMemory = preAllocateMemory;
        this.memoryFraction = memoryFraction;

        this.metricRegistryConfiguration = checkNotNull(metricRegistryConfiguration);
    }

    // --------------------------------------------------------------------------------------------
    //  Getter/Setter
    // --------------------------------------------------------------------------------------------

    public InetAddress getTaskManagerAddress() {
        return taskManagerAddress;
    }

    public String[] getTmpDirPaths() {
        return tmpDirPaths;
    }

    public NetworkEnvironmentConfiguration getNetworkConfig() {
        return networkConfig;
    }

    public int getNumberOfSlots() {
        return numberOfSlots;
    }

    public float getMemoryFraction() {
        return memoryFraction;
    }

    public long getConfiguredMemory() {
        return configuredMemory;
    }

    public boolean isPreAllocateMemory() {
        return preAllocateMemory;
    }

    public MetricRegistryConfiguration getMetricRegistryConfiguration() {
        return metricRegistryConfiguration;
    }

    // --------------------------------------------------------------------------------------------
    //  Parsing of Flink configuration
    // --------------------------------------------------------------------------------------------

    /**
     * Utility method to extract TaskManager config parameters from the configuration and to
     * sanity check them.
     *
     * @param configuration The configuration.
     * @param remoteAddress identifying the IP address under which the TaskManager will be accessible
     * @param localCommunication True, to skip initializing the network stack.
     *                           Use only in cases where only one task manager runs.
     * @return TaskManagerServicesConfiguration that bundles the task manager address, the
     *         NetworkEnvironmentConfiguration, the memory settings, etc.
     * @throws IllegalConfigurationException if one of the configuration values is invalid
     * @throws Exception if the memory segment factory has already been initialized for a
     *                   different memory type
     */
    public static TaskManagerServicesConfiguration fromConfiguration(
            Configuration configuration,
            InetAddress remoteAddress,
            boolean localCommunication) throws Exception {

        // we need this because many configs have been written with a "-1" entry
        int slots = configuration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
        if (slots == -1) {
            slots = 1;
        }

        // the directories may be separated by ',' or by the platform's path separator
        final String[] tmpDirs = configuration.getString(
            ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
            ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);

        final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
            configuration,
            localCommunication,
            remoteAddress,
            slots);

        // extract memory settings; -1 means that no memory size has been configured
        final long configuredMemory = configuration.getLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
        checkConfigParameter(configuredMemory == -1 || configuredMemory > 0, configuredMemory,
            ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY,
            "MemoryManager needs at least one MB of memory. " +
                "If you leave this config parameter empty, the system automatically " +
                "pick a fraction of the available memory.");

        final boolean preAllocateMemory = configuration.getBoolean(
            ConfigConstants.TASK_MANAGER_MEMORY_PRE_ALLOCATE_KEY,
            ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_PRE_ALLOCATE);

        final float memoryFraction = configuration.getFloat(
            ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
            ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
        checkConfigParameter(memoryFraction > 0.0f && memoryFraction < 1.0f, memoryFraction,
            ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY,
            "MemoryManager fraction of the free memory must be between 0.0 and 1.0");

        final MetricRegistryConfiguration metricRegistryConfiguration =
            MetricRegistryConfiguration.fromConfiguration(configuration);

        return new TaskManagerServicesConfiguration(
            remoteAddress,
            tmpDirs,
            networkConfig,
            slots,
            configuredMemory,
            preAllocateMemory,
            memoryFraction,
            metricRegistryConfiguration);
    }

    // --------------------------------------------------------------------------
    //  Parsing and checking the TaskManager Configuration
    // --------------------------------------------------------------------------

    /**
     * Creates the {@link NetworkEnvironmentConfiguration} from the given {@link Configuration}.
     *
     * <p>As a side effect, the {@link MemorySegmentFactory} is initialized for the configured
     * memory type if it has not been initialized yet.
     *
     * @param configuration to create the network environment configuration from
     * @param localTaskManagerCommunication true if task manager communication is local
     * @param taskManagerAddress address of the task manager
     * @param slots to start the task manager with
     * @return Network environment configuration
     * @throws IllegalConfigurationException if one of the configuration values is invalid
     * @throws Exception if the memory segment factory has already been initialized for a
     *                   different memory type
     */
    private static NetworkEnvironmentConfiguration parseNetworkEnvironmentConfiguration(
            Configuration configuration,
            boolean localTaskManagerCommunication,
            InetAddress taskManagerAddress,
            int slots) throws Exception {

        // ----> hosts / ports for communication and data exchange

        final int dataport = configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
            ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);

        // 0 is a valid value: it lets the system choose a port, as stated in the error message
        checkConfigParameter(dataport >= 0, dataport, ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
            "Leave config parameter empty or use 0 to let the system choose a port automatically.");

        checkConfigParameter(slots >= 1, slots, ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
            "Number of task slots must be at least one.");

        final int numNetworkBuffers = configuration.getInteger(
            ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
            ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);

        checkConfigParameter(numNetworkBuffers > 0, numNetworkBuffers,
            ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, "");

        final int pageSize = configuration.getInteger(
            ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
            ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_SEGMENT_SIZE);

        // check page size of for minimum size
        checkConfigParameter(pageSize >= MemoryManager.MIN_PAGE_SIZE, pageSize,
            ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
            "Minimum memory segment size is " + MemoryManager.MIN_PAGE_SIZE);

        // check page size for power of two
        checkConfigParameter(MathUtils.isPowerOf2(pageSize), pageSize,
            ConfigConstants.TASK_MANAGER_MEMORY_SEGMENT_SIZE_KEY,
            "Memory segment size must be a power of 2.");

        // check whether we use heap or off-heap memory
        final MemoryType memType;
        if (configuration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_OFF_HEAP_KEY, false)) {
            memType = MemoryType.OFF_HEAP;
        } else {
            memType = MemoryType.HEAP;
        }

        // initialize the memory segment factory accordingly
        if (memType == MemoryType.HEAP) {
            if (!MemorySegmentFactory.initializeIfNotInitialized(HeapMemorySegment.FACTORY)) {
                throw new Exception("Memory type is set to heap memory, but memory segment " +
                    "factory has been initialized for off-heap memory segments");
            }
        } else {
            if (!MemorySegmentFactory.initializeIfNotInitialized(HybridMemorySegment.FACTORY)) {
                throw new Exception("Memory type is set to off-heap memory, but memory segment " +
                    "factory has been initialized for heap memory segments");
            }
        }

        // no Netty configuration is created for purely local communication
        final NettyConfig nettyConfig;
        if (!localTaskManagerCommunication) {
            final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);

            nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(),
                taskManagerInetSocketAddress.getPort(), pageSize, slots, configuration);
        } else {
            nettyConfig = null;
        }

        // Default spill I/O mode for intermediate results
        final String syncOrAsync = configuration.getString(
            ConfigConstants.TASK_MANAGER_NETWORK_DEFAULT_IO_MODE,
            ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_DEFAULT_IO_MODE);

        // every value other than "async" selects the synchronous mode
        final IOManager.IOMode ioMode;
        if (syncOrAsync.equals("async")) {
            ioMode = IOManager.IOMode.ASYNC;
        } else {
            ioMode = IOManager.IOMode.SYNC;
        }

        final int queryServerPort = configuration.getInteger(
            ConfigConstants.QUERYABLE_STATE_SERVER_PORT,
            ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_PORT);

        final int queryServerNetworkThreads = configuration.getInteger(
            ConfigConstants.QUERYABLE_STATE_SERVER_NETWORK_THREADS,
            ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_NETWORK_THREADS);

        final int queryServerQueryThreads = configuration.getInteger(
            ConfigConstants.QUERYABLE_STATE_SERVER_QUERY_THREADS,
            ConfigConstants.DEFAULT_QUERYABLE_STATE_SERVER_QUERY_THREADS);

        // NOTE(review): 500 and 3000 are presumably the initial and the maximum partition request
        // backoff - confirm against the NetworkEnvironmentConfiguration constructor
        return new NetworkEnvironmentConfiguration(
            numNetworkBuffers,
            pageSize,
            memType,
            ioMode,
            queryServerPort,
            queryServerNetworkThreads,
            queryServerQueryThreads,
            nettyConfig,
            500,
            3000);
    }

    /**
     * Validates a condition for a config parameter and displays a standard exception, if the
     * the condition does not hold.
     *
     * @param condition The condition that must hold. If the condition is false, an exception is thrown.
     * @param parameter The parameter value. Will be shown in the exception message.
     * @param name The name of the config parameter. Will be shown in the exception message.
     * @param errorMessage The optional custom error message to append to the exception message.
     * @throws IllegalConfigurationException if the condition does not hold
     */
    private static void checkConfigParameter(
            boolean condition,
            Object parameter,
            String name,
            String errorMessage) {
        if (!condition) {
            throw new IllegalConfigurationException(
                "Invalid configuration value for " + name + " : " + parameter + " - " + errorMessage);
        }
    }
}
