[ 
https://issues.apache.org/jira/browse/FLINK-32718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Aggarwal updated FLINK-32718:
----------------------------------
    Description: 
I am having trouble understanding why adding a 'UNION ALL' with a 'GROUP BY' 
condition in Flink is causing the checkpoint to become unbounded. I've attached 
two Flink SQL queries where the only difference is the addition of one more 
'UNION ALL'. Surprisingly, as soon as we add the 'UNION ALL' part to the query, 
the checkpoint size keeps increasing, which is not expected. We anticipated it 
would follow the same pattern of increasing and decreasing based on the 
throughput, as it does without the 'UNION ALL'.
When I analyze the breakdown of the checkpoint size, I find that the 'Interval 
Join' operator has the largest size. I suspect there might be a bug causing 
this difference in checkpoint behavior due to the addition of the 'GROUP BY' in 
the query.
We are using Flink version 1.16.1 with RocksDB as the backend, and incremental 
checkpointing is enabled.
PS: Interestingly, when I take periodic savepoints for both jobs, their 
patterns are similar, and the savepoint size is smaller than the checkpoint in 
the job with the 'UNION ALL'. Attaching the queries in the comment below.

With Union ALL:select
  *
from
  (
    with p2d as (
      select
        delivered.job_id,
        delivered.store_id,
        substring(
          GetGeoHash(
            CAST(delivered.metadata_json.address.lat as double),
            CAST(delivered.metadata_json.address.lng as double)
          )
          from
            1 for 6
        ) AS cx_gh6,
        (
          (
            cast(
              delivered.metadata_json.lastMileDistance as double
            ) / cast(1000 as double)
          ) /(
            cast(
              delivered.updated_at - pickedup.updated_at as double
            ) / cast(60 * 60 * 1000 as double)
          )
        ) as lm_speed_kmph,
        delivered.proctime as proctime
      from
        awz_s3_OrderLogsEvent pickedup
        inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = 
pickedup.job_id
        and delivered.status = 'DELIVERY_DELIVERED'
        and delivered.type = 'INSTA'
        and pickedup.proctime between delivered.proctime - interval '95' minute
        and delivered.proctime + interval '5' minute
      where
        pickedup.status = 'DELIVERY_PICKEDUP'
        and pickedup.type = 'INSTA'
    )
    select
      'lmSpeedKmph_avg_storeId_30m#' || cast(store_id as varchar) as key,
      round(Avg(lm_speed_kmph), 4) AS `value`
    from
      p2d
    group by
      HOP(
        proctime,
        interval '5' minute,
        interval '30' minute
      ),
      store_id
    union all
    select
      'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
      round(Avg(lm_speed_kmph), 4) AS `value`
    from
      p2d
    group by
      HOP(
        proctime,
        interval '5' minute,
        interval '30' minute
      ),
      cx_gh6
  )



Without Union all which is showing expected behaviour:
select
  *
from
  (
    with p2d as (
      select
        delivered.job_id,
        delivered.store_id,
        substring(
          GetGeoHash(
            CAST(delivered.metadata_json.address.lat as double),
            CAST(delivered.metadata_json.address.lng as double)
          )
          from
            1 for 6
        ) AS cx_gh6,
        (
          (
            cast(
              delivered.metadata_json.lastMileDistance as double
            ) / cast(1000 as double)
          ) /(
            cast(
              delivered.updated_at - pickedup.updated_at as double
            ) / cast(60 * 60 * 1000 as double)
          )
        ) as lm_speed_kmph,
        delivered.proctime as proctime
      from
        awz_s3_OrderLogsEvent pickedup
        inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = 
pickedup.job_id
        and delivered.status = 'DELIVERY_DELIVERED'
        and delivered.type = 'INSTA'
        and pickedup.proctime between delivered.proctime - interval '95' minute
        and delivered.proctime + interval '5' minute
      where
        pickedup.status = 'DELIVERY_PICKEDUP'
        and pickedup.type = 'INSTA'
    )
    select
      'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
      round(Avg(lm_speed_kmph), 4) AS `value`
    from
      p2d
    group by
      HOP(
        proctime,
        interval '5' minute,
        interval '30' minute
      ),
      cx_gh6
  )

 

 

 

 

!Screenshot 2023-07-31 at 10.37.11 AM.png!

  was:
This appears to be a pretty straightforward issue, but I don't know the 
codebase well enough to propose a fix.  When a FlinkDeployment is present in a 
namespace, and the namespace is deleted, the FlinkDeployment never reconciles 
and fails to complete its finalizer.  This leads to the namespace being blocked 
from deletion indefinitely, requiring manual manipulation to remove the 
finalizer on the FlinkDeployment.

 

Namespace conditions:
{code:java}
conditions:
- lastTransitionTime: '2023-04-18T22:17:48Z'
  message: All resources successfully discovered
  reason: ResourcesDiscovered
  status: 'False'
  type: NamespaceDeletionDiscoveryFailure
- lastTransitionTime: '2023-03-23T18:27:37Z'
  message: All legacy kube types successfully parsed
  reason: ParsedGroupVersions
  status: 'False'
  type: NamespaceDeletionGroupVersionParsingFailure
- lastTransitionTime: '2023-03-23T18:27:37Z'
  message: All content successfully deleted, may be waiting on finalization
  reason: ContentDeleted
  status: 'False'
  type: NamespaceDeletionContentFailure
- lastTransitionTime: '2023-03-23T18:27:37Z'
  message: 'Some resources are remaining: flinkdeployments.flink.apache.org has 
2
    resource instances'
  reason: SomeResourcesRemain
  status: 'True'
  type: NamespaceContentRemaining
- lastTransitionTime: '2023-03-23T18:27:37Z'
  message: 'Some content in the namespace has finalizers remaining: 
flinkdeployments.flink.apache.org/finalizer
    in 2 resource instances'
  reason: SomeFinalizersRemain
  status: 'True'
  type: NamespaceFinalizersRemaining
phase: Terminating {code}
FlinkDeployment example (some fields redacted):
{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  creationTimestamp: '2023-03-23T18:27:02Z'
  deletionGracePeriodSeconds: 0
  deletionTimestamp: '2023-03-23T18:27:35Z'
  finalizers:
  - flinkdeployments.flink.apache.org/finalizer
  generation: 3
  name: <name-redacted>
  namespace: <namespace-redacted>
  resourceVersion: '10565277081'
  uid: e50d2683-6c0c-467e-b10c-fe0f4e404692
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: '2'
  flinkVersion: v1_16
  image: <image-redacted>
  job:
    args: []
    entryClass: <class-redacted>
    jarURI: <uri-redacted>
    parallelism: 2
    state: running
    upgradeMode: stateless
  jobManager:
    replicas: 1
    resource:
      cpu: 1
      memory: 2048m
  logConfiguration:
    log4j-console.properties: '# This affects logging for both user code and 
Flink      rootLogger.level = INFO      rootLogger.appenderRef.console.ref = 
ConsoleAppender      rootLogger.appenderRef.rolling.ref = RollingFileAppender   
   # Uncomment this if you want to _only_ change Flink''s logging      
#logger.flink.name = org.apache.flink      #logger.flink.level = INFO      # 
The following lines keep the log level of common libraries/connectors on      # 
log level INFO. The root logger does not override this. You have to manually    
  # change the log levels here.      logger.akka.name = akka      
logger.akka.level = INFO      logger.kafka.name= org.apache.kafka      
logger.kafka.level = INFO      logger.hadoop.name = org.apache.hadoop      
logger.hadoop.level = INFO      logger.zookeeper.name = org.apache.zookeeper    
  logger.zookeeper.level = INFO      # Log all infos to the console      
appender.console.name = ConsoleAppender      appender.console.type = CONSOLE    
  appender.console.layout.type = PatternLayout      
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
      - %m%n      # Log all infos in the given rolling file      
appender.rolling.name = RollingFileAppender      appender.rolling.type = 
RollingFile      appender.rolling.append = false      appender.rolling.fileName 
= ${sys:log.file}      appender.rolling.filePattern = ${sys:log.file}.%i      
appender.rolling.layout.type = PatternLayout      
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x
      - %m%n      appender.rolling.policies.type = Policies      
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy      
appender.rolling.policies.size.size=100MB      appender.rolling.strategy.type = 
DefaultRolloverStrategy      appender.rolling.strategy.max = 10      # Suppress 
the irrelevant (wrong) warnings from the Netty channel handler      
logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline      
logger.netty.level = OFF      '
  mode: standalone
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      labels:
        app.kubernetes.io/managed-by: tilt
    spec:
      containers:
      - env:
        - name: APPLICATION_CONFIG_FILE
          value: /app-conf/feature-pipeline-config.yaml
        name: flink-main-container
        volumeMounts:
        - mountPath: /app-conf
          name: feature-pipeline-config-volume
      volumes:
      - configMap:
          items:
          - key: feature-pipeline-config.yaml
            path: feature-pipeline-config.yaml
          name: <name-redacted>
        name: feature-pipeline-config-volume
  serviceAccount: default
  taskManager:
    replicas: 2
    resource:
      cpu: 1
      memory: 2048m
status:
  clusterInfo: {}
  jobManagerDeploymentStatus: DEPLOYED_NOT_READY
  jobStatus:
    jobId: 07243f53058c9083c06d2000352b04ec
    savepointInfo:
      lastPeriodicSavepointTimestamp: 0
      savepointHistory: []
    state: RECONCILING
  reconciliationStatus:
    lastReconciledSpec: 
'{"spec":{"job":{"jarURI":"<jar-redacted>","parallelism":2,"entryClass":"<class-redacted>","args":[],"state":"running","savepointTriggerNonce":null,"initialSavepointPath":null,"upgradeMode":"stateless","allowNonRestoredState":null},"restartNonce":null,"flinkConfiguration":{"taskmanager.numberOfTaskSlots":"2"},"image":"<image-redacted>","imagePullPolicy":null,"serviceAccount":"default","flinkVersion":"v1_16","ingress":{"template":"<template-redacted>","className":null,,"podTemplate":{"apiVersion":"v1","kind":"Pod","metadata":{"labels":{"app.kubernetes.io/managed-by":"tilt","spec":{"containers":[{"env":[{"name":"APPLICATION_CONFIG_FILE","value":"/app-conf/feature-pipeline-config.yaml"}],"name":"flink-main-container","volumeMounts":[{"mountPath":"/app-conf","name":"feature-pipeline-config-volume"}]}],"volumes":[{"configMap":{"items":[{"key":"feature-pipeline-config.yaml","path":"feature-pipeline-config.yaml"}],"name":"<name-redacted>"},"name":"feature-pipeline-config-volume"}]}},"jobManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":1,"podTemplate":null},"taskManager":{"resource":{"cpu":1.0,"memory":"2048m"},"replicas":2,"podTemplate":null},"logConfiguration":{"log4j-console.properties":"#
      This affects logging for both user code and Flink\nrootLogger.level = 
INFO\nrootLogger.appenderRef.console.ref
      = ConsoleAppender\nrootLogger.appenderRef.rolling.ref = 
RollingFileAppender\n#
      Uncomment this if you want to _only_ change Flink''s 
logging\n#logger.flink.name
      = org.apache.flink\n#logger.flink.level = INFO\n# The following lines 
keep the
      log level of common libraries/connectors on\n# log level INFO. The root 
logger
      does not override this. You have to manually\n# change the log levels 
here.\nlogger.akka.name
      = akka\nlogger.akka.level = INFO\nlogger.kafka.name= 
org.apache.kafka\nlogger.kafka.level
      = INFO\nlogger.hadoop.name = org.apache.hadoop\nlogger.hadoop.level = 
INFO\nlogger.zookeeper.name
      = org.apache.zookeeper\nlogger.zookeeper.level = INFO\n# Log all infos to 
the
      console\nappender.console.name = ConsoleAppender\nappender.console.type = 
CONSOLE\nappender.console.layout.type
      = PatternLayout\nappender.console.layout.pattern = %d{yyyy-MM-dd 
HH:mm:ss,SSS}
      %-5p %-60c %x - %m%n\n# Log all infos in the given rolling 
file\nappender.rolling.name
      = RollingFileAppender\nappender.rolling.type = 
RollingFile\nappender.rolling.append
      = false\nappender.rolling.fileName = 
${sys:log.file}\nappender.rolling.filePattern
      = ${sys:log.file}.%i\nappender.rolling.layout.type = 
PatternLayout\nappender.rolling.layout.pattern
      = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - 
%m%n\nappender.rolling.policies.type
      = Policies\nappender.rolling.policies.size.type = 
SizeBasedTriggeringPolicy\nappender.rolling.policies.size.size=100MB\nappender.rolling.strategy.type
      = DefaultRolloverStrategy\nappender.rolling.strategy.max = 10\n# Suppress 
the
      irrelevant (wrong) warnings from the Netty channel 
handler\nlogger.netty.name
      = org.jboss.netty.channel.DefaultChannelPipeline\nlogger.netty.level = 
OFF\n"},"mode":"standalone"},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","metadata":{"generation":2},"firstDeployment":true}}'
    reconciliationTimestamp: 1679596022457
    state: DEPLOYED
  taskManager:
    labelSelector: component=taskmanager,app=<label-redacted>
    replicas: 2
 {code}
