[ 
https://issues.apache.org/jira/browse/FLINK-34451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17819410#comment-17819410
 ] 

Alex Hoffer edited comment on FLINK-34451 at 2/21/24 11:10 PM:
---------------------------------------------------------------

 
 # Here is my FlinkDeployment:

{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: kafka
  namespace: flink
spec:
  image: [redacted]
  flinkVersion: v1_18
  restartNonce: 25
  flinkConfiguration:
    taskmanager.rpc.port: "50100"
    taskmanager.numberOfTaskSlots: "1"
    blob.server.port: "6124"
    jobmanager.memory.process.size: "null"
    taskmanager.memory.process.size: "2gb"
    high-availability.type: kubernetes
    high-availability.storageDir: 
abfss://job-result-store@[redacted].dfs.core.windows.net/kafka

    state.checkpoints.dir: 
abfss://checkpoints@[redacted].dfs.core.windows.net/kafka
    execution.checkpointing.interval: "30000"
    execution.checkpointing.mode: EXACTLY_ONCE
    state.checkpoint-storage: filesystem

    state.savepoints.dir: 
abfss://savepoints@[redacted].dfs.core.windows.net/kafka

    state.backend.type: rocksdb
    state.backend.incremental: "true"
    state.backend.rocksdb.localdir: /rocksdb

    fs.azure.account.auth.type: OAuth
    fs.azure.account.oauth.provider.type: 
org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
    fs.azure.account.oauth2.client.endpoint: [redacted]
    fs.azure.account.oauth2.client.id: [redacted]

    # Fix bug with hadoop azure that buffers checkpoint blocks in disk rather 
than memory https://issues.apache.org/jira/browse/HADOOP-18707
    fs.azure.data.blocks.buffer: array

    restart-strategy.type: exponentialdelay
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 2m
    job.autoscaler.metrics.window: 1m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 1m
    job.autoscaler.catch-up.duration: 1m
    job.autoscaler.scale-up.grace-period: 10m
    jobmanager.scheduler: adaptive
    pipeline.max-parallelism: "12"
    job.autoscaler.vertex.max-parallelism: "5"

  serviceAccount: flink
  jobManager:
    replicas: 2
    resource:
      memory: "2gb"
      cpu: 1
    podTemplate:
      spec:
        affinity:
          podAntiAffinity:
            preferredDuringSchedulingIgnoredDuringExecution:
            - podAffinityTerm:
                labelSelector:
                  matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - kafka
                  - key: component
                    operator: In
                    values:
                      - jobmanager
                topologyKey: failure-domain.beta.kubernetes.io/zone
              weight: 10
        containers:
          - name: flink-main-container
            resources:
              limits:
                ephemeral-storage: 1Gi
              requests:
                ephemeral-storage: 1Gi
  taskManager:
    resource:
      memory: "2gb"
      cpu: 1
    podTemplate:
      spec:
        affinity:
          podAntiAffinity:
            preferredDuringSchedulingIgnoredDuringExecution:
            - podAffinityTerm:
                labelSelector:
                  matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - kafka
                  - key: component
                    operator: In
                    values:
                      - taskmanager
                topologyKey: failure-domain.beta.kubernetes.io/zone
              weight: 10
        containers:
          - name: flink-main-container
            resources:
              limits:
                ephemeral-storage: 2Gi
              requests:
                ephemeral-storage: 2Gi
            volumeMounts:
              - mountPath: /rocksdb
                name: rocksdb
        volumes:
          - name: rocksdb
            emptyDir:
              sizeLimit: 1Gi
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          ports:
            - containerPort: 9250
              name: metrics
              protocol: TCP
  job:
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/bin/python", "-py", 
"/opt/flink/usrlib/kafka-k6.py", "--kubernetes", "--fivemin_bytessent_stream", 
"--kafka_bootstrap_ip", "10.177.1.26"]
    upgradeMode: savepoint
    parallelism: 1{code}
      2. Yes, I can recreate this scenario each time I try.

      3. My ticket was mistaken, this was found on operator version 1.7.0 (I 
will update the ticket). I just recreated it on the latest Flink Operator image 
available (37ca517).

      4. Just confirmed it occurs on Flink 1.17.0

      5. Did not occur when adaptive scheduler was turned off!

In scenario 5 above, the job correctly flipped back to the last checkpoint. 
*This suggests it may be related to the adaptive scheduler setting.*


was (Author: alexdchoffer):
 
 # Here is my FlinkDeployment:


