XComp commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1017943854
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java:
##########
@@ -1238,7 +1238,7 @@ public void runProduceConsumeMultipleTopics(boolean
useLegacySchema) throws Exce
// create topics with content
final List<String> topics = new ArrayList<>();
for (int i = 0; i < numTopics; i++) {
- final String topic = topicNamePrefix + i;
+ final String topic = topicNamePrefix + i + UUID.randomUUID();
Review Comment:
```suggestion
final String topic = String.format("%s-%d-%s", topicNamePrefix, i, UUID.randomUUID());
```
Sorry for being nitpicky about this specific one, but I feel like a separator
is even more crucial here: each topic gets this `i` identifier, which would
otherwise be swallowed by the UUID that directly follows it.
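To illustrate the difference, here is a minimal, self-contained sketch. The class name, the helper methods, and the fixed UUID are made up for the demo and are not part of the PR; the real code would use `UUID.randomUUID()`.

```java
import java.util.UUID;

// Hypothetical demo class, not part of the Flink code base.
class TopicNameDemo {

    // Mirrors the plain concatenation from the diff: prefix + i + UUID.
    static String withoutSeparator(String prefix, int i, UUID uuid) {
        return prefix + i + uuid;
    }

    // Mirrors the suggested format string with '-' separators.
    static String withSeparator(String prefix, int i, UUID uuid) {
        return String.format("%s-%d-%s", prefix, i, uuid);
    }

    public static void main(String[] args) {
        // Fixed UUID (an assumption for the demo) so that the output is deterministic.
        UUID uuid = UUID.fromString("12345678-1234-1234-1234-123456789abc");

        // prints "topic112345678-1234-1234-1234-123456789abc"
        System.out.println(withoutSeparator("topic", 1, uuid));

        // prints "topic-1-12345678-1234-1234-1234-123456789abc"
        System.out.println(withSeparator("topic", 1, uuid));
    }
}
```

In the first name, the index `1` is indistinguishable from the leading digits of the UUID; in the second, it stays readable in logs.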
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java:
##########
@@ -100,10 +98,7 @@ private void createTopic(String topicName, int
numPartitions, short replicationF
replicationFactor);
NewTopic newTopic = new NewTopic(topicName, numPartitions,
replicationFactor);
try {
- kafkaAdminClient
Review Comment:
The diff in `KafkaSinkExternalContext` should go into the FLINK-29914 commit.
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java:
##########
@@ -429,13 +428,12 @@ private void createTestTopic(String topic, int
numPartitions, short replicationF
admin.createTopics(
Collections.singletonList(
new NewTopic(topic, numPartitions,
replicationFactor)));
- result.all().get(1, TimeUnit.MINUTES);
Review Comment:
The diff in `KafkaSinkITCase` should go into the FLINK-29914 commit.
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java:
##########
@@ -1238,7 +1238,7 @@ public void runProduceConsumeMultipleTopics(boolean
useLegacySchema) throws Exce
// create topics with content
final List<String> topics = new ArrayList<>();
for (int i = 0; i < numTopics; i++) {
- final String topic = topicNamePrefix + i;
+ final String topic = topicNamePrefix + i + UUID.randomUUID();
Review Comment:
I noticed that we also have other occurrences where we're not adding the
UUID to the topic (I didn't check that in my first pass). Is there a specific
reason why we only randomize certain topics? Alternatively, we could move the
randomization of topic names into
[KafkaTestBase.createTestTopic](https://github.com/apache/flink/blob/44f73c496ed1514ea453615b77bee0486b8998db/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java#L215)
and
[KafkaTableTestBase](https://github.com/apache/flink/blob/ca9fea2459f7e066eac0fb59b382e58c41e0d702/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java#L132).
We would have to change the methods' signatures to return the randomized topic
name (and rename the methods accordingly). The returned name would then
be used in the corresponding tests:
```
// old method signature:
public void createTestTopic(String topic, int numPartitions, int replicationFactor) {
// ...
// new method signature:
public String createRandomizedTestTopic(String topicPrefix, int numPartitions, int replicationFactor) {
```
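A rough sketch of how such a helper could look, just to make the idea concrete. `RandomizedTopicSketch` and the `TopicCreator` interface are hypothetical stand-ins for the existing topic-creation logic in the test base classes; this is not the actual `KafkaTestBase` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Hypothetical sketch, not the actual KafkaTestBase implementation.
class RandomizedTopicSketch {

    /** Hypothetical stand-in for whatever actually creates the topic on the broker. */
    interface TopicCreator {
        void create(String topic, int numPartitions, int replicationFactor);
    }

    private final TopicCreator creator;

    RandomizedTopicSketch(TopicCreator creator) {
        this.creator = creator;
    }

    /** Creates a topic with a randomized name and returns that name to the caller. */
    String createRandomizedTestTopic(String topicPrefix, int numPartitions, int replicationFactor) {
        // The separator keeps the prefix readable in front of the UUID.
        String topic = String.format("%s-%s", topicPrefix, UUID.randomUUID());
        creator.create(topic, numPartitions, replicationFactor);
        return topic;
    }

    public static void main(String[] args) {
        // Record the created topic names instead of talking to a real broker.
        List<String> created = new ArrayList<>();
        RandomizedTopicSketch base =
                new RandomizedTopicSketch((topic, partitions, replication) -> created.add(topic));

        String first = base.createRandomizedTestTopic("my-test", 1, 1);
        String second = base.createRandomizedTestTopic("my-test", 1, 1);

        // Tests would keep using the returned names from here on.
        System.out.println(first);
        System.out.println(second);
    }
}
```

Every test would then have to use the returned name rather than the prefix it passed in, which is why the signature change touches the callers as well.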
WDYT?