My guess here would be that the best thing to do would be for the reconciler to 
check if the namespace is in a Terminating state, and skip the finalizer step 
that's blocking things.  Based on the following log messages, I'm inclined to 
believe the issue is that the operator is attempting to create an Event in the 
namespace as a part of deleting the FlinkDeployment, and because that fails the 
whole deletion aborts.  Log messages redacted.

 
{noformat}
2023-04-19 16:22:58,642 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][NAMESPACE/DEPLOYMENT1] Cleaning up FlinkDeployment
2023-04-19 16:22:58,645 o.a.f.k.o.c.FlinkDeploymentController [INFO 
][NAMESPACE/DEPLOYMENT2] Cleaning up FlinkDeployment
2023-04-19 16:22:58,749 i.j.o.p.e.ReconciliationDispatcher 
[ERROR][NAMESPACE/DEPLOYMENT1] Error during event processing ExecutionScope{ 
resource id: ResourceID{name='DEPLOYMENT1', namespace='NAMESPACE'}, version: 
10877039326} failed.
io.javaoperatorsdk.operator.OperatorException: 
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST 
at: https://192.168.0.1/api/v1/namespaces/NAMESPACE/events. Message: 
Forbidden!Configured service account doesn't have access. Service account may 
have been revoked. events "Operator.1727008025" is forbidden: unable to create 
new content in namespace NAMESPACE because it is being terminated.
    at 
