[
https://issues.apache.org/jira/browse/FLINK-28983?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gyula Fora closed FLINK-28983.
------------------------------
Resolution: Won't Fix
This is not a bug in the operator; it is related to the job configuration /
dependency setup.
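For reference, a configuration-side sketch of the kind of setup usually needed here (not verified against this deployment): load Flink's bundled S3 filesystem as a plugin and point S3A at the service account's web-identity credentials instead of static keys. `ENABLE_BUILT_IN_PLUGINS` is honored by the official Flink Docker images, and `com.amazonaws.auth.WebIdentityTokenCredentialsProvider` is the AWS SDK v1 provider that reads the token mounted by IRSA; the plugin jar name below is an assumption and must match the image's Flink version:

{code:yaml}
spec:
  flinkConfiguration:
    # Have S3A pick up the IRSA web-identity token rather than static keys.
    fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            # Official Flink images copy the named bundled jars into
            # /opt/flink/plugins at container startup. Version is illustrative.
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-hadoop-1.15.3.jar
{code}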
> Using a serviceAccount in FlinkDeployment does not work when sinking to AWS S3
> ---------------------------------------------------------------------
>
> Key: FLINK-28983
> URL: https://issues.apache.org/jira/browse/FLINK-28983
> Project: Flink
> Issue Type: Bug
> Components: Kubernetes Operator
> Affects Versions: kubernetes-operator-1.1.0
> Reporter: Lichuan Shang
> Priority: Major
>
> I am deploying a Flink CDC job using the sql-runner example from the official
> examples (see
> [https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example]).
>
> The _flink_ service account has all S3 permissions (`s3:*` in its IAM policy), but
> the k8s pod keeps restarting and there are many errors in the pod's log:
>
> {code:java}
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
>     at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:537)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>     ... 4 more
> Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: doesBucketExist on nwlogs: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null), S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=:400 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null)
>     at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224)
>     at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
>     at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
>     at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
>     at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
>     at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.verifyBucketExists(S3AFileSystem.java:391)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:322)
>     at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:127)
>     at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)
>     at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBucketWriter(StreamingFileSink.java:428)
>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:438)
>     at org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(AbstractStreamingWriter.java:96)
>     at org.apache.flink.connector.file.table.stream.StreamingFileWriter.initializeState(StreamingFileWriter.java:81)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: B3ZNHK1DSM4JDJ39; S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=; Proxy: null), S3 Extended Request ID: egjf79lHP0uHq2w4vGqe9yBNRE4exUVEYZ2EP093Aiz5H1YypS4SbcSfSVidbUTQeI/Zv0FmbIw=
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
>     at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
>     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
>     at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
>     at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1438)
>     at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1374)
>     at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:392)
>     at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>     ... 25 more
> {code}
>
> I tried using static access keys (AK/SK, which is not recommended) to see if
> they would work: the k8s pod stays in the Running state after setting
> `s3.access-key` and `s3.secret-key`, so the problem appears to be with the
> service-account-based credentials.
>
> Here is my *FlinkDeployment* config file:
>
> {code:yaml}
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: sql-example-stateful-s3
>   namespace: flink
> spec:
>   image: 11122233344455.dkr.ecr.cn-northwest-1.amazonaws.com.cn/flink/flink-sql-runner-example:latest
>   imagePullPolicy: Always
>   flinkVersion: v1_15
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: "1"
>     state.savepoints.dir: s3://bucket/flink/flink-data/savepoints
>     state.checkpoints.dir: s3://bucket/flink/flink-data/checkpoints
>     high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>     high-availability.storageDir: s3://bucket/flink/flink-data/ha
>     execution.checkpointing.interval: "10000"
>     state.backend: filesystem
>     fs.s3a.endpoint: s3.cn-northwest-1.amazonaws.com.cn
>     env.java.opts: -Dcom.amazonaws.services.s3.enableV4
>     s3.access-key: <AWS_ACCESS_KEY>
>     s3.secret-key: <AWS_SECRET_ACCESS_KEY>
>     s3a.endpoint: s3.cn-northwest-1.amazonaws.com.cn
>   serviceAccount: flink
>   jobManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   taskManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   job:
>     jarURI: local:///opt/flink/usrlib/sql-runner.jar
>     args: ["/opt/flink/usrlib/sql-scripts/orders.sql"]
>     parallelism: 1
>     upgradeMode: last-state
> {code}
>
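A related note: if this cluster is EKS, S3 access through a service account also requires the IAM role binding (IRSA) on the service account itself. A hypothetical manifest, assuming a role that carries the `s3:*` policy (the ARN below is a placeholder, not taken from this report):

{code:yaml}
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink
  namespace: flink
  annotations:
    # Placeholder ARN. The referenced IAM role's trust policy must allow
    # the cluster's OIDC provider and this namespace/service-account pair.
    eks.amazonaws.com/role-arn: arn:aws-cn:iam::<ACCOUNT_ID>:role/<ROLE_NAME>
{code}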
--
This message was sent by Atlassian Jira
(v8.20.10#820010)