This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit f670c3f2ebc21ad8155b70a4c5940bec0011bc15 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Mon Dec 21 15:16:10 2020 +0800 [FLINK-20689] Upgrade Flink version to 1.11.3 --- docs/_config.yml | 2 +- pom.xml | 2 +- tools/docker/Dockerfile | 2 +- .../flink-distribution-template/bin/config.sh | 574 --------------------- .../flink-distribution-template/bin/taskmanager.sh | 103 ---- 5 files changed, 3 insertions(+), 680 deletions(-) diff --git a/docs/_config.yml b/docs/_config.yml index b1f7a37..0c1b6ef 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -32,7 +32,7 @@ version: "2.3-SNAPSHOT" # release this should be the same as the regular version version_title: "2.3-SNAPSHOT" # The Flink version supported by this version of Stateful Functions -flink_version: "1.11.1" +flink_version: "1.11.3" # Branch on Github for this version github_branch: "master" diff --git a/pom.xml b/pom.xml index 75fb8d4..3d826f8 100644 --- a/pom.xml +++ b/pom.xml @@ -76,7 +76,7 @@ under the License. <protobuf.version>3.7.1</protobuf.version> <unixsocket.version>2.3.2</unixsocket.version> <protoc-jar-maven-plugin.version>3.11.1</protoc-jar-maven-plugin.version> - <flink.version>1.11.1</flink.version> + <flink.version>1.11.3</flink.version> <scala.binary.version>2.12</scala.binary.version> <root.dir>${rootDir}</root.dir> </properties> diff --git a/tools/docker/Dockerfile b/tools/docker/Dockerfile index c5f8296..9b844c5 100644 --- a/tools/docker/Dockerfile +++ b/tools/docker/Dockerfile @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -FROM flink:1.11.1-scala_2.12-java8 +FROM flink:1.11.3-scala_2.12-java8 ENV ROLE worker ENV MASTER_HOST localhost diff --git a/tools/docker/flink-distribution-template/bin/config.sh b/tools/docker/flink-distribution-template/bin/config.sh deleted file mode 100755 index 268f3fc..0000000 --- a/tools/docker/flink-distribution-template/bin/config.sh +++ /dev/null @@ -1,574 +0,0 @@ -#!/usr/bin/env bash -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# Backported due to FLINK-18639 -# This file can be removed when upgrading -# to Flink 1.11.2 or 1.12 - -constructFlinkClassPath() { - local FLINK_DIST - local FLINK_CLASSPATH - - while read -d '' -r jarfile ; do - if [[ "$jarfile" =~ .*/flink-dist[^/]*.jar$ ]]; then - FLINK_DIST="$FLINK_DIST":"$jarfile" - elif [[ "$FLINK_CLASSPATH" == "" ]]; then - FLINK_CLASSPATH="$jarfile"; - else - FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile" - fi - done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z) - - if [[ "$FLINK_DIST" == "" ]]; then - # write error message to stderr since stdout is stored as the classpath - (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.") - - # exit function with empty classpath to force process failure - exit 1 - fi - - echo "$FLINK_CLASSPATH""$FLINK_DIST" -} - -findFlinkDistJar() { - local FLINK_DIST="`find "$FLINK_LIB_DIR" -name 'flink-dist*.jar'`" - - if [[ "$FLINK_DIST" == "" ]]; then - # write error message to stderr since stdout is stored as the classpath - (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.") - - # exit function with empty classpath to force process failure - exit 1 - fi - - echo "$FLINK_DIST" -} - -# These are used to mangle paths that are passed to java when using -# cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere -# but the windows java version expects them in Windows Format, i.e. C:\bla\blub. -# "cygpath" can do the conversion. -manglePath() { - UNAME=$(uname -s) - if [ "${UNAME:0:6}" == "CYGWIN" ]; then - echo `cygpath -w "$1"` - else - echo $1 - fi -} - -manglePathList() { - UNAME=$(uname -s) - # a path list, for example a java classpath - if [ "${UNAME:0:6}" == "CYGWIN" ]; then - echo `cygpath -wp "$1"` - else - echo $1 - fi -} - -# Looks up a config value by key from a simple YAML-style key-value map. -# $1: key to look up -# $2: default value to return if key does not exist -# $3: config file to read from -readFromConfig() { - local key=$1 - local defaultValue=$2 - local configFile=$3 - - # first extract the value with the given key (1st sed), then trim the result (2nd sed) - # if a key exists multiple times, take the "last" one (tail) - local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1` - - [ -z "$value" ] && echo "$defaultValue" || echo "$value" -} - -######################################################################################################################## -# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/flink-conf.yaml -# -or- the respective environment variables are not set. -######################################################################################################################## - - -# WARNING !!! , these values are only used if there is nothing else is specified in -# conf/flink-conf.yaml - -DEFAULT_ENV_PID_DIR="/tmp" # Directory to store *.pid files to -DEFAULT_ENV_LOG_MAX=5 # Maximum number of old log files to keep -DEFAULT_ENV_JAVA_OPTS="" # Optional JVM args -DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args (JobManager) -DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args (TaskManager) -DEFAULT_ENV_JAVA_OPTS_HS="" # Optional JVM args (HistoryServer) -DEFAULT_ENV_JAVA_OPTS_CLI="" # Optional JVM args (Client) -DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters running in cluster mode -DEFAULT_YARN_CONF_DIR="" # YARN Configuration Directory, if necessary -DEFAULT_HADOOP_CONF_DIR="" # Hadoop Configuration Directory, if necessary -DEFAULT_HBASE_CONF_DIR="" # HBase Configuration Directory, if necessary - -######################################################################################################################## -# CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml -######################################################################################################################## - -KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa" - -KEY_ENV_PID_DIR="env.pid.dir" -KEY_ENV_LOG_DIR="env.log.dir" -KEY_ENV_LOG_MAX="env.log.max" -KEY_ENV_YARN_CONF_DIR="env.yarn.conf.dir" -KEY_ENV_HADOOP_CONF_DIR="env.hadoop.conf.dir" -KEY_ENV_HBASE_CONF_DIR="env.hbase.conf.dir" -KEY_ENV_JAVA_HOME="env.java.home" -KEY_ENV_JAVA_OPTS="env.java.opts" -KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager" -KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager" -KEY_ENV_JAVA_OPTS_HS="env.java.opts.historyserver" -KEY_ENV_JAVA_OPTS_CLI="env.java.opts.client" -KEY_ENV_SSH_OPTS="env.ssh.opts" -KEY_HIGH_AVAILABILITY="high-availability" -KEY_ZK_HEAP_MB="zookeeper.heap.mb" - -######################################################################################################################## -# PATHS AND CONFIG -######################################################################################################################## - -target="$0" -# For the case, the executable has been directly symlinked, figure out -# the correct bin path by following its symlink up to an upper bound. -# Note: we can't use the readlink utility here if we want to be POSIX -# compatible. -iteration=0 -while [ -L "$target" ]; do - if [ "$iteration" -gt 100 ]; then - echo "Cannot resolve path: You have a cyclic symlink in $target." - break - fi - ls=`ls -ld -- "$target"` - target=`expr "$ls" : '.* -> \(.*\)$'` - iteration=$((iteration + 1)) -done - -# Convert relative path to absolute path and resolve directory symlinks -bin=`dirname "$target"` -SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P` - -# Define the main directory of the flink installation -# If config.sh is called by pyflink-shell.sh in python bin directory(pip installed), then do not need to set the FLINK_HOME here. -if [ -z "$_FLINK_HOME_DETERMINED" ]; then - FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"` -fi -FLINK_LIB_DIR=$FLINK_HOME/lib -FLINK_PLUGINS_DIR=$FLINK_HOME/plugins -FLINK_OPT_DIR=$FLINK_HOME/opt - - -# These need to be mangled because they are directly passed to java. -# The above lib path is used by the shell script to retrieve jars in a -# directory, so it needs to be unmangled. -FLINK_HOME_DIR_MANGLED=`manglePath "$FLINK_HOME"` -if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_HOME_DIR_MANGLED/conf; fi -FLINK_BIN_DIR=$FLINK_HOME_DIR_MANGLED/bin -DEFAULT_FLINK_LOG_DIR=$FLINK_HOME_DIR_MANGLED/log -FLINK_CONF_FILE="flink-conf.yaml" -YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE} - -### Exported environment variables ### -export FLINK_CONF_DIR -export FLINK_BIN_DIR -export FLINK_PLUGINS_DIR -# export /lib dir to access it during deployment of the Yarn staging files -export FLINK_LIB_DIR -# export /opt dir to access it for the SQL client -export FLINK_OPT_DIR - -######################################################################################################################## -# ENVIRONMENT VARIABLES -######################################################################################################################## - -# read JAVA_HOME from config with no default value -MY_JAVA_HOME=$(readFromConfig ${KEY_ENV_JAVA_HOME} "" "${YAML_CONF}") -# check if config specified JAVA_HOME -if [ -z "${MY_JAVA_HOME}" ]; then - # config did not specify JAVA_HOME. Use system JAVA_HOME - MY_JAVA_HOME=${JAVA_HOME} -fi -# check if we have a valid JAVA_HOME and if java is not available -if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then - echo "Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME." - exit 1 -else - JAVA_HOME=${MY_JAVA_HOME} -fi - -UNAME=$(uname -s) -if [ "${UNAME:0:6}" == "CYGWIN" ]; then - JAVA_RUN=java -else - if [[ -d $JAVA_HOME ]]; then - JAVA_RUN=$JAVA_HOME/bin/java - else - JAVA_RUN=java - fi -fi - -# Define HOSTNAME if it is not already set -if [ -z "${HOSTNAME}" ]; then - HOSTNAME=`hostname` -fi - -IS_NUMBER="^[0-9]+$" - -# Verify that NUMA tooling is available -command -v numactl >/dev/null 2>&1 -if [[ $? -ne 0 ]]; then - FLINK_TM_COMPUTE_NUMA="false" -else - # Define FLINK_TM_COMPUTE_NUMA if it is not already set - if [ -z "${FLINK_TM_COMPUTE_NUMA}" ]; then - FLINK_TM_COMPUTE_NUMA=$(readFromConfig ${KEY_TASKM_COMPUTE_NUMA} "false" "${YAML_CONF}") - fi -fi - -if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then - MAX_LOG_FILE_NUMBER=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}") -fi - -if [ -z "${FLINK_LOG_DIR}" ]; then - FLINK_LOG_DIR=$(readFromConfig ${KEY_ENV_LOG_DIR} "${DEFAULT_FLINK_LOG_DIR}" "${YAML_CONF}") -fi - -if [ -z "${YARN_CONF_DIR}" ]; then - YARN_CONF_DIR=$(readFromConfig ${KEY_ENV_YARN_CONF_DIR} "${DEFAULT_YARN_CONF_DIR}" "${YAML_CONF}") -fi - -if [ -z "${HADOOP_CONF_DIR}" ]; then - HADOOP_CONF_DIR=$(readFromConfig ${KEY_ENV_HADOOP_CONF_DIR} "${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}") -fi - -if [ -z "${HBASE_CONF_DIR}" ]; then - HBASE_CONF_DIR=$(readFromConfig ${KEY_ENV_HBASE_CONF_DIR} "${DEFAULT_HBASE_CONF_DIR}" "${YAML_CONF}") -fi - -if [ -z "${FLINK_PID_DIR}" ]; then - FLINK_PID_DIR=$(readFromConfig ${KEY_ENV_PID_DIR} "${DEFAULT_ENV_PID_DIR}" "${YAML_CONF}") -fi - -if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then - FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}") - - # Remove leading and ending double quotes (if present) of value - FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//' -e 's/"$//' )" -fi - -if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then - FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}") - # Remove leading and ending double quotes (if present) of value - FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//' -e 's/"$//' )" -fi - -if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then - FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_TM} "${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}") - # Remove leading and ending double quotes (if present) of value - FLINK_ENV_JAVA_OPTS_TM="$( echo "${FLINK_ENV_JAVA_OPTS_TM}" | sed -e 's/^"//' -e 's/"$//' )" -fi - -if [ -z "${FLINK_ENV_JAVA_OPTS_HS}" ]; then - FLINK_ENV_JAVA_OPTS_HS=$(readFromConfig ${KEY_ENV_JAVA_OPTS_HS} "${DEFAULT_ENV_JAVA_OPTS_HS}" "${YAML_CONF}") - # Remove leading and ending double quotes (if present) of value - FLINK_ENV_JAVA_OPTS_HS="$( echo "${FLINK_ENV_JAVA_OPTS_HS}" | sed -e 's/^"//' -e 's/"$//' )" -fi - -if [ -z "${FLINK_ENV_JAVA_OPTS_CLI}" ]; then - FLINK_ENV_JAVA_OPTS_CLI=$(readFromConfig ${KEY_ENV_JAVA_OPTS_CLI} "${DEFAULT_ENV_JAVA_OPTS_CLI}" "${YAML_CONF}") - # Remove leading and ending double quotes (if present) of value - FLINK_ENV_JAVA_OPTS_CLI="$( echo "${FLINK_ENV_JAVA_OPTS_CLI}" | sed -e 's/^"//' -e 's/"$//' )" -fi - -if [ -z "${FLINK_SSH_OPTS}" ]; then - FLINK_SSH_OPTS=$(readFromConfig ${KEY_ENV_SSH_OPTS} "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}") -fi - -# Define ZK_HEAP if it is not already set -if [ -z "${ZK_HEAP}" ]; then - ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}") -fi - -# High availability -if [ -z "${HIGH_AVAILABILITY}" ]; then - HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}") - if [ -z "${HIGH_AVAILABILITY}" ]; then - # Try deprecated value - DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}") - if [ -z "${DEPRECATED_HA}" ]; then - HIGH_AVAILABILITY="none" - elif [ ${DEPRECATED_HA} == "standalone" ]; then - # Standalone is now 'none' - HIGH_AVAILABILITY="none" - else - HIGH_AVAILABILITY=${DEPRECATED_HA} - fi - fi -fi - -# Arguments for the JVM. Used for job and task manager JVMs. -# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys -# JobManagerOptions#TOTAL_PROCESS_MEMORY and TaskManagerOptions#TOTAL_PROCESS_MEMORY for that! -if [ -z "${JVM_ARGS}" ]; then - JVM_ARGS="" -fi - -# Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty. -if [ -z "$HADOOP_CONF_DIR" ]; then - if [ -n "$HADOOP_HOME" ]; then - # HADOOP_HOME is set. Check if its a Hadoop 1.x or 2.x HADOOP_HOME path - if [ -d "$HADOOP_HOME/conf" ]; then - # It's Hadoop 1.x - HADOOP_CONF_DIR="$HADOOP_HOME/conf" - fi - if [ -d "$HADOOP_HOME/etc/hadoop" ]; then - # It's Hadoop 2.2+ - HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop" - fi - fi -fi - -# if neither HADOOP_CONF_DIR nor HADOOP_CLASSPATH are set, use some common default (if available) -if [ -z "$HADOOP_CONF_DIR" ] && [ -z "$HADOOP_CLASSPATH" ]; then - if [ -d "/etc/hadoop/conf" ]; then - echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set." - HADOOP_CONF_DIR="/etc/hadoop/conf" - fi -fi - -# Check if deprecated HBASE_HOME is set, and specify config path to HBASE_CONF_DIR if it's empty. -if [ -z "$HBASE_CONF_DIR" ]; then - if [ -n "$HBASE_HOME" ]; then - # HBASE_HOME is set. - if [ -d "$HBASE_HOME/conf" ]; then - HBASE_CONF_DIR="$HBASE_HOME/conf" - fi - fi -fi - -# try and set HBASE_CONF_DIR to some common default if it's not set -if [ -z "$HBASE_CONF_DIR" ]; then - if [ -d "/etc/hbase/conf" ]; then - echo "Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set." - HBASE_CONF_DIR="/etc/hbase/conf" - fi -fi - -INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}" - -if [ -n "${HBASE_CONF_DIR}" ]; then - INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}" -fi - -# Auxilliary function which extracts the name of host from a line which -# also potentially includes topology information and the taskManager type -extractHostName() { - # handle comments: extract first part of string (before first # character) - WORKER=`echo $1 | cut -d'#' -f 1` - - # Extract the hostname from the network hierarchy - if [[ "$WORKER" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then - WORKER=${BASH_REMATCH[1]} - fi - - echo $WORKER -} - -# Auxilliary functions for log file rotation -rotateLogFilesWithPrefix() { - dir=$1 - prefix=$2 - while read -r log ; do - rotateLogFile "$log" - # find distinct set of log file names, ignoring the rotation number (trailing dot and digit) - done < <(find "$dir" ! -type d -path "${prefix}*" | sed s/\.[0-9][0-9]*$// | sort | uniq) -} - -rotateLogFile() { - log=$1; - num=$MAX_LOG_FILE_NUMBER - if [ -f "$log" -a "$num" -gt 0 ]; then - while [ $num -gt 1 ]; do - prev=`expr $num - 1` - [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num" - num=$prev - done - mv "$log" "$log.$num"; - fi -} - -readMasters() { - MASTERS_FILE="${FLINK_CONF_DIR}/masters" - - if [[ ! -f "${MASTERS_FILE}" ]]; then - echo "No masters file. Please specify masters in 'conf/masters'." - exit 1 - fi - - MASTERS=() - WEBUIPORTS=() - - MASTERS_ALL_LOCALHOST=true - GOON=true - while $GOON; do - read line || GOON=false - HOSTWEBUIPORT=$( extractHostName $line) - - if [ -n "$HOSTWEBUIPORT" ]; then - HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:) - WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -s -f2 -d:) - MASTERS+=(${HOST}) - - if [ -z "$WEBUIPORT" ]; then - WEBUIPORTS+=(0) - else - WEBUIPORTS+=(${WEBUIPORT}) - fi - - if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then - MASTERS_ALL_LOCALHOST=false - fi - fi - done < "$MASTERS_FILE" -} - -readWorkers() { - WORKERS_FILE="${FLINK_CONF_DIR}/workers" - - if [[ ! -f "$WORKERS_FILE" ]]; then - echo "No workers file. Please specify workers in 'conf/workers'." - exit 1 - fi - - WORKERS=() - - WORKERS_ALL_LOCALHOST=true - GOON=true - while $GOON; do - read line || GOON=false - HOST=$( extractHostName $line) - if [ -n "$HOST" ] ; then - WORKERS+=(${HOST}) - if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then - WORKERS_ALL_LOCALHOST=false - fi - fi - done < "$WORKERS_FILE" -} - -# starts or stops TMs on all workers -# TMWorkers start|stop -TMWorkers() { - CMD=$1 - - readWorkers - - if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then - # all-local setup - for worker in ${WORKERS[@]}; do - "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}" - done - else - # non-local setup - # start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available - command -v pdsh >/dev/null 2>&1 - if [[ $? -ne 0 ]]; then - for worker in ${WORKERS[@]}; do - ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &" - done - else - PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \ - "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\"" - fi - fi -} - -runBashJavaUtilsCmd() { - local cmd=$1 - local conf_dir=$2 - local class_path=$3 - local dynamic_args=${@:4} - class_path=`manglePathList "${class_path}"` - - local output=`${JAVA_RUN} -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000` - if [[ $? -ne 0 ]]; then - echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2 - # Print the output in case the user redirect the log to console. - echo "$output" 1>&2 - exit 1 - fi - - echo "$output" -} - -extractExecutionResults() { - local output="$1" - local expected_lines="$2" - local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:" - local execution_results - local num_lines - - execution_results=$(echo "${output}" | grep ${EXECUTION_PREFIX}) - num_lines=$(echo "${execution_results}" | wc -l) - # explicit check for empty result, becuase if execution_results is empty, then wc returns 1 - if [[ -z ${execution_results} ]]; then - echo "[ERROR] The execution result is empty." 1>&2 - exit 1 - fi - if [[ ${num_lines} -ne ${expected_lines} ]]; then - echo "[ERROR] The execution results has unexpected number of lines, expected: ${expected_lines}, actual: ${num_lines}." 1>&2 - echo "[ERROR] An execution result line is expected following the prefix '${EXECUTION_PREFIX}'" 1>&2 - echo "$output" 1>&2 - exit 1 - fi - - echo "${execution_results//${EXECUTION_PREFIX}/}" -} - -extractLoggingOutputs() { - local output="$1" - local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:" - - echo "${output}" | grep -v ${EXECUTION_PREFIX} -} - -parseJmJvmArgsAndExportLogs() { - java_utils_output=$(runBashJavaUtilsCmd GET_JM_RESOURCE_PARAMS "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "$@") - logging_output=$(extractLoggingOutputs "${java_utils_output}") - jvm_params=$(extractExecutionResults "${java_utils_output}" 1) - - if [[ $? -ne 0 ]]; then - echo "[ERROR] Could not get JVM parameters and dynamic configurations properly." - echo "[ERROR] Raw output from BashJavaUtils:" - echo "$java_utils_output" - exit 1 - fi - - export JVM_ARGS="${JVM_ARGS} ${jvm_params}" - - export FLINK_INHERITED_LOGS=" -$FLINK_INHERITED_LOGS - -JM_RESOURCE_PARAMS extraction logs: -jvm_params: $jvm_params -logs: $logging_output -" -} diff --git a/tools/docker/flink-distribution-template/bin/taskmanager.sh b/tools/docker/flink-distribution-template/bin/taskmanager.sh deleted file mode 100755 index dcfca5c..0000000 --- a/tools/docker/flink-distribution-template/bin/taskmanager.sh +++ /dev/null @@ -1,103 +0,0 @@ -#!/usr/bin/env bash -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# Backported due to FLINK-18639 -# This file can be removed when upgrading -# to Flink 1.11.2 or 1.12 - -# Start/stop a Flink TaskManager. -USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)" - -STARTSTOP=$1 - -ARGS=("${@:2}") - -if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then - echo $USAGE - exit 1 -fi - -bin=`dirname "$0"` -bin=`cd "$bin"; pwd` - -. "$bin"/config.sh - -ENTRYPOINT=taskexecutor - -if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then - - # if no other JVM options are set, set the GC to G1 - if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then - export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC" - fi - - # Add TaskManager-specific JVM options - export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}" - - # Startup parameters - - java_utils_output=$(runBashJavaUtilsCmd GET_TM_RESOURCE_PARAMS "${FLINK_CONF_DIR}" "$FLINK_BIN_DIR/bash-java-utils.jar:$(findFlinkDistJar)" "${ARGS[@]}") - - logging_output=$(extractLoggingOutputs "${java_utils_output}") - params_output=$(extractExecutionResults "${java_utils_output}" 2) - - if [[ $? -ne 0 ]]; then - echo "[ERROR] Could not get JVM parameters and dynamic configurations properly." - echo "[ERROR] Raw output from BashJavaUtils:" - echo "$java_utils_output" - exit 1 - fi - - jvm_params=$(echo "${params_output}" | head -n 1) - export JVM_ARGS="${JVM_ARGS} ${jvm_params}" - - IFS=$" " dynamic_configs=$(echo "${params_output}" | tail -n 1) - ARGS+=("--configDir" "${FLINK_CONF_DIR}" ${dynamic_configs[@]}) - - export FLINK_INHERITED_LOGS=" -$FLINK_INHERITED_LOGS - -TM_RESOURCE_PARAMS extraction logs: -jvm_params: $jvm_params -dynamic_configs: $dynamic_configs -logs: $logging_output -" -fi - -if [[ $STARTSTOP == "start-foreground" ]]; then - exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}" -else - if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then - # Start a single TaskManager - "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}" - else - # Example output from `numactl --show` on an AWS c4.8xlarge: - # policy: default - # preferred node: current - # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 - # cpubind: 0 1 - # nodebind: 0 1 - # membind: 0 1 - read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ") - for NODE_ID in "${NODE_LIST[@]:1}"; do - # Start a TaskManager for each NUMA node - numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}" - done - fi -fi