io.javaoperatorsdk.operator.processing.Controller.cleanup(Controller.java:195)
    at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleCleanup(ReconciliationDispatcher.java:277)
    at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:79)
    at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
    at 
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure 
executing: POST at: https://192.168.0.1/api/v1/namespaces/NAMESPACE/events. 
Message: Forbidden!Configured service account doesn't have access. Service 
account may have been revoked. events "Operator.1727008025" is forbidden: 
unable to create new content in namespace NAMESPACE because it is being 
terminated.
    at 
io.fabric8.kubernetes.client.KubernetesClientException.copyAsCause(KubernetesClientException.java:238)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:517)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:551)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:535)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleCreate(OperationSupport.java:328)
    at 
io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleCreate(BaseOperation.java:675)
    at 
io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleCreate(BaseOperation.java:88)
    at 
io.fabric8.kubernetes.client.dsl.internal.CreateOnlyResourceOperation.create(CreateOnlyResourceOperation.java:42)
    at 
io.fabric8.kubernetes.client.utils.internal.CreateOrReplaceHelper.createOrReplace(CreateOrReplaceHelper.java:50)
    at 
io.fabric8.kubernetes.client.dsl.internal.BaseOperation.createOrReplace(BaseOperation.java:296)
    at 
io.fabric8.kubernetes.client.dsl.internal.BaseOperation.createOrReplace(BaseOperation.java:88)
    at 
