XComp commented on code in PR #21247:
URL: https://github.com/apache/flink/pull/21247#discussion_r1017711266
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java:
##########
@@ -88,8 +88,11 @@ public class KafkaSourceReaderTest extends
SourceReaderTestBase<KafkaPartitionSp
public static void setup() throws Throwable {
KafkaSourceTestEnv.setup();
try (AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) {
- adminClient.createTopics(
- Collections.singleton(new NewTopic(TOPIC, NUM_PARTITIONS,
(short) 1)));
+ adminClient
+ .createTopics(
+ Collections.singleton(new NewTopic(TOPIC,
NUM_PARTITIONS, (short) 1)))
+ .all()
+ .get();
// Use the admin client to trigger the creation of internal
__consumer_offsets topic.
// This makes sure that we won't see unavailable coordinator in
the tests.
waitUtil(
Review Comment:
is the wait still necessary if we make the topic creation blocking?
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java:
##########
@@ -135,8 +134,16 @@ public void createTestTopic(String topic, int
numPartitions, int replicationFact
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
getBootstrapServers());
try (AdminClient admin = AdminClient.create(properties)) {
admin.createTopics(
- Collections.singletonList(
- new NewTopic(topic, numPartitions, (short)
replicationFactor)));
+ Collections.singletonList(
+ new NewTopic(topic, numPartitions, (short)
replicationFactor)))
+ .all()
+ .get();
+ } catch (Exception e) {
+ throw new IllegalStateException(
Review Comment:
We could have come up with a general exception for these kind of cases as
well since they all reflect the same issue. That way, we could have
encapsulated the `String.format` and varargs in the constructor. Just as an
idea... I'm not insisting on that change considering that we're planning to
create a follow-up where all of that is revisited, anyway.
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaInternalProducerITCase.java:
##########
@@ -204,7 +204,7 @@ public void testFlushAfterClosed() {
@Test(timeout = 30000L)
public void
testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws
Exception {
- String topic = "flink-kafka-producer-txn-coordinator-changed";
+ String topic = "flink-kafka-producer-txn-coordinator-changed" +
UUID.randomUUID();
Review Comment:
```suggestion
String topic = "flink-kafka-producer-txn-coordinator-changed-" +
UUID.randomUUID();
```
That's really a nitty thing but I would add a separator between the topic
prefix and the random ID. But I leave it up to you based on how much things
like that annoy you as well. :-D
This comment applies to a few other code locations as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]