Neha Aggarwal created FLINK-32718:
-------------------------------------
Summary: 'UNION ALL' with a 'GROUP BY' condition in Flink is
causing the checkpoint to become unbounded
Key: FLINK-32718
URL: https://issues.apache.org/jira/browse/FLINK-32718
Project: Flink
Issue Type: Bug
Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.3.1
Environment: Apache Flink Kubernetes Operator 1.3.1
Kubernetes 1.24.9
Reporter: Neha Aggarwal
Assignee: Jayme Howard
This appears to be a pretty straightforward issue, but I don't know the
codebase well enough to propose a fix. When a FlinkDeployment is present in a
namespace and that namespace is deleted, the FlinkDeployment never reconciles
and its finalizer never completes. The namespace is then blocked from deletion
indefinitely, until someone manually removes the finalizer from the
FlinkDeployment.
Namespace conditions:
{code:java}
conditions:
- lastTransitionTime: '2023-04-18T22:17:48Z'
  message: All resources successfully discovered
  reason: ResourcesDiscovered
  status: 'False'
  type: NamespaceDeletionDiscoveryFailure
- lastTransitionTime: '2023-03-23T18:27:37Z'
  message: All legacy kube types successfully parsed
  reason: ParsedGroupVersions
  status: 'False'
  type: NamespaceDeletionGroupVersionParsingFailure
- lastTransitionTime: '2023-03-23T18:27:37Z'
  message: All content successfully deleted, may be waiting on finalization
  reason: ContentDeleted
  status: 'False'
  type: NamespaceDeletionContentFailure
- lastTransitionTime: '2023-03-23T18:27:37Z'
  message: 'Some resources are remaining: flinkdeployments.flink.apache.org has 2 resource instances'
  reason: SomeResourcesRemain
  status: 'True'
  type: NamespaceContentRemaining
- lastTransitionTime: '2023-03-23T18:27:37Z'
  message: 'Some content in the namespace has finalizers remaining: flinkdeployments.flink.apache.org/finalizer in 2 resource instances'
  reason: SomeFinalizersRemain
  status: 'True'
  type: NamespaceFinalizersRemaining
phase: Terminating
{code}
FlinkDeployment example (some fields redacted):
{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  creationTimestamp: '2023-03-23T18:27:02Z'
  deletionGracePeriodSeconds: 0
  deletionTimestamp: '2023-03-23T18:27:35Z'
  finalizers:
  - flinkdeployments.flink.apache.org/finalizer
  generation: 3
  name: <name-redacted>
  namespace: <namespace-redacted>
  resourceVersion: '10565277081'
  uid: e50d2683-6c0c-467e-b10c-fe0f4e404692
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: '2'
  flinkVersion: v1_16
  image: <image-redacted>
  job:
    args: []
    entryClass: <class-redacted>
    jarURI: <uri-redacted>
    parallelism: 2
    state: running
    upgradeMode: stateless
  jobManager:
    replicas: 1
    resource:
      cpu: 1
      memory: 2048m
  logConfiguration:
    log4j-console.properties: '# This affects logging for both user code and
      Flink rootLogger.level = INFO rootLogger.appenderRef.console.ref =
      ConsoleAppender rootLogger.appenderRef.rolling.ref = RollingFileAppender
      # Uncomment this if you want to _only_ change Flink''s logging
      #logger.flink.name = org.apache.flink #logger.flink.level = INFO #
      The following lines keep the log level of common libraries/connectors on #
      log level INFO. The root logger does not override this. You have to manually
      # change the log levels here. logger.akka.name = akka
      logger.akka.level = INFO logger.kafka.name= org.apache.kafka
      logger.kafka.level = INFO logger.hadoop.name = org.apache.hadoop
      logger.hadoop.level = INFO logger.zookeeper.name = org.apache.zookeeper
      logger.zookeeper.level = INFO # Log all infos to the console
      appender.console.name = ConsoleAppender appender.console.type = CONSOLE
      appender.console.layout.type = PatternLayout
      appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
      - %m%n # Log all infos in the given rolling file
      appender.rolling.name = RollingFileAppender appender.rolling.type =
      RollingFile appender.rolling.append = false appender.rolling.fileName
      = ${sys:log.file} appender.rolling.filePattern = ${sys:log.file}.%i
      appender.rolling.layout.type = PatternLayout
      appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
      - %m%n appender.rolling.policies.type = Policies
      appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
      appender.rolling.policies.size.size=100MB appender.rolling.strategy.type =
      DefaultRolloverStrategy appender.rolling.strategy.max = 10 # Suppress
      the irrelevant (wrong) warnings from the Netty channel handler
      logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
      logger.netty.level = OFF '
  mode: standalone
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      labels:
        app.kubernetes.io/managed-by: tilt
    spec:
      containers:
      - env:
        - name: APPLICATION_CONFIG_FILE
          value: /app-conf/feature-pipeline-config.yaml
        name: flink-main-container
        volumeMounts:
        - mountPath: /app-conf
          name: feature-pipeline-config-volume
      volumes:
      - configMap:
          items:
          - key: feature-pipeline-config.yaml
            path: feature-pipeline-config.yaml
          name: <name-redacted>
        name: feature-pipeline-config-volume
  serviceAccount: default
  taskManager:
    replicas: 2
    resource:
      cpu: 1
      memory: 2048m
status:
  clusterInfo: {}
  jobManagerDeploymentStatus: DEPLOYED_NOT_READY
  jobStatus:
    jobId: 07243f53058c9083c06d2000352b04ec
    savepointInfo:
      lastPeriodicSavepointTimestamp: 0
      savepointHistory: []
    state: RECONCILING
  reconciliationStatus:
    lastReconciledSpec:
'{"spec":{"job":{"jarURI":"<jar-redacted>","parallelism":2,"entryClass":"<class-redacted>","args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"upgradeMode":"stateless","allowNonRestoredState":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"<image-redacted>","imagePullPolicy":null,"serviceAccount":"default","flinkVersion":"v1_16","ingress":{"template":"<template-redacted>","className":null,,"podTemplate":{"apiVersion":"v1","kind":"Pod","metadata":{"labels":{"app.kubernetes.io/managed-by":"tilt","spec":{"containers":[{"env":[{"name":"APPLICATION_CONFIG_FILE","value":"/app-conf/feature-pipeline-config.yaml"}],"name":"flink-main-container","volumeMounts":[{"mountPath":"/app-conf","name":"feature-pipeline-config-volume"}]}],"volumes":[{"configMap":{"items":[{"key":"feature-pipeline-config.yaml","path":"feature-pipeline-config.yaml"}],"name":"<name-redacted>"},"name":"feature-pipeline-config-volume"}]}},"jobManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":2,"podTemplate":null},"logConfiguration":{"log4j-console.properties":"#
This affects logging for both user code and Flink\nrootLogger.level =
INFO\nrootLogger.appenderRef.console.ref
= ConsoleAppender\nrootLogger.appenderRef.rolling.ref =
RollingFileAppender\n#
Uncomment this if you want to _only_ change Flink''s
logging\n#logger.flink.name
= org.apache.flink\n#logger.flink.level = INFO\n# The following lines
keep the
log level of common libraries/connectors on\n# log level INFO. The root
logger
does not override this. You have to manually\n# change the log levels
here.\nlogger.akka.name
= akka\nlogger.akka.level = INFO\nlogger.kafka.name=
org.apache.kafka\nlogger.kafka.level
= INFO\nlogger.hadoop.name = org.apache.hadoop\nlogger.hadoop.level =
INFO\nlogger.zookeeper.name
= org.apache.zookeeper\nlogger.zookeeper.level = INFO\n# Log all infos to
the
console\nappender.console.name = ConsoleAppender\nappender.console.type =
CONSOLE\nappender.console.layout.type
= PatternLayout\nappender.console.layout.pattern = %d{yyyy-MM-dd
HH:mm:ss,SSS}
%-5p %-60c %x - %m%n\n# Log all infos in the given rolling
file\nappender.rolling.name
= RollingFileAppender\nappender.rolling.type =
RollingFile\nappender.rolling.append
= false\nappender.rolling.fileName =
${sys:log.file}\nappender.rolling.filePattern
= ${sys:log.file}.%i\nappender.rolling.layout.type =
PatternLayout\nappender.rolling.layout.pattern
= %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x -
%m%n\nappender.rolling.policies.type
= Policies\nappender.rolling.policies.size.type =
SizeBasedTriggeringPolicy\nappender.rolling.policies.size.size=100MB\nappender.rolling.strategy.type
= DefaultRolloverStrategy\nappender.rolling.strategy.max = 10\n# Suppress
the
irrelevant (wrong) warnings from the Netty channel
handler\nlogger.netty.name
= org.jboss.netty.channel.DefaultChannelPipeline\nlogger.netty.level =
OFF\n"},"mode":"standalone"},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
    reconciliationTimestamp: 1679596022457
    state: DEPLOYED
  taskManager:
    labelSelector: component=taskmanager,app=<label-redacted>
    replicas: 2
{code}
My guess is that the reconciler should check whether the namespace is in a
Terminating state and, if so, skip the finalizer step that is blocking
deletion. Based on the following log messages, I'm inclined to believe the
issue is that the operator attempts to create an Event in the namespace as
part of deleting the FlinkDeployment, and because that call fails, the whole
deletion aborts. Names in the log messages are redacted.
{noformat}
2023-04-19 16:22:58,642 o.a.f.k.o.c.FlinkDeploymentController [INFO ][NAMESPACE/DEPLOYMENT1] Cleaning up FlinkDeployment
2023-04-19 16:22:58,645 o.a.f.k.o.c.FlinkDeploymentController [INFO ][NAMESPACE/DEPLOYMENT2] Cleaning up FlinkDeployment
2023-04-19 16:22:58,749 i.j.o.p.e.ReconciliationDispatcher [ERROR][NAMESPACE/DEPLOYMENT1] Error during event processing ExecutionScope{ resource id: ResourceID{name='DEPLOYMENT1', namespace='NAMESPACE'}, version: 10877039326} failed.
io.javaoperatorsdk.operator.OperatorException: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://192.168.0.1/api/v1/namespaces/NAMESPACE/events. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. events "Operator.1727008025" is forbidden: unable to create new content in namespace NAMESPACE because it is being terminated.
    at io.javaoperatorsdk.operator.processing.Controller.cleanup(Controller.java:195)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleCleanup(ReconciliationDispatcher.java:277)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:79)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
    at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://192.168.0.1/api/v1/namespaces/NAMESPACE/events. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. events "Operator.1727008025" is forbidden: unable to create new content in namespace NAMESPACE because it is being terminated.
    at io.fabric8.kubernetes.client.KubernetesClientException.copyAsCause(KubernetesClientException.java:238)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:517)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:551)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:535)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleCreate(OperationSupport.java:328)
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleCreate(BaseOperation.java:675)
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleCreate(BaseOperation.java:88)
    at io.fabric8.kubernetes.client.dsl.internal.CreateOnlyResourceOperation.create(CreateOnlyResourceOperation.java:42)
    at io.fabric8.kubernetes.client.utils.internal.CreateOrReplaceHelper.createOrReplace(CreateOrReplaceHelper.java:50)
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.createOrReplace(BaseOperation.java:296)
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.createOrReplace(BaseOperation.java:88)
    at io.fabric8.kubernetes.client.extension.ResourceAdapter.createOrReplace(ResourceAdapter.java:121)
    at org.apache.flink.kubernetes.operator.utils.EventUtils.createOrUpdateEvent(EventUtils.java:107)
    at org.apache.flink.kubernetes.operator.utils.EventRecorder.triggerEvent(EventRecorder.java:59)
    at org.apache.flink.kubernetes.operator.utils.EventRecorder.triggerEvent(EventRecorder.java:50)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:90)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:55)
    at io.javaoperatorsdk.operator.processing.Controller$2.execute(Controller.java:182)
    at io.javaoperatorsdk.operator.processing.Controller$2.execute(Controller.java:145)
    at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
    at io.javaoperatorsdk.operator.processing.Controller.cleanup(Controller.java:144)
    ... 7 more
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://192.168.0.1/api/v1/namespaces/NAMESPACE/events. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. events "Operator.1727008025" is forbidden: unable to create new content in namespace NAMESPACE because it is being terminated.
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:709)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:689)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.assertResponseCode(OperationSupport.java:638)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$handleResponse$0(OperationSupport.java:576)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$retryWithExponentialBackoff$2(OperationSupport.java:618)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
    at io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl$4.onResponse(OkHttpClientImpl.java:277)
    at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
    ... 3 more
2023-04-19 16:22:58,750 i.j.o.p.e.EventProcessor [ERROR][NAMESPACE/DEPLOYMENT1] Exhausted retries for ExecutionScope{ resource id: ResourceID{name='DEPLOYMENT1', namespace='NAMESPACE'}, version: 10877039326}
2023-04-19 16:22:58,754 i.j.o.p.e.ReconciliationDispatcher [ERROR][NAMESPACE/DEPLOYMENT2] Error during event processing ExecutionScope{ resource id: ResourceID{name='DEPLOYMENT2', namespace='NAMESPACE'}, version: 10877039321} failed.
io.javaoperatorsdk.operator.OperatorException: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://192.168.0.1/api/v1/namespaces/NAMESPACE/events. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. events "Operator.1333719523" is forbidden: unable to create new content in namespace NAMESPACE because it is being terminated.
    at io.javaoperatorsdk.operator.processing.Controller.cleanup(Controller.java:195)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleCleanup(ReconciliationDispatcher.java:277)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:79)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
    at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://192.168.0.1/api/v1/namespaces/NAMESPACE/events. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. events "Operator.1333719523" is forbidden: unable to create new content in namespace NAMESPACE because it is being terminated.
    at io.fabric8.kubernetes.client.KubernetesClientException.copyAsCause(KubernetesClientException.java:238)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:517)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:551)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:535)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleCreate(OperationSupport.java:328)
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleCreate(BaseOperation.java:675)
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleCreate(BaseOperation.java:88)
    at io.fabric8.kubernetes.client.dsl.internal.CreateOnlyResourceOperation.create(CreateOnlyResourceOperation.java:42)
    at io.fabric8.kubernetes.client.utils.internal.CreateOrReplaceHelper.createOrReplace(CreateOrReplaceHelper.java:50)
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.createOrReplace(BaseOperation.java:296)
    at io.fabric8.kubernetes.client.dsl.internal.BaseOperation.createOrReplace(BaseOperation.java:88)
    at io.fabric8.kubernetes.client.extension.ResourceAdapter.createOrReplace(ResourceAdapter.java:121)
    at org.apache.flink.kubernetes.operator.utils.EventUtils.createOrUpdateEvent(EventUtils.java:107)
    at org.apache.flink.kubernetes.operator.utils.EventRecorder.triggerEvent(EventRecorder.java:59)
    at org.apache.flink.kubernetes.operator.utils.EventRecorder.triggerEvent(EventRecorder.java:50)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:90)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:55)
    at io.javaoperatorsdk.operator.processing.Controller$2.execute(Controller.java:182)
    at io.javaoperatorsdk.operator.processing.Controller$2.execute(Controller.java:145)
    at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
    at io.javaoperatorsdk.operator.processing.Controller.cleanup(Controller.java:144)
    ... 7 more
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://192.168.0.1/api/v1/namespaces/NAMESPACE/events. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. events "Operator.1333719523" is forbidden: unable to create new content in namespace NAMESPACE because it is being terminated.
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:709)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:689)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.assertResponseCode(OperationSupport.java:638)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$handleResponse$0(OperationSupport.java:576)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$retryWithExponentialBackoff$2(OperationSupport.java:618)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
    at io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl$4.onResponse(OkHttpClientImpl.java:277)
    at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
    ... 3 more
2023-04-19 16:22:58,754 i.j.o.p.e.EventProcessor [ERROR][NAMESPACE/DEPLOYMENT2] Exhausted retries for ExecutionScope{ resource id: ResourceID{name='DEPLOYMENT2', namespace='NAMESPACE'}, version: 10877039321}
{noformat}
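To make the suspected failure mode concrete, here is a minimal, self-contained sketch of one possible direction: treating event emission during cleanup as best-effort so that a rejected Event cannot abort finalizer removal. This is not the operator's actual code. CleanupSketch, EventSink, Teardown, and the cleanup signature are all hypothetical stand-ins, and the real fix may look quite different (for example, checking the namespace phase first).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these types are the operator's real classes. */
class CleanupSketch {

    /** Stand-in for whatever posts a Kubernetes Event; may throw when the namespace is terminating. */
    interface EventSink {
        void emit(String namespace, String message);
    }

    /** Stand-in for the real teardown work on the deployment's resources. */
    interface Teardown {
        void run(String namespace, String name);
    }

    /**
     * Runs teardown and returns true when the finalizer may be removed.
     * Event emission is best-effort: a failure is recorded as a warning
     * but never aborts the cleanup itself.
     */
    static boolean cleanup(String namespace, String name, EventSink events,
                           Teardown teardown, List<String> warnings) {
        try {
            events.emit(namespace, "Cleaning up FlinkDeployment " + name);
        } catch (RuntimeException e) {
            // For example: the API server refuses new content in a terminating namespace.
            warnings.add("Could not emit cleanup event: " + e.getMessage());
        }
        teardown.run(namespace, name);
        return true; // tells the caller the finalizer can be removed
    }

    public static void main(String[] args) {
        List<String> warnings = new ArrayList<>();
        List<String> tornDown = new ArrayList<>();
        // Simulates the API server rejecting the Event, as in the trace above.
        EventSink rejecting = (ns, msg) -> {
            throw new IllegalStateException("namespace " + ns + " is being terminated");
        };
        boolean done = cleanup("NAMESPACE", "DEPLOYMENT1", rejecting,
                (ns, n) -> tornDown.add(ns + "/" + n), warnings);
        System.out.println("finalizer removable: " + done);
        System.out.println("torn down: " + tornDown);
        System.out.println("warnings: " + warnings);
    }
}
```

In this sketch the rejected event ends up as a warning while teardown still runs, which is the behavior the report is asking for. Whether swallowing the exception or skipping event creation for terminating namespaces is preferable is a design question for the maintainers.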