io.fabric8.kubernetes.client.extension.ResourceAdapter.createOrReplace(ResourceAdapter.java:121)
    at 
org.apache.flink.kubernetes.operator.utils.EventUtils.createOrUpdateEvent(EventUtils.java:107)
    at 
org.apache.flink.kubernetes.operator.utils.EventRecorder.triggerEvent(EventRecorder.java:59)
    at 
org.apache.flink.kubernetes.operator.utils.EventRecorder.triggerEvent(EventRecorder.java:50)
    at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:90)
    at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:55)
    at 
io.javaoperatorsdk.operator.processing.Controller$2.execute(Controller.java:182)
    at 
io.javaoperatorsdk.operator.processing.Controller$2.execute(Controller.java:145)
    at 
org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
    at 
io.javaoperatorsdk.operator.processing.Controller.cleanup(Controller.java:144)
    ... 7 more
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure 
executing: POST at: https://192.168.0.1/api/v1/namespaces/NAMESPACE/events. 
Message: Forbidden!Configured service account doesn't have access. Service 
account may have been revoked. events "Operator.1727008025" is forbidden: 
unable to create new content in namespace NAMESPACE because it is being 
terminated.
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:709)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:689)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.assertResponseCode(OperationSupport.java:638)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$handleResponse$0(OperationSupport.java:576)
    at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown 
Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown 
Source)
    at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$retryWithExponentialBackoff$2(OperationSupport.java:618)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown 
