eolivelli commented on code in PR #19923:
URL: https://github.com/apache/pulsar/pull/19923#discussion_r1148505888
##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java:
##########
@@ -417,8 +421,19 @@ static BatchMessageSequenceRef getMessageSequenceRefForBatchMessage(MessageId me
@SuppressWarnings("rawtypes")
protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
- final int partition = sourceRecord.getPartitionIndex().orElse(0);
- final String topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
+ final int partition;
+ final String topic;
+
+ if (collapsePartitionedTopics
+ && sourceRecord.getTopicName().isPresent()
+ && TopicName.get(sourceRecord.getTopicName().get()).isPartitioned()) {
Review Comment:
Nit: we can save some work by calling TopicName.get only once.
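For illustration, a minimal sketch of parsing the topic name once and reusing the result. `ParsedTopic` is a hypothetical, simplified stand-in for Pulsar's `TopicName` (so the snippet compiles on its own), and `KafkaCoordinates`, `resolve`, and the body of the collapsing branch are assumptions, since the hunk above only shows the condition. Topic-name sanitizing is left out.

```java
import java.util.Optional;

public class CollapseSketch {

    /** Hypothetical stand-in for Pulsar's TopicName; only what this sketch needs. */
    static final class ParsedTopic {
        static int parseCount = 0; // counts parses so the saving is observable
        private static final String SUFFIX = "-partition-";
        final String baseName;     // topic name without the -partition-N suffix
        final int partitionIndex;  // -1 when the name is not a partition

        private ParsedTopic(String baseName, int partitionIndex) {
            this.baseName = baseName;
            this.partitionIndex = partitionIndex;
        }

        static ParsedTopic get(String topic) {
            parseCount++;
            int idx = topic.lastIndexOf(SUFFIX);
            if (idx > 0) {
                try {
                    int p = Integer.parseInt(topic.substring(idx + SUFFIX.length()));
                    if (p >= 0) {
                        return new ParsedTopic(topic.substring(0, idx), p);
                    }
                } catch (NumberFormatException e) {
                    // Suffix is not numeric: treat the name as non-partitioned.
                }
            }
            return new ParsedTopic(topic, -1);
        }

        boolean isPartitioned() {
            return partitionIndex >= 0;
        }
    }

    /** Hypothetical holder for the topic/partition pair handed to the Kafka record. */
    static final class KafkaCoordinates {
        final String topic;
        final int partition;

        KafkaCoordinates(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    static KafkaCoordinates resolve(Optional<String> recordTopic,
                                    Optional<Integer> recordPartition,
                                    String defaultTopic,
                                    boolean collapsePartitionedTopics) {
        // Parse at most once and keep the result for both the check and the values.
        ParsedTopic parsed = collapsePartitionedTopics && recordTopic.isPresent()
                ? ParsedTopic.get(recordTopic.get())
                : null;
        if (parsed != null && parsed.isPartitioned()) {
            return new KafkaCoordinates(parsed.baseName, parsed.partitionIndex);
        }
        // Same fallback as the removed lines in the hunk, minus sanitizing.
        return new KafkaCoordinates(recordTopic.orElse(defaultTopic), recordPartition.orElse(0));
    }

    public static void main(String[] args) {
        KafkaCoordinates c = resolve(
                Optional.of("persistent://public/default/orders-partition-3"),
                Optional.of(3), "fallback", true);
        System.out.println(c.topic + " / " + c.partition + " / parses=" + ParsedTopic.parseCount);
    }
}
```

Holding the parsed value in a local keeps the condition readable and avoids a second lookup inside the branch.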
##########
pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java:
##########
@@ -94,6 +94,12 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
+ "In some cases it may result in topic name collisions (topic_a and topic.a will become the same)")
private boolean sanitizeTopicName = false;
+ @FieldDoc(
+ defaultValue = "false",
+ help = "Supply kafka record with topic name without -partition- suffix for partitioned topics. \n"
+ + "Thi si sa workaround for https://github.com/apache/pulsar/issues/19922")
Review Comment:
I am not sure we want to keep this reference here, as it will go into the
generated docs.
We can keep the reference as a regular comment in the code.
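A rough sketch of what that could look like. The nested `@FieldDoc` below is a hypothetical stand-in for Pulsar's annotation, reduced to the two attributes used in the hunk, and the field name `collapsePartitionedTopics` is assumed from the sink code above. The issue link moves into a plain comment, so anything generated from `help` no longer contains it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

public class FieldDocSketch {

    /** Hypothetical stand-in for Pulsar's @FieldDoc annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface FieldDoc {
        String defaultValue() default "";
        String help();
    }

    static class SinkConfigSketch {
        // Workaround for https://github.com/apache/pulsar/issues/19922.
        // Kept as a regular comment so it stays out of the generated docs.
        @FieldDoc(
                defaultValue = "false",
                help = "Supply kafka record with topic name without -partition- suffix "
                        + "for partitioned topics.")
        private boolean collapsePartitionedTopics = false;
    }

    /** Reads the help text the way a doc generator might, via reflection. */
    static String helpFor(String fieldName) {
        try {
            Field f = SinkConfigSketch.class.getDeclaredField(fieldName);
            return f.getAnnotation(FieldDoc.class).help();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException("Unknown field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(helpFor("collapsePartitionedTopics"));
    }
}
```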
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.