alpreu commented on a change in pull request #17539:
URL: https://github.com/apache/flink/pull/17539#discussion_r735335020



##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java
##########
@@ -60,15 +60,10 @@
 
     @ClassRule
     public static final KafkaContainer KAFKA_CONTAINER =
-            new 
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.0"))
-                    .withEmbeddedZookeeper()
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", 
"1")
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
-                    .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
-                    .withEnv(
-                            "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
-                            String.valueOf(Duration.ofHours(2).toMillis()))
-                    .withLogConsumer(LOG_CONSUMER);
+            configureKafkaContainer(
+                    LOG,
+                    new 
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.0"))

Review comment:
       Use String constant from org.apache.flink.util.DockerImageVersions here? 
   Why does this test use a different version of Kafka?

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
##########
@@ -55,12 +55,10 @@
 
     @Container
     private static final KafkaContainer KAFKA_CONTAINER =
-            new 
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", 
"1")
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
-                    .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
-                    .withLogConsumer(new Slf4jLogConsumer(LOG))
-                    .withEmbeddedZookeeper();
+            configureKafkaContainer(
+                    LOG,
+                    new 
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))

Review comment:
       Use String constant from org.apache.flink.util.DockerImageVersions here?

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/resources/log4j2-test.properties
##########
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO

Review comment:
       Is this intentional?

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
##########
@@ -91,17 +91,12 @@
     private TriggerTimeService timeService;
 
     private static final KafkaContainer KAFKA_CONTAINER =
-            new 
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))
-                    .withEmbeddedZookeeper()
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", 
"1")
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
-                    .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
-                    .withEnv(
-                            "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
-                            String.valueOf(Duration.ofHours(2).toMillis()))
-                    .withNetwork(NETWORK)
-                    .withLogConsumer(LOG_CONSUMER)
-                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+            configureKafkaContainer(
+                    LOG,
+                    new 
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))

Review comment:
       Use String constant from org.apache.flink.util.DockerImageVersions here?

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -124,17 +124,12 @@
 
     @ClassRule
     public static final KafkaContainer KAFKA_CONTAINER =
-            new 
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))
-                    .withEmbeddedZookeeper()
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", 
"1")
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
-                    .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
-                    .withEnv(
-                            "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
-                            String.valueOf(Duration.ofHours(2).toMillis()))
-                    .withNetwork(NETWORK)
-                    .withLogConsumer(LOG_CONSUMER)
-                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+            configureKafkaContainer(
+                    LOG,
+                    new 
KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))

Review comment:
       Use String constant from org.apache.flink.util.DockerImageVersions here?

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java
##########
@@ -43,6 +45,46 @@
 
     private KafkaUtil() {}
 
+    /**
+     * This method helps to set commonly used Kafka configurations and aligns 
the internal Kafka log
+     * levels with the ones used by the capturing logger.
+     *
+     * @param logger to derive the log level from
+     * @param container running Kafka
+     * @return configured Kafka container
+     */
+    public static KafkaContainer configureKafkaContainer(Logger logger, 
KafkaContainer container) {
+        String logLevel;
+        if (logger.isErrorEnabled()) {
+            logLevel = "ERROR";
+        } else if (logger.isTraceEnabled()) {
+            logLevel = "TRACE";
+        } else if (logger.isDebugEnabled()) {
+            logLevel = "DEBUG";
+        } else if (logger.isWarnEnabled()) {
+            logLevel = "WARN";
+        } else if (logger.isInfoEnabled()) {
+            logLevel = "INFO";
+        } else {
+            throw new IllegalStateException("Unsupported log level 
configured.");

Review comment:
       Doesn't this fail if logging is set to OFF?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to