[ 
https://issues.apache.org/jira/browse/FLINK-34044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-34044:
-------------------------------------

    Assignee: Ahmed Hamdy

> Kinesis Sink Cannot be Created via TableDescriptor
> --------------------------------------------------
>
>                 Key: FLINK-34044
>                 URL: https://issues.apache.org/jira/browse/FLINK-34044
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / AWS
>    Affects Versions: aws-connector-4.2.0
>            Reporter: Tilman Krokotsch
>            Assignee: Ahmed Hamdy
>            Priority: Major
>              Labels: pull-request-available
>
> When trying to create a Kinesis Stream Sink in Table API via a 
> TableDescriptor I get an error:
> {code:java}
> Caused by: java.lang.UnsupportedOperationException
>     at 
> java.base/java.util.Collections$UnmodifiableMap.remove(Collections.java:1460)
>     at 
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.removeMappedOptions(KinesisStreamsConnectorOptionsUtils.java:249)
>     at 
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.mapDeprecatedClientOptions(KinesisStreamsConnectorOptionsUtils.java:158)
>     at 
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.<init>(KinesisStreamsConnectorOptionsUtils.java:90)
>     at 
> org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:61)
>     at 
> org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:267)
>     ... 20 more
> {code}
> Here is a minimum reproducing example with Flink-1.17.2 and 
> flink-connector-kinesis-4.2.0:
> {code:java}
> public class Job {
>   public static void main(String[] args) throws Exception {
>     // create data stream environment
>     StreamExecutionEnvironment sEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>     sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
>     Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build();
>     tEnv.createTemporaryTable(
>         "exampleTable", 
> TableDescriptor.forConnector("datagen").schema(a).build());
>     TableDescriptor descriptor =
>         TableDescriptor.forConnector("kinesis")
>             .schema(a)
>             .format("json")
>             .option("stream", "abc")
>             .option("aws.region", "eu-central-1")
>             .build();
>     tEnv.createTemporaryTable("sinkTable", descriptor);
>     tEnv.from("exampleTable").executeInsert("sinkTable"); // error occurs here
>   }
> } {code}
> From my investigation, the error is triggered by the `ResolvedCatalogTable` 
> used when re-mapping the deprecated Kinesis options in 
> `KinesisProducerOptionsMapper`. The `getOptions` method of the table returns 
> an `UnmodifiableMap` which is not mutable.
> If the sink table is created via SQL, the error does not occur:
> {code:java}
> tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString());
> {code}
> because `ResolvedCatalogTable.getOptions` returns a regular `HashMap`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to