Source)
    at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
 Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown 
Source)
    at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
    at 
io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl$4.onResponse(OkHttpClientImpl.java:277)
    at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
    ... 3 more
2023-04-19 16:22:58,750 i.j.o.p.e.EventProcessor [ERROR][NAMESPACE/DEPLOYMENT1] 
Exhausted retries for ExecutionScope{ resource id: 
ResourceID{name='DEPLOYMENT1', namespace='NAMESPACE'}, version: 10877039326}
2023-04-19 16:22:58,754 i.j.o.p.e.ReconciliationDispatcher 
[ERROR][NAMESPACE/DEPLOYMENT2] Error during event processing ExecutionScope{ 
resource id: ResourceID{name='DEPLOYMENT2', namespace='NAMESPACE'}, version: 
10877039321} failed.
io.javaoperatorsdk.operator.OperatorException: 
io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST 
at: https://192.168.0.1/api/v1/namespaces/NAMESPACE/events. Message: 
Forbidden!Configured service account doesn't have access. Service account may 
have been revoked. events "Operator.1333719523" is forbidden: unable to create 
new content in namespace NAMESPACE because it is being terminated.
    at 
io.javaoperatorsdk.operator.processing.Controller.cleanup(Controller.java:195)
    at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleCleanup(ReconciliationDispatcher.java:277)
    at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:79)
    at 
io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:54)
    at 
io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure 
executing: POST at: https://192.168.0.1/api/v1/namespaces/NAMESPACE/events. 
Message: Forbidden!Configured service account doesn't have access. Service 
account may have been revoked. events "Operator.1333719523" is forbidden: 
unable to create new content in namespace NAMESPACE because it is being 
terminated.
    at 
io.fabric8.kubernetes.client.KubernetesClientException.copyAsCause(KubernetesClientException.java:238)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:517)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:551)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:535)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleCreate(OperationSupport.java:328)
    at 
io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleCreate(BaseOperation.java:675)
    at 
io.fabric8.kubernetes.client.dsl.internal.BaseOperation.handleCreate(BaseOperation.java:88)
    at 
