JingGe commented on a change in pull request #18246:
URL: https://github.com/apache/flink/pull/18246#discussion_r786885133



##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSourceTestRecordGenerator.java
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.testutils;
+
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Util class for generating records in Kafka source tests. */
+public class KafkaSourceTestRecordGenerator {
+
+    public static final int NUM_RECORDS_PER_PARTITION = 10;
+    public static final Class<StringSerializer> KEY_SERIALIZER = StringSerializer.class;
+    public static final Class<IntegerSerializer> VALUE_SERIALIZER = IntegerSerializer.class;
+    public static final Class<StringDeserializer> KEY_DESERIALIZER = StringDeserializer.class;
+    public static final Class<IntegerDeserializer> VALUE_DESERIALIZER = IntegerDeserializer.class;
+
+    public static Map<Integer, Map<String, KafkaPartitionSplit>> getSplitsByOwners(

Review comment:
       The Javadoc for this method is missing.
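   
   For reference, a minimal sketch of what the Javadoc could say, assuming the 
outer key is the owning reader ID and the inner key is the split ID (inferred 
from the signature, not confirmed elsewhere in this PR):
   
   ```java
   /**
    * Generates the test splits and groups them by their owning reader.
    *
    * @return a map from owner (reader) ID to the splits it owns, keyed by split ID
    */
   public static Map<Integer, Map<String, KafkaPartitionSplit>> getSplitsByOwners(...)
   ```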

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
##########
@@ -72,221 +67,217 @@
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestRecordGenerator.KEY_SERIALIZER;
+import static org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestRecordGenerator.VALUE_SERIALIZER;
+import static org.apache.flink.connector.kafka.source.testutils.KafkaSourceTestRecordGenerator.getRecordsForTopic;
+import static org.apache.flink.connector.kafka.testutils.extension.KafkaContext.DEFAULT_NUM_PARTITIONS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration tests for {@link KafkaSource}. */
+class KafkaSourceITCase extends SourceTestSuiteBase<String> {
+
+    // Kafka cluster
+    @RegisterExtension static final KafkaExtension KAFKA = new KafkaExtension();
+
+    // Defines test environment on Flink MiniCluster
+    @TestEnv static final MiniClusterTestEnvironment FLINK = new MiniClusterTestEnvironment();
+
+    // Defines 2 External context Factories, so test cases will be invoked twice using these two
+    // kinds of external contexts.
+    @SuppressWarnings("unused")
+    @ExternalContextFactory
+    KafkaSingleTopicExternalContext.Factory singleTopic =
+            new KafkaSingleTopicExternalContext.Factory(
+                    KafkaSourceITCase.KAFKA::getBootstrapServers);
+
+    @SuppressWarnings("unused")
+    @ExternalContextFactory
+    KafkaMultipleTopicExternalContext.Factory multipleTopic =
+            new KafkaMultipleTopicExternalContext.Factory(
+                    KafkaSourceITCase.KAFKA::getBootstrapServers);
 
-/** Unite test class for {@link KafkaSource}. */
-public class KafkaSourceITCase {
     private static final String TOPIC1 = "topic1";
     private static final String TOPIC2 = "topic2";
 
-    @Nested
-    @TestInstance(Lifecycle.PER_CLASS)
-    class KafkaSpecificTests {
-        @BeforeAll
-        public void setup() throws Throwable {
-            KafkaSourceTestEnv.setup();
-            KafkaSourceTestEnv.setupTopic(
-                    TOPIC1, true, true, KafkaSourceTestEnv::getRecordsForTopicWithoutTimestamp);
-            KafkaSourceTestEnv.setupTopic(
-                    TOPIC2, true, true, KafkaSourceTestEnv::getRecordsForTopicWithoutTimestamp);
-        }
+    @BeforeAll
+    void setup() throws Throwable {

Review comment:
       Is it possible to use @Topic in this case? There are also some other 
cases where `kafkaContext.createTopic(...)` has been used. What is the rule 
for choosing between these two ways of creating topics?
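   
   For illustration, a hypothetical side-by-side of the two approaches (the 
exact shape of the `@Topic` annotation is assumed here, not shown in this 
hunk):
   
   ```java
   // Declarative: the extension creates (and cleans up) the topic automatically.
   @Topic static final String TOPIC1 = "topic1";
   
   // Programmatic: needed when the topic is only known or configurable at runtime.
   @BeforeAll
   static void prepare() {
       KafkaContext kafkaContext = KAFKA.createKafkaContext();
       kafkaContext.createTopic("topic1");
   }
   ```
   
   A short note in the Javadoc on when to prefer each would already help.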

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/extension/KafkaContext.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.testutils.extension;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Context for interacting with {@link KafkaExtension}.
+ *
+ * <p>A typical usage of this context is to use as a parameter of test method with {@link
+ * KafkaExtension} registered in the test class.
+ *
+ * <pre>{@code
+ * @Kafka
+ * public class KafkaTest {
+ *     @Test
+ *     void test(KafkaContext kafkaContext) {...}
+ * }
+ * }</pre>
+ *
+ * <p>If you need to interact with Kafka at some place that cannot be injected as parameters (such
+ * as creating topic in method annotated by {@link BeforeAll}), you can create the context with
+ * {@link KafkaExtension#createKafkaContext()}:
+ *
+ * <pre>{@code
+ * public class KafkaTest {
+ *     @RegisterExtension
+ *     static final KafkaExtension KAFKA = new KafkaExtension();
+ *
+ *     @BeforeAll
+ *     static void prepare() {
+ *         KafkaContext kafkaContext = KAFKA.createKafkaContext();
+ *         kafkaContext.createTopic("test");
+ *     }
+ *
+ *     @Test
+ *     void test(KafkaContext kafkaContext) {...}
+ * }
+ * }</pre>
+ */
+public class KafkaContext {

Review comment:
       This class acts more like a KafkaClient or KafkaToolkit. Using 
"Context" in the name will confuse users.

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/extension/KafkaContext.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.testutils.extension;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+/**
+ * Context for interacting with {@link KafkaExtension}.
+ *
+ * <p>A typical usage of this context is to use as a parameter of test method with {@link
+ * KafkaExtension} registered in the test class.
+ *
+ * <pre>{@code
+ * @Kafka
+ * public class KafkaTest {
+ *     @Test
+ *     void test(KafkaContext kafkaContext) {...}
+ * }
+ * }</pre>
+ *
+ * <p>If you need to interact with Kafka at some place that cannot be injected as parameters (such
+ * as creating topic in method annotated by {@link BeforeAll}), you can create the context with
+ * {@link KafkaExtension#createKafkaContext()}:
+ *
+ * <pre>{@code
+ * public class KafkaTest {
+ *     @RegisterExtension
+ *     static final KafkaExtension KAFKA = new KafkaExtension();

Review comment:
       I would suggest reconsidering whether we should use @Kafka, especially 
since there are still cases that force us back to using KafkaExtension 
directly.

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/annotations/Kafka.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.testutils.annotations;
+
+import org.apache.flink.connector.kafka.testutils.extension.KafkaExtension;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/** Shortcut annotation for {@code @ExtendWith(KafkaExtension.class)}. */
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+@ExtendWith(KafkaExtension.class)
+public @interface Kafka {}

Review comment:
       It is nice to have this syntactic sugar. It would also be fine to 
expose it, since developers will be aware that they are using a JUnit 
extension, and @ExtendWith(TestLoggerExtension.class) is already in use. It 
would be great if we could provide consistency.
   
   One more thing: there should be an explicit guideline for it, otherwise we 
will end up using both @Kafka and @ExtendWith(KafkaExtension.class) 
simultaneously. If we decide to adopt this syntactic sugar, a ticket should be 
created to extend the ArchUnit test to cover it.
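   
   For clarity, these are the two equivalent spellings the guideline would 
have to choose between (both forms appear in this PR):
   
   ```java
   // Shortcut annotation introduced in this PR:
   @Kafka
   public class KafkaTest { ... }
   
   // Equivalent raw registration, which the guideline (and the ArchUnit rule)
   // would then forbid in favor of the shortcut:
   @ExtendWith(KafkaExtension.class)
   public class KafkaTest { ... }
   ```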




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