{code:java}
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: kafka
  namespace: flink
spec:
  image: [redacted]
  flinkVersion: v1_18
  restartNonce: 25
  flinkConfiguration:
    taskmanager.rpc.port: "50100"
    taskmanager.numberOfTaskSlots: "1"
    blob.server.port: "6124"
    jobmanager.memory.process.size: "null"
    taskmanager.memory.process.size: "2gb"
    high-availability.type: kubernetes
    high-availability.storageDir: 
abfss://job-result-store@[redacted].dfs.core.windows.net/kafka

    state.checkpoints.dir: 
abfss://checkpoints@[redacted].dfs.core.windows.net/kafka
    execution.checkpointing.interval: "30000"
    execution.checkpointing.mode: EXACTLY_ONCE
    state.checkpoint-storage: filesystem

    state.savepoints.dir: 
abfss://savepoints@[redacted].dfs.core.windows.net/kafka

    state.backend.type: rocksdb
    state.backend.incremental: "true"
    state.backend.rocksdb.localdir: /rocksdb

    fs.azure.account.auth.type: OAuth
    fs.azure.account.oauth.provider.type: 
org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
    fs.azure.account.oauth2.client.endpoint: [redacted]
    fs.azure.account.oauth2.client.id: [redacted]

    # Fix bug with hadoop azure that buffers checkpoint blocks in disk rather 
than memory https://issues.apache.org/jira/browse/HADOOP-18707
    fs.azure.data.blocks.buffer: array

    restart-strategy.type: exponentialdelay
    job.autoscaler.enabled: "true"
    job.autoscaler.stabilization.interval: 2m
    job.autoscaler.metrics.window: 1m
    job.autoscaler.target.utilization: "0.6"
    job.autoscaler.target.utilization.boundary: "0.2"
    job.autoscaler.restart.time: 1m
    job.autoscaler.catch-up.duration: 1m
    job.autoscaler.scale-up.grace-period: 10m
    jobmanager.scheduler: adaptive
    pipeline.max-parallelism: "12"
    job.autoscaler.vertex.max-parallelism: "5"

  serviceAccount: flink
  jobManager:
    replicas: 2
    resource:
      memory: "2gb"
      cpu: 1
    podTemplate:
      spec:
        affinity:
          podAntiAffinity:
            preferredDuringSchedulingIgnoredDuringExecution:
            - podAffinityTerm:
                labelSelector:
                  matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - kafka
                  - key: component
                    operator: In
                    values:
                      - jobmanager
                topologyKey: failure-domain.beta.kubernetes.io/zone
              weight: 10
        containers:
          - name: flink-main-container
            resources:
              limits:
                ephemeral-storage: 1Gi
              requests:
                ephemeral-storage: 1Gi
  taskManager:
    resource:
      memory: "2gb"
      cpu: 1
    podTemplate:
      spec:
        affinity:
          podAntiAffinity:
            preferredDuringSchedulingIgnoredDuringExecution:
            - podAffinityTerm:
                labelSelector:
                  matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - kafka
                  - key: component
                    operator: In
                    values:
                      - taskmanager
                topologyKey: failure-domain.beta.kubernetes.io/zone
              weight: 10
        containers:
          - name: flink-main-container
            resources:
              limits:
                ephemeral-storage: 2Gi
              requests:
                ephemeral-storage: 2Gi
            volumeMounts:
              - mountPath: /rocksdb
                name: rocksdb
        volumes:
          - name: rocksdb
            emptyDir:
              sizeLimit: 1Gi
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          ports:
            - containerPort: 9250
              name: metrics
              protocol: TCP
  job:
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/bin/python", "-py", 
"/opt/flink/usrlib/kafka-k6.py", "--kubernetes", "--fivemin_bytessent_stream", 
"--kafka_bootstrap_ip", "10.177.1.26"]
    upgradeMode: savepoint
    parallelism: 1{code}

 # Yes, I can recreate this scenario each time I try.
 # My ticket was mistaken, this was found on operator version 1.7.0 (I will 
update the ticket). I just recreated it on the latest Flink Operator image 
available (37ca517).
 # Just confirmed it occurs on Flink 1.17.0
 # Did not occur when adaptive scheduler was turned off!


In scenario 5 above, the job correctly flipped back to the last checkpoint. 
*This suggests it may be related to the adaptive scheduler setting.*

