[ 
https://issues.apache.org/jira/browse/FLINK-29236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Krishnaiah Narukulla updated FLINK-29236:
-----------------------------------------
    Description: 
SQL API:
{code:java}
 CREATE TEMPORARY TABLE `playevents` (upload_time BIGINT, log_id STRING) WITH ( 
  'connector' = 'kafka',
  'topic' = 'topic1', 
  'properties.bootstrap.servers' = xxx', 
  'properties.group.id' = 'kafka-krish-test3',
  'scan.startup.mode' = 'earliest-offset', 
  'format' = 'avro-cloudera',   
  'avro-cloudera.properties.schema.registry.url' = 'yyy',
  'avro-cloudera.schema-name'='zzz'
  ) {code}
{color:#000000}ClouderaRegistryAvroFormatFactory {color}
{code:java}
maven.artifact(
    group = "org.apache.flink",
    artifact = "flink-avro-cloudera-registry",
    version = "1.14.0-csadh1.6.0.1",
), {code}
{color:#000000}returns optionalOptions as ["schema-name", "properties.*"]

[https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java#L628]
 does not handle `wildcard patterns`. Hence its throwing error. {color}
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options 
found for 'kafka'.Unsupported 
options:avro-cloudera.properties.schema.registry.urlSupported 
options:avro-cloudera.properties.*
avro-cloudera.schema-name
connector
format
key.fields
key.fields-prefix
key.format
properties.bootstrap.servers
properties.group.id
property-version
scan.startup.mode
scan.startup.specific-offsets
scan.startup.timestamp-millis
scan.topic-partition-discovery.interval
sink.delivery-guarantee
sink.parallelism
sink.partitioner
sink.semantic
sink.transactional-id-prefix
topic
topic-pattern
value.fields-include
value.format
        at 
org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:624)
        at 
org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:914)
        at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:978)
        at 
org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:938)
        at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:978)
        at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:176)
        at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:156)
 {code}
{color:#000000} {color}

  was:
SQL API:
{code:java}
 CREATE TEMPORARY TABLE `playevents` (upload_time BIGINT, log_id STRING) WITH ( 
  'connector' = 'kafka',
  'topic' = 'topic1', 
  'properties.bootstrap.servers' = xxx', 
  'properties.group.id' = 'kafka-krish-test3',
  'scan.startup.mode' = 'earliest-offset', 
  'format' = 'avro-cloudera',   
  'avro-cloudera.properties.schema.registry.url' = 'yyy',
  'avro-cloudera.schema-name'='zzz'
  ) {code}
{color:#000000}ClouderaRegistryAvroFormatFactory {color}
{code:java}
maven.artifact(
    group = "org.apache.flink",
    artifact = "flink-avro-cloudera-registry",
    version = "1.14.0-csadh1.6.0.1",
), {code}
{color:#000000}returns optionalOptions as ["schema-name", "{*}properties.*{*}"]

[https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java#L628]
 does not handle `wildcard patterns`. Hence its throwing error. {color}
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options 
found for 'kafka'.Unsupported 
options:avro-cloudera.properties.schema.registry.urlSupported 
options:avro-cloudera.properties.*
avro-cloudera.schema-name
connector
format
key.fields
key.fields-prefix
key.format
properties.bootstrap.servers
properties.group.id
property-version
scan.startup.mode
scan.startup.specific-offsets
scan.startup.timestamp-millis
scan.topic-partition-discovery.interval
sink.delivery-guarantee
sink.parallelism
sink.partitioner
sink.semantic
sink.transactional-id-prefix
topic
topic-pattern
value.fields-include
value.format
        at 
org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:624)
        at 
org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:914)
        at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:978)
        at 
org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:938)
        at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:978)
        at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:176)
        at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:156)
 {code}
{color:#000000} {color}


> TableFactory wildcard options are not supported
> -----------------------------------------------
>
>                 Key: FLINK-29236
>                 URL: https://issues.apache.org/jira/browse/FLINK-29236
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.14.0, 1.15.0, 1.16.0
>            Reporter: Krishnaiah Narukulla
>            Priority: Major
>             Fix For: 1.16.0
>
>
> SQL API:
> {code:java}
>  CREATE TEMPORARY TABLE `playevents` (upload_time BIGINT, log_id STRING) WITH 
> ( 
>   'connector' = 'kafka',
>   'topic' = 'topic1', 
>   'properties.bootstrap.servers' = xxx', 
>   'properties.group.id' = 'kafka-krish-test3',
>   'scan.startup.mode' = 'earliest-offset', 
>   'format' = 'avro-cloudera',   
>   'avro-cloudera.properties.schema.registry.url' = 'yyy',
>   'avro-cloudera.schema-name'='zzz'
>   ) {code}
> {color:#000000}ClouderaRegistryAvroFormatFactory {color}
> {code:java}
> maven.artifact(
>     group = "org.apache.flink",
>     artifact = "flink-avro-cloudera-registry",
>     version = "1.14.0-csadh1.6.0.1",
> ), {code}
> {color:#000000}returns optionalOptions as ["schema-name", "properties.*"]
> [https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java#L628]
>  does not handle `wildcard patterns`. Hence its throwing error. {color}
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Unsupported 
> options found for 'kafka'.Unsupported 
> options:avro-cloudera.properties.schema.registry.urlSupported 
> options:avro-cloudera.properties.*
> avro-cloudera.schema-name
> connector
> format
> key.fields
> key.fields-prefix
> key.format
> properties.bootstrap.servers
> properties.group.id
> property-version
> scan.startup.mode
> scan.startup.specific-offsets
> scan.startup.timestamp-millis
> scan.topic-partition-discovery.interval
> sink.delivery-guarantee
> sink.parallelism
> sink.partitioner
> sink.semantic
> sink.transactional-id-prefix
> topic
> topic-pattern
> value.fields-include
> value.format
>         at 
> org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:624)
>         at 
> org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:914)
>         at 
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:978)
>         at 
> org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:938)
>         at 
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:978)
>         at 
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:176)
>         at 
> org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:156)
>  {code}
> {color:#000000} {color}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to