[ https://issues.apache.org/jira/browse/FLINK-31135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17718815#comment-17718815 ]

SwathiChandrashekar commented on FLINK-31135:
---------------------------------------------

[~zhihaochen], thanks for attaching the JM logs from the initial submission of the job. The earlier JM logs did not help much with root-causing: they begin with the recovery of a job that already had multiple checkpoints, and the checkpoints triggered after that recovery failed due to the >1 MB ConfigMap issue (and any that succeeded would have required manual cleanup).
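
For anyone who wants to check how close a job's HA ConfigMap is to the 1048576-byte limit before it starts rejecting updates, a quick probe can be run outside Flink. This is only a sketch, assuming the fabric8 kubernetes-client 6.x on the classpath; the namespace and ConfigMap name below are placeholders:
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Map;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class ConfigMapSizeProbe {
    // Kubernetes rejects objects whose payload exceeds 1 MiB (1048576 bytes).
    private static final long LIMIT_BYTES = 1_048_576L;

    public static void main(String[] args) {
        String namespace = "flink";           // placeholder
        String name = "<job>-config-map";     // placeholder

        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            ConfigMap cm = client.configMaps()
                    .inNamespace(namespace).withName(name).get();
            if (cm == null || cm.getData() == null) {
                System.out.println("ConfigMap not found or has no data.");
                return;
            }
            // Rough payload size: sum of data keys and values. The real limit
            // applies to the whole serialized object, so treat this as a
            // lower bound.
            long bytes = 0;
            for (Map.Entry<String, String> e : cm.getData().entrySet()) {
                bytes += e.getKey().getBytes(StandardCharsets.UTF_8).length
                        + e.getValue().getBytes(StandardCharsets.UTF_8).length;
            }
            System.out.printf("%s/%s: ~%d of %d bytes used (%.0f%%)%n",
                    namespace, name, bytes, LIMIT_BYTES,
                    100.0 * bytes / LIMIT_BYTES);
        }
    }
}
{code}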

The new JM attachment made it possible to pin down the issue, since it starts at job submission.

After the initial 5 checkpoints, when cleanup of the oldest checkpoint kicked in, Flink threw the following error:
{code:java}
{"@timestamp":"2023-05-02T07:21:34.047Z","ecs.version":"1.2.0","log.level":"WARN","message":"Fail to subsume the old checkpoint.","process.thread.name":"jobmanager-io-thread-1","log.logger":"org.apache.flink.runtime.checkpoint.CheckpointSubsumeHelper","error.type":"java.util.concurrent.ExecutionException","error.message":"java.io.IOException: /parked-logs-ingestion-644b80/ha/parked-logs-ingestion-644b80/completedCheckpointf2bebc94bd32 could not be deleted for unknown reasons."}

error.stack_trace:
java.util.concurrent.ExecutionException: java.io.IOException: /parked-logs-ingestion-644b80/ha/parked-logs-ingestion-644b80/completedCheckpointf2bebc94bd32 could not be deleted for unknown reasons.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
    at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.releaseAndTryRemove(KubernetesStateHandleStore.java:526)
    at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.tryRemove(DefaultCompletedCheckpointStore.java:242)
    at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.tryRemoveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:227)
    at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.lambda$addCheckpointAndSubsumeOldestOne$0(DefaultCompletedCheckpointStore.java:145)
    at org.apache.flink.runtime.checkpoint.CheckpointSubsumeHelper.subsume(CheckpointSubsumeHelper.java:70)
    at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.addCheckpointAndSubsumeOldestOne(DefaultCompletedCheckpointStore.java:141)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.addCompletedCheckpointToStoreAndSubsumeOldest(CheckpointCoordinator.java:1382)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1249)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1134)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
    at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.io.IOException: /parked-logs-ingestion-644b80/ha/parked-logs-ingestion-644b80/completedCheckpointf2bebc94bd32 could not be deleted for unknown reasons.
    at org.apache.flink.fs.s3presto.FlinkS3PrestoFileSystem.deleteObject(FlinkS3PrestoFileSystem.java:135)
    at org.apache.flink.fs.s3presto.FlinkS3PrestoFileSystem.delete(FlinkS3PrestoFileSystem.java:66)
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:155)
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.discardState(FileStateHandle.java:89)
    at org.apache.flink.runtime.state.RetrievableStreamStateHandle.discardState(RetrievableStreamStateHandle.java:76)
    at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.lambda$releaseAndTryRemove$12(KubernetesStateHandleStore.java:510)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
    at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperation$1(FutureUtils.java:201)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
    ... 3 more
{code}
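For context on why Flink is deleting from S3 at all at this point: the completed-checkpoint store retains a fixed number of checkpoints ({{state.checkpoints.num-retained}}) and subsumes, i.e. discards, the oldest one whenever a new checkpoint completes; that is the addCheckpointAndSubsumeOldestOne path visible in the trace. A minimal job-setup sketch of the knobs involved; the option names are standard Flink configuration, but the value 5 is only my inference from "after the initial 5 checkpoints":
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointRetentionSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // How many completed checkpoints the store retains before the oldest
        // one is subsumed, i.e. its files are deleted from the checkpoint
        // storage (S3 here). Assumed to be 5 for this deployment.
        conf.setString("state.checkpoints.num-retained", "5");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(60_000L); // trigger a checkpoint every 60s
        // Keep externalized checkpoints on cancellation; note they are still
        // subject to subsumption while the job is running.
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // ... job graph and env.execute() omitted
    }
}
{code}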
Flink is hitting an IOException while trying to delete a completedCheckpoint file under the HA storage directory in S3, and the failure originates in the Presto S3 filesystem's delete path:
{code:java}
java.io.IOException: /parked-logs-ingestion-644b80/ha/parked-logs-ingestion-644b80/completedCheckpointf2bebc94bd32 could not be deleted for unknown reasons.
    at org.apache.flink.fs.s3presto.FlinkS3PrestoFileSystem.deleteObject(FlinkS3PrestoFileSystem.java:135)
{code}
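The "unknown reasons" wording suggests a delete-then-verify pattern: the delete call itself does not raise an error, but a follow-up existence check still sees the object, so Flink has no underlying error to report. A simplified illustration of that pattern (illustrative only, not the exact Flink source):
{code:java}
import java.io.IOException;

public class DeleteThenVerifySketch {
    interface ObjectStore {
        void delete(String path);    // e.g. an S3 DeleteObject call
        boolean exists(String path); // e.g. an S3 HeadObject call
    }

    static void deleteObject(ObjectStore store, String path) throws IOException {
        // The delete can appear to succeed (or an error can be swallowed
        // further down) even though the object is still present, so the
        // only way to be sure is to check afterwards.
        store.delete(path);
        if (store.exists(path)) {
            throw new IOException(
                    path + " could not be deleted for unknown reasons.");
        }
    }
}
{code}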
Possible reasons:

1) Permission issues: Flink might not have permission to delete objects from the S3 bucket where the checkpoints are stored (see the probe sketch after this list).

2) Compatibility issues between the Flink version and the S3 filesystem implementation in use (flink-s3-fs-presto here).

3) Some S3-side misconfiguration (endpoint, credentials, or bucket settings).
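
To rule out reason 1 quickly, it can help to attempt a delete against the HA directory with the same credentials the JM uses, outside of Flink. A minimal sketch, assuming the AWS SDK for Java v2 is on the classpath; the bucket and key are placeholders:
{code:java}
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.S3Exception;

public class S3DeletePermissionCheck {
    public static void main(String[] args) {
        // Placeholders: use the bucket backing high-availability.storageDir
        // and a disposable test key under the job's HA path.
        String bucket = "my-flink-ha-bucket";
        String key = "parked-logs-ingestion-644b80/ha/test-delete-probe";

        // S3Client.create() resolves region and credentials from the default
        // provider chain, i.e. the same environment the JM pod would see.
        try (S3Client s3 = S3Client.create()) {
            try {
                s3.deleteObject(DeleteObjectRequest.builder()
                        .bucket(bucket).key(key).build());
                System.out.println("DeleteObject accepted by S3.");
            } catch (S3Exception e) {
                // HTTP 403 here points at an IAM/bucket-policy problem.
                System.out.println("Delete failed with HTTP "
                        + e.statusCode() + ": " + e.getMessage());
            }
            try {
                s3.headObject(HeadObjectRequest.builder()
                        .bucket(bucket).key(key).build());
                System.out.println("Object still exists after delete.");
            } catch (NoSuchKeyException e) {
                System.out.println("Object is gone; the delete took effect.");
            }
        }
    }
}
{code}
If the delete is rejected with a 403, that points squarely at reason 1.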

> ConfigMap DataSize went > 1 MB and cluster stopped working
> ----------------------------------------------------------
>
>                 Key: FLINK-31135
>                 URL: https://issues.apache.org/jira/browse/FLINK-31135
>             Project: Flink
>          Issue Type: Bug
>          Components: Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.2.0
>            Reporter: Sriram Ganesh
>            Priority: Major
>         Attachments: dump_cm.yaml, 
> flink--kubernetes-application-0-parked-logs-ingestion-644b80-b4bc58747-lc865.log.zip,
>  image-2023-04-19-09-48-19-089.png, image-2023-05-03-13-47-51-440.png, 
> image-2023-05-03-13-50-54-783.png, image-2023-05-03-13-51-21-685.png, 
> jobmanager_log.txt, 
> parked-logs-ingestion-644b80-3494e4c01b82eb7a75a76080974b41cd-config-map.yaml
>
>
> I am using the Flink Operator to manage clusters. Flink version: 1.15.2. Flink jobs 
> failed with the below error. It seems the ConfigMap size went beyond 1 MB 
> (the default limit). 
> Since it is managed by the operator and the ConfigMaps are not updated with any 
> manual intervention, I suspect it could be an operator issue. 
>  
> {code:java}
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: PUT at: https://<IP>/api/v1/namespaces/<NS>/configmaps/<job>-config-map. Message: ConfigMap "<job>-config-map" is invalid: []: Too long: must have at most 1048576 bytes. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=[], message=Too long: must have at most 1048576 bytes, reason=FieldValueTooLong, additionalProperties={})], group=null, kind=ConfigMap, name=<job>-config-map, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=ConfigMap "<job>-config-map" is invalid: []: Too long: must have at most 1048576 bytes, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
>     at io.fabric8.kubernetes.client.dsl.base.OperationSupport.requestFailure(OperationSupport.java:673) ~[flink-dist-1.15.2.jar:1.15.2]
>     at io.fabric8.kubernetes.client.dsl.base.OperationSupport.assertResponseCode(OperationSupport.java:612) ~[flink-dist-1.15.2.jar:1.15.2]
>     at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:560) ~[flink-dist-1.15.2.jar:1.15.2]
>     at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:521) ~[flink-dist-1.15.2.jar:1.15.2]
>     at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleUpdate(OperationSupport.java:347) ~[flink-dist-1.15.2.jar:1.15.2]
>     at io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleUpdate(OperationSupport.java:327) ~[flink-dist-1.15.2.jar:1.15.2]
>     at io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleUpdate(BaseOperation.java:781) ~[flink-dist-1.15.2.jar:1.15.2]
>     at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.lambda$replace$1(HasMetadataOperation.java:183) ~[flink-dist-1.15.2.jar:1.15.2]
>     at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:188) ~[flink-dist-1.15.2.jar:1.15.2]
>     at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:130) ~[flink-dist-1.15.2.jar:1.15.2]
>     at io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation.replace(HasMetadataOperation.java:41) ~[flink-dist-1.15.2.jar:1.15.2]
>     at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$attemptCheckAndUpdateConfigMap$11(Fabric8FlinkKubeClient.java:325) ~[flink-dist-1.15.2.jar:1.15.2]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
>     ... 3 more
> {code}


