juliito19 opened a new pull request #17020:
URL: https://github.com/apache/flink/pull/17020
I have a problem when I try to run Flink on Kubernetes with the following manifests:
### JobManager
```
apiVersion: v1
kind: Service
metadata:
  name: jobmanager-cs
spec:
  type: NodePort
  ports:
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: jobmanager-hs
spec:
  type: ClusterIP
  ports:
  - port: 6123
    name: rpc
  - port: 6124
    name: blob-server
  - port: 6125
    name: query
  selector:
    app: flink
    component: jobmanager
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  selector:
    matchLabels:
      app: flink
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: Always
      containers:
      - name: jobmanager
        image: flink:1.13.1-scala_2.12
        command: [bash, "-ec", "bin/jobmanager.sh start-foreground cluster"]
        resources:
          limits:
            memory: "2024Mi"
            cpu: "500m"
        env:
        - name: JOB_MANAGER_ID
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        # The following args overwrite the value of jobmanager.rpc.address
        # configured in the configuration config map to POD_IP.
        args: ["standalone-job", "--host", "$POD_IP", "--job-classname", "org.apache.flink.application.Main"] #, <optional arguments>, <job arguments>]
        # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 6125
          name: query
        - containerPort: 8081
          name: webui
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: job-artifacts-volume
          mountPath: /opt/flink/usrlib
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /config/flink
```
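
One note on the `args` in the JobManager manifest above: as far as I understand, Kubernetes only substitutes environment variables in `command`/`args` when they are written with the `$(VAR)` syntax, so a literal `$POD_IP` is passed to the container unexpanded. A minimal sketch of that form (purely illustrative, same job class as above):

```
# Hypothetical sketch: reference the POD_IP env var as $(POD_IP) so Kubernetes
# expands it before starting the container.
args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname",
       "org.apache.flink.application.Main"]
```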
### Task Manager
```
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.13.1-scala_2.12
        env:
        - name: K8S_POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        command: ["/bin/sh", "-ec", "sleep 1000"]
        resources:
          limits:
            memory: "800Mi"
            cpu: "2000m"
        args: ["taskmanager", "start-foreground", "-Dtaskmanager.host=$K8S_POD_IP"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: job-artifacts-volume
          mountPath: /opt/flink/usrlib
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /config/flink
```
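
The container `command` above is just `sleep 1000`, so the image entrypoint never runs and I start the TaskManager manually inside the pod (see the command further below). A hypothetical variant that would let the pod start it directly, with the pod IP referenced via the `$(VAR)` form that Kubernetes expands in `args`, might look roughly like this (not what I am currently running):

```
# Hypothetical variant: drop the sleep command so the image entrypoint runs,
# and pass taskmanager.host as a dynamic property; $(K8S_POD_IP) is expanded
# by Kubernetes before the container starts.
args: ["taskmanager", "-Dtaskmanager.host=$(K8S_POD_IP)"]
```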
### ConfigMap
```
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: jobmanager-hs
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    taskmanager.heap.size: 1024m
    jobmanager.heap.size: 1024m
    state.backend: filesystem
    s3.access-key: k8sdemo
    s3.secret-key: k8sdemo123
    state.checkpoints.dir: /opt/flink/usrlib/checkpoints
    state.savepoints.dir: /opt/flink/usrlib/savepoints
    metrics.reporters: prom
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    metrics.reporter.promport: 9249
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 1
    rest.address: 0.0.0.0
    rest.bind-address: 0.0.0.0
    jobmanager.execution.failover-strategy: region
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = DEBUG
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = DEBUG
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = DEBUG
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = DEBUG
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = DEBUG
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = DEBUG
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
```
When I then try to start the TaskManager with the following command
> bin/taskmanager.sh start-foreground -Dtaskmanager.host=$K8S_POD_IP

I get the following log output.
JobManager:
> 2021-08-27 09:16:57,917 ERROR akka.remote.EndpointWriter [] - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@jobmanager-hs:6123/]] arriving at [akka.tcp://flink@jobmanager-hs:6123] inbound addresses are [akka.tcp://flink@cluster:6123]
> 2021-08-27 09:17:01,255 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Trigger heartbeat request.
> 2021-08-27 09:17:01,284 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Trigger heartbeat request.
> 2021-08-27 09:17:10,008 DEBUG akka.remote.transport.netty.NettyTransport [] - Remote connection to [/172.17.0.1:34827] was disconnected because of [id: 0x13ae1d03, /172.17.0.1:34827 :> /172.17.0.23:6123] DISCONNECTED
> 2021-08-27 09:17:10,008 DEBUG akka.remote.transport.ProtocolStateActor [] - Association between local [tcp://flink@cluster:6123] and remote [tcp://[email protected]:34827] was disassociated because the ProtocolStateActor failed: Unknown
> 2021-08-27 09:17:10,009 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://[email protected]:6122] has failed, address is now gated for [50] ms. Reason: [Disassociated]
TaskManager:
> INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.
> INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.
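
For reference, the part that confuses me: the JobManager reports its inbound Akka address as `akka.tcp://flink@cluster:6123`, the TaskManager keeps trying `akka.tcp://flink@flink-jobmanager:6123`, and the config map points `jobmanager.rpc.address` at the `jobmanager-hs` service. As far as I understand, these are the keys from the mounted flink-conf.yaml that determine the ResourceManager address the TaskManager tries to resolve:

```
# Excerpt from the flink-conf.yaml above; jobmanager-hs is the ClusterIP
# service defined in the JobManager manifest.
jobmanager.rpc.address: jobmanager-hs
jobmanager.rpc.port: 6123
```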