fangmin.yang created FLINK-36574:
------------------------------------
Summary: The file STDOUT is not available on the TaskExecutor
Key: FLINK-36574
URL: https://issues.apache.org/jira/browse/FLINK-36574
Project: Flink
Issue Type: Bug
Reporter: fangmin.yang
I started Flink with the following docker-entrypoint.sh from the official Flink
repository:
{code:java}
#!/usr/bin/env bash

###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE_DIR="${FLINK_HOME}/conf"

# Prints the command prefix used to run a program as the "flink" user:
# nothing when the current uid is not 0, "su-exec flink" when /sbin/su-exec is
# executable, "gosu flink" otherwise. Callers splice the output in front of
# the real command via an unquoted $(drop_privs_cmd).
drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec flink
    else
        # Others
        echo gosu flink
    fi
}

# Links every jar named in ENABLE_BUILT_IN_PLUGINS (';'-separated) from
# ${FLINK_HOME}/opt into its own directory ${FLINK_HOME}/plugins/<jar name
# without .jar>. Returns 0 without doing anything when the variable is empty;
# exits the script with status 1 when a named jar does not exist.
copy_plugins_if_required() {
    if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
    fi

    echo "Enabling required built-in plugins"
    for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}

        # The plugin directory is created before the jar's existence is checked.
        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
        else
            ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
            echo "Successfully enabled ${target_plugin}"
        fi
    done
}

# Arguments: alternating key/value pairs ($1=key, $2=value, ...).
# Turns each pair into a "-D<key>=<value>" parameter and hands all of them to
# $FLINK_HOME/bin/config-parser-utils.sh together with the conf, bin and lib
# directories. Does nothing when called without arguments.
set_config_options() {
    local config_parser_script="$FLINK_HOME/bin/config-parser-utils.sh"
    local config_dir="$FLINK_HOME/conf"
    local bin_dir="$FLINK_HOME/bin"
    local lib_dir="$FLINK_HOME/lib"

    local config_params=()

    while [ $# -gt 0 ]; do
        local key="$1"
        local value="$2"

        config_params+=("-D${key}=${value}")

        shift 2
    done

    if [ "${#config_params[@]}" -gt 0 ]; then
        "${config_parser_script}" "${config_dir}" "${bin_dir}" "${lib_dir}" "${config_params[@]}"
    fi
}

# Applies the container's base configuration: the JobManager RPC address, the
# fixed blob/query server ports, the task slot count when
# TASK_MANAGER_NUMBER_OF_TASK_SLOTS is set, and finally the free-form
# FLINK_PROPERTIES when that variable is non-empty.
prepare_configuration() {
    local config_options=()

    config_options+=("jobmanager.rpc.address" "${JOB_MANAGER_RPC_ADDRESS}")
    config_options+=("blob.server.port" "6124")
    config_options+=("query.server.port" "6125")

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        config_options+=("taskmanager.numberOfTaskSlots" "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}")
    fi

    if [ ${#config_options[@]} -ne 0 ]; then
        set_config_options "${config_options[@]}"
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        process_flink_properties "${FLINK_PROPERTIES}"
    fi
}

# Arguments: $1 - newline-separated "key: value" properties.
# Strips all whitespace from each line, splits it at the first ':' into key
# and value, pipes the value through envsubst, and forwards the collected
# pairs to set_config_options. Lines that are empty after stripping are
# skipped. Note that prop, key and value are not declared local.
process_flink_properties() {
    local flink_properties_content=$1
    local config_options=()

    # Iterate line by line; IFS is restored after the loop.
    local OLD_IFS="$IFS"
    IFS=$'\n'
    for prop in $flink_properties_content; do
        prop=$(echo $prop | tr -d '[:space:]')

        if [ -z "$prop" ]; then
            continue
        fi

        IFS=':' read -r key value <<< "$prop"

        value=$(echo $value | envsubst)

        config_options+=("$key" "$value")
    done
    IFS="$OLD_IFS"

    if [ ${#config_options[@]} -ne 0 ]; then
        set_config_options "${config_options[@]}"
    fi
}

# Unless DISABLE_JEMALLOC is set to something other than "false", appends
# libjemalloc.so to LD_PRELOAD, looking first under the /usr/lib directory
# for the current `uname -m` architecture and then under the x86_64 one.
# Prints a warning and leaves LD_PRELOAD untouched when neither file exists.
maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc

copy_plugins_if_required

prepare_configuration

# Dispatch on the first argument; the remaining arguments are forwarded to
# the selected Flink start script.
args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf " Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")

    echo "Starting History Server"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Task Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
{code}
Here are the startup commands:
{code:java}
# Start the JobManager container and publish port 8081 on the host.
docker run --rm --name=jobmanager --network flink-network \
    --publish 8081:8081 \
    --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
    flink:v1 jobmanager
{code}
{code:java}
# Start the TaskManager container on the same network as the JobManager.
docker run --rm --name=taskmanager --network flink-network \
    --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
    flink:v1 taskmanager
{code}
Startup succeeded, but when I ran a word count program, the task
manager STDOUT showed the following exception:
The file STDOUT does not exist on the TaskExecutor.
If you are using kubernetes mode, please use "kubectl logs <pod-name>" to get
stdout content.
!https://i.postimg.cc/Qtvr6fm1/20241020205050.png!
https://i.postimg.cc/Qtvr6fm1/20241020205050.png
--
This message was sent by Atlassian Jira
(v8.20.10#820010)