[FLINK-2288] [runtime] Cleanups and comments for ZooKeeper based initialization
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/535475c2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/535475c2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/535475c2 Branch: refs/heads/master Commit: 535475c2131b9bc352bc7268f022a1bdce206f2e Parents: 9c0dd97 Author: Stephan Ewen <[email protected]> Authored: Wed Jul 8 15:02:33 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Jul 8 20:28:40 2015 +0200 ---------------------------------------------------------------------- docs/setup/jobmanager_high_availability.md | 2 +- flink-dist/src/main/resources/flink-conf.yaml | 37 +++++++++++++ .../flink/runtime/security/SecurityUtils.java | 10 +++- .../zookeeper/FlinkZooKeeperQuorumPeer.java | 26 +++++---- .../flink/runtime/jobmanager/JobManager.scala | 58 +++++++++++--------- 5 files changed, 92 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/docs/setup/jobmanager_high_availability.md ---------------------------------------------------------------------- diff --git a/docs/setup/jobmanager_high_availability.md b/docs/setup/jobmanager_high_availability.md index dec0cdc..8958e17 100644 --- a/docs/setup/jobmanager_high_availability.md +++ b/docs/setup/jobmanager_high_availability.md @@ -77,7 +77,7 @@ server.X=addressX:peerPort:leaderPort server.Y=addressY:peerPort:leaderPort </pre> -The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each of the configured hosts. The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes sure to set some rqeuired configuration values for convenience. In production setups, it is recommended to manage your own ZooKeeper installation. 
+The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each of the configured hosts. The started processes start ZooKeeper servers via a Flink wrapper, which reads the configuration from `conf/zoo.cfg` and makes sure to set some required configuration values for convenience. In production setups, it is recommended to manage your own ZooKeeper installation. ## Example: Start and stop a local HA-cluster with 2 JobManagers http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-dist/src/main/resources/flink-conf.yaml ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml index a258815..2a287dc 100644 --- a/flink-dist/src/main/resources/flink-conf.yaml +++ b/flink-dist/src/main/resources/flink-conf.yaml @@ -21,18 +21,38 @@ # Common #============================================================================== +# The host on which the JobManager runs. Only used in non-high-availability mode. +# The JobManager process will use this hostname to bind the listening servers to. +# The TaskManagers will try to connect to the JobManager on that host. + jobmanager.rpc.address: localhost + +# The port where the JobManager's main actor system listens for messages. + jobmanager.rpc.port: 6123 + +# The heap size for the JobManager JVM + jobmanager.heap.mb: 256 + +# The heap size for the TaskManager JVM + taskmanager.heap.mb: 512 + +# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. + taskmanager.numberOfTaskSlots: 1 + +# The parallelism used for programs that did not specify any other parallelism.
+ parallelism.default: 1 + #============================================================================== # Web Frontend #============================================================================== @@ -42,11 +62,13 @@ parallelism.default: 1 jobmanager.web.port: 8081 + # The port uder which the standalone web client # (for job upload and submit) listens. webclient.port: 8080 + #============================================================================== # Streaming state checkpointing #============================================================================== @@ -58,12 +80,14 @@ webclient.port: 8080 state.backend: jobmanager + # Directory for storing checkpoints in a flink supported filesystem # Note: State backend must be accessible from the JobManager, use file:// # only for local setups. # # state.backend.fs.checkpointdir: hdfs://checkpoints + #============================================================================== # Advanced #============================================================================== @@ -72,6 +96,7 @@ state.backend: jobmanager # # taskmanager.network.numberOfBuffers: 2048 + # Directories for temporary files. # # Add a delimited list for multiple directories, using the system directory @@ -88,6 +113,7 @@ state.backend: jobmanager # # taskmanager.tmp.dirs: /tmp + # Path to the Hadoop configuration directory. # # This configuration is used when writing into HDFS. Unless specified otherwise, @@ -98,3 +124,14 @@ state.backend: jobmanager # via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'. # # fs.hdfs.hadoopconf: /path/to/hadoop/conf/ + + +#============================================================================== +# High Availability +#============================================================================== + +# The list of ZooKeeper quorum peers that coordinate the high-availability +# setup. This must be a list of the form +# "host_1[:peerPort[:leaderPort]],host_2[:peerPort[:leaderPort]],..."
+ +#ha.zookeeper.quorum: localhost \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java index 5fde51e..4c0d49a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java @@ -24,6 +24,12 @@ import org.slf4j.LoggerFactory; import java.security.PrivilegedExceptionAction; +/** + * A utility class that lets program code run in a security context provided by the + * Hadoop security user groups. + * + * The secure context will for example pick up authentication information from Kerberos. + */ public class SecurityUtils { private static final Logger LOG = LoggerFactory.getLogger(SecurityUtils.class); @@ -44,17 +50,15 @@ public class SecurityUtils { LOG.error("Security is enabled but no Kerberos credentials have been found. 
" + "You may authenticate using the kinit command."); } - T ret = ugi.doAs(new PrivilegedExceptionAction<T>() { + return ugi.doAs(new PrivilegedExceptionAction<T>() { @Override public T run() throws Exception { return runner.run(); } }); - return ret; } public static interface FlinkSecuredRunner<T> { public T run() throws Exception; } - } http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java index d0e706e..c9d3ec4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java @@ -21,20 +21,22 @@ package org.apache.flink.runtime.zookeeper; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.zookeeper.server.ServerConfig; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooKeeperServerMain; import org.apache.zookeeper.server.quorum.QuorumPeer; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; import org.apache.zookeeper.server.quorum.QuorumPeerMain; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileInputStream; +import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; -import java.io.PrintWriter; import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -49,6 +51,10 @@ public class FlinkZooKeeperQuorumPeer { public static 
void main(String[] args) { try { + // startup checks and logging + EnvironmentInformation.logEnvironmentInfo(LOG, "ZooKeeper Quorum Peer", args); + EnvironmentInformation.checkJavaVersion(); + final ParameterTool params = ParameterTool.fromArgs(args); final String zkConfigFile = params.getRequired("zkConfigFile"); final int peerId = params.getInt("peerId"); @@ -57,7 +63,7 @@ public class FlinkZooKeeperQuorumPeer { runFlinkZkQuorumPeer(zkConfigFile, peerId); } catch (Throwable t) { - t.printStackTrace(); + LOG.error("Error running ZooKeeper quorum peer: " + t.getMessage(), t); System.exit(-1); } } @@ -210,18 +216,16 @@ public class FlinkZooKeeperQuorumPeer { dataDir.deleteOnExit(); - // Write myid to file - PrintWriter writer = null; + LOG.info("Writing {} to myid file in 'dataDir'.", id); + + // Write myid to file. We use a File Writer, because that properly propagates errors, + // while the PrintWriter swallows errors + FileWriter writer = new FileWriter(new File(dataDir, "myid")); try { - LOG.info("Writing {} to myid file in 'dataDir'.", id); - - writer = new PrintWriter(new File(dataDir, "myid")); - writer.println(id); + writer.write(String.valueOf(id)); } finally { - if (writer != null) { - writer.close(); - } + writer.close(); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/535475c2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 0d96edb..dc1599a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1000,35 +1000,41 @@ object JobManager { configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..") } - // HA mode - 
val (hostname, port) = if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) { - // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is - // chosen. For the FlinkMiniCluster you have to choose it on your own. - LOG.info("HA mode.") - - if (config.getHost == null) { - throw new Exception("Missing parameter '--host'.") + // high availability mode + val (hostname: String, port: Int ) = + if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) { + // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is + // chosen. For the FlinkMiniCluster you have to choose it on your own. + LOG.info("Starting JobManager in High-Availability Mode") + + if (config.getHost() == null) { + throw new Exception("Missing parameter '--host'. Parameter is required when " + + "running in high-availability mode") + } + + // Let web server listen on random port + configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) + + (config.getHost(), 0) } - - // Let web server listen on random port - configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); - - (config.getHost, NetUtils.getAvailablePort) - } - else { - if (config.getHost != null) { - throw new IllegalStateException("Specified explicit address for JobManager communication " + - "via CLI, but no ZooKeeper quorum has been configured. The task managers will not be " + - "able to find the correct JobManager to connect to. 
Please configure ZooKeeper or " + - "don't set the address explicitly (this will fallback to the address configured in " + - "in 'conf/flink-conf.yaml'.") + else { + LOG.info("Staring JobManager without high-availability") + + if (config.getHost() != null) { + throw new Exception("Found an explicit address for JobManager communication " + + "via the CLI option '--host'.\n" + + "This parameter must only be set if the JobManager is started in high-availability " + + "mode and connects to a ZooKeeper quorum.\n" + + "Please configure ZooKeeper or don't set the '--host' option, so that the JobManager " + + "uses the address configured under 'conf/flink-conf.yaml'.") + } + + val host = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT) + (host, port) } - (configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null), - configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)) - } - (configuration, config.getJobManagerMode(), config.getStreamingMode(), hostname, port) }