io.fabric8.kubernetes.client.dsl.internal.CreateOnlyResourceOperation.create(CreateOnlyResourceOperation.java:42)
    at 
io.fabric8.kubernetes.client.utils.internal.CreateOrReplaceHelper.createOrReplace(CreateOrReplaceHelper.java:50)
    at 
io.fabric8.kubernetes.client.dsl.internal.BaseOperation.createOrReplace(BaseOperation.java:296)
    at 
io.fabric8.kubernetes.client.dsl.internal.BaseOperation.createOrReplace(BaseOperation.java:88)
    at 
io.fabric8.kubernetes.client.extension.ResourceAdapter.createOrReplace(ResourceAdapter.java:121)
    at 
org.apache.flink.kubernetes.operator.utils.EventUtils.createOrUpdateEvent(EventUtils.java:107)
    at 
org.apache.flink.kubernetes.operator.utils.EventRecorder.triggerEvent(EventRecorder.java:59)
    at 
org.apache.flink.kubernetes.operator.utils.EventRecorder.triggerEvent(EventRecorder.java:50)
    at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:90)
    at 
org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:55)
    at 
io.javaoperatorsdk.operator.processing.Controller$2.execute(Controller.java:182)
    at 
io.javaoperatorsdk.operator.processing.Controller$2.execute(Controller.java:145)
    at 
org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
    at 
io.javaoperatorsdk.operator.processing.Controller.cleanup(Controller.java:144)
    ... 7 more
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure 
executing: POST at: https://192.168.0.1/api/v1/namespaces/NAMESPACE/events. 
Message: Forbidden!Configured service account doesn't have access. Service 
account may have been revoked. events "Operator.1333719523" is forbidden: 
unable to create new content in namespace NAMESPACE because it is being 
terminated.
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:709)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:689)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.assertResponseCode(OperationSupport.java:638)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$handleResponse$0(OperationSupport.java:576)
    at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown 
Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown 
Source)
    at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
    at 
io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$retryWithExponentialBackoff$2(OperationSupport.java:618)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown 
Source)
    at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
 Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown 
Source)
    at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
    at 
io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl$4.onResponse(OkHttpClientImpl.java:277)
    at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
    ... 3 more
2023-04-19 16:22:58,754 i.j.o.p.e.EventProcessor [ERROR][NAMESPACE/DEPLOYMENT2] 
Exhausted retries for ExecutionScope{ resource id: 
ResourceID{name='DEPLOYMENT2', namespace='NAMESPACE'}, version: 
10877039321}{noformat}


