[
https://issues.apache.org/jira/browse/FLINK-34044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Danny Cranmer updated FLINK-34044:
----------------------------------
Fix Version/s: aws-connector-4.3.0
> Kinesis Sink Cannot be Created via TableDescriptor
> --------------------------------------------------
>
> Key: FLINK-34044
> URL: https://issues.apache.org/jira/browse/FLINK-34044
> Project: Flink
> Issue Type: Bug
> Components: Connectors / AWS
> Affects Versions: aws-connector-4.2.0
> Reporter: Tilman Krokotsch
> Assignee: Ahmed Hamdy
> Priority: Major
> Labels: pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> When trying to create a Kinesis Stream Sink in Table API via a
> TableDescriptor I get an error:
> {code:java}
> Caused by: java.lang.UnsupportedOperationException
> at
> java.base/java.util.Collections$UnmodifiableMap.remove(Collections.java:1460)
> at
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.removeMappedOptions(KinesisStreamsConnectorOptionsUtils.java:249)
> at
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.mapDeprecatedClientOptions(KinesisStreamsConnectorOptionsUtils.java:158)
> at
> org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.<init>(KinesisStreamsConnectorOptionsUtils.java:90)
> at
> org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:61)
> at
> org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:267)
> ... 20 more
> {code}
> Here is a minimum reproducing example with Flink-1.17.2 and
> flink-connector-kinesis-4.2.0:
> {code:java}
> public class Job {
> public static void main(String[] args) throws Exception {
> // create data stream environment
> StreamExecutionEnvironment sEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
> Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build();
> tEnv.createTemporaryTable(
> "exampleTable",
> TableDescriptor.forConnector("datagen").schema(a).build());
> TableDescriptor descriptor =
> TableDescriptor.forConnector("kinesis")
> .schema(a)
> .format("json")
> .option("stream", "abc")
> .option("aws.region", "eu-central-1")
> .build();
> tEnv.createTemporaryTable("sinkTable", descriptor);
> tEnv.from("exampleTable").executeInsert("sinkTable"); // error occurs here
> }
> } {code}
> From my investigation, the error is triggered by the `ResolvedCatalogTable`
> used when re-mapping the deprecated Kinesis options in
> `KinesisProducerOptionsMapper`. The `getOptions` method of the table returns
> an `UnmodifiableMap` which is not mutable.
> If the sink table is created via SQL, the error does not occur:
> {code:java}
> tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString());
> {code}
> because `ResolvedCatalogTable.getOptions` returns a regular `HashMap`.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)