[
https://issues.apache.org/jira/browse/FLINK-25848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Danny Cranmer updated FLINK-25848:
----------------------------------
Parent: (was: FLINK-24227)
Issue Type: Bug (was: Sub-task)
> [FLIP-171] KDS Sink does not fast fail when invalid configuration supplied
> --------------------------------------------------------------------------
>
> Key: FLINK-25848
> URL: https://issues.apache.org/jira/browse/FLINK-25848
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kinesis
> Reporter: Danny Cranmer
> Assignee: Zichen Liu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.15.0
>
>
> h4. Description
> KDS sink does not fail job when invalid configuration provided.
> h4. Reproduction Steps
> - Start a job using an Async Sink implementation, for example KDS
> - Specify an invalid credential provider configuration, for example
> {code}
> CREATE TABLE orders (
> `code` STRING,
> `quantity` BIGINT
> ) WITH (
> 'connector' = 'kinesis',
> 'stream' = 'source',
> 'aws.credentials.provider' = 'ASSUME_ROLE',
> 'aws.region' = 'us-east-1',
> 'format' = 'json'
> );
> {code}
> h4. Actual Results
> - Sink operator transitions to running, consistently retrying
> {code}
> 2022-01-27 08:29:31,582 WARN
> org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkWriter [] - KDS
> Sink failed to persist 5 entries to KDS
> java.util.concurrent.CompletionException:
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.sts.model.StsException:
> 2 validation errors detected: Value null at 'roleArn' failed to satisfy
> constraint: Member must not be null; Value null at 'roleSessionName' failed
> to satisfy constraint: Member must not be null (Service: Sts, Status Code:
> 400, Request ID: af8f2176-aafa-4230-805b-72d90e418810, Extended Request ID:
> null)
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:870)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
> ~[?:?]
> at
> org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.putRecords(DefaultKinesisAsyncClient.java:2112)
>
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
> at
> org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkWriter.submitRequestEntries(KinesisDataStreamsSinkWriter.java:122)
>
> ~[blob_p-a167cef2d4088e526c4e209bb2b4e5a83f8ab359-fc0375a6ef7545bc316b770d110d4564:1.15-SNAPSHOT]
> at
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.flush(AsyncSinkWriter.java:311)
> ~[flink-connector-files-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
> at
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.prepareCommit(AsyncSinkWriter.java:391)
> ~[flink-connector-files-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.operators.sink.SinkOperator.endInput(SinkOperator.java:192)
> ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:96)
> ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.endInput(RegularOperatorChain.java:97)
> ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
> ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:517)
> ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:802)
> ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:751)
> ~[flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> [flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> [flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> [flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> [flink-dist-1.15-SNAPSHOT.jar:1.15-SNAPSHOT]
> at java.lang.Thread.run(Thread.java:829) [?:?]
> {code}
> h4. Expected Results
> - Job fails fast
> h4. Suggested Resolution
> - Validate configuration and cancel job when fails (legacy connector does
> this)
--
This message was sent by Atlassian Jira
(v8.20.1#820001)