[ 
https://issues.apache.org/jira/browse/FLINK-38578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mohsen Rezaei updated FLINK-38578:
----------------------------------
    Description: 
The Kinesis source requires its configuration to be passed as an 
{{org.apache.flink.configuration.Configuration}}, e.g. in 
[createReader(...)|https://github.com/apache/flink-connector-aws/blob/97505df8540cba9fc56da1ce20c65ad64b3ec943/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java#L159-L159],
and converts it directly to a {{java.util.Properties}}. This is problematic for 
{{AWS_CREDENTIALS_PROVIDER_OPTION}}, whose value is required to be of the enum class 
[{{AWSConfigConstants.CredentialProvider}}|https://github.com/apache/flink-connector-aws/blob/97505df8540cba9fc56da1ce20c65ad64b3ec943/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java#L33-L33].

When this option is [looked up against the same enum 
class|https://github.com/apache/flink-connector-aws/blob/97505df8540cba9fc56da1ce20c65ad64b3ec943/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java#L101-L101],
the lookup throws the following error:

{code}
Caused by: java.lang.NullPointerException: Name is null
        at java.base/java.lang.Enum.valueOf(Unknown Source)
        at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.config.AWSConfigConstants$CredentialProvider.valueOf(AWSConfigConstants.java:33)
        at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.getCredentialProviderType(AWSGeneralUtil.java:101)
        at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.getCredentialsProvider(AWSGeneralUtil.java:140)
        at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.getCredentialsProvider(AWSGeneralUtil.java:134)
        at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.validateCredentialProvider(AWSGeneralUtil.java:487)
        at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.validateAwsConfiguration(AWSGeneralUtil.java:431)
        at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.validateAwsCredentials(AWSGeneralUtil.java:479)
        at org.apache.flink.connector.kinesis.source.KinesisStreamsSource.createKinesisStreamProxy(KinesisStreamsSource.java:314)
        at org.apache.flink.connector.kinesis.source.KinesisStreamsSource.createKinesisStreamProxy(KinesisStreamsSource.java:287)
        at org.apache.flink.connector.kinesis.source.KinesisStreamsSource.restoreEnumerator(KinesisStreamsSource.java:189)
        at org.apache.flink.connector.kinesis.source.KinesisStreamsSource.createEnumerator(KinesisStreamsSource.java:180)
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:229)
        ... 8 common frames omitted
{code}
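
The failure mode in the trace can be reproduced without Flink. The following is a minimal sketch, assuming the enum constant ends up in the {{java.util.Properties}} as a raw object rather than as a string; the {{CredentialProvider}} enum and the key below are stand-ins for illustration, not the connector's own classes. {{Properties.getProperty}} returns null for any value that is not a {{String}}, and {{Enum.valueOf}} rejects a null name with the message seen in the trace.

```java
import java.util.Properties;

final class EnumPropertyLookupDemo {
    // Hypothetical stand-in for AWSConfigConstants.CredentialProvider.
    enum CredentialProvider { BASIC, AUTO }

    // Illustrative key; treat the exact name as an assumption.
    static final String KEY = "aws.credentials.provider";

    // Returns the exception message produced by the lookup, or "no error".
    static String lookupFailure() {
        Properties props = new Properties();
        // Assumption: the conversion stores the enum object itself, not its name.
        props.put(KEY, CredentialProvider.BASIC);

        // getProperty only returns String values, so this is null.
        String name = props.getProperty(KEY);
        try {
            CredentialProvider.valueOf(name);
            return "no error";
        } catch (NullPointerException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(lookupFailure());
    }
}
```

Whether the connector's conversion behaves exactly this way is not confirmed by the report; the sketch only shows that a non-String entry is enough to produce this exception.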

This issue is less prominent for the {{KinesisStreamsSinkBuilder}}, since its 
setter requires a {{java.util.Properties}}.
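
To illustrate why string-valued {{java.util.Properties}} are less exposed to this, here is a sketch of a conversion that stores every value as text before the lookup. The helper {{toStringProperties}}, the enum, and the key are hypothetical and are not part of the connector or a proposed fix; the point is only that the enum lookup succeeds once the entry is a {{String}} holding the constant's name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

final class StringifiedPropertiesSketch {
    // Hypothetical stand-in for AWSConfigConstants.CredentialProvider.
    enum CredentialProvider { BASIC, AUTO }

    // Illustrative key; treat the exact name as an assumption.
    static final String KEY = "aws.credentials.provider";

    // Hypothetical helper: copies entries into Properties as strings,
    // using the constant name for enum values and skipping nulls.
    static Properties toStringProperties(Map<String, ?> config) {
        Properties props = new Properties();
        for (Map.Entry<String, ?> entry : config.entrySet()) {
            Object value = entry.getValue();
            if (value == null) {
                continue;
            }
            String text = (value instanceof Enum)
                    ? ((Enum<?>) value).name()
                    : value.toString();
            props.setProperty(entry.getKey(), text);
        }
        return props;
    }

    // Stores an enum-typed setting, converts it, and looks it up again.
    static CredentialProvider roundTrip() {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put(KEY, CredentialProvider.BASIC);
        Properties props = toStringProperties(config);
        return CredentialProvider.valueOf(props.getProperty(KEY));
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```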


> Invalid KinesisStreamsSource AWS_CREDENTIALS_PROVIDER_OPTION conversion
> -----------------------------------------------------------------------
>
>                 Key: FLINK-38578
>                 URL: https://issues.apache.org/jira/browse/FLINK-38578
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kinesis
>    Affects Versions: aws-connector-5.0.0
>            Reporter: Mohsen Rezaei
>            Priority: Critical
>
> The Kinesis source requires its configuration to be passed as an 
> {{org.apache.flink.configuration.Configuration}}, e.g. in 
> [createReader(...)|https://github.com/apache/flink-connector-aws/blob/97505df8540cba9fc56da1ce20c65ad64b3ec943/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java#L159-L159],
> and converts it directly to a {{java.util.Properties}}. This is problematic for 
> {{AWS_CREDENTIALS_PROVIDER_OPTION}}, whose value is required to be of the enum class 
> [{{AWSConfigConstants.CredentialProvider}}|https://github.com/apache/flink-connector-aws/blob/97505df8540cba9fc56da1ce20c65ad64b3ec943/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/config/AWSConfigConstants.java#L33-L33].
> When this option is [looked up against the same enum 
> class|https://github.com/apache/flink-connector-aws/blob/97505df8540cba9fc56da1ce20c65ad64b3ec943/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSGeneralUtil.java#L101-L101],
> the lookup throws the following error:
> {code}
> Caused by: java.lang.NullPointerException: Name is null
>       at java.base/java.lang.Enum.valueOf(Unknown Source)
>       at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.config.AWSConfigConstants$CredentialProvider.valueOf(AWSConfigConstants.java:33)
>       at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.getCredentialProviderType(AWSGeneralUtil.java:101)
>       at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.getCredentialsProvider(AWSGeneralUtil.java:140)
>       at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.getCredentialsProvider(AWSGeneralUtil.java:134)
>       at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.validateCredentialProvider(AWSGeneralUtil.java:487)
>       at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.validateAwsConfiguration(AWSGeneralUtil.java:431)
>       at org.apache.flink.kinesis.shaded.org.apache.flink.connector.aws.util.AWSGeneralUtil.validateAwsCredentials(AWSGeneralUtil.java:479)
>       at org.apache.flink.connector.kinesis.source.KinesisStreamsSource.createKinesisStreamProxy(KinesisStreamsSource.java:314)
>       at org.apache.flink.connector.kinesis.source.KinesisStreamsSource.createKinesisStreamProxy(KinesisStreamsSource.java:287)
>       at org.apache.flink.connector.kinesis.source.KinesisStreamsSource.restoreEnumerator(KinesisStreamsSource.java:189)
>       at org.apache.flink.connector.kinesis.source.KinesisStreamsSource.createEnumerator(KinesisStreamsSource.java:180)
>       at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:229)
>       ... 8 common frames omitted
> {code}
> This issue is less prominent for the {{KinesisStreamsSinkBuilder}}, since its 
> setter requires a {{java.util.Properties}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
