[
https://issues.apache.org/jira/browse/FLINK-30857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686276#comment-17686276
]
Yao Zhang commented on FLINK-30857:
-----------------------------------
Hi community,
After further investigation, I think FlinkCatalog should support managed tables,
as it does not have a connector option. However,
org/apache/flink/table/store/kafka/KafkaLogStoreFactory.java does not support a
customized topic. The relevant code is:
{code:java}
@Override
public Map<String, String> enrichOptions(Context context) {
    Map<String, String> options = new HashMap<>(context.getCatalogTable().getOptions());
    Preconditions.checkArgument(
            !options.containsKey(TOPIC.key()),
            "Managed table can not contain custom topic. "
                    + "You need to remove topic in table options or session config.");
    String topic = context.getObjectIdentifier().asSummaryString();
    options.put(TOPIC.key(), topic);
    return options;
}
{code}
I suggest that it support a customized topic name. If the topic is not
specified, it could fall back to context.getObjectIdentifier().asSummaryString().
As for the Kafka AdminClient issue, we might add a default replication factor
for better compatibility.
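To make the suggestion concrete, here is a minimal, standalone sketch of the fallback logic. It is not the actual KafkaLogStoreFactory code: the class name TopicFallbackSketch, the "topic" key (standing in for TOPIC.key()) and the plain Map/String parameters (standing in for the Context) are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicFallbackSketch {
    // Hypothetical stand-in for TOPIC.key(); the real key is defined by the Kafka log store options.
    static final String TOPIC_KEY = "topic";

    /**
     * Copies the table options and keeps a user-supplied topic if present.
     * Otherwise falls back to the identifier summary, which plays the role of
     * context.getObjectIdentifier().asSummaryString() in the real factory.
     */
    static Map<String, String> enrichOptions(
            Map<String, String> tableOptions, String identifierSummary) {
        Map<String, String> options = new HashMap<>(tableOptions);
        // Only fill in the default when no custom topic was configured.
        options.putIfAbsent(TOPIC_KEY, identifierSummary);
        return options;
    }

    public static void main(String[] args) {
        Map<String, String> withTopic = new HashMap<>();
        withTopic.put(TOPIC_KEY, "word_count_log");

        // A custom topic is preserved; a missing topic falls back to the identifier.
        System.out.println(enrichOptions(withTopic, "catalog.db.word_count_kafka"));
        System.out.println(enrichOptions(new HashMap<>(), "catalog.db.word_count_kafka"));
    }
}
```

Compared with the current code, the only behavioural change in this sketch is that the precondition check is replaced by a conditional default, so existing tables without a topic option would keep the same generated topic name.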
Correct me if I am wrong.
> Create table does not create topic with multiple partitions
> -----------------------------------------------------------
>
> Key: FLINK-30857
> URL: https://issues.apache.org/jira/browse/FLINK-30857
> Project: Flink
> Issue Type: Bug
> Components: Table Store
> Reporter: Vicky Papavasileiou
> Priority: Major
>
>
> {code:java}
> CREATE CATALOG table_store_catalog WITH (
> 'type'='table-store',
> 'warehouse'='s3://my-bucket/table-store'
> );
> USE CATALOG table_store_catalog;
> SET 'execution.checkpointing.interval' = '10 s';
> CREATE TABLE word_count_kafka (
> word STRING PRIMARY KEY NOT ENFORCED,
> cnt BIGINT
> ) WITH (
> 'log.system' = 'kafka',
> 'kafka.bootstrap.servers' = 'broker:9092',
> 'kafka.topic' = 'word_count_log',
> 'bucket'='4'
> );
> {code}
>
> The created topic has only one partition
> {code:java}
> Topic: word_count_log TopicId: udeJwBIkRsSybkf1EerphA PartitionCount: 1 ReplicationFactor: 1 Configs:
> Topic: word_count_log Partition: 0 Leader: 1 Replicas: 1 Isr: 1
> {code}