JingGe commented on a change in pull request #18656:
URL: https://github.com/apache/flink/pull/18656#discussion_r804551791
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
##########
@@ -212,11 +186,19 @@ public void createTestTopic(
try (AdminClient adminClient =
AdminClient.create(getStandardProperties())) {
NewTopic topicObj = new NewTopic(topic, numberOfPartitions,
(short) replicationFactor);
adminClient.createTopics(Collections.singleton(topicObj)).all().get();
- for (KafkaServer kafkaServer : brokers) {
+ Properties consumerProps = new Properties();
+ consumerProps.putAll(standardProps);
+ consumerProps.setProperty(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ VoidDeserializer.class.getCanonicalName());
+ consumerProps.setProperty(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ VoidDeserializer.class.getCanonicalName());
Review comment:
Could you extract a method here to reduce the duplicated code?
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
##########
@@ -504,4 +394,102 @@ public void close() {
offsetClient.close();
}
}
+
+ private void startKafkaContainerCluster(int numBrokers) {
+ Network network = Network.newNetwork();
+ if (numBrokers > 1) {
+ zookeeper = createZookeeperContainer(network);
+ zookeeper.start();
+ LOG.info("Zookeeper container started");
+ }
+ for (int brokerID = 0; brokerID < numBrokers; brokerID++) {
+ KafkaContainer broker = createKafkaContainer(network, brokerID,
zookeeper);
+ brokers.put(brokerID, broker);
+ }
+ new
ArrayList<>(brokers.values()).parallelStream().forEach(GenericContainer::start);
+ LOG.info("{} brokers started", numBrokers);
+ brokerConnectionString =
+ brokers.values().stream()
+ .map(KafkaContainer::getBootstrapServers)
+ // Here we have URL like
"PLAINTEXT://127.0.0.1:15213", and we only keep the
+ // "127.0.0.1:15213" part in broker connection string
+ .map(server -> server.split("://")[1])
+ .collect(Collectors.joining(","));
+ }
+
+ private GenericContainer<?> createZookeeperContainer(Network network) {
+ return new
GenericContainer<>(DockerImageName.parse(DockerImageVersions.ZOOKEEPER))
+ .withNetwork(network)
+ .withNetworkAliases(ZOOKEEPER_HOSTNAME)
+ .withEnv("ZOOKEEPER_CLIENT_PORT",
String.valueOf(ZOOKEEPER_PORT));
+ }
+
+ private KafkaContainer createKafkaContainer(
+ Network network, int brokerID, @Nullable GenericContainer<?>
zookeeper) {
+ String brokerName = String.format("Kafka-%d", brokerID);
+ KafkaContainer broker =
+ KafkaUtil.createKafkaContainer(DockerImageVersions.KAFKA, LOG,
brokerName)
+ .withNetwork(network)
+ .withNetworkAliases(brokerName)
+ .withEnv("KAFKA_BROKER_ID", String.valueOf(brokerID))
+ .withEnv("KAFKA_MESSAGE_MAX_BYTES", String.valueOf(50
* 1024 * 1024))
+ .withEnv("KAFKA_REPLICA_FETCH_MAX_BYTES",
String.valueOf(50 * 1024 * 1024))
+ .withEnv(
+ "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
+ Integer.toString(1000 * 60 * 60 * 2))
+ // Disable log deletion to prevent records from being
deleted during test
+ // run
+ .withEnv("KAFKA_LOG_RETENTION_MS", "-1")
+ .withEnv("KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS",
String.valueOf(zkTimeout))
+ .withEnv(
+ "KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS",
String.valueOf(zkTimeout));
+
+ if (zookeeper != null) {
+ broker.dependsOn(zookeeper)
+ .withExternalZookeeper(
+ String.format("%s:%d", ZOOKEEPER_HOSTNAME,
ZOOKEEPER_PORT));
+ } else {
+ broker.withEmbeddedZookeeper();
+ }
+ return broker;
+ }
+
+ private void pause(int brokerId) {
+ if (pausedBroker.contains(brokerId)) {
+ return;
+ }
+ DockerClientFactory.instance()
+ .client()
+ .pauseContainerCmd(brokers.get(brokerId).getContainerId())
+ .exec();
+ pausedBroker.add(brokerId);
+ LOG.info("Broker {} is paused", brokerId);
+ }
+
+ private void unpause(int brokerId) throws Exception {
+ if (!pausedBroker.contains(brokerId)) {
+ throw new IllegalStateException(
Review comment:
Is LOG.warn(...) good enough?
##########
File path:
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
##########
@@ -504,4 +394,102 @@ public void close() {
offsetClient.close();
}
}
+
+ private void startKafkaContainerCluster(int numBrokers) {
+ Network network = Network.newNetwork();
+ if (numBrokers > 1) {
+ zookeeper = createZookeeperContainer(network);
+ zookeeper.start();
+ LOG.info("Zookeeper container started");
+ }
+ for (int brokerID = 0; brokerID < numBrokers; brokerID++) {
+ KafkaContainer broker = createKafkaContainer(network, brokerID,
zookeeper);
+ brokers.put(brokerID, broker);
+ }
+ new
ArrayList<>(brokers.values()).parallelStream().forEach(GenericContainer::start);
+ LOG.info("{} brokers started", numBrokers);
+ brokerConnectionString =
+ brokers.values().stream()
+ .map(KafkaContainer::getBootstrapServers)
+ // Here we have URL like
"PLAINTEXT://127.0.0.1:15213", and we only keep the
+ // "127.0.0.1:15213" part in broker connection string
+ .map(server -> server.split("://")[1])
+ .collect(Collectors.joining(","));
+ }
+
+ private GenericContainer<?> createZookeeperContainer(Network network) {
+ return new
GenericContainer<>(DockerImageName.parse(DockerImageVersions.ZOOKEEPER))
+ .withNetwork(network)
+ .withNetworkAliases(ZOOKEEPER_HOSTNAME)
+ .withEnv("ZOOKEEPER_CLIENT_PORT",
String.valueOf(ZOOKEEPER_PORT));
+ }
+
+ private KafkaContainer createKafkaContainer(
+ Network network, int brokerID, @Nullable GenericContainer<?>
zookeeper) {
+ String brokerName = String.format("Kafka-%d", brokerID);
+ KafkaContainer broker =
+ KafkaUtil.createKafkaContainer(DockerImageVersions.KAFKA, LOG,
brokerName)
+ .withNetwork(network)
+ .withNetworkAliases(brokerName)
+ .withEnv("KAFKA_BROKER_ID", String.valueOf(brokerID))
+ .withEnv("KAFKA_MESSAGE_MAX_BYTES", String.valueOf(50
* 1024 * 1024))
+ .withEnv("KAFKA_REPLICA_FETCH_MAX_BYTES",
String.valueOf(50 * 1024 * 1024))
+ .withEnv(
+ "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
+ Integer.toString(1000 * 60 * 60 * 2))
+ // Disable log deletion to prevent records from being
deleted during test
+ // run
+ .withEnv("KAFKA_LOG_RETENTION_MS", "-1")
+ .withEnv("KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS",
String.valueOf(zkTimeout))
+ .withEnv(
+ "KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS",
String.valueOf(zkTimeout));
+
+ if (zookeeper != null) {
+ broker.dependsOn(zookeeper)
+ .withExternalZookeeper(
+ String.format("%s:%d", ZOOKEEPER_HOSTNAME,
ZOOKEEPER_PORT));
+ } else {
+ broker.withEmbeddedZookeeper();
+ }
+ return broker;
+ }
+
+ private void pause(int brokerId) {
+ if (pausedBroker.contains(brokerId)) {
+ return;
Review comment:
LOG.warn(...)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]