[FLINK-2288] [FLINK-2302] Setup ZooKeeper for distributed coordination

- FLINK-2288: Setup ZooKeeper for distributed coordination
  * Add FlinkZooKeeperQuorumPeer to wrap ZooKeeper's quorum peers with
    utilities to write required config values (default datadir, myid)
  * Add default conf/zoo.cfg config for ZooKeeper
  * Add startup scripts for ZooKeeper quorum
  * Add conf/masters file for HA masters

- FLINK-2302: Allow multiple instances to run on a single host
  * Multiple TaskManager and JobManager instances can run on a single
    host.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c72b50d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c72b50d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c72b50d

Branch: refs/heads/master
Commit: 8c72b50d87c5a4946b67d722fc1ec3adb7c9bf16
Parents: 60cfa0b
Author: Ufuk Celebi <[email protected]>
Authored: Fri Jul 3 11:04:45 2015 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Jul 8 20:28:40 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  58 ++++-
 flink-dist/src/main/flink-bin/bin/config.sh     |  55 ++++-
 .../src/main/flink-bin/bin/flink-daemon.sh      | 146 ++++++++++++
 flink-dist/src/main/flink-bin/bin/jobmanager.sh | 112 +++------
 .../flink-bin/bin/start-cluster-streaming.sh    |  28 +--
 .../src/main/flink-bin/bin/start-cluster.sh     |  46 ++--
 .../main/flink-bin/bin/start-local-streaming.sh |   7 +-
 .../src/main/flink-bin/bin/start-local.sh       |   7 +-
 .../flink-bin/bin/start-zookeeper-quorum.sh     |  46 ++++
 .../src/main/flink-bin/bin/stop-cluster.sh      |  37 ++-
 .../main/flink-bin/bin/stop-zookeeper-quorum.sh |  46 ++++
 .../src/main/flink-bin/bin/taskmanager.sh       |  88 ++-----
 flink-dist/src/main/flink-bin/bin/zookeeper.sh  |  56 +++++
 flink-dist/src/main/flink-bin/conf/masters      |   1 +
 flink-dist/src/main/flink-bin/conf/zoo.cfg      |  18 ++
 flink-runtime/pom.xml                           |  51 +++--
 .../jobmanager/JobManagerCliOptions.java        |  12 +
 .../flink/runtime/util/ZooKeeperUtil.java       | 110 +++++++++
 .../zookeeper/FlinkZooKeeperQuorumPeer.java     | 227 +++++++++++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  80 +++++--
 .../flink/runtime/taskmanager/TaskManager.scala |   7 +-
 .../flink/runtime/util/ZooKeeperUtilTest.java   |  76 +++++++
 pom.xml                                         |  16 ++
 23 files changed, 1055 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 9d8da21..e3b8b53 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -671,11 +671,74 @@ public final class ConfigConstants {
         */
        public static final String LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER = "localinstancemanager.numtaskmanager";
 
-
        public static final String LOCAL_INSTANCE_MANAGER_START_WEBSERVER = "localinstancemanager.start-webserver";
-       
-       // ------------------------------------------------------------------------
-       
+
+       // --------------------------- ZooKeeper ----------------------------------
+
+       /** Config key for the ZooKeeper quorum servers. */
+       public static final String ZOOKEEPER_QUORUM_KEY = "ha.zookeeper.quorum";
+
+       /** Config key for the ZooKeeper root path. */
+       public static final String ZOOKEEPER_DIR_KEY = "ha.zookeeper.dir";
+
+       /** Config key for the ZooKeeper leader latch path. */
+       public static final String ZOOKEEPER_LATCH_PATH = "ha.zookeeper.dir.latch";
+
+       /** Config key for the ZooKeeper leader path. */
+       public static final String ZOOKEEPER_LEADER_PATH = "ha.zookeeper.dir.leader";
+
+       /** Config key for the ZooKeeper client session timeout. */
+       public static final String ZOOKEEPER_SESSION_TIMEOUT = "ha.zookeeper.client.session-timeout";
+
+       /** Config key for the ZooKeeper client connection timeout. */
+       public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "ha.zookeeper.client.connection-timeout";
+
+       /** Config key for the wait time between ZooKeeper client retries. */
+       public static final String ZOOKEEPER_RETRY_WAIT = "ha.zookeeper.client.retry-wait";
+
+       /** Config key for the maximum number of ZooKeeper client retry attempts. */
+       public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "ha.zookeeper.client.max-retry-attempts";
+
+       // - Defaults -------------------------------------------------------------
+
+       /** Default ZooKeeper root znode path. */
+       public static final String DEFAULT_ZOOKEEPER_ZNODE_ROOT = "/flink";
+
+       /** Default value for {@link #ZOOKEEPER_LATCH_PATH}. */
+       public static final String DEFAULT_ZOOKEEPER_LATCH_PATH = "/leaderlatch";
+
+       /** Default value for {@link #ZOOKEEPER_LEADER_PATH}. */
+       public static final String DEFAULT_ZOOKEEPER_LEADER_PATH = "/leader";
+
+       /** Default value for {@link #ZOOKEEPER_SESSION_TIMEOUT}. */
+       public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = 60000;
+
+       /** Default value for {@link #ZOOKEEPER_CONNECTION_TIMEOUT}. */
+       public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT = 15000;
+
+       /** Default value for {@link #ZOOKEEPER_RETRY_WAIT}. */
+       public static final int DEFAULT_ZOOKEEPER_RETRY_WAIT = 5000;
+
+       /** Default value for {@link #ZOOKEEPER_MAX_RETRY_ATTEMPTS}. */
+       public static final int DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS = 3;
+
+       // - Defaults for required ZooKeeper configuration keys -------------------
+
+       /** ZooKeeper default client port. */
+       public static final int DEFAULT_ZOOKEEPER_CLIENT_PORT = 2181;
+
+       /** ZooKeeper default init limit. */
+       public static final int DEFAULT_ZOOKEEPER_INIT_LIMIT = 10;
+
+       /** ZooKeeper default sync limit. */
+       public static final int DEFAULT_ZOOKEEPER_SYNC_LIMIT = 5;
+
+       /** ZooKeeper default peer port (2888 is ZooKeeper's conventional follower-to-leader port). */
+       public static final int DEFAULT_ZOOKEEPER_PEER_PORT = 2888;
+
+       /** ZooKeeper default leader port (3888 is ZooKeeper's conventional leader-election port). */
+       public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;
+
        /**
         * Not instantiable.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh 
b/flink-dist/src/main/flink-bin/bin/config.sh
index 341144c..99cfa86 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -94,6 +94,8 @@ KEY_ENV_LOG_MAX="env.log.max"
 KEY_ENV_JAVA_HOME="env.java.home"
 KEY_ENV_JAVA_OPTS="env.java.opts"
 KEY_ENV_SSH_OPTS="env.ssh.opts"
+KEY_ZK_QUORUM="ha.zookeeper.quorum"
+KEY_ZK_HEAP_MB="zookeeper.heap.mb"
 
 ########################################################################################################################
 # PATHS AND CONFIG
@@ -196,10 +198,21 @@ if [ -z "${FLINK_SSH_OPTS}" ]; then
     FLINK_SSH_OPTS=$(readFromConfig ${KEY_ENV_SSH_OPTS} "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}")
 fi
 
+# Define ZK_HEAP if it is not already set
+if [ -z "${ZK_HEAP}" ]; then
+    ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
+fi
+
+if [ -z "${ZK_QUORUM}" ]; then
+    ZK_QUORUM=$(readFromConfig ${KEY_ZK_QUORUM} "" "${YAML_CONF}")
+fi
+
-# Arguments for the JVM. Used for job and task manager JVMs.
-# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
-# KEY_JOBM_HEAP_MB and KEY_TASKM_HEAP_MB for that!
-JVM_ARGS=""
+# Arguments for the JVM. Used for job and task manager JVMs.
+# A value already present in the environment is kept, because jobmanager.sh
+# exports its heap settings (-Xms/-Xmx) in JVM_ARGS before flink-daemon.sh
+# sources this file again. Users should still configure memory through
+# conf/flink-conf.yaml (KEY_JOBM_HEAP_MB and KEY_TASKM_HEAP_MB), not JVM_ARGS.
+JVM_ARGS="${JVM_ARGS:-}"
 
 # Check if deprecated HADOOP_HOME is set.
 if [ -n "$HADOOP_HOME" ]; then
@@ -243,3 +256,47 @@ rotateLogFile() {
         mv "$log" "$log.$num";
     fi
 }
+
+# Reads the HA master hosts from conf/masters into the array MASTERS.
+# Exits with an error if the masters file does not exist.
+readMasters() {
+    MASTERS_FILE="${FLINK_CONF_DIR}/masters"
+
+    if [[ ! -f "${MASTERS_FILE}" ]]; then
+        echo "No masters file. Please specify masters in 'conf/masters'."
+        exit 1
+    fi
+
+    MASTERS=()
+
+    GOON=true
+    while $GOON; do
+        read -r line || GOON=false
+        HOST=$( extractHostName $line)
+        if [ -n "$HOST" ]; then
+            MASTERS+=(${HOST})
+        fi
+    done < "$MASTERS_FILE"
+}
+
+# Reads the TaskManager hosts into the array SLAVES, from $FLINK_SLAVES if set
+# and from conf/slaves otherwise. Exits with an error if that file does not exist.
+readSlaves() {
+    SLAVES_FILE="${FLINK_SLAVES:-${FLINK_CONF_DIR}/slaves}"
+
+    if [[ ! -f "$SLAVES_FILE" ]]; then
+        echo "No slaves file '${SLAVES_FILE}'. Please specify slaves in 'conf/slaves'."
+        exit 1
+    fi
+
+    SLAVES=()
+
+    GOON=true
+    while $GOON; do
+        read -r line || GOON=false
+        HOST=$( extractHostName $line)
+        if [ -n "$HOST" ]; then
+            SLAVES+=(${HOST})
+        fi
+    done < "$SLAVES_FILE"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh 
b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
new file mode 100644
index 0000000..1fc110a
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
@@ -0,0 +1,146 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start/stop a Flink daemon.
+USAGE="Usage: flink-daemon.sh (start|stop|stop-all) (jobmanager|taskmanager|zookeeper) [args]"
+
+STARTSTOP=$1
+DAEMON=$2
+ARGS=$3
+
+case $DAEMON in
+    (jobmanager)
+        CLASS_TO_RUN=org.apache.flink.runtime.jobmanager.JobManager
+    ;;
+
+    (taskmanager)
+        CLASS_TO_RUN=org.apache.flink.runtime.taskmanager.TaskManager
+    ;;
+
+    (zookeeper)
+        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
+    ;;
+
+    (*)
+        echo "Unknown daemon '${DAEMON}'. $USAGE."
+        exit 1
+    ;;
+esac
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+    FLINK_IDENT_STRING="$USER"
+fi
+
+FLINK_TM_CLASSPATH=`constructFlinkClassPath`
+
+pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$DAEMON.pid
+
+mkdir -p "$FLINK_PID_DIR"
+
+# Ascending ID depending on number of lines in pid file.
+# This allows us to start multiple daemons of each type.
+id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0")
+
+log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-$DAEMON-$id-$HOSTNAME.log
+out=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-$DAEMON-$id-$HOSTNAME.out
+
+log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
+
+JAVA_VERSION=$($JAVA_RUN -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
+
+# Only set JVM 8 arguments if we have correctly extracted the version
+if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
+    if [ "$JAVA_VERSION" -lt 18 ]; then
+        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
+    fi
+fi
+
+case $STARTSTOP in
+
+    (start)
+        # Rotate log files
+        rotateLogFile "$log"
+        rotateLogFile "$out"
+
+        echo "Starting $DAEMON daemon on host $HOSTNAME."
+        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} ${ARGS} > "$out" 2>&1 < /dev/null &
+        mypid=$!
+
+        # Add to pid file if successful start
+        if [[ ${mypid} =~ ${IS_NUMBER} ]] && kill -0 $mypid > /dev/null 2>&1 ; then
+            echo $mypid >> "$pid"
+        else
+            echo "Error starting $DAEMON daemon."
+            exit 1
+        fi
+    ;;
+
+    (stop)
+        if [ -f "$pid" ]; then
+            # Remove last in pid file
+            to_stop=$(tail -n 1 "$pid")
+
+            if [ -z "$to_stop" ]; then
+                rm "$pid" # If all stopped, clean up pid file
+                echo "No $DAEMON daemon to stop on host $HOSTNAME."
+            else
+                sed \$d "$pid" > "$pid.tmp" # all but last line
+
+                # If all stopped, clean up pid file; otherwise keep the remaining pids
+                if [ $(wc -l < "$pid.tmp") -eq 0 ]; then rm "$pid" "$pid.tmp"; else mv "$pid.tmp" "$pid"; fi
+
+                if kill -0 $to_stop > /dev/null 2>&1; then
+                    echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
+                    kill $to_stop
+                else
+                    echo "No $DAEMON daemon (pid: $to_stop) is running anymore on $HOSTNAME."
+                fi
+            fi
+        else
+            echo "No $DAEMON daemon to stop on host $HOSTNAME."
+        fi
+    ;;
+
+    (stop-all)
+        if [ -f "$pid" ]; then
+            mv "$pid" "${pid}.tmp"
+
+            while read to_stop; do
+                if kill -0 $to_stop > /dev/null 2>&1; then
+                    echo "Stopping $DAEMON daemon (pid: $to_stop) on host $HOSTNAME."
+                    kill $to_stop
+                else
+                    echo "Skipping $DAEMON daemon (pid: $to_stop), because it is not running anymore on $HOSTNAME."
+                fi
+            done < "${pid}.tmp"
+            rm "${pid}.tmp"
+        fi
+    ;;
+
+    (*)
+        echo "Unexpected argument '$STARTSTOP'. $USAGE."
+        exit 1
+    ;;
+
+esac

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/jobmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/jobmanager.sh 
b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
index afe35d8..450d36b 100755
--- a/flink-dist/src/main/flink-bin/bin/jobmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/jobmanager.sh
@@ -17,104 +17,54 @@
 # limitations under the License.
 ################################################################################
 
+# Start/stop a Flink JobManager.
+USAGE="Usage: jobmanager.sh ((start (local|cluster) [batch|streaming] [host])|stop|stop-all)"
 
 STARTSTOP=$1
 EXECUTIONMODE=$2
 STREAMINGMODE=$3
+HOST=$4 # optional when starting multiple instances
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
 . "$bin"/config.sh
 
-JAVA_VERSION=$($JAVA_RUN -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
-
-# Only set JVM 8 arguments if we have correctly extracted the version
-if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
-    if [ "$JAVA_VERSION" -lt 18 ]; then
-        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
+if [[ $STARTSTOP == "start" ]]; then
+    if [ -z "$EXECUTIONMODE" ]; then
+        echo "Missing execution mode (local|cluster) argument. $USAGE."
+        exit 1
     fi
-fi
-
-if [ "$FLINK_IDENT_STRING" = "" ]; then
-    FLINK_IDENT_STRING="$USER"
-fi
-
-FLINK_JM_CLASSPATH=`constructFlinkClassPath`
-
-log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-jobmanager-$HOSTNAME.log
-out=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-jobmanager-$HOSTNAME.out
-pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-jobmanager.pid
-log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
-
-case $STARTSTOP in
-
-    (start)
 
-        if [ -z $EXECUTIONMODE ]; then
-            echo "Please specify 'start (cluster|local) [batch|streaming]' or 'stop'"
-            exit 1
-        fi
-
-        # Use batch mode as default
-        if [ -z $STREAMINGMODE ]; then
-            echo "Did not specify [batch|streaming] mode. Falling back to batch mode as default."
-            STREAMINGMODE="batch"
-        fi
+    # Use batch mode as default
+    if [ -z "$STREAMINGMODE" ]; then
+        echo "Missing streaming mode (batch|streaming) argument. Using 'batch'."
+        STREAMINGMODE="batch"
+    fi
 
-        if [[ ! ${FLINK_JM_HEAP} =~ $IS_NUMBER ]]; then
-            echo "ERROR: Configured job manager heap size is not a number. Cancelling job manager startup."
+    if [[ ! ${FLINK_JM_HEAP} =~ $IS_NUMBER ]]; then
+        echo "[ERROR] Configured JobManager JVM heap size is not a number. Please set '$KEY_JOBM_HEAP_MB' in $FLINK_CONF_FILE."
+        exit 1
+    fi
 
+    if [ "$EXECUTIONMODE" = "local" ]; then
+        if [[ ! ${FLINK_TM_HEAP} =~ $IS_NUMBER ]]; then
+            echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
             exit 1
         fi
 
-        if [ "$EXECUTIONMODE" = "local" ]; then
-            if [[ ! ${FLINK_TM_HEAP} =~ $IS_NUMBER ]]; then
-                echo "ERROR: Configured task manager heap size is not a number. Cancelling (local) job manager startup."
-
-                exit 1
-            fi
-
-            FLINK_JM_HEAP=`expr $FLINK_JM_HEAP + $FLINK_TM_HEAP`
-        fi
-
-        if [ "$FLINK_JM_HEAP" -gt 0 ]; then
-            JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP"m -Xmx"$FLINK_JM_HEAP"m"
-        fi
-
-        mkdir -p "$FLINK_PID_DIR"
-        if [ -f $pid ]; then
-            if kill -0 `cat $pid` > /dev/null 2>&1; then
-                echo Job manager running as process `cat $pid`.  Stop it first.
-                exit 1
-            fi
-        fi
-
-        # Rotate log files
-        rotateLogFile $log
-        rotateLogFile $out
-
-        echo "Starting Job Manager"
-        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_JM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.runtime.jobmanager.JobManager --configDir "$FLINK_CONF_DIR" --executionMode $EXECUTIONMODE --streamingMode "$STREAMINGMODE" > "$out" 2>&1 < /dev/null &
-        echo $! > $pid
-
-    ;;
+        FLINK_JM_HEAP=`expr $FLINK_JM_HEAP + $FLINK_TM_HEAP`
+    fi
 
-    (stop)
-        if [ -f $pid ]; then
-            if kill -0 `cat $pid` > /dev/null 2>&1; then
-                echo "Stopping job manager"
-                kill `cat $pid`
-            else
-                echo "No job manager to stop"
-            fi
-        else
-            echo "No job manager to stop"
-        fi
-    ;;
+    if [ "$FLINK_JM_HEAP" -gt 0 ]; then
+        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP"m -Xmx"$FLINK_JM_HEAP"m"
+    fi
 
-    (*)
-        echo "Please specify 'start (cluster|local) [batch|streaming]' or 'stop'"
-    ;;
+    # Startup parameters
+    args="--configDir ${FLINK_CONF_DIR} --executionMode ${EXECUTIONMODE} --streamingMode ${STREAMINGMODE}"
+    if [ -n "$HOST" ]; then
+        args="${args} --host $HOST"
+    fi
+fi
 
-esac
+"${bin}"/flink-daemon.sh "$STARTSTOP" jobmanager "${args}"

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh 
b/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
index da13029..d48a6e3fb 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster-streaming.sh
@@ -17,32 +17,8 @@
 # limitations under the License.
 ################################################################################
 
-
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
-. "$bin"/config.sh
-
-HOSTLIST=$FLINK_SLAVES
-
-if [ "$HOSTLIST" = "" ]; then
-    HOSTLIST="${FLINK_CONF_DIR}/slaves"
-fi
-
-if [ ! -f "$HOSTLIST" ]; then
-    echo $HOSTLIST is not a valid slave list
-    exit 1
-fi
-
-# cluster mode, bring up job manager locally and a task manager on every slave host
-"$FLINK_BIN_DIR"/jobmanager.sh start cluster streaming
-
-GOON=true
-while $GOON
-do
-    read line || GOON=false
-    HOST=$( extractHostName $line)
-    if [ -n "$HOST" ]; then
-        ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start streaming &"
-    fi
-done < "$HOSTLIST"
+# Start a Flink cluster in streaming mode
+"${bin}"/start-cluster.sh streaming

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/start-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-cluster.sh 
b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
index 36f5bc3..2183841 100755
--- a/flink-dist/src/main/flink-bin/bin/start-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-cluster.sh
@@ -17,32 +17,40 @@
 # limitations under the License.
 ################################################################################
 
+# Start a Flink cluster in batch or streaming mode
+USAGE="Usage: start-cluster.sh [batch|streaming]"
+
+STREAMING_MODE=$1
+
+if [[ -z $STREAMING_MODE ]]; then
+    STREAMING_MODE="batch"
+fi
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
 . "$bin"/config.sh
 
-HOSTLIST=$FLINK_SLAVES
+# Start the JobManager instance(s)
+if [[ -z $ZK_QUORUM ]]; then
+    echo "Starting cluster (${STREAMING_MODE} mode)."
 
-if [ "$HOSTLIST" = "" ]; then
-    HOSTLIST="${FLINK_CONF_DIR}/slaves"
-fi
+    # Start single JobManager on this machine
+    "$bin"/jobmanager.sh start cluster ${STREAMING_MODE}
+else
+    # HA Mode
+    readMasters
+
+    echo "Starting HA cluster (${STREAMING_MODE} mode) with ${#MASTERS[@]} masters and $(awk -F',' '{print NF}' <<< "${ZK_QUORUM}") peers in ZooKeeper quorum."
 
-if [ ! -f "$HOSTLIST" ]; then
-    echo $HOSTLIST is not a valid slave list
-    exit 1
+    for master in "${MASTERS[@]}"; do
+        ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l $FLINK_BIN_DIR/jobmanager.sh start cluster ${STREAMING_MODE} ${master} &"
+    done
 fi
 
-# cluster mode, bring up job manager locally and a task manager on every slave host
-"$FLINK_BIN_DIR"/jobmanager.sh start cluster batch
-
-GOON=true
-while $GOON
-do
-    read line || GOON=false
-    HOST=$( extractHostName $line)
-    if [ -n "$HOST" ]; then
-        ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start batch &"
-    fi
-done < "$HOSTLIST"
+# Start TaskManager instance(s)
+readSlaves
+
+for slave in "${SLAVES[@]}"; do
+    ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh start ${STREAMING_MODE} &"
+done

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh 
b/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
index 2cb4d4a..688bffa 100755
--- a/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-local-streaming.sh
@@ -17,11 +17,8 @@
 # limitations under the License.
 ################################################################################
 
-
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
-. "$bin"/config.sh
-
-# local mode, only bring up job manager. The job manager will start an internal task manager
-"$FLINK_BIN_DIR"/jobmanager.sh start local streaming
+# Start a local Flink cluster in streaming mode
+"${bin}"/start-local.sh streaming

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/start-local.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-local.sh 
b/flink-dist/src/main/flink-bin/bin/start-local.sh
index 7ea3ff4..626fa69 100755
--- a/flink-dist/src/main/flink-bin/bin/start-local.sh
+++ b/flink-dist/src/main/flink-bin/bin/start-local.sh
@@ -17,6 +17,12 @@
 # limitations under the License.
 ################################################################################
 
+# Start a local Flink cluster in batch or streaming mode (default: batch)
+STREAMING_MODE=$1
+
+if [[ -z $STREAMING_MODE ]]; then
+    STREAMING_MODE="batch"
+fi
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
@@ -24,4 +30,4 @@ bin=`cd "$bin"; pwd`
 . "$bin"/config.sh
 
 # local mode, only bring up job manager. The job manager will start an internal task manager
-"$FLINK_BIN_DIR"/jobmanager.sh start local batch
+"$FLINK_BIN_DIR"/jobmanager.sh start local "${STREAMING_MODE}"

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/start-zookeeper-quorum.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/start-zookeeper-quorum.sh 
b/flink-dist/src/main/flink-bin/bin/start-zookeeper-quorum.sh
new file mode 100755
index 0000000..d2ff914
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/start-zookeeper-quorum.sh
@@ -0,0 +1,46 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Starts a ZooKeeper quorum as configured in $FLINK_CONF/zoo.cfg
+
+ZK_CONF=$FLINK_CONF_DIR/zoo.cfg
+if [ ! -f "$ZK_CONF" ]; then
+    echo "[ERROR] No ZooKeeper configuration file found in '$ZK_CONF'."
+    exit 1
+fi
+
+# Extract server.X from ZooKeeper config and start instances
+while read server ; do
+    server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+    # match server.id=address[:port[:port]] (id may have several digits)
+    if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
+        id=${BASH_REMATCH[1]}
+        address=${BASH_REMATCH[2]}
+
+        ssh -n $FLINK_SSH_OPTS $address -- "nohup /bin/bash -l $FLINK_BIN_DIR/zookeeper.sh start $id &"
+    else
+        echo "[WARN] Parse error. Skipping config entry '$server'."
+    fi
+done < <(grep "^server\." "$ZK_CONF")

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh 
b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
index cf2bf9b..17a5daf 100755
--- a/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
+++ b/flink-dist/src/main/flink-bin/bin/stop-cluster.sh
@@ -17,33 +17,26 @@
 # limitations under the License.
 ################################################################################
 
-
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
 . "$bin"/config.sh
 
-HOSTLIST=$FLINK_SLAVES
-
-if [ "$HOSTLIST" = "" ]; then
-    HOSTLIST="${FLINK_CONF_DIR}/slaves"
-fi
-
-if [ ! -f "$HOSTLIST" ]; then
-    echo $HOSTLIST is not a valid slave list
-    exit 1
-fi
+# Stop TaskManager instance(s)
+readSlaves
 
+for slave in "${SLAVES[@]}"; do
+    ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l $FLINK_BIN_DIR/taskmanager.sh stop &"
+done
 
-GOON=true
-while $GOON
-do
-    read line || GOON=false
-    HOST=$( extractHostName $line)
-    if [ -n "$HOST" ]; then
-        ssh -n $FLINK_SSH_OPTS $HOST -- "nohup /bin/bash $FLINK_BIN_DIR/taskmanager.sh stop &"
-    fi
-done < $HOSTLIST
+# Stop JobManager instance(s)
+if [[ -z $ZK_QUORUM ]]; then
+    "$bin"/jobmanager.sh stop
+else
+    # HA Mode
+    readMasters
 
-# cluster mode, stop the job manager locally and stop the task manager on every slave host
-"$FLINK_BIN_DIR"/jobmanager.sh stop
+    for master in "${MASTERS[@]}"; do
+        ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l $FLINK_BIN_DIR/jobmanager.sh stop &"
+    done
+fi

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/stop-zookeeper-quorum.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/stop-zookeeper-quorum.sh 
b/flink-dist/src/main/flink-bin/bin/stop-zookeeper-quorum.sh
new file mode 100755
index 0000000..4d19cbd
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/stop-zookeeper-quorum.sh
@@ -0,0 +1,46 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Stops a ZooKeeper quorum as configured in $FLINK_CONF/zoo.cfg
+
+ZK_CONF=$FLINK_CONF_DIR/zoo.cfg
+if [ ! -f "$ZK_CONF" ]; then
+    echo "[ERROR] No ZooKeeper configuration file found in '$ZK_CONF'."
+    exit 1
+fi
+
+# Extract server.X from ZooKeeper config and stop instances
+while read server ; do
+    server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+    # match server.id=address[:port[:port]] (id may have several digits)
+    if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
+        id=${BASH_REMATCH[1]}
+        address=${BASH_REMATCH[2]}
+
+        ssh -n $FLINK_SSH_OPTS $address -- "nohup /bin/bash -l $FLINK_BIN_DIR/zookeeper.sh stop &"
+    else
+        echo "[WARN] Parse error. Skipping config entry '$server'."
+    fi
+done < <(grep "^server\." "$ZK_CONF")

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh 
b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index a35ad77..1920bd6 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -17,6 +17,8 @@
 # limitations under the License.
 
################################################################################
 
+# Start/stop a Flink TaskManager.
+USAGE="Usage: taskmanager.sh (start [batch|streaming])|stop|stop-all)"
 
 STARTSTOP=$1
 STREAMINGMODE=$2
@@ -26,78 +28,24 @@ bin=`cd "$bin"; pwd`
 
 . "$bin"/config.sh
 
-if [ "$FLINK_IDENT_STRING" = "" ]; then
-    FLINK_IDENT_STRING="$USER"
-fi
-
-FLINK_TM_CLASSPATH=`constructFlinkClassPath`
-
-log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-taskmanager-$HOSTNAME.log
-out=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-taskmanager-$HOSTNAME.out
-pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-taskmanager.pid
-log_setting=(-Dlog.file="$log" 
-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties 
-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)
-
-JAVA_VERSION=$($JAVA_RUN -version 2>&1 | sed 's/.*version 
"\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
-
-# Only set JVM 8 arguments if we have correctly extracted the version
-if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
-    if [ "$JAVA_VERSION" -lt 18 ]; then
-        JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
+if [[ $STARTSTOP == "start" ]]; then
+    # Use batch mode as default
+    if [ -z $STREAMINGMODE ]; then
+        echo "Missing streaming mode (batch|streaming). Using 'batch'."
+        STREAMINGMODE="batch"
     fi
-fi
-
-case $STARTSTOP in
-
-    (start)
-
-        # Use batch mode as default
-        if [ -z $STREAMINGMODE ]; then
-            echo "Did not specify [batch|streaming] mode. Falling back to 
batch mode as default."
-            STREAMINGMODE="batch"
-        fi
 
-        if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
-            echo "ERROR: Configured task manager heap size is not a number. 
Cancelling task manager startup."
-            exit 1
-        fi
-
-        if [ "$FLINK_TM_HEAP" -gt 0 ]; then
-            JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m -Xmx"$FLINK_TM_HEAP"m"
-        fi
-
-        mkdir -p "$FLINK_PID_DIR"
-        if [ -f $pid ]; then
-            if kill -0 `cat $pid` > /dev/null 2>&1; then
-                echo Task manager running as process `cat $pid` on host 
$HOSTNAME.  Stop it first.
-                exit 1
-            fi
-        fi
-
-        # Rotate log files
-        rotateLogFile $log
-        rotateLogFile $out
-
-        echo Starting task manager on host $HOSTNAME
-        $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" 
-classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" 
org.apache.flink.runtime.taskmanager.TaskManager --configDir "$FLINK_CONF_DIR" 
--streamingMode "$STREAMINGMODE" > "$out" 2>&1 < /dev/null &
-        echo $! > $pid
-
-    ;;
+    if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]]; then
+        echo "[ERROR] Configured TaskManager JVM heap size is not a number. 
Please set '$KEY_TASKM_HEAP_MB' in $FLINK_CONF_FILE."
+        exit 1
+    fi
 
-    (stop)
-        if [ -f $pid ]; then
-            if kill -0 `cat $pid` > /dev/null 2>&1; then
-                echo Stopping task manager on host $HOSTNAME
-                kill `cat $pid`
-            else
-                echo No task manager to stop on host $HOSTNAME
-            fi
-        else
-            echo No task manager to stop on host $HOSTNAME
-        fi
-    ;;
+    if [ "$FLINK_TM_HEAP" -gt 0 ]; then
+        export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_TM_HEAP"m -Xmx"$FLINK_TM_HEAP"m"
+    fi
 
-    (*)
-        echo "Please specify 'start [batch|streaming]' or 'stop'"
-    ;;
+    # Startup parameters
+    args="--configDir ${FLINK_CONF_DIR} --streamingMode ${STREAMINGMODE}"
+fi
 
-esac
+${bin}/flink-daemon.sh $STARTSTOP taskmanager "${args}"

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/bin/zookeeper.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/zookeeper.sh 
b/flink-dist/src/main/flink-bin/bin/zookeeper.sh
new file mode 100755
index 0000000..0bbf6d5
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/zookeeper.sh
@@ -0,0 +1,56 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Start/stop a ZooKeeper quorum peer.
+USAGE="Usage: zookeeper.sh (start peer-id|stop|stop-all)"
+
+STARTSTOP=$1
+PEER_ID=$2
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+ZK_CONF="$FLINK_CONF_DIR/zoo.cfg"
+if [ ! -f "$ZK_CONF" ]; then
+    echo "[ERROR] No ZooKeeper configuration file found in '$ZK_CONF'."
+    exit 1
+fi
+
+if [[ $STARTSTOP == "start" ]]; then
+    # The peer id ends up in ZooKeeper's 'myid' file and is parsed as an int by the peer
+    if [[ -z "$PEER_ID" || ! ${PEER_ID} =~ ${IS_NUMBER} ]]; then
+        echo "[ERROR] Missing or invalid peer id argument. $USAGE."
+        exit 1
+    fi
+
+    if [[ ! ${ZK_HEAP} =~ ${IS_NUMBER} ]]; then
+        echo "[ERROR] Configured ZooKeeper JVM heap size is not a number. Please set '$KEY_ZK_HEAP_MB' in $FLINK_CONF_FILE."
+        exit 1
+    fi
+
+    if [ "$ZK_HEAP" -gt 0 ]; then
+        export JVM_ARGS="$JVM_ARGS -Xms"$ZK_HEAP"m -Xmx"$ZK_HEAP"m"
+    fi
+
+    # Startup parameters
+    args="--zkConfigFile $ZK_CONF --peerId $PEER_ID"
+fi
+
+"${bin}"/flink-daemon.sh $STARTSTOP zookeeper "${args}"

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/conf/masters
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/masters 
b/flink-dist/src/main/flink-bin/conf/masters
new file mode 100644
index 0000000..2fbb50c
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/conf/masters
@@ -0,0 +1 @@
+localhost

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-dist/src/main/flink-bin/conf/zoo.cfg
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/zoo.cfg 
b/flink-dist/src/main/flink-bin/conf/zoo.cfg
new file mode 100644
index 0000000..a14ec66
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/conf/zoo.cfg
@@ -0,0 +1,18 @@
+# The number of milliseconds of each tick
+tickTime=2000
+
+# The number of ticks that the initial synchronization phase can take
+initLimit=10
+
+# The number of ticks that can pass between sending a request and getting an acknowledgement
+syncLimit=5
+
+# The directory where the snapshot is stored (default: a random directory in java.io.tmpdir).
+# dataDir=/tmp/zookeeper
+
+# The port at which the clients will connect
+clientPort=2181
+
+# ZooKeeper quorum peers
+server.1=localhost:2888:3888
+# server.2=host:peer-port:leader-port

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index f92fb6e..62e85c1 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -34,9 +34,9 @@ under the License.
 
        <packaging>jar</packaging>
 
-    <properties>
-        <metrics.version>3.1.0</metrics.version>
-    </properties>
+       <properties>
+               <metrics.version>3.1.0</metrics.version>
+       </properties>
 
        <dependencies>
                <dependency>
@@ -168,23 +168,32 @@ under the License.
                        </exclusions>
                </dependency>
 
-        <dependency>
-            <groupId>io.dropwizard.metrics</groupId>
-            <artifactId>metrics-core</artifactId>
-            <version>${metrics.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>io.dropwizard.metrics</groupId>
-            <artifactId>metrics-jvm</artifactId>
-            <version>${metrics.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>io.dropwizard.metrics</groupId>
-            <artifactId>metrics-json</artifactId>
-            <version>${metrics.version}</version>
-        </dependency>
+               <dependency>
+                       <groupId>io.dropwizard.metrics</groupId>
+                       <artifactId>metrics-core</artifactId>
+                       <version>${metrics.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>io.dropwizard.metrics</groupId>
+                       <artifactId>metrics-jvm</artifactId>
+                       <version>${metrics.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>io.dropwizard.metrics</groupId>
+                       <artifactId>metrics-json</artifactId>
+                       <version>${metrics.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.zookeeper</groupId>
+                       <artifactId>zookeeper</artifactId>
+               </dependency>
 
+               <dependency>
+                       <groupId>org.apache.curator</groupId>
+                       <artifactId>curator-recipes</artifactId>
+               </dependency>
        </dependencies>
 
        <build>
@@ -196,7 +205,7 @@ under the License.
                                <version>3.1.4</version>
                                <executions>
                                        <!-- Run scala compiler in the 
process-resources phase, so that dependencies on
-                                           scala classes can be resolved later 
in the (Java) compile phase -->
+                                               scala classes can be resolved 
later in the (Java) compile phase -->
                                        <execution>
                                                <id>scala-compile-first</id>
                                                <phase>process-resources</phase>
@@ -206,7 +215,7 @@ under the License.
                                        </execution>
 
                                        <!-- Run scala compiler in the 
process-test-resources phase, so that dependencies on
-                                            scala classes can be resolved 
later in the (Java) test-compile phase -->
+                                                scala classes can be resolved 
later in the (Java) test-compile phase -->
                                        <execution>
                                                <id>scala-test-compile</id>
                                                
<phase>process-test-resources</phase>

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
index 988e3a7..edfa87b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerCliOptions.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.runtime.StreamingMode;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * The command line parameters passed to the TaskManager.
  */
@@ -31,6 +33,8 @@ public class JobManagerCliOptions {
        
        private StreamingMode streamingMode = StreamingMode.BATCH_ONLY;
 
+       private String host;
+
        // 
------------------------------------------------------------------------
 
        public String getConfigDir() {
@@ -74,4 +78,12 @@ public class JobManagerCliOptions {
                                        "Unknown streaming mode. Streaming mode 
must be one of 'BATCH' or 'STREAMING'.");
                }
        }
+
+       /**
+        * Returns the host address given via the {@code --host} command line option, or
+        * {@code null} if it was not set.
+        */
+       public String getHost() {
+               return host;
+       }
+
+       /**
+        * Sets the network address used for communication with the JobManager.
+        *
+        * @param host Network address of the JobManager; must not be {@code null}.
+        * @throws NullPointerException Thrown, if {@code host} is {@code null}.
+        */
+       public void setHost(String host) {
+               this.host = checkNotNull(host);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtil.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtil.java
new file mode 100644
index 0000000..c3d9df4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtil.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ZooKeeper utilities.
+ */
+public class ZooKeeperUtil {
+
+       private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperUtil.class);
+
+       /**
+        * Creates and starts a {@link CuratorFramework} client for the ZooKeeper quorum configured
+        * in the given Flink configuration.
+        *
+        * <p>Session timeout, connection timeout, retry wait and maximum number of retry attempts
+        * are read from the configuration, falling back to the defaults in {@link ConfigConstants}.
+        * The configured root path (without a leading '/') is used as the client's namespace.
+        *
+        * @param configuration Flink configuration containing the ZooKeeper settings.
+        * @return The started {@link CuratorFramework} client.
+        * @throws IllegalConfigurationException Thrown, if no (non-blank) ZooKeeper quorum is
+        *                                       configured.
+        * @throws Exception                     Thrown, if the client could not be started.
+        */
+       public static CuratorFramework createCuratorFramework(Configuration configuration) throws Exception {
+               // Fails with an IllegalConfigurationException if no usable quorum is configured
+               String zkQuorum = getZooKeeperEnsemble(configuration);
+
+               int sessionTimeout = configuration.getInteger(
+                               ConfigConstants.ZOOKEEPER_SESSION_TIMEOUT,
+                               ConfigConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
+
+               int connectionTimeout = configuration.getInteger(
+                               ConfigConstants.ZOOKEEPER_CONNECTION_TIMEOUT,
+                               ConfigConstants.DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT);
+
+               int retryWait = configuration.getInteger(
+                               ConfigConstants.ZOOKEEPER_RETRY_WAIT,
+                               ConfigConstants.DEFAULT_ZOOKEEPER_RETRY_WAIT);
+
+               int maxRetryAttempts = configuration.getInteger(
+                               ConfigConstants.ZOOKEEPER_MAX_RETRY_ATTEMPTS,
+                               ConfigConstants.DEFAULT_ZOOKEEPER_MAX_RETRY_ATTEMPTS);
+
+               String root = configuration.getString(ConfigConstants.ZOOKEEPER_DIR_KEY,
+                               ConfigConstants.DEFAULT_ZOOKEEPER_ZNODE_ROOT);
+
+               LOG.info("Using '{}' as root namespace.", root);
+
+               CuratorFramework cf = CuratorFrameworkFactory.builder()
+                               .connectString(zkQuorum)
+                               .sessionTimeoutMs(sessionTimeout)
+                               .connectionTimeoutMs(connectionTimeout)
+                               .retryPolicy(new ExponentialBackoffRetry(retryWait, maxRetryAttempts))
+                               // Curator prepends a '/' manually and throws an Exception if the
+                               // namespace starts with a '/'.
+                               .namespace(root.startsWith("/") ? root.substring(1) : root)
+                               .build();
+
+               try {
+                       cf.start();
+               }
+               catch (Exception e) {
+                       // Release whatever the half-started client holds before propagating the failure
+                       try {
+                               cf.close();
+                       }
+                       catch (RuntimeException closeFailure) {
+                               LOG.warn("Could not close CuratorFramework after failed start.", closeFailure);
+                       }
+                       throw new Exception("Could not start CuratorFramework.", e);
+               }
+
+               return cf;
+       }
+
+       /**
+        * Returns whether high availability is enabled, i.e. whether a ZooKeeper quorum is
+        * configured.
+        *
+        * <p>Only the presence of the quorum key is checked; its value is validated by
+        * {@link #getZooKeeperEnsemble(Configuration)}.
+        *
+        * @param flinkConf Flink configuration to check.
+        * @return {@code true} if {@link ConfigConstants#ZOOKEEPER_QUORUM_KEY} is set.
+        */
+       public static boolean isJobManagerHighAvailabilityEnabled(Configuration flinkConf) {
+               return flinkConf.containsKey(ConfigConstants.ZOOKEEPER_QUORUM_KEY);
+       }
+
+       /**
+        * Returns the configured ZooKeeper quorum with all whitespace removed, because ZooKeeper
+        * does not tolerate it.
+        *
+        * @param flinkConf Flink configuration holding the quorum.
+        * @return The quorum connect string, never {@code null} or empty.
+        * @throws IllegalConfigurationException Thrown, if no quorum is configured or the
+        *                                       configured value consists only of whitespace.
+        */
+       public static String getZooKeeperEnsemble(Configuration flinkConf)
+                       throws IllegalConfigurationException {
+
+               String zkQuorum = flinkConf.getString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, null);
+
+               if (zkQuorum != null) {
+                       // Remove all whitespace
+                       zkQuorum = zkQuorum.replaceAll("\\s+", "");
+               }
+
+               // Checked after whitespace removal, so that a blank value is rejected as well
+               if (zkQuorum == null || zkQuorum.isEmpty()) {
+                       throw new IllegalConfigurationException("No ZooKeeper quorum specified in config.");
+               }
+
+               return zkQuorum;
+       }
+
+       /** Utility class, should not be instantiated. */
+       private ZooKeeperUtil() {
+               throw new RuntimeException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
new file mode 100644
index 0000000..d0e706e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/FlinkZooKeeperQuorumPeer.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Simple wrapper for ZooKeeper's {@link QuorumPeer}, which reads a ZooKeeper 
config file and writes
+ * the required 'myid' file before starting the peer.
+ */
+public class FlinkZooKeeperQuorumPeer {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(FlinkZooKeeperQuorumPeer.class);
+
+       public static void main(String[] args) {
+               try {
+                       final ParameterTool params = 
ParameterTool.fromArgs(args);
+                       final String zkConfigFile = 
params.getRequired("zkConfigFile");
+                       final int peerId = params.getInt("peerId");
+
+                       // Run quorum peer
+                       runFlinkZkQuorumPeer(zkConfigFile, peerId);
+               }
+               catch (Throwable t) {
+                       t.printStackTrace();
+                       System.exit(-1);
+               }
+       }
+
+       /**
+        * Runs a ZooKeeper {@link QuorumPeer} if further peers are configured 
or a single
+        * {@link ZooKeeperServer} if no further peers are configured.
+        *
+        * @param zkConfigFile ZooKeeper config file 'zoo.cfg'
+        * @param peerId       ID for the 'myid' file
+        */
+       public static void runFlinkZkQuorumPeer(String zkConfigFile, int 
peerId) throws Exception {
+
+               Properties zkProps = new Properties();
+
+               InputStream inStream = new FileInputStream(new 
File(zkConfigFile));
+               zkProps.load(inStream);
+
+               LOG.info("Configuration: " + zkProps);
+
+               // Set defaults for required properties
+               setRequiredProperties(zkProps);
+
+               // Write peer id to myid file
+               writeMyIdToDataDir(zkProps, peerId);
+
+               // The myid file needs to be written before creating the 
instance. Otherwise, this
+               // will fail.
+               QuorumPeerConfig conf = new QuorumPeerConfig();
+               conf.parseProperties(zkProps);
+
+               if (conf.isDistributed()) {
+                       // Run quorum peer
+                       LOG.info("Running distributed ZooKeeper quorum peer 
(total peers: {}).",
+                                       conf.getServers().size());
+
+                       QuorumPeerMain qp = new QuorumPeerMain();
+                       qp.runFromConfig(conf);
+               }
+               else {
+                       // Run standalone
+                       LOG.info("Running standalone ZooKeeper quorum peer.");
+
+                       ZooKeeperServerMain zk = new ZooKeeperServerMain();
+                       ServerConfig sc = new ServerConfig();
+                       sc.readFrom(conf);
+                       zk.runFromConfig(sc);
+               }
+       }
+
+       /**
+        * Sets required properties to reasonable defaults and logs it.
+        */
+       private static void setRequiredProperties(Properties zkProps) {
+               // Set default client port
+               if (zkProps.getProperty("clientPort") == null) {
+                       int clientPort = 
ConfigConstants.DEFAULT_ZOOKEEPER_CLIENT_PORT;
+                       zkProps.setProperty("clientPort", 
String.valueOf(clientPort));
+
+                       LOG.warn("No 'clientPort' configured. Set to '{}'.", 
clientPort);
+               }
+
+               // Set default init limit
+               if (zkProps.getProperty("initLimit") == null) {
+                       int initLimit = 
ConfigConstants.DEFAULT_ZOOKEEPER_INIT_LIMIT;
+                       zkProps.setProperty("initLimit", 
String.valueOf(initLimit));
+
+                       LOG.warn("No 'initLimit' configured. Set to '{}'.", 
initLimit);
+               }
+
+               // Set default sync limit
+               if (zkProps.getProperty("syncLimit") == null) {
+                       int syncLimit = 
ConfigConstants.DEFAULT_ZOOKEEPER_SYNC_LIMIT;
+                       zkProps.setProperty("syncLimit", 
String.valueOf(syncLimit));
+
+                       LOG.warn("No 'syncLimit' configured. Set to '{}'.", 
syncLimit);
+               }
+
+               // Set default data dir
+               if (zkProps.getProperty("dataDir") == null) {
+                       String dataDir = String.format("%s/%s/zookeeper",
+                                       System.getProperty("java.io.tmpdir"), 
UUID.randomUUID().toString());
+
+                       zkProps.setProperty("dataDir", dataDir);
+
+                       LOG.warn("No 'dataDir' configured. Set to '{}'.", 
dataDir);
+               }
+
+               int peerPort = ConfigConstants.DEFAULT_ZOOKEEPER_PEER_PORT;
+               int leaderPort = ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PORT;
+
+               // Set peer and leader ports if none given, because ZooKeeper 
complains if multiple
+               // servers are configured, but no ports are given.
+               for (Map.Entry<Object, Object> entry : zkProps.entrySet()) {
+                       String key = (String) entry.getKey();
+
+                       if (entry.getKey().toString().startsWith("server.")) {
+                               String value = (String) entry.getValue();
+                               String[] parts = value.split(":");
+
+                               if (parts.length == 1) {
+                                       String address = 
String.format("%s:%d:%d", parts[0], peerPort, leaderPort);
+                                       zkProps.setProperty(key, address);
+                                       LOG.info("Set peer and leader port of 
'{}': '{}' => '{}'.",
+                                                       key, value, address);
+                               }
+                               else if (parts.length == 2) {
+                                       String address = 
String.format("%s:%d:%d",
+                                                       parts[0], 
Integer.valueOf(parts[1]), leaderPort);
+                                       zkProps.setProperty(key, address);
+                                       LOG.info("Set peer port of '{}': '{}' 
=> '{}'.", key, value, address);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Write 'myid' file to the 'dataDir' in the given ZooKeeper 
configuration.
+        *
+        * <blockquote>
+        * Every machine that is part of the ZooKeeper ensemble should know 
about every other machine in
+        * the ensemble. You accomplish this with the series of lines of the 
form
+        * server.id=host:port:port. The parameters host and port are 
straightforward. You attribute the
+        * server id to each machine by creating a file named myid, one for 
each server, which resides
+        * in that server's data directory, as specified by the configuration 
file parameter dataDir.
+        * </blockquote>
+        *
+        * @param zkProps ZooKeeper configuration.
+        * @param id      The ID of this {@link QuorumPeer}.
+        * @throws IllegalConfigurationException Thrown, if 'dataDir' property 
not set in given
+        *                                       ZooKeeper properties.
+        * @throws IOException                   Thrown, if 'dataDir' does not 
exist and cannot be
+        *                                       created.
+        * @see <a 
href="http://zookeeper.apache.org/doc/r3.4.6/zookeeperAdmin.html";>
+        * ZooKeeper Administrator's Guide</a>
+        */
+
+       private static void writeMyIdToDataDir(Properties zkProps, int id) 
throws IOException {
+
+               // Check dataDir and create if necessary
+               if (zkProps.getProperty("dataDir") == null) {
+                       throw new IllegalConfigurationException("No dataDir 
configured.");
+               }
+
+               File dataDir = new File(zkProps.getProperty("dataDir"));
+
+               if (!dataDir.isDirectory() && !dataDir.mkdirs()) {
+                       throw new IOException("Cannot create dataDir '" + 
dataDir + "'.");
+               }
+
+               dataDir.deleteOnExit();
+
+               // Write myid to file
+               PrintWriter writer = null;
+               try {
+                       LOG.info("Writing {} to myid file in 'dataDir'.", id);
+
+                       writer = new PrintWriter(new File(dataDir, "myid"));
+                       writer.println(id);
+               }
+               finally {
+                       if (writer != null) {
+                               writer.close();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c917d4a..0d96edb 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -18,49 +18,49 @@
 
 package org.apache.flink.runtime.jobmanager
 
-import java.io.{IOException, File}
+import java.io.{File, IOException}
 import java.net.InetSocketAddress
 import java.util.Collections
 
-import akka.actor.Status.{Success, Failure}
+import akka.actor.Status.{Failure, Success}
+import akka.actor._
 import grizzled.slf4j.Logger
-import org.apache.flink.api.common.{JobID, ExecutionConfig}
-import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, 
Configuration}
+import org.apache.flink.api.common.{ExecutionConfig, JobID}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult
+import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.client._
-import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, 
ExecutionGraph}
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
+import org.apache.flink.runtime.executiongraph.{ExecutionGraph, 
ExecutionJobVertex}
+import org.apache.flink.runtime.instance.InstanceManager
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
+import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
+import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge}
+import org.apache.flink.runtime.messages.JobManagerMessages._
+import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
+import org.apache.flink.runtime.messages.RegistrationMessages._
+import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, 
SendStackTrace}
 import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, 
UpdateTaskExecutionState}
 import org.apache.flink.runtime.messages.accumulators._
-import org.apache.flink.runtime.messages.checkpoint.{AcknowledgeCheckpoint, 
AbstractCheckpointMessage}
+import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
AcknowledgeCheckpoint}
+import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
-import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, 
ActorLogMessages}
-import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
-import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobgraph.{JobVertexID, JobGraph, JobStatus}
-import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
-import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
-import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, 
Heartbeat}
+import org.apache.flink.runtime.util.{EnvironmentInformation, SerializedValue, 
ZooKeeperUtil}
+import org.apache.flink.runtime.{ActorLogMessages, ActorSynchronousLogging, 
StreamingMode}
 import org.apache.flink.util.{ExceptionUtils, InstantiationUtil}
 
-import akka.actor._
-
+import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.language.postfixOps
-import scala.collection.JavaConverters._
 
 /**
  * The job manager is responsible for receiving Flink jobs, scheduling the 
tasks, gathering the
@@ -969,6 +969,13 @@ object JobManager {
       } text {
         "The streaming mode of the JobManager (STREAMING / BATCH)"
       }
+
+      opt[String]("host").optional().action { (arg, conf) =>
+        conf.setHost(arg)
+        conf
+      } text {
+        "Network address for communication with the job manager"
+      }
     }
 
     val config = parser.parse(args, new JobManagerCliOptions()).getOrElse {
@@ -993,9 +1000,34 @@ object JobManager {
       configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, 
configDir + "/..")
     }
 
-    val hostname = 
configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null)
-    val port = 
configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-      ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
+    // HA mode: address from '--host', free port. Otherwise: address/port from the configuration.
+    val (hostname, port) = if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
+      // TODO @removeme @tillrohrmann This is the place where the host and random port for JM is
+      // chosen.  For the FlinkMiniCluster you have to choose it on your own.
+      LOG.info("HA mode.")
+
+      if (config.getHost == null) {
+        throw new IllegalArgumentException("Missing parameter '--host'.")
+      }
+
+      // Let web server listen on random port (port 0)
+      configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
+
+      (config.getHost, NetUtils.getAvailablePort)
+    }
+    else {
+      if (config.getHost != null) {
+        throw new IllegalStateException("Specified explicit address for JobManager communication " +
+          "via CLI, but no ZooKeeper quorum has been configured. The task managers will not be " +
+          "able to find the correct JobManager to connect to. Please configure ZooKeeper or " +
+          "don't set the address explicitly (this will fall back to the address configured " +
+          "in 'conf/flink-conf.yaml').")
+      }
+
+      (configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null),
+        configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+          ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT))
+    }
 
     (configuration, config.getJobManagerMode(), config.getStreamingMode(), 
hostname, port)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 1c60e89..4f95c1c 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -61,7 +61,7 @@ import org.apache.flink.runtime.net.NetUtils
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
-import org.apache.flink.runtime.util.{MathUtils, EnvironmentInformation}
+import org.apache.flink.runtime.util.{ZooKeeperUtil, MathUtils, 
EnvironmentInformation}
 
 import scala.concurrent._
 import scala.concurrent.duration._
@@ -1435,6 +1435,11 @@ object TaskManager {
     // start the I/O manager last, it will create some temp directories.
     val ioManager: IOManager = new 
IOManagerAsync(taskManagerConfig.tmpDirPaths)
 
+    if (ZooKeeperUtil.isJobManagerHighAvailabilityEnabled(configuration)) {
+      // TODO @removeme @tillrohrmann Setup leader retrieval service
+      LOG.info("HA mode.")
+    }
+
     // create the actor properties (which define the actor constructor 
parameters)
     val tmProps = Props(taskManagerClass,
       taskManagerConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
new file mode 100644
index 0000000..da40e15
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/ZooKeeperUtilTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ZooKeeperUtil}.
+ */
+public class ZooKeeperUtilTest {
+
+       /**
+        * ZooKeeper does not like whitespace in the quorum connect string. Verifies that
+        * {@link ZooKeeperUtil#getZooKeeperEnsemble(Configuration)} strips leading, trailing
+        * and inner whitespace from the configured quorum.
+        */
+       @Test
+       public void testZooKeeperEnsembleConnectStringConfiguration() throws Exception {
+               Configuration conf = new Configuration();
+
+               // Single server
+               {
+                       String expected = "localhost:2891";
+
+                       assertEnsemble(conf, "localhost:2891", expected);
+                       assertEnsemble(conf, " localhost:2891 ", expected); // leading and trailing whitespace
+                       assertEnsemble(conf, "localhost :2891", expected); // whitespace between host and port
+               }
+
+               // Multiple servers
+               {
+                       String expected = "localhost:2891,localhost:2891";
+
+                       assertEnsemble(conf, "localhost:2891,localhost:2891", expected);
+                       assertEnsemble(conf, "localhost:2891, localhost:2891", expected); // whitespace after comma
+                       assertEnsemble(conf, "localhost :2891, localhost:2891", expected); // after comma and before colon
+                       assertEnsemble(conf, " localhost:2891, localhost:2891 ", expected); // after comma, leading, trailing
+               }
+       }
+
+       /**
+        * Sets {@code quorum} as the ZooKeeper quorum in {@code conf} and asserts that the
+        * ensemble connect string derived from it equals {@code expected}.
+        */
+       private static void assertEnsemble(Configuration conf, String quorum, String expected) throws Exception {
+               conf.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, quorum);
+               assertEquals(expected, ZooKeeperUtil.getZooKeeperEnsemble(conf));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8c72b50d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6336485..1f63ff3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,8 @@ under the License.
                <chill.version>0.5.2</chill.version>
                <asm.version>5.0.4</asm.version>
                <tez.version>0.6.1</tez.version>
+               <zookeeper.version>3.4.6</zookeeper.version>
+               <curatorrecipes.version>2.8.0</curatorrecipes.version>
        </properties>
 
        <dependencies>
@@ -322,6 +324,18 @@ under the License.
                                        </exclusion>
                                </exclusions>
                        </dependency>
+
+                       <dependency>
+                               <groupId>org.apache.zookeeper</groupId>
+                               <artifactId>zookeeper</artifactId>
+                               <version>${zookeeper.version}</version>
+                       </dependency>
+
+                       <dependency>
+                               <groupId>org.apache.curator</groupId>
+                               <artifactId>curator-recipes</artifactId>
+                               <version>${curatorrecipes.version}</version>
+                       </dependency>
                </dependencies>
        </dependencyManagement>
 
@@ -730,6 +744,8 @@ under the License.
                                                
<exclude>flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/**</exclude>
                                                <!-- Configuration Files. -->
                                                
<exclude>**/flink-bin/conf/slaves</exclude>
+                                               
<exclude>**/flink-bin/conf/masters</exclude>
+                                               
<exclude>**/flink-bin/conf/zoo.cfg</exclude>
                         
<exclude>flink-contrib/docker-flink/flink/conf/slaves</exclude>
                                                <!-- Administrative files in 
the main trunk. -->
                                                <exclude>**/README.md</exclude>

Reply via email to