Savonitar commented on code in PR #237:
URL:
https://github.com/apache/flink-connector-kafka/pull/237#discussion_r2961110405
##########
flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
##########
@@ -139,43 +137,8 @@ private void tryDelete(AdminClient adminClient, String topic) throws Exception {
     @Override
     public void createTestTopic(
             String topic, int numberOfPartitions, int replicationFactor, Properties properties) {
-        createNewTopic(topic, numberOfPartitions, replicationFactor, getStandardProperties());
-    }
-
-    public static void createNewTopic(
-            String topic, int numberOfPartitions, int replicationFactor, Properties properties) {
-        LOG.info("Creating topic {}", topic);
-        try (AdminClient adminClient = AdminClient.create(properties)) {
-            NewTopic topicObj = new NewTopic(topic, numberOfPartitions, (short) replicationFactor);
-            adminClient.createTopics(Collections.singleton(topicObj)).all().get();
-            CommonTestUtils.waitUtil(
-                    () -> {
-                        try {
-                            // Ensure all partitions have a leader elected and logs initialized
-                            Map<TopicPartition, OffsetSpec> offsetSpecs = new HashMap<>();
-                            for (int i = 0; i < numberOfPartitions; i++) {
-                                offsetSpecs.put(
-                                        new TopicPartition(topic, i), OffsetSpec.earliest());
-                            }
-                            adminClient
-                                    .listOffsets(offsetSpecs)
-                                    .all()
-                                    .get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-                            return true;
-                        } catch (Exception e) {
-                            LOG.warn(
-                                    "Partitions for topic {} not yet ready to serve requests",
-                                    topic,
-                                    e);
-                            return false;
-                        }
-                    },
-                    Duration.ofSeconds(30),
-                    String.format("New topic \"%s\" is not ready within timeout", topicObj));
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail("Create test topic : " + topic + " failed, " + e.getMessage());
-        }
+        createNewTopicAndWaitForPartitionAssignment(
+                topic, numberOfPartitions, replicationFactor, properties);
Review Comment:
The old code passed `getStandardProperties()` here.
This method is called from
[org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment](https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java#L85),
and that call site passes empty properties (`new Properties()`), which are
then used by `AdminClient`.
`getStandardProperties()`, by contrast, returned properties that include
`bootstrap.servers` and other settings, so with this change `AdminClient`
would be created without them.
I recommend keeping the existing behavior (passing `getStandardProperties()`),
unless I'm missing something.
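
To illustrate the concern, here is a minimal, dependency-free sketch of the difference between the two inputs. Everything in it is an assumption for illustration only: `standardProperties()` is a hypothetical stand-in for `getStandardProperties()`, the broker address is a placeholder, and `adminProperties(...)` is just one possible alternative (merging the caller's overrides on top of the defaults) rather than what this PR or the existing code does.

```java
import java.util.Properties;

public class AdminClientPropsSketch {
    // Real Kafka client config key; everything else here is illustrative.
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    // Hypothetical stand-in for getStandardProperties(); the address is a placeholder.
    static Properties standardProperties() {
        Properties props = new Properties();
        props.setProperty(BOOTSTRAP_SERVERS, "localhost:9092");
        return props;
    }

    // Hypothetical helper: start from the standard properties and let the
    // caller's entries override them, so an empty argument still yields a
    // usable configuration.
    static Properties adminProperties(Properties callerProperties) {
        Properties merged = new Properties();
        merged.putAll(standardProperties());
        merged.putAll(callerProperties);
        return merged;
    }

    public static void main(String[] args) {
        Properties fromCallSite = new Properties(); // what the call site passes today
        System.out.println(
                "call-site properties contain bootstrap.servers: "
                        + fromCallSite.containsKey(BOOTSTRAP_SERVERS));
        System.out.println(
                "merged properties contain bootstrap.servers: "
                        + adminProperties(fromCallSite).containsKey(BOOTSTRAP_SERVERS));
    }
}
```

Simply passing `getStandardProperties()` as before is the smaller change; the merge variant would only matter if callers are expected to supply their own overrides.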