[ https://issues.apache.org/jira/browse/FLINK-24031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chesnay Schepler updated FLINK-24031:
-------------------------------------
Language: (was: English english)
> I am trying to deploy Flink in Kubernetes, but when I launch the TaskManager in another container I get an exception
> ----------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-24031
> URL: https://issues.apache.org/jira/browse/FLINK-24031
> Project: Flink
> Issue Type: Bug
> Components: Deployment / Kubernetes
> Affects Versions: 1.13.0, 1.13.2
> Reporter: Julio Pérez
> Priority: Major
> Fix For: 1.13.1
>
>
> I explain the problem here: https://github.com/apache/flink/pull/17020
> I have a problem when I try to run Flink in k8s with the following manifests:
> h3. JobManager
> {{apiVersion: v1
> kind: Service
> metadata:
>   name: jobmanager-cs
> spec:
>   type: NodePort
>   ports:
>   - name: ui
>     port: 8081
>   selector:
>     app: flink
>     component: jobmanager
> ---
> apiVersion: v1
> kind: Service
> metadata:
>   name: jobmanager-hs
> spec:
>   type: ClusterIP
>   ports:
>   - port: 6123
>     name: rpc
>   - port: 6124
>     name: blob-server
>   - port: 6125
>     name: query
>   selector:
>     app: flink
>     component: jobmanager
> ---
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-jobmanager
> spec:
>   selector:
>     matchLabels:
>       app: flink
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: jobmanager
>     spec:
>       restartPolicy: Always
>       containers:
>       - name: jobmanager
>         image: flink:1.13.1-scala_2.12
>         command: [bash,"-ec",bin/jobmanager.sh start-foreground cluster]
>         resources:
>           limits:
>             memory: "2024Mi"
>             cpu: "500m"
>         env:
>         - name: JOB_MANAGER_ID
>           valueFrom:
>             fieldRef:
>               apiVersion: v1
>               fieldPath: status.podIP
>         - name: POD_IP
>           valueFrom:
>             fieldRef:
>               apiVersion: v1
>               fieldPath: status.podIP
>         # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP.
>         args: ["standalone-job", "--host", "$POD_IP", "--job-classname", "org.apache.flink.application.Main"] #, <optional arguments>, <job arguments>] optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
>         ports:
>         - containerPort: 6123
>           name: rpc
>         - containerPort: 6124
>           name: blob-server
>         - containerPort: 6125
>           name: query
>         - containerPort: 8081
>           name: webui
>         volumeMounts:
>         - name: flink-config-volume
>           mountPath: /opt/flink/conf
>         - name: job-artifacts-volume
>           mountPath: /opt/flink/usrlib
>         securityContext:
>           runAsUser: 9999
>       volumes:
>       - name: flink-config-volume
>         configMap:
>           name: flink-config
>           items:
>           - key: flink-conf.yaml
>             path: flink-conf.yaml
>           - key: log4j-console.properties
>             path: log4j-console.properties
>       - name: job-artifacts-volume
>         hostPath:
>           path: /config/flink}}
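The JobManager container above sets both `command` and `args`. Kubernetes starts such a container with `command` followed by `args` as one argument vector. When `command` is `bash -ec "<script>"`, the shell runs only the script string, and the `args` entries become the positional parameters `$0`, `$1`, ... rather than being appended to the `jobmanager.sh` command line. If that holds here, `--host $POD_IP` would never reach `jobmanager.sh`. The word `cluster` inside the script string would then be what `jobmanager.sh start-foreground` appears to take as its host argument, which would fit the inbound address `akka.tcp://flink@cluster:6123` in the JobManager log.

The sketch below illustrates this behaviour. It assumes a POSIX `sh` is available; bash treats `-c` the same way. `container_output` is a hypothetical helper, and the `echo` scripts stand in for `jobmanager.sh`.

```python
import subprocess


def container_output(command, args):
    """Hypothetical helper: run `command + args` as a single argv, the way a
    container process is started (no shell is involved unless `command`
    itself starts one, as `sh -c` does here)."""
    argv = list(command) + list(args)
    result = subprocess.run(argv, capture_output=True, text=True, check=True)
    return result.stdout


# Same shape as the manifest's `args`; "$POD_IP" is passed as literal text.
args = ["standalone-job", "--host", "$POD_IP"]

# 1) The -c script string is fixed: the extra args are NOT appended to it.
print(container_output(["sh", "-ec", "echo start-foreground cluster"], args), end="")
# -> start-foreground cluster

# 2) Instead, the extra args are bound to $0, $1, $2 inside the script.
print(container_output(["sh", "-ec", 'echo "$0 $1 $2"'], args), end="")
# -> standalone-job --host $POD_IP
```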
> h3. Task Manager
> {{apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-taskmanager
> spec:
>   replicas: 2
>   selector:
>     matchLabels:
>       app: flink
>       component: taskmanager
>   template:
>     metadata:
>       labels:
>         app: flink
>         component: taskmanager
>     spec:
>       containers:
>       - name: taskmanager
>         image: flink:1.13.1-scala_2.12
>         env:
>         - name: K8S_POD_IP
>           valueFrom:
>             fieldRef:
>               fieldPath: status.podIP
>         command: ["/bin/sh", "-ec", "sleep 1000"]
>         resources:
>           limits:
>             memory: "800Mi"
>             cpu: "2000m"
>         args: ["taskmanager","start-foreground","-Dtaskmanager.host=$K8S_POD_IP"]
>         ports:
>         - containerPort: 6122
>           name: rpc
>         - containerPort: 6125
>           name: query-state
>         volumeMounts:
>         - name: flink-config-volume
>           mountPath: /opt/flink/conf/
>         - name: job-artifacts-volume
>           mountPath: /opt/flink/usrlib
>         securityContext:
>           runAsUser: 9999
>       volumes:
>       - name: flink-config-volume
>         configMap:
>           name: flink-config
>           items:
>           - key: flink-conf.yaml
>             path: flink-conf.yaml
>           - key: log4j-console.properties
>             path: log4j-console.properties
>       - name: job-artifacts-volume
>         hostPath:
>           path: /config/flink}}
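Both Deployments reference pod IPs in `args` as `$POD_IP` and `$K8S_POD_IP`. According to the Kubernetes container spec, Kubernetes itself only substitutes environment variables in `command` and `args` when they are written as `$(NAME)`. A `$$` is reduced to a literal `$`, and an unresolvable reference is left unchanged. A plain `$NAME` is passed through literally unless a shell expands it later, for example when the command is typed by hand into a shell, as in the manual command quoted further down.

The following is a simplified re-implementation of those documented rules, for illustration only and not Kubernetes' own code. `expand_k8s_refs` is a hypothetical name, the IP is a placeholder, and variable names are restricted to C-style identifiers for brevity.

```python
import re

# Matches `$$` or `$(NAME)`; `$$` is tried first so `$$(NAME)` acts as an escape.
_REF = re.compile(r"\$\$|\$\(([A-Za-z_][A-Za-z0-9_]*)\)")


def expand_k8s_refs(arg, env):
    """Expand Kubernetes-style $(NAME) references in one command/args entry."""
    def replace(match):
        if match.group(0) == "$$":
            return "$"  # escaped dollar sign
        # Unknown variables are left exactly as written.
        return env.get(match.group(1), match.group(0))
    return _REF.sub(replace, arg)


env = {"K8S_POD_IP": "10.0.0.7"}  # placeholder for the value of status.podIP

print(expand_k8s_refs("-Dtaskmanager.host=$K8S_POD_IP", env))
# -> -Dtaskmanager.host=$K8S_POD_IP   (plain $NAME is not expanded)
print(expand_k8s_refs("-Dtaskmanager.host=$(K8S_POD_IP)", env))
# -> -Dtaskmanager.host=10.0.0.7
print(expand_k8s_refs("$$(K8S_POD_IP)", env))
# -> $(K8S_POD_IP)
```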
> h3. ConfigMap
> {{apiVersion: v1
> kind: ConfigMap
> metadata:
>   name: flink-config
>   labels:
>     app: flink
> data:
>   flink-conf.yaml: |+
>     jobmanager.rpc.address: jobmanager-hs
>     taskmanager.numberOfTaskSlots: 1
>     blob.server.port: 6124
>     jobmanager.rpc.port: 6123
>     taskmanager.rpc.port: 6122
>     taskmanager.heap.size: 1024m
>     jobmanager.heap.size: 1024m
>     state.backend: filesystem
>     s3.access-key: k8sdemo
>     s3.secret-key: k8sdemo123
>     state.checkpoints.dir: /opt/flink/usrlib/checkpoints
>     state.savepoints.dir: /opt/flink/usrlib/savepoints
>     metrics.reporters: prom
>     metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
>     metrics.reporter.promport: 9249
>     queryable-state.proxy.ports: 6125
>     jobmanager.memory.process.size: 1600m
>     taskmanager.memory.process.size: 1728m
>     parallelism.default: 1
>     rest.address: 0.0.0.0
>     rest.bind-address: 0.0.0.0
>     jobmanager.execution.failover-strategy: region
>   log4j-console.properties: |+
>     # This affects logging for both user code and Flink
>     rootLogger.level = DEBUG
>     rootLogger.appenderRef.console.ref = ConsoleAppender
>     rootLogger.appenderRef.rolling.ref = RollingFileAppender
>     # Uncomment this if you want to _only_ change Flink's logging
>     #logger.flink.name = org.apache.flink
>     #logger.flink.level = DEBUG
>     # The following lines keep the log level of common libraries/connectors on
>     # log level INFO. The root logger does not override this. You have to manually
>     # change the log levels here.
>     logger.akka.name = akka
>     logger.akka.level = DEBUG
>     logger.kafka.name= org.apache.kafka
>     logger.kafka.level = DEBUG
>     logger.hadoop.name = org.apache.hadoop
>     logger.hadoop.level = DEBUG
>     logger.zookeeper.name = org.apache.zookeeper
>     logger.zookeeper.level = DEBUG
>     # Log all infos to the console
>     appender.console.name = ConsoleAppender
>     appender.console.type = CONSOLE
>     appender.console.layout.type = PatternLayout
>     appender.console.layout.pattern = %d\{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>     # Log all infos in the given rolling file
>     appender.rolling.name = RollingFileAppender
>     appender.rolling.type = RollingFile
>     appender.rolling.append = false
>     appender.rolling.fileName = ${sys:log.file}
>     appender.rolling.filePattern = ${sys:log.file}.%i
>     appender.rolling.layout.type = PatternLayout
>     appender.rolling.layout.pattern = %d\{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
>     appender.rolling.policies.type = Policies
>     appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
>     appender.rolling.policies.size.size=100MB
>     appender.rolling.strategy.type = DefaultRolloverStrategy
>     appender.rolling.strategy.max = 10
>     # Suppress the irrelevant (wrong) warnings from the Netty channel handler
>     logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
>     logger.netty.level = OFF }}
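This sketch assumes that Flink 1.13 reads `flink-conf.yaml` line by line rather than with a full YAML parser. Under that assumption, text after `#` is dropped, and each remaining line is split at the first `: ` into a key and a value. Lines that do not fit that shape are skipped with a warning.

The model below mirrors that simplified behaviour. It is not Flink's own loader, and `parse_flink_conf` is a hypothetical name. It shows which RPC address the ConfigMap gives both processes. It also shows that any key parses without error, so a likely typo such as `metrics.reporter.promport` (presumably meant as `metrics.reporter.prom.port`) would not be reported.

```python
def parse_flink_conf(text):
    """Simplified model of line-oriented flink-conf.yaml loading: strip
    comments, split each line at the first ': ', skip lines that do not fit."""
    conf = {}
    for raw_line in text.splitlines():
        line = raw_line.split("#", 1)[0].strip()  # drop comments
        if not line:
            continue
        key, sep, value = line.partition(": ")
        key, value = key.strip(), value.strip()
        if not sep or not key or not value:
            continue  # the real loader would log a warning here
        conf[key] = value
    return conf


# A few lines copied from the ConfigMap above.
snippet = """\
jobmanager.rpc.address: jobmanager-hs
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
metrics.reporter.promport: 9249
"""

conf = parse_flink_conf(snippet)
print(f"{conf['jobmanager.rpc.address']}:{conf['jobmanager.rpc.port']}")
# -> jobmanager-hs:6123
print("metrics.reporter.prom.port" in conf)
# -> False (only the misspelled 'metrics.reporter.promport' key is present)
```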
> When I try to run the TaskManager with the following command:
> {quote}bin/taskmanager start-foreground -Dtaskmanager.host=$K8S_POD_IP
> {quote}
> I get the following exceptions:
> JobManager:
> {quote}2021-08-27 09:16:57,917 ERROR akka.remote.EndpointWriter [] - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@jobmanager-hs:6123/]] arriving at [akka.tcp://flink@jobmanager-hs:6123] inbound addresses are [akka.tcp://flink@cluster:6123]
> 2021-08-27 09:17:01,255 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Trigger heartbeat request.
> 2021-08-27 09:17:01,284 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Trigger heartbeat request.
> 2021-08-27 09:17:10,008 DEBUG akka.remote.transport.netty.NettyTransport [] - Remote connection to [/172.17.0.1:34827] was disconnected because of [id: 0x13ae1d03, /172.17.0.1:34827 :> /172.17.0.23:6123] DISCONNECTED
> 2021-08-27 09:17:10,008 DEBUG akka.remote.transport.ProtocolStateActor [] - Association between local [tcp://flink@cluster:6123] and remote [tcp://[email protected]:34827] was disassociated because the ProtocolStateActor failed: Unknown
> 2021-08-27 09:17:10,009 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://[email protected]:6122] has failed, address is now gated for [50] ms. Reason: [Disassociated]
> {quote}
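The first JobManager log line shows Akka dropping a message addressed to `akka.tcp://flink@jobmanager-hs:6123` because the only inbound address is `akka.tcp://flink@cluster:6123`. Akka classic remoting is commonly observed to compare the recipient's host name with the bound address literally. Two different names therefore count as different systems, even if both would reach the same pod.

The following is a simplified model of that check for illustration, not Akka's implementation. `akka_authority` and `is_local_recipient` are hypothetical names.

```python
from urllib.parse import urlsplit


def akka_authority(address):
    """Split an akka.tcp:// address into (system, host, port)."""
    parts = urlsplit(address)
    return parts.username, parts.hostname, parts.port


def is_local_recipient(recipient, inbound_addresses):
    # Simplified model: accept only if system name, host name and port match
    # one of the bound addresses, compared literally (no DNS resolution, so
    # 'jobmanager-hs' and 'cluster' are treated as different hosts).
    bound = {akka_authority(a) for a in inbound_addresses}
    return akka_authority(recipient) in bound


recipient = "akka.tcp://flink@jobmanager-hs:6123/"
print(is_local_recipient(recipient, ["akka.tcp://flink@cluster:6123"]))
# -> False: dropped as a "non-local recipient"
print(is_local_recipient(recipient, ["akka.tcp://flink@jobmanager-hs:6123"]))
# -> True
```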
> TaskManager:
> {quote}INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager__, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager__.
> INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager__, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager__.
> {quote}
> Best regards,
> Julio
--
This message was sent by Atlassian Jira
(v8.3.4#803005)