Lichuan Shang created FLINK-28983:
-------------------------------------
             Summary: Using serviceAccount in FlinkDeployment does not work when sinking to AWS S3
                 Key: FLINK-28983
                 URL: https://issues.apache.org/jira/browse/FLINK-28983
             Project: Flink
          Issue Type: Bug
          Components: Kubernetes Operator
    Affects Versions: kubernetes-operator-1.1.0
            Reporter: Lichuan Shang
I am deploying a Flink CDC job using the sql-runner example from the official examples (see [https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example]). The `flink` service account has full S3 permissions (`s3:*` in its IAM policy), but the k8s pod keeps restarting and the pod log is full of errors:

```
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    ... 4 more
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: doesBucketExist on nwlogs: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null), S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=:400 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:391)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:322)
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:127)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBucketWriter(StreamingFileSink.java:428)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:438)
    at org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(AbstractStreamingWriter.java:96)
    at org.apache.flink.connector.file.table.stream.StreamingFileWriter.initializeState(StreamingFileWriter.java:81)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.lang.Thread.run(Thread.java:750)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null), S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1438)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1374)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:392)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    ... 25 more
```

I tried using a static access key / secret key pair (AK/SK, which is not recommended) to see whether that works. After setting `s3.access-key` and `s3.secret-key`, the k8s pod stays in the Running state.
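For reference, one common cause of this pattern (static keys work, but the service account does not) is that the S3A client never picks up the web-identity token that EKS mounts for IRSA-annotated service accounts, and falls back to anonymous or malformed credentials. The sketch below is an unverified guess at what the setup might need, not a confirmed fix: the role ARN, account ID, and role name are placeholders, and whether `com.amazonaws.auth.WebIdentityTokenCredentialsProvider` is available depends on the AWS SDK version bundled in the Flink S3 plugin.

```yaml
# Sketch only (assumes EKS with IRSA). <ACCOUNT_ID> and <FLINK_S3_ROLE>
# are placeholders for the IAM role that carries the s3:* policy.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink
  namespace: flink
  annotations:
    eks.amazonaws.com/role-arn: arn:aws-cn:iam::<ACCOUNT_ID>:role/<FLINK_S3_ROLE>
# Candidate addition to spec.flinkConfiguration, replacing
# s3.access-key / s3.secret-key (unverified assumption):
#   fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
```

If the credentials provider is not the issue, it would be worth checking whether the 400 Bad Request is a signing/region mismatch, since `cn-northwest-1` requires SigV4.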
Here is my config file:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: sql-example-stateful-s3
  namespace: flink
spec:
  image: 11122233344455.dkr.ecr.cn-northwest-1.amazonaws.com.cn/flink/flink-sql-runner-example:latest
  imagePullPolicy: Always
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    state.savepoints.dir: s3://bucket/flink/flink-data/savepoints
    state.checkpoints.dir: s3://bucket/flink/flink-data/checkpoints
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://bucket/flink/flink-data/ha
    execution.checkpointing.interval: "10000"
    state.backend: filesystem
    fs.s3a.endpoint: s3.cn-northwest-1.amazonaws.com.cn
    env.java.opts: -Dcom.amazonaws.services.s3.enableV4
    s3.access-key: <AWS_ACCESS_KEY>
    s3.secret-key: <AWS_SECRET_ACCESS_KEY>
    s3a.endpoint: s3.cn-northwest-1.amazonaws.com.cn
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/sql-runner.jar
    args: ["/opt/flink/usrlib/sql-scripts/orders.sql"]
    parallelism: 1
    upgradeMode: last-state
```

--
This message was sent by Atlassian Jira
(v8.20.10#820010)