[ 
https://issues.apache.org/jira/browse/FLINK-30857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686276#comment-17686276
 ] 

Yao Zhang commented on FLINK-30857:
-----------------------------------

Hi community,

After further investigation, I think FlinkCatalog should support managed tables, 
as it does not have a connector option. However, 
org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java does not support 
a custom topic. The relevant code is:
{code:java}
@Override
public Map<String, String> enrichOptions(Context context) {
    Map<String, String> options = new HashMap<>(context.getCatalogTable().getOptions());
    Preconditions.checkArgument(
            !options.containsKey(TOPIC.key()),
            "Managed table can not contain custom topic. "
                    + "You need to remove topic in table options or session config.");

    String topic = context.getObjectIdentifier().asSummaryString();
    options.put(TOPIC.key(), topic);
    return options;
}
{code}
I suggest that it support a custom topic name. If no topic is specified, it 
could fall back to context.getObjectIdentifier().asSummaryString().
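
To illustrate the fallback I have in mind, here is a minimal sketch. It is not the actual KafkaLogStoreFactory code: it uses plain maps instead of Flink's Context, and the class name TopicOptionSketch, the constant TOPIC_KEY (standing in for TOPIC.key()) and the sample identifier string are all assumptions made up for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the enrichOptions logic, using plain maps
// instead of Flink's Context so that it runs on its own.
class TopicOptionSketch {
    // Assumed to play the role of TOPIC.key(); the real key may differ.
    static final String TOPIC_KEY = "topic";

    static Map<String, String> enrichOptions(
            Map<String, String> tableOptions, String defaultTopic) {
        Map<String, String> options = new HashMap<>(tableOptions);
        // Keep a user-supplied topic; otherwise fall back to the default,
        // which would be context.getObjectIdentifier().asSummaryString().
        options.putIfAbsent(TOPIC_KEY, defaultTopic);
        return options;
    }

    public static void main(String[] args) {
        // Made-up identifier string, for illustration only.
        String defaultTopic = "table_store_catalog.default.word_count_kafka";

        Map<String, String> custom = new HashMap<>();
        custom.put(TOPIC_KEY, "word_count_log");

        // First call keeps the custom topic, second call uses the default.
        System.out.println(enrichOptions(custom, defaultTopic).get(TOPIC_KEY));
        System.out.println(enrichOptions(new HashMap<>(), defaultTopic).get(TOPIC_KEY));
    }
}
```

With this shape the Preconditions check would no longer be needed, since a topic present in the table options is simply kept.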

As for the Kafka AdminClient issue, we might add a default replication factor 
for better compatibility.

Correct me if I am wrong.

> Create table does not create topic with multiple partitions
> -----------------------------------------------------------
>
>                 Key: FLINK-30857
>                 URL: https://issues.apache.org/jira/browse/FLINK-30857
>             Project: Flink
>          Issue Type: Bug
>          Components: Table Store
>            Reporter: Vicky Papavasileiou
>            Priority: Major
>
>  
> {code:java}
> CREATE CATALOG table_store_catalog WITH (
>     'type'='table-store',
>     'warehouse'='s3://my-bucket/table-store'
>  );
> USE CATALOG table_store_catalog;
> SET 'execution.checkpointing.interval' = '10 s';
> CREATE TABLE word_count_kafka (
>      word STRING PRIMARY KEY NOT ENFORCED,
>      cnt BIGINT
>  ) WITH (
>      'log.system' = 'kafka',
>      'kafka.bootstrap.servers' = 'broker:9092',
>      'kafka.topic' = 'word_count_log',
>      'bucket'='4'
>  );
> {code}
>  
> The created topic has only one partition
> {code:java}
> Topic: word_count_log    TopicId: udeJwBIkRsSybkf1EerphA    PartitionCount: 1    ReplicationFactor: 1    Configs:
>     Topic: word_count_log    Partition: 0    Leader: 1    Replicas: 1    Isr: 1
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
