[ https://issues.apache.org/jira/browse/FLINK-30857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17685140#comment-17685140 ]
Yao Zhang commented on FLINK-30857:
-----------------------------------
Hi all,
I am currently investigating this issue.
It seems that the onCreateTable method in
flink-table-store-kafka/src/main/java/org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java
invokes Kafka's AdminClient to create the topic, with the number of partitions set to the bucket count. The relevant code is listed below:
{code:java}
@Override
public void onCreateTable(Context context, int numBucket, boolean ignoreIfExists) {
    Configuration options = Configuration.fromMap(context.getCatalogTable().getOptions());
    try (AdminClient adminClient = AdminClient.create(toKafkaProperties(options))) {
        Map<String, String> configs = new HashMap<>();
        // Map the table's LOG_RETENTION option to the topic's retention.ms config
        options.getOptional(LOG_RETENTION)
                .ifPresent(
                        retention ->
                                configs.put(
                                        TopicConfig.RETENTION_MS_CONFIG,
                                        String.valueOf(retention.toMillis())));
        // One partition per bucket; the replication factor is left to the broker default
        NewTopic topicObj =
                new NewTopic(topic(context), Optional.of(numBucket), Optional.empty())
                        .configs(configs);
        adminClient.createTopics(Collections.singleton(topicObj)).all().get();
        // ...
    }
    // ...
}
{code}
However, onCreateTable never gets a chance to run, because the Flink table store catalog (implemented by
flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java)
does not support managed tables: it does not override the supportsManagedTable
method in org/apache/flink/table/catalog/Catalog.java, which returns false by default.
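To illustrate why the hook is skipped, here is a minimal, self-contained model of the gate. The `Catalog` and `ManagedTableFactory` interfaces and the `createTable` dispatcher are simplified, hypothetical stand-ins, not Flink's real classes; the only behavior borrowed from Flink is that `supportsManagedTable()` defaults to `false`, so the managed-table hook only runs when a catalog overrides it.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the managed-table gate; these are NOT Flink's real interfaces.
public class ManagedTableGate {

    /** Stand-in for org.apache.flink.table.catalog.Catalog: managed tables are opt-in. */
    interface Catalog {
        default boolean supportsManagedTable() {
            return false;
        }
    }

    /** Stand-in for a log-store hook such as KafkaLogStoreFactory#onCreateTable. */
    interface ManagedTableFactory {
        void onCreateTable(String table, int numBucket);
    }

    /** Hypothetical dispatcher: calls the hook only when the catalog opts in to managed tables. */
    static boolean createTable(Catalog catalog, ManagedTableFactory factory, String table, int numBucket) {
        if (!catalog.supportsManagedTable()) {
            return false; // hook skipped, so no topic is created
        }
        factory.onCreateTable(table, numBucket);
        return true;
    }

    public static void main(String[] args) {
        List<String> created = new ArrayList<>();
        ManagedTableFactory kafka = (table, numBucket) -> created.add(table + ":" + numBucket);

        Catalog defaultCatalog = new Catalog() {}; // does not override supportsManagedTable()
        Catalog managedCatalog = new Catalog() {
            @Override
            public boolean supportsManagedTable() {
                return true;
            }
        };

        System.out.println(createTable(defaultCatalog, kafka, "T", 9)); // false
        System.out.println(createTable(managedCatalog, kafka, "T", 9)); // true
        System.out.println(created); // [T:9]
    }
}
{code}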
I modified it to support managed tables and updated the SQL as follows:
{code:java}
String sql =
        String.format(
                "CREATE TABLE T (a STRING, b STRING, c STRING) WITH ("
                        + "'log.system'='kafka', "
                        + "'root-path'='/path/to/tablestore-data', "
                        + "'kafka.bootstrap.servers'='%s', "
                        + "'kafka.transaction.timeout.ms'='300000', "
                        + "'table.type'='MANAGED_TABLE', "
                        + "'bucket'='9'"
                        + ")",
                // bootstrap servers are a comma-separated host:port list
                "kafka1:9092,kafka2:9092,kafka3:9092");
{code}
(The kafka.topic option was removed because managed tables do not allow customizing the Kafka topic.)
Now onCreateTable is invoked, but it fails with another exception:
{code:java}
Creating topics with default partitions/replication factor are only supported in CreateTopicRequest version 4+
{code}
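In other words, the topic cannot be created with a default (unspecified) partition count or replication factor. Defaults require CreateTopics request version 4, which was added in Kafka 2.4 (KIP-464), so the broker is probably older than that. I made another change: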
From:
{code:java}
NewTopic topicObj =
        new NewTopic(topic(context), Optional.of(numBucket), Optional.empty())
                .configs(configs);
{code}
To:
{code:java}
// Optional.of((short) 3) is a test value only
NewTopic topicObj =
        new NewTopic(topic(context), Optional.of(numBucket), Optional.of((short) 3))
                .configs(configs);
{code}
With this change, the topic was successfully created with the expected number of partitions and replication factor.
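The failure and the fix can be summarized with a simplified model of the client-side check. As far as I understand, the Kafka client encodes an unset partition count or replication factor as -1 and rejects such a request when the negotiated CreateTopics version is below 4. The class and method names below are hypothetical; this is a sketch of the rule, not Kafka's actual code.

{code:java}
import java.util.Optional;

// Simplified, hypothetical model of the client-side CreateTopics version check; not Kafka's actual code.
public class CreateTopicsVersionCheck {

    // Assumed encoding of an unset partition count / replication factor in the request
    static final int NO_NUM_PARTITIONS = -1;
    static final short NO_REPLICATION_FACTOR = -1;

    /** Returns the error the request would fail with, or empty if it can be sent at this version. */
    static Optional<String> check(Optional<Integer> partitions, Optional<Short> replicationFactor, short requestVersion) {
        int numPartitions = partitions.orElse(NO_NUM_PARTITIONS);
        short rf = replicationFactor.orElse(NO_REPLICATION_FACTOR);
        boolean usesDefaults = numPartitions == NO_NUM_PARTITIONS || rf == NO_REPLICATION_FACTOR;
        if (requestVersion < 4 && usesDefaults) {
            return Optional.of("Creating topics with default partitions/replication factor are only supported"
                    + " in CreateTopicRequest version 4+");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        short oldVersion = 3; // e.g. negotiated with a broker that predates CreateTopics v4
        // Original code: partitions from the bucket count, replication factor left to the broker
        System.out.println(check(Optional.of(9), Optional.empty(), oldVersion).isPresent()); // true
        // Patched code: both values explicit
        System.out.println(check(Optional.of(9), Optional.of((short) 3), oldVersion).isPresent()); // false
    }
}
{code}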
I think two questions need further discussion:
# Should FlinkCatalog support managed tables? Even if it keeps working with external tables only, the Kafka topic still has to be created via AdminClient.
# Creating a new topic with default partitions/replication factor via AdminClient has limited support (it requires CreateTopicRequest version 4+). We would have to set the partitions/replication factor explicitly, either in the Flink configuration or in the SQL WITH clause.
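One possible shape for the second point: derive the partition count from the bucket number and read the replication factor from a table option, leaving it to the broker only when the option is absent. The option key `kafka.replication-factor` and the `TopicSettings` helper are hypothetical and do not exist in Flink Table Store; the sketch uses a plain `java.util.Map` instead of Flink's `Configuration` and only shows the resolution logic.

{code:java}
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; "kafka.replication-factor" is an illustrative option key, not an existing one.
public class TopicSettings {

    static final String REPLICATION_FACTOR_KEY = "kafka.replication-factor";

    final int partitions;
    final Optional<Short> replicationFactor; // empty = broker default (needs CreateTopics v4+)

    TopicSettings(int partitions, Optional<Short> replicationFactor) {
        this.partitions = partitions;
        this.replicationFactor = replicationFactor;
    }

    /** Partitions always follow the bucket count; the replication factor comes from the options if set. */
    static TopicSettings resolve(Map<String, String> tableOptions, int numBucket) {
        if (numBucket <= 0) {
            throw new IllegalArgumentException("bucket must be positive: " + numBucket);
        }
        Optional<Short> rf = Optional.ofNullable(tableOptions.get(REPLICATION_FACTOR_KEY))
                .map(String::trim)
                .map(Short::parseShort);
        if (rf.isPresent() && rf.get() <= 0) {
            throw new IllegalArgumentException("replication factor must be positive: " + rf.get());
        }
        return new TopicSettings(numBucket, rf);
    }

    public static void main(String[] args) {
        TopicSettings explicit = resolve(Map.of(REPLICATION_FACTOR_KEY, "3"), 9);
        System.out.println(explicit.partitions + " " + explicit.replicationFactor); // 9 Optional[3]
        TopicSettings unset = resolve(Map.of(), 4);
        System.out.println(unset.partitions + " " + unset.replicationFactor); // 4 Optional.empty
    }
}
{code}

Keeping the replication factor as an Optional preserves today's behavior for newer brokers while letting users of older brokers set it explicitly.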
> Create table does not create topic with multiple partitions
> -----------------------------------------------------------
>
> Key: FLINK-30857
> URL: https://issues.apache.org/jira/browse/FLINK-30857
> Project: Flink
> Issue Type: Bug
> Components: Table Store
> Reporter: Vicky Papavasileiou
> Priority: Major
>
>
> {code:java}
> CREATE CATALOG table_store_catalog WITH (
> 'type'='table-store',
> 'warehouse'='s3://my-bucket/table-store'
> );
> USE CATALOG table_store_catalog;
> SET 'execution.checkpointing.interval' = '10 s';
> CREATE TABLE word_count_kafka (
> word STRING PRIMARY KEY NOT ENFORCED,
> cnt BIGINT
> ) WITH (
> 'log.system' = 'kafka',
> 'kafka.bootstrap.servers' = 'broker:9092',
> 'kafka.topic' = 'word_count_log',
> 'bucket'='4'
> );
> {code}
>
> The created topic has only one partition
> {code:java}
> Topic: word_count_log TopicId: udeJwBIkRsSybkf1EerphA PartitionCount: 1 ReplicationFactor: 1 Configs:
> Topic: word_count_log Partition: 0 Leader: 1 Replicas: 1 Isr: 1
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)