> [Kubernetes Operator] Job with restarting TaskManagers uses wrong/misleading 
> fallback approach
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-34451
>                 URL: https://issues.apache.org/jira/browse/FLINK-34451
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.6.1
>         Environment: Operator version: 1.6.1
> Flink version 1.18.0
> HA JobManagers
> Adaptive scheduler mode using the operator's autoscaler
> Checkpointing at an interval of 60s
> Upgrade mode savepoint
>            Reporter: Alex Hoffer
>            Priority: Major
>
>  
> We had a situation where TaskManagers were constantly restarting from OOM. 
> We're using the Adaptive scheduler with the Kubernetes Operator, and a 
> restart strategy of exponential backoff, and so the JobManagers remained 
> alive. We're also using savepoint upgrade mode. 
> When we tried to remedy the situation by raising the direct memory allocation 
> to the pods, we were surprised that Flink used the last savepoint taken, 
> rather than the checkpoint. This was unfortunate for us because we are on 
> adaptive scheduler and the job hasn't changed in some time, so this last 
> savepoint was 6 days old! Meanwhile, checkpoints were taken every minute up 
> until failure. I can confirm the HA metadata existed in the configmaps, and 
> the corresponding checkpoints existed in remote storage for it to access. 
> Plus, no Flink version changes were in the deployment.
> The Operator logs reported that it was using last-state recovery in this 
> situation:
> {code:java}
> 2024-02-15 19:38:38,252 o.a.f.k.o.l.AuditUtils         [INFO ][job-name] >>> 
> Event  | Info    | SPECCHANGED     | UPGRADE change(s) detected (Diff: 
> FlinkDeploymentSpec[image : image:0a7c41b -> image:ebebc53, restartNonce : 
> null -> 100]), starting reconciliation.
> 2024-02-15 19:38:38,252 o.a.f.k.o.r.d.AbstractJobReconciler [INFO ][job-name] 
> Upgrading/Restarting running job, suspending first...
> 2024-02-15 19:38:38,260 o.a.f.k.o.r.d.ApplicationReconciler [INFO ][job-name] 
> Job is not running but HA metadata is available for last state restore, ready 
> for upgrade
> 2024-02-15 19:38:38,270 o.a.f.k.o.l.AuditUtils         [INFO ][job-name] >>> 
> Event  | Info    | SUSPENDED       | Suspending existing deployment.
> 2024-02-15 19:38:38,270 o.a.f.k.o.s.NativeFlinkService [INFO ][job-name] 
> Deleting JobManager deployment while preserving HA metadata. 
> 2024-02-15 19:38:40,431 o.a.f.k.o.l.AuditUtils         [INFO ][job-name] >>> 
> Status | Info    | UPGRADING       | The resource is being upgraded 
> 2024-02-15 19:38:40,532 o.a.f.k.o.l.AuditUtils         [INFO ][job-name] >>> 
> Event  | Info    | SUBMIT          | Starting deployment
> 2024-02-15 19:38:40,532 o.a.f.k.o.s.AbstractFlinkService [INFO ][job-name] 
> Deploying application cluster requiring last-state from HA metadata
> 2024-02-15 19:38:40,538 o.a.f.k.o.u.FlinkUtils         [INFO ][job-name] Job 
> graph in ConfigMap job-name-cluster-config-map is deleted {code}
> But when the job booted up, it reported restoring from savepoint:
> {code:java}
> 2024-02-15 19:39:03,887 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring 
> job 522b3c363499d81ed7922aa30b13e237 from Savepoint 20207 @ 0 for 
> 522b3c363499d81ed7922aa30b13e237 located at 
> abfss://savepoi...@storageaccount.dfs.core.windows.net/job-name/savepoint-522b3c-8836a1edc709.
>  {code}
> Our expectation was that the Operator logs were true, and that it would be 
> restoring from checkpoint. We had to scramble and manually restore from the 
> checkpoint to restore function.
>  
>  
> It's also worth noting I can recreate this issue in a testing environment. 
> The process for doing so is:
> - Boot up HA JobManagers with checkpoints on and savepoint upgrade mode, 
> using adaptive scheduler
> - Make a dummy change to trigger a savepoint.
> - Allow the TaskManagers to process some data and hit the checkpoint interval.
> - Cause the TaskManagers to crash. In our case, we could use up a bunch of 
> memory in the pods and cause it to crash.
> - Observe the Operator logs saying it is restoring from last-state, but watch 
> as the pods instead use the last savepoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to