This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0bc2234b60d1a0635e238d18990695943158123c Author: Fabian Paul <[email protected]> AuthorDate: Tue Dec 14 16:03:29 2021 +0100 [FLINK-25266][e2e] Convert StreamingKafkaITCase to SmokeKafkaITCase covering application packaging --- .../flink/tests/util/kafka/SmokeKafkaITCase.java | 185 ++++++++++++++++++ .../tests/util/kafka/StreamingKafkaITCase.java | 216 --------------------- .../flink/streaming/kafka/test/KafkaExample.java | 54 +++--- 3 files changed, 212 insertions(+), 243 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java new file mode 100644 index 0000000..c1a706f --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.tests.util.kafka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.connector.kafka.testutils.KafkaUtil;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.JobSubmission;
+import org.apache.flink.tests.util.flink.container.FlinkContainers;
+import org.apache.flink.testutils.junit.FailsOnJava11;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.VoidDeserializer;
+import org.apache.kafka.common.serialization.VoidSerializer;
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
+import static org.apache.flink.util.DockerImageVersions.KAFKA;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Smoke test for the Kafka connectors that also covers packaging of a user application.
+ *
+ * <p>The test writes three integers to an input topic and submits the packaged {@code
+ * flink-streaming-kafka-test} jar to a containerized Flink cluster, which copies them to an
+ * output topic. It then drains the output topic and checks that exactly those values arrived,
+ * in order.
+ */
+@Category(value = {FailsOnJava11.class})
+@ExtendWith({TestLoggerExtension.class})
+@Testcontainers
+public class SmokeKafkaITCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SmokeKafkaITCase.class);
+    private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
+    /**
+     * Broker port combined with the network aliases when the Flink job, running inside {@link
+     * #NETWORK}, connects to Kafka.
+     */
+    private static final int INTER_CONTAINER_KAFKA_PORT = 9092;
+    private static final Network NETWORK = Network.newNetwork();
+    private static final String EXAMPLE_JAR_MATCHER = "flink-streaming-kafka-test.*";
+
+    @Container
+    public static final KafkaContainer KAFKA_CONTAINER =
+            createKafkaContainer(KAFKA, LOG)
+                    .withEmbeddedZookeeper()
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+
+    @RegisterExtension
+    public static final FlinkContainers FLINK =
+            FlinkContainers.builder()
+                    .setConfiguration(getConfiguration())
+                    .setLogger(LOG)
+                    .dependsOn(KAFKA_CONTAINER)
+                    .build();
+
+    private static AdminClient admin;
+    private static KafkaProducer<Void, Integer> producer;
+
+    /**
+     * Builds the Flink cluster configuration: three task slots per TaskManager and checkpointing
+     * kept enabled after tasks finish.
+     */
+    private static Configuration getConfiguration() {
+        // modify configuration to have enough slots
+        final Configuration flinkConfig = new Configuration();
+        flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
+        flinkConfig.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        return flinkConfig;
+    }
+
+    /**
+     * Creates the shared admin client and producer against the host-mapped bootstrap servers.
+     *
+     * <p>Lifecycle methods must not be {@code private} for JUnit Jupiter, hence package-private.
+     */
+    @BeforeAll
+    static void setUp() {
+        final Map<String, Object> adminProperties = new HashMap<>();
+        adminProperties.put(
+                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                KAFKA_CONTAINER.getBootstrapServers());
+        admin = AdminClient.create(adminProperties);
+        final Properties producerProperties = new Properties();
+        producerProperties.putAll(adminProperties);
+        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, VoidSerializer.class);
+        producerProperties.put(
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        producer = new KafkaProducer<>(producerProperties);
+    }
+
+    /** Closes the clients created in {@link #setUp()}; tolerates a partially failed setup. */
+    @AfterAll
+    static void teardown() {
+        // null checks keep an exception thrown in setUp() from being masked by an NPE here
+        if (admin != null) {
+            admin.close();
+        }
+        if (producer != null) {
+            producer.close();
+        }
+    }
+
+    @Test
+    public void testKafka() throws Exception {
+        final Path kafkaExampleJar = TestUtils.getResource(EXAMPLE_JAR_MATCHER);
+
+        final String inputTopic = "test-input-" + UUID.randomUUID();
+        final String outputTopic = "test-output-" + UUID.randomUUID();
+
+        // create the required topics and block until the broker has acknowledged them, so the
+        // producer below does not race against topic creation
+        final short replicationFactor = 1;
+        admin.createTopics(
+                        Lists.newArrayList(
+                                new NewTopic(inputTopic, 1, replicationFactor),
+                                new NewTopic(outputTopic, 1, replicationFactor)))
+                .all()
+                .get();
+
+        // KafkaProducer#send is asynchronous. The job's source is bounded by
+        // OffsetsInitializer.latest(), so every record must be acknowledged before the job is
+        // submitted. Otherwise it may stop before reading them.
+        for (final int value : new int[] {1, 2, 3}) {
+            producer.send(new ProducerRecord<>(inputTopic, value)).get();
+        }
+
+        // run the Flink job attached (non-detached); its source is bounded, so it terminates
+        // after copying the input records
+        FLINK.submitJob(
+                new JobSubmission.JobSubmissionBuilder(kafkaExampleJar)
+                        .setDetached(false)
+                        .addArgument("--input-topic", inputTopic)
+                        .addArgument("--output-topic", outputTopic)
+                        .addArgument("--prefix", "PREFIX")
+                        .addArgument(
+                                "--bootstrap.servers",
+                                String.join(
+                                        ",",
+                                        KAFKA_CONTAINER.getBootstrapServers(),
+                                        KAFKA_CONTAINER.getNetworkAliases().stream()
+                                                .map(host -> host + ":" + INTER_CONTAINER_KAFKA_PORT)
+                                                .collect(Collectors.joining(","))))
+                        .addArgument("--group.id", "myconsumer")
+                        .addArgument("--auto.offset.reset", "earliest")
+                        .addArgument("--transaction.timeout.ms", "900000")
+                        .addArgument("--flink.partition-discovery.interval-millis", "1000")
+                        .build());
+
+        final Properties consumerProperties = new Properties();
+        consumerProperties.put(
+                CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                KAFKA_CONTAINER.getBootstrapServers());
+        consumerProperties.put(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, VoidDeserializer.class);
+        consumerProperties.put(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+        // values are decoded from the raw record bytes (big-endian int, as IntegerSerializer
+        // writes them)
+        final List<Integer> records =
+                KafkaUtil.drainAllRecordsFromTopic(outputTopic, consumerProperties).stream()
+                        .map(r -> ByteBuffer.wrap(r.value()).getInt())
+                        .collect(Collectors.toList());
+        // containsExactly also pins the size, so no separate hasSize check is needed
+        assertThat(records).containsExactly(1, 2, 3);
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
deleted file mode 100644
index e8ad197..0000000
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ - -package org.apache.flink.tests.util.kafka; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.tests.util.TestUtils; -import org.apache.flink.tests.util.flink.ClusterController; -import org.apache.flink.tests.util.flink.FlinkResource; -import org.apache.flink.tests.util.flink.FlinkResourceSetup; -import org.apache.flink.tests.util.flink.JobSubmission; -import org.apache.flink.testutils.junit.FailsOnJava11; -import org.apache.flink.util.TestLogger; - -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.Timeout; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.file.Path; -import java.time.Duration; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -/** End-to-end test for the kafka connectors. 
*/ -@RunWith(Parameterized.class) -@Category(value = {FailsOnJava11.class}) -@Ignore("FLINK-25266") -public class StreamingKafkaITCase extends TestLogger { - - private static final Logger LOG = LoggerFactory.getLogger(StreamingKafkaITCase.class); - - @Parameterized.Parameters(name = "{index}: kafka-version:{1}") - public static Collection<Object[]> data() { - return Arrays.asList(new Object[][] {{"flink-streaming-kafka-test.*", "2.4.1"}}); - } - - private final Path kafkaExampleJar; - - private final String kafkaVersion; - - @Rule - public final Timeout timeout = - Timeout.builder() - .withTimeout(3, TimeUnit.MINUTES) - .withLookingForStuckThread(true) - .build(); - - @Rule public final KafkaResource kafka; - - @Rule - public final FlinkResource flink = - FlinkResource.get( - FlinkResourceSetup.builder().addConfiguration(getConfiguration()).build()); - - private static Configuration getConfiguration() { - // modify configuration to have enough slots - final Configuration flinkConfig = new Configuration(); - flinkConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3); - return flinkConfig; - } - - public StreamingKafkaITCase(final String kafkaExampleJarPattern, final String kafkaVersion) { - this.kafkaExampleJar = TestUtils.getResource(kafkaExampleJarPattern); - this.kafka = KafkaResource.get(kafkaVersion); - this.kafkaVersion = kafkaVersion; - } - - @Test - public void testKafka() throws Exception { - try (final ClusterController clusterController = flink.startCluster(1)) { - - final String inputTopic = - "test-input-" + kafkaVersion + "-" + UUID.randomUUID().toString(); - final String outputTopic = - "test-output" + kafkaVersion + "-" + UUID.randomUUID().toString(); - - // create the required topics - kafka.createTopic(1, 1, inputTopic); - kafka.createTopic(1, 1, outputTopic); - - // run the Flink job (detached mode) - clusterController.submitJob( - new JobSubmission.JobSubmissionBuilder(kafkaExampleJar) - .setDetached(true) - .addArgument("--input-topic", 
inputTopic) - .addArgument("--output-topic", outputTopic) - .addArgument("--prefix", "PREFIX") - .addArgument( - "--bootstrap.servers", - kafka.getBootstrapServerAddresses().stream() - .map( - address -> - address.getHostString() - + ':' - + address.getPort()) - .collect(Collectors.joining(","))) - .addArgument("--group.id", "myconsumer") - .addArgument("--auto.offset.reset", "earliest") - .addArgument("--transaction.timeout.ms", "900000") - .addArgument("--flink.partition-discovery.interval-millis", "1000") - .build(), - Duration.ofMinutes(2L)); - - LOG.info("Sending messages to Kafka topic [{}] ...", inputTopic); - // send some data to Kafka - kafka.sendKeyedMessages( - inputTopic, - "\t", - "key\telephant,5,45218", - "key\tsquirrel,12,46213", - "key\tbee,3,51348", - "key\tsquirrel,22,52444", - "key\tbee,10,53412", - "key\telephant,9,54867"); - - LOG.info("Verifying messages from Kafka topic [{}] ...", outputTopic); - { - final List<String> messages = kafka.readMessage(6, "kafka-e2e-driver", outputTopic); - - final List<String> elephants = filterMessages(messages, "elephant"); - final List<String> squirrels = filterMessages(messages, "squirrel"); - final List<String> bees = filterMessages(messages, "bee"); - - // check all keys - Assert.assertEquals( - Arrays.asList("elephant,5,45218", "elephant,14,54867"), elephants); - Assert.assertEquals( - Arrays.asList("squirrel,12,46213", "squirrel,34,52444"), squirrels); - Assert.assertEquals(Arrays.asList("bee,3,51348", "bee,13,53412"), bees); - } - - // now, we add a new partition to the topic - LOG.info("Repartitioning Kafka topic [{}] ...", inputTopic); - kafka.setNumPartitions(2, inputTopic); - Assert.assertEquals( - "Failed adding a partition to input topic.", - 2, - kafka.getNumPartitions(inputTopic)); - - // send some more messages to Kafka - LOG.info("Sending more messages to Kafka topic [{}] ...", inputTopic); - kafka.sendKeyedMessages( - inputTopic, - "\t", - "key\telephant,13,64213", - "key\tgiraffe,9,65555", - 
"key\tbee,5,65647", - "key\tsquirrel,18,66413"); - - // verify that our assumption that the new partition actually has written messages is - // correct - Assert.assertNotEquals( - "The newly created partition does not have any new messages, and therefore partition discovery cannot be verified.", - 0L, - kafka.getPartitionOffset(inputTopic, 1)); - - LOG.info("Verifying messages from Kafka topic [{}] ...", outputTopic); - { - final List<String> messages = kafka.readMessage(4, "kafka-e2e-driver", outputTopic); - - final List<String> elephants = filterMessages(messages, "elephant"); - final List<String> squirrels = filterMessages(messages, "squirrel"); - final List<String> bees = filterMessages(messages, "bee"); - final List<String> giraffes = filterMessages(messages, "giraffe"); - - Assert.assertEquals( - String.format("Messages from Kafka %s: %s", kafkaVersion, messages), - Arrays.asList("elephant,27,64213"), - elephants); - Assert.assertEquals( - String.format("Messages from Kafka %s: %s", kafkaVersion, messages), - Arrays.asList("squirrel,52,66413"), - squirrels); - Assert.assertEquals( - String.format("Messages from Kafka %s: %s", kafkaVersion, messages), - Arrays.asList("bee,18,65647"), - bees); - Assert.assertEquals( - String.format("Messages from Kafka %s: %s", kafkaVersion, messages), - Arrays.asList("giraffe,9,65555"), - giraffes); - } - } - } - - private static List<String> filterMessages(final List<String> messages, final String keyword) { - return messages.stream().filter(msg -> msg.contains(keyword)).collect(Collectors.toList()); - } -} diff --git a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java index 01e4819..cb90a1d 100644 --- a/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java +++ 
b/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java @@ -17,29 +17,24 @@ package org.apache.flink.streaming.kafka.test; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; +import org.apache.flink.connector.kafka.source.KafkaSource; +import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.streaming.kafka.test.base.CustomWatermarkExtractor; -import org.apache.flink.streaming.kafka.test.base.KafkaEvent; -import org.apache.flink.streaming.kafka.test.base.KafkaEventSchema; import org.apache.flink.streaming.kafka.test.base.KafkaExampleUtil; -import org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; /** - * A simple example that shows how to read from and write to modern Kafka. This will read String - * messages from the input topic, parse them into a POJO type {@link KafkaEvent}, group by some key, - * and finally perform a rolling addition on each key for which the results are written back to - * another topic. - * - * <p>This example also demonstrates using a watermark assigner to generate per-partition watermarks - * directly in the Flink Kafka consumer. 
For demonstration purposes, it is assumed that the String - * messages are of formatted as a (word,frequency,timestamp) tuple. + * A simple application used as smoke test example to forward messages from one topic to another + * topic in batch mode. * * <p>Example usage: --input-topic test-input --output-topic test-output --bootstrap.servers * localhost:9092 --group.id myconsumer @@ -51,19 +46,25 @@ public class KafkaExample extends KafkaExampleUtil { final ParameterTool parameterTool = ParameterTool.fromArgs(args); StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool); - DataStream<KafkaEvent> input = - env.addSource( - new FlinkKafkaConsumer<>( - parameterTool.getRequired("input-topic"), - new KafkaEventSchema(), - parameterTool.getProperties()) - .assignTimestampsAndWatermarks( - new CustomWatermarkExtractor())) - .keyBy("word") - .map(new RollingAdditionMapper()); + DataStream<Integer> input = + env.fromSource( + KafkaSource.<Integer>builder() + .setBootstrapServers( + parameterTool + .getProperties() + .getProperty( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) + .setBounded(OffsetsInitializer.latest()) + .setDeserializer( + KafkaRecordDeserializationSchema.valueOnly( + IntegerDeserializer.class)) + .setTopics(parameterTool.getRequired("input-topic")) + .build(), + WatermarkStrategy.noWatermarks(), + "kafka-source"); input.sinkTo( - KafkaSink.<KafkaEvent>builder() + KafkaSink.<Integer>builder() .setBootstrapServers( parameterTool .getProperties() @@ -71,10 +72,9 @@ public class KafkaExample extends KafkaExampleUtil { .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic(parameterTool.getRequired("output-topic")) - .setValueSerializationSchema(new KafkaEventSchema()) + .setKafkaValueSerializer(IntegerSerializer.class) .build()) .build()); - - env.execute("Modern Kafka Example"); + env.execute("Smoke Kafka Example"); } }