>  'UNION ALL' with a 'GROUP BY' condition in Flink is causing the checkpoint 
> to become unbounded
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-32718
>                 URL: https://issues.apache.org/jira/browse/FLINK-32718
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.3.1
>         Environment: Apache Flink Kubernetes Operator 1.3.1
> Kubernetes 1.24.9
>            Reporter: Neha Aggarwal
>            Assignee: Jayme Howard
>            Priority: Blocker
>              Labels: pull-request-available
>         Attachments: Screenshot 2023-07-31 at 10.37.11 AM.png, Screenshot 
> 2023-07-31 at 10.40.30 AM.png, Screenshot 2023-07-31 at 10.40.38 AM.png, 
> Screenshot 2023-07-31 at 10.41.07 AM.png, Screenshot 2023-07-31 at 10.41.26 
> AM.png, Screenshot 2023-07-31 at 10.41.35 AM.png, Screenshot 2023-07-31 at 
> 10.41.48 AM.png
>
>
> I am having trouble understanding why adding a 'UNION ALL' with a 'GROUP BY' 
> condition in Flink is causing the checkpoint to become unbounded. I've 
> attached two Flink SQL queries where the only difference is the addition of 
> one more 'UNION ALL'. Surprisingly, as soon as we add the 'UNION ALL' part to 
> the query, the checkpoint size keeps increasing, which is not expected. We 
> anticipated it would follow the same pattern of increasing and decreasing 
> based on the throughput, as it does without the 'UNION ALL'.
> When I analyze the breakdown of the checkpoint size, I find that the 
> 'Interval Join' operator has the largest size. I suspect there might be a bug 
> causing this difference in checkpoint behavior due to the addition of the 
> 'GROUP BY' in the query.
> We are using Flink version 1.16.1 with RocksDB as the backend, and 
> incremental checkpointing is enabled.
> PS: Interestingly, when I take periodic savepoints for both jobs, their 
> patterns are similar, and the savepoint size is smaller than the checkpoint 
> in the job with the 'UNION ALL'. Attaching the queries in the comment below.
> With Union ALL:select
>   *
> from
>   (
>     with p2d as (
>       select
>         delivered.job_id,
>         delivered.store_id,
>         substring(
>           GetGeoHash(
>             CAST(delivered.metadata_json.address.lat as double),
>             CAST(delivered.metadata_json.address.lng as double)
>           )
>           from
>             1 for 6
>         ) AS cx_gh6,
>         (
>           (
>             cast(
>               delivered.metadata_json.lastMileDistance as double
>             ) / cast(1000 as double)
>           ) /(
>             cast(
>               delivered.updated_at - pickedup.updated_at as double
>             ) / cast(60 * 60 * 1000 as double)
>           )
>         ) as lm_speed_kmph,
>         delivered.proctime as proctime
>       from
>         awz_s3_OrderLogsEvent pickedup
>         inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = 
> pickedup.job_id
>         and delivered.status = 'DELIVERY_DELIVERED'
>         and delivered.type = 'INSTA'
>         and pickedup.proctime between delivered.proctime - interval '95' 
> minute
>         and delivered.proctime + interval '5' minute
>       where
>         pickedup.status = 'DELIVERY_PICKEDUP'
>         and pickedup.type = 'INSTA'
>     )
>     select
>       'lmSpeedKmph_avg_storeId_30m#' || cast(store_id as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(
>         proctime,
>         interval '5' minute,
>         interval '30' minute
>       ),
>       store_id
>     union all
>     select
>       'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(
>         proctime,
>         interval '5' minute,
>         interval '30' minute
>       ),
>       cx_gh6
>   )
> Without Union all which is showing expected behaviour:
> select
>   *
> from
>   (
>     with p2d as (
>       select
>         delivered.job_id,
>         delivered.store_id,
>         substring(
>           GetGeoHash(
>             CAST(delivered.metadata_json.address.lat as double),
>             CAST(delivered.metadata_json.address.lng as double)
>           )
>           from
>             1 for 6
>         ) AS cx_gh6,
>         (
>           (
>             cast(
>               delivered.metadata_json.lastMileDistance as double
>             ) / cast(1000 as double)
>           ) /(
>             cast(
>               delivered.updated_at - pickedup.updated_at as double
>             ) / cast(60 * 60 * 1000 as double)
>           )
>         ) as lm_speed_kmph,
>         delivered.proctime as proctime
>       from
>         awz_s3_OrderLogsEvent pickedup
>         inner join awz_s3_OrderLogsEvent delivered on delivered.job_id = 
> pickedup.job_id
>         and delivered.status = 'DELIVERY_DELIVERED'
>         and delivered.type = 'INSTA'
>         and pickedup.proctime between delivered.proctime - interval '95' 
> minute
>         and delivered.proctime + interval '5' minute
>       where
>         pickedup.status = 'DELIVERY_PICKEDUP'
>         and pickedup.type = 'INSTA'
>     )
>     select
>       'lmSpeedKmph_avg_cxGh6_30m#' || cast(cx_gh6 as varchar) as key,
>       round(Avg(lm_speed_kmph), 4) AS `value`
>     from
>       p2d
>     group by
>       HOP(
>         proctime,
>         interval '5' minute,
>         interval '30' minute
>       ),
>       cx_gh6
>   )
>  
>  
>  
>  
> !Screenshot 2023-07-31 at 10.37.11 AM.png!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to