ferenc-csaky commented on code in PR #104:
URL:
https://github.com/apache/flink-connector-pulsar/pull/104#discussion_r2440777883
##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java:
##########
@@ -218,6 +219,14 @@ private <T> Producer<T> getOrCreateProducer(String topic,
Schema<T> schema)
try {
// Use this method for auto creating the non-exist topics.
Otherwise, it will throw an
// exception.
+ TopicName topicName = TopicName.get(topic);
+ // Step-1: create partitioned topic metadata.
+ if (topicName.isPartitioned()) {
+ pulsarClient
+
.getPartitionsForTopic(TopicName.get(topic).getPartitionedTopicName())
+ .get();
+ }
+ // Step-2: create partition.
Review Comment:
Shouldn't step 2 inside an `else` branch? I don't know anything about the
pulsar client but duplicating this call does not seem right.
##########
flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java:
##########
@@ -218,6 +219,14 @@ private <T> Producer<T> getOrCreateProducer(String topic,
Schema<T> schema)
try {
// Use this method for auto creating the non-exist topics.
Otherwise, it will throw an
// exception.
+ TopicName topicName = TopicName.get(topic);
+ // Step-1: create partitioned topic metadata.
+ if (topicName.isPartitioned()) {
+ pulsarClient
+
.getPartitionsForTopic(TopicName.get(topic).getPartitionedTopicName())
+ .get();
Review Comment:
It seems to me this could be simplified to:
```java
pulsarClient.getPartitionsForTopic(topicName.getPartitionedTopicName()).get();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]