[ https://issues.apache.org/jira/browse/FLINK-30857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17685140#comment-17685140 ]

Yao Zhang commented on FLINK-30857:
-----------------------------------

Hi all,

I am currently investigating this issue.

It seems that the onCreateTable method in
flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
invokes Kafka's AdminClient to create the topic with the assigned number of
partitions. The relevant code is listed below:
{code:java}
@Override
public void onCreateTable(Context context, int numBucket, boolean ignoreIfExists) {
    Configuration options = Configuration.fromMap(context.getCatalogTable().getOptions());
    try (AdminClient adminClient = AdminClient.create(toKafkaProperties(options))) {
        Map<String, String> configs = new HashMap<>();
        options.getOptional(LOG_RETENTION)
                .ifPresent(
                        retention ->
                                configs.put(
                                        TopicConfig.RETENTION_MS_CONFIG,
                                        String.valueOf(retention.toMillis())));
        NewTopic topicObj =
                new NewTopic(topic(context), Optional.of(numBucket), Optional.empty())
                        .configs(configs);
        adminClient.createTopics(Collections.singleton(topicObj)).all().get();

        // ...

} {code}
However, the onCreateTable method never gets a chance to execute, because the
Flink Table Store catalog (implemented by
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java)
does not support managed tables (it does not override the supportsManagedTable
method in org/apache/flink/table/catalog/Catalog.java).

I tried making it support managed tables and updated the SQL as follows:
{code:java}
String.format(
        "CREATE TABLE T (a STRING, b STRING, c STRING) WITH ("
                + "'log.system'='kafka', "
                + "'root-path'='/path/to/tablestore-data',"
                + "'kafka.bootstrap.servers'='%s',"
                + "'kafka.transaction.timeout.ms'='300000',"
                + "'table.type'='MANAGED_TABLE',"
                + "'bucket'='9'"
                + ")",
        // bootstrap servers are a comma-separated list of host:port pairs
        "kafka1:9092,kafka2:9092,kafka3:9092"));
{code}
(Since managed tables do not allow customizing the Kafka topic, the kafka.topic
option was removed.)
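When the DDL is built programmatically, joining the broker list is an easy way to keep kafka.bootstrap.servers in the expected comma-separated host:port form. The helper below is a hypothetical sketch (ManagedTableDdlSketch and createTableDdl are not part of Flink Table Store); it reuses exactly the options from the DDL above and only parameterizes the brokers and the bucket count.

{code:java}
import java.util.List;

/** Hypothetical helper that builds the managed-table DDL shown above. */
final class ManagedTableDdlSketch {
    private ManagedTableDdlSketch() {}

    static String createTableDdl(List<String> brokers, int buckets) {
        // kafka.bootstrap.servers expects "host1:port1,host2:port2,..."
        String bootstrapServers = String.join(",", brokers);
        // No 'kafka.topic' option: managed tables do not allow customizing the topic.
        return String.format(
                "CREATE TABLE T (a STRING, b STRING, c STRING) WITH ("
                        + "'log.system'='kafka', "
                        + "'root-path'='/path/to/tablestore-data',"
                        + "'kafka.bootstrap.servers'='%s',"
                        + "'kafka.transaction.timeout.ms'='300000',"
                        + "'table.type'='MANAGED_TABLE',"
                        + "'bucket'='%d'"
                        + ")",
                bootstrapServers,
                buckets);
    }
}
{code}

For example, createTableDdl(List.of("kafka1:9092", "kafka2:9092", "kafka3:9092"), 9) yields the same statement as above.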

Now the onCreateTable method is invoked, but I got another exception:
{code:java}
Creating topics with default partitions/replication factor are only supported in CreateTopicRequest version 4+ {code}
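It says that default values for the number of partitions or the replication
factor are not supported when creating a new topic with the CreateTopicRequest
version in use here (only version 4+ supports them). I made another change: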

From:
{code:java}
new NewTopic(topic(context), Optional.of(numBucket), Optional.empty())
        .configs(configs); {code}
To:
{code:java}
// Optional.of((short) 3) is a placeholder value for testing purposes only
NewTopic topicObj =
        new NewTopic(topic(context), Optional.of(numBucket), Optional.of((short) 3))
                .configs(configs);{code}
With this change, the topic was created successfully with the correct number of
partitions and the requested replication factor.
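Why the explicit replication factor helps can be shown with a simplified model. This is not Kafka's actual code: CreateTopicsVersionSketch is hypothetical, and it assumes (consistent with the error message above) that an empty Optional is sent as a "use the broker default" sentinel of -1, which is only accepted from CreateTopicRequest version 4 onwards. If the client can only use an older request version, any topic that relies on a default is rejected, while fully explicit values pass.

{code:java}
import java.util.Optional;

/** Simplified model (hypothetical, not Kafka's code) of the CreateTopicRequest version check. */
final class CreateTopicsVersionSketch {
    private CreateTopicsVersionSketch() {}

    // Assumed sentinel values meaning "let the broker pick its default".
    static final int NO_NUM_PARTITIONS = -1;
    static final short NO_REPLICATION_FACTOR = -1;

    /** Returns false where the real client would fail with the "version 4+" error. */
    static boolean accepted(
            Optional<Integer> numPartitions, Optional<Short> replicationFactor, short requestVersion) {
        int partitions = numPartitions.orElse(NO_NUM_PARTITIONS);
        short replication = replicationFactor.orElse(NO_REPLICATION_FACTOR);
        boolean usesDefaults =
                partitions == NO_NUM_PARTITIONS || replication == NO_REPLICATION_FACTOR;
        // Defaults are only expressible from request version 4 onwards.
        return !usesDefaults || requestVersion >= 4;
    }
}
{code}

Under these assumptions, Optional.of(numBucket) with Optional.empty() fails on an older request version, whereas Optional.of(numBucket) with Optional.of((short) 3) is accepted, mirroring the before/after behaviour in this comment.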

I think there are two questions that need further discussion:
 # Should FlinkCatalog support managed tables? Even if it keeps working with
external tables only, the Kafka topic still has to be created via AdminClient.
 # Creating a new topic with default partitions/replication factor via
AdminClient seems to have limited support (it requires CreateTopicRequest
version 4+). We may have to make the default partitions/replication factor
configurable in the Flink configuration or in the SQL WITH clause.
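For the second question, one option is to always pass explicit values to AdminClient. The sketch below is hypothetical: the option key "kafka.replication-factor", the fallback of 1 and the class TopicSettingsSketch are illustrative assumptions, not existing Flink Table Store options. Partitions follow the bucket count, as onCreateTable already does, and the replication factor is read from the table options with a fallback, so no broker-side default is ever requested.

{code:java}
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: resolve explicit topic settings from table options. */
final class TopicSettingsSketch {
    private TopicSettingsSketch() {}

    // Hypothetical option key and fallback value, for illustration only.
    static final String REPLICATION_FACTOR_KEY = "kafka.replication-factor";
    static final short FALLBACK_REPLICATION_FACTOR = 1;

    static Optional<Integer> partitions(int numBucket) {
        // One Kafka partition per table-store bucket.
        return Optional.of(numBucket);
    }

    static Optional<Short> replicationFactor(Map<String, String> tableOptions) {
        String configured = tableOptions.get(REPLICATION_FACTOR_KEY);
        short value =
                configured == null
                        ? FALLBACK_REPLICATION_FACTOR
                        : Short.parseShort(configured.trim());
        if (value < 1) {
            throw new IllegalArgumentException(
                    REPLICATION_FACTOR_KEY + " must be >= 1, got " + value);
        }
        // Always explicit, so the request never depends on broker defaults.
        return Optional.of(value);
    }
}
{code}

Inside onCreateTable, these results would take the place of Optional.of(numBucket) and Optional.of((short) 3) in the NewTopic constructor, with the options map coming from the catalog table.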

> Create table does not create topic with multiple partitions
> -----------------------------------------------------------
>
>                 Key: FLINK-30857
>                 URL: https://issues.apache.org/jira/browse/FLINK-30857
>             Project: Flink
>          Issue Type: Bug
>          Components: Table Store
>            Reporter: Vicky Papavasileiou
>            Priority: Major
>
>  
> {code:java}
> CREATE CATALOG table_store_catalog WITH (
>     'type'='table-store',
>     'warehouse'='s3://my-bucket/table-store'
>  );
> USE CATALOG table_store_catalog;
> SET 'execution.checkpointing.interval' = '10 s';
> CREATE TABLE word_count_kafka (
>      word STRING PRIMARY KEY NOT ENFORCED,
>      cnt BIGINT
>  ) WITH (
>      'log.system' = 'kafka',
>      'kafka.bootstrap.servers' = 'broker:9092',
>      'kafka.topic' = 'word_count_log',
>      'bucket'='4'
>  );
> {code}
>  
> The created topic has only one partition
> {code:java}
> Topic: word_count_log    TopicId: udeJwBIkRsSybkf1EerphA    PartitionCount: 1    ReplicationFactor: 1    Configs:
>     Topic: word_count_log    Partition: 0    Leader: 1    Replicas: 1    Isr: 1
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
